View Javadoc
1   package org.sim0mq.demo.mm1;
2   
3   import java.util.ArrayList;
4   import java.util.HashMap;
5   import java.util.List;
6   import java.util.Map;
7   import java.util.UUID;
8   
9   import org.sim0mq.Sim0MQException;
10  import org.sim0mq.federationmanager.ModelState;
11  import org.sim0mq.message.MessageStatus;
12  import org.sim0mq.message.SimulationMessage;
13  import org.zeromq.ZMQ;
14  
15  /**
16   * Example implementation of a FederationManager to start the MM1Queue41Application DSOL model.
17   * <p>
18   * Copyright (c) 2016-2017 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
19   * BSD-style license. See <a href="http://sim0mq.org/docs/current/license.html">Sim0MQ License</a>.
20   * </p>
21   * $LastChangedDate: 2015-07-24 02:58:59 +0200 (Fri, 24 Jul 2015) $, @version $Revision: 1147 $, by $Author: averbraeck $,
22   * initial version April 10, 2017 <br>
23   * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
24   */
25  public class MM1FederationManager
26  {
27      /** the state of the started model. */
28      private ModelState state;
29  
30      /** the model socket. */
31      private ZMQ.Socket modelSocket;
32  
33      /** the federate starter socket. */
34      private ZMQ.Socket fsSocket;
35  
36      /** the context. */
37      private ZMQ.Context fmContext;
38  
39      /** message count. */
40      private long messageCount = 0;
41  
42      /**
43       * Send an FM.1 message to the FederateStarter.
44       * @param federationName the name of the federation
45       * @param fmPort the port number to listen on
46       * @param fsPort the port where the federate starter can be reached
47       * @param localSk3 local/sk-3 to indicate where the federate starter and model can be found
48       * @throws Sim0MQException on error
49       */
50      public MM1FederationManager(final String federationName, final int fmPort, final int fsPort, final String localSk3)
51              throws Sim0MQException
52      {
53          this.fmContext = ZMQ.context(1);
54          this.modelSocket = this.fmContext.socket(ZMQ.REQ);
55          this.modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
56          this.fsSocket = this.fmContext.socket(ZMQ.REQ);
57          this.fsSocket.setIdentity(UUID.randomUUID().toString().getBytes());
58  
59          this.state = ModelState.NOT_STARTED;
60          boolean ready = false;
61          while (!ready)
62          {
63              System.out.println(this.state);
64  
65              switch (this.state)
66              {
67                  case NOT_STARTED:
68                      startModel(federationName, fsPort, localSk3);
69                      break;
70  
71                  case STARTED:
72                      sendSimRunControl(federationName);
73                      break;
74  
75                  case RUNCONTROL:
76                      setParameters(federationName);
77                      break;
78  
79                  case PARAMETERS:
80                      sendSimStart(federationName);
81                      break;
82  
83                  case SIMULATORSTARTED:
84                      waitForSimEnded(federationName);
85                      break;
86  
87                  case SIMULATORENDED:
88                      requestStatistics(federationName);
89                      break;
90  
91                  case STATISTICSGATHERED:
92                      killFederate(federationName);
93                      ready = true;
94                      break;
95  
96                  case ERROR:
97                      killFederate(federationName);
98                      ready = true;
99                      break;
100 
101                 default:
102                     break;
103             }
104         }
105 
106         this.modelSocket.close();
107         this.fsSocket.close();
108         this.fmContext.term();
109     }
110 
111     /**
112      * Sed the FM.1 message to the FederateStarter to start the MM1 model.
113      * @param federationName the name of the federation
114      * @param fsPort the port where the federate starter can be reached
115      * @param localSk3 local/sk-3 to indicate where the federate starter and model can be found
116      * @throws Sim0MQException on error
117      */
118     private void startModel(final String federationName, final int fsPort, final String localSk3) throws Sim0MQException
119     {
120         // Start model mmm1.jar
121         byte[] fm1Message;
122         if (localSk3.equals("sk-3"))
123         {
124             fm1Message = SimulationMessage.encodeUTF8(federationName, "FM", "FS", "FM.1", ++this.messageCount, MessageStatus.NEW,
125                     "MM1.1", "java8+", "-jar", "/home/alexandv/sim0mq/MM1/mm1.jar", "MM1.1 %PORT%", "/home/alexandv/sim0mq/MM1",
126                     "", "/home/alexandv/sim0mq/MM1/out.txt", "/home/alexandv/sim0mq/MM1/err.txt", false, false, false);
127             this.fsSocket.connect("tcp://130.161.3.179:" + fsPort);
128         }
129         else
130         {
131             fm1Message = SimulationMessage.encodeUTF8(federationName, "FM", "FS", "FM.1", ++this.messageCount, MessageStatus.NEW,
132                     "MM1.1", "java8+", "-jar", "e:/MM1/mm1.jar", "MM1.1 %PORT%", "e:/MM1", "", "e:/MM1/out.txt", "e:/MM1/err.txt",
133                     false, false, false);
134             this.fsSocket.connect("tcp://127.0.0.1:" + fsPort);
135         }
136         this.fsSocket.send(fm1Message);
137 
138         byte[] reply = this.fsSocket.recv(0);
139         Object[] replyMessage = SimulationMessage.decode(reply);
140         System.out.println("Received\n" + SimulationMessage.print(replyMessage));
141 
142         if (replyMessage[4].toString().equals("FS.2") && replyMessage[9].toString().equals("started")
143                 && replyMessage[8].toString().equals("MM1.1"))
144         {
145             this.state = ModelState.STARTED;
146             this.modelSocket.connect("tcp://127.0.0.1:" + replyMessage[10].toString());
147         }
148         else
149         {
150             this.state = ModelState.ERROR;
151             System.err.println("Model not started correctly -- state = " + replyMessage[9]);
152             System.err.println("Started model = " + replyMessage[8]);
153             System.err.println("Error message = " + replyMessage[10]);
154         }
155 
156     }
157 
158     /**
159      * Send the SimRunControl message FM.2.
160      * @param federationName the name of the federation
161      * @throws Sim0MQException on error
162      */
163     private void sendSimRunControl(final String federationName) throws Sim0MQException
164     {
165         byte[] fm2Message;
166         fm2Message = SimulationMessage.encodeUTF8(federationName, "FM", "MM1.1", "FM.2", ++this.messageCount, MessageStatus.NEW,
167                 100.0, 0.0, 0.0, Double.POSITIVE_INFINITY, 1, 0);
168         this.modelSocket.send(fm2Message);
169 
170         byte[] reply = this.modelSocket.recv(0);
171         Object[] replyMessage = SimulationMessage.decode(reply);
172         System.out.println("Received\n" + SimulationMessage.print(replyMessage));
173 
174         if (replyMessage[4].toString().equals("MC.2") && (boolean) replyMessage[9]
175                 && ((Long) replyMessage[8]).longValue() == this.messageCount)
176         {
177             this.state = ModelState.RUNCONTROL;
178         }
179         else
180         {
181             this.state = ModelState.ERROR;
182             System.err.println("Model not started correctly -- status = " + replyMessage[9]);
183             System.err.println("Error message = " + replyMessage[10]);
184         }
185     }
186 
187     /**
188      * Send the Parameters messages FM.3.
189      * @param federationName the name of the federation
190      * @throws Sim0MQException on error
191      */
192     private void setParameters(final String federationName) throws Sim0MQException
193     {
194         Map<String, Object> parameters = new HashMap<>();
195         parameters.put("iat", new Double(1.0));
196         parameters.put("servicetime", new Double(0.85));
197 
198         for (String parameterName : parameters.keySet())
199         {
200             if (!this.state.isError())
201             {
202                 byte[] fm3Message;
203                 fm3Message = SimulationMessage.encodeUTF8(federationName, "FM", "MM1.1", "FM.3", ++this.messageCount,
204                         MessageStatus.NEW, parameterName, parameters.get(parameterName));
205                 this.modelSocket.send(fm3Message);
206 
207                 byte[] reply = this.modelSocket.recv(0);
208                 Object[] replyMessage = SimulationMessage.decode(reply);
209                 System.out.println("Received\n" + SimulationMessage.print(replyMessage));
210 
211                 if (replyMessage[4].toString().equals("MC.2") && (boolean) replyMessage[9]
212                         && ((Long) replyMessage[8]).longValue() == this.messageCount)
213                 {
214                     this.state = ModelState.PARAMETERS;
215                 }
216                 else
217                 {
218                     this.state = ModelState.ERROR;
219                     System.err.println("Model parameter error -- status = " + replyMessage[9]);
220                     System.err.println("Error message = " + replyMessage[10]);
221                 }
222             }
223         }
224         if (!this.state.isError())
225         {
226             this.state = ModelState.PARAMETERS;
227         }
228     }
229 
230     /**
231      * Send the SimStart message FM.4.
232      * @param federationName the name of the federation
233      * @throws Sim0MQException on error
234      */
235     private void sendSimStart(final String federationName) throws Sim0MQException
236     {
237         byte[] fm4Message;
238         fm4Message = SimulationMessage.encodeUTF8(federationName, "FM", "MM1.1", "FM.4", ++this.messageCount, MessageStatus.NEW);
239         this.modelSocket.send(fm4Message);
240 
241         byte[] reply = this.modelSocket.recv(0);
242         Object[] replyMessage = SimulationMessage.decode(reply);
243         System.out.println("Received\n" + SimulationMessage.print(replyMessage));
244 
245         if (replyMessage[4].toString().equals("MC.2") && (boolean) replyMessage[9]
246                 && ((Long) replyMessage[8]).longValue() == this.messageCount)
247         {
248             this.state = ModelState.SIMULATORSTARTED;
249         }
250         else
251         {
252             this.state = ModelState.ERROR;
253             System.err.println("Simulation start error -- status = " + replyMessage[9]);
254             System.err.println("Error message = " + replyMessage[10]);
255         }
256     }
257 
258     /**
259      * Wait for simulation to end using status polling with message FM.5.
260      * @param federationName the name of the federation
261      * @throws Sim0MQException on error
262      */
263     private void waitForSimEnded(final String federationName) throws Sim0MQException
264     {
265         while (!this.state.isSimulatorEnded() && !this.state.isError())
266         {
267             byte[] fm5Message;
268             fm5Message =
269                     SimulationMessage.encodeUTF8(federationName, "FM", "MM1.1", "FM.5", ++this.messageCount, MessageStatus.NEW);
270             this.modelSocket.send(fm5Message);
271 
272             byte[] reply = this.modelSocket.recv(0);
273             Object[] replyMessage = SimulationMessage.decode(reply);
274             System.out.println("Received\n" + SimulationMessage.print(replyMessage));
275 
276             if (replyMessage[4].toString().equals("MC.1") && !replyMessage[9].toString().equals("error")
277                     && !replyMessage[9].toString().equals("started")
278                     && ((Long) replyMessage[8]).longValue() == this.messageCount)
279             {
280                 if (replyMessage[9].toString().equals("ended"))
281                 {
282                     this.state = ModelState.SIMULATORENDED;
283                 }
284                 else
285                 {
286                     // wait a second
287                     try
288                     {
289                         Thread.sleep(1000);
290                     }
291                     catch (InterruptedException ie)
292                     {
293                         // ignore
294                     }
295                 }
296             }
297             else
298             {
299                 this.state = ModelState.ERROR;
300                 System.err.println("Simulation start error -- status = " + replyMessage[9]);
301                 System.err.println("Error message = " + replyMessage[10]);
302             }
303         }
304     }
305 
306     /**
307      * Request statistics with message FM.6.
308      * @param federationName the name of the federation
309      * @throws Sim0MQException on error
310      */
311     private void requestStatistics(final String federationName) throws Sim0MQException
312     {
313         List<String> stats = new ArrayList<>();
314         stats.add("dN.average");
315         stats.add("qN.max");
316         stats.add("uN.average");
317 
318         for (String statName : stats)
319         {
320             if (!this.state.isError())
321             {
322                 byte[] fm6Message;
323                 fm6Message = SimulationMessage.encodeUTF8(federationName, "FM", "MM1.1", "FM.6", ++this.messageCount,
324                         MessageStatus.NEW, statName);
325                 this.modelSocket.send(fm6Message);
326 
327                 byte[] reply = this.modelSocket.recv(0);
328                 Object[] replyMessage = SimulationMessage.decode(reply);
329                 System.out.println("Received\n" + SimulationMessage.print(replyMessage));
330 
331                 if (replyMessage[4].toString().equals("MC.3"))
332                 {
333                     if (replyMessage[9].toString().equals(statName))
334                     {
335                         System.out.println("Received statistic for " + statName + " = " + replyMessage[10].toString());
336                     }
337                     else
338                     {
339                         this.state = ModelState.ERROR;
340                         System.err.println(
341                                 "Statistics Error: Stat variable expected = " + statName + ", got: " + replyMessage[8]);
342                     }
343                 }
344                 else if (replyMessage[4].toString().equals("MC.4"))
345                 {
346                     this.state = ModelState.ERROR;
347                     System.err.println("Statistics Error: Stat variable = " + replyMessage[8]);
348                     System.err.println("Error message = " + replyMessage[9]);
349                 }
350                 else
351                 {
352                     this.state = ModelState.ERROR;
353                     System.err.println("Statistics Error: Received unknown message as reply to FM6: " + replyMessage[4]);
354                 }
355             }
356         }
357         if (!this.state.isError())
358         {
359             this.state = ModelState.STATISTICSGATHERED;
360         }
361     }
362 
363     /**
364      * Send the FM.8 message to the FederateStarter to kill the MM1 model.
365      * @param federationName the name of the federation
366      * @throws Sim0MQException on error
367      */
368     private void killFederate(final String federationName) throws Sim0MQException
369     {
370         byte[] fm8Message;
371         fm8Message =
372                 SimulationMessage.encodeUTF8(federationName, "FM", "FS", "FM.8", ++this.messageCount, MessageStatus.NEW, "MM1.1");
373         this.fsSocket.send(fm8Message);
374 
375         byte[] reply = this.fsSocket.recv(0);
376         Object[] replyMessage = SimulationMessage.decode(reply);
377         System.out.println("Received\n" + SimulationMessage.print(replyMessage));
378 
379         if (replyMessage[4].toString().equals("FS.4") && (boolean) replyMessage[9]
380                 && replyMessage[8].toString().equals("MM1.1"))
381         {
382             this.state = ModelState.TERMINATED;
383         }
384         else
385         {
386             this.state = ModelState.ERROR;
387             System.err.println("Model not killed correctly");
388             System.err.println("Tried to kill model = " + replyMessage[8]);
389             System.err.println("Error message = " + replyMessage[10]);
390         }
391 
392     }
393 
394     /**
395      * @param args parameters for main
396      * @throws Sim0MQException on error
397      */
398     public static void main(String[] args) throws Sim0MQException
399     {
400         if (args.length < 4)
401         {
402             System.err.println(
403                     "Use as FederationManager federationName federationManagerPortNumber federateStarterPortNumber local/sk-3");
404             System.exit(-1);
405         }
406         String federationName = args[0];
407 
408         String fmsPort = args[1];
409         int fmPort = 0;
410         try
411         {
412             fmPort = Integer.parseInt(fmsPort);
413         }
414         catch (NumberFormatException nfe)
415         {
416             System.err.println("Use as FederationManager fedNname fmPort fsPort local/sk-3, where fmPort is a number");
417             System.exit(-1);
418         }
419         if (fmPort == 0 || fmPort > 65535)
420         {
421             System.err.println("fmPort should be between 1 and 65535");
422             System.exit(-1);
423         }
424 
425         String fssPort = args[2];
426         int fsPort = 0;
427         try
428         {
429             fsPort = Integer.parseInt(fssPort);
430         }
431         catch (NumberFormatException nfe)
432         {
433             System.err.println("Use as FederationManager fedNname fmPort fsPort local/sk3, where fsPort is a number");
434             System.exit(-1);
435         }
436         if (fsPort == 0 || fsPort > 65535)
437         {
438             System.err.println("fsPort should be between 1 and 65535");
439             System.exit(-1);
440         }
441 
442         String localSk3 = args[3];
443         if (!localSk3.equals("local") && !localSk3.equals("sk-3"))
444         {
445             System.err.println("Use as FederationManager fedNname fmPort fsPort local/sk3, where last arg is local/sk-3");
446             System.exit(-1);
447         }
448 
449         new MM1FederationManager(federationName, fmPort, fsPort, localSk3);
450 
451     }
452 
453 }