View Javadoc
1   package org.sim0mq.demo.mm1;
2   
3   import java.rmi.RemoteException;
4   
5   import javax.naming.NamingException;
6   
7   import org.djunits.unit.DurationUnit;
8   import org.djunits.value.vdouble.scalar.Duration;
9   import org.djutils.serialization.SerializationException;
10  import org.sim0mq.Sim0MQException;
11  import org.sim0mq.message.MessageUtil;
12  import org.sim0mq.message.Sim0MQMessage;
13  import org.sim0mq.message.federatestarter.FS1RequestStatusMessage;
14  import org.sim0mq.message.federationmanager.FM2SimRunControlMessage;
15  import org.sim0mq.message.federationmanager.FM3SetParameterMessage;
16  import org.sim0mq.message.federationmanager.FM4SimStartMessage;
17  import org.sim0mq.message.federationmanager.FM5RequestStatus;
18  import org.sim0mq.message.federationmanager.FM6RequestStatisticsMessage;
19  import org.sim0mq.message.modelcontroller.MC1StatusMessage;
20  import org.sim0mq.message.modelcontroller.MC2AckNakMessage;
21  import org.sim0mq.message.modelcontroller.MC3StatisticsMessage;
22  import org.sim0mq.message.modelcontroller.MC4StatisticsErrorMessage;
23  import org.zeromq.SocketType;
24  import org.zeromq.ZContext;
25  import org.zeromq.ZMQ;
26  
27  import nl.tudelft.simulation.dsol.SimRuntimeException;
28  import nl.tudelft.simulation.dsol.experiment.Replication;
29  import nl.tudelft.simulation.dsol.experiment.ReplicationMode;
30  import nl.tudelft.simulation.dsol.simulators.DEVSSimulator;
31  import nl.tudelft.simulation.dsol.simulators.DEVSSimulatorInterface;
32  
33  /**
34   * <p>
35   * Copyright (c) 2015-2020 Delft University of Technology, Jaffalaan 5, 2628 BX Delft, the Netherlands. All rights reserved.
36   * <p>
37   * See for project information <a href="http://www.simulation.tudelft.nl/"> www.simulation.tudelft.nl</a>.
38   * <p>
39   * The DSOL project is distributed under a BSD-style license.<br>
40   * @version Aug 15, 2014 <br>
41   * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
42   */
43  public class MM1Queue41Application
44  {
45      /** */
46      private DEVSSimulator.TimeDouble simulator;
47  
48      /** */
49      private MM1Queue41Model model;
50  
51      /** the socket. */
52      private ZMQ.Socket fsSocket;
53  
54      /** the context. */
55      private ZContext fsContext;
56  
57      /** federation run id. */
58      private Object federationRunId;
59  
60      /** federateId unique id of the model that is used as the sender/receiver when communicating. */
61      private Object modelId;
62  
63      /** runtime. */
64      private Duration runDuration;
65  
66      /** warmup. */
67      private Duration warmupDuration;
68  
69      /** message count. */
70      private long messageCount = 0;
71  
72      /**
73       * Construct a console application.
74       * @param modelId unique Id of the model that is used as the sender/receiver when communicating
75       * @param port the sim0mq port number on which the model listens
76       * @throws SimRuntimeException on error
77       * @throws RemoteException on error
78       * @throws NamingException on error
79       * @throws Sim0MQException on error
80       * @throws SerializationException on serialization problem
81       */
82      protected MM1Queue41Application(final String modelId, final int port)
83              throws SimRuntimeException, RemoteException, NamingException, Sim0MQException, SerializationException
84      {
85          this.simulator = new DEVSSimulator.TimeDouble(modelId + ".simulator");
86          this.modelId = modelId.trim();
87          this.model = new MM1Queue41Model(this.simulator);
88          startListener(port);
89      }
90  
91      /**
92       * Start listening on a port.
93       * @param port the sim0mq port number on which the model listens
94       * @throws Sim0MQException on error
95       * @throws SerializationException on serialization problem
96       */
97      protected void startListener(final int port) throws Sim0MQException, SerializationException
98      {
99          this.fsContext = new ZContext(1);
100 
101         this.fsSocket = this.fsContext.createSocket(SocketType.ROUTER);
102         this.fsSocket.bind("tcp://*:" + port);
103 
104         System.out.println("Model started. Listening at port: " + port);
105         System.out.flush();
106 
107         while (!Thread.currentThread().isInterrupted())
108         {
109             // Wait for next request from the client -- first the identity (String) and the delimiter (#0)
110             String identity = this.fsSocket.recvStr();
111             this.fsSocket.recvStr();
112 
113             byte[] request = this.fsSocket.recv(0);
114             System.out.println(MessageUtil.printBytes(request));
115             Object[] fields = Sim0MQMessage.decodeToArray(request);
116             Object receiverId = fields[4];
117             Object messageTypeId = fields[5];
118 
119             System.out.println("Received " + Sim0MQMessage.print(fields));
120             System.out.flush();
121 
122             if (receiverId.equals(this.modelId))
123             {
124                 switch (messageTypeId.toString())
125                 {
126                     case "FS.1":
127                         processRequestStatus(identity, new FS1RequestStatusMessage(fields));
128                         break;
129 
130                     case "FM.5":
131                         processRequestStatus(identity, new FM5RequestStatus(fields));
132                         break;
133 
134                     case "FM.2":
135                         processSimRunControl(identity, new FM2SimRunControlMessage(fields));
136                         break;
137 
138                     case "FM.3":
139                         processSetParameter(identity, new FM3SetParameterMessage(fields));
140                         break;
141 
142                     case "FM.4":
143                         processSimStart(identity, new FM4SimStartMessage(fields));
144                         break;
145 
146                     case "FM.6":
147                         processRequestStatistics(identity, new FM6RequestStatisticsMessage(fields));
148                         break;
149 
150                     case "FS.3":
151                         processKillFederate();
152                         break;
153 
154                     default:
155                         // wrong message
156                         System.err.println("Received unknown message -- not processed: " + messageTypeId);
157                 }
158             }
159             else
160             {
161                 // wrong receiver
162                 System.err.println("Received message not intended for " + this.modelId + " but for " + receiverId
163                         + " -- not processed");
164             }
165         }
166     }
167 
168     /**
169      * Process FS.1 or FM.5 message and send MC.1 message back.
170      * @param identity reply id for REQ-ROUTER pattern
171      * @param message the message (0 payload fields)
172      * @throws Sim0MQException on error
173      * @throws SerializationException on serialization problem
174      */
175     private void processRequestStatus(final String identity, final Sim0MQMessage message)
176             throws Sim0MQException, SerializationException
177     {
178         if (this.federationRunId == null)
179         {
180             this.federationRunId = message.getFederationId();
181         }
182         String status = "started";
183         if (this.simulator.isStartingOrRunning())
184         {
185             status = "running";
186         }
187         else if (this.simulator.getSimulatorTime() != null && this.simulator.getReplication() != null
188                 && this.simulator.getReplication().getTreatment() != null)
189         {
190             if (this.simulator.getSimulatorTime() >= this.simulator.getReplication().getTreatment().getEndTime())
191             {
192                 status = "ended";
193             }
194             else
195             {
196                 status = "error";
197             }
198         }
199         this.fsSocket.sendMore(identity);
200         this.fsSocket.sendMore("");
201         byte[] mc1Message = new MC1StatusMessage(this.federationRunId, this.modelId, message.getSenderId(),
202                 ++this.messageCount, message.getMessageId(), status, "").createByteArray();
203         this.fsSocket.send(mc1Message, 0);
204 
205         System.out.println("Sent MC.1");
206         System.out.flush();
207     }
208 
209     /**
210      * Process FM.2 message and send MC.2 message back.
211      * @param identity reply id for REQ-ROUTER pattern
212      * @param message the FM.2 message
213      * @throws Sim0MQException on error
214      * @throws SerializationException on serialization problem
215      */
216     private void processSimRunControl(final String identity, final FM2SimRunControlMessage message)
217             throws Sim0MQException, SerializationException
218     {
219         boolean status = true;
220         String error = "";
221         try
222         {
223             Object runDurationField = message.getRunDuration();
224             if (runDurationField instanceof Number)
225             {
226                 this.runDuration = new Duration(((Number) runDurationField).doubleValue(), DurationUnit.SI);
227             }
228             else if (runDurationField instanceof Duration)
229             {
230                 this.runDuration = (Duration) runDurationField;
231             }
232             else
233             {
234                 throw new Sim0MQException("runTimeField " + runDurationField + " neither Number nor Duration");
235             }
236 
237             Object warmupDurationField = message.getWarmupDuration();
238             if (warmupDurationField instanceof Number)
239             {
240                 this.warmupDuration = new Duration(((Number) warmupDurationField).doubleValue(), DurationUnit.SI);
241             }
242             else if (warmupDurationField instanceof Duration)
243             {
244                 this.warmupDuration = (Duration) warmupDurationField;
245             }
246             else
247             {
248                 throw new Sim0MQException("warmupField " + warmupDurationField + " neither Number nor Duration");
249             }
250         }
251         catch (Exception e)
252         {
253             status = false;
254             error = e.getMessage();
255         }
256         byte[] mc2Message = new MC2AckNakMessage(this.federationRunId, this.modelId, message.getSenderId(),
257                 ++this.messageCount, message.getMessageId(), status, error).createByteArray();
258         this.fsSocket.sendMore(identity);
259         this.fsSocket.sendMore("");
260         this.fsSocket.send(mc2Message, 0);
261 
262         System.out.println("Sent MC.2");
263         System.out.flush();
264     }
265 
266     /**
267      * Process FM.3 message and send MC.2 message back.
268      * @param identity reply id for REQ-ROUTER pattern
269      * @param message the FM3 message
270      * @throws Sim0MQException on error
271      * @throws SerializationException on serialization problem
272      */
273     private void processSetParameter(final String identity, final FM3SetParameterMessage message)
274             throws Sim0MQException, SerializationException
275     {
276         boolean status = true;
277         String error = "";
278         try
279         {
280             String parameterName = message.getParameterName();
281             Object parameterValueField = message.getParameterValue();
282 
283             switch (parameterName)
284             {
285                 case "seed":
286                     this.model.seed = ((Number) parameterValueField).longValue();
287                     break;
288 
289                 case "iat":
290                     this.model.iat = ((Number) parameterValueField).doubleValue();
291                     break;
292 
293                 case "servicetime":
294                     this.model.serviceTime = ((Number) parameterValueField).doubleValue();
295                     break;
296 
297                 default:
298                     status = false;
299                     error = "Parameter " + parameterName + " unknown";
300                     break;
301             }
302         }
303         catch (Exception e)
304         {
305             status = false;
306             error = e.getMessage();
307         }
308 
309         byte[] mc2Message = new MC2AckNakMessage(this.federationRunId, this.modelId, message.getSenderId(),
310                 ++this.messageCount, message.getMessageId(), status, error).createByteArray();
311         this.fsSocket.sendMore(identity);
312         this.fsSocket.sendMore("");
313         this.fsSocket.send(mc2Message, 0);
314 
315         System.out.println("Sent MC.2");
316         System.out.flush();
317     }
318 
319     /**
320      * Process FM.4 message and send MC.2 message back.
321      * @param identity reply id for REQ-ROUTER pattern
322      * @param message the FM.4 message
323      * @throws Sim0MQException on error
324      * @throws SerializationException on serialization problem
325      */
326     private void processSimStart(final String identity, final FM4SimStartMessage message)
327             throws Sim0MQException, SerializationException
328     {
329         boolean status = true;
330         String error = "";
331         try
332         {
333             Replication.TimeDouble<DEVSSimulatorInterface.TimeDouble> replication =
334                     Replication.TimeDouble.create("rep1", 0.0, this.warmupDuration.si, this.runDuration.si, this.model);
335             this.simulator.initialize(replication, ReplicationMode.TERMINATING);
336             this.simulator.scheduleEventAbs(100.0, this, this, "terminate", null);
337 
338             this.simulator.start();
339         }
340         catch (Exception e)
341         {
342             status = false;
343             error = e.getMessage();
344         }
345 
346         byte[] mc2Message = new MC2AckNakMessage(this.federationRunId, this.modelId, message.getSenderId(),
347                 ++this.messageCount, message.getMessageId(), status, error).createByteArray();
348         this.fsSocket.sendMore(identity);
349         this.fsSocket.sendMore("");
350         this.fsSocket.send(mc2Message, 0);
351 
352         System.out.println("Sent MC.2");
353         System.out.flush();
354     }
355 
356     /**
357      * Process FM.6 message and send MC.3 or MC.4 message back.
358      * @param identity reply id for REQ-ROUTER pattern
359      * @param message the FM.6 message
360      * @throws Sim0MQException on error
361      * @throws SerializationException on serialization problem
362      */
363     private void processRequestStatistics(final String identity, final FM6RequestStatisticsMessage message)
364             throws Sim0MQException, SerializationException
365     {
366         boolean ok = true;
367         String error = "";
368         String variableName = message.getVariableName();
369         double variableValue = Double.NaN;
370         try
371         {
372             switch (variableName)
373             {
374                 case "dN.average":
375                     variableValue = this.model.dN.getSampleMean();
376                     break;
377 
378                 case "uN.average":
379                     variableValue = this.model.uN.getWeightedSampleMean();
380                     break;
381 
382                 case "qN.max":
383                     variableValue = this.model.qN.getMax();
384                     break;
385 
386                 default:
387                     ok = false;
388                     error = "Parameter " + variableName + " unknown";
389                     break;
390             }
391         }
392         catch (Exception e)
393         {
394             ok = false;
395             error = e.getMessage();
396         }
397 
398         if (Double.isNaN(variableValue))
399         {
400             ok = false;
401             error = "Parameter " + variableName + " not set to a value";
402         }
403 
404         if (ok)
405         {
406             byte[] mc3Message = new MC3StatisticsMessage(this.federationRunId, this.modelId, message.getSenderId(),
407                     ++this.messageCount, variableName, variableValue).createByteArray();
408             this.fsSocket.sendMore(identity);
409             this.fsSocket.sendMore("");
410             this.fsSocket.send(mc3Message, 0);
411 
412             System.out.println("Sent MC.3");
413             System.out.flush();
414         }
415         else
416         {
417             byte[] mc4Message = new MC4StatisticsErrorMessage(this.federationRunId, this.modelId, message.getSenderId(),
418                     ++this.messageCount, variableName, error).createByteArray();
419             this.fsSocket.sendMore(identity);
420             this.fsSocket.sendMore("");
421             this.fsSocket.send(mc4Message, 0);
422 
423             System.out.println("Sent MC.4");
424             System.out.flush();
425         }
426     }
427 
428     /**
429      * Process FS.3 message.
430      */
431     private void processKillFederate()
432     {
433         this.fsSocket.close();
434         this.fsContext.destroy();
435         this.fsContext.close();
436         System.exit(0);
437     }
438 
439     /** stop the simulation. */
440     protected final void terminate()
441     {
442         System.out.println("average queue length = " + this.model.qN.getSampleMean());
443         System.out.println("average queue wait   = " + this.model.dN.getSampleMean());
444         System.out.println("average utilization  = " + this.model.uN.getWeightedSampleMean());
445     }
446 
447     /**
448      * @param args can be left empty
449      * @throws SimRuntimeException on error
450      * @throws RemoteException on error
451      * @throws NamingException on error
452      * @throws Sim0MQException on error
453      * @throws SerializationException on serialization problem
454      */
455     public static void main(final String[] args)
456             throws SimRuntimeException, RemoteException, NamingException, Sim0MQException, SerializationException
457     {
458         if (args.length < 2)
459         {
460             System.err.println("Use as MM1Queue41Application modelId sim0mqPortNumber");
461             System.exit(-1);
462         }
463 
464         System.out.println("Started with args: " + args[0] + " " + args[1]);
465         System.out.flush();
466         
467         String modelId = args[0];
468 
469         String sPort = args[1];
470         int port = 0;
471         try
472         {
473             port = Integer.parseInt(sPort);
474         }
475         catch (NumberFormatException nfe)
476         {
477             System.err.println("Use as FederateStarter modelId portNumber, where portNumber is a number");
478             System.exit(-1);
479         }
480         if (port == 0 || port > 65535)
481         {
482             System.err.println("PortNumber should be between 1 and 65535");
483             System.exit(-1);
484         }
485 
486         new MM1Queue41Application(modelId, port);
487     }
488 
489 }