View Javadoc
1   package org.sim0mq.federatestarter;
2   
3   import java.io.File;
4   import java.io.IOException;
5   import java.io.InputStream;
6   import java.nio.file.Files;
7   import java.nio.file.Path;
8   import java.nio.file.Paths;
9   import java.util.ArrayList;
10  import java.util.Arrays;
11  import java.util.Collections;
12  import java.util.HashMap;
13  import java.util.List;
14  import java.util.Map;
15  import java.util.Properties;
16  import java.util.UUID;
17  
18  import org.djutils.io.URLResource;
19  import org.djutils.serialization.SerializationException;
20  import org.sim0mq.Sim0MQException;
21  import org.sim0mq.message.MessageStatus;
22  import org.sim0mq.message.SimulationMessage;
23  import org.sim0mq.message.federatestarter.FederateStartedMessage;
24  import org.sim0mq.message.federationmanager.StartFederateMessage;
25  import org.zeromq.SocketType;
26  import org.zeromq.ZContext;
27  import org.zeromq.ZMQ;
28  
29  /**
30   * The FederateStarter start listening on the given port for messages to start components. Report back via the call-back port on
31   * the status of the started components. If necessary, the FederateStarter can also forcefully stop a started (sub)process.
32   * <p>
33   * Copyright (c) 2016-2017 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
34   * BSD-style license. See <a href="http://sim0mq.org/docs/current/license.html">Sim0MQ License</a>.
35   * </p>
36   * $LastChangedDate: 2015-07-24 02:58:59 +0200 (Fri, 24 Jul 2015) $, @version $Revision: 1147 $, by $Author: averbraeck $,
37   * initial version Mar 1, 2017 <br>
38   * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
39   */
40  public class FederateStarter
41  {
42      /** the port number to listen on. */
43      private final int fsPort;
44  
45      /** the first port to be used for the models, inclusive. */
46      private final int startPort;
47  
48      /** the last port to be used for the models, inclusive. */
49      private final int endPort;
50  
51      /** the running programs this FederateStarter started. The String identifies the process (e.g., a UUID or a model id). */
52      @SuppressWarnings("checkstyle:visibilitymodifier")
53      protected Map<String, Process> runningProcessMap = Collections.synchronizedMap(new HashMap<>());
54  
55      /** the ports where the models listen. The String identifies the process (e.g., a UUID or a model id). */
56      private Map<String, Integer> modelPortMap = Collections.synchronizedMap(new HashMap<>());
57  
58      /** the StartFederate messages. */
59      private Map<String, StartFederateMessage> startFederateMessages = Collections.synchronizedMap(new HashMap<>());
60  
61      /** the software properties. */
62      @SuppressWarnings("checkstyle:visibilitymodifier")
63      final Properties softwareProperties;
64  
65      /** the 0mq socket. */
66      private ZMQ.Socket fsSocket;
67  
68      /** the 0mq context. */
69      private ZContext fsContext;
70  
71      /** message count. */
72      private long messageCount = 0;
73  
74      /**
75       * @param fsPort the port number to listen on
76       * @param softwareProperties the software properties to use
77       * @param startPort first port to be used for the models, inclusive
78       * @param endPort last port to be used for the models, inclusive
79       * @throws Sim0MQException on error
80       * @throws SerializationException on error
81       */
82      public FederateStarter(final int fsPort, final Properties softwareProperties, final int startPort, final int endPort)
83              throws Sim0MQException, SerializationException
84      {
85          super();
86          this.softwareProperties = softwareProperties;
87          this.fsPort = fsPort;
88          this.startPort = startPort;
89          this.endPort = endPort;
90  
91          this.fsContext = new ZContext(1);
92  
93          this.fsSocket = this.fsContext.createSocket(SocketType.ROUTER);
94          this.fsSocket.bind("tcp://*:" + this.fsPort);
95  
96          while (!Thread.currentThread().isInterrupted())
97          {
98              // Wait for next request from the client -- first the identity (String) and the delimiter (#0)
99              String identity = this.fsSocket.recvStr();
100             this.fsSocket.recvStr();
101 
102             byte[] request = this.fsSocket.recv(0);
103             Object[] fields = SimulationMessage.decode(request);
104             String messageTypeId = fields[4].toString();
105             String receiverId = fields[3].toString();
106 
107             System.out.println("Received " + SimulationMessage.print(fields));
108 
109             if (receiverId.equals("FS"))
110             {
111                 switch (messageTypeId)
112                 {
113                     case "FM.1":
114                         processStartFederate(identity, fields);
115                         break;
116 
117                     case "FM.8":
118                         processKillFederate(identity, fields);
119                         break;
120 
121                     case "FM.9":
122                         // processKillAllFederates(senderId, uniqueId);
123                         break;
124 
125                     default:
126                         // wrong message
127                         System.err.println("Received unknown message -- not processed: " + messageTypeId);
128                 }
129             }
130             else
131             {
132                 // wrong receiver
133                 System.err.println("Received message not intended for FS but for " + receiverId + " -- not processed: ");
134             }
135         }
136         this.fsSocket.close();
137         this.fsContext.destroy();
138     }
139 
140     /**
141      * Process FM.2 message and send MC.2 message back.
142      * @param identity reply id for REQ-ROUTER pattern
143      * @param fields the message
144      * @throws Sim0MQException on error
145      * @throws SerializationException on error
146      */
147     private void processStartFederate(final String identity, final Object[] fields)
148             throws Sim0MQException, SerializationException
149     {
150         StartFederateMessage startFederateMessage = StartFederateMessage.createMessage(fields, "FS");
151         String error = "";
152 
153         int modelPort = findFreePortNumber();
154 
155         if (modelPort == -1)
156         {
157             error = "No free port number";
158         }
159 
160         else
161 
162         {
163             try
164             {
165                 ProcessBuilder pb = new ProcessBuilder();
166 
167                 Path workingPath = Files.createDirectories(Paths.get(startFederateMessage.getWorkingDirectory()));
168                 pb.directory(workingPath.toFile());
169 
170                 String softwareCode = "";
171                 if (!this.softwareProperties.containsKey(startFederateMessage.getSoftwareCode()))
172                 {
173                     System.err.println("Could not find software alias " + startFederateMessage.getSoftwareCode()
174                             + " in software properties file");
175                 }
176                 else
177                 {
178                     softwareCode = this.softwareProperties.getProperty(startFederateMessage.getSoftwareCode());
179 
180                     List<String> pbArgs = new ArrayList<>();
181                     pbArgs.add(softwareCode);
182                     pbArgs.add(startFederateMessage.getArgsBefore());
183                     pbArgs.add(startFederateMessage.getModelPath());
184                     pbArgs.addAll(Arrays.asList(
185                             startFederateMessage.getArgsAfter().replaceAll("%PORT%", String.valueOf(modelPort)).split(" ")));
186                     pb.command(pbArgs);
187 
188                     String stdIn = startFederateMessage.getRedirectStdin();
189                     String stdOut = startFederateMessage.getRedirectStdout();
190                     String stdErr = startFederateMessage.getRedirectStderr();
191 
192                     if (stdIn.length() > 0)
193                     {
194                         // TODO working dir path if not absolute?
195                         File stdInFile = new File(stdIn);
196                         pb.redirectInput(stdInFile);
197                     }
198 
199                     if (stdOut.length() > 0)
200                     {
201                         // TODO working dir path if not absolute?
202                         File stdOutFile = new File(stdOut);
203                         pb.redirectOutput(stdOutFile);
204                     }
205 
206                     if (stdErr.length() > 0)
207                     {
208                         // TODO working dir path if not absolute?
209                         File stdErrFile = new File(stdErr);
210                         pb.redirectError(stdErrFile);
211                     }
212 
213                     new Thread()
214                     {
215                         /** {@inheritDoc} */
216                         @Override
217                         public void run()
218                         {
219                             try
220                             {
221                                 Process process = pb.start();
222                                 FederateStarter.this.runningProcessMap.put(startFederateMessage.getInstanceId(), process);
223                                 System.err.println("Process started:" + process.isAlive());
224                             }
225                             catch (IOException exception)
226                             {
227                                 exception.printStackTrace();
228                             }
229                         }
230                     }.start();
231 
232                     this.modelPortMap.put(startFederateMessage.getInstanceId(), modelPort);
233                     this.startFederateMessages.put(startFederateMessage.getInstanceId(), startFederateMessage);
234 
235                     // Thread.sleep(1000);
236 
237                     // wait till the model is ready...
238                     error = waitForModelStarted(startFederateMessage.getSimulationRunId(), startFederateMessage.getInstanceId(),
239                             modelPort);
240                 }
241             }
242             catch (IOException exception)
243             {
244                 exception.printStackTrace();
245                 error = exception.getMessage();
246             }
247         }
248 
249         System.out.println("SEND MESSAGE FS.2 ABOUT MODEL " + startFederateMessage.getInstanceId() + " @ port " + modelPort);
250 
251         // Send reply back to client
252         this.fsSocket.sendMore(identity);
253         this.fsSocket.sendMore("");
254         //@formatter:off
255         byte[] fs2Message = new FederateStartedMessage.Builder()
256                 .setSimulationRunId(startFederateMessage.getSimulationRunId())
257                 .setInstanceId(startFederateMessage.getInstanceId())
258                 .setSenderId("FS")
259                 .setReceiverId(startFederateMessage.getSenderId())
260                 .setMessageId(++this.messageCount)
261                 .setStatus(error.isEmpty() ? "started" : "error")
262                 .setError(error)
263                 .setModelPort(modelPort)
264                 .build()
265                 .createByteArray();
266         this.fsSocket.send(fs2Message, 0);
267         //@formatter:on
268     }
269 
270     /**
271      * Find a free port for the model.
272      * @return the first free fort number in the range startPort - endPort, inclusive
273      */
274     private int findFreePortNumber()
275     {
276         for (int port = this.startPort; port <= this.endPort; port++)
277         {
278             if (!this.modelPortMap.containsValue(port))
279             {
280                 // try if the port is really free
281                 ZMQ.Socket testSocket = null;
282                 try
283                 {
284                     testSocket = this.fsContext.createSocket(SocketType.REP);
285                     testSocket.bind("tcp://127.0.0.1:" + port);
286                     testSocket.unbind("tcp://127.0.0.1:" + port);
287                     testSocket.close();
288                     return port;
289                 }
290                 catch (Exception exception)
291                 {
292                     // port was not free
293                     if (testSocket != null)
294                     {
295                         try
296                         {
297                             testSocket.close();
298                         }
299                         catch (Exception e)
300                         {
301                             // ignore.
302                         }
303                     }
304                 }
305             }
306         }
307         return -1;
308     }
309 
310     /**
311      * Wait for simulation to end using status polling with message FM.5.
312      * @param federationRunId the name of the federation
313      * @param modelId the String id of the model
314      * @param modelPort port on which the model is listening
315      * @return empty String for no error, filled String for error
316      * @throws Sim0MQException on error
317      * @throws SerializationException on error
318      */
319     private String waitForModelStarted(final Object federationRunId, final String modelId, final int modelPort)
320             throws Sim0MQException, SerializationException
321     {
322         boolean ok = true;
323         String error = "";
324         ZMQ.Socket modelSocket = null;
325         try
326         {
327             modelSocket = this.fsContext.createSocket(SocketType.REQ);
328             modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
329             modelSocket.connect("tcp://127.0.0.1:" + modelPort);
330         }
331         catch (Exception exception)
332         {
333             exception.printStackTrace();
334             ok = false;
335             error = exception.getMessage();
336         }
337 
338         boolean started = false;
339         while (ok && !started)
340         {
341             byte[] fs1Message = SimulationMessage.encodeUTF8(federationRunId, "FS", modelId, "FS.1", ++this.messageCount,
342                     MessageStatus.NEW);
343             modelSocket.send(fs1Message, 0);
344 
345             byte[] reply = modelSocket.recv(0);
346             Object[] replyMessage = SimulationMessage.decode(reply);
347             System.out.println("Received\n" + SimulationMessage.print(replyMessage));
348 
349             if (replyMessage[4].toString().equals("MC.1") && !replyMessage[9].toString().equals("error")
350                     && !replyMessage[9].toString().equals("ended") && ((Long) replyMessage[8]).longValue() == this.messageCount)
351             {
352                 if (replyMessage[9].toString().equals("started"))
353                 {
354                     started = true;
355                 }
356                 else
357                 {
358                     // wait a second
359                     try
360                     {
361                         Thread.sleep(100);
362                     }
363                     catch (InterruptedException ie)
364                     {
365                         // ignore
366                     }
367                 }
368             }
369             else
370             {
371                 ok = false;
372                 error = replyMessage[10].toString();
373                 System.err.println("Simulation start error -- status = " + replyMessage[9]);
374                 System.err.println("Error message = " + replyMessage[10]);
375             }
376         }
377 
378         if (modelSocket != null)
379         {
380             modelSocket.close();
381         }
382 
383         return error;
384     }
385 
386     /**
387      * Process FM.8 message and send FS.4 message back.
388      * @param identity reply id for REQ-ROUTER pattern
389      * @param fields the message
390      * @throws Sim0MQException on error
391      * @throws SerializationException on error
392      */
393     private void processKillFederate(final String identity, final Object[] fields)
394             throws Sim0MQException, SerializationException
395     {
396         boolean status = true;
397         String error = "";
398 
399         Object federationRunId = fields[1];
400         String senderId = fields[2].toString();
401 
402         String modelId = fields[8].toString();
403         if (!this.modelPortMap.containsKey(modelId))
404         {
405             status = false;
406             error = "model " + modelId + " unknown -- this model is unknown to the FederateStarter";
407         }
408         else
409         {
410             int modelPort = this.modelPortMap.remove(modelId);
411             Process process = this.runningProcessMap.remove(modelId);
412 
413             try
414             {
415                 try
416                 {
417                     ZMQ.Socket modelSocket = this.fsContext.createSocket(SocketType.REQ);
418                     modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
419                     modelSocket.connect("tcp://127.0.0.1:" + modelPort);
420 
421                     byte[] fs3Message = SimulationMessage.encodeUTF8(federationRunId, "FS", modelId, "FS.3",
422                             ++this.messageCount, MessageStatus.NEW);
423                     modelSocket.send(fs3Message, 0);
424 
425                     modelSocket.close();
426                 }
427                 catch (Exception exception)
428                 {
429                     exception.printStackTrace();
430                     status = true;
431                     error = exception.getMessage();
432                 }
433 
434                 try
435                 {
436                     Thread.sleep(100);
437                 }
438                 catch (InterruptedException ie)
439                 {
440                     // ignore
441                 }
442 
443                 if (process != null && process.isAlive())
444                 {
445                     process.destroyForcibly();
446                 }
447 
448                 StartFederateMessage sfm = this.startFederateMessages.get(modelId);
449                 if (sfm.isDeleteStdout())
450                 {
451                     if (sfm.getRedirectStdout().length() > 0)
452                     {
453                         File stdOutFile = new File(sfm.getRedirectStdout());
454                         stdOutFile.delete();
455                     }
456                 }
457 
458                 if (sfm.isDeleteStderr())
459                 {
460                     if (sfm.getRedirectStderr().length() > 0)
461                     {
462                         File stdErrFile = new File(sfm.getRedirectStderr());
463                         stdErrFile.delete();
464                     }
465                 }
466 
467                 if (sfm.isDeleteWorkingDirectory())
468                 {
469                     File workingDir = new File(sfm.getWorkingDirectory());
470                     workingDir.delete();
471                 }
472             }
473             catch (Exception exception)
474             {
475                 exception.printStackTrace();
476                 status = false;
477                 error = exception.getMessage();
478             }
479 
480             byte[] fs4Message = SimulationMessage.encodeUTF8(federationRunId, "FS", senderId, "FS.4", ++this.messageCount,
481                     MessageStatus.NEW, modelId, status, error);
482             this.fsSocket.sendMore(identity);
483             this.fsSocket.sendMore("");
484             this.fsSocket.send(fs4Message, 0);
485         }
486     }
487 
488     /**
489      * Start listening on the given port for messages to start components. Report back via the call-back port on the status of
490      * the started components. If necessary, the FederateStarter can also forcefully stop a started (sub)process.
491      * @param args the federation name and port on which the FederateStarter is listening
492      * @throws Sim0MQException on error
493      * @throws SerializationException on error
494      */
495     public static void main(final String[] args) throws Sim0MQException, SerializationException
496     {
497         if (args.length < 4)
498         {
499             System.err.println("Use as FederateStarter portNumber software_properties_file startPort endPort");
500             System.exit(-1);
501         }
502 
503         String sPort = args[0];
504         int port = 0;
505         try
506         {
507             port = Integer.parseInt(sPort);
508         }
509         catch (NumberFormatException nfe)
510         {
511             System.err.println("Use as FederateStarter portNumber, where portNumber is a number");
512             System.exit(-1);
513         }
514         if (port == 0 || port > 65535)
515         {
516             System.err.println("PortNumber should be between 1 and 65535");
517             System.exit(-1);
518         }
519 
520         String propertiesFile = args[1];
521         Properties softwareProperties = new Properties();
522         InputStream propertiesStream = URLResource.getResourceAsStream(propertiesFile);
523         try
524         {
525             softwareProperties.load(propertiesStream);
526         }
527         catch (IOException | NullPointerException e)
528         {
529             System.err.println("Could not find or read software properties file " + propertiesFile);
530             System.exit(-1);
531         }
532 
533         String sStartPort = args[2];
534         int startPort = 0;
535         try
536         {
537             startPort = Integer.parseInt(sStartPort);
538         }
539         catch (NumberFormatException nfe)
540         {
541             System.err.println("Use as FederateStarter pn file startPort endPort, where startPort is a number");
542             System.exit(-1);
543         }
544         if (startPort == 0 || startPort > 65535)
545         {
546             System.err.println("startPort should be between 1 and 65535");
547             System.exit(-1);
548         }
549 
550         String sEndPort = args[3];
551         int endPort = 0;
552         try
553         {
554             endPort = Integer.parseInt(sEndPort);
555         }
556         catch (NumberFormatException nfe)
557         {
558             System.err.println("Use as FederateStarter pn file startPort endPort, where endPort is a number");
559             System.exit(-1);
560         }
561         if (endPort == 0 || endPort > 65535)
562         {
563             System.err.println("endPort should be between 1 and 65535");
564             System.exit(-1);
565         }
566 
567         new FederateStarter(port, softwareProperties, startPort, endPort);
568     }
569 
570 }