View Javadoc
1   package org.sim0mq.demo.mm1;
2   
3   import java.rmi.RemoteException;
4   
5   import javax.naming.NamingException;
6   
7   import org.djunits.unit.DurationUnit;
8   import org.djunits.value.vdouble.scalar.Duration;
9   import org.sim0mq.Sim0MQException;
10  import org.sim0mq.message.MessageStatus;
11  import org.sim0mq.message.SimulationMessage;
12  import org.sim0mq.message.TypedMessage;
13  import org.zeromq.ZMQ;
14  
15  import nl.tudelft.simulation.dsol.SimRuntimeException;
16  import nl.tudelft.simulation.dsol.experiment.Replication;
17  import nl.tudelft.simulation.dsol.experiment.ReplicationMode;
18  import nl.tudelft.simulation.dsol.simtime.SimTimeDouble;
19  import nl.tudelft.simulation.dsol.simulators.DEVSSimulator;
20  
21  /**
22   * <p>
23   * Copyright (c) 2002-2014 Delft University of Technology, Jaffalaan 5, 2628 BX Delft, the Netherlands. All rights reserved.
24   * <p>
25   * See for project information <a href="http://www.simulation.tudelft.nl/"> www.simulation.tudelft.nl</a>.
26   * <p>
27   * The DSOL project is distributed under a BSD-style license.<br>
28   * @version Aug 15, 2014 <br>
29   * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
30   */
31  public class MM1Queue41Application
32  {
33      /** */
34      private DEVSSimulator.TimeDouble simulator;
35  
36      /** */
37      private MM1Queue41Model model;
38  
39      /** the socket. */
40      private ZMQ.Socket fsSocket;
41  
42      /** the context. */
43      private ZMQ.Context fsContext;
44  
45      /** federation run id. */
46      private Object federationRunId;
47  
48      /** modelId unique Id of the model that is used as the sender/receiver when communicating. */
49      private String modelId;
50  
51      /** runtime. */
52      private Duration runTime;
53  
54      /** warmup. */
55      private Duration warmupTime;
56  
57      /** message count. */
58      private long messageCount = 0;
59  
60      /**
61       * Construct a console application.
62       * @param modelId unique Id of the model that is used as the sender/receiver when communicating
63       * @param port the sim0mq port number on which the model listens
64       * @throws SimRuntimeException on error
65       * @throws RemoteException on error
66       * @throws NamingException on error
67       * @throws Sim0MQException on error
68       */
69      protected MM1Queue41Application(final String modelId, final int port)
70              throws SimRuntimeException, RemoteException, NamingException, Sim0MQException
71      {
72          this.modelId = modelId.trim();
73          this.model = new MM1Queue41Model();
74          this.simulator = new DEVSSimulator.TimeDouble();
75          startListener(port);
76      }
77  
78      /**
79       * Start listening on a port.
80       * @param port the sim0mq port number on which the model listens
81       * @throws Sim0MQException on error
82       */
83      protected void startListener(final int port) throws Sim0MQException
84      {
85          this.fsContext = ZMQ.context(1);
86  
87          this.fsSocket = this.fsContext.socket(ZMQ.ROUTER);
88          this.fsSocket.bind("tcp://*:" + port);
89  
90          System.out.println("Model started. Listening at port: " + port);
91          System.out.flush();
92  
93          while (!Thread.currentThread().isInterrupted())
94          {
95              // Wait for next request from the client -- first the identity (String) and the delimiter (#0)
96              String identity = this.fsSocket.recvStr();
97              this.fsSocket.recvStr();
98  
99              byte[] request = this.fsSocket.recv(0);
100             System.out.println(TypedMessage.printBytes(request));
101             Object[] fields = SimulationMessage.decode(request);
102 
103             System.out.println("Received " + SimulationMessage.print(fields));
104             System.out.flush();
105 
106             this.federationRunId = fields[1];
107             String senderId = fields[2].toString();
108             String receiverId = fields[3].toString();
109             String messageId = fields[4].toString();
110             long uniqueId = ((Long) fields[5]).longValue();
111 
112             if (receiverId.equals(this.modelId))
113             {
114                 System.err.println("Received: " + messageId + ", payload = " + SimulationMessage.listPayload(fields));
115                 switch (messageId)
116                 {
117                     case "FS.1":
118                     case "FM.5":
119                         processRequestStatus(identity, senderId, uniqueId);
120                         break;
121 
122                     case "FM.2":
123                         processSimRunControl(identity, senderId, uniqueId, fields);
124                         break;
125 
126                     case "FM.3":
127                         processSetParameter(identity, senderId, uniqueId, fields);
128                         break;
129 
130                     case "FM.4":
131                         processSimStart(identity, senderId, uniqueId);
132                         break;
133 
134                     case "FM.6":
135                         processRequestStatistics(identity, senderId, uniqueId, fields);
136                         break;
137 
138                     case "FS.3":
139                         processKillFederate();
140                         break;
141 
142                     default:
143                         // wrong message
144                         System.err.println("Received unknown message -- not processed: " + messageId);
145                 }
146             }
147             else
148             {
149                 // wrong receiver
150                 System.err.println(
151                         "Received message not intended for " + this.modelId + " but for " + receiverId + " -- not processed: ");
152             }
153         }
154     }
155     
156     /**
157      * Process FS.1 message and send MC.1 message back.
158      * @param identity reply id for REQ-ROUTER pattern
159      * @param receiverId the receiver of the response
160      * @param replyToMessageId the message to which this is the reply
161      * @throws Sim0MQException on error
162      */
163     private void processRequestStatus(final String identity, final String receiverId, final long replyToMessageId)
164             throws Sim0MQException
165     {
166         String status = "started";
167         if (this.simulator.isRunning())
168         {
169             status = "running";
170         }
171         else if (this.simulator.getSimulatorTime() != null && this.simulator.getReplication() != null
172                 && this.simulator.getReplication().getTreatment() != null)
173         {
174             if (this.simulator.getSimulatorTime().ge(this.simulator.getReplication().getTreatment().getEndTime()))
175             {
176                 status = "ended";
177             }
178             else
179             {
180                 status = "error";
181             }
182         }
183         this.fsSocket.sendMore(identity);
184         this.fsSocket.sendMore("");
185         byte[] mc1Message = SimulationMessage.encodeUTF8(this.federationRunId, this.modelId, receiverId, "MC.1",
186                 ++this.messageCount, MessageStatus.NEW, replyToMessageId, status, "");
187         this.fsSocket.send(mc1Message, 0);
188         
189         System.out.println("Sent MC.1");
190         System.out.flush();
191     }
192 
193     /**
194      * Process FM.2 message and send MC.2 message back.
195      * @param identity reply id for REQ-ROUTER pattern
196      * @param receiverId the receiver of the response
197      * @param replyToMessageId the message to which this is the reply
198      * @param fields the message
199      * @throws Sim0MQException on error
200      */
201     private void processSimRunControl(final String identity, final String receiverId, final long replyToMessageId,
202             final Object[] fields) throws Sim0MQException
203     {
204         boolean status = true;
205         String error = "";
206         try
207         {
208             Object runTimeField = fields[8];
209             if (runTimeField instanceof Number)
210             {
211                 this.runTime = new Duration(((Number) fields[8]).doubleValue(), DurationUnit.SI);
212             }
213             else if (runTimeField instanceof Duration)
214             {
215                 this.runTime = (Duration) runTimeField;
216             }
217             else
218             {
219                 throw new Sim0MQException("runTimeField " + runTimeField + " neither Number nor Duration");
220             }
221 
222             Object warmupField = fields[8];
223             if (warmupField instanceof Number)
224             {
225                 this.warmupTime = new Duration(((Number) fields[9]).doubleValue(), DurationUnit.SI);
226             }
227             else if (warmupField instanceof Duration)
228             {
229                 this.warmupTime = (Duration) warmupField;
230             }
231             else
232             {
233                 throw new Sim0MQException("warmupField " + warmupField + " neither Number nor Duration");
234             }
235         }
236         catch (Exception e)
237         {
238             status = false;
239             error = e.getMessage();
240         }
241         byte[] mc2Message = SimulationMessage.encodeUTF8(this.federationRunId, this.modelId, receiverId, "MC.2",
242                 ++this.messageCount, MessageStatus.NEW, replyToMessageId, status, error);
243         this.fsSocket.sendMore(identity);
244         this.fsSocket.sendMore("");
245         this.fsSocket.send(mc2Message, 0);
246 
247         System.out.println("Sent MC.2");
248         System.out.flush();
249     }
250 
251     /**
252      * Process FM.3 message and send MC.2 message back.
253      * @param identity reply id for REQ-ROUTER pattern
254      * @param receiverId the receiver of the response
255      * @param replyToMessageId the message to which this is the reply
256      * @throws Sim0MQException on error
257      */
258     private void processSimStart(final String identity, final String receiverId, final long replyToMessageId)
259             throws Sim0MQException
260     {
261         boolean status = true;
262         String error = "";
263         try
264         {
265             Replication<Double, Double, SimTimeDouble> replication =
266                     new Replication<>("rep1", new SimTimeDouble(0.0), this.warmupTime.si, this.runTime.si, this.model);
267             this.simulator.initialize(replication, ReplicationMode.TERMINATING);
268             this.simulator.scheduleEventAbs(100.0, this, this, "terminate", null);
269 
270             this.simulator.start();
271         }
272         catch (Exception e)
273         {
274             status = false;
275             error = e.getMessage();
276         }
277 
278         byte[] mc2Message = SimulationMessage.encodeUTF8(this.federationRunId, this.modelId, receiverId, "MC.2",
279                 ++this.messageCount, MessageStatus.NEW, replyToMessageId, status, error);
280         this.fsSocket.sendMore(identity);
281         this.fsSocket.sendMore("");
282         this.fsSocket.send(mc2Message, 0);
283         
284         System.out.println("Sent MC.2");
285         System.out.flush();
286     }
287 
288     /**
289      * Process FM.4 message and send MC.2 message back.
290      * @param identity reply id for REQ-ROUTER pattern
291      * @param receiverId the receiver of the response
292      * @param replyToMessageId the message to which this is the reply
293      * @param fields the message
294      * @throws Sim0MQException on error
295      */
296     private void processSetParameter(final String identity, final String receiverId, final long replyToMessageId,
297             final Object[] fields) throws Sim0MQException
298     {
299         boolean status = true;
300         String error = "";
301         try
302         {
303             String parameterName = fields[8].toString();
304             Object parameterValueField = fields[9];
305 
306             switch (parameterName)
307             {
308                 case "seed":
309                     this.model.seed= ((Number) parameterValueField).longValue();
310                     break;
311 
312                 case "iat":
313                     this.model.iat = ((Number) parameterValueField).doubleValue();
314                     break;
315 
316                 case "servicetime":
317                     this.model.serviceTime = ((Number) parameterValueField).doubleValue();
318                     break;
319 
320                 default:
321                     status = false;
322                     error = "Parameter " + parameterName + " unknown";
323                     break;
324             }
325         }
326         catch (Exception e)
327         {
328             status = false;
329             error = e.getMessage();
330         }
331 
332         byte[] mc2Message = SimulationMessage.encodeUTF8(this.federationRunId, this.modelId, receiverId, "MC.2",
333                 ++this.messageCount, MessageStatus.NEW, replyToMessageId, status, error);
334         this.fsSocket.sendMore(identity);
335         this.fsSocket.sendMore("");
336         this.fsSocket.send(mc2Message, 0);
337         
338         System.out.println("Sent MC.2");
339         System.out.flush();
340     }
341 
342     /**
343      * Process FM.5 message and send MC.3 or MC.4 message back.
344      * @param identity reply id for REQ-ROUTER pattern
345      * @param receiverId the receiver of the response
346      * @param replyToMessageId the message to which this is the reply
347      * @param fields the message
348      * @throws Sim0MQException on error
349      */
350     private void processRequestStatistics(final String identity, final String receiverId, final long replyToMessageId,
351             final Object[] fields) throws Sim0MQException
352     {
353         boolean ok = true;
354         String error = "";
355         String variableName = fields[8].toString();
356         double variableValue = Double.NaN;
357         try
358         {
359             switch (variableName)
360             {
361                 case "dN.average":
362                     variableValue = this.model.dN.getSampleMean();
363                     break;
364 
365                 case "uN.average":
366                     variableValue = this.model.uN.getSampleMean();
367                     break;
368 
369                 case "qN.max":
370                     variableValue = this.model.qN.getMax();
371                     break;
372 
373                 default:
374                     ok = false;
375                     error = "Parameter " + variableName + " unknown";
376                     break;
377             }
378         }
379         catch (Exception e)
380         {
381             ok = false;
382             error = e.getMessage();
383         }
384 
385         if (Double.isNaN(variableValue))
386         {
387             ok = false;
388             error = "Parameter " + variableName + " not set to a value";
389         }
390 
391         if (ok)
392         {
393             byte[] mc3Message = SimulationMessage.encodeUTF8(this.federationRunId, this.modelId, receiverId, "MC.3",
394                     ++this.messageCount, MessageStatus.NEW, replyToMessageId, variableName, variableValue);
395             this.fsSocket.sendMore(identity);
396             this.fsSocket.sendMore("");
397             this.fsSocket.send(mc3Message, 0);
398 
399             System.out.println("Sent MC.3");
400             System.out.flush();
401 }
402         else
403         {
404             byte[] mc4Message = SimulationMessage.encodeUTF8(this.federationRunId, this.modelId, receiverId, "MC.4",
405                     ++this.messageCount, MessageStatus.NEW, replyToMessageId, ok, error);
406             this.fsSocket.sendMore(identity);
407             this.fsSocket.sendMore("");
408             this.fsSocket.send(mc4Message, 0);
409             
410             System.out.println("Sent MC.4");
411             System.out.flush();
412         }
413     }
414 
415     /**
416      * Process FS.3 message.
417      */
418     private void processKillFederate()
419     {
420         this.fsSocket.close();
421         this.fsContext.term();
422         System.exit(0);
423     }
424 
425     /** stop the simulation. */
426     protected final void terminate()
427     {
428         System.out.println("average queue length = " + this.model.qN.getSampleMean());
429         System.out.println("average queue wait   = " + this.model.dN.getSampleMean());
430         System.out.println("average utilization  = " + this.model.uN.getSampleMean());
431     }
432 
433     /**
434      * @param args can be left empty
435      * @throws SimRuntimeException on error
436      * @throws RemoteException on error
437      * @throws NamingException on error
438      * @throws Sim0MQException on error
439      */
440     public static void main(final String[] args) throws SimRuntimeException, RemoteException, NamingException, Sim0MQException
441     {
442         if (args.length < 2)
443         {
444             System.err.println("Use as MM1Queue41Application modelId sim0mqPortNumber");
445             System.exit(-1);
446         }
447 
448         String modelId = args[0];
449 
450         String sPort = args[1];
451         int port = 0;
452         try
453         {
454             port = Integer.parseInt(sPort);
455         }
456         catch (NumberFormatException nfe)
457         {
458             System.err.println("Use as FederateStarter portNumber, where portNumber is a number");
459             System.exit(-1);
460         }
461         if (port == 0 || port > 65535)
462         {
463             System.err.println("PortNumber should be between 1 and 65535");
464             System.exit(-1);
465         }
466 
467         new MM1Queue41Application(modelId, port);
468     }
469 
470 }