1 package org.sim0mq.demo.mm1;
2
3 import java.util.ArrayList;
4 import java.util.Collections;
5 import java.util.HashMap;
6 import java.util.List;
7 import java.util.Map;
8 import java.util.UUID;
9 import java.util.concurrent.atomic.AtomicInteger;
10 import java.util.concurrent.atomic.AtomicLong;
11
12 import org.djutils.serialization.SerializationException;
13 import org.sim0mq.Sim0MQException;
14 import org.sim0mq.federationmanager.ModelState;
15 import org.sim0mq.message.MessageStatus;
16 import org.sim0mq.message.SimulationMessage;
17 import org.zeromq.SocketType;
18 import org.zeromq.ZContext;
19 import org.zeromq.ZMQ;
20
21
22
23
24
25
26
27
28
29
30
31 public final class MM1FederationManager20
32 {
33
34
35
36
37 public static void main(final String[] args) throws Sim0MQException
38 {
39 if (args.length < 4)
40 {
41 System.err.println(
42 "Use as FederationManager federationName federationManagerPortNumber federateStarterPortNumber local/sk-3");
43 System.exit(-1);
44 }
45 String federationName = args[0];
46
47 String fmsPort = args[1];
48 int fmPort = 0;
49 try
50 {
51 fmPort = Integer.parseInt(fmsPort);
52 }
53 catch (NumberFormatException nfe)
54 {
55 System.err.println("Use as FederationManager fedNname fmPort fsPort local/sk-3, where fmPort is a number");
56 System.exit(-1);
57 }
58 if (fmPort == 0 || fmPort > 65535)
59 {
60 System.err.println("fmPort should be between 1 and 65535");
61 System.exit(-1);
62 }
63
64 String fssPort = args[2];
65 int fsPort = 0;
66 try
67 {
68 fsPort = Integer.parseInt(fssPort);
69 }
70 catch (NumberFormatException nfe)
71 {
72 System.err.println("Use as FederationManager fedNname fmPort fsPort local/sk3, where fsPort is a number");
73 System.exit(-1);
74 }
75 if (fsPort == 0 || fsPort > 65535)
76 {
77 System.err.println("fsPort should be between 1 and 65535");
78 System.exit(-1);
79 }
80
81 String localSk3 = args[3];
82 if (!localSk3.equals("local") && !localSk3.equals("sk-3"))
83 {
84 System.err.println("Use as FederationManager fedNname fmPort fsPort local/sk3, where last arg is local/sk-3");
85 System.exit(-1);
86 }
87
88 new MM1FederationManager20(federationName, fmPort, fsPort, localSk3);
89 }
90
91
92
93
94
95
96
97
98
99 private MM1FederationManager20(final String federationName, final int fmPort, final int fsPort, final String localSk3)
100 throws Sim0MQException
101 {
102 AtomicLong messageCount = new AtomicLong(0L);
103 AtomicInteger nrRunning = new AtomicInteger();
104
105 Map<Integer, Map<String, Number>> statMap = Collections.synchronizedMap(new HashMap<Integer, Map<String, Number>>());
106
107 for (int modelNr = 0; modelNr < 20; modelNr++)
108 {
109 new Thread()
110 {
111 @Override
112 public void run()
113 {
114 final int nr = nrRunning.getAndIncrement();
115 StateMachine stateMachine = null;
116 System.out.println("inc modelNr to " + nr);
117 try
118 {
119 stateMachine = new StateMachine(messageCount, federationName, fsPort, localSk3, nr);
120 }
121 catch (Sim0MQException | SerializationException exception)
122 {
123 exception.printStackTrace();
124 }
125 int decNr = nrRunning.decrementAndGet();
126 System.out.println("dec modelNr to " + decNr);
127 synchronized (statMap)
128 {
129 statMap.put(nr, stateMachine.getStatistics());
130 }
131 }
132 }.start();
133 }
134
135 while (nrRunning.get() > 0)
136 {
137 Thread.yield();
138 }
139
140 synchronized (statMap)
141 {
142 for (int nr : statMap.keySet())
143 {
144 Map<String, Number> stats = statMap.get(nr);
145 StringBuilder s = new StringBuilder();
146 s.append(String.format("%2d ", nr));
147 for (String code : stats.keySet())
148 {
149 s.append(String.format("%10s=%10.4f ", code, stats.get(code).doubleValue()));
150 }
151 System.out.println(s.toString());
152 }
153 }
154 }
155
156
157
158
159
160
161
162
163
164
165
166
167 static class StateMachine
168 {
169
170 private ModelState state;
171
172
173 private ZMQ.Socket modelSocket;
174
175
176 private String modelName;
177
178
179 private ZMQ.Socket fsSocket;
180
181
182 private ZContext fmContext;
183
184
185 private AtomicLong messageCount;
186
187
188 private Map<String, Number> statistics = new HashMap<>();
189
190
191
192
193
194
195
196
197
198
199 StateMachine(final AtomicLong messageCount, final String federationName, final int fsPort, final String localSk3,
200 final int modelNr) throws Sim0MQException, SerializationException
201 {
202 this.fmContext = new ZContext(1);
203
204 this.fsSocket = this.fmContext.createSocket(SocketType.REQ);
205 this.fsSocket.setIdentity(UUID.randomUUID().toString().getBytes());
206
207 this.modelName = "MM1." + modelNr;
208 this.messageCount = messageCount;
209
210 this.modelSocket = this.fmContext.createSocket(SocketType.REQ);
211 this.modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
212
213 this.state = ModelState.NOT_STARTED;
214 boolean ready = false;
215 while (!ready)
216 {
217 System.out.println(this.state);
218
219 switch (this.state)
220 {
221 case NOT_STARTED:
222 startModel(federationName, fsPort, localSk3);
223 break;
224
225 case STARTED:
226 sendSimRunControl(federationName);
227 break;
228
229 case RUNCONTROL:
230 setParameters(federationName);
231 break;
232
233 case PARAMETERS:
234 sendSimStart(federationName);
235 break;
236
237 case SIMULATORSTARTED:
238 waitForSimEnded(federationName);
239 break;
240
241 case SIMULATORENDED:
242 requestStatistics(federationName);
243 break;
244
245 case STATISTICSGATHERED:
246 killFederate(federationName);
247 ready = true;
248 break;
249
250 case ERROR:
251 killFederate(federationName);
252 ready = true;
253 break;
254
255 default:
256 break;
257 }
258 }
259
260 this.fsSocket.close();
261 this.modelSocket.close();
262 this.fmContext.destroy();
263 this.fmContext.close();
264 }
265
266
267
268
269
270
271
272
273
274 private void startModel(final String federationName, final int fsPort, final String localSk3)
275 throws Sim0MQException, SerializationException
276 {
277
278 byte[] fm1Message;
279 if (localSk3.equals("sk-3"))
280 {
281 fm1Message = SimulationMessage.encodeUTF8(federationName, "FM", "FS", "FM.1",
282 this.messageCount.getAndIncrement(), MessageStatus.NEW, this.modelName, "java8+", "-jar",
283 "/home/alexandv/sim0mq/MM1/mm1.jar", this.modelName + " %PORT%", "/home/alexandv/sim0mq/MM1", "",
284 "/home/alexandv/sim0mq/MM1/out_" + this.modelName + ".txt",
285 "/home/alexandv/sim0mq/MM1/err_" + this.modelName + ".txt", false, true, true);
286 this.fsSocket.connect("tcp://130.161.3.179:" + fsPort);
287 }
288 else
289 {
290 fm1Message =
291 SimulationMessage.encodeUTF8(federationName, "FM", "FS", "FM.1", this.messageCount.getAndIncrement(),
292 MessageStatus.NEW, this.modelName, "java8+", "-jar", "e:/sim0mq/MM1/mm1.jar",
293 this.modelName + " %PORT%", "e:/sim0mq/MM1", "", "e:/sim0mq/MM1/out_" + this.modelName + ".txt",
294 "e:/sim0mq/MM1/err_" + this.modelName + ".txt", false, true, true);
295 this.fsSocket.connect("tcp://127.0.0.1:" + fsPort);
296 }
297 this.fsSocket.send(fm1Message);
298
299 byte[] reply = this.fsSocket.recv(0);
300 Object[] replyMessage = SimulationMessage.decode(reply);
301 System.out.println("Received\n" + SimulationMessage.print(replyMessage));
302
303 if (replyMessage[4].toString().equals("FS.2") && replyMessage[9].toString().equals("started")
304 && replyMessage[8].toString().equals(this.modelName))
305 {
306 this.state = ModelState.STARTED;
307 this.modelSocket.connect("tcp://127.0.0.1:" + replyMessage[10].toString());
308 }
309 else
310 {
311 this.state = ModelState.ERROR;
312 System.err.println("Model not started correctly -- state = " + replyMessage[9]);
313 System.err.println("Started model = " + replyMessage[8]);
314 System.err.println("Error message = " + replyMessage[10]);
315 }
316
317 }
318
319
320
321
322
323
324
325 private void sendSimRunControl(final String federationName) throws Sim0MQException, SerializationException
326 {
327 long messageNumber = this.messageCount.get();
328 byte[] fm2Message;
329 fm2Message = SimulationMessage.encodeUTF8(federationName, "FM", this.modelName, "FM.2",
330 this.messageCount.getAndIncrement(), MessageStatus.NEW, 100.0, 0.0, 0.0, Double.POSITIVE_INFINITY, 1, 0);
331 this.modelSocket.send(fm2Message);
332
333 byte[] reply = this.modelSocket.recv(0);
334 Object[] replyMessage = SimulationMessage.decode(reply);
335 System.out.println("Received\n" + SimulationMessage.print(replyMessage));
336
337 if (replyMessage[4].toString().equals("MC.2") && (boolean) replyMessage[9]
338 && ((Long) replyMessage[8]).longValue() == messageNumber)
339 {
340 this.state = ModelState.RUNCONTROL;
341 }
342 else
343 {
344 this.state = ModelState.ERROR;
345 System.err.println("Model not started correctly -- status = " + replyMessage[9]);
346 System.err.println("Error message = " + replyMessage[10]);
347 }
348 }
349
350
351
352
353
354
355
356 private void setParameters(final String federationName) throws Sim0MQException, SerializationException
357 {
358 Map<String, Object> parameters = new HashMap<>();
359 parameters.put("iat", new Double(1.0));
360 parameters.put("servicetime", new Double(0.85));
361 parameters.put("seed", Math.abs(this.modelName.hashCode()));
362
363 for (String parameterName : parameters.keySet())
364 {
365 if (!this.state.isError())
366 {
367 long messageNumber = this.messageCount.get();
368 byte[] fm3Message;
369 fm3Message = SimulationMessage.encodeUTF8(federationName, "FM", this.modelName, "FM.3",
370 this.messageCount.getAndIncrement(), MessageStatus.NEW, parameterName,
371 parameters.get(parameterName));
372 this.modelSocket.send(fm3Message);
373
374 byte[] reply = this.modelSocket.recv(0);
375 Object[] replyMessage = SimulationMessage.decode(reply);
376 System.out.println("Received\n" + SimulationMessage.print(replyMessage));
377
378 if (replyMessage[4].toString().equals("MC.2") && (boolean) replyMessage[9]
379 && ((Long) replyMessage[8]).longValue() == messageNumber)
380 {
381 this.state = ModelState.PARAMETERS;
382 }
383 else
384 {
385 this.state = ModelState.ERROR;
386 System.err.println("Model parameter error -- status = " + replyMessage[9]);
387 System.err.println("Error message = " + replyMessage[10]);
388 }
389 }
390 }
391 if (!this.state.isError())
392 {
393 this.state = ModelState.PARAMETERS;
394 }
395 }
396
397
398
399
400
401
402
403 private void sendSimStart(final String federationName) throws Sim0MQException, SerializationException
404 {
405 long messageNumber = this.messageCount.get();
406 byte[] fm4Message;
407 fm4Message = SimulationMessage.encodeUTF8(federationName, "FM", this.modelName, "FM.4",
408 this.messageCount.getAndIncrement(), MessageStatus.NEW);
409 this.modelSocket.send(fm4Message);
410
411 byte[] reply = this.modelSocket.recv(0);
412 Object[] replyMessage = SimulationMessage.decode(reply);
413 System.out.println("Received\n" + SimulationMessage.print(replyMessage));
414
415 if (replyMessage[4].toString().equals("MC.2") && (boolean) replyMessage[9]
416 && ((Long) replyMessage[8]).longValue() == messageNumber)
417 {
418 this.state = ModelState.SIMULATORSTARTED;
419 }
420 else
421 {
422 this.state = ModelState.ERROR;
423 System.err.println("Simulation start error -- status = " + replyMessage[9]);
424 System.err.println("Error message = " + replyMessage[10]);
425 }
426 }
427
428
429
430
431
432
433
434 private void waitForSimEnded(final String federationName) throws Sim0MQException, SerializationException
435 {
436 while (!this.state.isSimulatorEnded() && !this.state.isError())
437 {
438 long messageNumber = this.messageCount.get();
439 byte[] fm5Message;
440 fm5Message = SimulationMessage.encodeUTF8(federationName, "FM", this.modelName, "FM.5",
441 this.messageCount.getAndIncrement(), MessageStatus.NEW);
442 this.modelSocket.send(fm5Message);
443
444 byte[] reply = this.modelSocket.recv(0);
445 Object[] replyMessage = SimulationMessage.decode(reply);
446 System.out.println("Received\n" + SimulationMessage.print(replyMessage));
447
448 if (replyMessage[4].toString().equals("MC.1") && !replyMessage[9].toString().equals("error")
449 && !replyMessage[9].toString().equals("started")
450 && ((Long) replyMessage[8]).longValue() == messageNumber)
451 {
452 if (replyMessage[9].toString().equals("ended"))
453 {
454 this.state = ModelState.SIMULATORENDED;
455 }
456 else
457 {
458
459 try
460 {
461 Thread.sleep(1000);
462 }
463 catch (InterruptedException ie)
464 {
465
466 }
467 }
468 }
469 else
470 {
471 this.state = ModelState.ERROR;
472 System.err.println("Simulation start error -- status = " + replyMessage[9]);
473 System.err.println("Error message = " + replyMessage[10]);
474 }
475 }
476 }
477
478
479
480
481
482
483
484 private void requestStatistics(final String federationName) throws Sim0MQException, SerializationException
485 {
486 List<String> stats = new ArrayList<>();
487 stats.add("dN.average");
488 stats.add("qN.max");
489 stats.add("uN.average");
490
491 for (String statName : stats)
492 {
493 if (!this.state.isError())
494 {
495 byte[] fm6Message;
496 fm6Message = SimulationMessage.encodeUTF8(federationName, "FM", this.modelName, "FM.6",
497 this.messageCount.getAndIncrement(), MessageStatus.NEW, statName);
498 this.modelSocket.send(fm6Message);
499
500 byte[] reply = this.modelSocket.recv(0);
501 Object[] replyMessage = SimulationMessage.decode(reply);
502 System.out.println("Received\n" + SimulationMessage.print(replyMessage));
503
504 if (replyMessage[4].toString().equals("MC.3"))
505 {
506 if (replyMessage[9].toString().equals(statName))
507 {
508 System.out.println("Received statistic for " + statName + " = " + replyMessage[10].toString());
509 this.statistics.put(statName, (Number) replyMessage[10]);
510 }
511 else
512 {
513 this.state = ModelState.ERROR;
514 System.err.println(
515 "Statistics Error: Stat variable expected = " + statName + ", got: " + replyMessage[8]);
516 }
517 }
518 else if (replyMessage[4].toString().equals("MC.4"))
519 {
520 this.state = ModelState.ERROR;
521 System.err.println("Statistics Error: Stat variable = " + replyMessage[8]);
522 System.err.println("Error message = " + replyMessage[9]);
523 }
524 else
525 {
526 this.state = ModelState.ERROR;
527 System.err.println("Statistics Error: Received unknown message as reply to FM6: " + replyMessage[4]);
528 }
529 }
530 }
531 if (!this.state.isError())
532 {
533 this.state = ModelState.STATISTICSGATHERED;
534 }
535 }
536
537
538
539
540
541
542
543 private void killFederate(final String federationName) throws Sim0MQException, SerializationException
544 {
545 byte[] fm8Message;
546 fm8Message = SimulationMessage.encodeUTF8(federationName, "FM", "FS", "FM.8", this.messageCount.getAndIncrement(),
547 MessageStatus.NEW, this.modelName);
548 this.fsSocket.send(fm8Message);
549
550 byte[] reply = this.fsSocket.recv(0);
551 Object[] replyMessage = SimulationMessage.decode(reply);
552 System.out.println("Received\n" + SimulationMessage.print(replyMessage));
553
554 if (replyMessage[4].toString().equals("FS.4") && (boolean) replyMessage[9]
555 && replyMessage[8].toString().equals(this.modelName))
556 {
557 this.state = ModelState.TERMINATED;
558 }
559 else
560 {
561 this.state = ModelState.ERROR;
562 System.err.println("Model not killed correctly");
563 System.err.println("Tried to kill model = " + replyMessage[8]);
564 System.err.println("Error message = " + replyMessage[10]);
565 }
566 }
567
568
569
570
571 public final Map<String, Number> getStatistics()
572 {
573 return this.statistics;
574 }
575
576 }
577
578 }