package org.sim0mq.federatestarter;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;

import org.djutils.io.URLResource;
import org.djutils.serialization.SerializationException;
import org.sim0mq.Sim0MQException;
import org.sim0mq.message.Sim0MQMessage;
import org.sim0mq.message.federatestarter.FS1RequestStatusMessage;
import org.sim0mq.message.federatestarter.FS2FederateStartedMessage;
import org.sim0mq.message.federatestarter.FS4FederateKilledMessage;
import org.sim0mq.message.federationmanager.FM1StartFederateMessage;
import org.sim0mq.message.federationmanager.FM8KillFederateMessage;
import org.sim0mq.message.modelcontroller.MC1StatusMessage;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMQException;

33  /**
34   * The FederateStarter start listening on the given port for messages to start components. Report back via the call-back port on
35   * the status of the started components. If necessary, the FederateStarter can also forcefully stop a started (sub)process.
36   * <p>
37   * Copyright (c) 2016-2020 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
38   * BSD-style license. See <a href="http://sim0mq.org/docs/current/license.html">Sim0MQ License</a>.
39   * </p>
40   * $LastChangedDate: 2015-07-24 02:58:59 +0200 (Fri, 24 Jul 2015) $, @version $Revision: 1147 $, by $Author: averbraeck $,
41   * initial version Mar 1, 2017 <br>
42   * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
43   */
44  public class FederateStarter
45  {
46      /** the port number to listen on. */
47      private final int fsPort;
48  
49      /** the first port to be used for the models, inclusive. */
50      private final int startPort;
51  
52      /** the last port to be used for the models, inclusive. */
53      private final int endPort;
54  
55      /** the running programs this FederateStarter started. The String identifies the process (e.g., a UUID or a model id). */
56      @SuppressWarnings("checkstyle:visibilitymodifier")
57      protected Map<Object, Process> runningProcessMap = Collections.synchronizedMap(new LinkedHashMap<>());
58  
59      /** the ports where the models listen. The String identifies the process (e.g., a UUID or a model id). */
60      private Map<Object, Integer> modelPortMap = Collections.synchronizedMap(new LinkedHashMap<>());
61  
62      /** the StartFederate messages. */
63      private Map<Object, FM1StartFederateMessage> startFederateMessages = Collections.synchronizedMap(new LinkedHashMap<>());
64  
65      /** the software properties. */
66      @SuppressWarnings("checkstyle:visibilitymodifier")
67      final Properties softwareProperties;
68  
69      /** the 0mq socket. */
70      private ZMQ.Socket fsSocket;
71  
72      /** the 0mq context. */
73      private ZContext fsContext;
74  
75      /** message count. */
76      private long messageCount = 0;
77  
78      /** does the Federate Starter concern models with an MC or just processes? */
79      private final boolean modelController;
80  
81      /**
82       * @param fsPort the port number to listen on
83       * @param softwareProperties the software properties to use
84       * @param startPort first port to be used for the models, inclusive
85       * @param endPort last port to be used for the models, inclusive
86       * @param modelController does the Federate Starter concern models with an MC or just processes?
87       * @throws Sim0MQException on error
88       * @throws SerializationException on error
89       */
90      public FederateStarter(final int fsPort, final Properties softwareProperties, final int startPort, final int endPort,
91              final boolean modelController) throws Sim0MQException, SerializationException
92      {
93          this.softwareProperties = softwareProperties;
94          this.fsPort = fsPort;
95          this.startPort = startPort;
96          this.endPort = endPort;
97          this.modelController = modelController;
98  
99          this.fsContext = new ZContext(1);
100 
101         this.fsSocket = this.fsContext.createSocket(SocketType.ROUTER);
102         this.fsSocket.bind("tcp://*:" + this.fsPort);
103 
104         while (!Thread.currentThread().isInterrupted())
105         {
106             // Wait for next request from the client -- first the identity (String) and the delimiter (#0)
107             try
108             {
109                 String identity = this.fsSocket.recvStr();
110                 this.fsSocket.recvStr();
111 
112                 byte[] request = this.fsSocket.recv(0);
113                 Sim0MQMessage message = Sim0MQMessage.decode(request);
114                 String messageTypeId = message.getMessageTypeId().toString();
115                 String receiverId = message.getReceiverId().toString();
116 
117                 System.out.println("Received " + Sim0MQMessage.print(message.createObjectArray()));
118 
119                 if (receiverId.equals("FS"))
120                 {
121                     switch (messageTypeId)
122                     {
123                         case "FM.1":
124                             processStartFederate(identity, message);
125                             break;
126 
127                         case "FM.8":
128                             processKillFederate(identity, message);
129                             break;
130 
131                         case "FM.9":
132                             // processKillAllFederates(senderId, uniqueId);
133                             break;
134 
135                         default:
136                             // wrong message
137                             System.err.println("Received unknown message -- not processed: " + messageTypeId);
138                     }
139                 }
140                 else
141                 {
142                     // wrong receiver
143                     System.err.println("Received message not intended for FS but for " + receiverId + " -- not processed: ");
144                 }
145             }
146             catch (ZMQException e)
147             {
148                 System.err.println(e.getMessage());
149             }
150         }
151 
152         try
153         {
154             this.fsSocket.close();
155             this.fsContext.destroy();
156         }
157         catch (Exception e)
158         {
159             System.err.println(e.getMessage());
160         }
161     }
162 
163     /**
164      * Process FM.2 message and send MC.2 message back.
165      * @param identity reply id for REQ-ROUTER pattern
166      * @param message Message; the message
167      * @throws Sim0MQException on error
168      * @throws SerializationException on error
169      */
170     private void processStartFederate(final String identity, final Sim0MQMessage message)
171             throws Sim0MQException, SerializationException
172     {
173         FM1StartFederateMessage startFederateMessage = new FM1StartFederateMessage(message.createObjectArray());
174         String error = "";
175 
176         int modelPort = findFreePortNumber();
177 
178         if (modelPort == -1)
179         {
180             error = "No free port number";
181         }
182 
183         else
184 
185         {
186             try
187             {
188                 ProcessBuilder pb = new ProcessBuilder();
189 
190                 Path workingPath = Files.createDirectories(Paths.get(startFederateMessage.getWorkingDirectory()));
191                 pb.directory(workingPath.toFile());
192 
193                 String softwareCode = "";
194                 if (!this.softwareProperties.containsKey(startFederateMessage.getSoftwareCode()))
195                 {
196                     System.err.println("Could not find software alias " + startFederateMessage.getSoftwareCode()
197                             + " in software properties file");
198                 }
199                 else
200                 {
201                     softwareCode = this.softwareProperties.getProperty(startFederateMessage.getSoftwareCode());
202 
203                     List<String> pbArgs = new ArrayList<>();
204                     pbArgs.add(softwareCode);
205                     pbArgs.add(startFederateMessage.getArgsBefore());
206                     pbArgs.add(startFederateMessage.getModelPath());
207                     pbArgs.addAll(Arrays.asList(
208                             startFederateMessage.getArgsAfter().replaceAll("%PORT%", String.valueOf(modelPort)).split(" ")));
209                     pb.command(pbArgs);
210 
211                     String stdIn = startFederateMessage.getRedirectStdin();
212                     String stdOut = startFederateMessage.getRedirectStdout();
213                     String stdErr = startFederateMessage.getRedirectStderr();
214 
215                     if (stdIn.length() > 0)
216                     {
217                         // TODO working dir path if not absolute?
218                         File stdInFile = new File(stdIn);
219                         pb.redirectInput(stdInFile);
220                     }
221 
222                     if (stdOut.length() > 0)
223                     {
224                         // TODO working dir path if not absolute?
225                         File stdOutFile = new File(stdOut);
226                         pb.redirectOutput(stdOutFile);
227                     }
228 
229                     if (stdErr.length() > 0)
230                     {
231                         // TODO working dir path if not absolute?
232                         File stdErrFile = new File(stdErr);
233                         pb.redirectError(stdErrFile);
234                     }
235 
236                     new Thread()
237                     {
238                         /** {@inheritDoc} */
239                         @Override
240                         public void run()
241                         {
242                             try
243                             {
244                                 Process process = pb.start();
245                                 FederateStarter.this.runningProcessMap.put(startFederateMessage.getInstanceId(), process);
246                                 System.err.println("Process started:" + process.isAlive());
247                             }
248                             catch (IOException exception)
249                             {
250                                 exception.printStackTrace();
251                             }
252                         }
253                     }.start();
254 
255                     this.modelPortMap.put(startFederateMessage.getInstanceId(), modelPort);
256                     this.startFederateMessages.put(startFederateMessage.getInstanceId(), startFederateMessage);
257 
258                     // Thread.sleep(1000);
259 
260                     // wait till the model is ready...
261                     System.out.println("modelController : " + this.modelController);
262                     if (this.modelController)
263                     {
264                         error = waitForModelStarted(startFederateMessage.getFederationId(),
265                                 startFederateMessage.getInstanceId(), modelPort);
266                     }
267                 }
268             }
269             catch (IOException exception)
270             {
271                 exception.printStackTrace();
272                 error = exception.getMessage();
273             }
274         }
275 
276         System.out.println("SEND MESSAGE FS.2 ABOUT MODEL " + startFederateMessage.getInstanceId() + " @ port " + modelPort);
277 
278         // Send reply back to client
279         this.fsSocket.sendMore(identity);
280         this.fsSocket.sendMore("");
281         //@formatter:off
282         byte[] fs2Message = new FS2FederateStartedMessage.Builder()
283                 .setSimulationRunId(startFederateMessage.getFederationId())
284                 .setInstanceId(startFederateMessage.getInstanceId())
285                 .setSenderId("FS")
286                 .setReceiverId(startFederateMessage.getSenderId())
287                 .setMessageId(++this.messageCount)
288                 .setStatus(error.isEmpty() ? "started" : "error")
289                 .setError(error)
290                 .setModelPort(modelPort)
291                 .build()
292                 .createByteArray();
293         this.fsSocket.send(fs2Message, 0);
294         //@formatter:on
295     }
296 
297     /**
298      * Find a free port for the model.
299      * @return the first free fort number in the range startPort - endPort, inclusive
300      */
301     private int findFreePortNumber()
302     {
303         for (int port = this.startPort; port <= this.endPort; port++)
304         {
305             if (!this.modelPortMap.containsValue(port))
306             {
307                 // try if the port is really free
308                 ZMQ.Socket testSocket = null;
309                 try
310                 {
311                     testSocket = this.fsContext.createSocket(SocketType.REP);
312                     testSocket.bind("tcp://127.0.0.1:" + port);
313                     testSocket.unbind("tcp://127.0.0.1:" + port);
314                     testSocket.close();
315                     return port;
316                 }
317                 catch (Exception exception)
318                 {
319                     // port was not free
320                     if (testSocket != null)
321                     {
322                         try
323                         {
324                             testSocket.close();
325                         }
326                         catch (Exception e)
327                         {
328                             // ignore.
329                         }
330                     }
331                 }
332             }
333         }
334         return -1;
335     }
336 
337     /**
338      * Wait for simulation to end using status polling with message FM.5.
339      * @param federationRunId the name of the federation
340      * @param modelId the String id of the model
341      * @param modelPort port on which the model is listening
342      * @return empty String for no error, filled String for error
343      * @throws Sim0MQException on error
344      * @throws SerializationException on error
345      */
346     private String waitForModelStarted(final Object federationRunId, final Object modelId, final int modelPort)
347             throws Sim0MQException, SerializationException
348     {
349         boolean ok = true;
350         String error = "";
351         ZMQ.Socket modelSocket = null;
352         try
353         {
354             modelSocket = this.fsContext.createSocket(SocketType.REQ);
355             modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
356             modelSocket.connect("tcp://127.0.0.1:" + modelPort);
357         }
358         catch (Exception exception)
359         {
360             exception.printStackTrace();
361             ok = false;
362             error = exception.getMessage();
363         }
364 
365         boolean started = false;
366         while (ok && !started)
367         {
368             byte[] fs1Message =
369                     new FS1RequestStatusMessage(federationRunId, "FS", modelId, ++this.messageCount).createByteArray();
370             modelSocket.send(fs1Message, 0);
371             System.out.println("Sent: FS.1 to " + modelId + ", waiting on MC1");
372 
373             byte[] reply = modelSocket.recv(0);
374             Object[] objectArray = Sim0MQMessage.decodeToArray(reply);
375             System.out.println("Received\n" + Sim0MQMessage.print(objectArray));
376             MC1StatusMessage replyMessage = new MC1StatusMessage(objectArray);
377 
378             // Synchronous and single-threaded, so the messageCount cannot change between send and receive
379             if (!replyMessage.getStatus().equals("error") && !replyMessage.getStatus().equals("ended")
380                     && ((Long) replyMessage.getReplyToId()).longValue() == this.messageCount)
381             {
382                 if (replyMessage.getStatus().equals("started"))
383                 {
384                     started = true;
385                 }
386                 else
387                 {
388                     // wait a second
389                     try
390                     {
391                         Thread.sleep(100);
392                     }
393                     catch (InterruptedException ie)
394                     {
395                         // ignore
396                     }
397                 }
398             }
399             else
400             {
401                 ok = false;
402                 error = replyMessage.getError();
403                 System.err.println("Simulation start error -- status = " + replyMessage.getStatus());
404                 System.err.println("Error message = " + error);
405             }
406         }
407 
408         if (modelSocket != null)
409         {
410             modelSocket.close();
411         }
412 
413         return error;
414     }
415 
416     /**
417      * Process FM.8 message and send FS.4 message back.
418      * @param identity reply id for REQ-ROUTER pattern
419      * @param message the message
420      * @throws Sim0MQException on error
421      * @throws SerializationException on error
422      */
423     private void processKillFederate(final String identity, final Sim0MQMessage message)
424             throws Sim0MQException, SerializationException
425     {
426         boolean status = true;
427         String error = "";
428 
429         Object federationRunId = message.getFederationId();
430         Object senderId = message.getSenderId();
431 
432         FM8KillFederateMessage killMessage = new FM8KillFederateMessage(message.createObjectArray());
433         Object modelId = killMessage.getInstanceId();
434         if (!this.modelPortMap.containsKey(modelId))
435         {
436             status = false;
437             error = "model " + modelId + " unknown -- this model is unknown to the FederateStarter";
438         }
439         else
440         {
441             int modelPort = this.modelPortMap.remove(modelId);
442             Process process = this.runningProcessMap.remove(modelId);
443 
444             try
445             {
446                 try
447                 {
448                     ZMQ.Socket modelSocket = this.fsContext.createSocket(SocketType.REQ);
449                     modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
450                     modelSocket.connect("tcp://127.0.0.1:" + modelPort);
451 
452                     byte[] fs3Message =
453                             Sim0MQMessage.encodeUTF8(true, federationRunId, "FS", modelId, "FS.3", ++this.messageCount);
454                     modelSocket.send(fs3Message, 0);
455 
456                     modelSocket.close();
457                 }
458                 catch (Exception exception)
459                 {
460                     exception.printStackTrace();
461                     status = true;
462                     error = exception.getMessage();
463                 }
464 
465                 try
466                 {
467                     Thread.sleep(100);
468                 }
469                 catch (InterruptedException ie)
470                 {
471                     // ignore
472                 }
473 
474                 if (process != null && process.isAlive())
475                 {
476                     process.destroyForcibly();
477                 }
478 
479                 FM1StartFederateMessage sfm = this.startFederateMessages.get(modelId);
480                 if (sfm.isDeleteStdout())
481                 {
482                     if (sfm.getRedirectStdout().length() > 0)
483                     {
484                         File stdOutFile = new File(sfm.getRedirectStdout());
485                         stdOutFile.delete();
486                     }
487                 }
488 
489                 if (sfm.isDeleteStderr())
490                 {
491                     if (sfm.getRedirectStderr().length() > 0)
492                     {
493                         File stdErrFile = new File(sfm.getRedirectStderr());
494                         stdErrFile.delete();
495                     }
496                 }
497 
498                 if (sfm.isDeleteWorkingDirectory())
499                 {
500                     File workingDir = new File(sfm.getWorkingDirectory());
501                     workingDir.delete();
502                 }
503             }
504             catch (Exception exception)
505             {
506                 exception.printStackTrace();
507                 status = false;
508                 error = exception.getMessage();
509             }
510 
511             byte[] fs4Message =
512                     new FS4FederateKilledMessage(federationRunId, "FS", senderId, ++this.messageCount, modelId, status, error)
513                             .createByteArray();
514             this.fsSocket.sendMore(identity);
515             this.fsSocket.sendMore("");
516             this.fsSocket.send(fs4Message, 0);
517         }
518     }
519 
520     /**
521      * @return modelController
522      */
523     public boolean isModelController()
524     {
525         return this.modelController;
526     }
527 
528     /**
529      * Start listening on the given port for messages to start components. Report back via the call-back port on the status of
530      * the started components. If necessary, the FederateStarter can also forcefully stop a started (sub)process.
531      * @param args the federation name and port on which the FederateStarter is listening
532      * @throws Sim0MQException on error
533      * @throws SerializationException on error
534      */
535     public static void main(final String[] args) throws Sim0MQException, SerializationException
536     {
537         if (args.length < 4)
538         {
539             System.err.println("Use as FederateStarter portNumber software_properties_file startPort endPort");
540             System.exit(-1);
541         }
542 
543         String sPort = args[0];
544         int port = 0;
545         try
546         {
547             port = Integer.parseInt(sPort);
548         }
549         catch (NumberFormatException nfe)
550         {
551             System.err.println("Use as FederateStarter portNumber, where portNumber is a number");
552             System.exit(-1);
553         }
554         if (port == 0 || port > 65535)
555         {
556             System.err.println("PortNumber should be between 1 and 65535");
557             System.exit(-1);
558         }
559 
560         String propertiesFile = args[1];
561         Properties softwareProperties = new Properties();
562         InputStream propertiesStream = URLResource.getResourceAsStream(propertiesFile);
563         try
564         {
565             softwareProperties.load(propertiesStream);
566         }
567         catch (IOException | NullPointerException e)
568         {
569             System.err.println("Could not find or read software properties file " + propertiesFile);
570             System.exit(-1);
571         }
572 
573         String sStartPort = args[2];
574         int startPort = 0;
575         try
576         {
577             startPort = Integer.parseInt(sStartPort);
578         }
579         catch (NumberFormatException nfe)
580         {
581             System.err.println("Use as FederateStarter pn file startPort endPort, where startPort is a number");
582             System.exit(-1);
583         }
584         if (startPort == 0 || startPort > 65535)
585         {
586             System.err.println("startPort should be between 1 and 65535");
587             System.exit(-1);
588         }
589 
590         String sEndPort = args[3];
591         int endPort = 0;
592         try
593         {
594             endPort = Integer.parseInt(sEndPort);
595         }
596         catch (NumberFormatException nfe)
597         {
598             System.err.println("Use as FederateStarter pn file startPort endPort, where endPort is a number");
599             System.exit(-1);
600         }
601         if (endPort == 0 || endPort > 65535)
602         {
603             System.err.println("endPort should be between 1 and 65535");
604             System.exit(-1);
605         }
606 
607         new FederateStarter(port, softwareProperties, startPort, endPort, true);
608     }
609 
610 }