View Javadoc
1   package org.sim0mq.federatestarter;
2   
3   import java.io.File;
4   import java.io.IOException;
5   import java.io.InputStream;
6   import java.nio.file.Files;
7   import java.nio.file.Path;
8   import java.nio.file.Paths;
9   import java.util.ArrayList;
10  import java.util.Arrays;
11  import java.util.Collections;
12  import java.util.HashMap;
13  import java.util.List;
14  import java.util.Map;
15  import java.util.Properties;
16  import java.util.UUID;
17  
18  import org.sim0mq.Sim0MQException;
19  import org.sim0mq.message.MessageStatus;
20  import org.sim0mq.message.SimulationMessage;
21  import org.sim0mq.message.federatestarter.FederateStartedMessage;
22  import org.sim0mq.message.federationmanager.StartFederateMessage;
23  import org.zeromq.ZContext;
24  import org.zeromq.ZMQ;
25  
26  import nl.tudelft.simulation.language.io.URLResource;
27  
28  /**
29   * The FederateStarter start listening on the given port for messages to start components. Report back via the call-back port on
30   * the status of the started components. If necessary, the FederateStarter can also forcefully stop a started (sub)process.
31   * <p>
32   * Copyright (c) 2016-2017 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
33   * BSD-style license. See <a href="http://sim0mq.org/docs/current/license.html">Sim0MQ License</a>.
34   * </p>
35   * $LastChangedDate: 2015-07-24 02:58:59 +0200 (Fri, 24 Jul 2015) $, @version $Revision: 1147 $, by $Author: averbraeck $,
36   * initial version Mar 1, 2017 <br>
37   * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
38   */
39  public class FederateStarter
40  {
41      /** the port number to listen on. */
42      private final int fsPort;
43  
44      /** the first port to be used for the models, inclusive. */
45      private final int startPort;
46  
47      /** the last port to be used for the models, inclusive. */
48      private final int endPort;
49  
50      /** the running programs this FederateStarter started. The String identifies the process (e.g., a UUID or a model id). */
51      @SuppressWarnings("checkstyle:visibilitymodifier")
52      protected Map<String, Process> runningProcessMap = Collections.synchronizedMap(new HashMap<>());
53  
54      /** the ports where the models listen. The String identifies the process (e.g., a UUID or a model id). */
55      private Map<String, Integer> modelPortMap = Collections.synchronizedMap(new HashMap<>());
56  
57      /** the StartFederate messages. */
58      private Map<String, StartFederateMessage> startFederateMessages = Collections.synchronizedMap(new HashMap<>());
59  
60      /** the software properties. */
61      @SuppressWarnings("checkstyle:visibilitymodifier")
62      final Properties softwareProperties;
63  
64      /** the 0mq socket. */
65      private ZMQ.Socket fsSocket;
66  
67      /** the 0mq context. */
68      private ZContext fsContext;
69  
70      /** message count. */
71      private long messageCount = 0;
72  
73      /**
74       * @param fsPort the port number to listen on
75       * @param softwareProperties the software properties to use
76       * @param startPort first port to be used for the models, inclusive
77       * @param endPort last port to be used for the models, inclusive
78       * @throws Sim0MQException on error
79       */
80      public FederateStarter(final int fsPort, final Properties softwareProperties, final int startPort, final int endPort)
81              throws Sim0MQException
82      {
83          super();
84          this.softwareProperties = softwareProperties;
85          this.fsPort = fsPort;
86          this.startPort = startPort;
87          this.endPort = endPort;
88  
89          this.fsContext = new ZContext(1);
90  
91          this.fsSocket = this.fsContext.createSocket(ZMQ.ROUTER);
92          this.fsSocket.bind("tcp://*:" + this.fsPort);
93  
94          while (!Thread.currentThread().isInterrupted())
95          {
96              // Wait for next request from the client -- first the identity (String) and the delimiter (#0)
97              String identity = this.fsSocket.recvStr();
98              this.fsSocket.recvStr();
99  
100             byte[] request = this.fsSocket.recv(0);
101             Object[] fields = SimulationMessage.decode(request);
102             String messageTypeId = fields[4].toString();
103             String receiverId = fields[3].toString();
104 
105             System.out.println("Received " + SimulationMessage.print(fields));
106 
107             if (receiverId.equals("FS"))
108             {
109                 switch (messageTypeId)
110                 {
111                     case "FM.1":
112                         processStartFederate(identity, fields);
113                         break;
114 
115                     case "FM.8":
116                         processKillFederate(identity, fields);
117                         break;
118 
119                     case "FM.9":
120                         // processKillAllFederates(senderId, uniqueId);
121                         break;
122 
123                     default:
124                         // wrong message
125                         System.err.println("Received unknown message -- not processed: " + messageTypeId);
126                 }
127             }
128             else
129             {
130                 // wrong receiver
131                 System.err.println("Received message not intended for FS but for " + receiverId + " -- not processed: ");
132             }
133         }
134         this.fsSocket.close();
135         this.fsContext.destroy();
136     }
137 
138     /**
139      * Process FM.2 message and send MC.2 message back.
140      * @param identity reply id for REQ-ROUTER pattern
141      * @param fields the message
142      * @throws Sim0MQException on error
143      */
144     private void processStartFederate(final String identity, final Object[] fields) throws Sim0MQException
145     {
146         StartFederateMessage startFederateMessage = StartFederateMessage.createMessage(fields, "FS");
147         String error = "";
148 
149         int modelPort = findFreePortNumber();
150 
151         if (modelPort == -1)
152         {
153             error = "No free port number";
154         }
155 
156         else
157 
158         {
159             try
160             {
161                 ProcessBuilder pb = new ProcessBuilder();
162 
163                 Path workingPath = Files.createDirectories(Paths.get(startFederateMessage.getWorkingDirectory()));
164                 pb.directory(workingPath.toFile());
165 
166                 String softwareCode = "";
167                 if (!this.softwareProperties.containsKey(startFederateMessage.getSoftwareCode()))
168                 {
169                     System.err.println("Could not find software alias " + startFederateMessage.getSoftwareCode()
170                             + " in software properties file");
171                 }
172                 else
173                 {
174                     softwareCode = this.softwareProperties.getProperty(startFederateMessage.getSoftwareCode());
175 
176                     List<String> pbArgs = new ArrayList<>();
177                     pbArgs.add(softwareCode);
178                     pbArgs.add(startFederateMessage.getArgsBefore());
179                     pbArgs.add(startFederateMessage.getModelPath());
180                     pbArgs.addAll(Arrays.asList(
181                             startFederateMessage.getArgsAfter().replaceAll("%PORT%", String.valueOf(modelPort)).split(" ")));
182                     pb.command(pbArgs);
183 
184                     String stdIn = startFederateMessage.getRedirectStdin();
185                     String stdOut = startFederateMessage.getRedirectStdout();
186                     String stdErr = startFederateMessage.getRedirectStderr();
187 
188                     if (stdIn.length() > 0)
189                     {
190                         // TODO working dir path if not absolute?
191                         File stdInFile = new File(stdIn);
192                         pb.redirectInput(stdInFile);
193                     }
194 
195                     if (stdOut.length() > 0)
196                     {
197                         // TODO working dir path if not absolute?
198                         File stdOutFile = new File(stdOut);
199                         pb.redirectOutput(stdOutFile);
200                     }
201 
202                     if (stdErr.length() > 0)
203                     {
204                         // TODO working dir path if not absolute?
205                         File stdErrFile = new File(stdErr);
206                         pb.redirectError(stdErrFile);
207                     }
208 
209                     new Thread()
210                     {
211                         /** {@inheritDoc} */
212                         @Override
213                         public void run()
214                         {
215                             try
216                             {
217                                 Process process = pb.start();
218                                 FederateStarter.this.runningProcessMap.put(startFederateMessage.getInstanceId(), process);
219                                 System.err.println("Process started:" + process.isAlive());
220                             }
221                             catch (IOException exception)
222                             {
223                                 exception.printStackTrace();
224                             }
225                         }
226                     }.start();
227 
228                     this.modelPortMap.put(startFederateMessage.getInstanceId(), modelPort);
229                     this.startFederateMessages.put(startFederateMessage.getInstanceId(), startFederateMessage);
230 
231                     // Thread.sleep(1000);
232 
233                     // wait till the model is ready...
234                     error = waitForModelStarted(startFederateMessage.getSimulationRunId(), startFederateMessage.getInstanceId(),
235                             modelPort);
236                 }
237             }
238             catch (IOException exception)
239             {
240                 exception.printStackTrace();
241                 error = exception.getMessage();
242             }
243         }
244 
245         System.out.println("SEND MESSAGE FS.2 ABOUT MODEL " + startFederateMessage.getInstanceId() + " @ port " + modelPort);
246 
247         // Send reply back to client
248         this.fsSocket.sendMore(identity);
249         this.fsSocket.sendMore("");
250         //@formatter:off
251         byte[] fs2Message = new FederateStartedMessage.Builder()
252                 .setSimulationRunId(startFederateMessage.getSimulationRunId())
253                 .setInstanceId(startFederateMessage.getInstanceId())
254                 .setSenderId("FS")
255                 .setReceiverId(startFederateMessage.getSenderId())
256                 .setMessageId(++this.messageCount)
257                 .setStatus(error.isEmpty() ? "started" : "error")
258                 .setError(error)
259                 .setModelPort(modelPort)
260                 .build()
261                 .createByteArray();
262         this.fsSocket.send(fs2Message, 0);
263         //@formatter:on
264     }
265 
266     /**
267      * Find a free port for the model.
268      * @return the first free fort number in the range startPort - endPort, inclusive
269      */
270     private int findFreePortNumber()
271     {
272         for (int port = this.startPort; port <= this.endPort; port++)
273         {
274             if (!this.modelPortMap.containsValue(port))
275             {
276                 // try if the port is really free
277                 ZMQ.Socket testSocket = null;
278                 try
279                 {
280                     testSocket = this.fsContext.createSocket(ZMQ.REP);
281                     testSocket.bind("tcp://127.0.0.1:" + port);
282                     testSocket.unbind("tcp://127.0.0.1:" + port);
283                     testSocket.close();
284                     return port;
285                 }
286                 catch (Exception exception)
287                 {
288                     // port was not free
289                     if (testSocket != null)
290                     {
291                         try
292                         {
293                             testSocket.close();
294                         }
295                         catch (Exception e)
296                         {
297                             // ignore.
298                         }
299                     }
300                 }
301             }
302         }
303         return -1;
304     }
305 
306     /**
307      * Wait for simulation to end using status polling with message FM.5.
308      * @param federationRunId the name of the federation
309      * @param modelId the String id of the model
310      * @param modelPort port on which the model is listening
311      * @return empty String for no error, filled String for error
312      * @throws Sim0MQException on error
313      */
314     private String waitForModelStarted(final Object federationRunId, final String modelId, final int modelPort)
315             throws Sim0MQException
316     {
317         boolean ok = true;
318         String error = "";
319         ZMQ.Socket modelSocket = null;
320         try
321         {
322             modelSocket = this.fsContext.createSocket(ZMQ.REQ);
323             modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
324             modelSocket.connect("tcp://127.0.0.1:" + modelPort);
325         }
326         catch (Exception exception)
327         {
328             exception.printStackTrace();
329             ok = false;
330             error = exception.getMessage();
331         }
332 
333         boolean started = false;
334         while (ok && !started)
335         {
336             byte[] fs1Message = SimulationMessage.encodeUTF8(federationRunId, "FS", modelId, "FS.1", ++this.messageCount,
337                     MessageStatus.NEW);
338             modelSocket.send(fs1Message, 0);
339 
340             byte[] reply = modelSocket.recv(0);
341             Object[] replyMessage = SimulationMessage.decode(reply);
342             System.out.println("Received\n" + SimulationMessage.print(replyMessage));
343 
344             if (replyMessage[4].toString().equals("MC.1") && !replyMessage[9].toString().equals("error")
345                     && !replyMessage[9].toString().equals("ended") && ((Long) replyMessage[8]).longValue() == this.messageCount)
346             {
347                 if (replyMessage[9].toString().equals("started"))
348                 {
349                     started = true;
350                 }
351                 else
352                 {
353                     // wait a second
354                     try
355                     {
356                         Thread.sleep(100);
357                     }
358                     catch (InterruptedException ie)
359                     {
360                         // ignore
361                     }
362                 }
363             }
364             else
365             {
366                 ok = false;
367                 error = replyMessage[10].toString();
368                 System.err.println("Simulation start error -- status = " + replyMessage[9]);
369                 System.err.println("Error message = " + replyMessage[10]);
370             }
371         }
372 
373         if (modelSocket != null)
374         {
375             modelSocket.close();
376         }
377 
378         return error;
379     }
380 
381     /**
382      * Process FM.8 message and send FS.4 message back.
383      * @param identity reply id for REQ-ROUTER pattern
384      * @param fields the message
385      * @throws Sim0MQException on error
386      */
387     private void processKillFederate(final String identity, final Object[] fields) throws Sim0MQException
388     {
389         boolean status = true;
390         String error = "";
391 
392         Object federationRunId = fields[1];
393         String senderId = fields[2].toString();
394 
395         String modelId = fields[8].toString();
396         if (!this.modelPortMap.containsKey(modelId))
397         {
398             status = false;
399             error = "model " + modelId + " unknown -- this model is unknown to the FederateStarter";
400         }
401         else
402         {
403             int modelPort = this.modelPortMap.remove(modelId);
404             Process process = this.runningProcessMap.remove(modelId);
405 
406             try
407             {
408                 try
409                 {
410                     ZMQ.Socket modelSocket = this.fsContext.createSocket(ZMQ.REQ);
411                     modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
412                     modelSocket.connect("tcp://127.0.0.1:" + modelPort);
413 
414                     byte[] fs3Message = SimulationMessage.encodeUTF8(federationRunId, "FS", modelId, "FS.3",
415                             ++this.messageCount, MessageStatus.NEW);
416                     modelSocket.send(fs3Message, 0);
417 
418                     modelSocket.close();
419                 }
420                 catch (Exception exception)
421                 {
422                     exception.printStackTrace();
423                     status = true;
424                     error = exception.getMessage();
425                 }
426 
427                 try
428                 {
429                     Thread.sleep(100);
430                 }
431                 catch (InterruptedException ie)
432                 {
433                     // ignore
434                 }
435 
436                 if (process != null && process.isAlive())
437                 {
438                     process.destroyForcibly();
439                 }
440 
441                 StartFederateMessage sfm = this.startFederateMessages.get(modelId);
442                 if (sfm.isDeleteStdout())
443                 {
444                     if (sfm.getRedirectStdout().length() > 0)
445                     {
446                         File stdOutFile = new File(sfm.getRedirectStdout());
447                         stdOutFile.delete();
448                     }
449                 }
450 
451                 if (sfm.isDeleteStderr())
452                 {
453                     if (sfm.getRedirectStderr().length() > 0)
454                     {
455                         File stdErrFile = new File(sfm.getRedirectStderr());
456                         stdErrFile.delete();
457                     }
458                 }
459 
460                 if (sfm.isDeleteWorkingDirectory())
461                 {
462                     File workingDir = new File(sfm.getWorkingDirectory());
463                     workingDir.delete();
464                 }
465             }
466             catch (Exception exception)
467             {
468                 exception.printStackTrace();
469                 status = false;
470                 error = exception.getMessage();
471             }
472 
473             byte[] fs4Message = SimulationMessage.encodeUTF8(federationRunId, "FS", senderId, "FS.4", ++this.messageCount,
474                     MessageStatus.NEW, modelId, status, error);
475             this.fsSocket.sendMore(identity);
476             this.fsSocket.sendMore("");
477             this.fsSocket.send(fs4Message, 0);
478         }
479     }
480 
481     /**
482      * Start listening on the given port for messages to start components. Report back via the call-back port on the status of
483      * the started components. If necessary, the FederateStarter can also forcefully stop a started (sub)process.
484      * @param args the federation name and port on which the FederateStarter is listening
485      * @throws Sim0MQException on error
486      */
487     public static void main(final String[] args) throws Sim0MQException
488     {
489         if (args.length < 4)
490         {
491             System.err.println("Use as FederateStarter portNumber software_properties_file startPort endPort");
492             System.exit(-1);
493         }
494 
495         String sPort = args[0];
496         int port = 0;
497         try
498         {
499             port = Integer.parseInt(sPort);
500         }
501         catch (NumberFormatException nfe)
502         {
503             System.err.println("Use as FederateStarter portNumber, where portNumber is a number");
504             System.exit(-1);
505         }
506         if (port == 0 || port > 65535)
507         {
508             System.err.println("PortNumber should be between 1 and 65535");
509             System.exit(-1);
510         }
511 
512         String propertiesFile = args[1];
513         Properties softwareProperties = new Properties();
514         InputStream propertiesStream = URLResource.getResourceAsStream(propertiesFile);
515         try
516         {
517             softwareProperties.load(propertiesStream);
518         }
519         catch (IOException e)
520         {
521             System.err.println("Could not find software properties file " + propertiesFile);
522             System.exit(-1);
523         }
524 
525         String sStartPort = args[2];
526         int startPort = 0;
527         try
528         {
529             startPort = Integer.parseInt(sStartPort);
530         }
531         catch (NumberFormatException nfe)
532         {
533             System.err.println("Use as FederateStarter pn file startPort endPort, where startPort is a number");
534             System.exit(-1);
535         }
536         if (startPort == 0 || startPort > 65535)
537         {
538             System.err.println("startPort should be between 1 and 65535");
539             System.exit(-1);
540         }
541 
542         String sEndPort = args[3];
543         int endPort = 0;
544         try
545         {
546             endPort = Integer.parseInt(sEndPort);
547         }
548         catch (NumberFormatException nfe)
549         {
550             System.err.println("Use as FederateStarter pn file startPort endPort, where endPort is a number");
551             System.exit(-1);
552         }
553         if (endPort == 0 || endPort > 65535)
554         {
555             System.err.println("endPort should be between 1 and 65535");
556             System.exit(-1);
557         }
558 
559         new FederateStarter(port, softwareProperties, startPort, endPort);
560     }
561 
562 }