1   package org.sim0mq.demo.mm1;
2   
3   import java.rmi.RemoteException;
4   
5   import javax.naming.NamingException;
6   
7   import org.djunits.unit.DurationUnit;
8   import org.djunits.value.vdouble.scalar.Duration;
9   import org.djutils.serialization.SerializationException;
10  import org.sim0mq.Sim0MQException;
11  import org.sim0mq.message.MessageStatus;
12  import org.sim0mq.message.MessageUtil;
13  import org.sim0mq.message.SimulationMessage;
14  import org.zeromq.SocketType;
15  import org.zeromq.ZContext;
16  import org.zeromq.ZMQ;
17  
18  import nl.tudelft.simulation.dsol.SimRuntimeException;
19  import nl.tudelft.simulation.dsol.experiment.Replication;
20  import nl.tudelft.simulation.dsol.experiment.ReplicationMode;
21  import nl.tudelft.simulation.dsol.simulators.DEVSSimulator;
22  import nl.tudelft.simulation.dsol.simulators.DEVSSimulatorInterface;
23  
24  
25  
26  
27  
28  
29  
30  
31  
32  
33  
34  public class MM1Queue41Application
35  {
36      
37      private DEVSSimulator.TimeDouble simulator;
38  
39      
40      private MM1Queue41Model model;
41  
42      
43      private ZMQ.Socket fsSocket;
44  
45      
46      private ZContext fsContext;
47  
48      
49      private Object federationRunId;
50  
51      
52      private String modelId;
53  
54      
55      private Duration runTime;
56  
57      
58      private Duration warmupTime;
59  
60      
61      private long messageCount = 0;
62  
63      
64  
65  
66  
67  
68  
69  
70  
71  
72  
73      protected MM1Queue41Application(final String modelId, final int port)
74              throws SimRuntimeException, RemoteException, NamingException, Sim0MQException, SerializationException
75      {
76          this.simulator = new DEVSSimulator.TimeDouble();
77          this.modelId = modelId.trim();
78          this.model = new MM1Queue41Model(this.simulator);
79          startListener(port);
80      }
81  
82      
83  
84  
85  
86  
87  
88      protected void startListener(final int port) throws Sim0MQException, SerializationException
89      {
90          this.fsContext = new ZContext(1);
91  
92          this.fsSocket = this.fsContext.createSocket(SocketType.ROUTER);
93          this.fsSocket.bind("tcp://*:" + port);
94  
95          System.out.println("Model started. Listening at port: " + port);
96          System.out.flush();
97  
98          while (!Thread.currentThread().isInterrupted())
99          {
100             
101             String identity = this.fsSocket.recvStr();
102             this.fsSocket.recvStr();
103 
104             byte[] request = this.fsSocket.recv(0);
105             System.out.println(MessageUtil.printBytes(request));
106             Object[] fields = SimulationMessage.decode(request);
107 
108             System.out.println("Received " + SimulationMessage.print(fields));
109             System.out.flush();
110 
111             this.federationRunId = fields[1];
112             String senderId = fields[2].toString();
113             String receiverId = fields[3].toString();
114             String messageId = fields[4].toString();
115             long uniqueId = ((Long) fields[5]).longValue();
116 
117             if (receiverId.equals(this.modelId))
118             {
119                 System.err.println("Received: " + messageId + ", payload = " + SimulationMessage.listPayload(fields));
120                 switch (messageId)
121                 {
122                     case "FS.1":
123                     case "FM.5":
124                         processRequestStatus(identity, senderId, uniqueId);
125                         break;
126 
127                     case "FM.2":
128                         processSimRunControl(identity, senderId, uniqueId, fields);
129                         break;
130 
131                     case "FM.3":
132                         processSetParameter(identity, senderId, uniqueId, fields);
133                         break;
134 
135                     case "FM.4":
136                         processSimStart(identity, senderId, uniqueId);
137                         break;
138 
139                     case "FM.6":
140                         processRequestStatistics(identity, senderId, uniqueId, fields);
141                         break;
142 
143                     case "FS.3":
144                         processKillFederate();
145                         break;
146 
147                     default:
148                         
149                         System.err.println("Received unknown message -- not processed: " + messageId);
150                 }
151             }
152             else
153             {
154                 
155                 System.err.println(
156                         "Received message not intended for " + this.modelId + " but for " + receiverId + " -- not processed: ");
157             }
158         }
159     }
160 
161     
162 
163 
164 
165 
166 
167 
168 
169     private void processRequestStatus(final String identity, final String receiverId, final long replyToMessageId)
170             throws Sim0MQException, SerializationException
171     {
172         String status = "started";
173         if (this.simulator.isRunning())
174         {
175             status = "running";
176         }
177         else if (this.simulator.getSimulatorTime() != null && this.simulator.getReplication() != null
178                 && this.simulator.getReplication().getTreatment() != null)
179         {
180             if (this.simulator.getSimulatorTime() >= this.simulator.getReplication().getTreatment().getEndTime())
181             {
182                 status = "ended";
183             }
184             else
185             {
186                 status = "error";
187             }
188         }
189         this.fsSocket.sendMore(identity);
190         this.fsSocket.sendMore("");
191         byte[] mc1Message = SimulationMessage.encodeUTF8(this.federationRunId, this.modelId, receiverId, "MC.1",
192                 ++this.messageCount, MessageStatus.NEW, replyToMessageId, status, "");
193         this.fsSocket.send(mc1Message, 0);
194 
195         System.out.println("Sent MC.1");
196         System.out.flush();
197     }
198 
199     
200 
201 
202 
203 
204 
205 
206 
207 
208     private void processSimRunControl(final String identity, final String receiverId, final long replyToMessageId,
209             final Object[] fields) throws Sim0MQException, SerializationException
210     {
211         boolean status = true;
212         String error = "";
213         try
214         {
215             Object runTimeField = fields[8];
216             if (runTimeField instanceof Number)
217             {
218                 this.runTime = new Duration(((Number) fields[8]).doubleValue(), DurationUnit.SI);
219             }
220             else if (runTimeField instanceof Duration)
221             {
222                 this.runTime = (Duration) runTimeField;
223             }
224             else
225             {
226                 throw new Sim0MQException("runTimeField " + runTimeField + " neither Number nor Duration");
227             }
228 
229             Object warmupField = fields[8];
230             if (warmupField instanceof Number)
231             {
232                 this.warmupTime = new Duration(((Number) fields[9]).doubleValue(), DurationUnit.SI);
233             }
234             else if (warmupField instanceof Duration)
235             {
236                 this.warmupTime = (Duration) warmupField;
237             }
238             else
239             {
240                 throw new Sim0MQException("warmupField " + warmupField + " neither Number nor Duration");
241             }
242         }
243         catch (Exception e)
244         {
245             status = false;
246             error = e.getMessage();
247         }
248         byte[] mc2Message = SimulationMessage.encodeUTF8(this.federationRunId, this.modelId, receiverId, "MC.2",
249                 ++this.messageCount, MessageStatus.NEW, replyToMessageId, status, error);
250         this.fsSocket.sendMore(identity);
251         this.fsSocket.sendMore("");
252         this.fsSocket.send(mc2Message, 0);
253 
254         System.out.println("Sent MC.2");
255         System.out.flush();
256     }
257 
258     
259 
260 
261 
262 
263 
264 
265 
266     private void processSimStart(final String identity, final String receiverId, final long replyToMessageId)
267             throws Sim0MQException, SerializationException
268     {
269         boolean status = true;
270         String error = "";
271         try
272         {
273             Replication.TimeDouble<DEVSSimulatorInterface.TimeDouble> replication =
274                     Replication.TimeDouble.create("rep1", 0.0, this.warmupTime.si, this.runTime.si, this.model);
275             this.simulator.initialize(replication, ReplicationMode.TERMINATING);
276             this.simulator.scheduleEventAbs(100.0, this, this, "terminate", null);
277 
278             this.simulator.start();
279         }
280         catch (Exception e)
281         {
282             status = false;
283             error = e.getMessage();
284         }
285 
286         byte[] mc2Message = SimulationMessage.encodeUTF8(this.federationRunId, this.modelId, receiverId, "MC.2",
287                 ++this.messageCount, MessageStatus.NEW, replyToMessageId, status, error);
288         this.fsSocket.sendMore(identity);
289         this.fsSocket.sendMore("");
290         this.fsSocket.send(mc2Message, 0);
291 
292         System.out.println("Sent MC.2");
293         System.out.flush();
294     }
295 
296     
297 
298 
299 
300 
301 
302 
303 
304 
305     private void processSetParameter(final String identity, final String receiverId, final long replyToMessageId,
306             final Object[] fields) throws Sim0MQException, SerializationException
307     {
308         boolean status = true;
309         String error = "";
310         try
311         {
312             String parameterName = fields[8].toString();
313             Object parameterValueField = fields[9];
314 
315             switch (parameterName)
316             {
317                 case "seed":
318                     this.model.seed = ((Number) parameterValueField).longValue();
319                     break;
320 
321                 case "iat":
322                     this.model.iat = ((Number) parameterValueField).doubleValue();
323                     break;
324 
325                 case "servicetime":
326                     this.model.serviceTime = ((Number) parameterValueField).doubleValue();
327                     break;
328 
329                 default:
330                     status = false;
331                     error = "Parameter " + parameterName + " unknown";
332                     break;
333             }
334         }
335         catch (Exception e)
336         {
337             status = false;
338             error = e.getMessage();
339         }
340 
341         byte[] mc2Message = SimulationMessage.encodeUTF8(this.federationRunId, this.modelId, receiverId, "MC.2",
342                 ++this.messageCount, MessageStatus.NEW, replyToMessageId, status, error);
343         this.fsSocket.sendMore(identity);
344         this.fsSocket.sendMore("");
345         this.fsSocket.send(mc2Message, 0);
346 
347         System.out.println("Sent MC.2");
348         System.out.flush();
349     }
350 
351     
352 
353 
354 
355 
356 
357 
358 
359 
360     private void processRequestStatistics(final String identity, final String receiverId, final long replyToMessageId,
361             final Object[] fields) throws Sim0MQException, SerializationException
362     {
363         boolean ok = true;
364         String error = "";
365         String variableName = fields[8].toString();
366         double variableValue = Double.NaN;
367         try
368         {
369             switch (variableName)
370             {
371                 case "dN.average":
372                     variableValue = this.model.dN.getSampleMean();
373                     break;
374 
375                 case "uN.average":
376                     variableValue = this.model.uN.getSampleMean();
377                     break;
378 
379                 case "qN.max":
380                     variableValue = this.model.qN.getMax();
381                     break;
382 
383                 default:
384                     ok = false;
385                     error = "Parameter " + variableName + " unknown";
386                     break;
387             }
388         }
389         catch (Exception e)
390         {
391             ok = false;
392             error = e.getMessage();
393         }
394 
395         if (Double.isNaN(variableValue))
396         {
397             ok = false;
398             error = "Parameter " + variableName + " not set to a value";
399         }
400 
401         if (ok)
402         {
403             byte[] mc3Message = SimulationMessage.encodeUTF8(this.federationRunId, this.modelId, receiverId, "MC.3",
404                     ++this.messageCount, MessageStatus.NEW, replyToMessageId, variableName, variableValue);
405             this.fsSocket.sendMore(identity);
406             this.fsSocket.sendMore("");
407             this.fsSocket.send(mc3Message, 0);
408 
409             System.out.println("Sent MC.3");
410             System.out.flush();
411         }
412         else
413         {
414             byte[] mc4Message = SimulationMessage.encodeUTF8(this.federationRunId, this.modelId, receiverId, "MC.4",
415                     ++this.messageCount, MessageStatus.NEW, replyToMessageId, ok, error);
416             this.fsSocket.sendMore(identity);
417             this.fsSocket.sendMore("");
418             this.fsSocket.send(mc4Message, 0);
419 
420             System.out.println("Sent MC.4");
421             System.out.flush();
422         }
423     }
424 
425     
426 
427 
428     private void processKillFederate()
429     {
430         this.fsSocket.close();
431         this.fsContext.destroy();
432         this.fsContext.close();
433         System.exit(0);
434     }
435 
436     
437     protected final void terminate()
438     {
439         System.out.println("average queue length = " + this.model.qN.getSampleMean());
440         System.out.println("average queue wait   = " + this.model.dN.getSampleMean());
441         System.out.println("average utilization  = " + this.model.uN.getSampleMean());
442     }
443 
444     
445 
446 
447 
448 
449 
450 
451 
452     public static void main(final String[] args)
453             throws SimRuntimeException, RemoteException, NamingException, Sim0MQException, SerializationException
454     {
455         if (args.length < 2)
456         {
457             System.err.println("Use as MM1Queue41Application modelId sim0mqPortNumber");
458             System.exit(-1);
459         }
460 
461         String modelId = args[0];
462 
463         String sPort = args[1];
464         int port = 0;
465         try
466         {
467             port = Integer.parseInt(sPort);
468         }
469         catch (NumberFormatException nfe)
470         {
471             System.err.println("Use as FederateStarter portNumber, where portNumber is a number");
472             System.exit(-1);
473         }
474         if (port == 0 || port > 65535)
475         {
476             System.err.println("PortNumber should be between 1 and 65535");
477             System.exit(-1);
478         }
479 
480         new MM1Queue41Application(modelId, port);
481     }
482 
483 }