View Javadoc
1   package org.sim0mq.demo.mm1;
2   
3   import java.util.ArrayList;
4   import java.util.Collections;
5   import java.util.HashMap;
6   import java.util.LinkedHashMap;
7   import java.util.List;
8   import java.util.Map;
9   import java.util.UUID;
10  import java.util.concurrent.atomic.AtomicInteger;
11  import java.util.concurrent.atomic.AtomicLong;
12  
13  import org.djutils.serialization.SerializationException;
14  import org.sim0mq.Sim0MQException;
15  import org.sim0mq.federationmanager.ModelState;
16  import org.sim0mq.message.Sim0MQMessage;
17  import org.sim0mq.message.federatestarter.FS2FederateStartedMessage;
18  import org.sim0mq.message.federatestarter.FS4FederateKilledMessage;
19  import org.sim0mq.message.federationmanager.FM1StartFederateMessage;
20  import org.sim0mq.message.federationmanager.FM2SimRunControlMessage;
21  import org.sim0mq.message.federationmanager.FM3SetParameterMessage;
22  import org.sim0mq.message.federationmanager.FM4SimStartMessage;
23  import org.sim0mq.message.federationmanager.FM5RequestStatus;
24  import org.sim0mq.message.federationmanager.FM6RequestStatisticsMessage;
25  import org.sim0mq.message.federationmanager.FM8KillFederateMessage;
26  import org.sim0mq.message.modelcontroller.MC1StatusMessage;
27  import org.sim0mq.message.modelcontroller.MC2AckNakMessage;
28  import org.sim0mq.message.modelcontroller.MC3StatisticsMessage;
29  import org.sim0mq.message.modelcontroller.MC4StatisticsErrorMessage;
30  import org.zeromq.SocketType;
31  import org.zeromq.ZContext;
32  import org.zeromq.ZMQ;
33  
34  /**
35   * Example implementation of a FederationManager to start the MM1Queue41Application DSOL model.
36   * <p>
37   * Copyright (c) 2016-2020 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
38   * BSD-style license. See <a href="http://sim0mq.org/docs/current/license.html">Sim0MQ License</a>.
39   * </p>
40   * $LastChangedDate: 2015-07-24 02:58:59 +0200 (Fri, 24 Jul 2015) $, @version $Revision: 1147 $, by $Author: averbraeck $,
41   * initial version April 10, 2017 <br>
42   * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
43   */
44  public final class MM1FederationManager20
45  {
46      /**
47       * @param args parameters for main
48       * @throws Sim0MQException on error
49       */
50      public static void main(final String[] args) throws Sim0MQException
51      {
52          if (args.length < 5)
53          {
54              System.err.println("Use as FederationManager federationName federationManagerPortNumber "
55                      + "federateStarterIPorName federateStarterPortNumber modelFolder");
56              System.exit(-1);
57          }
58          String federationName = args[0];
59  
60          String fmsPort = args[1];
61          int fmPort = 0;
62          try
63          {
64              fmPort = Integer.parseInt(fmsPort);
65          }
66          catch (NumberFormatException nfe)
67          {
68              System.err.println("Use as FederationManager fedName fmPort fsIP fsPort modelFolder, where fmPort is a number");
69              System.exit(-1);
70          }
71          if (fmPort == 0 || fmPort > 65535)
72          {
73              System.err.println("fmPort should be between 1 and 65535");
74              System.exit(-1);
75          }
76  
77          String fsServerNameOrIP = args[2];
78  
79          String fsPortString = args[3];
80          int fsPort = 0;
81          try
82          {
83              fsPort = Integer.parseInt(fsPortString);
84          }
85          catch (NumberFormatException nfe)
86          {
87              System.err.println("Use as FederationManager fedName fmPort fsIP fsPort modelFolder, where fmPort is a number");
88              System.exit(-1);
89          }
90          if (fsPort == 0 || fsPort > 65535)
91          {
92              System.err.println("fsPort should be between 1 and 65535");
93              System.exit(-1);
94          }
95  
96          String mm1ModelFolder = args[4];
97  
98          new MM1FederationManager20(federationName, fmPort, fsServerNameOrIP, fsPort, mm1ModelFolder);
99      }
100 
101     /**
102      * Send an FM.1 message to the FederateStarter.
103      * @param federationName the name of the federation
104      * @param fmPort the port number to listen on
105      * @param fsServerNameOrIP name or IP address of the federate starter we are using
106      * @param fsPort the port where the federate starter can be reached
107      * @param mm1ModelFolder location on the computer of the federate starter where the model can be found
108      * @throws Sim0MQException on error
109      */
110     private MM1FederationManager20(final String federationName, final int fmPort, final String fsServerNameOrIP,
111             final int fsPort, final String mm1ModelFolder) throws Sim0MQException
112     {
113         AtomicLong messageCount = new AtomicLong(0L);
114         AtomicInteger nrRunning = new AtomicInteger();
115 
116         Map<Integer, Map<String, Number>> statMap =
117                 Collections.synchronizedMap(new LinkedHashMap<Integer, Map<String, Number>>());
118 
119         for (int modelNr = 0; modelNr < 20; modelNr++)
120         {
121             new Thread()
122             {
123                 @Override
124                 public void run()
125                 {
126                     final int nr = nrRunning.getAndIncrement();
127                     StateMachine stateMachine = null;
128                     System.out.println("inc modelNr to " + nr);
129                     try
130                     {
131                         stateMachine =
132                                 new StateMachine(messageCount, federationName, fsServerNameOrIP, fsPort, mm1ModelFolder, nr);
133                     }
134                     catch (Sim0MQException | SerializationException exception)
135                     {
136                         exception.printStackTrace();
137                     }
138                     int decNr = nrRunning.decrementAndGet();
139                     System.out.println("dec modelNr to " + decNr);
140                     synchronized (statMap)
141                     {
142                         statMap.put(nr, stateMachine.getStatistics());
143                     }
144                 }
145             }.start();
146         }
147 
148         while (nrRunning.get() > 0)
149         {
150             Thread.yield();
151         }
152 
153         synchronized (statMap)
154         {
155             for (int nr : statMap.keySet())
156             {
157                 Map<String, Number> stats = statMap.get(nr);
158                 StringBuilder s = new StringBuilder();
159                 s.append(String.format("%2d  ", nr));
160                 for (String code : stats.keySet())
161                 {
162                     s.append(String.format("%10s=%10.4f   ", code, stats.get(code).doubleValue()));
163                 }
164                 System.out.println(s.toString());
165             }
166         }
167     }
168 
169     /**
170      * State machine to run several models in parallel.
171      * <p>
172      * Copyright (c) 2013-2020 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved.
173      * <br>
174      * BSD-style license. See <a href="http://sim0mq.org/docs/current/license.html">Sim0MQ License</a>.
175      * </p>
176      * $LastChangedDate: 2015-07-24 02:58:59 +0200 (Fri, 24 Jul 2015) $, @version $Revision: 1147 $, by $Author: averbraeck $,
177      * initial version May 5, 2017 <br>
178      * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
179      */
180     static class StateMachine
181     {
182         /** the state of the started model. */
183         private ModelState state;
184 
185         /** the model socket. */
186         private ZMQ.Socket modelSocket;
187 
188         /** the model name. */
189         private String modelName;
190 
191         /** the federate starter socket. */
192         private ZMQ.Socket fsSocket;
193 
194         /** the context. */
195         private ZContext fmContext;
196 
197         /** the message counter. */
198         private AtomicLong messageCount;
199 
200         /** statistics. */
201         private Map<String, Number> statistics = new LinkedHashMap<>();
202 
203         /**
204          * @param messageCount AtomicLong; message counter
205          * @param federationName the name of the federation
206          * @param fsServerNameOrIP name or IP address of the federate starter we are using
207          * @param fsPort the port where the federate starter can be reached
208          * @param mm1ModelFolder location on the computer of the federate starter where the model can be found
209          * @param modelNr sequence number of the model to run
210          * @throws Sim0MQException on error
211          * @throws SerializationException on serialization problem
212          */
213         StateMachine(final AtomicLong messageCount, final String federationName, final String fsServerNameOrIP,
214                 final int fsPort, final String mm1ModelFolder, final int modelNr) throws Sim0MQException, SerializationException
215         {
216             this.fmContext = new ZContext(1);
217 
218             this.fsSocket = this.fmContext.createSocket(SocketType.REQ);
219             this.fsSocket.setIdentity(UUID.randomUUID().toString().getBytes());
220 
221             this.modelName = "MM1." + modelNr;
222             this.messageCount = messageCount;
223 
224             this.modelSocket = this.fmContext.createSocket(SocketType.REQ);
225             this.modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
226 
227             this.state = ModelState.NOT_STARTED;
228             boolean ready = false;
229             while (!ready)
230             {
231                 System.out.println(this.state);
232 
233                 switch (this.state)
234                 {
235                     case NOT_STARTED:
236                         // federationName, fsServerNameOrIP, fsPort, mm1ModelFolder
237                         startModel(federationName, fsServerNameOrIP, fsPort, mm1ModelFolder);
238                         break;
239 
240                     case STARTED:
241                         sendSimRunControl(federationName);
242                         break;
243 
244                     case RUNCONTROL:
245                         setParameters(federationName);
246                         break;
247 
248                     case PARAMETERS:
249                         sendSimStart(federationName);
250                         break;
251 
252                     case SIMULATORSTARTED:
253                         waitForSimEnded(federationName);
254                         break;
255 
256                     case SIMULATORENDED:
257                         requestStatistics(federationName);
258                         break;
259 
260                     case STATISTICSGATHERED:
261                         killFederate(federationName);
262                         ready = true;
263                         break;
264 
265                     case ERROR:
266                         killFederate(federationName);
267                         ready = true;
268                         break;
269 
270                     default:
271                         break;
272                 }
273             }
274 
275             this.fsSocket.close();
276             this.modelSocket.close();
277             this.fmContext.destroy();
278             this.fmContext.close();
279         }
280 
281         /**
282          * Send the FM.1 message to the FederateStarter to start the MM1 model.
283          * @param federationName the name of the federation
284          * @param fsServerNameOrIP name or IP address of the federate starter we are using
285          * @param fsPort the port where the federate starter can be reached
286          * @param mm1ModelFolder location on the computer of the federate starter where the model can be found
287          * @throws Sim0MQException on error
288          * @throws SerializationException on serialization problem
289          */
290         private void startModel(final String federationName, final String fsServerNameOrIP, final int fsPort,
291                 final String mm1ModelFolder) throws Sim0MQException, SerializationException
292         {
293             // Start model mmm1.jar
294             byte[] fm1Message = new FM1StartFederateMessage(federationName, "FM", "FS", this.messageCount.getAndIncrement(),
295                     this.modelName, "java8+", "-jar", mm1ModelFolder + "/mm1.jar", this.modelName + " %PORT%", mm1ModelFolder,
296                     "", mm1ModelFolder + "/out_" + this.modelName + ".txt", mm1ModelFolder + "/err_" + this.modelName + ".txt",
297                     false, true, true).createByteArray();
298             this.fsSocket.connect("tcp://" + fsServerNameOrIP + ":" + fsPort);
299             this.fsSocket.send(fm1Message);
300 
301             byte[] reply = this.fsSocket.recv(0);
302 
303             try
304             {
305                 Object[] fs2Fields = Sim0MQMessage.decode(reply).createObjectArray();
306                 System.out.println("Received\n" + Sim0MQMessage.print(fs2Fields));
307                 FS2FederateStartedMessage message = new FS2FederateStartedMessage(fs2Fields);
308 
309                 if (message.getStatus().toString().equals("started") && message.getInstanceId().equals(this.modelName))
310                 {
311                     this.state = ModelState.STARTED;
312                     this.modelSocket.connect("tcp://" + fsServerNameOrIP + ":" + message.getModelPort());
313                 }
314                 else
315                 {
316                     this.state = ModelState.ERROR;
317                     System.err.println("Model not started correctly -- state = " + message.getStatus());
318                     System.err.println("Started model = " + message.getInstanceId() + " on port " + message.getModelPort());
319                     System.err.println("Error message = " + message.getError());
320                 }
321             }
322             catch (Exception exception)
323             {
324                 this.state = ModelState.ERROR;
325                 System.err.println("Model not started correctly -- error = " + exception.getClass().getSimpleName());
326                 System.err.println("Started instance of model = " + this.modelName);
327                 System.err.println("Error message = " + exception.getMessage());
328             }
329         }
330 
331         /**
332          * Send the SimRunControl message FM.2.
333          * @param federationName the name of the federation
334          * @throws Sim0MQException on error
335          * @throws SerializationException on serialization problem
336          */
337         private void sendSimRunControl(final String federationName) throws Sim0MQException, SerializationException
338         {
339             long messageNumber = this.messageCount.get();
340             byte[] fm2Message = new FM2SimRunControlMessage(federationName, "FM", this.modelName, this.messageCount.getAndIncrement(),
341                     100.0, 0.0, 0.0, Double.POSITIVE_INFINITY, 1, 0, new HashMap<Object, Long>()).createByteArray();
342             this.modelSocket.send(fm2Message);
343 
344             byte[] reply = this.modelSocket.recv(0);
345             try
346             {
347                 Object[] mc2Fields = Sim0MQMessage.decode(reply).createObjectArray();
348                 System.out.println("Received\n" + Sim0MQMessage.print(mc2Fields));
349                 MC2AckNakMessage message = new MC2AckNakMessage(mc2Fields);
350                 if (message.getStatus() && (Long) message.getReplyToId() == messageNumber)
351                 {
352                     this.state = ModelState.RUNCONTROL;
353                 }
354                 else
355                 {
356                     this.state = ModelState.ERROR;
357                     System.err.println("Model not started correctly -- state = " + message.getStatus());
358                     System.err.println("Error message = " + message.getError());
359                 }
360             }
361             catch (Exception exception)
362             {
363                 this.state = ModelState.ERROR;
364                 System.err.println("Model not started correctly -- error = " + exception.getClass().getSimpleName());
365                 System.err.println("Error message = " + exception.getMessage());
366             }
367         }
368 
369         /**
370          * Send the Parameters messages FM.3.
371          * @param federationName the name of the federation
372          * @throws Sim0MQException on error
373          * @throws SerializationException on serialization problem
374          */
375         private void setParameters(final String federationName) throws Sim0MQException, SerializationException
376         {
377             Map<String, Object> parameters = new LinkedHashMap<>();
378             parameters.put("iat", new Double(1.0));
379             parameters.put("servicetime", new Double(0.85));
380             parameters.put("seed", Math.abs(this.modelName.hashCode()));
381 
382             for (String parameterName : parameters.keySet())
383             {
384                 if (!this.state.isError())
385                 {
386                     long messageNumber = this.messageCount.get();
387                     byte[] fm3Message =
388                             new FM3SetParameterMessage(federationName, "FM", this.modelName, this.messageCount.getAndIncrement(),
389                                     parameterName, parameters.get(parameterName)).createByteArray();
390                     this.modelSocket.send(fm3Message);
391 
392                     byte[] reply = this.modelSocket.recv(0);
393 
394                     try
395                     {
396                         Object[] replyFields = Sim0MQMessage.decode(reply).createObjectArray();
397                         System.out.println("Received\n" + Sim0MQMessage.print(replyFields));
398                         MC2AckNakMessage message = new MC2AckNakMessage(replyFields);
399                         if (message.getStatus() && (Long) message.getReplyToId() == messageNumber)
400                         {
401                             this.state = ModelState.PARAMETERS;
402                         }
403                         else
404                         {
405                             this.state = ModelState.ERROR;
406                             System.err.println("Model parameter error -- status = " + message.getStatus());
407                             System.err.println("Error message = " + message.getError());
408                         }
409                     }
410                     catch (Exception exception)
411                     {
412                         this.state = ModelState.ERROR;
413                         System.err.println("Model parameter error = " + exception.getClass().getSimpleName());
414                         System.err.println("Error message = " + exception.getMessage());
415                     }
416                 }
417             }
418             if (!this.state.isError())
419             {
420                 this.state = ModelState.PARAMETERS;
421             }
422         }
423 
424         /**
425          * Send the SimStart message FM.4.
426          * @param federationName the name of the federation
427          * @throws Sim0MQException on error
428          * @throws SerializationException on serialization problem
429          */
430         private void sendSimStart(final String federationName) throws Sim0MQException, SerializationException
431         {
432             long messageNumber = this.messageCount.get();
433             byte[] fm4Message =
434                     new FM4SimStartMessage(federationName, "FM", this.modelName, this.messageCount.getAndIncrement()).createByteArray();
435             this.modelSocket.send(fm4Message);
436 
437             byte[] reply = this.modelSocket.recv(0);
438 
439             try
440             {
441                 Object[] replyFields = Sim0MQMessage.decode(reply).createObjectArray();
442                 System.out.println("Received\n" + Sim0MQMessage.print(replyFields));
443                 MC2AckNakMessage message = new MC2AckNakMessage(replyFields);
444                 if (message.getStatus() && (Long) message.getReplyToId() == messageNumber)
445                 {
446                     this.state = ModelState.SIMULATORSTARTED;
447                 }
448                 else
449                 {
450                     this.state = ModelState.ERROR;
451                     System.err.println("Model start error -- status = " + message.getStatus());
452                     System.err.println("Error message = " + message.getError());
453                 }
454             }
455             catch (Exception exception)
456             {
457                 this.state = ModelState.ERROR;
458                 System.err.println("Model start error = " + exception.getClass().getSimpleName());
459                 System.err.println("Error message = " + exception.getMessage());
460             }
461         }
462 
463         /**
464          * Wait for simulation to end using status polling with message FM.5.
465          * @param federationName the name of the federation
466          * @throws Sim0MQException on error
467          * @throws SerializationException on serialization problem
468          */
469         private void waitForSimEnded(final String federationName) throws Sim0MQException, SerializationException
470         {
471             while (!this.state.isSimulatorEnded() && !this.state.isError())
472             {
473                 long messageNumber = this.messageCount.get();
474                 byte[] fm5Message =
475                         new FM5RequestStatus(federationName, "FM", this.modelName, this.messageCount.getAndIncrement()).createByteArray();
476                 this.modelSocket.send(fm5Message);
477 
478                 byte[] reply = this.modelSocket.recv(0);
479 
480                 try
481                 {
482                     Object[] replyFields = Sim0MQMessage.decode(reply).createObjectArray();
483                     System.out.println("Received\n" + Sim0MQMessage.print(replyFields));
484                     MC1StatusMessage message = new MC1StatusMessage(replyFields);
485                     if (!message.getStatus().equals("error") && !message.getStatus().equals("started")
486                             && (Long) message.getReplyToId() == messageNumber)
487                     {
488                         if (message.getStatus().equals("ended"))
489                         {
490                             this.state = ModelState.SIMULATORENDED;
491                         }
492                         else
493                         {
494                             // wait a second
495                             try
496                             {
497                                 Thread.sleep(1000);
498                             }
499                             catch (InterruptedException ie)
500                             {
501                                 // ignore
502                             }
503                         }
504                     }
505                     else
506                     {
507                         this.state = ModelState.ERROR;
508                         System.err.println("Model poll status error -- status = " + message.getStatus());
509                         System.err.println("Error message = " + message.getError());
510                     }
511                 }
512                 catch (Exception exception)
513                 {
514                     this.state = ModelState.ERROR;
515                     System.err.println("Model poll status error = " + exception.getClass().getSimpleName());
516                     System.err.println("Error message = " + exception.getMessage());
517                 }
518             }
519         }
520 
521         /**
522          * Request statistics with message FM.6.
523          * @param federationName the name of the federation
524          * @throws Sim0MQException on error
525          * @throws SerializationException on serialization problem
526          */
527         private void requestStatistics(final String federationName) throws Sim0MQException, SerializationException
528         {
529             List<String> stats = new ArrayList<>();
530             stats.add("dN.average");
531             stats.add("qN.max");
532             stats.add("uN.average");
533 
534             for (String statName : stats)
535             {
536                 if (!this.state.isError())
537                 {
538                     byte[] fm6Message = new FM6RequestStatisticsMessage(federationName, "FM", this.modelName,
539                             this.messageCount.getAndIncrement(), statName).createByteArray();
540                     this.modelSocket.send(fm6Message);
541 
542                     byte[] reply = this.modelSocket.recv(0);
543 
544                     try
545                     {
546                         Object[] replyFields = Sim0MQMessage.decode(reply).createObjectArray();
547                         System.out.println("Received\n" + Sim0MQMessage.print(replyFields));
548 
549                         if (replyFields[5].toString().equals("MC.3"))
550                         {
551                             MC3StatisticsMessage message = new MC3StatisticsMessage(replyFields);
552                             if (message.getVariableName().equals(statName))
553                             {
554                                 System.out.println(
555                                         "Received statistic for " + statName + " = " + message.getVariableValue().toString());
556                                 this.statistics.put(statName, (Number) message.getVariableValue());
557                             }
558                             else
559                             {
560                                 this.state = ModelState.ERROR;
561                                 System.err.println("Statistics Error: Stat variable expected = " + statName + ", got: "
562                                         + message.getVariableName());
563                             }
564                         }
565 
566                         else if (replyFields[5].toString().equals("MC.4"))
567                         {
568                             MC4StatisticsErrorMessage message = new MC4StatisticsErrorMessage(replyFields);
569                             this.state = ModelState.ERROR;
570                             System.err.println("Statistics Error: Stat variable = " + message.getVariableName());
571                             System.err.println("Error message = " + message.getError());
572                         }
573 
574                     }
575                     catch (Exception exception)
576                     {
577                         this.state = ModelState.ERROR;
578                         System.err.println("Model get statistics error = " + exception.getClass().getSimpleName());
579                         System.err.println("Error message = " + exception.getMessage());
580                     }
581                 }
582             }
583             if (!this.state.isError())
584             {
585                 this.state = ModelState.STATISTICSGATHERED;
586             }
587         }
588 
589         /**
590          * Send the FM.8 message to the FederateStarter to kill the MM1 model.
591          * @param federationName the name of the federation
592          * @throws Sim0MQException on error
593          * @throws SerializationException on serialization problem
594          */
595         private void killFederate(final String federationName) throws Sim0MQException, SerializationException
596         {
597             byte[] fm8Message =
598                     new FM8KillFederateMessage(federationName, "FM", "FS", this.messageCount.getAndIncrement(), this.modelName)
599                             .createByteArray();
600             this.fsSocket.send(fm8Message);
601 
602             byte[] reply = this.fsSocket.recv(0);
603 
604             try
605             {
606                 Object[] replyFields = Sim0MQMessage.decode(reply).createObjectArray();
607                 System.out.println("Received\n" + Sim0MQMessage.print(replyFields));
608                 FS4FederateKilledMessage message = new FS4FederateKilledMessage(replyFields);
609                 if (message.isStatus() && message.getInstanceId().equals(this.modelName))
610                 {
611                     this.state = ModelState.TERMINATED;
612                 }
613                 else
614                 {
615                     this.state = ModelState.ERROR;
616                     System.err.println("Model not killed correctly, model = " + this.modelName);
617                     System.err.println("Error message = " + message.getError());
618                 }
619             }
620             catch (Exception exception)
621             {
622                 this.state = ModelState.ERROR;
623                 System.err.println("Model not killed correctly, error = " + exception.getClass().getSimpleName());
624                 System.err.println("Error message = " + exception.getMessage());
625             }
626         }
627 
628         /**
629          * @return statistics
630          */
631         public final Map<String, Number> getStatistics()
632         {
633             return this.statistics;
634         }
635 
636     }
637 
638 }