1 package org.sim0mq.federatestarter;
2
3 import java.io.File;
4 import java.io.IOException;
5 import java.io.InputStream;
6 import java.nio.file.Files;
7 import java.nio.file.Path;
8 import java.nio.file.Paths;
9 import java.util.ArrayList;
10 import java.util.Arrays;
11 import java.util.Collections;
12 import java.util.LinkedHashMap;
13 import java.util.List;
14 import java.util.Map;
15 import java.util.Properties;
16 import java.util.UUID;
17
18 import org.djutils.io.URLResource;
19 import org.djutils.serialization.SerializationException;
20 import org.sim0mq.Sim0MQException;
21 import org.sim0mq.message.MessageStatus;
22 import org.sim0mq.message.SimulationMessage;
23 import org.sim0mq.message.federatestarter.FS2FederateStartedMessage;
24 import org.sim0mq.message.federationmanager.FM1StartFederateMessage;
25 import org.zeromq.SocketType;
26 import org.zeromq.ZContext;
27 import org.zeromq.ZMQ;
28 import org.zeromq.ZMQException;
29
30
31
32
33
34
35
36
37
38
39
40
41 public class FederateStarter
42 {
43
/** Port number on which this FederateStarter binds its own ROUTER socket. */
private final int fsPort;

/** First port number (inclusive) of the range that is scanned for a free model port. */
private final int startPort;

/** Last port number (inclusive) of the range that is scanned for a free model port. */
private final int endPort;

/** The processes that have been started, keyed by the instance id from the start message. */
@SuppressWarnings("checkstyle:visibilitymodifier")
protected Map<String, Process> runningProcessMap = Collections.synchronizedMap(new LinkedHashMap<>());

/** The port number that was handed to each started model, keyed by instance id. */
private Map<String, Integer> modelPortMap = Collections.synchronizedMap(new LinkedHashMap<>());

/** The start message of each started model, keyed by instance id; consulted again when the model is killed. */
private Map<String, FM1StartFederateMessage> startFederateMessages = Collections.synchronizedMap(new LinkedHashMap<>());

/** Mapping from the software alias sent in a start message to the executable that is launched for it. */
@SuppressWarnings("checkstyle:visibilitymodifier")
final Properties softwareProperties;

/** The ROUTER socket on which requests are received and on which the replies are sent. */
private ZMQ.Socket fsSocket;

/** The ZeroMQ context from which every socket used by this class is created. */
private ZContext fsContext;

/** Counter for outgoing messages; it is incremented just before each message is encoded. */
private long messageCount = 0;

/** When true, a start request is only answered after the started model has been polled for its status. */
private final boolean modelController;
77
78
79
80
81
82
83
84
85
86
87 public FederateStarter(final int fsPort, final Properties softwareProperties, final int startPort, final int endPort,
88 final boolean modelController) throws Sim0MQException, SerializationException
89 {
90 super();
91 this.softwareProperties = softwareProperties;
92 this.fsPort = fsPort;
93 this.startPort = startPort;
94 this.endPort = endPort;
95 this.modelController = modelController;
96
97 this.fsContext = new ZContext(1);
98
99 this.fsSocket = this.fsContext.createSocket(SocketType.ROUTER);
100 this.fsSocket.bind("tcp://*:" + this.fsPort);
101
102 while (!Thread.currentThread().isInterrupted())
103 {
104
105 try
106 {
107 String identity = this.fsSocket.recvStr();
108 this.fsSocket.recvStr();
109
110 byte[] request = this.fsSocket.recv(0);
111 Object[] fields = SimulationMessage.decode(request);
112 String messageTypeId = fields[4].toString();
113 String receiverId = fields[3].toString();
114
115 System.out.println("Received " + SimulationMessage.print(fields));
116
117 if (receiverId.equals("FS"))
118 {
119 switch (messageTypeId)
120 {
121 case "FM.1":
122 processStartFederate(identity, fields);
123 break;
124
125 case "FM.8":
126 processKillFederate(identity, fields);
127 break;
128
129 case "FM.9":
130
131 break;
132
133 default:
134
135 System.err.println("Received unknown message -- not processed: " + messageTypeId);
136 }
137 }
138 else
139 {
140
141 System.err.println("Received message not intended for FS but for " + receiverId + " -- not processed: ");
142 }
143 }
144 catch (ZMQException e)
145 {
146 System.err.println(e.getMessage());
147 }
148 }
149
150 try
151 {
152 this.fsSocket.close();
153 this.fsContext.destroy();
154 }
155 catch (Exception e)
156 {
157 System.err.println(e.getMessage());
158 }
159 }
160
161
162
163
164
165
166
167
168 private void processStartFederate(final String identity, final Object[] fields)
169 throws Sim0MQException, SerializationException
170 {
171 FM1StartFederateMessage startFederateMessage = FM1StartFederateMessage.createMessage(fields, "FS");
172 String error = "";
173
174 int modelPort = findFreePortNumber();
175
176 if (modelPort == -1)
177 {
178 error = "No free port number";
179 }
180
181 else
182
183 {
184 try
185 {
186 ProcessBuilder pb = new ProcessBuilder();
187
188 Path workingPath = Files.createDirectories(Paths.get(startFederateMessage.getWorkingDirectory()));
189 pb.directory(workingPath.toFile());
190
191 String softwareCode = "";
192 if (!this.softwareProperties.containsKey(startFederateMessage.getSoftwareCode()))
193 {
194 System.err.println("Could not find software alias " + startFederateMessage.getSoftwareCode()
195 + " in software properties file");
196 }
197 else
198 {
199 softwareCode = this.softwareProperties.getProperty(startFederateMessage.getSoftwareCode());
200
201 List<String> pbArgs = new ArrayList<>();
202 pbArgs.add(softwareCode);
203 pbArgs.add(startFederateMessage.getArgsBefore());
204 pbArgs.add(startFederateMessage.getModelPath());
205 pbArgs.addAll(Arrays.asList(
206 startFederateMessage.getArgsAfter().replaceAll("%PORT%", String.valueOf(modelPort)).split(" ")));
207 pb.command(pbArgs);
208
209 String stdIn = startFederateMessage.getRedirectStdin();
210 String stdOut = startFederateMessage.getRedirectStdout();
211 String stdErr = startFederateMessage.getRedirectStderr();
212
213 if (stdIn.length() > 0)
214 {
215
216 File stdInFile = new File(stdIn);
217 pb.redirectInput(stdInFile);
218 }
219
220 if (stdOut.length() > 0)
221 {
222
223 File stdOutFile = new File(stdOut);
224 pb.redirectOutput(stdOutFile);
225 }
226
227 if (stdErr.length() > 0)
228 {
229
230 File stdErrFile = new File(stdErr);
231 pb.redirectError(stdErrFile);
232 }
233
234 new Thread()
235 {
236
237 @Override
238 public void run()
239 {
240 try
241 {
242 Process process = pb.start();
243 FederateStarter.this.runningProcessMap.put(startFederateMessage.getInstanceId(), process);
244 System.err.println("Process started:" + process.isAlive());
245 }
246 catch (IOException exception)
247 {
248 exception.printStackTrace();
249 }
250 }
251 }.start();
252
253 this.modelPortMap.put(startFederateMessage.getInstanceId(), modelPort);
254 this.startFederateMessages.put(startFederateMessage.getInstanceId(), startFederateMessage);
255
256
257
258
259 System.out.println("modelController : " + this.modelController);
260 if (this.modelController)
261 {
262 error = waitForModelStarted(startFederateMessage.getSimulationRunId(),
263 startFederateMessage.getInstanceId(), modelPort);
264 }
265 }
266 }
267 catch (IOException exception)
268 {
269 exception.printStackTrace();
270 error = exception.getMessage();
271 }
272 }
273
274 System.out.println("SEND MESSAGE FS.2 ABOUT MODEL " + startFederateMessage.getInstanceId() + " @ port " + modelPort);
275
276
277 this.fsSocket.sendMore(identity);
278 this.fsSocket.sendMore("");
279
280 byte[] fs2Message = new FS2FederateStartedMessage.Builder()
281 .setSimulationRunId(startFederateMessage.getSimulationRunId())
282 .setInstanceId(startFederateMessage.getInstanceId())
283 .setSenderId("FS")
284 .setReceiverId(startFederateMessage.getSenderId())
285 .setMessageId(++this.messageCount)
286 .setStatus(error.isEmpty() ? "started" : "error")
287 .setError(error)
288 .setModelPort(modelPort)
289 .build()
290 .createByteArray();
291 this.fsSocket.send(fs2Message, 0);
292
293 }
294
295
296
297
298
299 private int findFreePortNumber()
300 {
301 for (int port = this.startPort; port <= this.endPort; port++)
302 {
303 if (!this.modelPortMap.containsValue(port))
304 {
305
306 ZMQ.Socket testSocket = null;
307 try
308 {
309 testSocket = this.fsContext.createSocket(SocketType.REP);
310 testSocket.bind("tcp://127.0.0.1:" + port);
311 testSocket.unbind("tcp://127.0.0.1:" + port);
312 testSocket.close();
313 return port;
314 }
315 catch (Exception exception)
316 {
317
318 if (testSocket != null)
319 {
320 try
321 {
322 testSocket.close();
323 }
324 catch (Exception e)
325 {
326
327 }
328 }
329 }
330 }
331 }
332 return -1;
333 }
334
335
336
337
338
339
340
341
342
343
344 private String waitForModelStarted(final Object federationRunId, final String modelId, final int modelPort)
345 throws Sim0MQException, SerializationException
346 {
347 boolean ok = true;
348 String error = "";
349 ZMQ.Socket modelSocket = null;
350 try
351 {
352 modelSocket = this.fsContext.createSocket(SocketType.REQ);
353 modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
354 modelSocket.connect("tcp://127.0.0.1:" + modelPort);
355 }
356 catch (Exception exception)
357 {
358 exception.printStackTrace();
359 ok = false;
360 error = exception.getMessage();
361 }
362
363 boolean started = false;
364 while (ok && !started)
365 {
366 byte[] fs1Message = SimulationMessage.encodeUTF8(federationRunId, "FS", modelId, "FS.1", ++this.messageCount,
367 MessageStatus.NEW);
368 modelSocket.send(fs1Message, 0);
369
370 byte[] reply = modelSocket.recv(0);
371 Object[] replyMessage = SimulationMessage.decode(reply);
372 System.out.println("Received\n" + SimulationMessage.print(replyMessage));
373
374 if (replyMessage[4].toString().equals("MC.1") && !replyMessage[9].toString().equals("error")
375 && !replyMessage[9].toString().equals("ended") && ((Long) replyMessage[8]).longValue() == this.messageCount)
376 {
377 if (replyMessage[9].toString().equals("started"))
378 {
379 started = true;
380 }
381 else
382 {
383
384 try
385 {
386 Thread.sleep(100);
387 }
388 catch (InterruptedException ie)
389 {
390
391 }
392 }
393 }
394 else
395 {
396 ok = false;
397 error = replyMessage[10].toString();
398 System.err.println("Simulation start error -- status = " + replyMessage[9]);
399 System.err.println("Error message = " + replyMessage[10]);
400 }
401 }
402
403 if (modelSocket != null)
404 {
405 modelSocket.close();
406 }
407
408 return error;
409 }
410
411
412
413
414
415
416
417
418 private void processKillFederate(final String identity, final Object[] fields)
419 throws Sim0MQException, SerializationException
420 {
421 boolean status = true;
422 String error = "";
423
424 Object federationRunId = fields[1];
425 String senderId = fields[2].toString();
426
427 String modelId = fields[8].toString();
428 if (!this.modelPortMap.containsKey(modelId))
429 {
430 status = false;
431 error = "model " + modelId + " unknown -- this model is unknown to the FederateStarter";
432 }
433 else
434 {
435 int modelPort = this.modelPortMap.remove(modelId);
436 Process process = this.runningProcessMap.remove(modelId);
437
438 try
439 {
440 try
441 {
442 ZMQ.Socket modelSocket = this.fsContext.createSocket(SocketType.REQ);
443 modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
444 modelSocket.connect("tcp://127.0.0.1:" + modelPort);
445
446 byte[] fs3Message = SimulationMessage.encodeUTF8(federationRunId, "FS", modelId, "FS.3",
447 ++this.messageCount, MessageStatus.NEW);
448 modelSocket.send(fs3Message, 0);
449
450 modelSocket.close();
451 }
452 catch (Exception exception)
453 {
454 exception.printStackTrace();
455 status = true;
456 error = exception.getMessage();
457 }
458
459 try
460 {
461 Thread.sleep(100);
462 }
463 catch (InterruptedException ie)
464 {
465
466 }
467
468 if (process != null && process.isAlive())
469 {
470 process.destroyForcibly();
471 }
472
473 FM1StartFederateMessage sfm = this.startFederateMessages.get(modelId);
474 if (sfm.isDeleteStdout())
475 {
476 if (sfm.getRedirectStdout().length() > 0)
477 {
478 File stdOutFile = new File(sfm.getRedirectStdout());
479 stdOutFile.delete();
480 }
481 }
482
483 if (sfm.isDeleteStderr())
484 {
485 if (sfm.getRedirectStderr().length() > 0)
486 {
487 File stdErrFile = new File(sfm.getRedirectStderr());
488 stdErrFile.delete();
489 }
490 }
491
492 if (sfm.isDeleteWorkingDirectory())
493 {
494 File workingDir = new File(sfm.getWorkingDirectory());
495 workingDir.delete();
496 }
497 }
498 catch (Exception exception)
499 {
500 exception.printStackTrace();
501 status = false;
502 error = exception.getMessage();
503 }
504
505 byte[] fs4Message = SimulationMessage.encodeUTF8(federationRunId, "FS", senderId, "FS.4", ++this.messageCount,
506 MessageStatus.NEW, modelId, status, error);
507 this.fsSocket.sendMore(identity);
508 this.fsSocket.sendMore("");
509 this.fsSocket.send(fs4Message, 0);
510 }
511 }
512
513
514
515
516 public boolean isModelController()
517 {
518 return this.modelController;
519 }
520
521
522
523
524
525
526
527
528 public static void main(final String[] args) throws Sim0MQException, SerializationException
529 {
530 if (args.length < 4)
531 {
532 System.err.println("Use as FederateStarter portNumber software_properties_file startPort endPort");
533 System.exit(-1);
534 }
535
536 String sPort = args[0];
537 int port = 0;
538 try
539 {
540 port = Integer.parseInt(sPort);
541 }
542 catch (NumberFormatException nfe)
543 {
544 System.err.println("Use as FederateStarter portNumber, where portNumber is a number");
545 System.exit(-1);
546 }
547 if (port == 0 || port > 65535)
548 {
549 System.err.println("PortNumber should be between 1 and 65535");
550 System.exit(-1);
551 }
552
553 String propertiesFile = args[1];
554 Properties softwareProperties = new Properties();
555 InputStream propertiesStream = URLResource.getResourceAsStream(propertiesFile);
556 try
557 {
558 softwareProperties.load(propertiesStream);
559 }
560 catch (IOException | NullPointerException e)
561 {
562 System.err.println("Could not find or read software properties file " + propertiesFile);
563 System.exit(-1);
564 }
565
566 String sStartPort = args[2];
567 int startPort = 0;
568 try
569 {
570 startPort = Integer.parseInt(sStartPort);
571 }
572 catch (NumberFormatException nfe)
573 {
574 System.err.println("Use as FederateStarter pn file startPort endPort, where startPort is a number");
575 System.exit(-1);
576 }
577 if (startPort == 0 || startPort > 65535)
578 {
579 System.err.println("startPort should be between 1 and 65535");
580 System.exit(-1);
581 }
582
583 String sEndPort = args[3];
584 int endPort = 0;
585 try
586 {
587 endPort = Integer.parseInt(sEndPort);
588 }
589 catch (NumberFormatException nfe)
590 {
591 System.err.println("Use as FederateStarter pn file startPort endPort, where endPort is a number");
592 System.exit(-1);
593 }
594 if (endPort == 0 || endPort > 65535)
595 {
596 System.err.println("endPort should be between 1 and 65535");
597 System.exit(-1);
598 }
599
600 new FederateStarter(port, softwareProperties, startPort, endPort, true);
601 }
602
603 }