View Javadoc
1   package org.sim0mq.demo.mm1;
2   
3   import java.util.ArrayList;
4   import java.util.Collections;
5   import java.util.HashMap;
6   import java.util.List;
7   import java.util.Map;
8   import java.util.UUID;
9   import java.util.concurrent.atomic.AtomicInteger;
10  import java.util.concurrent.atomic.AtomicLong;
11  
12  import org.djutils.serialization.SerializationException;
13  import org.sim0mq.Sim0MQException;
14  import org.sim0mq.federationmanager.ModelState;
15  import org.sim0mq.message.MessageStatus;
16  import org.sim0mq.message.SimulationMessage;
17  import org.zeromq.SocketType;
18  import org.zeromq.ZContext;
19  import org.zeromq.ZMQ;
20  
21  /**
22   * Example implementation of a FederationManager to start the MM1Queue41Application DSOL model.
23   * <p>
24   * Copyright (c) 2016-2017 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
25   * BSD-style license. See <a href="http://sim0mq.org/docs/current/license.html">Sim0MQ License</a>.
26   * </p>
27   * $LastChangedDate: 2015-07-24 02:58:59 +0200 (Fri, 24 Jul 2015) $, @version $Revision: 1147 $, by $Author: averbraeck $,
28   * initial version April 10, 2017 <br>
29   * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
30   */
31  public final class MM1FederationManager20
32  {
33      /**
34       * @param args parameters for main
35       * @throws Sim0MQException on error
36       */
37      public static void main(final String[] args) throws Sim0MQException
38      {
39          if (args.length < 4)
40          {
41              System.err.println(
42                      "Use as FederationManager federationName federationManagerPortNumber federateStarterPortNumber local/sk-3");
43              System.exit(-1);
44          }
45          String federationName = args[0];
46  
47          String fmsPort = args[1];
48          int fmPort = 0;
49          try
50          {
51              fmPort = Integer.parseInt(fmsPort);
52          }
53          catch (NumberFormatException nfe)
54          {
55              System.err.println("Use as FederationManager fedNname fmPort fsPort local/sk-3, where fmPort is a number");
56              System.exit(-1);
57          }
58          if (fmPort == 0 || fmPort > 65535)
59          {
60              System.err.println("fmPort should be between 1 and 65535");
61              System.exit(-1);
62          }
63  
64          String fssPort = args[2];
65          int fsPort = 0;
66          try
67          {
68              fsPort = Integer.parseInt(fssPort);
69          }
70          catch (NumberFormatException nfe)
71          {
72              System.err.println("Use as FederationManager fedNname fmPort fsPort local/sk3, where fsPort is a number");
73              System.exit(-1);
74          }
75          if (fsPort == 0 || fsPort > 65535)
76          {
77              System.err.println("fsPort should be between 1 and 65535");
78              System.exit(-1);
79          }
80  
81          String localSk3 = args[3];
82          if (!localSk3.equals("local") && !localSk3.equals("sk-3"))
83          {
84              System.err.println("Use as FederationManager fedNname fmPort fsPort local/sk3, where last arg is local/sk-3");
85              System.exit(-1);
86          }
87  
88          new MM1FederationManager20(federationName, fmPort, fsPort, localSk3);
89      }
90  
91      /**
92       * Send an FM.1 message to the FederateStarter.
93       * @param federationName the name of the federation
94       * @param fmPort the port number to listen on
95       * @param fsPort the port where the federate starter can be reached
96       * @param localSk3 local/sk-3 to indicate where the federate starter and model can be found
97       * @throws Sim0MQException on error
98       */
99      private MM1FederationManager20(final String federationName, final int fmPort, final int fsPort, final String localSk3)
100             throws Sim0MQException
101     {
102         AtomicLong messageCount = new AtomicLong(0L);
103         AtomicInteger nrRunning = new AtomicInteger();
104 
105         Map<Integer, Map<String, Number>> statMap = Collections.synchronizedMap(new HashMap<Integer, Map<String, Number>>());
106 
107         for (int modelNr = 0; modelNr < 20; modelNr++)
108         {
109             new Thread()
110             {
111                 @Override
112                 public void run()
113                 {
114                     final int nr = nrRunning.getAndIncrement();
115                     StateMachine stateMachine = null;
116                     System.out.println("inc modelNr to " + nr);
117                     try
118                     {
119                         stateMachine = new StateMachine(messageCount, federationName, fsPort, localSk3, nr);
120                     }
121                     catch (Sim0MQException | SerializationException exception)
122                     {
123                         exception.printStackTrace();
124                     }
125                     int decNr = nrRunning.decrementAndGet();
126                     System.out.println("dec modelNr to " + decNr);
127                     synchronized (statMap)
128                     {
129                         statMap.put(nr, stateMachine.getStatistics());
130                     }
131                 }
132             }.start();
133         }
134 
135         while (nrRunning.get() > 0)
136         {
137             Thread.yield();
138         }
139 
140         synchronized (statMap)
141         {
142             for (int nr : statMap.keySet())
143             {
144                 Map<String, Number> stats = statMap.get(nr);
145                 StringBuilder s = new StringBuilder();
146                 s.append(String.format("%2d  ", nr));
147                 for (String code : stats.keySet())
148                 {
149                     s.append(String.format("%10s=%10.4f   ", code, stats.get(code).doubleValue()));
150                 }
151                 System.out.println(s.toString());
152             }
153         }
154     }
155 
156     /**
157      * State machine to run several models in parallel.
158      * <p>
159      * Copyright (c) 2013-2017 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved.
160      * <br>
161      * BSD-style license. See <a href="http://sim0mq.org/docs/current/license.html">Sim0MQ License</a>.
162      * </p>
163      * $LastChangedDate: 2015-07-24 02:58:59 +0200 (Fri, 24 Jul 2015) $, @version $Revision: 1147 $, by $Author: averbraeck $,
164      * initial version May 5, 2017 <br>
165      * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
166      */
167     static class StateMachine
168     {
169         /** the state of the started model. */
170         private ModelState state;
171 
172         /** the model socket. */
173         private ZMQ.Socket modelSocket;
174 
175         /** the model name. */
176         private String modelName;
177 
178         /** the federate starter socket. */
179         private ZMQ.Socket fsSocket;
180 
181         /** the context. */
182         private ZContext fmContext;
183 
184         /** the message counter. */
185         private AtomicLong messageCount;
186 
187         /** statistics. */
188         private Map<String, Number> statistics = new HashMap<>();
189 
190         /**
191          * @param messageCount AtomicLong; message counter
192          * @param federationName the name of the federation
193          * @param fsPort FederateStarter port number
194          * @param localSk3 local/sk-3 to indicate where the federate starter and model can be found
195          * @param modelNr sequence number of the model to run
196          * @throws Sim0MQException on error
197          * @throws SerializationException on serialization problem
198          */
199         StateMachine(final AtomicLong messageCount, final String federationName, final int fsPort, final String localSk3,
200                 final int modelNr) throws Sim0MQException, SerializationException
201         {
202             this.fmContext = new ZContext(1);
203 
204             this.fsSocket = this.fmContext.createSocket(SocketType.REQ);
205             this.fsSocket.setIdentity(UUID.randomUUID().toString().getBytes());
206 
207             this.modelName = "MM1." + modelNr;
208             this.messageCount = messageCount;
209 
210             this.modelSocket = this.fmContext.createSocket(SocketType.REQ);
211             this.modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
212 
213             this.state = ModelState.NOT_STARTED;
214             boolean ready = false;
215             while (!ready)
216             {
217                 System.out.println(this.state);
218 
219                 switch (this.state)
220                 {
221                     case NOT_STARTED:
222                         startModel(federationName, fsPort, localSk3);
223                         break;
224 
225                     case STARTED:
226                         sendSimRunControl(federationName);
227                         break;
228 
229                     case RUNCONTROL:
230                         setParameters(federationName);
231                         break;
232 
233                     case PARAMETERS:
234                         sendSimStart(federationName);
235                         break;
236 
237                     case SIMULATORSTARTED:
238                         waitForSimEnded(federationName);
239                         break;
240 
241                     case SIMULATORENDED:
242                         requestStatistics(federationName);
243                         break;
244 
245                     case STATISTICSGATHERED:
246                         killFederate(federationName);
247                         ready = true;
248                         break;
249 
250                     case ERROR:
251                         killFederate(federationName);
252                         ready = true;
253                         break;
254 
255                     default:
256                         break;
257                 }
258             }
259 
260             this.fsSocket.close();
261             this.modelSocket.close();
262             this.fmContext.destroy();
263             this.fmContext.close();
264         }
265 
266         /**
267          * Sed the FM.1 message to the FederateStarter to start the MM1 model.
268          * @param federationName the name of the federation
269          * @param fsPort the port where the federate starter can be reached
270          * @param localSk3 local/sk-3 to indicate where the federate starter and model can be found
271          * @throws Sim0MQException on error
272          * @throws SerializationException on serialization problem
273          */
274         private void startModel(final String federationName, final int fsPort, final String localSk3)
275                 throws Sim0MQException, SerializationException
276         {
277             // Start model mmm1.jar
278             byte[] fm1Message;
279             if (localSk3.equals("sk-3"))
280             {
281                 fm1Message = SimulationMessage.encodeUTF8(federationName, "FM", "FS", "FM.1",
282                         this.messageCount.getAndIncrement(), MessageStatus.NEW, this.modelName, "java8+", "-jar",
283                         "/home/alexandv/sim0mq/MM1/mm1.jar", this.modelName + " %PORT%", "/home/alexandv/sim0mq/MM1", "",
284                         "/home/alexandv/sim0mq/MM1/out_" + this.modelName + ".txt",
285                         "/home/alexandv/sim0mq/MM1/err_" + this.modelName + ".txt", false, true, true);
286                 this.fsSocket.connect("tcp://130.161.3.179:" + fsPort);
287             }
288             else
289             {
290                 fm1Message =
291                         SimulationMessage.encodeUTF8(federationName, "FM", "FS", "FM.1", this.messageCount.getAndIncrement(),
292                                 MessageStatus.NEW, this.modelName, "java8+", "-jar", "e:/sim0mq/MM1/mm1.jar",
293                                 this.modelName + " %PORT%", "e:/sim0mq/MM1", "", "e:/sim0mq/MM1/out_" + this.modelName + ".txt",
294                                 "e:/sim0mq/MM1/err_" + this.modelName + ".txt", false, true, true);
295                 this.fsSocket.connect("tcp://127.0.0.1:" + fsPort);
296             }
297             this.fsSocket.send(fm1Message);
298 
299             byte[] reply = this.fsSocket.recv(0);
300             Object[] replyMessage = SimulationMessage.decode(reply);
301             System.out.println("Received\n" + SimulationMessage.print(replyMessage));
302 
303             if (replyMessage[4].toString().equals("FS.2") && replyMessage[9].toString().equals("started")
304                     && replyMessage[8].toString().equals(this.modelName))
305             {
306                 this.state = ModelState.STARTED;
307                 this.modelSocket.connect("tcp://127.0.0.1:" + replyMessage[10].toString());
308             }
309             else
310             {
311                 this.state = ModelState.ERROR;
312                 System.err.println("Model not started correctly -- state = " + replyMessage[9]);
313                 System.err.println("Started model = " + replyMessage[8]);
314                 System.err.println("Error message = " + replyMessage[10]);
315             }
316 
317         }
318 
319         /**
320          * Send the SimRunControl message FM.2.
321          * @param federationName the name of the federation
322          * @throws Sim0MQException on error
323          * @throws SerializationException on serialization problem
324          */
325         private void sendSimRunControl(final String federationName) throws Sim0MQException, SerializationException
326         {
327             long messageNumber = this.messageCount.get();
328             byte[] fm2Message;
329             fm2Message = SimulationMessage.encodeUTF8(federationName, "FM", this.modelName, "FM.2",
330                     this.messageCount.getAndIncrement(), MessageStatus.NEW, 100.0, 0.0, 0.0, Double.POSITIVE_INFINITY, 1, 0);
331             this.modelSocket.send(fm2Message);
332 
333             byte[] reply = this.modelSocket.recv(0);
334             Object[] replyMessage = SimulationMessage.decode(reply);
335             System.out.println("Received\n" + SimulationMessage.print(replyMessage));
336 
337             if (replyMessage[4].toString().equals("MC.2") && (boolean) replyMessage[9]
338                     && ((Long) replyMessage[8]).longValue() == messageNumber)
339             {
340                 this.state = ModelState.RUNCONTROL;
341             }
342             else
343             {
344                 this.state = ModelState.ERROR;
345                 System.err.println("Model not started correctly -- status = " + replyMessage[9]);
346                 System.err.println("Error message = " + replyMessage[10]);
347             }
348         }
349 
350         /**
351          * Send the Parameters messages FM.3.
352          * @param federationName the name of the federation
353          * @throws Sim0MQException on error
354          * @throws SerializationException on serialization problem
355          */
356         private void setParameters(final String federationName) throws Sim0MQException, SerializationException
357         {
358             Map<String, Object> parameters = new HashMap<>();
359             parameters.put("iat", new Double(1.0));
360             parameters.put("servicetime", new Double(0.85));
361             parameters.put("seed", Math.abs(this.modelName.hashCode()));
362 
363             for (String parameterName : parameters.keySet())
364             {
365                 if (!this.state.isError())
366                 {
367                     long messageNumber = this.messageCount.get();
368                     byte[] fm3Message;
369                     fm3Message = SimulationMessage.encodeUTF8(federationName, "FM", this.modelName, "FM.3",
370                             this.messageCount.getAndIncrement(), MessageStatus.NEW, parameterName,
371                             parameters.get(parameterName));
372                     this.modelSocket.send(fm3Message);
373 
374                     byte[] reply = this.modelSocket.recv(0);
375                     Object[] replyMessage = SimulationMessage.decode(reply);
376                     System.out.println("Received\n" + SimulationMessage.print(replyMessage));
377 
378                     if (replyMessage[4].toString().equals("MC.2") && (boolean) replyMessage[9]
379                             && ((Long) replyMessage[8]).longValue() == messageNumber)
380                     {
381                         this.state = ModelState.PARAMETERS;
382                     }
383                     else
384                     {
385                         this.state = ModelState.ERROR;
386                         System.err.println("Model parameter error -- status = " + replyMessage[9]);
387                         System.err.println("Error message = " + replyMessage[10]);
388                     }
389                 }
390             }
391             if (!this.state.isError())
392             {
393                 this.state = ModelState.PARAMETERS;
394             }
395         }
396 
397         /**
398          * Send the SimStart message FM.4.
399          * @param federationName the name of the federation
400          * @throws Sim0MQException on error
401          * @throws SerializationException on serialization problem
402          */
403         private void sendSimStart(final String federationName) throws Sim0MQException, SerializationException
404         {
405             long messageNumber = this.messageCount.get();
406             byte[] fm4Message;
407             fm4Message = SimulationMessage.encodeUTF8(federationName, "FM", this.modelName, "FM.4",
408                     this.messageCount.getAndIncrement(), MessageStatus.NEW);
409             this.modelSocket.send(fm4Message);
410 
411             byte[] reply = this.modelSocket.recv(0);
412             Object[] replyMessage = SimulationMessage.decode(reply);
413             System.out.println("Received\n" + SimulationMessage.print(replyMessage));
414 
415             if (replyMessage[4].toString().equals("MC.2") && (boolean) replyMessage[9]
416                     && ((Long) replyMessage[8]).longValue() == messageNumber)
417             {
418                 this.state = ModelState.SIMULATORSTARTED;
419             }
420             else
421             {
422                 this.state = ModelState.ERROR;
423                 System.err.println("Simulation start error -- status = " + replyMessage[9]);
424                 System.err.println("Error message = " + replyMessage[10]);
425             }
426         }
427 
428         /**
429          * Wait for simulation to end using status polling with message FM.5.
430          * @param federationName the name of the federation
431          * @throws Sim0MQException on error
432          * @throws SerializationException on serialization problem
433          */
434         private void waitForSimEnded(final String federationName) throws Sim0MQException, SerializationException
435         {
436             while (!this.state.isSimulatorEnded() && !this.state.isError())
437             {
438                 long messageNumber = this.messageCount.get();
439                 byte[] fm5Message;
440                 fm5Message = SimulationMessage.encodeUTF8(federationName, "FM", this.modelName, "FM.5",
441                         this.messageCount.getAndIncrement(), MessageStatus.NEW);
442                 this.modelSocket.send(fm5Message);
443 
444                 byte[] reply = this.modelSocket.recv(0);
445                 Object[] replyMessage = SimulationMessage.decode(reply);
446                 System.out.println("Received\n" + SimulationMessage.print(replyMessage));
447 
448                 if (replyMessage[4].toString().equals("MC.1") && !replyMessage[9].toString().equals("error")
449                         && !replyMessage[9].toString().equals("started")
450                         && ((Long) replyMessage[8]).longValue() == messageNumber)
451                 {
452                     if (replyMessage[9].toString().equals("ended"))
453                     {
454                         this.state = ModelState.SIMULATORENDED;
455                     }
456                     else
457                     {
458                         // wait a second
459                         try
460                         {
461                             Thread.sleep(1000);
462                         }
463                         catch (InterruptedException ie)
464                         {
465                             // ignore
466                         }
467                     }
468                 }
469                 else
470                 {
471                     this.state = ModelState.ERROR;
472                     System.err.println("Simulation start error -- status = " + replyMessage[9]);
473                     System.err.println("Error message = " + replyMessage[10]);
474                 }
475             }
476         }
477 
478         /**
479          * Request statistics with message FM.6.
480          * @param federationName the name of the federation
481          * @throws Sim0MQException on error
482          * @throws SerializationException on serialization problem
483          */
484         private void requestStatistics(final String federationName) throws Sim0MQException, SerializationException
485         {
486             List<String> stats = new ArrayList<>();
487             stats.add("dN.average");
488             stats.add("qN.max");
489             stats.add("uN.average");
490 
491             for (String statName : stats)
492             {
493                 if (!this.state.isError())
494                 {
495                     byte[] fm6Message;
496                     fm6Message = SimulationMessage.encodeUTF8(federationName, "FM", this.modelName, "FM.6",
497                             this.messageCount.getAndIncrement(), MessageStatus.NEW, statName);
498                     this.modelSocket.send(fm6Message);
499 
500                     byte[] reply = this.modelSocket.recv(0);
501                     Object[] replyMessage = SimulationMessage.decode(reply);
502                     System.out.println("Received\n" + SimulationMessage.print(replyMessage));
503 
504                     if (replyMessage[4].toString().equals("MC.3"))
505                     {
506                         if (replyMessage[9].toString().equals(statName))
507                         {
508                             System.out.println("Received statistic for " + statName + " = " + replyMessage[10].toString());
509                             this.statistics.put(statName, (Number) replyMessage[10]);
510                         }
511                         else
512                         {
513                             this.state = ModelState.ERROR;
514                             System.err.println(
515                                     "Statistics Error: Stat variable expected = " + statName + ", got: " + replyMessage[8]);
516                         }
517                     }
518                     else if (replyMessage[4].toString().equals("MC.4"))
519                     {
520                         this.state = ModelState.ERROR;
521                         System.err.println("Statistics Error: Stat variable = " + replyMessage[8]);
522                         System.err.println("Error message = " + replyMessage[9]);
523                     }
524                     else
525                     {
526                         this.state = ModelState.ERROR;
527                         System.err.println("Statistics Error: Received unknown message as reply to FM6: " + replyMessage[4]);
528                     }
529                 }
530             }
531             if (!this.state.isError())
532             {
533                 this.state = ModelState.STATISTICSGATHERED;
534             }
535         }
536 
537         /**
538          * Send the FM.8 message to the FederateStarter to kill the MM1 model.
539          * @param federationName the name of the federation
540          * @throws Sim0MQException on error
541          * @throws SerializationException on serialization problem
542          */
543         private void killFederate(final String federationName) throws Sim0MQException, SerializationException
544         {
545             byte[] fm8Message;
546             fm8Message = SimulationMessage.encodeUTF8(federationName, "FM", "FS", "FM.8", this.messageCount.getAndIncrement(),
547                     MessageStatus.NEW, this.modelName);
548             this.fsSocket.send(fm8Message);
549 
550             byte[] reply = this.fsSocket.recv(0);
551             Object[] replyMessage = SimulationMessage.decode(reply);
552             System.out.println("Received\n" + SimulationMessage.print(replyMessage));
553 
554             if (replyMessage[4].toString().equals("FS.4") && (boolean) replyMessage[9]
555                     && replyMessage[8].toString().equals(this.modelName))
556             {
557                 this.state = ModelState.TERMINATED;
558             }
559             else
560             {
561                 this.state = ModelState.ERROR;
562                 System.err.println("Model not killed correctly");
563                 System.err.println("Tried to kill model = " + replyMessage[8]);
564                 System.err.println("Error message = " + replyMessage[10]);
565             }
566         }
567 
568         /**
569          * @return statistics
570          */
571         public final Map<String, Number> getStatistics()
572         {
573             return this.statistics;
574         }
575 
576     }
577 
578 }