1 package org.sim0mq.federatestarter;
2
3 import java.io.File;
4 import java.io.IOException;
5 import java.io.InputStream;
6 import java.nio.file.Files;
7 import java.nio.file.Path;
8 import java.nio.file.Paths;
9 import java.util.ArrayList;
10 import java.util.Arrays;
11 import java.util.Collections;
12 import java.util.LinkedHashMap;
13 import java.util.List;
14 import java.util.Map;
15 import java.util.Properties;
16 import java.util.UUID;
17
18 import org.djutils.io.URLResource;
19 import org.djutils.serialization.SerializationException;
20 import org.sim0mq.Sim0MQException;
21 import org.sim0mq.message.Sim0MQMessage;
22 import org.sim0mq.message.federatestarter.FS1RequestStatusMessage;
23 import org.sim0mq.message.federatestarter.FS2FederateStartedMessage;
24 import org.sim0mq.message.federatestarter.FS4FederateKilledMessage;
25 import org.sim0mq.message.federationmanager.FM1StartFederateMessage;
26 import org.sim0mq.message.federationmanager.FM8KillFederateMessage;
27 import org.sim0mq.message.modelcontroller.MC1StatusMessage;
28 import org.zeromq.SocketType;
29 import org.zeromq.ZContext;
30 import org.zeromq.ZMQ;
31 import org.zeromq.ZMQException;
32
33
34
35
36
37
38
39
40
41
42
43
44 public class FederateStarter
45 {
46
47 private final int fsPort;
48
49
50 private final int startPort;
51
52
53 private final int endPort;
54
55
56 @SuppressWarnings("checkstyle:visibilitymodifier")
57 protected Map<Object, Process> runningProcessMap = Collections.synchronizedMap(new LinkedHashMap<>());
58
59
60 private Map<Object, Integer> modelPortMap = Collections.synchronizedMap(new LinkedHashMap<>());
61
62
63 private Map<Object, FM1StartFederateMessage> startFederateMessages = Collections.synchronizedMap(new LinkedHashMap<>());
64
65
66 @SuppressWarnings("checkstyle:visibilitymodifier")
67 final Properties softwareProperties;
68
69
70 private ZMQ.Socket fsSocket;
71
72
73 private ZContext fsContext;
74
75
76 private long messageCount = 0;
77
78
79 private final boolean modelController;
80
81
82
83
84
85
86
87
88
89
90 public FederateStarter(final int fsPort, final Properties softwareProperties, final int startPort, final int endPort,
91 final boolean modelController) throws Sim0MQException, SerializationException
92 {
93 this.softwareProperties = softwareProperties;
94 this.fsPort = fsPort;
95 this.startPort = startPort;
96 this.endPort = endPort;
97 this.modelController = modelController;
98
99 this.fsContext = new ZContext(1);
100
101 this.fsSocket = this.fsContext.createSocket(SocketType.ROUTER);
102 this.fsSocket.bind("tcp://*:" + this.fsPort);
103
104 while (!Thread.currentThread().isInterrupted())
105 {
106
107 try
108 {
109 String identity = this.fsSocket.recvStr();
110 this.fsSocket.recvStr();
111
112 byte[] request = this.fsSocket.recv(0);
113 Sim0MQMessage message = Sim0MQMessage.decode(request);
114 String messageTypeId = message.getMessageTypeId().toString();
115 String receiverId = message.getReceiverId().toString();
116
117 System.out.println("Received " + Sim0MQMessage.print(message.createObjectArray()));
118
119 if (receiverId.equals("FS"))
120 {
121 switch (messageTypeId)
122 {
123 case "FM.1":
124 processStartFederate(identity, message);
125 break;
126
127 case "FM.8":
128 processKillFederate(identity, message);
129 break;
130
131 case "FM.9":
132
133 break;
134
135 default:
136
137 System.err.println("Received unknown message -- not processed: " + messageTypeId);
138 }
139 }
140 else
141 {
142
143 System.err.println("Received message not intended for FS but for " + receiverId + " -- not processed: ");
144 }
145 }
146 catch (ZMQException e)
147 {
148 System.err.println(e.getMessage());
149 }
150 }
151
152 try
153 {
154 this.fsSocket.close();
155 this.fsContext.destroy();
156 }
157 catch (Exception e)
158 {
159 System.err.println(e.getMessage());
160 }
161 }
162
163
164
165
166
167
168
169
170 private void processStartFederate(final String identity, final Sim0MQMessage message)
171 throws Sim0MQException, SerializationException
172 {
173 FM1StartFederateMessage startFederateMessage = new FM1StartFederateMessage(message.createObjectArray());
174 String error = "";
175
176 int modelPort = findFreePortNumber();
177
178 if (modelPort == -1)
179 {
180 error = "No free port number";
181 }
182
183 else
184
185 {
186 try
187 {
188 ProcessBuilder pb = new ProcessBuilder();
189
190 Path workingPath = Files.createDirectories(Paths.get(startFederateMessage.getWorkingDirectory()));
191 pb.directory(workingPath.toFile());
192
193 String softwareCode = "";
194 if (!this.softwareProperties.containsKey(startFederateMessage.getSoftwareCode()))
195 {
196 System.err.println("Could not find software alias " + startFederateMessage.getSoftwareCode()
197 + " in software properties file");
198 }
199 else
200 {
201 softwareCode = this.softwareProperties.getProperty(startFederateMessage.getSoftwareCode());
202
203 List<String> pbArgs = new ArrayList<>();
204 pbArgs.add(softwareCode);
205 pbArgs.add(startFederateMessage.getArgsBefore());
206 pbArgs.add(startFederateMessage.getModelPath());
207 pbArgs.addAll(Arrays.asList(
208 startFederateMessage.getArgsAfter().replaceAll("%PORT%", String.valueOf(modelPort)).split(" ")));
209 pb.command(pbArgs);
210
211 String stdIn = startFederateMessage.getRedirectStdin();
212 String stdOut = startFederateMessage.getRedirectStdout();
213 String stdErr = startFederateMessage.getRedirectStderr();
214
215 if (stdIn.length() > 0)
216 {
217
218 File stdInFile = new File(stdIn);
219 pb.redirectInput(stdInFile);
220 }
221
222 if (stdOut.length() > 0)
223 {
224
225 File stdOutFile = new File(stdOut);
226 pb.redirectOutput(stdOutFile);
227 }
228
229 if (stdErr.length() > 0)
230 {
231
232 File stdErrFile = new File(stdErr);
233 pb.redirectError(stdErrFile);
234 }
235
236 new Thread()
237 {
238 @Override
239 public void run()
240 {
241 try
242 {
243 Process process = pb.start();
244 FederateStarter.this.runningProcessMap.put(startFederateMessage.getInstanceId(), process);
245 System.err.println("Process started:" + process.isAlive());
246 }
247 catch (IOException exception)
248 {
249 exception.printStackTrace();
250 }
251 }
252 }.start();
253
254 this.modelPortMap.put(startFederateMessage.getInstanceId(), modelPort);
255 this.startFederateMessages.put(startFederateMessage.getInstanceId(), startFederateMessage);
256
257
258
259
260 System.out.println("modelController : " + this.modelController);
261 if (this.modelController)
262 {
263 error = waitForModelStarted(startFederateMessage.getFederationId(),
264 startFederateMessage.getInstanceId(), modelPort);
265 }
266 }
267 }
268 catch (IOException exception)
269 {
270 exception.printStackTrace();
271 error = exception.getMessage();
272 }
273 }
274
275 System.out.println("SEND MESSAGE FS.2 ABOUT MODEL " + startFederateMessage.getInstanceId() + " @ port " + modelPort);
276
277
278 this.fsSocket.sendMore(identity);
279 this.fsSocket.sendMore("");
280
281 byte[] fs2Message = new FS2FederateStartedMessage.Builder()
282 .setSimulationRunId(startFederateMessage.getFederationId())
283 .setInstanceId(startFederateMessage.getInstanceId())
284 .setSenderId("FS")
285 .setReceiverId(startFederateMessage.getSenderId())
286 .setMessageId(++this.messageCount)
287 .setStatus(error.isEmpty() ? "started" : "error")
288 .setError(error)
289 .setModelPort(modelPort)
290 .build()
291 .createByteArray();
292 this.fsSocket.send(fs2Message, 0);
293
294 }
295
296
297
298
299
300 private int findFreePortNumber()
301 {
302 for (int port = this.startPort; port <= this.endPort; port++)
303 {
304 if (!this.modelPortMap.containsValue(port))
305 {
306
307 ZMQ.Socket testSocket = null;
308 try
309 {
310 testSocket = this.fsContext.createSocket(SocketType.REP);
311 testSocket.bind("tcp://127.0.0.1:" + port);
312 testSocket.unbind("tcp://127.0.0.1:" + port);
313 testSocket.close();
314 return port;
315 }
316 catch (Exception exception)
317 {
318
319 if (testSocket != null)
320 {
321 try
322 {
323 testSocket.close();
324 }
325 catch (Exception e)
326 {
327
328 }
329 }
330 }
331 }
332 }
333 return -1;
334 }
335
336
337
338
339
340
341
342
343
344
345 private String waitForModelStarted(final Object federationRunId, final Object modelId, final int modelPort)
346 throws Sim0MQException, SerializationException
347 {
348 boolean ok = true;
349 String error = "";
350 ZMQ.Socket modelSocket = null;
351 try
352 {
353 modelSocket = this.fsContext.createSocket(SocketType.REQ);
354 modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
355 modelSocket.connect("tcp://127.0.0.1:" + modelPort);
356 }
357 catch (Exception exception)
358 {
359 exception.printStackTrace();
360 ok = false;
361 error = exception.getMessage();
362 }
363
364 boolean started = false;
365 while (ok && !started)
366 {
367 byte[] fs1Message =
368 new FS1RequestStatusMessage(federationRunId, "FS", modelId, ++this.messageCount).createByteArray();
369 modelSocket.send(fs1Message, 0);
370 System.out.println("Sent: FS.1 to " + modelId + ", waiting on MC1");
371
372 byte[] reply = modelSocket.recv(0);
373 Object[] objectArray = Sim0MQMessage.decodeToArray(reply);
374 System.out.println("Received\n" + Sim0MQMessage.print(objectArray));
375 MC1StatusMessage replyMessage = new MC1StatusMessage(objectArray);
376
377
378 if (!replyMessage.getStatus().equals("error") && !replyMessage.getStatus().equals("ended")
379 && ((Long) replyMessage.getReplyToId()).longValue() == this.messageCount)
380 {
381 if (replyMessage.getStatus().equals("started"))
382 {
383 started = true;
384 }
385 else
386 {
387
388 try
389 {
390 Thread.sleep(100);
391 }
392 catch (InterruptedException ie)
393 {
394
395 }
396 }
397 }
398 else
399 {
400 ok = false;
401 error = replyMessage.getError();
402 System.err.println("Simulation start error -- status = " + replyMessage.getStatus());
403 System.err.println("Error message = " + error);
404 }
405 }
406
407 if (modelSocket != null)
408 {
409 modelSocket.close();
410 }
411
412 return error;
413 }
414
415
416
417
418
419
420
421
422 private void processKillFederate(final String identity, final Sim0MQMessage message)
423 throws Sim0MQException, SerializationException
424 {
425 boolean status = true;
426 String error = "";
427
428 Object federationRunId = message.getFederationId();
429 Object senderId = message.getSenderId();
430
431 FM8KillFederateMessage killMessage = new FM8KillFederateMessage(message.createObjectArray());
432 Object modelId = killMessage.getInstanceId();
433 if (!this.modelPortMap.containsKey(modelId))
434 {
435 status = false;
436 error = "model " + modelId + " unknown -- this model is unknown to the FederateStarter";
437 }
438 else
439 {
440 int modelPort = this.modelPortMap.remove(modelId);
441 Process process = this.runningProcessMap.remove(modelId);
442
443 try
444 {
445 try
446 {
447 ZMQ.Socket modelSocket = this.fsContext.createSocket(SocketType.REQ);
448 modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
449 modelSocket.connect("tcp://127.0.0.1:" + modelPort);
450
451 byte[] fs3Message =
452 Sim0MQMessage.encodeUTF8(true, federationRunId, "FS", modelId, "FS.3", ++this.messageCount);
453 modelSocket.send(fs3Message, 0);
454
455 modelSocket.close();
456 }
457 catch (Exception exception)
458 {
459 exception.printStackTrace();
460 status = true;
461 error = exception.getMessage();
462 }
463
464 try
465 {
466 Thread.sleep(100);
467 }
468 catch (InterruptedException ie)
469 {
470
471 }
472
473 if (process != null && process.isAlive())
474 {
475 process.destroyForcibly();
476 }
477
478 FM1StartFederateMessage sfm = this.startFederateMessages.get(modelId);
479 if (sfm.isDeleteStdout())
480 {
481 if (sfm.getRedirectStdout().length() > 0)
482 {
483 File stdOutFile = new File(sfm.getRedirectStdout());
484 stdOutFile.delete();
485 }
486 }
487
488 if (sfm.isDeleteStderr())
489 {
490 if (sfm.getRedirectStderr().length() > 0)
491 {
492 File stdErrFile = new File(sfm.getRedirectStderr());
493 stdErrFile.delete();
494 }
495 }
496
497 if (sfm.isDeleteWorkingDirectory())
498 {
499 File workingDir = new File(sfm.getWorkingDirectory());
500 workingDir.delete();
501 }
502 }
503 catch (Exception exception)
504 {
505 exception.printStackTrace();
506 status = false;
507 error = exception.getMessage();
508 }
509
510 byte[] fs4Message =
511 new FS4FederateKilledMessage(federationRunId, "FS", senderId, ++this.messageCount, modelId, status, error)
512 .createByteArray();
513 this.fsSocket.sendMore(identity);
514 this.fsSocket.sendMore("");
515 this.fsSocket.send(fs4Message, 0);
516 }
517 }
518
519
520
521
522 public boolean isModelController()
523 {
524 return this.modelController;
525 }
526
527
528
529
530
531
532
533
534 public static void main(final String[] args) throws Sim0MQException, SerializationException
535 {
536 if (args.length < 4)
537 {
538 System.err.println("Use as FederateStarter portNumber software_properties_file startPort endPort");
539 System.exit(-1);
540 }
541
542 String sPort = args[0];
543 int port = 0;
544 try
545 {
546 port = Integer.parseInt(sPort);
547 }
548 catch (NumberFormatException nfe)
549 {
550 System.err.println("Use as FederateStarter portNumber, where portNumber is a number");
551 System.exit(-1);
552 }
553 if (port == 0 || port > 65535)
554 {
555 System.err.println("PortNumber should be between 1 and 65535");
556 System.exit(-1);
557 }
558
559 String propertiesFile = args[1];
560 Properties softwareProperties = new Properties();
561 InputStream propertiesStream = URLResource.getResourceAsStream(propertiesFile);
562 try
563 {
564 softwareProperties.load(propertiesStream);
565 }
566 catch (IOException | NullPointerException e)
567 {
568 System.err.println("Could not find or read software properties file " + propertiesFile);
569 System.exit(-1);
570 }
571
572 String sStartPort = args[2];
573 int startPort = 0;
574 try
575 {
576 startPort = Integer.parseInt(sStartPort);
577 }
578 catch (NumberFormatException nfe)
579 {
580 System.err.println("Use as FederateStarter pn file startPort endPort, where startPort is a number");
581 System.exit(-1);
582 }
583 if (startPort == 0 || startPort > 65535)
584 {
585 System.err.println("startPort should be between 1 and 65535");
586 System.exit(-1);
587 }
588
589 String sEndPort = args[3];
590 int endPort = 0;
591 try
592 {
593 endPort = Integer.parseInt(sEndPort);
594 }
595 catch (NumberFormatException nfe)
596 {
597 System.err.println("Use as FederateStarter pn file startPort endPort, where endPort is a number");
598 System.exit(-1);
599 }
600 if (endPort == 0 || endPort > 65535)
601 {
602 System.err.println("endPort should be between 1 and 65535");
603 System.exit(-1);
604 }
605
606 new FederateStarter(port, softwareProperties, startPort, endPort, true);
607 }
608
609 }