1 package org.sim0mq.federatestarter;
2
3 import java.io.File;
4 import java.io.IOException;
5 import java.io.InputStream;
6 import java.nio.file.Files;
7 import java.nio.file.Path;
8 import java.nio.file.Paths;
9 import java.util.ArrayList;
10 import java.util.Arrays;
11 import java.util.Collections;
12 import java.util.LinkedHashMap;
13 import java.util.List;
14 import java.util.Map;
15 import java.util.Properties;
16 import java.util.UUID;
17
18 import org.djutils.io.URLResource;
19 import org.djutils.serialization.SerializationException;
20 import org.sim0mq.Sim0MQException;
21 import org.sim0mq.message.Sim0MQMessage;
22 import org.sim0mq.message.federatestarter.FS1RequestStatusMessage;
23 import org.sim0mq.message.federatestarter.FS2FederateStartedMessage;
24 import org.sim0mq.message.federatestarter.FS4FederateKilledMessage;
25 import org.sim0mq.message.federationmanager.FM1StartFederateMessage;
26 import org.sim0mq.message.federationmanager.FM8KillFederateMessage;
27 import org.sim0mq.message.modelcontroller.MC1StatusMessage;
28 import org.zeromq.SocketType;
29 import org.zeromq.ZContext;
30 import org.zeromq.ZMQ;
31 import org.zeromq.ZMQException;
32
33
34
35
36
37
38
39
40
41
42
43
44 public class FederateStarter
45 {
46
47 private final int fsPort;
48
49
50 private final int startPort;
51
52
53 private final int endPort;
54
55
56 @SuppressWarnings("checkstyle:visibilitymodifier")
57 protected Map<Object, Process> runningProcessMap = Collections.synchronizedMap(new LinkedHashMap<>());
58
59
60 private Map<Object, Integer> modelPortMap = Collections.synchronizedMap(new LinkedHashMap<>());
61
62
63 private Map<Object, FM1StartFederateMessage> startFederateMessages = Collections.synchronizedMap(new LinkedHashMap<>());
64
65
66 @SuppressWarnings("checkstyle:visibilitymodifier")
67 final Properties softwareProperties;
68
69
70 private ZMQ.Socket fsSocket;
71
72
73 private ZContext fsContext;
74
75
76 private long messageCount = 0;
77
78
79 private final boolean modelController;
80
81
82
83
84
85
86
87
88
89
90 public FederateStarter(final int fsPort, final Properties softwareProperties, final int startPort, final int endPort,
91 final boolean modelController) throws Sim0MQException, SerializationException
92 {
93 this.softwareProperties = softwareProperties;
94 this.fsPort = fsPort;
95 this.startPort = startPort;
96 this.endPort = endPort;
97 this.modelController = modelController;
98
99 this.fsContext = new ZContext(1);
100
101 this.fsSocket = this.fsContext.createSocket(SocketType.ROUTER);
102 this.fsSocket.bind("tcp://*:" + this.fsPort);
103
104 while (!Thread.currentThread().isInterrupted())
105 {
106
107 try
108 {
109 String identity = this.fsSocket.recvStr();
110 this.fsSocket.recvStr();
111
112 byte[] request = this.fsSocket.recv(0);
113 Sim0MQMessage message = Sim0MQMessage.decode(request);
114 String messageTypeId = message.getMessageTypeId().toString();
115 String receiverId = message.getReceiverId().toString();
116
117 System.out.println("Received " + Sim0MQMessage.print(message.createObjectArray()));
118
119 if (receiverId.equals("FS"))
120 {
121 switch (messageTypeId)
122 {
123 case "FM.1":
124 processStartFederate(identity, message);
125 break;
126
127 case "FM.8":
128 processKillFederate(identity, message);
129 break;
130
131 case "FM.9":
132
133 break;
134
135 default:
136
137 System.err.println("Received unknown message -- not processed: " + messageTypeId);
138 }
139 }
140 else
141 {
142
143 System.err.println("Received message not intended for FS but for " + receiverId + " -- not processed: ");
144 }
145 }
146 catch (ZMQException e)
147 {
148 System.err.println(e.getMessage());
149 }
150 }
151
152 try
153 {
154 this.fsSocket.close();
155 this.fsContext.destroy();
156 }
157 catch (Exception e)
158 {
159 System.err.println(e.getMessage());
160 }
161 }
162
163
164
165
166
167
168
169
170 private void processStartFederate(final String identity, final Sim0MQMessage message)
171 throws Sim0MQException, SerializationException
172 {
173 FM1StartFederateMessage startFederateMessage = new FM1StartFederateMessage(message.createObjectArray());
174 String error = "";
175
176 int modelPort = findFreePortNumber();
177
178 if (modelPort == -1)
179 {
180 error = "No free port number";
181 }
182
183 else
184
185 {
186 try
187 {
188 ProcessBuilder pb = new ProcessBuilder();
189
190 Path workingPath = Files.createDirectories(Paths.get(startFederateMessage.getWorkingDirectory()));
191 pb.directory(workingPath.toFile());
192
193 String softwareCode = "";
194 if (!this.softwareProperties.containsKey(startFederateMessage.getSoftwareCode()))
195 {
196 System.err.println("Could not find software alias " + startFederateMessage.getSoftwareCode()
197 + " in software properties file");
198 }
199 else
200 {
201 softwareCode = this.softwareProperties.getProperty(startFederateMessage.getSoftwareCode());
202
203 List<String> pbArgs = new ArrayList<>();
204 pbArgs.add(softwareCode);
205 pbArgs.add(startFederateMessage.getArgsBefore());
206 pbArgs.add(startFederateMessage.getModelPath());
207 pbArgs.addAll(Arrays.asList(
208 startFederateMessage.getArgsAfter().replaceAll("%PORT%", String.valueOf(modelPort)).split(" ")));
209 pb.command(pbArgs);
210
211 String stdIn = startFederateMessage.getRedirectStdin();
212 String stdOut = startFederateMessage.getRedirectStdout();
213 String stdErr = startFederateMessage.getRedirectStderr();
214
215 if (stdIn.length() > 0)
216 {
217
218 File stdInFile = new File(stdIn);
219 pb.redirectInput(stdInFile);
220 }
221
222 if (stdOut.length() > 0)
223 {
224
225 File stdOutFile = new File(stdOut);
226 pb.redirectOutput(stdOutFile);
227 }
228
229 if (stdErr.length() > 0)
230 {
231
232 File stdErrFile = new File(stdErr);
233 pb.redirectError(stdErrFile);
234 }
235
236 new Thread()
237 {
238
239 @Override
240 public void run()
241 {
242 try
243 {
244 Process process = pb.start();
245 FederateStarter.this.runningProcessMap.put(startFederateMessage.getInstanceId(), process);
246 System.err.println("Process started:" + process.isAlive());
247 }
248 catch (IOException exception)
249 {
250 exception.printStackTrace();
251 }
252 }
253 }.start();
254
255 this.modelPortMap.put(startFederateMessage.getInstanceId(), modelPort);
256 this.startFederateMessages.put(startFederateMessage.getInstanceId(), startFederateMessage);
257
258
259
260
261 System.out.println("modelController : " + this.modelController);
262 if (this.modelController)
263 {
264 error = waitForModelStarted(startFederateMessage.getFederationId(),
265 startFederateMessage.getInstanceId(), modelPort);
266 }
267 }
268 }
269 catch (IOException exception)
270 {
271 exception.printStackTrace();
272 error = exception.getMessage();
273 }
274 }
275
276 System.out.println("SEND MESSAGE FS.2 ABOUT MODEL " + startFederateMessage.getInstanceId() + " @ port " + modelPort);
277
278
279 this.fsSocket.sendMore(identity);
280 this.fsSocket.sendMore("");
281
282 byte[] fs2Message = new FS2FederateStartedMessage.Builder()
283 .setSimulationRunId(startFederateMessage.getFederationId())
284 .setInstanceId(startFederateMessage.getInstanceId())
285 .setSenderId("FS")
286 .setReceiverId(startFederateMessage.getSenderId())
287 .setMessageId(++this.messageCount)
288 .setStatus(error.isEmpty() ? "started" : "error")
289 .setError(error)
290 .setModelPort(modelPort)
291 .build()
292 .createByteArray();
293 this.fsSocket.send(fs2Message, 0);
294
295 }
296
297
298
299
300
301 private int findFreePortNumber()
302 {
303 for (int port = this.startPort; port <= this.endPort; port++)
304 {
305 if (!this.modelPortMap.containsValue(port))
306 {
307
308 ZMQ.Socket testSocket = null;
309 try
310 {
311 testSocket = this.fsContext.createSocket(SocketType.REP);
312 testSocket.bind("tcp://127.0.0.1:" + port);
313 testSocket.unbind("tcp://127.0.0.1:" + port);
314 testSocket.close();
315 return port;
316 }
317 catch (Exception exception)
318 {
319
320 if (testSocket != null)
321 {
322 try
323 {
324 testSocket.close();
325 }
326 catch (Exception e)
327 {
328
329 }
330 }
331 }
332 }
333 }
334 return -1;
335 }
336
337
338
339
340
341
342
343
344
345
346 private String waitForModelStarted(final Object federationRunId, final Object modelId, final int modelPort)
347 throws Sim0MQException, SerializationException
348 {
349 boolean ok = true;
350 String error = "";
351 ZMQ.Socket modelSocket = null;
352 try
353 {
354 modelSocket = this.fsContext.createSocket(SocketType.REQ);
355 modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
356 modelSocket.connect("tcp://127.0.0.1:" + modelPort);
357 }
358 catch (Exception exception)
359 {
360 exception.printStackTrace();
361 ok = false;
362 error = exception.getMessage();
363 }
364
365 boolean started = false;
366 while (ok && !started)
367 {
368 byte[] fs1Message =
369 new FS1RequestStatusMessage(federationRunId, "FS", modelId, ++this.messageCount).createByteArray();
370 modelSocket.send(fs1Message, 0);
371 System.out.println("Sent: FS.1 to " + modelId + ", waiting on MC1");
372
373 byte[] reply = modelSocket.recv(0);
374 Object[] objectArray = Sim0MQMessage.decodeToArray(reply);
375 System.out.println("Received\n" + Sim0MQMessage.print(objectArray));
376 MC1StatusMessage replyMessage = new MC1StatusMessage(objectArray);
377
378
379 if (!replyMessage.getStatus().equals("error") && !replyMessage.getStatus().equals("ended")
380 && ((Long) replyMessage.getReplyToId()).longValue() == this.messageCount)
381 {
382 if (replyMessage.getStatus().equals("started"))
383 {
384 started = true;
385 }
386 else
387 {
388
389 try
390 {
391 Thread.sleep(100);
392 }
393 catch (InterruptedException ie)
394 {
395
396 }
397 }
398 }
399 else
400 {
401 ok = false;
402 error = replyMessage.getError();
403 System.err.println("Simulation start error -- status = " + replyMessage.getStatus());
404 System.err.println("Error message = " + error);
405 }
406 }
407
408 if (modelSocket != null)
409 {
410 modelSocket.close();
411 }
412
413 return error;
414 }
415
416
417
418
419
420
421
422
423 private void processKillFederate(final String identity, final Sim0MQMessage message)
424 throws Sim0MQException, SerializationException
425 {
426 boolean status = true;
427 String error = "";
428
429 Object federationRunId = message.getFederationId();
430 Object senderId = message.getSenderId();
431
432 FM8KillFederateMessage killMessage = new FM8KillFederateMessage(message.createObjectArray());
433 Object modelId = killMessage.getInstanceId();
434 if (!this.modelPortMap.containsKey(modelId))
435 {
436 status = false;
437 error = "model " + modelId + " unknown -- this model is unknown to the FederateStarter";
438 }
439 else
440 {
441 int modelPort = this.modelPortMap.remove(modelId);
442 Process process = this.runningProcessMap.remove(modelId);
443
444 try
445 {
446 try
447 {
448 ZMQ.Socket modelSocket = this.fsContext.createSocket(SocketType.REQ);
449 modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
450 modelSocket.connect("tcp://127.0.0.1:" + modelPort);
451
452 byte[] fs3Message =
453 Sim0MQMessage.encodeUTF8(true, federationRunId, "FS", modelId, "FS.3", ++this.messageCount);
454 modelSocket.send(fs3Message, 0);
455
456 modelSocket.close();
457 }
458 catch (Exception exception)
459 {
460 exception.printStackTrace();
461 status = true;
462 error = exception.getMessage();
463 }
464
465 try
466 {
467 Thread.sleep(100);
468 }
469 catch (InterruptedException ie)
470 {
471
472 }
473
474 if (process != null && process.isAlive())
475 {
476 process.destroyForcibly();
477 }
478
479 FM1StartFederateMessage sfm = this.startFederateMessages.get(modelId);
480 if (sfm.isDeleteStdout())
481 {
482 if (sfm.getRedirectStdout().length() > 0)
483 {
484 File stdOutFile = new File(sfm.getRedirectStdout());
485 stdOutFile.delete();
486 }
487 }
488
489 if (sfm.isDeleteStderr())
490 {
491 if (sfm.getRedirectStderr().length() > 0)
492 {
493 File stdErrFile = new File(sfm.getRedirectStderr());
494 stdErrFile.delete();
495 }
496 }
497
498 if (sfm.isDeleteWorkingDirectory())
499 {
500 File workingDir = new File(sfm.getWorkingDirectory());
501 workingDir.delete();
502 }
503 }
504 catch (Exception exception)
505 {
506 exception.printStackTrace();
507 status = false;
508 error = exception.getMessage();
509 }
510
511 byte[] fs4Message =
512 new FS4FederateKilledMessage(federationRunId, "FS", senderId, ++this.messageCount, modelId, status, error)
513 .createByteArray();
514 this.fsSocket.sendMore(identity);
515 this.fsSocket.sendMore("");
516 this.fsSocket.send(fs4Message, 0);
517 }
518 }
519
520
521
522
523 public boolean isModelController()
524 {
525 return this.modelController;
526 }
527
528
529
530
531
532
533
534
535 public static void main(final String[] args) throws Sim0MQException, SerializationException
536 {
537 if (args.length < 4)
538 {
539 System.err.println("Use as FederateStarter portNumber software_properties_file startPort endPort");
540 System.exit(-1);
541 }
542
543 String sPort = args[0];
544 int port = 0;
545 try
546 {
547 port = Integer.parseInt(sPort);
548 }
549 catch (NumberFormatException nfe)
550 {
551 System.err.println("Use as FederateStarter portNumber, where portNumber is a number");
552 System.exit(-1);
553 }
554 if (port == 0 || port > 65535)
555 {
556 System.err.println("PortNumber should be between 1 and 65535");
557 System.exit(-1);
558 }
559
560 String propertiesFile = args[1];
561 Properties softwareProperties = new Properties();
562 InputStream propertiesStream = URLResource.getResourceAsStream(propertiesFile);
563 try
564 {
565 softwareProperties.load(propertiesStream);
566 }
567 catch (IOException | NullPointerException e)
568 {
569 System.err.println("Could not find or read software properties file " + propertiesFile);
570 System.exit(-1);
571 }
572
573 String sStartPort = args[2];
574 int startPort = 0;
575 try
576 {
577 startPort = Integer.parseInt(sStartPort);
578 }
579 catch (NumberFormatException nfe)
580 {
581 System.err.println("Use as FederateStarter pn file startPort endPort, where startPort is a number");
582 System.exit(-1);
583 }
584 if (startPort == 0 || startPort > 65535)
585 {
586 System.err.println("startPort should be between 1 and 65535");
587 System.exit(-1);
588 }
589
590 String sEndPort = args[3];
591 int endPort = 0;
592 try
593 {
594 endPort = Integer.parseInt(sEndPort);
595 }
596 catch (NumberFormatException nfe)
597 {
598 System.err.println("Use as FederateStarter pn file startPort endPort, where endPort is a number");
599 System.exit(-1);
600 }
601 if (endPort == 0 || endPort > 65535)
602 {
603 System.err.println("endPort should be between 1 and 65535");
604 System.exit(-1);
605 }
606
607 new FederateStarter(port, softwareProperties, startPort, endPort, true);
608 }
609
610 }