View Javadoc
1   package org.sim0mq.demo.mm1;
2   
3   import java.util.ArrayList;
4   import java.util.Collections;
5   import java.util.HashMap;
6   import java.util.List;
7   import java.util.Map;
8   import java.util.UUID;
9   import java.util.concurrent.atomic.AtomicInteger;
10  import java.util.concurrent.atomic.AtomicLong;
11  
12  import org.sim0mq.Sim0MQException;
13  import org.sim0mq.federationmanager.ModelState;
14  import org.sim0mq.message.MessageStatus;
15  import org.sim0mq.message.SimulationMessage;
16  import org.zeromq.ZMQ;
17  
18  /**
19   * Example implementation of a FederationManager to start the MM1Queue41Application DSOL model.
20   * <p>
21   * Copyright (c) 2016-2017 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
22   * BSD-style license. See <a href="http://sim0mq.org/docs/current/license.html">Sim0MQ License</a>.
23   * </p>
24   * $LastChangedDate: 2015-07-24 02:58:59 +0200 (Fri, 24 Jul 2015) $, @version $Revision: 1147 $, by $Author: averbraeck $,
25   * initial version April 10, 2017 <br>
26   * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
27   */
28  public class MM1FederationManager20
29  {
30      /**
31       * @param args parameters for main
32       * @throws Sim0MQException on error
33       */
34      public static void main(String[] args) throws Sim0MQException
35      {
36          if (args.length < 4)
37          {
38              System.err.println(
39                      "Use as FederationManager federationName federationManagerPortNumber federateStarterPortNumber local/sk-3");
40              System.exit(-1);
41          }
42          String federationName = args[0];
43  
44          String fmsPort = args[1];
45          int fmPort = 0;
46          try
47          {
48              fmPort = Integer.parseInt(fmsPort);
49          }
50          catch (NumberFormatException nfe)
51          {
52              System.err.println("Use as FederationManager fedNname fmPort fsPort local/sk-3, where fmPort is a number");
53              System.exit(-1);
54          }
55          if (fmPort == 0 || fmPort > 65535)
56          {
57              System.err.println("fmPort should be between 1 and 65535");
58              System.exit(-1);
59          }
60  
61          String fssPort = args[2];
62          int fsPort = 0;
63          try
64          {
65              fsPort = Integer.parseInt(fssPort);
66          }
67          catch (NumberFormatException nfe)
68          {
69              System.err.println("Use as FederationManager fedNname fmPort fsPort local/sk3, where fsPort is a number");
70              System.exit(-1);
71          }
72          if (fsPort == 0 || fsPort > 65535)
73          {
74              System.err.println("fsPort should be between 1 and 65535");
75              System.exit(-1);
76          }
77  
78          String localSk3 = args[3];
79          if (!localSk3.equals("local") && !localSk3.equals("sk-3"))
80          {
81              System.err.println("Use as FederationManager fedNname fmPort fsPort local/sk3, where last arg is local/sk-3");
82              System.exit(-1);
83          }
84  
85          new MM1FederationManager20(federationName, fmPort, fsPort, localSk3);
86      }
87  
88      /**
89       * Send an FM.1 message to the FederateStarter.
90       * @param federationName the name of the federation
91       * @param fmPort the port number to listen on
92       * @param fsPort the port where the federate starter can be reached
93       * @param localSk3 local/sk-3 to indicate where the federate starter and model can be found
94       * @throws Sim0MQException on error
95       */
96      public MM1FederationManager20(final String federationName, final int fmPort, final int fsPort, final String localSk3)
97              throws Sim0MQException
98      {
99          AtomicLong messageCount = new AtomicLong(0L);
100         AtomicInteger nrRunning = new AtomicInteger();
101 
102         Map<Integer, Map<String, Number>> statMap = Collections.synchronizedMap(new HashMap<Integer, Map<String, Number>>());
103 
104         for (int modelNr = 0; modelNr < 20; modelNr++)
105         {
106             new Thread()
107             {
108                 @Override
109                 public void run()
110                 {
111                     final int nr = nrRunning.getAndIncrement();
112                     StateMachine stateMachine = null;
113                     System.out.println("inc modelNr to " + nrRunning.get());
114                     try
115                     {
116                         stateMachine = new StateMachine(messageCount, federationName, fsPort, localSk3, nr);
117                     }
118                     catch (Sim0MQException exception)
119                     {
120                         exception.printStackTrace();
121                     }
122                     nrRunning.decrementAndGet();
123                     System.out.println("dec modelNr to " + nrRunning.get());
124                     statMap.put(nr, stateMachine.getStatistics());
125                 }
126             }.start();
127         }
128 
129         while (nrRunning.get() > 0)
130         {
131             Thread.yield();
132         }
133 
134         for (int nr : statMap.keySet())
135         {
136             Map<String, Number> stats = statMap.get(nr);
137             StringBuilder s = new StringBuilder();
138             s.append(String.format("%2d  ", nr));
139             for (String code : stats.keySet())
140             {
141                 s.append(String.format("%10s=%10.4f   ", code, stats.get(code).doubleValue()));
142             }
143             System.out.println(s.toString());
144         }
145     }
146 
147     /**
148      * State machine to run several models in parallel.
149      * <p>
150      * Copyright (c) 2013-2017 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved.
151      * <br>
152      * BSD-style license. See <a href="http://sim0mq.org/docs/current/license.html">Sim0MQ License</a>.
153      * </p>
154      * $LastChangedDate: 2015-07-24 02:58:59 +0200 (Fri, 24 Jul 2015) $, @version $Revision: 1147 $, by $Author: averbraeck $,
155      * initial version May 5, 2017 <br>
156      * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
157      */
158     static class StateMachine
159     {
160         /** the state of the started model. */
161         private ModelState state;
162 
163         /** the model socket. */
164         private ZMQ.Socket modelSocket;
165 
166         /** the model name. */
167         private String modelName;
168 
169         /** the federate starter socket. */
170         protected ZMQ.Socket fsSocket;
171 
172         /** the context. */
173         private ZMQ.Context fmContext;
174 
175         /** the message counter. */
176         private AtomicLong messageCount;
177 
178         /** statistics. */
179         private Map<String, Number> statistics = new HashMap<>();
180 
181         /**
182          * @param messageCount AtomicLong; message counter
183          * @param federationName the name of the federation
184          * @param fsPort FederateStarter port number
185          * @param localSk3 local/sk-3 to indicate where the federate starter and model can be found
186          * @param modelNr sequence number of the model to run
187          * @throws Sim0MQException on error
188          */
189         StateMachine(final AtomicLong messageCount, final String federationName, final int fsPort,
190                 final String localSk3, final int modelNr) throws Sim0MQException
191         {
192             this.fmContext = ZMQ.context(1);
193 
194             this.fsSocket = this.fmContext.socket(ZMQ.REQ);
195             this.fsSocket.setIdentity(UUID.randomUUID().toString().getBytes());
196 
197             this.modelName = "MM1." + modelNr;
198             this.messageCount = messageCount;
199 
200             this.modelSocket = this.fmContext.socket(ZMQ.REQ);
201             this.modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
202 
203             this.state = ModelState.NOT_STARTED;
204             boolean ready = false;
205             while (!ready)
206             {
207                 System.out.println(this.state);
208 
209                 switch (this.state)
210                 {
211                     case NOT_STARTED:
212                         startModel(federationName, fsPort, localSk3);
213                         break;
214 
215                     case STARTED:
216                         sendSimRunControl(federationName);
217                         break;
218 
219                     case RUNCONTROL:
220                         setParameters(federationName);
221                         break;
222 
223                     case PARAMETERS:
224                         sendSimStart(federationName);
225                         break;
226 
227                     case SIMULATORSTARTED:
228                         waitForSimEnded(federationName);
229                         break;
230 
231                     case SIMULATORENDED:
232                         requestStatistics(federationName);
233                         break;
234 
235                     case STATISTICSGATHERED:
236                         killFederate(federationName);
237                         ready = true;
238                         break;
239 
240                     case ERROR:
241                         killFederate(federationName);
242                         ready = true;
243                         break;
244 
245                     default:
246                         break;
247                 }
248             }
249 
250             this.fsSocket.close();
251             this.modelSocket.close();
252             this.fmContext.term();
253         }
254 
255         /**
256          * Sed the FM.1 message to the FederateStarter to start the MM1 model.
257          * @param federationName the name of the federation
258          * @param fsPort the port where the federate starter can be reached
259          * @param localSk3 local/sk-3 to indicate where the federate starter and model can be found
260          * @throws Sim0MQException on error
261          */
262         private void startModel(final String federationName, final int fsPort, final String localSk3) throws Sim0MQException
263         {
264             // Start model mmm1.jar
265             byte[] fm1Message;
266             if (localSk3.equals("sk-3"))
267             {
268                 fm1Message = SimulationMessage.encodeUTF8(federationName, "FM", "FS", "FM.1", this.messageCount.getAndIncrement(),
269                         MessageStatus.NEW, this.modelName, "java8+", "-jar", "/home/alexandv/sim0mq/MM1/mm1.jar",
270                         this.modelName + " %PORT%", "/home/alexandv/sim0mq/MM1", "",
271                         "/home/alexandv/sim0mq/MM1/out_" + this.modelName + ".txt",
272                         "/home/alexandv/sim0mq/MM1/err_" + this.modelName + ".txt", false, true, true);
273                 this.fsSocket.connect("tcp://130.161.3.179:" + fsPort);
274             }
275             else
276             {
277                 fm1Message = SimulationMessage.encodeUTF8(federationName, "FM", "FS", "FM.1", this.messageCount.getAndIncrement(),
278                         MessageStatus.NEW, this.modelName, "java8+", "-jar", "e:/MM1/mm1.jar", this.modelName + " %PORT%",
279                         "e:/MM1", "", "e:/MM1/out_" + this.modelName + ".txt", "e:/MM1/err_" + this.modelName + ".txt", false,
280                         true, true);
281                 this.fsSocket.connect("tcp://127.0.0.1:" + fsPort);
282             }
283             this.fsSocket.send(fm1Message);
284 
285             byte[] reply = this.fsSocket.recv(0);
286             Object[] replyMessage = SimulationMessage.decode(reply);
287             System.out.println("Received\n" + SimulationMessage.print(replyMessage));
288 
289             if (replyMessage[4].toString().equals("FS.2") && replyMessage[9].toString().equals("started")
290                     && replyMessage[8].toString().equals(this.modelName))
291             {
292                 this.state = ModelState.STARTED;
293                 this.modelSocket.connect("tcp://127.0.0.1:" + replyMessage[10].toString());
294             }
295             else
296             {
297                 this.state = ModelState.ERROR;
298                 System.err.println("Model not started correctly -- state = " + replyMessage[9]);
299                 System.err.println("Started model = " + replyMessage[8]);
300                 System.err.println("Error message = " + replyMessage[10]);
301             }
302 
303         }
304 
305         /**
306          * Send the SimRunControl message FM.2.
307          * @param federationName the name of the federation
308          * @throws Sim0MQException on error
309          */
310         private void sendSimRunControl(final String federationName) throws Sim0MQException
311         {
312             long messageNumber = this.messageCount.get();
313             byte[] fm2Message;
314             fm2Message = SimulationMessage.encodeUTF8(federationName, "FM", this.modelName, "FM.2",
315                     this.messageCount.getAndIncrement(), MessageStatus.NEW, 100.0, 0.0, 0.0, Double.POSITIVE_INFINITY, 1, 0);
316             this.modelSocket.send(fm2Message);
317 
318             byte[] reply = this.modelSocket.recv(0);
319             Object[] replyMessage = SimulationMessage.decode(reply);
320             System.out.println("Received\n" + SimulationMessage.print(replyMessage));
321 
322             if (replyMessage[4].toString().equals("MC.2") && (boolean) replyMessage[9]
323                     && ((Long) replyMessage[8]).longValue() == messageNumber)
324             {
325                 this.state = ModelState.RUNCONTROL;
326             }
327             else
328             {
329                 this.state = ModelState.ERROR;
330                 System.err.println("Model not started correctly -- status = " + replyMessage[9]);
331                 System.err.println("Error message = " + replyMessage[10]);
332             }
333         }
334 
335         /**
336          * Send the Parameters messages FM.3.
337          * @param federationName the name of the federation
338          * @throws Sim0MQException on error
339          */
340         private void setParameters(final String federationName) throws Sim0MQException
341         {
342             Map<String, Object> parameters = new HashMap<>();
343             parameters.put("iat", new Double(1.0));
344             parameters.put("servicetime", new Double(0.85));
345             parameters.put("seed", Math.abs(this.modelName.hashCode()));
346 
347             for (String parameterName : parameters.keySet())
348             {
349                 if (!this.state.isError())
350                 {
351                     long messageNumber = this.messageCount.get();
352                     byte[] fm3Message;
353                     fm3Message = SimulationMessage.encodeUTF8(federationName, "FM", this.modelName, "FM.3",
354                             this.messageCount.getAndIncrement(), MessageStatus.NEW, parameterName,
355                             parameters.get(parameterName));
356                     this.modelSocket.send(fm3Message);
357 
358                     byte[] reply = this.modelSocket.recv(0);
359                     Object[] replyMessage = SimulationMessage.decode(reply);
360                     System.out.println("Received\n" + SimulationMessage.print(replyMessage));
361 
362                     if (replyMessage[4].toString().equals("MC.2") && (boolean) replyMessage[9]
363                             && ((Long) replyMessage[8]).longValue() == messageNumber)
364                     {
365                         this.state = ModelState.PARAMETERS;
366                     }
367                     else
368                     {
369                         this.state = ModelState.ERROR;
370                         System.err.println("Model parameter error -- status = " + replyMessage[9]);
371                         System.err.println("Error message = " + replyMessage[10]);
372                     }
373                 }
374             }
375             if (!this.state.isError())
376             {
377                 this.state = ModelState.PARAMETERS;
378             }
379         }
380 
381         /**
382          * Send the SimStart message FM.4.
383          * @param federationName the name of the federation
384          * @throws Sim0MQException on error
385          */
386         private void sendSimStart(final String federationName) throws Sim0MQException
387         {
388             long messageNumber = this.messageCount.get();
389             byte[] fm4Message;
390             fm4Message = SimulationMessage.encodeUTF8(federationName, "FM", this.modelName, "FM.4",
391                     this.messageCount.getAndIncrement(), MessageStatus.NEW);
392             this.modelSocket.send(fm4Message);
393 
394             byte[] reply = this.modelSocket.recv(0);
395             Object[] replyMessage = SimulationMessage.decode(reply);
396             System.out.println("Received\n" + SimulationMessage.print(replyMessage));
397 
398             if (replyMessage[4].toString().equals("MC.2") && (boolean) replyMessage[9]
399                     && ((Long) replyMessage[8]).longValue() == messageNumber)
400             {
401                 this.state = ModelState.SIMULATORSTARTED;
402             }
403             else
404             {
405                 this.state = ModelState.ERROR;
406                 System.err.println("Simulation start error -- status = " + replyMessage[9]);
407                 System.err.println("Error message = " + replyMessage[10]);
408             }
409         }
410 
411         /**
412          * Wait for simulation to end using status polling with message FM.5.
413          * @param federationName the name of the federation
414          * @throws Sim0MQException on error
415          */
416         private void waitForSimEnded(final String federationName) throws Sim0MQException
417         {
418             while (!this.state.isSimulatorEnded() && !this.state.isError())
419             {
420                 long messageNumber = this.messageCount.get();
421                 byte[] fm5Message;
422                 fm5Message = SimulationMessage.encodeUTF8(federationName, "FM", this.modelName, "FM.5",
423                         this.messageCount.getAndIncrement(), MessageStatus.NEW);
424                 this.modelSocket.send(fm5Message);
425 
426                 byte[] reply = this.modelSocket.recv(0);
427                 Object[] replyMessage = SimulationMessage.decode(reply);
428                 System.out.println("Received\n" + SimulationMessage.print(replyMessage));
429 
430                 if (replyMessage[4].toString().equals("MC.1") && !replyMessage[9].toString().equals("error")
431                         && !replyMessage[9].toString().equals("started")
432                         && ((Long) replyMessage[8]).longValue() == messageNumber)
433                 {
434                     if (replyMessage[9].toString().equals("ended"))
435                     {
436                         this.state = ModelState.SIMULATORENDED;
437                     }
438                     else
439                     {
440                         // wait a second
441                         try
442                         {
443                             Thread.sleep(1000);
444                         }
445                         catch (InterruptedException ie)
446                         {
447                             // ignore
448                         }
449                     }
450                 }
451                 else
452                 {
453                     this.state = ModelState.ERROR;
454                     System.err.println("Simulation start error -- status = " + replyMessage[9]);
455                     System.err.println("Error message = " + replyMessage[10]);
456                 }
457             }
458         }
459 
460         /**
461          * Request statistics with message FM.6.
462          * @param federationName the name of the federation
463          * @throws Sim0MQException on error
464          */
465         private void requestStatistics(final String federationName) throws Sim0MQException
466         {
467             List<String> stats = new ArrayList<>();
468             stats.add("dN.average");
469             stats.add("qN.max");
470             stats.add("uN.average");
471 
472             for (String statName : stats)
473             {
474                 if (!this.state.isError())
475                 {
476                     byte[] fm6Message;
477                     fm6Message = SimulationMessage.encodeUTF8(federationName, "FM", this.modelName, "FM.6",
478                             this.messageCount.getAndIncrement(), MessageStatus.NEW, statName);
479                     this.modelSocket.send(fm6Message);
480 
481                     byte[] reply = this.modelSocket.recv(0);
482                     Object[] replyMessage = SimulationMessage.decode(reply);
483                     System.out.println("Received\n" + SimulationMessage.print(replyMessage));
484 
485                     if (replyMessage[4].toString().equals("MC.3"))
486                     {
487                         if (replyMessage[9].toString().equals(statName))
488                         {
489                             System.out.println("Received statistic for " + statName + " = " + replyMessage[10].toString());
490                             this.statistics.put(statName, (Number) replyMessage[10]);
491                         }
492                         else
493                         {
494                             this.state = ModelState.ERROR;
495                             System.err.println(
496                                     "Statistics Error: Stat variable expected = " + statName + ", got: " + replyMessage[8]);
497                         }
498                     }
499                     else if (replyMessage[4].toString().equals("MC.4"))
500                     {
501                         this.state = ModelState.ERROR;
502                         System.err.println("Statistics Error: Stat variable = " + replyMessage[8]);
503                         System.err.println("Error message = " + replyMessage[9]);
504                     }
505                     else
506                     {
507                         this.state = ModelState.ERROR;
508                         System.err.println("Statistics Error: Received unknown message as reply to FM6: " + replyMessage[4]);
509                     }
510                 }
511             }
512             if (!this.state.isError())
513             {
514                 this.state = ModelState.STATISTICSGATHERED;
515             }
516         }
517 
518         /**
519          * Send the FM.8 message to the FederateStarter to kill the MM1 model.
520          * @param federationName the name of the federation
521          * @throws Sim0MQException on error
522          */
523         private void killFederate(final String federationName) throws Sim0MQException
524         {
525             byte[] fm8Message;
526             fm8Message = SimulationMessage.encodeUTF8(federationName, "FM", "FS", "FM.8", this.messageCount.getAndIncrement(),
527                     MessageStatus.NEW, this.modelName);
528             this.fsSocket.send(fm8Message);
529 
530             byte[] reply = this.fsSocket.recv(0);
531             Object[] replyMessage = SimulationMessage.decode(reply);
532             System.out.println("Received\n" + SimulationMessage.print(replyMessage));
533 
534             if (replyMessage[4].toString().equals("FS.4") && (boolean) replyMessage[9]
535                     && replyMessage[8].toString().equals(this.modelName))
536             {
537                 this.state = ModelState.TERMINATED;
538             }
539             else
540             {
541                 this.state = ModelState.ERROR;
542                 System.err.println("Model not killed correctly");
543                 System.err.println("Tried to kill model = " + replyMessage[8]);
544                 System.err.println("Error message = " + replyMessage[10]);
545             }
546         }
547 
548         /**
549          * @return statistics
550          */
551         public final Map<String, Number> getStatistics()
552         {
553             return this.statistics;
554         }
555 
556     }
557 
558 }