1 package org.sim0mq.demo.mm1;
2
3 import java.rmi.RemoteException;
4
5 import javax.naming.NamingException;
6
7 import org.djunits.unit.DurationUnit;
8 import org.djunits.value.vdouble.scalar.Duration;
9 import org.djutils.serialization.SerializationException;
10 import org.sim0mq.Sim0MQException;
11 import org.sim0mq.message.MessageStatus;
12 import org.sim0mq.message.MessageUtil;
13 import org.sim0mq.message.SimulationMessage;
14 import org.zeromq.SocketType;
15 import org.zeromq.ZContext;
16 import org.zeromq.ZMQ;
17
18 import nl.tudelft.simulation.dsol.SimRuntimeException;
19 import nl.tudelft.simulation.dsol.experiment.Replication;
20 import nl.tudelft.simulation.dsol.experiment.ReplicationMode;
21 import nl.tudelft.simulation.dsol.simulators.DEVSSimulator;
22 import nl.tudelft.simulation.dsol.simulators.DEVSSimulatorInterface;
23
24
25
26
27
28
29
30
31
32
33
34 public class MM1Queue41Application
35 {
36
37 private DEVSSimulator.TimeDouble simulator;
38
39
40 private MM1Queue41Model model;
41
42
43 private ZMQ.Socket fsSocket;
44
45
46 private ZContext fsContext;
47
48
49 private Object federationRunId;
50
51
52 private String modelId;
53
54
55 private Duration runTime;
56
57
58 private Duration warmupTime;
59
60
61 private long messageCount = 0;
62
63
64
65
66
67
68
69
70
71
72
73 protected MM1Queue41Application(final String modelId, final int port)
74 throws SimRuntimeException, RemoteException, NamingException, Sim0MQException, SerializationException
75 {
76 this.simulator = new DEVSSimulator.TimeDouble();
77 this.modelId = modelId.trim();
78 this.model = new MM1Queue41Model(this.simulator);
79 startListener(port);
80 }
81
82
83
84
85
86
87
88 protected void startListener(final int port) throws Sim0MQException, SerializationException
89 {
90 this.fsContext = new ZContext(1);
91
92 this.fsSocket = this.fsContext.createSocket(SocketType.ROUTER);
93 this.fsSocket.bind("tcp://*:" + port);
94
95 System.out.println("Model started. Listening at port: " + port);
96 System.out.flush();
97
98 while (!Thread.currentThread().isInterrupted())
99 {
100
101 String identity = this.fsSocket.recvStr();
102 this.fsSocket.recvStr();
103
104 byte[] request = this.fsSocket.recv(0);
105 System.out.println(MessageUtil.printBytes(request));
106 Object[] fields = SimulationMessage.decode(request);
107
108 System.out.println("Received " + SimulationMessage.print(fields));
109 System.out.flush();
110
111 this.federationRunId = fields[1];
112 String senderId = fields[2].toString();
113 String receiverId = fields[3].toString();
114 String messageId = fields[4].toString();
115 long uniqueId = ((Long) fields[5]).longValue();
116
117 if (receiverId.equals(this.modelId))
118 {
119 System.err.println("Received: " + messageId + ", payload = " + SimulationMessage.listPayload(fields));
120 switch (messageId)
121 {
122 case "FS.1":
123 case "FM.5":
124 processRequestStatus(identity, senderId, uniqueId);
125 break;
126
127 case "FM.2":
128 processSimRunControl(identity, senderId, uniqueId, fields);
129 break;
130
131 case "FM.3":
132 processSetParameter(identity, senderId, uniqueId, fields);
133 break;
134
135 case "FM.4":
136 processSimStart(identity, senderId, uniqueId);
137 break;
138
139 case "FM.6":
140 processRequestStatistics(identity, senderId, uniqueId, fields);
141 break;
142
143 case "FS.3":
144 processKillFederate();
145 break;
146
147 default:
148
149 System.err.println("Received unknown message -- not processed: " + messageId);
150 }
151 }
152 else
153 {
154
155 System.err.println(
156 "Received message not intended for " + this.modelId + " but for " + receiverId + " -- not processed: ");
157 }
158 }
159 }
160
161
162
163
164
165
166
167
168
169 private void processRequestStatus(final String identity, final String receiverId, final long replyToMessageId)
170 throws Sim0MQException, SerializationException
171 {
172 String status = "started";
173 if (this.simulator.isRunning())
174 {
175 status = "running";
176 }
177 else if (this.simulator.getSimulatorTime() != null && this.simulator.getReplication() != null
178 && this.simulator.getReplication().getTreatment() != null)
179 {
180 if (this.simulator.getSimulatorTime() >= this.simulator.getReplication().getTreatment().getEndTime())
181 {
182 status = "ended";
183 }
184 else
185 {
186 status = "error";
187 }
188 }
189 this.fsSocket.sendMore(identity);
190 this.fsSocket.sendMore("");
191 byte[] mc1Message = SimulationMessage.encodeUTF8(this.federationRunId, this.modelId, receiverId, "MC.1",
192 ++this.messageCount, MessageStatus.NEW, replyToMessageId, status, "");
193 this.fsSocket.send(mc1Message, 0);
194
195 System.out.println("Sent MC.1");
196 System.out.flush();
197 }
198
199
200
201
202
203
204
205
206
207
208 private void processSimRunControl(final String identity, final String receiverId, final long replyToMessageId,
209 final Object[] fields) throws Sim0MQException, SerializationException
210 {
211 boolean status = true;
212 String error = "";
213 try
214 {
215 Object runTimeField = fields[8];
216 if (runTimeField instanceof Number)
217 {
218 this.runTime = new Duration(((Number) fields[8]).doubleValue(), DurationUnit.SI);
219 }
220 else if (runTimeField instanceof Duration)
221 {
222 this.runTime = (Duration) runTimeField;
223 }
224 else
225 {
226 throw new Sim0MQException("runTimeField " + runTimeField + " neither Number nor Duration");
227 }
228
229 Object warmupField = fields[8];
230 if (warmupField instanceof Number)
231 {
232 this.warmupTime = new Duration(((Number) fields[9]).doubleValue(), DurationUnit.SI);
233 }
234 else if (warmupField instanceof Duration)
235 {
236 this.warmupTime = (Duration) warmupField;
237 }
238 else
239 {
240 throw new Sim0MQException("warmupField " + warmupField + " neither Number nor Duration");
241 }
242 }
243 catch (Exception e)
244 {
245 status = false;
246 error = e.getMessage();
247 }
248 byte[] mc2Message = SimulationMessage.encodeUTF8(this.federationRunId, this.modelId, receiverId, "MC.2",
249 ++this.messageCount, MessageStatus.NEW, replyToMessageId, status, error);
250 this.fsSocket.sendMore(identity);
251 this.fsSocket.sendMore("");
252 this.fsSocket.send(mc2Message, 0);
253
254 System.out.println("Sent MC.2");
255 System.out.flush();
256 }
257
258
259
260
261
262
263
264
265
266 private void processSimStart(final String identity, final String receiverId, final long replyToMessageId)
267 throws Sim0MQException, SerializationException
268 {
269 boolean status = true;
270 String error = "";
271 try
272 {
273 Replication.TimeDouble<DEVSSimulatorInterface.TimeDouble> replication =
274 Replication.TimeDouble.create("rep1", 0.0, this.warmupTime.si, this.runTime.si, this.model);
275 this.simulator.initialize(replication, ReplicationMode.TERMINATING);
276 this.simulator.scheduleEventAbs(100.0, this, this, "terminate", null);
277
278 this.simulator.start();
279 }
280 catch (Exception e)
281 {
282 status = false;
283 error = e.getMessage();
284 }
285
286 byte[] mc2Message = SimulationMessage.encodeUTF8(this.federationRunId, this.modelId, receiverId, "MC.2",
287 ++this.messageCount, MessageStatus.NEW, replyToMessageId, status, error);
288 this.fsSocket.sendMore(identity);
289 this.fsSocket.sendMore("");
290 this.fsSocket.send(mc2Message, 0);
291
292 System.out.println("Sent MC.2");
293 System.out.flush();
294 }
295
296
297
298
299
300
301
302
303
304
305 private void processSetParameter(final String identity, final String receiverId, final long replyToMessageId,
306 final Object[] fields) throws Sim0MQException, SerializationException
307 {
308 boolean status = true;
309 String error = "";
310 try
311 {
312 String parameterName = fields[8].toString();
313 Object parameterValueField = fields[9];
314
315 switch (parameterName)
316 {
317 case "seed":
318 this.model.seed = ((Number) parameterValueField).longValue();
319 break;
320
321 case "iat":
322 this.model.iat = ((Number) parameterValueField).doubleValue();
323 break;
324
325 case "servicetime":
326 this.model.serviceTime = ((Number) parameterValueField).doubleValue();
327 break;
328
329 default:
330 status = false;
331 error = "Parameter " + parameterName + " unknown";
332 break;
333 }
334 }
335 catch (Exception e)
336 {
337 status = false;
338 error = e.getMessage();
339 }
340
341 byte[] mc2Message = SimulationMessage.encodeUTF8(this.federationRunId, this.modelId, receiverId, "MC.2",
342 ++this.messageCount, MessageStatus.NEW, replyToMessageId, status, error);
343 this.fsSocket.sendMore(identity);
344 this.fsSocket.sendMore("");
345 this.fsSocket.send(mc2Message, 0);
346
347 System.out.println("Sent MC.2");
348 System.out.flush();
349 }
350
351
352
353
354
355
356
357
358
359
360 private void processRequestStatistics(final String identity, final String receiverId, final long replyToMessageId,
361 final Object[] fields) throws Sim0MQException, SerializationException
362 {
363 boolean ok = true;
364 String error = "";
365 String variableName = fields[8].toString();
366 double variableValue = Double.NaN;
367 try
368 {
369 switch (variableName)
370 {
371 case "dN.average":
372 variableValue = this.model.dN.getSampleMean();
373 break;
374
375 case "uN.average":
376 variableValue = this.model.uN.getSampleMean();
377 break;
378
379 case "qN.max":
380 variableValue = this.model.qN.getMax();
381 break;
382
383 default:
384 ok = false;
385 error = "Parameter " + variableName + " unknown";
386 break;
387 }
388 }
389 catch (Exception e)
390 {
391 ok = false;
392 error = e.getMessage();
393 }
394
395 if (Double.isNaN(variableValue))
396 {
397 ok = false;
398 error = "Parameter " + variableName + " not set to a value";
399 }
400
401 if (ok)
402 {
403 byte[] mc3Message = SimulationMessage.encodeUTF8(this.federationRunId, this.modelId, receiverId, "MC.3",
404 ++this.messageCount, MessageStatus.NEW, replyToMessageId, variableName, variableValue);
405 this.fsSocket.sendMore(identity);
406 this.fsSocket.sendMore("");
407 this.fsSocket.send(mc3Message, 0);
408
409 System.out.println("Sent MC.3");
410 System.out.flush();
411 }
412 else
413 {
414 byte[] mc4Message = SimulationMessage.encodeUTF8(this.federationRunId, this.modelId, receiverId, "MC.4",
415 ++this.messageCount, MessageStatus.NEW, replyToMessageId, ok, error);
416 this.fsSocket.sendMore(identity);
417 this.fsSocket.sendMore("");
418 this.fsSocket.send(mc4Message, 0);
419
420 System.out.println("Sent MC.4");
421 System.out.flush();
422 }
423 }
424
425
426
427
428 private void processKillFederate()
429 {
430 this.fsSocket.close();
431 this.fsContext.destroy();
432 this.fsContext.close();
433 System.exit(0);
434 }
435
436
437 protected final void terminate()
438 {
439 System.out.println("average queue length = " + this.model.qN.getSampleMean());
440 System.out.println("average queue wait = " + this.model.dN.getSampleMean());
441 System.out.println("average utilization = " + this.model.uN.getSampleMean());
442 }
443
444
445
446
447
448
449
450
451
452 public static void main(final String[] args)
453 throws SimRuntimeException, RemoteException, NamingException, Sim0MQException, SerializationException
454 {
455 if (args.length < 2)
456 {
457 System.err.println("Use as MM1Queue41Application modelId sim0mqPortNumber");
458 System.exit(-1);
459 }
460
461 String modelId = args[0];
462
463 String sPort = args[1];
464 int port = 0;
465 try
466 {
467 port = Integer.parseInt(sPort);
468 }
469 catch (NumberFormatException nfe)
470 {
471 System.err.println("Use as FederateStarter portNumber, where portNumber is a number");
472 System.exit(-1);
473 }
474 if (port == 0 || port > 65535)
475 {
476 System.err.println("PortNumber should be between 1 and 65535");
477 System.exit(-1);
478 }
479
480 new MM1Queue41Application(modelId, port);
481 }
482
483 }