1 package org.sim0mq.demo.mm1;
2
3 import java.rmi.RemoteException;
4
5 import javax.naming.NamingException;
6
7 import org.djunits.unit.DurationUnit;
8 import org.djunits.value.vdouble.scalar.Duration;
9 import org.djutils.serialization.SerializationException;
10 import org.sim0mq.Sim0MQException;
11 import org.sim0mq.message.MessageUtil;
12 import org.sim0mq.message.Sim0MQMessage;
13 import org.sim0mq.message.federatestarter.FS1RequestStatusMessage;
14 import org.sim0mq.message.federationmanager.FM2SimRunControlMessage;
15 import org.sim0mq.message.federationmanager.FM3SetParameterMessage;
16 import org.sim0mq.message.federationmanager.FM4SimStartMessage;
17 import org.sim0mq.message.federationmanager.FM5RequestStatus;
18 import org.sim0mq.message.federationmanager.FM6RequestStatisticsMessage;
19 import org.sim0mq.message.modelcontroller.MC1StatusMessage;
20 import org.sim0mq.message.modelcontroller.MC2AckNakMessage;
21 import org.sim0mq.message.modelcontroller.MC3StatisticsMessage;
22 import org.sim0mq.message.modelcontroller.MC4StatisticsErrorMessage;
23 import org.zeromq.SocketType;
24 import org.zeromq.ZContext;
25 import org.zeromq.ZMQ;
26
27 import nl.tudelft.simulation.dsol.SimRuntimeException;
28 import nl.tudelft.simulation.dsol.experiment.Replication;
29 import nl.tudelft.simulation.dsol.experiment.ReplicationMode;
30 import nl.tudelft.simulation.dsol.simulators.DEVSSimulator;
31 import nl.tudelft.simulation.dsol.simulators.DEVSSimulatorInterface;
32
33
34
35
36
37
38
39
40
41
42
43 public class MM1Queue41Application
44 {
45
46 private DEVSSimulator.TimeDouble simulator;
47
48
49 private MM1Queue41Model model;
50
51
52 private ZMQ.Socket fsSocket;
53
54
55 private ZContext fsContext;
56
57
58 private Object federationRunId;
59
60
61 private Object modelId;
62
63
64 private Duration runDuration;
65
66
67 private Duration warmupDuration;
68
69
70 private long messageCount = 0;
71
72
73
74
75
76
77
78
79
80
81
82 protected MM1Queue41Application(final String modelId, final int port)
83 throws SimRuntimeException, RemoteException, NamingException, Sim0MQException, SerializationException
84 {
85 this.simulator = new DEVSSimulator.TimeDouble(modelId + ".simulator");
86 this.modelId = modelId.trim();
87 this.model = new MM1Queue41Model(this.simulator);
88 startListener(port);
89 }
90
91
92
93
94
95
96
97 protected void startListener(final int port) throws Sim0MQException, SerializationException
98 {
99 this.fsContext = new ZContext(1);
100
101 this.fsSocket = this.fsContext.createSocket(SocketType.ROUTER);
102 this.fsSocket.bind("tcp://*:" + port);
103
104 System.out.println("Model started. Listening at port: " + port);
105 System.out.flush();
106
107 while (!Thread.currentThread().isInterrupted())
108 {
109
110 String identity = this.fsSocket.recvStr();
111 this.fsSocket.recvStr();
112
113 byte[] request = this.fsSocket.recv(0);
114 System.out.println(MessageUtil.printBytes(request));
115 Object[] fields = Sim0MQMessage.decodeToArray(request);
116 Object receiverId = fields[4];
117 Object messageTypeId = fields[5];
118
119 System.out.println("Received " + Sim0MQMessage.print(fields));
120 System.out.flush();
121
122 if (receiverId.equals(this.modelId))
123 {
124 switch (messageTypeId.toString())
125 {
126 case "FS.1":
127 processRequestStatus(identity, new FS1RequestStatusMessage(fields));
128 break;
129
130 case "FM.5":
131 processRequestStatus(identity, new FM5RequestStatus(fields));
132 break;
133
134 case "FM.2":
135 processSimRunControl(identity, new FM2SimRunControlMessage(fields));
136 break;
137
138 case "FM.3":
139 processSetParameter(identity, new FM3SetParameterMessage(fields));
140 break;
141
142 case "FM.4":
143 processSimStart(identity, new FM4SimStartMessage(fields));
144 break;
145
146 case "FM.6":
147 processRequestStatistics(identity, new FM6RequestStatisticsMessage(fields));
148 break;
149
150 case "FS.3":
151 processKillFederate();
152 break;
153
154 default:
155
156 System.err.println("Received unknown message -- not processed: " + messageTypeId);
157 }
158 }
159 else
160 {
161
162 System.err.println("Received message not intended for " + this.modelId + " but for " + receiverId
163 + " -- not processed");
164 }
165 }
166 }
167
168
169
170
171
172
173
174
175 private void processRequestStatus(final String identity, final Sim0MQMessage message)
176 throws Sim0MQException, SerializationException
177 {
178 if (this.federationRunId == null)
179 {
180 this.federationRunId = message.getFederationId();
181 }
182 String status = "started";
183 if (this.simulator.isStartingOrRunning())
184 {
185 status = "running";
186 }
187 else if (this.simulator.getSimulatorTime() != null && this.simulator.getReplication() != null
188 && this.simulator.getReplication().getTreatment() != null)
189 {
190 if (this.simulator.getSimulatorTime() >= this.simulator.getReplication().getTreatment().getEndTime())
191 {
192 status = "ended";
193 }
194 else
195 {
196 status = "error";
197 }
198 }
199 this.fsSocket.sendMore(identity);
200 this.fsSocket.sendMore("");
201 byte[] mc1Message = new MC1StatusMessage(this.federationRunId, this.modelId, message.getSenderId(),
202 ++this.messageCount, message.getMessageId(), status, "").createByteArray();
203 this.fsSocket.send(mc1Message, 0);
204
205 System.out.println("Sent MC.1");
206 System.out.flush();
207 }
208
209
210
211
212
213
214
215
216 private void processSimRunControl(final String identity, final FM2SimRunControlMessage message)
217 throws Sim0MQException, SerializationException
218 {
219 boolean status = true;
220 String error = "";
221 try
222 {
223 Object runDurationField = message.getRunDuration();
224 if (runDurationField instanceof Number)
225 {
226 this.runDuration = new Duration(((Number) runDurationField).doubleValue(), DurationUnit.SI);
227 }
228 else if (runDurationField instanceof Duration)
229 {
230 this.runDuration = (Duration) runDurationField;
231 }
232 else
233 {
234 throw new Sim0MQException("runTimeField " + runDurationField + " neither Number nor Duration");
235 }
236
237 Object warmupDurationField = message.getWarmupDuration();
238 if (warmupDurationField instanceof Number)
239 {
240 this.warmupDuration = new Duration(((Number) warmupDurationField).doubleValue(), DurationUnit.SI);
241 }
242 else if (warmupDurationField instanceof Duration)
243 {
244 this.warmupDuration = (Duration) warmupDurationField;
245 }
246 else
247 {
248 throw new Sim0MQException("warmupField " + warmupDurationField + " neither Number nor Duration");
249 }
250 }
251 catch (Exception e)
252 {
253 status = false;
254 error = e.getMessage();
255 }
256 byte[] mc2Message = new MC2AckNakMessage(this.federationRunId, this.modelId, message.getSenderId(),
257 ++this.messageCount, message.getMessageId(), status, error).createByteArray();
258 this.fsSocket.sendMore(identity);
259 this.fsSocket.sendMore("");
260 this.fsSocket.send(mc2Message, 0);
261
262 System.out.println("Sent MC.2");
263 System.out.flush();
264 }
265
266
267
268
269
270
271
272
273 private void processSetParameter(final String identity, final FM3SetParameterMessage message)
274 throws Sim0MQException, SerializationException
275 {
276 boolean status = true;
277 String error = "";
278 try
279 {
280 String parameterName = message.getParameterName();
281 Object parameterValueField = message.getParameterValue();
282
283 switch (parameterName)
284 {
285 case "seed":
286 this.model.seed = ((Number) parameterValueField).longValue();
287 break;
288
289 case "iat":
290 this.model.iat = ((Number) parameterValueField).doubleValue();
291 break;
292
293 case "servicetime":
294 this.model.serviceTime = ((Number) parameterValueField).doubleValue();
295 break;
296
297 default:
298 status = false;
299 error = "Parameter " + parameterName + " unknown";
300 break;
301 }
302 }
303 catch (Exception e)
304 {
305 status = false;
306 error = e.getMessage();
307 }
308
309 byte[] mc2Message = new MC2AckNakMessage(this.federationRunId, this.modelId, message.getSenderId(),
310 ++this.messageCount, message.getMessageId(), status, error).createByteArray();
311 this.fsSocket.sendMore(identity);
312 this.fsSocket.sendMore("");
313 this.fsSocket.send(mc2Message, 0);
314
315 System.out.println("Sent MC.2");
316 System.out.flush();
317 }
318
319
320
321
322
323
324
325
326 private void processSimStart(final String identity, final FM4SimStartMessage message)
327 throws Sim0MQException, SerializationException
328 {
329 boolean status = true;
330 String error = "";
331 try
332 {
333 Replication.TimeDouble<DEVSSimulatorInterface.TimeDouble> replication =
334 Replication.TimeDouble.create("rep1", 0.0, this.warmupDuration.si, this.runDuration.si, this.model);
335 this.simulator.initialize(replication, ReplicationMode.TERMINATING);
336 this.simulator.scheduleEventAbs(100.0, this, this, "terminate", null);
337
338 this.simulator.start();
339 }
340 catch (Exception e)
341 {
342 status = false;
343 error = e.getMessage();
344 }
345
346 byte[] mc2Message = new MC2AckNakMessage(this.federationRunId, this.modelId, message.getSenderId(),
347 ++this.messageCount, message.getMessageId(), status, error).createByteArray();
348 this.fsSocket.sendMore(identity);
349 this.fsSocket.sendMore("");
350 this.fsSocket.send(mc2Message, 0);
351
352 System.out.println("Sent MC.2");
353 System.out.flush();
354 }
355
356
357
358
359
360
361
362
363 private void processRequestStatistics(final String identity, final FM6RequestStatisticsMessage message)
364 throws Sim0MQException, SerializationException
365 {
366 boolean ok = true;
367 String error = "";
368 String variableName = message.getVariableName();
369 double variableValue = Double.NaN;
370 try
371 {
372 switch (variableName)
373 {
374 case "dN.average":
375 variableValue = this.model.dN.getSampleMean();
376 break;
377
378 case "uN.average":
379 variableValue = this.model.uN.getWeightedSampleMean();
380 break;
381
382 case "qN.max":
383 variableValue = this.model.qN.getMax();
384 break;
385
386 default:
387 ok = false;
388 error = "Parameter " + variableName + " unknown";
389 break;
390 }
391 }
392 catch (Exception e)
393 {
394 ok = false;
395 error = e.getMessage();
396 }
397
398 if (Double.isNaN(variableValue))
399 {
400 ok = false;
401 error = "Parameter " + variableName + " not set to a value";
402 }
403
404 if (ok)
405 {
406 byte[] mc3Message = new MC3StatisticsMessage(this.federationRunId, this.modelId, message.getSenderId(),
407 ++this.messageCount, variableName, variableValue).createByteArray();
408 this.fsSocket.sendMore(identity);
409 this.fsSocket.sendMore("");
410 this.fsSocket.send(mc3Message, 0);
411
412 System.out.println("Sent MC.3");
413 System.out.flush();
414 }
415 else
416 {
417 byte[] mc4Message = new MC4StatisticsErrorMessage(this.federationRunId, this.modelId, message.getSenderId(),
418 ++this.messageCount, variableName, error).createByteArray();
419 this.fsSocket.sendMore(identity);
420 this.fsSocket.sendMore("");
421 this.fsSocket.send(mc4Message, 0);
422
423 System.out.println("Sent MC.4");
424 System.out.flush();
425 }
426 }
427
428
429
430
431 private void processKillFederate()
432 {
433 this.fsSocket.close();
434 this.fsContext.destroy();
435 this.fsContext.close();
436 System.exit(0);
437 }
438
439
440 protected final void terminate()
441 {
442 System.out.println("average queue length = " + this.model.qN.getSampleMean());
443 System.out.println("average queue wait = " + this.model.dN.getSampleMean());
444 System.out.println("average utilization = " + this.model.uN.getWeightedSampleMean());
445 }
446
447
448
449
450
451
452
453
454
455 public static void main(final String[] args)
456 throws SimRuntimeException, RemoteException, NamingException, Sim0MQException, SerializationException
457 {
458 if (args.length < 2)
459 {
460 System.err.println("Use as MM1Queue41Application modelId sim0mqPortNumber");
461 System.exit(-1);
462 }
463
464 System.out.println("Started with args: " + args[0] + " " + args[1]);
465 System.out.flush();
466
467 String modelId = args[0];
468
469 String sPort = args[1];
470 int port = 0;
471 try
472 {
473 port = Integer.parseInt(sPort);
474 }
475 catch (NumberFormatException nfe)
476 {
477 System.err.println("Use as FederateStarter modelId portNumber, where portNumber is a number");
478 System.exit(-1);
479 }
480 if (port == 0 || port > 65535)
481 {
482 System.err.println("PortNumber should be between 1 and 65535");
483 System.exit(-1);
484 }
485
486 new MM1Queue41Application(modelId, port);
487 }
488
489 }