1 package org.sim0mq.demo.mm1;
2
3 import java.rmi.RemoteException;
4
5 import javax.naming.NamingException;
6
7 import org.djunits.unit.DurationUnit;
8 import org.djunits.value.vdouble.scalar.Duration;
9 import org.sim0mq.Sim0MQException;
10 import org.sim0mq.message.MessageStatus;
11 import org.sim0mq.message.SimulationMessage;
12 import org.sim0mq.message.TypedMessage;
13 import org.zeromq.ZMQ;
14
15 import nl.tudelft.simulation.dsol.SimRuntimeException;
16 import nl.tudelft.simulation.dsol.experiment.Replication;
17 import nl.tudelft.simulation.dsol.experiment.ReplicationMode;
18 import nl.tudelft.simulation.dsol.simtime.SimTimeDouble;
19 import nl.tudelft.simulation.dsol.simulators.DEVSSimulator;
20
21
22
23
24
25
26
27
28
29
30
31 public class MM1Queue41Application
32 {
33
34 private DEVSSimulator.TimeDouble simulator;
35
36
37 private MM1Queue41Model model;
38
39
40 private ZMQ.Socket fsSocket;
41
42
43 private ZMQ.Context fsContext;
44
45
46 private Object federationRunId;
47
48
49 private String modelId;
50
51
52 private Duration runTime;
53
54
55 private Duration warmupTime;
56
57
58 private long messageCount = 0;
59
60
61
62
63
64
65
66
67
68
69 protected MM1Queue41Application(final String modelId, final int port)
70 throws SimRuntimeException, RemoteException, NamingException, Sim0MQException
71 {
72 this.modelId = modelId.trim();
73 this.model = new MM1Queue41Model();
74 this.simulator = new DEVSSimulator.TimeDouble();
75 startListener(port);
76 }
77
78
79
80
81
82
83 protected void startListener(final int port) throws Sim0MQException
84 {
85 this.fsContext = ZMQ.context(1);
86
87 this.fsSocket = this.fsContext.socket(ZMQ.ROUTER);
88 this.fsSocket.bind("tcp://*:" + port);
89
90 System.out.println("Model started. Listening at port: " + port);
91 System.out.flush();
92
93 while (!Thread.currentThread().isInterrupted())
94 {
95
96 String identity = this.fsSocket.recvStr();
97 this.fsSocket.recvStr();
98
99 byte[] request = this.fsSocket.recv(0);
100 System.out.println(TypedMessage.printBytes(request));
101 Object[] fields = SimulationMessage.decode(request);
102
103 System.out.println("Received " + SimulationMessage.print(fields));
104 System.out.flush();
105
106 this.federationRunId = fields[1];
107 String senderId = fields[2].toString();
108 String receiverId = fields[3].toString();
109 String messageId = fields[4].toString();
110 long uniqueId = ((Long) fields[5]).longValue();
111
112 if (receiverId.equals(this.modelId))
113 {
114 System.err.println("Received: " + messageId + ", payload = " + SimulationMessage.listPayload(fields));
115 switch (messageId)
116 {
117 case "FS.1":
118 case "FM.5":
119 processRequestStatus(identity, senderId, uniqueId);
120 break;
121
122 case "FM.2":
123 processSimRunControl(identity, senderId, uniqueId, fields);
124 break;
125
126 case "FM.3":
127 processSetParameter(identity, senderId, uniqueId, fields);
128 break;
129
130 case "FM.4":
131 processSimStart(identity, senderId, uniqueId);
132 break;
133
134 case "FM.6":
135 processRequestStatistics(identity, senderId, uniqueId, fields);
136 break;
137
138 case "FS.3":
139 processKillFederate();
140 break;
141
142 default:
143
144 System.err.println("Received unknown message -- not processed: " + messageId);
145 }
146 }
147 else
148 {
149
150 System.err.println(
151 "Received message not intended for " + this.modelId + " but for " + receiverId + " -- not processed: ");
152 }
153 }
154 }
155
156
157
158
159
160
161
162
163 private void processRequestStatus(final String identity, final String receiverId, final long replyToMessageId)
164 throws Sim0MQException
165 {
166 String status = "started";
167 if (this.simulator.isRunning())
168 {
169 status = "running";
170 }
171 else if (this.simulator.getSimulatorTime() != null && this.simulator.getReplication() != null
172 && this.simulator.getReplication().getTreatment() != null)
173 {
174 if (this.simulator.getSimulatorTime().ge(this.simulator.getReplication().getTreatment().getEndTime()))
175 {
176 status = "ended";
177 }
178 else
179 {
180 status = "error";
181 }
182 }
183 this.fsSocket.sendMore(identity);
184 this.fsSocket.sendMore("");
185 byte[] mc1Message = SimulationMessage.encodeUTF8(this.federationRunId, this.modelId, receiverId, "MC.1",
186 ++this.messageCount, MessageStatus.NEW, replyToMessageId, status, "");
187 this.fsSocket.send(mc1Message, 0);
188
189 System.out.println("Sent MC.1");
190 System.out.flush();
191 }
192
193
194
195
196
197
198
199
200
201 private void processSimRunControl(final String identity, final String receiverId, final long replyToMessageId,
202 final Object[] fields) throws Sim0MQException
203 {
204 boolean status = true;
205 String error = "";
206 try
207 {
208 Object runTimeField = fields[8];
209 if (runTimeField instanceof Number)
210 {
211 this.runTime = new Duration(((Number) fields[8]).doubleValue(), DurationUnit.SI);
212 }
213 else if (runTimeField instanceof Duration)
214 {
215 this.runTime = (Duration) runTimeField;
216 }
217 else
218 {
219 throw new Sim0MQException("runTimeField " + runTimeField + " neither Number nor Duration");
220 }
221
222 Object warmupField = fields[8];
223 if (warmupField instanceof Number)
224 {
225 this.warmupTime = new Duration(((Number) fields[9]).doubleValue(), DurationUnit.SI);
226 }
227 else if (warmupField instanceof Duration)
228 {
229 this.warmupTime = (Duration) warmupField;
230 }
231 else
232 {
233 throw new Sim0MQException("warmupField " + warmupField + " neither Number nor Duration");
234 }
235 }
236 catch (Exception e)
237 {
238 status = false;
239 error = e.getMessage();
240 }
241 byte[] mc2Message = SimulationMessage.encodeUTF8(this.federationRunId, this.modelId, receiverId, "MC.2",
242 ++this.messageCount, MessageStatus.NEW, replyToMessageId, status, error);
243 this.fsSocket.sendMore(identity);
244 this.fsSocket.sendMore("");
245 this.fsSocket.send(mc2Message, 0);
246
247 System.out.println("Sent MC.2");
248 System.out.flush();
249 }
250
251
252
253
254
255
256
257
258 private void processSimStart(final String identity, final String receiverId, final long replyToMessageId)
259 throws Sim0MQException
260 {
261 boolean status = true;
262 String error = "";
263 try
264 {
265 Replication<Double, Double, SimTimeDouble> replication =
266 new Replication<>("rep1", new SimTimeDouble(0.0), this.warmupTime.si, this.runTime.si, this.model);
267 this.simulator.initialize(replication, ReplicationMode.TERMINATING);
268 this.simulator.scheduleEventAbs(100.0, this, this, "terminate", null);
269
270 this.simulator.start();
271 }
272 catch (Exception e)
273 {
274 status = false;
275 error = e.getMessage();
276 }
277
278 byte[] mc2Message = SimulationMessage.encodeUTF8(this.federationRunId, this.modelId, receiverId, "MC.2",
279 ++this.messageCount, MessageStatus.NEW, replyToMessageId, status, error);
280 this.fsSocket.sendMore(identity);
281 this.fsSocket.sendMore("");
282 this.fsSocket.send(mc2Message, 0);
283
284 System.out.println("Sent MC.2");
285 System.out.flush();
286 }
287
288
289
290
291
292
293
294
295
296 private void processSetParameter(final String identity, final String receiverId, final long replyToMessageId,
297 final Object[] fields) throws Sim0MQException
298 {
299 boolean status = true;
300 String error = "";
301 try
302 {
303 String parameterName = fields[8].toString();
304 Object parameterValueField = fields[9];
305
306 switch (parameterName)
307 {
308 case "seed":
309 this.model.seed= ((Number) parameterValueField).longValue();
310 break;
311
312 case "iat":
313 this.model.iat = ((Number) parameterValueField).doubleValue();
314 break;
315
316 case "servicetime":
317 this.model.serviceTime = ((Number) parameterValueField).doubleValue();
318 break;
319
320 default:
321 status = false;
322 error = "Parameter " + parameterName + " unknown";
323 break;
324 }
325 }
326 catch (Exception e)
327 {
328 status = false;
329 error = e.getMessage();
330 }
331
332 byte[] mc2Message = SimulationMessage.encodeUTF8(this.federationRunId, this.modelId, receiverId, "MC.2",
333 ++this.messageCount, MessageStatus.NEW, replyToMessageId, status, error);
334 this.fsSocket.sendMore(identity);
335 this.fsSocket.sendMore("");
336 this.fsSocket.send(mc2Message, 0);
337
338 System.out.println("Sent MC.2");
339 System.out.flush();
340 }
341
342
343
344
345
346
347
348
349
350 private void processRequestStatistics(final String identity, final String receiverId, final long replyToMessageId,
351 final Object[] fields) throws Sim0MQException
352 {
353 boolean ok = true;
354 String error = "";
355 String variableName = fields[8].toString();
356 double variableValue = Double.NaN;
357 try
358 {
359 switch (variableName)
360 {
361 case "dN.average":
362 variableValue = this.model.dN.getSampleMean();
363 break;
364
365 case "uN.average":
366 variableValue = this.model.uN.getSampleMean();
367 break;
368
369 case "qN.max":
370 variableValue = this.model.qN.getMax();
371 break;
372
373 default:
374 ok = false;
375 error = "Parameter " + variableName + " unknown";
376 break;
377 }
378 }
379 catch (Exception e)
380 {
381 ok = false;
382 error = e.getMessage();
383 }
384
385 if (Double.isNaN(variableValue))
386 {
387 ok = false;
388 error = "Parameter " + variableName + " not set to a value";
389 }
390
391 if (ok)
392 {
393 byte[] mc3Message = SimulationMessage.encodeUTF8(this.federationRunId, this.modelId, receiverId, "MC.3",
394 ++this.messageCount, MessageStatus.NEW, replyToMessageId, variableName, variableValue);
395 this.fsSocket.sendMore(identity);
396 this.fsSocket.sendMore("");
397 this.fsSocket.send(mc3Message, 0);
398
399 System.out.println("Sent MC.3");
400 System.out.flush();
401 }
402 else
403 {
404 byte[] mc4Message = SimulationMessage.encodeUTF8(this.federationRunId, this.modelId, receiverId, "MC.4",
405 ++this.messageCount, MessageStatus.NEW, replyToMessageId, ok, error);
406 this.fsSocket.sendMore(identity);
407 this.fsSocket.sendMore("");
408 this.fsSocket.send(mc4Message, 0);
409
410 System.out.println("Sent MC.4");
411 System.out.flush();
412 }
413 }
414
415
416
417
418 private void processKillFederate()
419 {
420 this.fsSocket.close();
421 this.fsContext.term();
422 System.exit(0);
423 }
424
425
426 protected final void terminate()
427 {
428 System.out.println("average queue length = " + this.model.qN.getSampleMean());
429 System.out.println("average queue wait = " + this.model.dN.getSampleMean());
430 System.out.println("average utilization = " + this.model.uN.getSampleMean());
431 }
432
433
434
435
436
437
438
439
440 public static void main(final String[] args) throws SimRuntimeException, RemoteException, NamingException, Sim0MQException
441 {
442 if (args.length < 2)
443 {
444 System.err.println("Use as MM1Queue41Application modelId sim0mqPortNumber");
445 System.exit(-1);
446 }
447
448 String modelId = args[0];
449
450 String sPort = args[1];
451 int port = 0;
452 try
453 {
454 port = Integer.parseInt(sPort);
455 }
456 catch (NumberFormatException nfe)
457 {
458 System.err.println("Use as FederateStarter portNumber, where portNumber is a number");
459 System.exit(-1);
460 }
461 if (port == 0 || port > 65535)
462 {
463 System.err.println("PortNumber should be between 1 and 65535");
464 System.exit(-1);
465 }
466
467 new MM1Queue41Application(modelId, port);
468 }
469
470 }