1 package org.sim0mq.demo.mm1;
2
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import org.sim0mq.Sim0MQException;
import org.sim0mq.federationmanager.ModelState;
import org.sim0mq.message.MessageStatus;
import org.sim0mq.message.SimulationMessage;
import org.zeromq.ZMQ;
14
15
16
17
18
19
20
21
22
23
24
25 public class MM1FederationManager
26 {
27
28 private ModelState state;
29
30
31 private ZMQ.Socket modelSocket;
32
33
34 private ZMQ.Socket fsSocket;
35
36
37 private ZMQ.Context fmContext;
38
39
40 private long messageCount = 0;
41
42
43
44
45
46
47
48
49
50 public MM1FederationManager(final String federationName, final int fmPort, final int fsPort, final String localSk3)
51 throws Sim0MQException
52 {
53 this.fmContext = ZMQ.context(1);
54 this.modelSocket = this.fmContext.socket(ZMQ.REQ);
55 this.modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
56 this.fsSocket = this.fmContext.socket(ZMQ.REQ);
57 this.fsSocket.setIdentity(UUID.randomUUID().toString().getBytes());
58
59 this.state = ModelState.NOT_STARTED;
60 boolean ready = false;
61 while (!ready)
62 {
63 System.out.println(this.state);
64
65 switch (this.state)
66 {
67 case NOT_STARTED:
68 startModel(federationName, fsPort, localSk3);
69 break;
70
71 case STARTED:
72 sendSimRunControl(federationName);
73 break;
74
75 case RUNCONTROL:
76 setParameters(federationName);
77 break;
78
79 case PARAMETERS:
80 sendSimStart(federationName);
81 break;
82
83 case SIMULATORSTARTED:
84 waitForSimEnded(federationName);
85 break;
86
87 case SIMULATORENDED:
88 requestStatistics(federationName);
89 break;
90
91 case STATISTICSGATHERED:
92 killFederate(federationName);
93 ready = true;
94 break;
95
96 case ERROR:
97 killFederate(federationName);
98 ready = true;
99 break;
100
101 default:
102 break;
103 }
104 }
105
106 this.modelSocket.close();
107 this.fsSocket.close();
108 this.fmContext.term();
109 }
110
111
112
113
114
115
116
117
118 private void startModel(final String federationName, final int fsPort, final String localSk3) throws Sim0MQException
119 {
120
121 byte[] fm1Message;
122 if (localSk3.equals("sk-3"))
123 {
124 fm1Message = SimulationMessage.encodeUTF8(federationName, "FM", "FS", "FM.1", ++this.messageCount, MessageStatus.NEW,
125 "MM1.1", "java8+", "-jar", "/home/alexandv/sim0mq/MM1/mm1.jar", "MM1.1 %PORT%", "/home/alexandv/sim0mq/MM1",
126 "", "/home/alexandv/sim0mq/MM1/out.txt", "/home/alexandv/sim0mq/MM1/err.txt", false, false, false);
127 this.fsSocket.connect("tcp://130.161.3.179:" + fsPort);
128 }
129 else
130 {
131 fm1Message = SimulationMessage.encodeUTF8(federationName, "FM", "FS", "FM.1", ++this.messageCount, MessageStatus.NEW,
132 "MM1.1", "java8+", "-jar", "e:/MM1/mm1.jar", "MM1.1 %PORT%", "e:/MM1", "", "e:/MM1/out.txt", "e:/MM1/err.txt",
133 false, false, false);
134 this.fsSocket.connect("tcp://127.0.0.1:" + fsPort);
135 }
136 this.fsSocket.send(fm1Message);
137
138 byte[] reply = this.fsSocket.recv(0);
139 Object[] replyMessage = SimulationMessage.decode(reply);
140 System.out.println("Received\n" + SimulationMessage.print(replyMessage));
141
142 if (replyMessage[4].toString().equals("FS.2") && replyMessage[9].toString().equals("started")
143 && replyMessage[8].toString().equals("MM1.1"))
144 {
145 this.state = ModelState.STARTED;
146 this.modelSocket.connect("tcp://127.0.0.1:" + replyMessage[10].toString());
147 }
148 else
149 {
150 this.state = ModelState.ERROR;
151 System.err.println("Model not started correctly -- state = " + replyMessage[9]);
152 System.err.println("Started model = " + replyMessage[8]);
153 System.err.println("Error message = " + replyMessage[10]);
154 }
155
156 }
157
158
159
160
161
162
163 private void sendSimRunControl(final String federationName) throws Sim0MQException
164 {
165 byte[] fm2Message;
166 fm2Message = SimulationMessage.encodeUTF8(federationName, "FM", "MM1.1", "FM.2", ++this.messageCount, MessageStatus.NEW,
167 100.0, 0.0, 0.0, Double.POSITIVE_INFINITY, 1, 0);
168 this.modelSocket.send(fm2Message);
169
170 byte[] reply = this.modelSocket.recv(0);
171 Object[] replyMessage = SimulationMessage.decode(reply);
172 System.out.println("Received\n" + SimulationMessage.print(replyMessage));
173
174 if (replyMessage[4].toString().equals("MC.2") && (boolean) replyMessage[9]
175 && ((Long) replyMessage[8]).longValue() == this.messageCount)
176 {
177 this.state = ModelState.RUNCONTROL;
178 }
179 else
180 {
181 this.state = ModelState.ERROR;
182 System.err.println("Model not started correctly -- status = " + replyMessage[9]);
183 System.err.println("Error message = " + replyMessage[10]);
184 }
185 }
186
187
188
189
190
191
192 private void setParameters(final String federationName) throws Sim0MQException
193 {
194 Map<String, Object> parameters = new HashMap<>();
195 parameters.put("iat", new Double(1.0));
196 parameters.put("servicetime", new Double(0.85));
197
198 for (String parameterName : parameters.keySet())
199 {
200 if (!this.state.isError())
201 {
202 byte[] fm3Message;
203 fm3Message = SimulationMessage.encodeUTF8(federationName, "FM", "MM1.1", "FM.3", ++this.messageCount,
204 MessageStatus.NEW, parameterName, parameters.get(parameterName));
205 this.modelSocket.send(fm3Message);
206
207 byte[] reply = this.modelSocket.recv(0);
208 Object[] replyMessage = SimulationMessage.decode(reply);
209 System.out.println("Received\n" + SimulationMessage.print(replyMessage));
210
211 if (replyMessage[4].toString().equals("MC.2") && (boolean) replyMessage[9]
212 && ((Long) replyMessage[8]).longValue() == this.messageCount)
213 {
214 this.state = ModelState.PARAMETERS;
215 }
216 else
217 {
218 this.state = ModelState.ERROR;
219 System.err.println("Model parameter error -- status = " + replyMessage[9]);
220 System.err.println("Error message = " + replyMessage[10]);
221 }
222 }
223 }
224 if (!this.state.isError())
225 {
226 this.state = ModelState.PARAMETERS;
227 }
228 }
229
230
231
232
233
234
235 private void sendSimStart(final String federationName) throws Sim0MQException
236 {
237 byte[] fm4Message;
238 fm4Message = SimulationMessage.encodeUTF8(federationName, "FM", "MM1.1", "FM.4", ++this.messageCount, MessageStatus.NEW);
239 this.modelSocket.send(fm4Message);
240
241 byte[] reply = this.modelSocket.recv(0);
242 Object[] replyMessage = SimulationMessage.decode(reply);
243 System.out.println("Received\n" + SimulationMessage.print(replyMessage));
244
245 if (replyMessage[4].toString().equals("MC.2") && (boolean) replyMessage[9]
246 && ((Long) replyMessage[8]).longValue() == this.messageCount)
247 {
248 this.state = ModelState.SIMULATORSTARTED;
249 }
250 else
251 {
252 this.state = ModelState.ERROR;
253 System.err.println("Simulation start error -- status = " + replyMessage[9]);
254 System.err.println("Error message = " + replyMessage[10]);
255 }
256 }
257
258
259
260
261
262
263 private void waitForSimEnded(final String federationName) throws Sim0MQException
264 {
265 while (!this.state.isSimulatorEnded() && !this.state.isError())
266 {
267 byte[] fm5Message;
268 fm5Message =
269 SimulationMessage.encodeUTF8(federationName, "FM", "MM1.1", "FM.5", ++this.messageCount, MessageStatus.NEW);
270 this.modelSocket.send(fm5Message);
271
272 byte[] reply = this.modelSocket.recv(0);
273 Object[] replyMessage = SimulationMessage.decode(reply);
274 System.out.println("Received\n" + SimulationMessage.print(replyMessage));
275
276 if (replyMessage[4].toString().equals("MC.1") && !replyMessage[9].toString().equals("error")
277 && !replyMessage[9].toString().equals("started")
278 && ((Long) replyMessage[8]).longValue() == this.messageCount)
279 {
280 if (replyMessage[9].toString().equals("ended"))
281 {
282 this.state = ModelState.SIMULATORENDED;
283 }
284 else
285 {
286
287 try
288 {
289 Thread.sleep(1000);
290 }
291 catch (InterruptedException ie)
292 {
293
294 }
295 }
296 }
297 else
298 {
299 this.state = ModelState.ERROR;
300 System.err.println("Simulation start error -- status = " + replyMessage[9]);
301 System.err.println("Error message = " + replyMessage[10]);
302 }
303 }
304 }
305
306
307
308
309
310
311 private void requestStatistics(final String federationName) throws Sim0MQException
312 {
313 List<String> stats = new ArrayList<>();
314 stats.add("dN.average");
315 stats.add("qN.max");
316 stats.add("uN.average");
317
318 for (String statName : stats)
319 {
320 if (!this.state.isError())
321 {
322 byte[] fm6Message;
323 fm6Message = SimulationMessage.encodeUTF8(federationName, "FM", "MM1.1", "FM.6", ++this.messageCount,
324 MessageStatus.NEW, statName);
325 this.modelSocket.send(fm6Message);
326
327 byte[] reply = this.modelSocket.recv(0);
328 Object[] replyMessage = SimulationMessage.decode(reply);
329 System.out.println("Received\n" + SimulationMessage.print(replyMessage));
330
331 if (replyMessage[4].toString().equals("MC.3"))
332 {
333 if (replyMessage[9].toString().equals(statName))
334 {
335 System.out.println("Received statistic for " + statName + " = " + replyMessage[10].toString());
336 }
337 else
338 {
339 this.state = ModelState.ERROR;
340 System.err.println(
341 "Statistics Error: Stat variable expected = " + statName + ", got: " + replyMessage[8]);
342 }
343 }
344 else if (replyMessage[4].toString().equals("MC.4"))
345 {
346 this.state = ModelState.ERROR;
347 System.err.println("Statistics Error: Stat variable = " + replyMessage[8]);
348 System.err.println("Error message = " + replyMessage[9]);
349 }
350 else
351 {
352 this.state = ModelState.ERROR;
353 System.err.println("Statistics Error: Received unknown message as reply to FM6: " + replyMessage[4]);
354 }
355 }
356 }
357 if (!this.state.isError())
358 {
359 this.state = ModelState.STATISTICSGATHERED;
360 }
361 }
362
363
364
365
366
367
368 private void killFederate(final String federationName) throws Sim0MQException
369 {
370 byte[] fm8Message;
371 fm8Message =
372 SimulationMessage.encodeUTF8(federationName, "FM", "FS", "FM.8", ++this.messageCount, MessageStatus.NEW, "MM1.1");
373 this.fsSocket.send(fm8Message);
374
375 byte[] reply = this.fsSocket.recv(0);
376 Object[] replyMessage = SimulationMessage.decode(reply);
377 System.out.println("Received\n" + SimulationMessage.print(replyMessage));
378
379 if (replyMessage[4].toString().equals("FS.4") && (boolean) replyMessage[9]
380 && replyMessage[8].toString().equals("MM1.1"))
381 {
382 this.state = ModelState.TERMINATED;
383 }
384 else
385 {
386 this.state = ModelState.ERROR;
387 System.err.println("Model not killed correctly");
388 System.err.println("Tried to kill model = " + replyMessage[8]);
389 System.err.println("Error message = " + replyMessage[10]);
390 }
391
392 }
393
394
395
396
397
398 public static void main(String[] args) throws Sim0MQException
399 {
400 if (args.length < 4)
401 {
402 System.err.println(
403 "Use as FederationManager federationName federationManagerPortNumber federateStarterPortNumber local/sk-3");
404 System.exit(-1);
405 }
406 String federationName = args[0];
407
408 String fmsPort = args[1];
409 int fmPort = 0;
410 try
411 {
412 fmPort = Integer.parseInt(fmsPort);
413 }
414 catch (NumberFormatException nfe)
415 {
416 System.err.println("Use as FederationManager fedNname fmPort fsPort local/sk-3, where fmPort is a number");
417 System.exit(-1);
418 }
419 if (fmPort == 0 || fmPort > 65535)
420 {
421 System.err.println("fmPort should be between 1 and 65535");
422 System.exit(-1);
423 }
424
425 String fssPort = args[2];
426 int fsPort = 0;
427 try
428 {
429 fsPort = Integer.parseInt(fssPort);
430 }
431 catch (NumberFormatException nfe)
432 {
433 System.err.println("Use as FederationManager fedNname fmPort fsPort local/sk3, where fsPort is a number");
434 System.exit(-1);
435 }
436 if (fsPort == 0 || fsPort > 65535)
437 {
438 System.err.println("fsPort should be between 1 and 65535");
439 System.exit(-1);
440 }
441
442 String localSk3 = args[3];
443 if (!localSk3.equals("local") && !localSk3.equals("sk-3"))
444 {
445 System.err.println("Use as FederationManager fedNname fmPort fsPort local/sk3, where last arg is local/sk-3");
446 System.exit(-1);
447 }
448
449 new MM1FederationManager(federationName, fmPort, fsPort, localSk3);
450
451 }
452
453 }