View Javadoc
1   package org.sim0mq.federatestarter;
2   
3   import java.io.File;
4   import java.io.IOException;
5   import java.io.InputStream;
6   import java.nio.file.Files;
7   import java.nio.file.Path;
8   import java.nio.file.Paths;
9   import java.util.ArrayList;
10  import java.util.Arrays;
11  import java.util.Collections;
12  import java.util.LinkedHashMap;
13  import java.util.List;
14  import java.util.Map;
15  import java.util.Properties;
16  import java.util.UUID;
17  
18  import org.djutils.io.URLResource;
19  import org.djutils.serialization.SerializationException;
20  import org.sim0mq.Sim0MQException;
21  import org.sim0mq.message.Sim0MQMessage;
22  import org.sim0mq.message.federatestarter.FS1RequestStatusMessage;
23  import org.sim0mq.message.federatestarter.FS2FederateStartedMessage;
24  import org.sim0mq.message.federatestarter.FS4FederateKilledMessage;
25  import org.sim0mq.message.federationmanager.FM1StartFederateMessage;
26  import org.sim0mq.message.federationmanager.FM8KillFederateMessage;
27  import org.sim0mq.message.modelcontroller.MC1StatusMessage;
28  import org.zeromq.SocketType;
29  import org.zeromq.ZContext;
30  import org.zeromq.ZMQ;
31  import org.zeromq.ZMQException;
32  
33  /**
34   * The FederateStarter start listening on the given port for messages to start components. Report back via the call-back port on
35   * the status of the started components. If necessary, the FederateStarter can also forcefully stop a started (sub)process.
36   * <p>
37   * Copyright (c) 2016-2024 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
38   * BSD-style license. See <a href="http://sim0mq.org/docs/current/license.html">Sim0MQ License</a>.
39   * </p>
40   * $LastChangedDate: 2015-07-24 02:58:59 +0200 (Fri, 24 Jul 2015) $, @version $Revision: 1147 $, by $Author: averbraeck $,
41   * initial version Mar 1, 2017 <br>
42   * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
43   */
44  public class FederateStarter
45  {
46      /** the port number to listen on. */
47      private final int fsPort;
48  
49      /** the first port to be used for the models, inclusive. */
50      private final int startPort;
51  
52      /** the last port to be used for the models, inclusive. */
53      private final int endPort;
54  
55      /** the running programs this FederateStarter started. The String identifies the process (e.g., a UUID or a model id). */
56      @SuppressWarnings("checkstyle:visibilitymodifier")
57      protected Map<Object, Process> runningProcessMap = Collections.synchronizedMap(new LinkedHashMap<>());
58  
59      /** the ports where the models listen. The String identifies the process (e.g., a UUID or a model id). */
60      private Map<Object, Integer> modelPortMap = Collections.synchronizedMap(new LinkedHashMap<>());
61  
62      /** the StartFederate messages. */
63      private Map<Object, FM1StartFederateMessage> startFederateMessages = Collections.synchronizedMap(new LinkedHashMap<>());
64  
65      /** the software properties. */
66      @SuppressWarnings("checkstyle:visibilitymodifier")
67      final Properties softwareProperties;
68  
69      /** the 0mq socket. */
70      private ZMQ.Socket fsSocket;
71  
72      /** the 0mq context. */
73      private ZContext fsContext;
74  
75      /** message count. */
76      private long messageCount = 0;
77  
78      /** does the Federate Starter concern models with an MC or just processes? */
79      private final boolean modelController;
80  
81      /**
82       * @param fsPort the port number to listen on
83       * @param softwareProperties the software properties to use
84       * @param startPort first port to be used for the models, inclusive
85       * @param endPort last port to be used for the models, inclusive
86       * @param modelController does the Federate Starter concern models with an MC or just processes?
87       * @throws Sim0MQException on error
88       * @throws SerializationException on error
89       */
90      public FederateStarter(final int fsPort, final Properties softwareProperties, final int startPort, final int endPort,
91              final boolean modelController) throws Sim0MQException, SerializationException
92      {
93          this.softwareProperties = softwareProperties;
94          this.fsPort = fsPort;
95          this.startPort = startPort;
96          this.endPort = endPort;
97          this.modelController = modelController;
98  
99          this.fsContext = new ZContext(1);
100 
101         this.fsSocket = this.fsContext.createSocket(SocketType.ROUTER);
102         this.fsSocket.bind("tcp://*:" + this.fsPort);
103 
104         while (!Thread.currentThread().isInterrupted())
105         {
106             // Wait for next request from the client -- first the identity (String) and the delimiter (#0)
107             try
108             {
109                 String identity = this.fsSocket.recvStr();
110                 this.fsSocket.recvStr();
111 
112                 byte[] request = this.fsSocket.recv(0);
113                 Sim0MQMessage message = Sim0MQMessage.decode(request);
114                 String messageTypeId = message.getMessageTypeId().toString();
115                 String receiverId = message.getReceiverId().toString();
116 
117                 System.out.println("Received " + Sim0MQMessage.print(message.createObjectArray()));
118 
119                 if (receiverId.equals("FS"))
120                 {
121                     switch (messageTypeId)
122                     {
123                         case "FM.1":
124                             processStartFederate(identity, message);
125                             break;
126 
127                         case "FM.8":
128                             processKillFederate(identity, message);
129                             break;
130 
131                         case "FM.9":
132                             // processKillAllFederates(senderId, uniqueId);
133                             break;
134 
135                         default:
136                             // wrong message
137                             System.err.println("Received unknown message -- not processed: " + messageTypeId);
138                     }
139                 }
140                 else
141                 {
142                     // wrong receiver
143                     System.err.println("Received message not intended for FS but for " + receiverId + " -- not processed: ");
144                 }
145             }
146             catch (ZMQException e)
147             {
148                 System.err.println(e.getMessage());
149             }
150         }
151 
152         try
153         {
154             this.fsSocket.close();
155             this.fsContext.destroy();
156         }
157         catch (Exception e)
158         {
159             System.err.println(e.getMessage());
160         }
161     }
162 
163     /**
164      * Process FM.2 message and send MC.2 message back.
165      * @param identity reply id for REQ-ROUTER pattern
166      * @param message Message; the message
167      * @throws Sim0MQException on error
168      * @throws SerializationException on error
169      */
170     private void processStartFederate(final String identity, final Sim0MQMessage message)
171             throws Sim0MQException, SerializationException
172     {
173         FM1StartFederateMessage startFederateMessage = new FM1StartFederateMessage(message.createObjectArray());
174         String error = "";
175 
176         int modelPort = findFreePortNumber();
177 
178         if (modelPort == -1)
179         {
180             error = "No free port number";
181         }
182 
183         else
184 
185         {
186             try
187             {
188                 ProcessBuilder pb = new ProcessBuilder();
189 
190                 Path workingPath = Files.createDirectories(Paths.get(startFederateMessage.getWorkingDirectory()));
191                 pb.directory(workingPath.toFile());
192 
193                 String softwareCode = "";
194                 if (!this.softwareProperties.containsKey(startFederateMessage.getSoftwareCode()))
195                 {
196                     System.err.println("Could not find software alias " + startFederateMessage.getSoftwareCode()
197                             + " in software properties file");
198                 }
199                 else
200                 {
201                     softwareCode = this.softwareProperties.getProperty(startFederateMessage.getSoftwareCode());
202 
203                     List<String> pbArgs = new ArrayList<>();
204                     pbArgs.add(softwareCode);
205                     pbArgs.add(startFederateMessage.getArgsBefore());
206                     pbArgs.add(startFederateMessage.getModelPath());
207                     pbArgs.addAll(Arrays.asList(
208                             startFederateMessage.getArgsAfter().replaceAll("%PORT%", String.valueOf(modelPort)).split(" ")));
209                     pb.command(pbArgs);
210 
211                     String stdIn = startFederateMessage.getRedirectStdin();
212                     String stdOut = startFederateMessage.getRedirectStdout();
213                     String stdErr = startFederateMessage.getRedirectStderr();
214 
215                     if (stdIn.length() > 0)
216                     {
217                         // TODO working dir path if not absolute?
218                         File stdInFile = new File(stdIn);
219                         pb.redirectInput(stdInFile);
220                     }
221 
222                     if (stdOut.length() > 0)
223                     {
224                         // TODO working dir path if not absolute?
225                         File stdOutFile = new File(stdOut);
226                         pb.redirectOutput(stdOutFile);
227                     }
228 
229                     if (stdErr.length() > 0)
230                     {
231                         // TODO working dir path if not absolute?
232                         File stdErrFile = new File(stdErr);
233                         pb.redirectError(stdErrFile);
234                     }
235 
236                     new Thread()
237                     {
238                         @Override
239                         public void run()
240                         {
241                             try
242                             {
243                                 Process process = pb.start();
244                                 FederateStarter.this.runningProcessMap.put(startFederateMessage.getInstanceId(), process);
245                                 System.err.println("Process started:" + process.isAlive());
246                             }
247                             catch (IOException exception)
248                             {
249                                 exception.printStackTrace();
250                             }
251                         }
252                     }.start();
253 
254                     this.modelPortMap.put(startFederateMessage.getInstanceId(), modelPort);
255                     this.startFederateMessages.put(startFederateMessage.getInstanceId(), startFederateMessage);
256 
257                     // Thread.sleep(1000);
258 
259                     // wait till the model is ready...
260                     System.out.println("modelController : " + this.modelController);
261                     if (this.modelController)
262                     {
263                         error = waitForModelStarted(startFederateMessage.getFederationId(),
264                                 startFederateMessage.getInstanceId(), modelPort);
265                     }
266                 }
267             }
268             catch (IOException exception)
269             {
270                 exception.printStackTrace();
271                 error = exception.getMessage();
272             }
273         }
274 
275         System.out.println("SEND MESSAGE FS.2 ABOUT MODEL " + startFederateMessage.getInstanceId() + " @ port " + modelPort);
276 
277         // Send reply back to client
278         this.fsSocket.sendMore(identity);
279         this.fsSocket.sendMore("");
280         //@formatter:off
281         byte[] fs2Message = new FS2FederateStartedMessage.Builder()
282                 .setSimulationRunId(startFederateMessage.getFederationId())
283                 .setInstanceId(startFederateMessage.getInstanceId())
284                 .setSenderId("FS")
285                 .setReceiverId(startFederateMessage.getSenderId())
286                 .setMessageId(++this.messageCount)
287                 .setStatus(error.isEmpty() ? "started" : "error")
288                 .setError(error)
289                 .setModelPort(modelPort)
290                 .build()
291                 .createByteArray();
292         this.fsSocket.send(fs2Message, 0);
293         //@formatter:on
294     }
295 
296     /**
297      * Find a free port for the model.
298      * @return the first free fort number in the range startPort - endPort, inclusive
299      */
300     private int findFreePortNumber()
301     {
302         for (int port = this.startPort; port <= this.endPort; port++)
303         {
304             if (!this.modelPortMap.containsValue(port))
305             {
306                 // try if the port is really free
307                 ZMQ.Socket testSocket = null;
308                 try
309                 {
310                     testSocket = this.fsContext.createSocket(SocketType.REP);
311                     testSocket.bind("tcp://127.0.0.1:" + port);
312                     testSocket.unbind("tcp://127.0.0.1:" + port);
313                     testSocket.close();
314                     return port;
315                 }
316                 catch (Exception exception)
317                 {
318                     // port was not free
319                     if (testSocket != null)
320                     {
321                         try
322                         {
323                             testSocket.close();
324                         }
325                         catch (Exception e)
326                         {
327                             // ignore.
328                         }
329                     }
330                 }
331             }
332         }
333         return -1;
334     }
335 
336     /**
337      * Wait for simulation to end using status polling with message FM.5.
338      * @param federationRunId the name of the federation
339      * @param modelId the String id of the model
340      * @param modelPort port on which the model is listening
341      * @return empty String for no error, filled String for error
342      * @throws Sim0MQException on error
343      * @throws SerializationException on error
344      */
345     private String waitForModelStarted(final Object federationRunId, final Object modelId, final int modelPort)
346             throws Sim0MQException, SerializationException
347     {
348         boolean ok = true;
349         String error = "";
350         ZMQ.Socket modelSocket = null;
351         try
352         {
353             modelSocket = this.fsContext.createSocket(SocketType.REQ);
354             modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
355             modelSocket.connect("tcp://127.0.0.1:" + modelPort);
356         }
357         catch (Exception exception)
358         {
359             exception.printStackTrace();
360             ok = false;
361             error = exception.getMessage();
362         }
363 
364         boolean started = false;
365         while (ok && !started)
366         {
367             byte[] fs1Message =
368                     new FS1RequestStatusMessage(federationRunId, "FS", modelId, ++this.messageCount).createByteArray();
369             modelSocket.send(fs1Message, 0);
370             System.out.println("Sent: FS.1 to " + modelId + ", waiting on MC1");
371 
372             byte[] reply = modelSocket.recv(0);
373             Object[] objectArray = Sim0MQMessage.decodeToArray(reply);
374             System.out.println("Received\n" + Sim0MQMessage.print(objectArray));
375             MC1StatusMessage replyMessage = new MC1StatusMessage(objectArray);
376 
377             // Synchronous and single-threaded, so the messageCount cannot change between send and receive
378             if (!replyMessage.getStatus().equals("error") && !replyMessage.getStatus().equals("ended")
379                     && ((Long) replyMessage.getReplyToId()).longValue() == this.messageCount)
380             {
381                 if (replyMessage.getStatus().equals("started"))
382                 {
383                     started = true;
384                 }
385                 else
386                 {
387                     // wait a second
388                     try
389                     {
390                         Thread.sleep(100);
391                     }
392                     catch (InterruptedException ie)
393                     {
394                         // ignore
395                     }
396                 }
397             }
398             else
399             {
400                 ok = false;
401                 error = replyMessage.getError();
402                 System.err.println("Simulation start error -- status = " + replyMessage.getStatus());
403                 System.err.println("Error message = " + error);
404             }
405         }
406 
407         if (modelSocket != null)
408         {
409             modelSocket.close();
410         }
411 
412         return error;
413     }
414 
415     /**
416      * Process FM.8 message and send FS.4 message back.
417      * @param identity reply id for REQ-ROUTER pattern
418      * @param message the message
419      * @throws Sim0MQException on error
420      * @throws SerializationException on error
421      */
422     private void processKillFederate(final String identity, final Sim0MQMessage message)
423             throws Sim0MQException, SerializationException
424     {
425         boolean status = true;
426         String error = "";
427 
428         Object federationRunId = message.getFederationId();
429         Object senderId = message.getSenderId();
430 
431         FM8KillFederateMessage killMessage = new FM8KillFederateMessage(message.createObjectArray());
432         Object modelId = killMessage.getInstanceId();
433         if (!this.modelPortMap.containsKey(modelId))
434         {
435             status = false;
436             error = "model " + modelId + " unknown -- this model is unknown to the FederateStarter";
437         }
438         else
439         {
440             int modelPort = this.modelPortMap.remove(modelId);
441             Process process = this.runningProcessMap.remove(modelId);
442 
443             try
444             {
445                 try
446                 {
447                     ZMQ.Socket modelSocket = this.fsContext.createSocket(SocketType.REQ);
448                     modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
449                     modelSocket.connect("tcp://127.0.0.1:" + modelPort);
450 
451                     byte[] fs3Message =
452                             Sim0MQMessage.encodeUTF8(true, federationRunId, "FS", modelId, "FS.3", ++this.messageCount);
453                     modelSocket.send(fs3Message, 0);
454 
455                     modelSocket.close();
456                 }
457                 catch (Exception exception)
458                 {
459                     exception.printStackTrace();
460                     status = true;
461                     error = exception.getMessage();
462                 }
463 
464                 try
465                 {
466                     Thread.sleep(100);
467                 }
468                 catch (InterruptedException ie)
469                 {
470                     // ignore
471                 }
472 
473                 if (process != null && process.isAlive())
474                 {
475                     process.destroyForcibly();
476                 }
477 
478                 FM1StartFederateMessage sfm = this.startFederateMessages.get(modelId);
479                 if (sfm.isDeleteStdout())
480                 {
481                     if (sfm.getRedirectStdout().length() > 0)
482                     {
483                         File stdOutFile = new File(sfm.getRedirectStdout());
484                         stdOutFile.delete();
485                     }
486                 }
487 
488                 if (sfm.isDeleteStderr())
489                 {
490                     if (sfm.getRedirectStderr().length() > 0)
491                     {
492                         File stdErrFile = new File(sfm.getRedirectStderr());
493                         stdErrFile.delete();
494                     }
495                 }
496 
497                 if (sfm.isDeleteWorkingDirectory())
498                 {
499                     File workingDir = new File(sfm.getWorkingDirectory());
500                     workingDir.delete();
501                 }
502             }
503             catch (Exception exception)
504             {
505                 exception.printStackTrace();
506                 status = false;
507                 error = exception.getMessage();
508             }
509 
510             byte[] fs4Message =
511                     new FS4FederateKilledMessage(federationRunId, "FS", senderId, ++this.messageCount, modelId, status, error)
512                             .createByteArray();
513             this.fsSocket.sendMore(identity);
514             this.fsSocket.sendMore("");
515             this.fsSocket.send(fs4Message, 0);
516         }
517     }
518 
519     /**
520      * @return modelController
521      */
522     public boolean isModelController()
523     {
524         return this.modelController;
525     }
526 
527     /**
528      * Start listening on the given port for messages to start components. Report back via the call-back port on the status of
529      * the started components. If necessary, the FederateStarter can also forcefully stop a started (sub)process.
530      * @param args the federation name and port on which the FederateStarter is listening
531      * @throws Sim0MQException on error
532      * @throws SerializationException on error
533      */
534     public static void main(final String[] args) throws Sim0MQException, SerializationException
535     {
536         if (args.length < 4)
537         {
538             System.err.println("Use as FederateStarter portNumber software_properties_file startPort endPort");
539             System.exit(-1);
540         }
541 
542         String sPort = args[0];
543         int port = 0;
544         try
545         {
546             port = Integer.parseInt(sPort);
547         }
548         catch (NumberFormatException nfe)
549         {
550             System.err.println("Use as FederateStarter portNumber, where portNumber is a number");
551             System.exit(-1);
552         }
553         if (port == 0 || port > 65535)
554         {
555             System.err.println("PortNumber should be between 1 and 65535");
556             System.exit(-1);
557         }
558 
559         String propertiesFile = args[1];
560         Properties softwareProperties = new Properties();
561         InputStream propertiesStream = URLResource.getResourceAsStream(propertiesFile);
562         try
563         {
564             softwareProperties.load(propertiesStream);
565         }
566         catch (IOException | NullPointerException e)
567         {
568             System.err.println("Could not find or read software properties file " + propertiesFile);
569             System.exit(-1);
570         }
571 
572         String sStartPort = args[2];
573         int startPort = 0;
574         try
575         {
576             startPort = Integer.parseInt(sStartPort);
577         }
578         catch (NumberFormatException nfe)
579         {
580             System.err.println("Use as FederateStarter pn file startPort endPort, where startPort is a number");
581             System.exit(-1);
582         }
583         if (startPort == 0 || startPort > 65535)
584         {
585             System.err.println("startPort should be between 1 and 65535");
586             System.exit(-1);
587         }
588 
589         String sEndPort = args[3];
590         int endPort = 0;
591         try
592         {
593             endPort = Integer.parseInt(sEndPort);
594         }
595         catch (NumberFormatException nfe)
596         {
597             System.err.println("Use as FederateStarter pn file startPort endPort, where endPort is a number");
598             System.exit(-1);
599         }
600         if (endPort == 0 || endPort > 65535)
601         {
602             System.err.println("endPort should be between 1 and 65535");
603             System.exit(-1);
604         }
605 
606         new FederateStarter(port, softwareProperties, startPort, endPort, true);
607     }
608 
609 }