1 package org.sim0mq.federatestarter;
2
3 import java.io.File;
4 import java.io.IOException;
5 import java.io.InputStream;
6 import java.nio.file.Files;
7 import java.nio.file.Path;
8 import java.nio.file.Paths;
9 import java.util.ArrayList;
10 import java.util.Arrays;
11 import java.util.Collections;
12 import java.util.HashMap;
13 import java.util.List;
14 import java.util.Map;
15 import java.util.Properties;
16 import java.util.UUID;
17
18 import org.djutils.io.URLResource;
19 import org.djutils.serialization.SerializationException;
20 import org.sim0mq.Sim0MQException;
21 import org.sim0mq.message.MessageStatus;
22 import org.sim0mq.message.SimulationMessage;
23 import org.sim0mq.message.federatestarter.FederateStartedMessage;
24 import org.sim0mq.message.federationmanager.StartFederateMessage;
25 import org.zeromq.SocketType;
26 import org.zeromq.ZContext;
27 import org.zeromq.ZMQ;
28
29
30
31
32
33
34
35
36
37
38
39
40 public class FederateStarter
41 {
42
43 private final int fsPort;
44
45
46 private final int startPort;
47
48
49 private final int endPort;
50
51
52 @SuppressWarnings("checkstyle:visibilitymodifier")
53 protected Map<String, Process> runningProcessMap = Collections.synchronizedMap(new HashMap<>());
54
55
56 private Map<String, Integer> modelPortMap = Collections.synchronizedMap(new HashMap<>());
57
58
59 private Map<String, StartFederateMessage> startFederateMessages = Collections.synchronizedMap(new HashMap<>());
60
61
62 @SuppressWarnings("checkstyle:visibilitymodifier")
63 final Properties softwareProperties;
64
65
66 private ZMQ.Socket fsSocket;
67
68
69 private ZContext fsContext;
70
71
72 private long messageCount = 0;
73
74
75
76
77
78
79
80
81
82 public FederateStarter(final int fsPort, final Properties softwareProperties, final int startPort, final int endPort)
83 throws Sim0MQException, SerializationException
84 {
85 super();
86 this.softwareProperties = softwareProperties;
87 this.fsPort = fsPort;
88 this.startPort = startPort;
89 this.endPort = endPort;
90
91 this.fsContext = new ZContext(1);
92
93 this.fsSocket = this.fsContext.createSocket(SocketType.ROUTER);
94 this.fsSocket.bind("tcp://*:" + this.fsPort);
95
96 while (!Thread.currentThread().isInterrupted())
97 {
98
99 String identity = this.fsSocket.recvStr();
100 this.fsSocket.recvStr();
101
102 byte[] request = this.fsSocket.recv(0);
103 Object[] fields = SimulationMessage.decode(request);
104 String messageTypeId = fields[4].toString();
105 String receiverId = fields[3].toString();
106
107 System.out.println("Received " + SimulationMessage.print(fields));
108
109 if (receiverId.equals("FS"))
110 {
111 switch (messageTypeId)
112 {
113 case "FM.1":
114 processStartFederate(identity, fields);
115 break;
116
117 case "FM.8":
118 processKillFederate(identity, fields);
119 break;
120
121 case "FM.9":
122
123 break;
124
125 default:
126
127 System.err.println("Received unknown message -- not processed: " + messageTypeId);
128 }
129 }
130 else
131 {
132
133 System.err.println("Received message not intended for FS but for " + receiverId + " -- not processed: ");
134 }
135 }
136 this.fsSocket.close();
137 this.fsContext.destroy();
138 }
139
140
141
142
143
144
145
146
147 private void processStartFederate(final String identity, final Object[] fields)
148 throws Sim0MQException, SerializationException
149 {
150 StartFederateMessage startFederateMessage = StartFederateMessage.createMessage(fields, "FS");
151 String error = "";
152
153 int modelPort = findFreePortNumber();
154
155 if (modelPort == -1)
156 {
157 error = "No free port number";
158 }
159
160 else
161
162 {
163 try
164 {
165 ProcessBuilder pb = new ProcessBuilder();
166
167 Path workingPath = Files.createDirectories(Paths.get(startFederateMessage.getWorkingDirectory()));
168 pb.directory(workingPath.toFile());
169
170 String softwareCode = "";
171 if (!this.softwareProperties.containsKey(startFederateMessage.getSoftwareCode()))
172 {
173 System.err.println("Could not find software alias " + startFederateMessage.getSoftwareCode()
174 + " in software properties file");
175 }
176 else
177 {
178 softwareCode = this.softwareProperties.getProperty(startFederateMessage.getSoftwareCode());
179
180 List<String> pbArgs = new ArrayList<>();
181 pbArgs.add(softwareCode);
182 pbArgs.add(startFederateMessage.getArgsBefore());
183 pbArgs.add(startFederateMessage.getModelPath());
184 pbArgs.addAll(Arrays.asList(
185 startFederateMessage.getArgsAfter().replaceAll("%PORT%", String.valueOf(modelPort)).split(" ")));
186 pb.command(pbArgs);
187
188 String stdIn = startFederateMessage.getRedirectStdin();
189 String stdOut = startFederateMessage.getRedirectStdout();
190 String stdErr = startFederateMessage.getRedirectStderr();
191
192 if (stdIn.length() > 0)
193 {
194
195 File stdInFile = new File(stdIn);
196 pb.redirectInput(stdInFile);
197 }
198
199 if (stdOut.length() > 0)
200 {
201
202 File stdOutFile = new File(stdOut);
203 pb.redirectOutput(stdOutFile);
204 }
205
206 if (stdErr.length() > 0)
207 {
208
209 File stdErrFile = new File(stdErr);
210 pb.redirectError(stdErrFile);
211 }
212
213 new Thread()
214 {
215
216 @Override
217 public void run()
218 {
219 try
220 {
221 Process process = pb.start();
222 FederateStarter.this.runningProcessMap.put(startFederateMessage.getInstanceId(), process);
223 System.err.println("Process started:" + process.isAlive());
224 }
225 catch (IOException exception)
226 {
227 exception.printStackTrace();
228 }
229 }
230 }.start();
231
232 this.modelPortMap.put(startFederateMessage.getInstanceId(), modelPort);
233 this.startFederateMessages.put(startFederateMessage.getInstanceId(), startFederateMessage);
234
235
236
237
238 error = waitForModelStarted(startFederateMessage.getSimulationRunId(), startFederateMessage.getInstanceId(),
239 modelPort);
240 }
241 }
242 catch (IOException exception)
243 {
244 exception.printStackTrace();
245 error = exception.getMessage();
246 }
247 }
248
249 System.out.println("SEND MESSAGE FS.2 ABOUT MODEL " + startFederateMessage.getInstanceId() + " @ port " + modelPort);
250
251
252 this.fsSocket.sendMore(identity);
253 this.fsSocket.sendMore("");
254
255 byte[] fs2Message = new FederateStartedMessage.Builder()
256 .setSimulationRunId(startFederateMessage.getSimulationRunId())
257 .setInstanceId(startFederateMessage.getInstanceId())
258 .setSenderId("FS")
259 .setReceiverId(startFederateMessage.getSenderId())
260 .setMessageId(++this.messageCount)
261 .setStatus(error.isEmpty() ? "started" : "error")
262 .setError(error)
263 .setModelPort(modelPort)
264 .build()
265 .createByteArray();
266 this.fsSocket.send(fs2Message, 0);
267
268 }
269
270
271
272
273
274 private int findFreePortNumber()
275 {
276 for (int port = this.startPort; port <= this.endPort; port++)
277 {
278 if (!this.modelPortMap.containsValue(port))
279 {
280
281 ZMQ.Socket testSocket = null;
282 try
283 {
284 testSocket = this.fsContext.createSocket(SocketType.REP);
285 testSocket.bind("tcp://127.0.0.1:" + port);
286 testSocket.unbind("tcp://127.0.0.1:" + port);
287 testSocket.close();
288 return port;
289 }
290 catch (Exception exception)
291 {
292
293 if (testSocket != null)
294 {
295 try
296 {
297 testSocket.close();
298 }
299 catch (Exception e)
300 {
301
302 }
303 }
304 }
305 }
306 }
307 return -1;
308 }
309
310
311
312
313
314
315
316
317
318
319 private String waitForModelStarted(final Object federationRunId, final String modelId, final int modelPort)
320 throws Sim0MQException, SerializationException
321 {
322 boolean ok = true;
323 String error = "";
324 ZMQ.Socket modelSocket = null;
325 try
326 {
327 modelSocket = this.fsContext.createSocket(SocketType.REQ);
328 modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
329 modelSocket.connect("tcp://127.0.0.1:" + modelPort);
330 }
331 catch (Exception exception)
332 {
333 exception.printStackTrace();
334 ok = false;
335 error = exception.getMessage();
336 }
337
338 boolean started = false;
339 while (ok && !started)
340 {
341 byte[] fs1Message = SimulationMessage.encodeUTF8(federationRunId, "FS", modelId, "FS.1", ++this.messageCount,
342 MessageStatus.NEW);
343 modelSocket.send(fs1Message, 0);
344
345 byte[] reply = modelSocket.recv(0);
346 Object[] replyMessage = SimulationMessage.decode(reply);
347 System.out.println("Received\n" + SimulationMessage.print(replyMessage));
348
349 if (replyMessage[4].toString().equals("MC.1") && !replyMessage[9].toString().equals("error")
350 && !replyMessage[9].toString().equals("ended") && ((Long) replyMessage[8]).longValue() == this.messageCount)
351 {
352 if (replyMessage[9].toString().equals("started"))
353 {
354 started = true;
355 }
356 else
357 {
358
359 try
360 {
361 Thread.sleep(100);
362 }
363 catch (InterruptedException ie)
364 {
365
366 }
367 }
368 }
369 else
370 {
371 ok = false;
372 error = replyMessage[10].toString();
373 System.err.println("Simulation start error -- status = " + replyMessage[9]);
374 System.err.println("Error message = " + replyMessage[10]);
375 }
376 }
377
378 if (modelSocket != null)
379 {
380 modelSocket.close();
381 }
382
383 return error;
384 }
385
386
387
388
389
390
391
392
393 private void processKillFederate(final String identity, final Object[] fields)
394 throws Sim0MQException, SerializationException
395 {
396 boolean status = true;
397 String error = "";
398
399 Object federationRunId = fields[1];
400 String senderId = fields[2].toString();
401
402 String modelId = fields[8].toString();
403 if (!this.modelPortMap.containsKey(modelId))
404 {
405 status = false;
406 error = "model " + modelId + " unknown -- this model is unknown to the FederateStarter";
407 }
408 else
409 {
410 int modelPort = this.modelPortMap.remove(modelId);
411 Process process = this.runningProcessMap.remove(modelId);
412
413 try
414 {
415 try
416 {
417 ZMQ.Socket modelSocket = this.fsContext.createSocket(SocketType.REQ);
418 modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
419 modelSocket.connect("tcp://127.0.0.1:" + modelPort);
420
421 byte[] fs3Message = SimulationMessage.encodeUTF8(federationRunId, "FS", modelId, "FS.3",
422 ++this.messageCount, MessageStatus.NEW);
423 modelSocket.send(fs3Message, 0);
424
425 modelSocket.close();
426 }
427 catch (Exception exception)
428 {
429 exception.printStackTrace();
430 status = true;
431 error = exception.getMessage();
432 }
433
434 try
435 {
436 Thread.sleep(100);
437 }
438 catch (InterruptedException ie)
439 {
440
441 }
442
443 if (process != null && process.isAlive())
444 {
445 process.destroyForcibly();
446 }
447
448 StartFederateMessage sfm = this.startFederateMessages.get(modelId);
449 if (sfm.isDeleteStdout())
450 {
451 if (sfm.getRedirectStdout().length() > 0)
452 {
453 File stdOutFile = new File(sfm.getRedirectStdout());
454 stdOutFile.delete();
455 }
456 }
457
458 if (sfm.isDeleteStderr())
459 {
460 if (sfm.getRedirectStderr().length() > 0)
461 {
462 File stdErrFile = new File(sfm.getRedirectStderr());
463 stdErrFile.delete();
464 }
465 }
466
467 if (sfm.isDeleteWorkingDirectory())
468 {
469 File workingDir = new File(sfm.getWorkingDirectory());
470 workingDir.delete();
471 }
472 }
473 catch (Exception exception)
474 {
475 exception.printStackTrace();
476 status = false;
477 error = exception.getMessage();
478 }
479
480 byte[] fs4Message = SimulationMessage.encodeUTF8(federationRunId, "FS", senderId, "FS.4", ++this.messageCount,
481 MessageStatus.NEW, modelId, status, error);
482 this.fsSocket.sendMore(identity);
483 this.fsSocket.sendMore("");
484 this.fsSocket.send(fs4Message, 0);
485 }
486 }
487
488
489
490
491
492
493
494
495 public static void main(final String[] args) throws Sim0MQException, SerializationException
496 {
497 if (args.length < 4)
498 {
499 System.err.println("Use as FederateStarter portNumber software_properties_file startPort endPort");
500 System.exit(-1);
501 }
502
503 String sPort = args[0];
504 int port = 0;
505 try
506 {
507 port = Integer.parseInt(sPort);
508 }
509 catch (NumberFormatException nfe)
510 {
511 System.err.println("Use as FederateStarter portNumber, where portNumber is a number");
512 System.exit(-1);
513 }
514 if (port == 0 || port > 65535)
515 {
516 System.err.println("PortNumber should be between 1 and 65535");
517 System.exit(-1);
518 }
519
520 String propertiesFile = args[1];
521 Properties softwareProperties = new Properties();
522 InputStream propertiesStream = URLResource.getResourceAsStream(propertiesFile);
523 try
524 {
525 softwareProperties.load(propertiesStream);
526 }
527 catch (IOException | NullPointerException e)
528 {
529 System.err.println("Could not find or read software properties file " + propertiesFile);
530 System.exit(-1);
531 }
532
533 String sStartPort = args[2];
534 int startPort = 0;
535 try
536 {
537 startPort = Integer.parseInt(sStartPort);
538 }
539 catch (NumberFormatException nfe)
540 {
541 System.err.println("Use as FederateStarter pn file startPort endPort, where startPort is a number");
542 System.exit(-1);
543 }
544 if (startPort == 0 || startPort > 65535)
545 {
546 System.err.println("startPort should be between 1 and 65535");
547 System.exit(-1);
548 }
549
550 String sEndPort = args[3];
551 int endPort = 0;
552 try
553 {
554 endPort = Integer.parseInt(sEndPort);
555 }
556 catch (NumberFormatException nfe)
557 {
558 System.err.println("Use as FederateStarter pn file startPort endPort, where endPort is a number");
559 System.exit(-1);
560 }
561 if (endPort == 0 || endPort > 65535)
562 {
563 System.err.println("endPort should be between 1 and 65535");
564 System.exit(-1);
565 }
566
567 new FederateStarter(port, softwareProperties, startPort, endPort);
568 }
569
570 }