1 package org.sim0mq.demo.mm1;
2
3 import java.util.ArrayList;
4 import java.util.Collections;
5 import java.util.HashMap;
6 import java.util.LinkedHashMap;
7 import java.util.List;
8 import java.util.Map;
9 import java.util.UUID;
10 import java.util.concurrent.atomic.AtomicInteger;
11 import java.util.concurrent.atomic.AtomicLong;
12
13 import org.djutils.serialization.SerializationException;
14 import org.sim0mq.Sim0MQException;
15 import org.sim0mq.federationmanager.ModelState;
16 import org.sim0mq.message.Sim0MQMessage;
17 import org.sim0mq.message.federatestarter.FS2FederateStartedMessage;
18 import org.sim0mq.message.federatestarter.FS4FederateKilledMessage;
19 import org.sim0mq.message.federationmanager.FM1StartFederateMessage;
20 import org.sim0mq.message.federationmanager.FM2SimRunControlMessage;
21 import org.sim0mq.message.federationmanager.FM3SetParameterMessage;
22 import org.sim0mq.message.federationmanager.FM4SimStartMessage;
23 import org.sim0mq.message.federationmanager.FM5RequestStatus;
24 import org.sim0mq.message.federationmanager.FM6RequestStatisticsMessage;
25 import org.sim0mq.message.federationmanager.FM8KillFederateMessage;
26 import org.sim0mq.message.modelcontroller.MC1StatusMessage;
27 import org.sim0mq.message.modelcontroller.MC2AckNakMessage;
28 import org.sim0mq.message.modelcontroller.MC3StatisticsMessage;
29 import org.sim0mq.message.modelcontroller.MC4StatisticsErrorMessage;
30 import org.zeromq.SocketType;
31 import org.zeromq.ZContext;
32 import org.zeromq.ZMQ;
33
34
35
36
37
38
39
40
41
42
43
44 public final class MM1FederationManager20
45 {
46
47
48
49
50 public static void main(final String[] args) throws Sim0MQException
51 {
52 if (args.length < 5)
53 {
54 System.err.println("Use as FederationManager federationName federationManagerPortNumber "
55 + "federateStarterIPorName federateStarterPortNumber modelFolder");
56 System.exit(-1);
57 }
58 String federationName = args[0];
59
60 String fmsPort = args[1];
61 int fmPort = 0;
62 try
63 {
64 fmPort = Integer.parseInt(fmsPort);
65 }
66 catch (NumberFormatException nfe)
67 {
68 System.err.println("Use as FederationManager fedName fmPort fsIP fsPort modelFolder, where fmPort is a number");
69 System.exit(-1);
70 }
71 if (fmPort == 0 || fmPort > 65535)
72 {
73 System.err.println("fmPort should be between 1 and 65535");
74 System.exit(-1);
75 }
76
77 String fsServerNameOrIP = args[2];
78
79 String fsPortString = args[3];
80 int fsPort = 0;
81 try
82 {
83 fsPort = Integer.parseInt(fsPortString);
84 }
85 catch (NumberFormatException nfe)
86 {
87 System.err.println("Use as FederationManager fedName fmPort fsIP fsPort modelFolder, where fmPort is a number");
88 System.exit(-1);
89 }
90 if (fsPort == 0 || fsPort > 65535)
91 {
92 System.err.println("fsPort should be between 1 and 65535");
93 System.exit(-1);
94 }
95
96 String mm1ModelFolder = args[4];
97
98 new MM1FederationManager20(federationName, fmPort, fsServerNameOrIP, fsPort, mm1ModelFolder);
99 }
100
101
102
103
104
105
106
107
108
109
110 private MM1FederationManager20(final String federationName, final int fmPort, final String fsServerNameOrIP,
111 final int fsPort, final String mm1ModelFolder) throws Sim0MQException
112 {
113 AtomicLong messageCount = new AtomicLong(0L);
114 AtomicInteger nrRunning = new AtomicInteger();
115
116 Map<Integer, Map<String, Number>> statMap =
117 Collections.synchronizedMap(new LinkedHashMap<Integer, Map<String, Number>>());
118
119 for (int modelNr = 0; modelNr < 20; modelNr++)
120 {
121 new Thread()
122 {
123 @Override
124 public void run()
125 {
126 final int nr = nrRunning.getAndIncrement();
127 StateMachine stateMachine = null;
128 System.out.println("inc modelNr to " + nr);
129 try
130 {
131 stateMachine =
132 new StateMachine(messageCount, federationName, fsServerNameOrIP, fsPort, mm1ModelFolder, nr);
133 }
134 catch (Sim0MQException | SerializationException exception)
135 {
136 exception.printStackTrace();
137 }
138 int decNr = nrRunning.decrementAndGet();
139 System.out.println("dec modelNr to " + decNr);
140 synchronized (statMap)
141 {
142 statMap.put(nr, stateMachine.getStatistics());
143 }
144 }
145 }.start();
146 }
147
148 while (nrRunning.get() > 0)
149 {
150 Thread.yield();
151 }
152
153 synchronized (statMap)
154 {
155 for (int nr : statMap.keySet())
156 {
157 Map<String, Number> stats = statMap.get(nr);
158 StringBuilder s = new StringBuilder();
159 s.append(String.format("%2d ", nr));
160 for (String code : stats.keySet())
161 {
162 s.append(String.format("%10s=%10.4f ", code, stats.get(code).doubleValue()));
163 }
164 System.out.println(s.toString());
165 }
166 }
167 }
168
169
170
171
172
173
174
175
176
177
178
179
180 static class StateMachine
181 {
182
183 private ModelState state;
184
185
186 private ZMQ.Socket modelSocket;
187
188
189 private String modelName;
190
191
192 private ZMQ.Socket fsSocket;
193
194
195 private ZContext fmContext;
196
197
198 private AtomicLong messageCount;
199
200
201 private Map<String, Number> statistics = new LinkedHashMap<>();
202
203
204
205
206
207
208
209
210
211
212
213 StateMachine(final AtomicLong messageCount, final String federationName, final String fsServerNameOrIP,
214 final int fsPort, final String mm1ModelFolder, final int modelNr) throws Sim0MQException, SerializationException
215 {
216 this.fmContext = new ZContext(1);
217
218 this.fsSocket = this.fmContext.createSocket(SocketType.REQ);
219 this.fsSocket.setIdentity(UUID.randomUUID().toString().getBytes());
220
221 this.modelName = "MM1." + modelNr;
222 this.messageCount = messageCount;
223
224 this.modelSocket = this.fmContext.createSocket(SocketType.REQ);
225 this.modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
226
227 this.state = ModelState.NOT_STARTED;
228 boolean ready = false;
229 while (!ready)
230 {
231 System.out.println(this.state);
232
233 switch (this.state)
234 {
235 case NOT_STARTED:
236
237 startModel(federationName, fsServerNameOrIP, fsPort, mm1ModelFolder);
238 break;
239
240 case STARTED:
241 sendSimRunControl(federationName);
242 break;
243
244 case RUNCONTROL:
245 setParameters(federationName);
246 break;
247
248 case PARAMETERS:
249 sendSimStart(federationName);
250 break;
251
252 case SIMULATORSTARTED:
253 waitForSimEnded(federationName);
254 break;
255
256 case SIMULATORENDED:
257 requestStatistics(federationName);
258 break;
259
260 case STATISTICSGATHERED:
261 killFederate(federationName);
262 ready = true;
263 break;
264
265 case ERROR:
266 killFederate(federationName);
267 ready = true;
268 break;
269
270 default:
271 break;
272 }
273 }
274
275 this.fsSocket.close();
276 this.modelSocket.close();
277 this.fmContext.destroy();
278 this.fmContext.close();
279 }
280
281
282
283
284
285
286
287
288
289
290 private void startModel(final String federationName, final String fsServerNameOrIP, final int fsPort,
291 final String mm1ModelFolder) throws Sim0MQException, SerializationException
292 {
293
294 byte[] fm1Message = new FM1StartFederateMessage(federationName, "FM", "FS", this.messageCount.getAndIncrement(),
295 this.modelName, "java8+", "-jar", mm1ModelFolder + "/mm1.jar", this.modelName + " %PORT%", mm1ModelFolder,
296 "", mm1ModelFolder + "/out_" + this.modelName + ".txt", mm1ModelFolder + "/err_" + this.modelName + ".txt",
297 false, true, true).createByteArray();
298 this.fsSocket.connect("tcp://" + fsServerNameOrIP + ":" + fsPort);
299 this.fsSocket.send(fm1Message);
300
301 byte[] reply = this.fsSocket.recv(0);
302
303 try
304 {
305 Object[] fs2Fields = Sim0MQMessage.decode(reply).createObjectArray();
306 System.out.println("Received\n" + Sim0MQMessage.print(fs2Fields));
307 FS2FederateStartedMessage message = new FS2FederateStartedMessage(fs2Fields);
308
309 if (message.getStatus().toString().equals("started") && message.getInstanceId().equals(this.modelName))
310 {
311 this.state = ModelState.STARTED;
312 this.modelSocket.connect("tcp://" + fsServerNameOrIP + ":" + message.getModelPort());
313 }
314 else
315 {
316 this.state = ModelState.ERROR;
317 System.err.println("Model not started correctly -- state = " + message.getStatus());
318 System.err.println("Started model = " + message.getInstanceId() + " on port " + message.getModelPort());
319 System.err.println("Error message = " + message.getError());
320 }
321 }
322 catch (Exception exception)
323 {
324 this.state = ModelState.ERROR;
325 System.err.println("Model not started correctly -- error = " + exception.getClass().getSimpleName());
326 System.err.println("Started instance of model = " + this.modelName);
327 System.err.println("Error message = " + exception.getMessage());
328 }
329 }
330
331
332
333
334
335
336
337 private void sendSimRunControl(final String federationName) throws Sim0MQException, SerializationException
338 {
339 long messageNumber = this.messageCount.get();
340 byte[] fm2Message = new FM2SimRunControlMessage(federationName, "FM", this.modelName, this.messageCount.getAndIncrement(),
341 100.0, 0.0, 0.0, Double.POSITIVE_INFINITY, 1, 0, new HashMap<Object, Long>()).createByteArray();
342 this.modelSocket.send(fm2Message);
343
344 byte[] reply = this.modelSocket.recv(0);
345 try
346 {
347 Object[] mc2Fields = Sim0MQMessage.decode(reply).createObjectArray();
348 System.out.println("Received\n" + Sim0MQMessage.print(mc2Fields));
349 MC2AckNakMessage message = new MC2AckNakMessage(mc2Fields);
350 if (message.getStatus() && (Long) message.getReplyToId() == messageNumber)
351 {
352 this.state = ModelState.RUNCONTROL;
353 }
354 else
355 {
356 this.state = ModelState.ERROR;
357 System.err.println("Model not started correctly -- state = " + message.getStatus());
358 System.err.println("Error message = " + message.getError());
359 }
360 }
361 catch (Exception exception)
362 {
363 this.state = ModelState.ERROR;
364 System.err.println("Model not started correctly -- error = " + exception.getClass().getSimpleName());
365 System.err.println("Error message = " + exception.getMessage());
366 }
367 }
368
369
370
371
372
373
374
375 private void setParameters(final String federationName) throws Sim0MQException, SerializationException
376 {
377 Map<String, Object> parameters = new LinkedHashMap<>();
378 parameters.put("iat", new Double(1.0));
379 parameters.put("servicetime", new Double(0.85));
380 parameters.put("seed", Math.abs(this.modelName.hashCode()));
381
382 for (String parameterName : parameters.keySet())
383 {
384 if (!this.state.isError())
385 {
386 long messageNumber = this.messageCount.get();
387 byte[] fm3Message =
388 new FM3SetParameterMessage(federationName, "FM", this.modelName, this.messageCount.getAndIncrement(),
389 parameterName, parameters.get(parameterName)).createByteArray();
390 this.modelSocket.send(fm3Message);
391
392 byte[] reply = this.modelSocket.recv(0);
393
394 try
395 {
396 Object[] replyFields = Sim0MQMessage.decode(reply).createObjectArray();
397 System.out.println("Received\n" + Sim0MQMessage.print(replyFields));
398 MC2AckNakMessage message = new MC2AckNakMessage(replyFields);
399 if (message.getStatus() && (Long) message.getReplyToId() == messageNumber)
400 {
401 this.state = ModelState.PARAMETERS;
402 }
403 else
404 {
405 this.state = ModelState.ERROR;
406 System.err.println("Model parameter error -- status = " + message.getStatus());
407 System.err.println("Error message = " + message.getError());
408 }
409 }
410 catch (Exception exception)
411 {
412 this.state = ModelState.ERROR;
413 System.err.println("Model parameter error = " + exception.getClass().getSimpleName());
414 System.err.println("Error message = " + exception.getMessage());
415 }
416 }
417 }
418 if (!this.state.isError())
419 {
420 this.state = ModelState.PARAMETERS;
421 }
422 }
423
424
425
426
427
428
429
430 private void sendSimStart(final String federationName) throws Sim0MQException, SerializationException
431 {
432 long messageNumber = this.messageCount.get();
433 byte[] fm4Message =
434 new FM4SimStartMessage(federationName, "FM", this.modelName, this.messageCount.getAndIncrement()).createByteArray();
435 this.modelSocket.send(fm4Message);
436
437 byte[] reply = this.modelSocket.recv(0);
438
439 try
440 {
441 Object[] replyFields = Sim0MQMessage.decode(reply).createObjectArray();
442 System.out.println("Received\n" + Sim0MQMessage.print(replyFields));
443 MC2AckNakMessage message = new MC2AckNakMessage(replyFields);
444 if (message.getStatus() && (Long) message.getReplyToId() == messageNumber)
445 {
446 this.state = ModelState.SIMULATORSTARTED;
447 }
448 else
449 {
450 this.state = ModelState.ERROR;
451 System.err.println("Model start error -- status = " + message.getStatus());
452 System.err.println("Error message = " + message.getError());
453 }
454 }
455 catch (Exception exception)
456 {
457 this.state = ModelState.ERROR;
458 System.err.println("Model start error = " + exception.getClass().getSimpleName());
459 System.err.println("Error message = " + exception.getMessage());
460 }
461 }
462
463
464
465
466
467
468
469 private void waitForSimEnded(final String federationName) throws Sim0MQException, SerializationException
470 {
471 while (!this.state.isSimulatorEnded() && !this.state.isError())
472 {
473 long messageNumber = this.messageCount.get();
474 byte[] fm5Message =
475 new FM5RequestStatus(federationName, "FM", this.modelName, this.messageCount.getAndIncrement()).createByteArray();
476 this.modelSocket.send(fm5Message);
477
478 byte[] reply = this.modelSocket.recv(0);
479
480 try
481 {
482 Object[] replyFields = Sim0MQMessage.decode(reply).createObjectArray();
483 System.out.println("Received\n" + Sim0MQMessage.print(replyFields));
484 MC1StatusMessage message = new MC1StatusMessage(replyFields);
485 if (!message.getStatus().equals("error") && !message.getStatus().equals("started")
486 && (Long) message.getReplyToId() == messageNumber)
487 {
488 if (message.getStatus().equals("ended"))
489 {
490 this.state = ModelState.SIMULATORENDED;
491 }
492 else
493 {
494
495 try
496 {
497 Thread.sleep(1000);
498 }
499 catch (InterruptedException ie)
500 {
501
502 }
503 }
504 }
505 else
506 {
507 this.state = ModelState.ERROR;
508 System.err.println("Model poll status error -- status = " + message.getStatus());
509 System.err.println("Error message = " + message.getError());
510 }
511 }
512 catch (Exception exception)
513 {
514 this.state = ModelState.ERROR;
515 System.err.println("Model poll status error = " + exception.getClass().getSimpleName());
516 System.err.println("Error message = " + exception.getMessage());
517 }
518 }
519 }
520
521
522
523
524
525
526
527 private void requestStatistics(final String federationName) throws Sim0MQException, SerializationException
528 {
529 List<String> stats = new ArrayList<>();
530 stats.add("dN.average");
531 stats.add("qN.max");
532 stats.add("uN.average");
533
534 for (String statName : stats)
535 {
536 if (!this.state.isError())
537 {
538 byte[] fm6Message = new FM6RequestStatisticsMessage(federationName, "FM", this.modelName,
539 this.messageCount.getAndIncrement(), statName).createByteArray();
540 this.modelSocket.send(fm6Message);
541
542 byte[] reply = this.modelSocket.recv(0);
543
544 try
545 {
546 Object[] replyFields = Sim0MQMessage.decode(reply).createObjectArray();
547 System.out.println("Received\n" + Sim0MQMessage.print(replyFields));
548
549 if (replyFields[5].toString().equals("MC.3"))
550 {
551 MC3StatisticsMessage message = new MC3StatisticsMessage(replyFields);
552 if (message.getVariableName().equals(statName))
553 {
554 System.out.println(
555 "Received statistic for " + statName + " = " + message.getVariableValue().toString());
556 this.statistics.put(statName, (Number) message.getVariableValue());
557 }
558 else
559 {
560 this.state = ModelState.ERROR;
561 System.err.println("Statistics Error: Stat variable expected = " + statName + ", got: "
562 + message.getVariableName());
563 }
564 }
565
566 else if (replyFields[5].toString().equals("MC.4"))
567 {
568 MC4StatisticsErrorMessage message = new MC4StatisticsErrorMessage(replyFields);
569 this.state = ModelState.ERROR;
570 System.err.println("Statistics Error: Stat variable = " + message.getVariableName());
571 System.err.println("Error message = " + message.getError());
572 }
573
574 }
575 catch (Exception exception)
576 {
577 this.state = ModelState.ERROR;
578 System.err.println("Model get statistics error = " + exception.getClass().getSimpleName());
579 System.err.println("Error message = " + exception.getMessage());
580 }
581 }
582 }
583 if (!this.state.isError())
584 {
585 this.state = ModelState.STATISTICSGATHERED;
586 }
587 }
588
589
590
591
592
593
594
595 private void killFederate(final String federationName) throws Sim0MQException, SerializationException
596 {
597 byte[] fm8Message =
598 new FM8KillFederateMessage(federationName, "FM", "FS", this.messageCount.getAndIncrement(), this.modelName)
599 .createByteArray();
600 this.fsSocket.send(fm8Message);
601
602 byte[] reply = this.fsSocket.recv(0);
603
604 try
605 {
606 Object[] replyFields = Sim0MQMessage.decode(reply).createObjectArray();
607 System.out.println("Received\n" + Sim0MQMessage.print(replyFields));
608 FS4FederateKilledMessage message = new FS4FederateKilledMessage(replyFields);
609 if (message.isStatus() && message.getInstanceId().equals(this.modelName))
610 {
611 this.state = ModelState.TERMINATED;
612 }
613 else
614 {
615 this.state = ModelState.ERROR;
616 System.err.println("Model not killed correctly, model = " + this.modelName);
617 System.err.println("Error message = " + message.getError());
618 }
619 }
620 catch (Exception exception)
621 {
622 this.state = ModelState.ERROR;
623 System.err.println("Model not killed correctly, error = " + exception.getClass().getSimpleName());
624 System.err.println("Error message = " + exception.getMessage());
625 }
626 }
627
628
629
630
631 public final Map<String, Number> getStatistics()
632 {
633 return this.statistics;
634 }
635
636 }
637
638 }