1 package org.sim0mq.demo.mm1;
2
3 import java.util.ArrayList;
4 import java.util.Collections;
5 import java.util.HashMap;
6 import java.util.List;
7 import java.util.Map;
8 import java.util.UUID;
9 import java.util.concurrent.atomic.AtomicInteger;
10 import java.util.concurrent.atomic.AtomicLong;
11
12 import org.sim0mq.Sim0MQException;
13 import org.sim0mq.federationmanager.ModelState;
14 import org.sim0mq.message.MessageStatus;
15 import org.sim0mq.message.SimulationMessage;
16 import org.zeromq.ZMQ;
17
18
19
20
21
22
23
24
25
26
27
28 public class MM1FederationManager20
29 {
30
31
32
33
34 public static void main(String[] args) throws Sim0MQException
35 {
36 if (args.length < 4)
37 {
38 System.err.println(
39 "Use as FederationManager federationName federationManagerPortNumber federateStarterPortNumber local/sk-3");
40 System.exit(-1);
41 }
42 String federationName = args[0];
43
44 String fmsPort = args[1];
45 int fmPort = 0;
46 try
47 {
48 fmPort = Integer.parseInt(fmsPort);
49 }
50 catch (NumberFormatException nfe)
51 {
52 System.err.println("Use as FederationManager fedNname fmPort fsPort local/sk-3, where fmPort is a number");
53 System.exit(-1);
54 }
55 if (fmPort == 0 || fmPort > 65535)
56 {
57 System.err.println("fmPort should be between 1 and 65535");
58 System.exit(-1);
59 }
60
61 String fssPort = args[2];
62 int fsPort = 0;
63 try
64 {
65 fsPort = Integer.parseInt(fssPort);
66 }
67 catch (NumberFormatException nfe)
68 {
69 System.err.println("Use as FederationManager fedNname fmPort fsPort local/sk3, where fsPort is a number");
70 System.exit(-1);
71 }
72 if (fsPort == 0 || fsPort > 65535)
73 {
74 System.err.println("fsPort should be between 1 and 65535");
75 System.exit(-1);
76 }
77
78 String localSk3 = args[3];
79 if (!localSk3.equals("local") && !localSk3.equals("sk-3"))
80 {
81 System.err.println("Use as FederationManager fedNname fmPort fsPort local/sk3, where last arg is local/sk-3");
82 System.exit(-1);
83 }
84
85 new MM1FederationManager20(federationName, fmPort, fsPort, localSk3);
86 }
87
88
89
90
91
92
93
94
95
96 public MM1FederationManager20(final String federationName, final int fmPort, final int fsPort, final String localSk3)
97 throws Sim0MQException
98 {
99 AtomicLong messageCount = new AtomicLong(0L);
100 AtomicInteger nrRunning = new AtomicInteger();
101
102 Map<Integer, Map<String, Number>> statMap = Collections.synchronizedMap(new HashMap<Integer, Map<String, Number>>());
103
104 for (int modelNr = 0; modelNr < 20; modelNr++)
105 {
106 new Thread()
107 {
108 @Override
109 public void run()
110 {
111 final int nr = nrRunning.getAndIncrement();
112 StateMachine stateMachine = null;
113 System.out.println("inc modelNr to " + nrRunning.get());
114 try
115 {
116 stateMachine = new StateMachine(messageCount, federationName, fsPort, localSk3, nr);
117 }
118 catch (Sim0MQException exception)
119 {
120 exception.printStackTrace();
121 }
122 nrRunning.decrementAndGet();
123 System.out.println("dec modelNr to " + nrRunning.get());
124 statMap.put(nr, stateMachine.getStatistics());
125 }
126 }.start();
127 }
128
129 while (nrRunning.get() > 0)
130 {
131 Thread.yield();
132 }
133
134 for (int nr : statMap.keySet())
135 {
136 Map<String, Number> stats = statMap.get(nr);
137 StringBuilder s = new StringBuilder();
138 s.append(String.format("%2d ", nr));
139 for (String code : stats.keySet())
140 {
141 s.append(String.format("%10s=%10.4f ", code, stats.get(code).doubleValue()));
142 }
143 System.out.println(s.toString());
144 }
145 }
146
147
148
149
150
151
152
153
154
155
156
157
158 static class StateMachine
159 {
160
161 private ModelState state;
162
163
164 private ZMQ.Socket modelSocket;
165
166
167 private String modelName;
168
169
170 protected ZMQ.Socket fsSocket;
171
172
173 private ZMQ.Context fmContext;
174
175
176 private AtomicLong messageCount;
177
178
179 private Map<String, Number> statistics = new HashMap<>();
180
181
182
183
184
185
186
187
188
189 StateMachine(final AtomicLong messageCount, final String federationName, final int fsPort,
190 final String localSk3, final int modelNr) throws Sim0MQException
191 {
192 this.fmContext = ZMQ.context(1);
193
194 this.fsSocket = this.fmContext.socket(ZMQ.REQ);
195 this.fsSocket.setIdentity(UUID.randomUUID().toString().getBytes());
196
197 this.modelName = "MM1." + modelNr;
198 this.messageCount = messageCount;
199
200 this.modelSocket = this.fmContext.socket(ZMQ.REQ);
201 this.modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
202
203 this.state = ModelState.NOT_STARTED;
204 boolean ready = false;
205 while (!ready)
206 {
207 System.out.println(this.state);
208
209 switch (this.state)
210 {
211 case NOT_STARTED:
212 startModel(federationName, fsPort, localSk3);
213 break;
214
215 case STARTED:
216 sendSimRunControl(federationName);
217 break;
218
219 case RUNCONTROL:
220 setParameters(federationName);
221 break;
222
223 case PARAMETERS:
224 sendSimStart(federationName);
225 break;
226
227 case SIMULATORSTARTED:
228 waitForSimEnded(federationName);
229 break;
230
231 case SIMULATORENDED:
232 requestStatistics(federationName);
233 break;
234
235 case STATISTICSGATHERED:
236 killFederate(federationName);
237 ready = true;
238 break;
239
240 case ERROR:
241 killFederate(federationName);
242 ready = true;
243 break;
244
245 default:
246 break;
247 }
248 }
249
250 this.fsSocket.close();
251 this.modelSocket.close();
252 this.fmContext.term();
253 }
254
255
256
257
258
259
260
261
262 private void startModel(final String federationName, final int fsPort, final String localSk3) throws Sim0MQException
263 {
264
265 byte[] fm1Message;
266 if (localSk3.equals("sk-3"))
267 {
268 fm1Message = SimulationMessage.encodeUTF8(federationName, "FM", "FS", "FM.1", this.messageCount.getAndIncrement(),
269 MessageStatus.NEW, this.modelName, "java8+", "-jar", "/home/alexandv/sim0mq/MM1/mm1.jar",
270 this.modelName + " %PORT%", "/home/alexandv/sim0mq/MM1", "",
271 "/home/alexandv/sim0mq/MM1/out_" + this.modelName + ".txt",
272 "/home/alexandv/sim0mq/MM1/err_" + this.modelName + ".txt", false, true, true);
273 this.fsSocket.connect("tcp://130.161.3.179:" + fsPort);
274 }
275 else
276 {
277 fm1Message = SimulationMessage.encodeUTF8(federationName, "FM", "FS", "FM.1", this.messageCount.getAndIncrement(),
278 MessageStatus.NEW, this.modelName, "java8+", "-jar", "e:/MM1/mm1.jar", this.modelName + " %PORT%",
279 "e:/MM1", "", "e:/MM1/out_" + this.modelName + ".txt", "e:/MM1/err_" + this.modelName + ".txt", false,
280 true, true);
281 this.fsSocket.connect("tcp://127.0.0.1:" + fsPort);
282 }
283 this.fsSocket.send(fm1Message);
284
285 byte[] reply = this.fsSocket.recv(0);
286 Object[] replyMessage = SimulationMessage.decode(reply);
287 System.out.println("Received\n" + SimulationMessage.print(replyMessage));
288
289 if (replyMessage[4].toString().equals("FS.2") && replyMessage[9].toString().equals("started")
290 && replyMessage[8].toString().equals(this.modelName))
291 {
292 this.state = ModelState.STARTED;
293 this.modelSocket.connect("tcp://127.0.0.1:" + replyMessage[10].toString());
294 }
295 else
296 {
297 this.state = ModelState.ERROR;
298 System.err.println("Model not started correctly -- state = " + replyMessage[9]);
299 System.err.println("Started model = " + replyMessage[8]);
300 System.err.println("Error message = " + replyMessage[10]);
301 }
302
303 }
304
305
306
307
308
309
310 private void sendSimRunControl(final String federationName) throws Sim0MQException
311 {
312 long messageNumber = this.messageCount.get();
313 byte[] fm2Message;
314 fm2Message = SimulationMessage.encodeUTF8(federationName, "FM", this.modelName, "FM.2",
315 this.messageCount.getAndIncrement(), MessageStatus.NEW, 100.0, 0.0, 0.0, Double.POSITIVE_INFINITY, 1, 0);
316 this.modelSocket.send(fm2Message);
317
318 byte[] reply = this.modelSocket.recv(0);
319 Object[] replyMessage = SimulationMessage.decode(reply);
320 System.out.println("Received\n" + SimulationMessage.print(replyMessage));
321
322 if (replyMessage[4].toString().equals("MC.2") && (boolean) replyMessage[9]
323 && ((Long) replyMessage[8]).longValue() == messageNumber)
324 {
325 this.state = ModelState.RUNCONTROL;
326 }
327 else
328 {
329 this.state = ModelState.ERROR;
330 System.err.println("Model not started correctly -- status = " + replyMessage[9]);
331 System.err.println("Error message = " + replyMessage[10]);
332 }
333 }
334
335
336
337
338
339
340 private void setParameters(final String federationName) throws Sim0MQException
341 {
342 Map<String, Object> parameters = new HashMap<>();
343 parameters.put("iat", new Double(1.0));
344 parameters.put("servicetime", new Double(0.85));
345 parameters.put("seed", Math.abs(this.modelName.hashCode()));
346
347 for (String parameterName : parameters.keySet())
348 {
349 if (!this.state.isError())
350 {
351 long messageNumber = this.messageCount.get();
352 byte[] fm3Message;
353 fm3Message = SimulationMessage.encodeUTF8(federationName, "FM", this.modelName, "FM.3",
354 this.messageCount.getAndIncrement(), MessageStatus.NEW, parameterName,
355 parameters.get(parameterName));
356 this.modelSocket.send(fm3Message);
357
358 byte[] reply = this.modelSocket.recv(0);
359 Object[] replyMessage = SimulationMessage.decode(reply);
360 System.out.println("Received\n" + SimulationMessage.print(replyMessage));
361
362 if (replyMessage[4].toString().equals("MC.2") && (boolean) replyMessage[9]
363 && ((Long) replyMessage[8]).longValue() == messageNumber)
364 {
365 this.state = ModelState.PARAMETERS;
366 }
367 else
368 {
369 this.state = ModelState.ERROR;
370 System.err.println("Model parameter error -- status = " + replyMessage[9]);
371 System.err.println("Error message = " + replyMessage[10]);
372 }
373 }
374 }
375 if (!this.state.isError())
376 {
377 this.state = ModelState.PARAMETERS;
378 }
379 }
380
381
382
383
384
385
386 private void sendSimStart(final String federationName) throws Sim0MQException
387 {
388 long messageNumber = this.messageCount.get();
389 byte[] fm4Message;
390 fm4Message = SimulationMessage.encodeUTF8(federationName, "FM", this.modelName, "FM.4",
391 this.messageCount.getAndIncrement(), MessageStatus.NEW);
392 this.modelSocket.send(fm4Message);
393
394 byte[] reply = this.modelSocket.recv(0);
395 Object[] replyMessage = SimulationMessage.decode(reply);
396 System.out.println("Received\n" + SimulationMessage.print(replyMessage));
397
398 if (replyMessage[4].toString().equals("MC.2") && (boolean) replyMessage[9]
399 && ((Long) replyMessage[8]).longValue() == messageNumber)
400 {
401 this.state = ModelState.SIMULATORSTARTED;
402 }
403 else
404 {
405 this.state = ModelState.ERROR;
406 System.err.println("Simulation start error -- status = " + replyMessage[9]);
407 System.err.println("Error message = " + replyMessage[10]);
408 }
409 }
410
411
412
413
414
415
416 private void waitForSimEnded(final String federationName) throws Sim0MQException
417 {
418 while (!this.state.isSimulatorEnded() && !this.state.isError())
419 {
420 long messageNumber = this.messageCount.get();
421 byte[] fm5Message;
422 fm5Message = SimulationMessage.encodeUTF8(federationName, "FM", this.modelName, "FM.5",
423 this.messageCount.getAndIncrement(), MessageStatus.NEW);
424 this.modelSocket.send(fm5Message);
425
426 byte[] reply = this.modelSocket.recv(0);
427 Object[] replyMessage = SimulationMessage.decode(reply);
428 System.out.println("Received\n" + SimulationMessage.print(replyMessage));
429
430 if (replyMessage[4].toString().equals("MC.1") && !replyMessage[9].toString().equals("error")
431 && !replyMessage[9].toString().equals("started")
432 && ((Long) replyMessage[8]).longValue() == messageNumber)
433 {
434 if (replyMessage[9].toString().equals("ended"))
435 {
436 this.state = ModelState.SIMULATORENDED;
437 }
438 else
439 {
440
441 try
442 {
443 Thread.sleep(1000);
444 }
445 catch (InterruptedException ie)
446 {
447
448 }
449 }
450 }
451 else
452 {
453 this.state = ModelState.ERROR;
454 System.err.println("Simulation start error -- status = " + replyMessage[9]);
455 System.err.println("Error message = " + replyMessage[10]);
456 }
457 }
458 }
459
460
461
462
463
464
465 private void requestStatistics(final String federationName) throws Sim0MQException
466 {
467 List<String> stats = new ArrayList<>();
468 stats.add("dN.average");
469 stats.add("qN.max");
470 stats.add("uN.average");
471
472 for (String statName : stats)
473 {
474 if (!this.state.isError())
475 {
476 byte[] fm6Message;
477 fm6Message = SimulationMessage.encodeUTF8(federationName, "FM", this.modelName, "FM.6",
478 this.messageCount.getAndIncrement(), MessageStatus.NEW, statName);
479 this.modelSocket.send(fm6Message);
480
481 byte[] reply = this.modelSocket.recv(0);
482 Object[] replyMessage = SimulationMessage.decode(reply);
483 System.out.println("Received\n" + SimulationMessage.print(replyMessage));
484
485 if (replyMessage[4].toString().equals("MC.3"))
486 {
487 if (replyMessage[9].toString().equals(statName))
488 {
489 System.out.println("Received statistic for " + statName + " = " + replyMessage[10].toString());
490 this.statistics.put(statName, (Number) replyMessage[10]);
491 }
492 else
493 {
494 this.state = ModelState.ERROR;
495 System.err.println(
496 "Statistics Error: Stat variable expected = " + statName + ", got: " + replyMessage[8]);
497 }
498 }
499 else if (replyMessage[4].toString().equals("MC.4"))
500 {
501 this.state = ModelState.ERROR;
502 System.err.println("Statistics Error: Stat variable = " + replyMessage[8]);
503 System.err.println("Error message = " + replyMessage[9]);
504 }
505 else
506 {
507 this.state = ModelState.ERROR;
508 System.err.println("Statistics Error: Received unknown message as reply to FM6: " + replyMessage[4]);
509 }
510 }
511 }
512 if (!this.state.isError())
513 {
514 this.state = ModelState.STATISTICSGATHERED;
515 }
516 }
517
518
519
520
521
522
523 private void killFederate(final String federationName) throws Sim0MQException
524 {
525 byte[] fm8Message;
526 fm8Message = SimulationMessage.encodeUTF8(federationName, "FM", "FS", "FM.8", this.messageCount.getAndIncrement(),
527 MessageStatus.NEW, this.modelName);
528 this.fsSocket.send(fm8Message);
529
530 byte[] reply = this.fsSocket.recv(0);
531 Object[] replyMessage = SimulationMessage.decode(reply);
532 System.out.println("Received\n" + SimulationMessage.print(replyMessage));
533
534 if (replyMessage[4].toString().equals("FS.4") && (boolean) replyMessage[9]
535 && replyMessage[8].toString().equals(this.modelName))
536 {
537 this.state = ModelState.TERMINATED;
538 }
539 else
540 {
541 this.state = ModelState.ERROR;
542 System.err.println("Model not killed correctly");
543 System.err.println("Tried to kill model = " + replyMessage[8]);
544 System.err.println("Error message = " + replyMessage[10]);
545 }
546 }
547
548
549
550
551 public final Map<String, Number> getStatistics()
552 {
553 return this.statistics;
554 }
555
556 }
557
558 }