View Javadoc
1   package org.sim0mq.demo.mm1;
2   
3   import java.util.ArrayList;
4   import java.util.HashMap;
5   import java.util.List;
6   import java.util.Map;
7   import java.util.UUID;
8   
9   import org.djutils.serialization.SerializationException;
10  import org.sim0mq.Sim0MQException;
11  import org.sim0mq.federationmanager.ModelState;
12  import org.sim0mq.message.MessageStatus;
13  import org.sim0mq.message.SimulationMessage;
14  import org.zeromq.SocketType;
15  import org.zeromq.ZContext;
16  import org.zeromq.ZMQ;
17  
18  /**
19   * Example implementation of a FederationManager to start the MM1Queue41Application DSOL model.
20   * <p>
21   * Copyright (c) 2016-2017 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
22   * BSD-style license. See <a href="http://sim0mq.org/docs/current/license.html">Sim0MQ License</a>.
23   * </p>
24   * $LastChangedDate: 2015-07-24 02:58:59 +0200 (Fri, 24 Jul 2015) $, @version $Revision: 1147 $, by $Author: averbraeck $,
25   * initial version April 10, 2017 <br>
26   * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
27   */
28  public class MM1FederationManager
29  {
30      /** the state of the started model. */
31      private ModelState state;
32  
33      /** the model socket. */
34      private ZMQ.Socket modelSocket;
35  
36      /** the federate starter socket. */
37      private ZMQ.Socket fsSocket;
38  
39      /** the context. */
40      private ZContext fmContext;
41  
42      /** message count. */
43      private long messageCount = 0;
44  
45      /**
46       * Send an FM.1 message to the FederateStarter.
47       * @param federationName the name of the federation
48       * @param fmPort the port number to listen on
49       * @param fsPort the port where the federate starter can be reached
50       * @param localSk3 local/sk-3 to indicate where the federate starter and model can be found
51       * @throws Sim0MQException on error
52       * @throws SerializationException on serialization problem
53       */
54      public MM1FederationManager(final String federationName, final int fmPort, final int fsPort, final String localSk3)
55              throws Sim0MQException, SerializationException
56      {
57          this.fmContext = new ZContext(1);
58          this.modelSocket = this.fmContext.createSocket(SocketType.REQ);
59          this.modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
60          this.fsSocket = this.fmContext.createSocket(SocketType.REQ);
61          this.fsSocket.setIdentity(UUID.randomUUID().toString().getBytes());
62  
63          this.state = ModelState.NOT_STARTED;
64          boolean ready = false;
65          while (!ready)
66          {
67              System.out.println(this.state);
68  
69              switch (this.state)
70              {
71                  case NOT_STARTED:
72                      startModel(federationName, fsPort, localSk3);
73                      break;
74  
75                  case STARTED:
76                      sendSimRunControl(federationName);
77                      break;
78  
79                  case RUNCONTROL:
80                      setParameters(federationName);
81                      break;
82  
83                  case PARAMETERS:
84                      sendSimStart(federationName);
85                      break;
86  
87                  case SIMULATORSTARTED:
88                      waitForSimEnded(federationName);
89                      break;
90  
91                  case SIMULATORENDED:
92                      requestStatistics(federationName);
93                      break;
94  
95                  case STATISTICSGATHERED:
96                      killFederate(federationName);
97                      ready = true;
98                      break;
99  
100                 case ERROR:
101                     killFederate(federationName);
102                     ready = true;
103                     break;
104 
105                 default:
106                     break;
107             }
108         }
109 
110         this.modelSocket.close();
111         this.fsSocket.close();
112         this.fmContext.destroy();
113         this.fmContext.close();
114     }
115 
116     /**
117      * Sed the FM.1 message to the FederateStarter to start the MM1 model.
118      * @param federationName the name of the federation
119      * @param fsPort the port where the federate starter can be reached
120      * @param localSk3 local/sk-3 to indicate where the federate starter and model can be found
121      * @throws Sim0MQException on error
122      * @throws SerializationException on serialization problem
123      */
124     private void startModel(final String federationName, final int fsPort, final String localSk3)
125             throws Sim0MQException, SerializationException
126     {
127         // Start model mmm1.jar
128         byte[] fm1Message;
129         if (localSk3.equals("sk-3"))
130         {
131             fm1Message = SimulationMessage.encodeUTF8(federationName, "FM", "FS", "FM.1", ++this.messageCount,
132                     MessageStatus.NEW, "MM1.1", "java8+", "-jar", "/home/alexandv/sim0mq/MM1/mm1.jar", "MM1.1 %PORT%",
133                     "/home/alexandv/sim0mq/MM1", "", "/home/alexandv/sim0mq/MM1/out.txt", "/home/alexandv/sim0mq/MM1/err.txt",
134                     false, false, false);
135             this.fsSocket.connect("tcp://130.161.3.179:" + fsPort);
136         }
137         else
138         {
139             fm1Message = SimulationMessage.encodeUTF8(federationName, "FM", "FS", "FM.1", ++this.messageCount,
140                     MessageStatus.NEW, "MM1.1", "java8+", "-jar", "e:/sim0mq/MM1/mm1.jar", "MM1.1 %PORT%", "e:/sim0mq/MM1", "",
141                     "e:/sim0mq/MM1/out.txt", "e:/sim0mq/MM1/err.txt", false, false, false);
142             this.fsSocket.connect("tcp://127.0.0.1:" + fsPort);
143         }
144         this.fsSocket.send(fm1Message);
145 
146         byte[] reply = this.fsSocket.recv(0);
147         Object[] replyMessage = SimulationMessage.decode(reply);
148         System.out.println("Received\n" + SimulationMessage.print(replyMessage));
149 
150         if (replyMessage[4].toString().equals("FS.2") && replyMessage[9].toString().equals("started")
151                 && replyMessage[8].toString().equals("MM1.1"))
152         {
153             this.state = ModelState.STARTED;
154             this.modelSocket.connect("tcp://127.0.0.1:" + replyMessage[10].toString());
155         }
156         else
157         {
158             this.state = ModelState.ERROR;
159             System.err.println("Model not started correctly -- state = " + replyMessage[9]);
160             System.err.println("Started model = " + replyMessage[8]);
161             System.err.println("Error message = " + replyMessage[10]);
162         }
163 
164     }
165 
166     /**
167      * Send the SimRunControl message FM.2.
168      * @param federationName the name of the federation
169      * @throws Sim0MQException on error
170      * @throws SerializationException on serialization problem
171      */
172     private void sendSimRunControl(final String federationName) throws Sim0MQException, SerializationException
173     {
174         byte[] fm2Message;
175         fm2Message = SimulationMessage.encodeUTF8(federationName, "FM", "MM1.1", "FM.2", ++this.messageCount, MessageStatus.NEW,
176                 100.0, 0.0, 0.0, Double.POSITIVE_INFINITY, 1, 0);
177         this.modelSocket.send(fm2Message);
178 
179         byte[] reply = this.modelSocket.recv(0);
180         Object[] replyMessage = SimulationMessage.decode(reply);
181         System.out.println("Received\n" + SimulationMessage.print(replyMessage));
182 
183         if (replyMessage[4].toString().equals("MC.2") && (boolean) replyMessage[9]
184                 && ((Long) replyMessage[8]).longValue() == this.messageCount)
185         {
186             this.state = ModelState.RUNCONTROL;
187         }
188         else
189         {
190             this.state = ModelState.ERROR;
191             System.err.println("Model not started correctly -- status = " + replyMessage[9]);
192             System.err.println("Error message = " + replyMessage[10]);
193         }
194     }
195 
196     /**
197      * Send the Parameters messages FM.3.
198      * @param federationName the name of the federation
199      * @throws Sim0MQException on error
200      * @throws SerializationException on serialization problem
201      */
202     private void setParameters(final String federationName) throws Sim0MQException, SerializationException
203     {
204         Map<String, Object> parameters = new HashMap<>();
205         parameters.put("iat", new Double(1.0));
206         parameters.put("servicetime", new Double(0.85));
207 
208         for (String parameterName : parameters.keySet())
209         {
210             if (!this.state.isError())
211             {
212                 byte[] fm3Message;
213                 fm3Message = SimulationMessage.encodeUTF8(federationName, "FM", "MM1.1", "FM.3", ++this.messageCount,
214                         MessageStatus.NEW, parameterName, parameters.get(parameterName));
215                 this.modelSocket.send(fm3Message);
216 
217                 byte[] reply = this.modelSocket.recv(0);
218                 Object[] replyMessage = SimulationMessage.decode(reply);
219                 System.out.println("Received\n" + SimulationMessage.print(replyMessage));
220 
221                 if (replyMessage[4].toString().equals("MC.2") && (boolean) replyMessage[9]
222                         && ((Long) replyMessage[8]).longValue() == this.messageCount)
223                 {
224                     this.state = ModelState.PARAMETERS;
225                 }
226                 else
227                 {
228                     this.state = ModelState.ERROR;
229                     System.err.println("Model parameter error -- status = " + replyMessage[9]);
230                     System.err.println("Error message = " + replyMessage[10]);
231                 }
232             }
233         }
234         if (!this.state.isError())
235         {
236             this.state = ModelState.PARAMETERS;
237         }
238     }
239 
240     /**
241      * Send the SimStart message FM.4.
242      * @param federationName the name of the federation
243      * @throws Sim0MQException on error
244      * @throws SerializationException on serialization problem
245      */
246     private void sendSimStart(final String federationName) throws Sim0MQException, SerializationException
247     {
248         byte[] fm4Message;
249         fm4Message =
250                 SimulationMessage.encodeUTF8(federationName, "FM", "MM1.1", "FM.4", ++this.messageCount, MessageStatus.NEW);
251         this.modelSocket.send(fm4Message);
252 
253         byte[] reply = this.modelSocket.recv(0);
254         Object[] replyMessage = SimulationMessage.decode(reply);
255         System.out.println("Received\n" + SimulationMessage.print(replyMessage));
256 
257         if (replyMessage[4].toString().equals("MC.2") && (boolean) replyMessage[9]
258                 && ((Long) replyMessage[8]).longValue() == this.messageCount)
259         {
260             this.state = ModelState.SIMULATORSTARTED;
261         }
262         else
263         {
264             this.state = ModelState.ERROR;
265             System.err.println("Simulation start error -- status = " + replyMessage[9]);
266             System.err.println("Error message = " + replyMessage[10]);
267         }
268     }
269 
270     /**
271      * Wait for simulation to end using status polling with message FM.5.
272      * @param federationName the name of the federation
273      * @throws Sim0MQException on error
274      * @throws SerializationException on serialization problem
275      */
276     private void waitForSimEnded(final String federationName) throws Sim0MQException, SerializationException
277     {
278         while (!this.state.isSimulatorEnded() && !this.state.isError())
279         {
280             byte[] fm5Message;
281             fm5Message =
282                     SimulationMessage.encodeUTF8(federationName, "FM", "MM1.1", "FM.5", ++this.messageCount, MessageStatus.NEW);
283             this.modelSocket.send(fm5Message);
284 
285             byte[] reply = this.modelSocket.recv(0);
286             Object[] replyMessage = SimulationMessage.decode(reply);
287             System.out.println("Received\n" + SimulationMessage.print(replyMessage));
288 
289             if (replyMessage[4].toString().equals("MC.1") && !replyMessage[9].toString().equals("error")
290                     && !replyMessage[9].toString().equals("started")
291                     && ((Long) replyMessage[8]).longValue() == this.messageCount)
292             {
293                 if (replyMessage[9].toString().equals("ended"))
294                 {
295                     this.state = ModelState.SIMULATORENDED;
296                 }
297                 else
298                 {
299                     // wait a second
300                     try
301                     {
302                         Thread.sleep(1000);
303                     }
304                     catch (InterruptedException ie)
305                     {
306                         // ignore
307                     }
308                 }
309             }
310             else
311             {
312                 this.state = ModelState.ERROR;
313                 System.err.println("Simulation start error -- status = " + replyMessage[9]);
314                 System.err.println("Error message = " + replyMessage[10]);
315             }
316         }
317     }
318 
319     /**
320      * Request statistics with message FM.6.
321      * @param federationName the name of the federation
322      * @throws Sim0MQException on error
323      * @throws SerializationException on serialization problem
324      */
325     private void requestStatistics(final String federationName) throws Sim0MQException, SerializationException
326     {
327         List<String> stats = new ArrayList<>();
328         stats.add("dN.average");
329         stats.add("qN.max");
330         stats.add("uN.average");
331 
332         for (String statName : stats)
333         {
334             if (!this.state.isError())
335             {
336                 byte[] fm6Message;
337                 fm6Message = SimulationMessage.encodeUTF8(federationName, "FM", "MM1.1", "FM.6", ++this.messageCount,
338                         MessageStatus.NEW, statName);
339                 this.modelSocket.send(fm6Message);
340 
341                 byte[] reply = this.modelSocket.recv(0);
342                 Object[] replyMessage = SimulationMessage.decode(reply);
343                 System.out.println("Received\n" + SimulationMessage.print(replyMessage));
344 
345                 if (replyMessage[4].toString().equals("MC.3"))
346                 {
347                     if (replyMessage[9].toString().equals(statName))
348                     {
349                         System.out.println("Received statistic for " + statName + " = " + replyMessage[10].toString());
350                     }
351                     else
352                     {
353                         this.state = ModelState.ERROR;
354                         System.err.println(
355                                 "Statistics Error: Stat variable expected = " + statName + ", got: " + replyMessage[8]);
356                     }
357                 }
358                 else if (replyMessage[4].toString().equals("MC.4"))
359                 {
360                     this.state = ModelState.ERROR;
361                     System.err.println("Statistics Error: Stat variable = " + replyMessage[8]);
362                     System.err.println("Error message = " + replyMessage[9]);
363                 }
364                 else
365                 {
366                     this.state = ModelState.ERROR;
367                     System.err.println("Statistics Error: Received unknown message as reply to FM6: " + replyMessage[4]);
368                 }
369             }
370         }
371         if (!this.state.isError())
372         {
373             this.state = ModelState.STATISTICSGATHERED;
374         }
375     }
376 
377     /**
378      * Send the FM.8 message to the FederateStarter to kill the MM1 model.
379      * @param federationName the name of the federation
380      * @throws Sim0MQException on error
381      * @throws SerializationException on serialization problem
382      */
383     private void killFederate(final String federationName) throws Sim0MQException, SerializationException
384     {
385         byte[] fm8Message;
386         fm8Message = SimulationMessage.encodeUTF8(federationName, "FM", "FS", "FM.8", ++this.messageCount, MessageStatus.NEW,
387                 "MM1.1");
388         this.fsSocket.send(fm8Message);
389 
390         byte[] reply = this.fsSocket.recv(0);
391         Object[] replyMessage = SimulationMessage.decode(reply);
392         System.out.println("Received\n" + SimulationMessage.print(replyMessage));
393 
394         if (replyMessage[4].toString().equals("FS.4") && (boolean) replyMessage[9]
395                 && replyMessage[8].toString().equals("MM1.1"))
396         {
397             this.state = ModelState.TERMINATED;
398         }
399         else
400         {
401             this.state = ModelState.ERROR;
402             System.err.println("Model not killed correctly");
403             System.err.println("Tried to kill model = " + replyMessage[8]);
404             System.err.println("Error message = " + replyMessage[10]);
405         }
406 
407     }
408 
409     /**
410      * @param args parameters for main
411      * @throws Sim0MQException on error
412      * @throws SerializationException on serialization problem
413      */
414     public static void main(final String[] args) throws Sim0MQException, SerializationException
415     {
416         if (args.length < 4)
417         {
418             System.err.println(
419                     "Use as FederationManager federationName federationManagerPortNumber federateStarterPortNumber local/sk-3");
420             System.exit(-1);
421         }
422         String federationName = args[0];
423 
424         String fmsPort = args[1];
425         int fmPort = 0;
426         try
427         {
428             fmPort = Integer.parseInt(fmsPort);
429         }
430         catch (NumberFormatException nfe)
431         {
432             System.err.println("Use as FederationManager fedNname fmPort fsPort local/sk-3, where fmPort is a number");
433             System.exit(-1);
434         }
435         if (fmPort == 0 || fmPort > 65535)
436         {
437             System.err.println("fmPort should be between 1 and 65535");
438             System.exit(-1);
439         }
440 
441         String fssPort = args[2];
442         int fsPort = 0;
443         try
444         {
445             fsPort = Integer.parseInt(fssPort);
446         }
447         catch (NumberFormatException nfe)
448         {
449             System.err.println("Use as FederationManager fedNname fmPort fsPort local/sk3, where fsPort is a number");
450             System.exit(-1);
451         }
452         if (fsPort == 0 || fsPort > 65535)
453         {
454             System.err.println("fsPort should be between 1 and 65535");
455             System.exit(-1);
456         }
457 
458         String localSk3 = args[3];
459         if (!localSk3.equals("local") && !localSk3.equals("sk-3"))
460         {
461             System.err.println("Use as FederationManager fedNname fmPort fsPort local/sk3, where last arg is local/sk-3");
462             System.exit(-1);
463         }
464 
465         new MM1FederationManager(federationName, fmPort, fsPort, localSk3);
466 
467     }
468 
469 }