1 package org.sim0mq.demo.mm1;
2
3 import java.util.ArrayList;
4 import java.util.HashMap;
5 import java.util.List;
6 import java.util.Map;
7 import java.util.UUID;
8
9 import org.djutils.serialization.SerializationException;
10 import org.sim0mq.Sim0MQException;
11 import org.sim0mq.federationmanager.ModelState;
12 import org.sim0mq.message.MessageStatus;
13 import org.sim0mq.message.SimulationMessage;
14 import org.zeromq.SocketType;
15 import org.zeromq.ZContext;
16 import org.zeromq.ZMQ;
17
18
19
20
21
22
23
24
25
26
27
28 public class MM1FederationManager
29 {
30
31 private ModelState state;
32
33
34 private ZMQ.Socket modelSocket;
35
36
37 private ZMQ.Socket fsSocket;
38
39
40 private ZContext fmContext;
41
42
43 private long messageCount = 0;
44
45
46
47
48
49
50
51
52
53
54 public MM1FederationManager(final String federationName, final int fmPort, final int fsPort, final String localSk3)
55 throws Sim0MQException, SerializationException
56 {
57 this.fmContext = new ZContext(1);
58 this.modelSocket = this.fmContext.createSocket(SocketType.REQ);
59 this.modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
60 this.fsSocket = this.fmContext.createSocket(SocketType.REQ);
61 this.fsSocket.setIdentity(UUID.randomUUID().toString().getBytes());
62
63 this.state = ModelState.NOT_STARTED;
64 boolean ready = false;
65 while (!ready)
66 {
67 System.out.println(this.state);
68
69 switch (this.state)
70 {
71 case NOT_STARTED:
72 startModel(federationName, fsPort, localSk3);
73 break;
74
75 case STARTED:
76 sendSimRunControl(federationName);
77 break;
78
79 case RUNCONTROL:
80 setParameters(federationName);
81 break;
82
83 case PARAMETERS:
84 sendSimStart(federationName);
85 break;
86
87 case SIMULATORSTARTED:
88 waitForSimEnded(federationName);
89 break;
90
91 case SIMULATORENDED:
92 requestStatistics(federationName);
93 break;
94
95 case STATISTICSGATHERED:
96 killFederate(federationName);
97 ready = true;
98 break;
99
100 case ERROR:
101 killFederate(federationName);
102 ready = true;
103 break;
104
105 default:
106 break;
107 }
108 }
109
110 this.modelSocket.close();
111 this.fsSocket.close();
112 this.fmContext.destroy();
113 this.fmContext.close();
114 }
115
116
117
118
119
120
121
122
123
124 private void startModel(final String federationName, final int fsPort, final String localSk3)
125 throws Sim0MQException, SerializationException
126 {
127
128 byte[] fm1Message;
129 if (localSk3.equals("sk-3"))
130 {
131 fm1Message = SimulationMessage.encodeUTF8(federationName, "FM", "FS", "FM.1", ++this.messageCount,
132 MessageStatus.NEW, "MM1.1", "java8+", "-jar", "/home/alexandv/sim0mq/MM1/mm1.jar", "MM1.1 %PORT%",
133 "/home/alexandv/sim0mq/MM1", "", "/home/alexandv/sim0mq/MM1/out.txt", "/home/alexandv/sim0mq/MM1/err.txt",
134 false, false, false);
135 this.fsSocket.connect("tcp://130.161.3.179:" + fsPort);
136 }
137 else
138 {
139 fm1Message = SimulationMessage.encodeUTF8(federationName, "FM", "FS", "FM.1", ++this.messageCount,
140 MessageStatus.NEW, "MM1.1", "java8+", "-jar", "e:/sim0mq/MM1/mm1.jar", "MM1.1 %PORT%", "e:/sim0mq/MM1", "",
141 "e:/sim0mq/MM1/out.txt", "e:/sim0mq/MM1/err.txt", false, false, false);
142 this.fsSocket.connect("tcp://127.0.0.1:" + fsPort);
143 }
144 this.fsSocket.send(fm1Message);
145
146 byte[] reply = this.fsSocket.recv(0);
147 Object[] replyMessage = SimulationMessage.decode(reply);
148 System.out.println("Received\n" + SimulationMessage.print(replyMessage));
149
150 if (replyMessage[4].toString().equals("FS.2") && replyMessage[9].toString().equals("started")
151 && replyMessage[8].toString().equals("MM1.1"))
152 {
153 this.state = ModelState.STARTED;
154 this.modelSocket.connect("tcp://127.0.0.1:" + replyMessage[10].toString());
155 }
156 else
157 {
158 this.state = ModelState.ERROR;
159 System.err.println("Model not started correctly -- state = " + replyMessage[9]);
160 System.err.println("Started model = " + replyMessage[8]);
161 System.err.println("Error message = " + replyMessage[10]);
162 }
163
164 }
165
166
167
168
169
170
171
172 private void sendSimRunControl(final String federationName) throws Sim0MQException, SerializationException
173 {
174 byte[] fm2Message;
175 fm2Message = SimulationMessage.encodeUTF8(federationName, "FM", "MM1.1", "FM.2", ++this.messageCount, MessageStatus.NEW,
176 100.0, 0.0, 0.0, Double.POSITIVE_INFINITY, 1, 0);
177 this.modelSocket.send(fm2Message);
178
179 byte[] reply = this.modelSocket.recv(0);
180 Object[] replyMessage = SimulationMessage.decode(reply);
181 System.out.println("Received\n" + SimulationMessage.print(replyMessage));
182
183 if (replyMessage[4].toString().equals("MC.2") && (boolean) replyMessage[9]
184 && ((Long) replyMessage[8]).longValue() == this.messageCount)
185 {
186 this.state = ModelState.RUNCONTROL;
187 }
188 else
189 {
190 this.state = ModelState.ERROR;
191 System.err.println("Model not started correctly -- status = " + replyMessage[9]);
192 System.err.println("Error message = " + replyMessage[10]);
193 }
194 }
195
196
197
198
199
200
201
202 private void setParameters(final String federationName) throws Sim0MQException, SerializationException
203 {
204 Map<String, Object> parameters = new HashMap<>();
205 parameters.put("iat", new Double(1.0));
206 parameters.put("servicetime", new Double(0.85));
207
208 for (String parameterName : parameters.keySet())
209 {
210 if (!this.state.isError())
211 {
212 byte[] fm3Message;
213 fm3Message = SimulationMessage.encodeUTF8(federationName, "FM", "MM1.1", "FM.3", ++this.messageCount,
214 MessageStatus.NEW, parameterName, parameters.get(parameterName));
215 this.modelSocket.send(fm3Message);
216
217 byte[] reply = this.modelSocket.recv(0);
218 Object[] replyMessage = SimulationMessage.decode(reply);
219 System.out.println("Received\n" + SimulationMessage.print(replyMessage));
220
221 if (replyMessage[4].toString().equals("MC.2") && (boolean) replyMessage[9]
222 && ((Long) replyMessage[8]).longValue() == this.messageCount)
223 {
224 this.state = ModelState.PARAMETERS;
225 }
226 else
227 {
228 this.state = ModelState.ERROR;
229 System.err.println("Model parameter error -- status = " + replyMessage[9]);
230 System.err.println("Error message = " + replyMessage[10]);
231 }
232 }
233 }
234 if (!this.state.isError())
235 {
236 this.state = ModelState.PARAMETERS;
237 }
238 }
239
240
241
242
243
244
245
246 private void sendSimStart(final String federationName) throws Sim0MQException, SerializationException
247 {
248 byte[] fm4Message;
249 fm4Message =
250 SimulationMessage.encodeUTF8(federationName, "FM", "MM1.1", "FM.4", ++this.messageCount, MessageStatus.NEW);
251 this.modelSocket.send(fm4Message);
252
253 byte[] reply = this.modelSocket.recv(0);
254 Object[] replyMessage = SimulationMessage.decode(reply);
255 System.out.println("Received\n" + SimulationMessage.print(replyMessage));
256
257 if (replyMessage[4].toString().equals("MC.2") && (boolean) replyMessage[9]
258 && ((Long) replyMessage[8]).longValue() == this.messageCount)
259 {
260 this.state = ModelState.SIMULATORSTARTED;
261 }
262 else
263 {
264 this.state = ModelState.ERROR;
265 System.err.println("Simulation start error -- status = " + replyMessage[9]);
266 System.err.println("Error message = " + replyMessage[10]);
267 }
268 }
269
270
271
272
273
274
275
276 private void waitForSimEnded(final String federationName) throws Sim0MQException, SerializationException
277 {
278 while (!this.state.isSimulatorEnded() && !this.state.isError())
279 {
280 byte[] fm5Message;
281 fm5Message =
282 SimulationMessage.encodeUTF8(federationName, "FM", "MM1.1", "FM.5", ++this.messageCount, MessageStatus.NEW);
283 this.modelSocket.send(fm5Message);
284
285 byte[] reply = this.modelSocket.recv(0);
286 Object[] replyMessage = SimulationMessage.decode(reply);
287 System.out.println("Received\n" + SimulationMessage.print(replyMessage));
288
289 if (replyMessage[4].toString().equals("MC.1") && !replyMessage[9].toString().equals("error")
290 && !replyMessage[9].toString().equals("started")
291 && ((Long) replyMessage[8]).longValue() == this.messageCount)
292 {
293 if (replyMessage[9].toString().equals("ended"))
294 {
295 this.state = ModelState.SIMULATORENDED;
296 }
297 else
298 {
299
300 try
301 {
302 Thread.sleep(1000);
303 }
304 catch (InterruptedException ie)
305 {
306
307 }
308 }
309 }
310 else
311 {
312 this.state = ModelState.ERROR;
313 System.err.println("Simulation start error -- status = " + replyMessage[9]);
314 System.err.println("Error message = " + replyMessage[10]);
315 }
316 }
317 }
318
319
320
321
322
323
324
325 private void requestStatistics(final String federationName) throws Sim0MQException, SerializationException
326 {
327 List<String> stats = new ArrayList<>();
328 stats.add("dN.average");
329 stats.add("qN.max");
330 stats.add("uN.average");
331
332 for (String statName : stats)
333 {
334 if (!this.state.isError())
335 {
336 byte[] fm6Message;
337 fm6Message = SimulationMessage.encodeUTF8(federationName, "FM", "MM1.1", "FM.6", ++this.messageCount,
338 MessageStatus.NEW, statName);
339 this.modelSocket.send(fm6Message);
340
341 byte[] reply = this.modelSocket.recv(0);
342 Object[] replyMessage = SimulationMessage.decode(reply);
343 System.out.println("Received\n" + SimulationMessage.print(replyMessage));
344
345 if (replyMessage[4].toString().equals("MC.3"))
346 {
347 if (replyMessage[9].toString().equals(statName))
348 {
349 System.out.println("Received statistic for " + statName + " = " + replyMessage[10].toString());
350 }
351 else
352 {
353 this.state = ModelState.ERROR;
354 System.err.println(
355 "Statistics Error: Stat variable expected = " + statName + ", got: " + replyMessage[8]);
356 }
357 }
358 else if (replyMessage[4].toString().equals("MC.4"))
359 {
360 this.state = ModelState.ERROR;
361 System.err.println("Statistics Error: Stat variable = " + replyMessage[8]);
362 System.err.println("Error message = " + replyMessage[9]);
363 }
364 else
365 {
366 this.state = ModelState.ERROR;
367 System.err.println("Statistics Error: Received unknown message as reply to FM6: " + replyMessage[4]);
368 }
369 }
370 }
371 if (!this.state.isError())
372 {
373 this.state = ModelState.STATISTICSGATHERED;
374 }
375 }
376
377
378
379
380
381
382
383 private void killFederate(final String federationName) throws Sim0MQException, SerializationException
384 {
385 byte[] fm8Message;
386 fm8Message = SimulationMessage.encodeUTF8(federationName, "FM", "FS", "FM.8", ++this.messageCount, MessageStatus.NEW,
387 "MM1.1");
388 this.fsSocket.send(fm8Message);
389
390 byte[] reply = this.fsSocket.recv(0);
391 Object[] replyMessage = SimulationMessage.decode(reply);
392 System.out.println("Received\n" + SimulationMessage.print(replyMessage));
393
394 if (replyMessage[4].toString().equals("FS.4") && (boolean) replyMessage[9]
395 && replyMessage[8].toString().equals("MM1.1"))
396 {
397 this.state = ModelState.TERMINATED;
398 }
399 else
400 {
401 this.state = ModelState.ERROR;
402 System.err.println("Model not killed correctly");
403 System.err.println("Tried to kill model = " + replyMessage[8]);
404 System.err.println("Error message = " + replyMessage[10]);
405 }
406
407 }
408
409
410
411
412
413
414 public static void main(final String[] args) throws Sim0MQException, SerializationException
415 {
416 if (args.length < 4)
417 {
418 System.err.println(
419 "Use as FederationManager federationName federationManagerPortNumber federateStarterPortNumber local/sk-3");
420 System.exit(-1);
421 }
422 String federationName = args[0];
423
424 String fmsPort = args[1];
425 int fmPort = 0;
426 try
427 {
428 fmPort = Integer.parseInt(fmsPort);
429 }
430 catch (NumberFormatException nfe)
431 {
432 System.err.println("Use as FederationManager fedNname fmPort fsPort local/sk-3, where fmPort is a number");
433 System.exit(-1);
434 }
435 if (fmPort == 0 || fmPort > 65535)
436 {
437 System.err.println("fmPort should be between 1 and 65535");
438 System.exit(-1);
439 }
440
441 String fssPort = args[2];
442 int fsPort = 0;
443 try
444 {
445 fsPort = Integer.parseInt(fssPort);
446 }
447 catch (NumberFormatException nfe)
448 {
449 System.err.println("Use as FederationManager fedNname fmPort fsPort local/sk3, where fsPort is a number");
450 System.exit(-1);
451 }
452 if (fsPort == 0 || fsPort > 65535)
453 {
454 System.err.println("fsPort should be between 1 and 65535");
455 System.exit(-1);
456 }
457
458 String localSk3 = args[3];
459 if (!localSk3.equals("local") && !localSk3.equals("sk-3"))
460 {
461 System.err.println("Use as FederationManager fedNname fmPort fsPort local/sk3, where last arg is local/sk-3");
462 System.exit(-1);
463 }
464
465 new MM1FederationManager(federationName, fmPort, fsPort, localSk3);
466
467 }
468
469 }