View Javadoc
1   package org.sim0mq.federatestarter;
2   
3   import java.io.File;
4   import java.io.IOException;
5   import java.io.InputStream;
6   import java.nio.file.Files;
7   import java.nio.file.Path;
8   import java.nio.file.Paths;
9   import java.util.ArrayList;
10  import java.util.Arrays;
11  import java.util.Collections;
12  import java.util.LinkedHashMap;
13  import java.util.List;
14  import java.util.Map;
15  import java.util.Properties;
16  import java.util.UUID;
17  
18  import org.djutils.io.URLResource;
19  import org.djutils.serialization.SerializationException;
20  import org.sim0mq.Sim0MQException;
21  import org.sim0mq.message.MessageStatus;
22  import org.sim0mq.message.SimulationMessage;
23  import org.sim0mq.message.federatestarter.FS2FederateStartedMessage;
24  import org.sim0mq.message.federationmanager.FM1StartFederateMessage;
25  import org.zeromq.SocketType;
26  import org.zeromq.ZContext;
27  import org.zeromq.ZMQ;
28  import org.zeromq.ZMQException;
29  
30  /**
31   * The FederateStarter start listening on the given port for messages to start components. Report back via the call-back port on
32   * the status of the started components. If necessary, the FederateStarter can also forcefully stop a started (sub)process.
33   * <p>
34   * Copyright (c) 2016-2019 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
35   * BSD-style license. See <a href="http://sim0mq.org/docs/current/license.html">Sim0MQ License</a>.
36   * </p>
37   * $LastChangedDate: 2015-07-24 02:58:59 +0200 (Fri, 24 Jul 2015) $, @version $Revision: 1147 $, by $Author: averbraeck $,
38   * initial version Mar 1, 2017 <br>
39   * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
40   */
41  public class FederateStarter
42  {
43      /** the port number to listen on. */
44      private final int fsPort;
45  
46      /** the first port to be used for the models, inclusive. */
47      private final int startPort;
48  
49      /** the last port to be used for the models, inclusive. */
50      private final int endPort;
51  
52      /** the running programs this FederateStarter started. The String identifies the process (e.g., a UUID or a model id). */
53      @SuppressWarnings("checkstyle:visibilitymodifier")
54      protected Map<String, Process> runningProcessMap = Collections.synchronizedMap(new LinkedHashMap<>());
55  
56      /** the ports where the models listen. The String identifies the process (e.g., a UUID or a model id). */
57      private Map<String, Integer> modelPortMap = Collections.synchronizedMap(new LinkedHashMap<>());
58  
59      /** the StartFederate messages. */
60      private Map<String, FM1StartFederateMessage> startFederateMessages = Collections.synchronizedMap(new LinkedHashMap<>());
61  
62      /** the software properties. */
63      @SuppressWarnings("checkstyle:visibilitymodifier")
64      final Properties softwareProperties;
65  
66      /** the 0mq socket. */
67      private ZMQ.Socket fsSocket;
68  
69      /** the 0mq context. */
70      private ZContext fsContext;
71  
72      /** message count. */
73      private long messageCount = 0;
74  
75      /** does the Federate Starter concern models with an MC or just processes? */
76      private final boolean modelController;
77  
78      /**
79       * @param fsPort the port number to listen on
80       * @param softwareProperties the software properties to use
81       * @param startPort first port to be used for the models, inclusive
82       * @param endPort last port to be used for the models, inclusive
83       * @param modelController does the Federate Starter concern models with an MC or just processes?
84       * @throws Sim0MQException on error
85       * @throws SerializationException on error
86       */
87      public FederateStarter(final int fsPort, final Properties softwareProperties, final int startPort, final int endPort,
88              final boolean modelController) throws Sim0MQException, SerializationException
89      {
90          super();
91          this.softwareProperties = softwareProperties;
92          this.fsPort = fsPort;
93          this.startPort = startPort;
94          this.endPort = endPort;
95          this.modelController = modelController;
96  
97          this.fsContext = new ZContext(1);
98  
99          this.fsSocket = this.fsContext.createSocket(SocketType.ROUTER);
100         this.fsSocket.bind("tcp://*:" + this.fsPort);
101 
102         while (!Thread.currentThread().isInterrupted())
103         {
104             // Wait for next request from the client -- first the identity (String) and the delimiter (#0)
105             try
106             {
107                 String identity = this.fsSocket.recvStr();
108                 this.fsSocket.recvStr();
109 
110                 byte[] request = this.fsSocket.recv(0);
111                 Object[] fields = SimulationMessage.decode(request);
112                 String messageTypeId = fields[4].toString();
113                 String receiverId = fields[3].toString();
114 
115                 System.out.println("Received " + SimulationMessage.print(fields));
116 
117                 if (receiverId.equals("FS"))
118                 {
119                     switch (messageTypeId)
120                     {
121                         case "FM.1":
122                             processStartFederate(identity, fields);
123                             break;
124 
125                         case "FM.8":
126                             processKillFederate(identity, fields);
127                             break;
128 
129                         case "FM.9":
130                             // processKillAllFederates(senderId, uniqueId);
131                             break;
132 
133                         default:
134                             // wrong message
135                             System.err.println("Received unknown message -- not processed: " + messageTypeId);
136                     }
137                 }
138                 else
139                 {
140                     // wrong receiver
141                     System.err.println("Received message not intended for FS but for " + receiverId + " -- not processed: ");
142                 }
143             }
144             catch (ZMQException e)
145             {
146                 System.err.println(e.getMessage());
147             }
148         }
149         
150         try
151         {
152             this.fsSocket.close();
153             this.fsContext.destroy();
154         }
155         catch (Exception e)
156         {
157             System.err.println(e.getMessage());
158         }
159     }
160 
161     /**
162      * Process FM.2 message and send MC.2 message back.
163      * @param identity reply id for REQ-ROUTER pattern
164      * @param fields the message
165      * @throws Sim0MQException on error
166      * @throws SerializationException on error
167      */
168     private void processStartFederate(final String identity, final Object[] fields)
169             throws Sim0MQException, SerializationException
170     {
171         FM1StartFederateMessage startFederateMessage = FM1StartFederateMessage.createMessage(fields, "FS");
172         String error = "";
173 
174         int modelPort = findFreePortNumber();
175 
176         if (modelPort == -1)
177         {
178             error = "No free port number";
179         }
180 
181         else
182 
183         {
184             try
185             {
186                 ProcessBuilder pb = new ProcessBuilder();
187 
188                 Path workingPath = Files.createDirectories(Paths.get(startFederateMessage.getWorkingDirectory()));
189                 pb.directory(workingPath.toFile());
190 
191                 String softwareCode = "";
192                 if (!this.softwareProperties.containsKey(startFederateMessage.getSoftwareCode()))
193                 {
194                     System.err.println("Could not find software alias " + startFederateMessage.getSoftwareCode()
195                             + " in software properties file");
196                 }
197                 else
198                 {
199                     softwareCode = this.softwareProperties.getProperty(startFederateMessage.getSoftwareCode());
200 
201                     List<String> pbArgs = new ArrayList<>();
202                     pbArgs.add(softwareCode);
203                     pbArgs.add(startFederateMessage.getArgsBefore());
204                     pbArgs.add(startFederateMessage.getModelPath());
205                     pbArgs.addAll(Arrays.asList(
206                             startFederateMessage.getArgsAfter().replaceAll("%PORT%", String.valueOf(modelPort)).split(" ")));
207                     pb.command(pbArgs);
208 
209                     String stdIn = startFederateMessage.getRedirectStdin();
210                     String stdOut = startFederateMessage.getRedirectStdout();
211                     String stdErr = startFederateMessage.getRedirectStderr();
212 
213                     if (stdIn.length() > 0)
214                     {
215                         // TODO working dir path if not absolute?
216                         File stdInFile = new File(stdIn);
217                         pb.redirectInput(stdInFile);
218                     }
219 
220                     if (stdOut.length() > 0)
221                     {
222                         // TODO working dir path if not absolute?
223                         File stdOutFile = new File(stdOut);
224                         pb.redirectOutput(stdOutFile);
225                     }
226 
227                     if (stdErr.length() > 0)
228                     {
229                         // TODO working dir path if not absolute?
230                         File stdErrFile = new File(stdErr);
231                         pb.redirectError(stdErrFile);
232                     }
233 
234                     new Thread()
235                     {
236                         /** {@inheritDoc} */
237                         @Override
238                         public void run()
239                         {
240                             try
241                             {
242                                 Process process = pb.start();
243                                 FederateStarter.this.runningProcessMap.put(startFederateMessage.getInstanceId(), process);
244                                 System.err.println("Process started:" + process.isAlive());
245                             }
246                             catch (IOException exception)
247                             {
248                                 exception.printStackTrace();
249                             }
250                         }
251                     }.start();
252 
253                     this.modelPortMap.put(startFederateMessage.getInstanceId(), modelPort);
254                     this.startFederateMessages.put(startFederateMessage.getInstanceId(), startFederateMessage);
255 
256                     // Thread.sleep(1000);
257 
258                     // wait till the model is ready...
259                     System.out.println("modelController : " + this.modelController);
260                     if (this.modelController)
261                     {
262                         error = waitForModelStarted(startFederateMessage.getSimulationRunId(),
263                                 startFederateMessage.getInstanceId(), modelPort);
264                     }
265                 }
266             }
267             catch (IOException exception)
268             {
269                 exception.printStackTrace();
270                 error = exception.getMessage();
271             }
272         }
273 
274         System.out.println("SEND MESSAGE FS.2 ABOUT MODEL " + startFederateMessage.getInstanceId() + " @ port " + modelPort);
275 
276         // Send reply back to client
277         this.fsSocket.sendMore(identity);
278         this.fsSocket.sendMore("");
279         //@formatter:off
280         byte[] fs2Message = new FS2FederateStartedMessage.Builder()
281                 .setSimulationRunId(startFederateMessage.getSimulationRunId())
282                 .setInstanceId(startFederateMessage.getInstanceId())
283                 .setSenderId("FS")
284                 .setReceiverId(startFederateMessage.getSenderId())
285                 .setMessageId(++this.messageCount)
286                 .setStatus(error.isEmpty() ? "started" : "error")
287                 .setError(error)
288                 .setModelPort(modelPort)
289                 .build()
290                 .createByteArray();
291         this.fsSocket.send(fs2Message, 0);
292         //@formatter:on
293     }
294 
295     /**
296      * Find a free port for the model.
297      * @return the first free fort number in the range startPort - endPort, inclusive
298      */
299     private int findFreePortNumber()
300     {
301         for (int port = this.startPort; port <= this.endPort; port++)
302         {
303             if (!this.modelPortMap.containsValue(port))
304             {
305                 // try if the port is really free
306                 ZMQ.Socket testSocket = null;
307                 try
308                 {
309                     testSocket = this.fsContext.createSocket(SocketType.REP);
310                     testSocket.bind("tcp://127.0.0.1:" + port);
311                     testSocket.unbind("tcp://127.0.0.1:" + port);
312                     testSocket.close();
313                     return port;
314                 }
315                 catch (Exception exception)
316                 {
317                     // port was not free
318                     if (testSocket != null)
319                     {
320                         try
321                         {
322                             testSocket.close();
323                         }
324                         catch (Exception e)
325                         {
326                             // ignore.
327                         }
328                     }
329                 }
330             }
331         }
332         return -1;
333     }
334 
335     /**
336      * Wait for simulation to end using status polling with message FM.5.
337      * @param federationRunId the name of the federation
338      * @param modelId the String id of the model
339      * @param modelPort port on which the model is listening
340      * @return empty String for no error, filled String for error
341      * @throws Sim0MQException on error
342      * @throws SerializationException on error
343      */
344     private String waitForModelStarted(final Object federationRunId, final String modelId, final int modelPort)
345             throws Sim0MQException, SerializationException
346     {
347         boolean ok = true;
348         String error = "";
349         ZMQ.Socket modelSocket = null;
350         try
351         {
352             modelSocket = this.fsContext.createSocket(SocketType.REQ);
353             modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
354             modelSocket.connect("tcp://127.0.0.1:" + modelPort);
355         }
356         catch (Exception exception)
357         {
358             exception.printStackTrace();
359             ok = false;
360             error = exception.getMessage();
361         }
362 
363         boolean started = false;
364         while (ok && !started)
365         {
366             byte[] fs1Message = SimulationMessage.encodeUTF8(federationRunId, "FS", modelId, "FS.1", ++this.messageCount,
367                     MessageStatus.NEW);
368             modelSocket.send(fs1Message, 0);
369 
370             byte[] reply = modelSocket.recv(0);
371             Object[] replyMessage = SimulationMessage.decode(reply);
372             System.out.println("Received\n" + SimulationMessage.print(replyMessage));
373 
374             if (replyMessage[4].toString().equals("MC.1") && !replyMessage[9].toString().equals("error")
375                     && !replyMessage[9].toString().equals("ended") && ((Long) replyMessage[8]).longValue() == this.messageCount)
376             {
377                 if (replyMessage[9].toString().equals("started"))
378                 {
379                     started = true;
380                 }
381                 else
382                 {
383                     // wait a second
384                     try
385                     {
386                         Thread.sleep(100);
387                     }
388                     catch (InterruptedException ie)
389                     {
390                         // ignore
391                     }
392                 }
393             }
394             else
395             {
396                 ok = false;
397                 error = replyMessage[10].toString();
398                 System.err.println("Simulation start error -- status = " + replyMessage[9]);
399                 System.err.println("Error message = " + replyMessage[10]);
400             }
401         }
402 
403         if (modelSocket != null)
404         {
405             modelSocket.close();
406         }
407 
408         return error;
409     }
410 
411     /**
412      * Process FM.8 message and send FS.4 message back.
413      * @param identity reply id for REQ-ROUTER pattern
414      * @param fields the message
415      * @throws Sim0MQException on error
416      * @throws SerializationException on error
417      */
418     private void processKillFederate(final String identity, final Object[] fields)
419             throws Sim0MQException, SerializationException
420     {
421         boolean status = true;
422         String error = "";
423 
424         Object federationRunId = fields[1];
425         String senderId = fields[2].toString();
426 
427         String modelId = fields[8].toString();
428         if (!this.modelPortMap.containsKey(modelId))
429         {
430             status = false;
431             error = "model " + modelId + " unknown -- this model is unknown to the FederateStarter";
432         }
433         else
434         {
435             int modelPort = this.modelPortMap.remove(modelId);
436             Process process = this.runningProcessMap.remove(modelId);
437 
438             try
439             {
440                 try
441                 {
442                     ZMQ.Socket modelSocket = this.fsContext.createSocket(SocketType.REQ);
443                     modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
444                     modelSocket.connect("tcp://127.0.0.1:" + modelPort);
445 
446                     byte[] fs3Message = SimulationMessage.encodeUTF8(federationRunId, "FS", modelId, "FS.3",
447                             ++this.messageCount, MessageStatus.NEW);
448                     modelSocket.send(fs3Message, 0);
449 
450                     modelSocket.close();
451                 }
452                 catch (Exception exception)
453                 {
454                     exception.printStackTrace();
455                     status = true;
456                     error = exception.getMessage();
457                 }
458 
459                 try
460                 {
461                     Thread.sleep(100);
462                 }
463                 catch (InterruptedException ie)
464                 {
465                     // ignore
466                 }
467 
468                 if (process != null && process.isAlive())
469                 {
470                     process.destroyForcibly();
471                 }
472 
473                 FM1StartFederateMessage sfm = this.startFederateMessages.get(modelId);
474                 if (sfm.isDeleteStdout())
475                 {
476                     if (sfm.getRedirectStdout().length() > 0)
477                     {
478                         File stdOutFile = new File(sfm.getRedirectStdout());
479                         stdOutFile.delete();
480                     }
481                 }
482 
483                 if (sfm.isDeleteStderr())
484                 {
485                     if (sfm.getRedirectStderr().length() > 0)
486                     {
487                         File stdErrFile = new File(sfm.getRedirectStderr());
488                         stdErrFile.delete();
489                     }
490                 }
491 
492                 if (sfm.isDeleteWorkingDirectory())
493                 {
494                     File workingDir = new File(sfm.getWorkingDirectory());
495                     workingDir.delete();
496                 }
497             }
498             catch (Exception exception)
499             {
500                 exception.printStackTrace();
501                 status = false;
502                 error = exception.getMessage();
503             }
504 
505             byte[] fs4Message = SimulationMessage.encodeUTF8(federationRunId, "FS", senderId, "FS.4", ++this.messageCount,
506                     MessageStatus.NEW, modelId, status, error);
507             this.fsSocket.sendMore(identity);
508             this.fsSocket.sendMore("");
509             this.fsSocket.send(fs4Message, 0);
510         }
511     }
512 
513     /**
514      * @return modelController
515      */
516     public boolean isModelController()
517     {
518         return this.modelController;
519     }
520 
521     /**
522      * Start listening on the given port for messages to start components. Report back via the call-back port on the status of
523      * the started components. If necessary, the FederateStarter can also forcefully stop a started (sub)process.
524      * @param args the federation name and port on which the FederateStarter is listening
525      * @throws Sim0MQException on error
526      * @throws SerializationException on error
527      */
528     public static void main(final String[] args) throws Sim0MQException, SerializationException
529     {
530         if (args.length < 4)
531         {
532             System.err.println("Use as FederateStarter portNumber software_properties_file startPort endPort");
533             System.exit(-1);
534         }
535 
536         String sPort = args[0];
537         int port = 0;
538         try
539         {
540             port = Integer.parseInt(sPort);
541         }
542         catch (NumberFormatException nfe)
543         {
544             System.err.println("Use as FederateStarter portNumber, where portNumber is a number");
545             System.exit(-1);
546         }
547         if (port == 0 || port > 65535)
548         {
549             System.err.println("PortNumber should be between 1 and 65535");
550             System.exit(-1);
551         }
552 
553         String propertiesFile = args[1];
554         Properties softwareProperties = new Properties();
555         InputStream propertiesStream = URLResource.getResourceAsStream(propertiesFile);
556         try
557         {
558             softwareProperties.load(propertiesStream);
559         }
560         catch (IOException | NullPointerException e)
561         {
562             System.err.println("Could not find or read software properties file " + propertiesFile);
563             System.exit(-1);
564         }
565 
566         String sStartPort = args[2];
567         int startPort = 0;
568         try
569         {
570             startPort = Integer.parseInt(sStartPort);
571         }
572         catch (NumberFormatException nfe)
573         {
574             System.err.println("Use as FederateStarter pn file startPort endPort, where startPort is a number");
575             System.exit(-1);
576         }
577         if (startPort == 0 || startPort > 65535)
578         {
579             System.err.println("startPort should be between 1 and 65535");
580             System.exit(-1);
581         }
582 
583         String sEndPort = args[3];
584         int endPort = 0;
585         try
586         {
587             endPort = Integer.parseInt(sEndPort);
588         }
589         catch (NumberFormatException nfe)
590         {
591             System.err.println("Use as FederateStarter pn file startPort endPort, where endPort is a number");
592             System.exit(-1);
593         }
594         if (endPort == 0 || endPort > 65535)
595         {
596             System.err.println("endPort should be between 1 and 65535");
597             System.exit(-1);
598         }
599 
600         new FederateStarter(port, softwareProperties, startPort, endPort, true);
601     }
602 
603 }