1 package org.sim0mq.federatestarter;
2
3 import java.io.File;
4 import java.io.IOException;
5 import java.io.InputStream;
6 import java.nio.file.Files;
7 import java.nio.file.Path;
8 import java.nio.file.Paths;
9 import java.util.ArrayList;
10 import java.util.Arrays;
11 import java.util.Collections;
12 import java.util.HashMap;
13 import java.util.List;
14 import java.util.Map;
15 import java.util.Properties;
16 import java.util.UUID;
17
18 import org.sim0mq.Sim0MQException;
19 import org.sim0mq.message.MessageStatus;
20 import org.sim0mq.message.SimulationMessage;
21 import org.sim0mq.message.federatestarter.FederateStartedMessage;
22 import org.sim0mq.message.federationmanager.StartFederateMessage;
23 import org.zeromq.ZContext;
24 import org.zeromq.ZMQ;
25
26 import nl.tudelft.simulation.language.io.URLResource;
27
28
29
30
31
32
33
34
35
36
37
38
39 public class FederateStarter
40 {
41
42 private final int fsPort;
43
44
45 private final int startPort;
46
47
48 private final int endPort;
49
50
51 @SuppressWarnings("checkstyle:visibilitymodifier")
52 protected Map<String, Process> runningProcessMap = Collections.synchronizedMap(new HashMap<>());
53
54
55 private Map<String, Integer> modelPortMap = Collections.synchronizedMap(new HashMap<>());
56
57
58 private Map<String, StartFederateMessage> startFederateMessages = Collections.synchronizedMap(new HashMap<>());
59
60
61 @SuppressWarnings("checkstyle:visibilitymodifier")
62 final Properties softwareProperties;
63
64
65 private ZMQ.Socket fsSocket;
66
67
68 private ZContext fsContext;
69
70
71 private long messageCount = 0;
72
73
74
75
76
77
78
79
80 public FederateStarter(final int fsPort, final Properties softwareProperties, final int startPort, final int endPort)
81 throws Sim0MQException
82 {
83 super();
84 this.softwareProperties = softwareProperties;
85 this.fsPort = fsPort;
86 this.startPort = startPort;
87 this.endPort = endPort;
88
89 this.fsContext = new ZContext(1);
90
91 this.fsSocket = this.fsContext.createSocket(ZMQ.ROUTER);
92 this.fsSocket.bind("tcp://*:" + this.fsPort);
93
94 while (!Thread.currentThread().isInterrupted())
95 {
96
97 String identity = this.fsSocket.recvStr();
98 this.fsSocket.recvStr();
99
100 byte[] request = this.fsSocket.recv(0);
101 Object[] fields = SimulationMessage.decode(request);
102 String messageTypeId = fields[4].toString();
103 String receiverId = fields[3].toString();
104
105 System.out.println("Received " + SimulationMessage.print(fields));
106
107 if (receiverId.equals("FS"))
108 {
109 switch (messageTypeId)
110 {
111 case "FM.1":
112 processStartFederate(identity, fields);
113 break;
114
115 case "FM.8":
116 processKillFederate(identity, fields);
117 break;
118
119 case "FM.9":
120
121 break;
122
123 default:
124
125 System.err.println("Received unknown message -- not processed: " + messageTypeId);
126 }
127 }
128 else
129 {
130
131 System.err.println("Received message not intended for FS but for " + receiverId + " -- not processed: ");
132 }
133 }
134 this.fsSocket.close();
135 this.fsContext.destroy();
136 }
137
138
139
140
141
142
143
144 private void processStartFederate(final String identity, final Object[] fields) throws Sim0MQException
145 {
146 StartFederateMessage startFederateMessage = StartFederateMessage.createMessage(fields, "FS");
147 String error = "";
148
149 int modelPort = findFreePortNumber();
150
151 if (modelPort == -1)
152 {
153 error = "No free port number";
154 }
155
156 else
157
158 {
159 try
160 {
161 ProcessBuilder pb = new ProcessBuilder();
162
163 Path workingPath = Files.createDirectories(Paths.get(startFederateMessage.getWorkingDirectory()));
164 pb.directory(workingPath.toFile());
165
166 String softwareCode = "";
167 if (!this.softwareProperties.containsKey(startFederateMessage.getSoftwareCode()))
168 {
169 System.err.println("Could not find software alias " + startFederateMessage.getSoftwareCode()
170 + " in software properties file");
171 }
172 else
173 {
174 softwareCode = this.softwareProperties.getProperty(startFederateMessage.getSoftwareCode());
175
176 List<String> pbArgs = new ArrayList<>();
177 pbArgs.add(softwareCode);
178 pbArgs.add(startFederateMessage.getArgsBefore());
179 pbArgs.add(startFederateMessage.getModelPath());
180 pbArgs.addAll(Arrays.asList(
181 startFederateMessage.getArgsAfter().replaceAll("%PORT%", String.valueOf(modelPort)).split(" ")));
182 pb.command(pbArgs);
183
184 String stdIn = startFederateMessage.getRedirectStdin();
185 String stdOut = startFederateMessage.getRedirectStdout();
186 String stdErr = startFederateMessage.getRedirectStderr();
187
188 if (stdIn.length() > 0)
189 {
190
191 File stdInFile = new File(stdIn);
192 pb.redirectInput(stdInFile);
193 }
194
195 if (stdOut.length() > 0)
196 {
197
198 File stdOutFile = new File(stdOut);
199 pb.redirectOutput(stdOutFile);
200 }
201
202 if (stdErr.length() > 0)
203 {
204
205 File stdErrFile = new File(stdErr);
206 pb.redirectError(stdErrFile);
207 }
208
209 new Thread()
210 {
211
212 @Override
213 public void run()
214 {
215 try
216 {
217 Process process = pb.start();
218 FederateStarter.this.runningProcessMap.put(startFederateMessage.getInstanceId(), process);
219 System.err.println("Process started:" + process.isAlive());
220 }
221 catch (IOException exception)
222 {
223 exception.printStackTrace();
224 }
225 }
226 }.start();
227
228 this.modelPortMap.put(startFederateMessage.getInstanceId(), modelPort);
229 this.startFederateMessages.put(startFederateMessage.getInstanceId(), startFederateMessage);
230
231
232
233
234 error = waitForModelStarted(startFederateMessage.getSimulationRunId(), startFederateMessage.getInstanceId(),
235 modelPort);
236 }
237 }
238 catch (IOException exception)
239 {
240 exception.printStackTrace();
241 error = exception.getMessage();
242 }
243 }
244
245 System.out.println("SEND MESSAGE FS.2 ABOUT MODEL " + startFederateMessage.getInstanceId() + " @ port " + modelPort);
246
247
248 this.fsSocket.sendMore(identity);
249 this.fsSocket.sendMore("");
250
251 byte[] fs2Message = new FederateStartedMessage.Builder()
252 .setSimulationRunId(startFederateMessage.getSimulationRunId())
253 .setInstanceId(startFederateMessage.getInstanceId())
254 .setSenderId("FS")
255 .setReceiverId(startFederateMessage.getSenderId())
256 .setMessageId(++this.messageCount)
257 .setStatus(error.isEmpty() ? "started" : "error")
258 .setError(error)
259 .setModelPort(modelPort)
260 .build()
261 .createByteArray();
262 this.fsSocket.send(fs2Message, 0);
263
264 }
265
266
267
268
269
270 private int findFreePortNumber()
271 {
272 for (int port = this.startPort; port <= this.endPort; port++)
273 {
274 if (!this.modelPortMap.containsValue(port))
275 {
276
277 ZMQ.Socket testSocket = null;
278 try
279 {
280 testSocket = this.fsContext.createSocket(ZMQ.REP);
281 testSocket.bind("tcp://127.0.0.1:" + port);
282 testSocket.unbind("tcp://127.0.0.1:" + port);
283 testSocket.close();
284 return port;
285 }
286 catch (Exception exception)
287 {
288
289 if (testSocket != null)
290 {
291 try
292 {
293 testSocket.close();
294 }
295 catch (Exception e)
296 {
297
298 }
299 }
300 }
301 }
302 }
303 return -1;
304 }
305
306
307
308
309
310
311
312
313
314 private String waitForModelStarted(final Object federationRunId, final String modelId, final int modelPort)
315 throws Sim0MQException
316 {
317 boolean ok = true;
318 String error = "";
319 ZMQ.Socket modelSocket = null;
320 try
321 {
322 modelSocket = this.fsContext.createSocket(ZMQ.REQ);
323 modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
324 modelSocket.connect("tcp://127.0.0.1:" + modelPort);
325 }
326 catch (Exception exception)
327 {
328 exception.printStackTrace();
329 ok = false;
330 error = exception.getMessage();
331 }
332
333 boolean started = false;
334 while (ok && !started)
335 {
336 byte[] fs1Message = SimulationMessage.encodeUTF8(federationRunId, "FS", modelId, "FS.1", ++this.messageCount,
337 MessageStatus.NEW);
338 modelSocket.send(fs1Message, 0);
339
340 byte[] reply = modelSocket.recv(0);
341 Object[] replyMessage = SimulationMessage.decode(reply);
342 System.out.println("Received\n" + SimulationMessage.print(replyMessage));
343
344 if (replyMessage[4].toString().equals("MC.1") && !replyMessage[9].toString().equals("error")
345 && !replyMessage[9].toString().equals("ended") && ((Long) replyMessage[8]).longValue() == this.messageCount)
346 {
347 if (replyMessage[9].toString().equals("started"))
348 {
349 started = true;
350 }
351 else
352 {
353
354 try
355 {
356 Thread.sleep(100);
357 }
358 catch (InterruptedException ie)
359 {
360
361 }
362 }
363 }
364 else
365 {
366 ok = false;
367 error = replyMessage[10].toString();
368 System.err.println("Simulation start error -- status = " + replyMessage[9]);
369 System.err.println("Error message = " + replyMessage[10]);
370 }
371 }
372
373 if (modelSocket != null)
374 {
375 modelSocket.close();
376 }
377
378 return error;
379 }
380
381
382
383
384
385
386
387 private void processKillFederate(final String identity, final Object[] fields) throws Sim0MQException
388 {
389 boolean status = true;
390 String error = "";
391
392 Object federationRunId = fields[1];
393 String senderId = fields[2].toString();
394
395 String modelId = fields[8].toString();
396 if (!this.modelPortMap.containsKey(modelId))
397 {
398 status = false;
399 error = "model " + modelId + " unknown -- this model is unknown to the FederateStarter";
400 }
401 else
402 {
403 int modelPort = this.modelPortMap.remove(modelId);
404 Process process = this.runningProcessMap.remove(modelId);
405
406 try
407 {
408 try
409 {
410 ZMQ.Socket modelSocket = this.fsContext.createSocket(ZMQ.REQ);
411 modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
412 modelSocket.connect("tcp://127.0.0.1:" + modelPort);
413
414 byte[] fs3Message = SimulationMessage.encodeUTF8(federationRunId, "FS", modelId, "FS.3",
415 ++this.messageCount, MessageStatus.NEW);
416 modelSocket.send(fs3Message, 0);
417
418 modelSocket.close();
419 }
420 catch (Exception exception)
421 {
422 exception.printStackTrace();
423 status = true;
424 error = exception.getMessage();
425 }
426
427 try
428 {
429 Thread.sleep(100);
430 }
431 catch (InterruptedException ie)
432 {
433
434 }
435
436 if (process != null && process.isAlive())
437 {
438 process.destroyForcibly();
439 }
440
441 StartFederateMessage sfm = this.startFederateMessages.get(modelId);
442 if (sfm.isDeleteStdout())
443 {
444 if (sfm.getRedirectStdout().length() > 0)
445 {
446 File stdOutFile = new File(sfm.getRedirectStdout());
447 stdOutFile.delete();
448 }
449 }
450
451 if (sfm.isDeleteStderr())
452 {
453 if (sfm.getRedirectStderr().length() > 0)
454 {
455 File stdErrFile = new File(sfm.getRedirectStderr());
456 stdErrFile.delete();
457 }
458 }
459
460 if (sfm.isDeleteWorkingDirectory())
461 {
462 File workingDir = new File(sfm.getWorkingDirectory());
463 workingDir.delete();
464 }
465 }
466 catch (Exception exception)
467 {
468 exception.printStackTrace();
469 status = false;
470 error = exception.getMessage();
471 }
472
473 byte[] fs4Message = SimulationMessage.encodeUTF8(federationRunId, "FS", senderId, "FS.4", ++this.messageCount,
474 MessageStatus.NEW, modelId, status, error);
475 this.fsSocket.sendMore(identity);
476 this.fsSocket.sendMore("");
477 this.fsSocket.send(fs4Message, 0);
478 }
479 }
480
481
482
483
484
485
486
487 public static void main(final String[] args) throws Sim0MQException
488 {
489 if (args.length < 4)
490 {
491 System.err.println("Use as FederateStarter portNumber software_properties_file startPort endPort");
492 System.exit(-1);
493 }
494
495 String sPort = args[0];
496 int port = 0;
497 try
498 {
499 port = Integer.parseInt(sPort);
500 }
501 catch (NumberFormatException nfe)
502 {
503 System.err.println("Use as FederateStarter portNumber, where portNumber is a number");
504 System.exit(-1);
505 }
506 if (port == 0 || port > 65535)
507 {
508 System.err.println("PortNumber should be between 1 and 65535");
509 System.exit(-1);
510 }
511
512 String propertiesFile = args[1];
513 Properties softwareProperties = new Properties();
514 InputStream propertiesStream = URLResource.getResourceAsStream(propertiesFile);
515 try
516 {
517 softwareProperties.load(propertiesStream);
518 }
519 catch (IOException e)
520 {
521 System.err.println("Could not find software properties file " + propertiesFile);
522 System.exit(-1);
523 }
524
525 String sStartPort = args[2];
526 int startPort = 0;
527 try
528 {
529 startPort = Integer.parseInt(sStartPort);
530 }
531 catch (NumberFormatException nfe)
532 {
533 System.err.println("Use as FederateStarter pn file startPort endPort, where startPort is a number");
534 System.exit(-1);
535 }
536 if (startPort == 0 || startPort > 65535)
537 {
538 System.err.println("startPort should be between 1 and 65535");
539 System.exit(-1);
540 }
541
542 String sEndPort = args[3];
543 int endPort = 0;
544 try
545 {
546 endPort = Integer.parseInt(sEndPort);
547 }
548 catch (NumberFormatException nfe)
549 {
550 System.err.println("Use as FederateStarter pn file startPort endPort, where endPort is a number");
551 System.exit(-1);
552 }
553 if (endPort == 0 || endPort > 65535)
554 {
555 System.err.println("endPort should be between 1 and 65535");
556 System.exit(-1);
557 }
558
559 new FederateStarter(port, softwareProperties, startPort, endPort);
560 }
561
562 }