MM1FederationManager20.java
package org.sim0mq.demo.mm1;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.djutils.serialization.SerializationException;
import org.sim0mq.Sim0MQException;
import org.sim0mq.federationmanager.ModelState;
import org.sim0mq.message.Sim0MQMessage;
import org.sim0mq.message.federatestarter.FS2FederateStartedMessage;
import org.sim0mq.message.federatestarter.FS4FederateKilledMessage;
import org.sim0mq.message.federationmanager.FM1StartFederateMessage;
import org.sim0mq.message.federationmanager.FM2SimRunControlMessage;
import org.sim0mq.message.federationmanager.FM3SetParameterMessage;
import org.sim0mq.message.federationmanager.FM4SimStartMessage;
import org.sim0mq.message.federationmanager.FM5RequestStatus;
import org.sim0mq.message.federationmanager.FM6RequestStatisticsMessage;
import org.sim0mq.message.federationmanager.FM8KillFederateMessage;
import org.sim0mq.message.modelcontroller.MC1StatusMessage;
import org.sim0mq.message.modelcontroller.MC2AckNakMessage;
import org.sim0mq.message.modelcontroller.MC3StatisticsMessage;
import org.sim0mq.message.modelcontroller.MC4StatisticsErrorMessage;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
/**
* Example implementation of a FederationManager to start the MM1Queue41Application DSOL model.
* <p>
* Copyright (c) 2016-2020 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
* BSD-style license. See <a href="http://sim0mq.org/docs/current/license.html">Sim0MQ License</a>.
* </p>
* $LastChangedDate: 2015-07-24 02:58:59 +0200 (Fri, 24 Jul 2015) $, @version $Revision: 1147 $, by $Author: averbraeck $,
* initial version April 10, 2017 <br>
* @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
*/
public final class MM1FederationManager20
{
/**
* @param args parameters for main
* @throws Sim0MQException on error
*/
public static void main(final String[] args) throws Sim0MQException
{
if (args.length < 5)
{
System.err.println("Use as FederationManager federationName federationManagerPortNumber "
+ "federateStarterIPorName federateStarterPortNumber modelFolder");
System.exit(-1);
}
String federationName = args[0];
String fmsPort = args[1];
int fmPort = 0;
try
{
fmPort = Integer.parseInt(fmsPort);
}
catch (NumberFormatException nfe)
{
System.err.println("Use as FederationManager fedName fmPort fsIP fsPort modelFolder, where fmPort is a number");
System.exit(-1);
}
if (fmPort == 0 || fmPort > 65535)
{
System.err.println("fmPort should be between 1 and 65535");
System.exit(-1);
}
String fsServerNameOrIP = args[2];
String fsPortString = args[3];
int fsPort = 0;
try
{
fsPort = Integer.parseInt(fsPortString);
}
catch (NumberFormatException nfe)
{
System.err.println("Use as FederationManager fedName fmPort fsIP fsPort modelFolder, where fmPort is a number");
System.exit(-1);
}
if (fsPort == 0 || fsPort > 65535)
{
System.err.println("fsPort should be between 1 and 65535");
System.exit(-1);
}
String mm1ModelFolder = args[4];
new MM1FederationManager20(federationName, fmPort, fsServerNameOrIP, fsPort, mm1ModelFolder);
}
/**
* Send an FM.1 message to the FederateStarter.
* @param federationName the name of the federation
* @param fmPort the port number to listen on
* @param fsServerNameOrIP name or IP address of the federate starter we are using
* @param fsPort the port where the federate starter can be reached
* @param mm1ModelFolder location on the computer of the federate starter where the model can be found
* @throws Sim0MQException on error
*/
private MM1FederationManager20(final String federationName, final int fmPort, final String fsServerNameOrIP,
final int fsPort, final String mm1ModelFolder) throws Sim0MQException
{
AtomicLong messageCount = new AtomicLong(0L);
AtomicInteger nrRunning = new AtomicInteger();
Map<Integer, Map<String, Number>> statMap =
Collections.synchronizedMap(new LinkedHashMap<Integer, Map<String, Number>>());
for (int modelNr = 0; modelNr < 20; modelNr++)
{
new Thread()
{
@Override
public void run()
{
final int nr = nrRunning.getAndIncrement();
StateMachine stateMachine = null;
System.out.println("inc modelNr to " + nr);
try
{
stateMachine =
new StateMachine(messageCount, federationName, fsServerNameOrIP, fsPort, mm1ModelFolder, nr);
}
catch (Sim0MQException | SerializationException exception)
{
exception.printStackTrace();
}
int decNr = nrRunning.decrementAndGet();
System.out.println("dec modelNr to " + decNr);
synchronized (statMap)
{
statMap.put(nr, stateMachine.getStatistics());
}
}
}.start();
}
while (nrRunning.get() > 0)
{
Thread.yield();
}
synchronized (statMap)
{
for (int nr : statMap.keySet())
{
Map<String, Number> stats = statMap.get(nr);
StringBuilder s = new StringBuilder();
s.append(String.format("%2d ", nr));
for (String code : stats.keySet())
{
s.append(String.format("%10s=%10.4f ", code, stats.get(code).doubleValue()));
}
System.out.println(s.toString());
}
}
}
/**
* State machine to run several models in parallel.
* <p>
* Copyright (c) 2013-2020 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved.
* <br>
* BSD-style license. See <a href="http://sim0mq.org/docs/current/license.html">Sim0MQ License</a>.
* </p>
* $LastChangedDate: 2015-07-24 02:58:59 +0200 (Fri, 24 Jul 2015) $, @version $Revision: 1147 $, by $Author: averbraeck $,
* initial version May 5, 2017 <br>
* @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
*/
static class StateMachine
{
/** the state of the started model. */
private ModelState state;
/** the model socket. */
private ZMQ.Socket modelSocket;
/** the model name. */
private String modelName;
/** the federate starter socket. */
private ZMQ.Socket fsSocket;
/** the context. */
private ZContext fmContext;
/** the message counter. */
private AtomicLong messageCount;
/** statistics. */
private Map<String, Number> statistics = new LinkedHashMap<>();
/**
* @param messageCount AtomicLong; message counter
* @param federationName the name of the federation
* @param fsServerNameOrIP name or IP address of the federate starter we are using
* @param fsPort the port where the federate starter can be reached
* @param mm1ModelFolder location on the computer of the federate starter where the model can be found
* @param modelNr sequence number of the model to run
* @throws Sim0MQException on error
* @throws SerializationException on serialization problem
*/
StateMachine(final AtomicLong messageCount, final String federationName, final String fsServerNameOrIP,
final int fsPort, final String mm1ModelFolder, final int modelNr) throws Sim0MQException, SerializationException
{
this.fmContext = new ZContext(1);
this.fsSocket = this.fmContext.createSocket(SocketType.REQ);
this.fsSocket.setIdentity(UUID.randomUUID().toString().getBytes());
this.modelName = "MM1." + modelNr;
this.messageCount = messageCount;
this.modelSocket = this.fmContext.createSocket(SocketType.REQ);
this.modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
this.state = ModelState.NOT_STARTED;
boolean ready = false;
while (!ready)
{
System.out.println(this.state);
switch (this.state)
{
case NOT_STARTED:
// federationName, fsServerNameOrIP, fsPort, mm1ModelFolder
startModel(federationName, fsServerNameOrIP, fsPort, mm1ModelFolder);
break;
case STARTED:
sendSimRunControl(federationName);
break;
case RUNCONTROL:
setParameters(federationName);
break;
case PARAMETERS:
sendSimStart(federationName);
break;
case SIMULATORSTARTED:
waitForSimEnded(federationName);
break;
case SIMULATORENDED:
requestStatistics(federationName);
break;
case STATISTICSGATHERED:
killFederate(federationName);
ready = true;
break;
case ERROR:
killFederate(federationName);
ready = true;
break;
default:
break;
}
}
this.fsSocket.close();
this.modelSocket.close();
this.fmContext.destroy();
this.fmContext.close();
}
/**
* Send the FM.1 message to the FederateStarter to start the MM1 model.
* @param federationName the name of the federation
* @param fsServerNameOrIP name or IP address of the federate starter we are using
* @param fsPort the port where the federate starter can be reached
* @param mm1ModelFolder location on the computer of the federate starter where the model can be found
* @throws Sim0MQException on error
* @throws SerializationException on serialization problem
*/
private void startModel(final String federationName, final String fsServerNameOrIP, final int fsPort,
final String mm1ModelFolder) throws Sim0MQException, SerializationException
{
// Start model mmm1.jar
byte[] fm1Message = new FM1StartFederateMessage(federationName, "FM", "FS", this.messageCount.getAndIncrement(),
this.modelName, "java8+", "-jar", mm1ModelFolder + "/mm1.jar", this.modelName + " %PORT%", mm1ModelFolder,
"", mm1ModelFolder + "/out_" + this.modelName + ".txt", mm1ModelFolder + "/err_" + this.modelName + ".txt",
false, true, true).createByteArray();
this.fsSocket.connect("tcp://" + fsServerNameOrIP + ":" + fsPort);
this.fsSocket.send(fm1Message);
byte[] reply = this.fsSocket.recv(0);
try
{
Object[] fs2Fields = Sim0MQMessage.decode(reply).createObjectArray();
System.out.println("Received\n" + Sim0MQMessage.print(fs2Fields));
FS2FederateStartedMessage message = new FS2FederateStartedMessage(fs2Fields);
if (message.getStatus().toString().equals("started") && message.getInstanceId().equals(this.modelName))
{
this.state = ModelState.STARTED;
this.modelSocket.connect("tcp://" + fsServerNameOrIP + ":" + message.getModelPort());
}
else
{
this.state = ModelState.ERROR;
System.err.println("Model not started correctly -- state = " + message.getStatus());
System.err.println("Started model = " + message.getInstanceId() + " on port " + message.getModelPort());
System.err.println("Error message = " + message.getError());
}
}
catch (Exception exception)
{
this.state = ModelState.ERROR;
System.err.println("Model not started correctly -- error = " + exception.getClass().getSimpleName());
System.err.println("Started instance of model = " + this.modelName);
System.err.println("Error message = " + exception.getMessage());
}
}
/**
* Send the SimRunControl message FM.2.
* @param federationName the name of the federation
* @throws Sim0MQException on error
* @throws SerializationException on serialization problem
*/
private void sendSimRunControl(final String federationName) throws Sim0MQException, SerializationException
{
long messageNumber = this.messageCount.get();
byte[] fm2Message = new FM2SimRunControlMessage(federationName, "FM", this.modelName, this.messageCount.getAndIncrement(),
100.0, 0.0, 0.0, Double.POSITIVE_INFINITY, 1, 0, new HashMap<Object, Long>()).createByteArray();
this.modelSocket.send(fm2Message);
byte[] reply = this.modelSocket.recv(0);
try
{
Object[] mc2Fields = Sim0MQMessage.decode(reply).createObjectArray();
System.out.println("Received\n" + Sim0MQMessage.print(mc2Fields));
MC2AckNakMessage message = new MC2AckNakMessage(mc2Fields);
if (message.getStatus() && (Long) message.getReplyToId() == messageNumber)
{
this.state = ModelState.RUNCONTROL;
}
else
{
this.state = ModelState.ERROR;
System.err.println("Model not started correctly -- state = " + message.getStatus());
System.err.println("Error message = " + message.getError());
}
}
catch (Exception exception)
{
this.state = ModelState.ERROR;
System.err.println("Model not started correctly -- error = " + exception.getClass().getSimpleName());
System.err.println("Error message = " + exception.getMessage());
}
}
/**
* Send the Parameters messages FM.3.
* @param federationName the name of the federation
* @throws Sim0MQException on error
* @throws SerializationException on serialization problem
*/
private void setParameters(final String federationName) throws Sim0MQException, SerializationException
{
Map<String, Object> parameters = new LinkedHashMap<>();
parameters.put("iat", new Double(1.0));
parameters.put("servicetime", new Double(0.85));
parameters.put("seed", Math.abs(this.modelName.hashCode()));
for (String parameterName : parameters.keySet())
{
if (!this.state.isError())
{
long messageNumber = this.messageCount.get();
byte[] fm3Message =
new FM3SetParameterMessage(federationName, "FM", this.modelName, this.messageCount.getAndIncrement(),
parameterName, parameters.get(parameterName)).createByteArray();
this.modelSocket.send(fm3Message);
byte[] reply = this.modelSocket.recv(0);
try
{
Object[] replyFields = Sim0MQMessage.decode(reply).createObjectArray();
System.out.println("Received\n" + Sim0MQMessage.print(replyFields));
MC2AckNakMessage message = new MC2AckNakMessage(replyFields);
if (message.getStatus() && (Long) message.getReplyToId() == messageNumber)
{
this.state = ModelState.PARAMETERS;
}
else
{
this.state = ModelState.ERROR;
System.err.println("Model parameter error -- status = " + message.getStatus());
System.err.println("Error message = " + message.getError());
}
}
catch (Exception exception)
{
this.state = ModelState.ERROR;
System.err.println("Model parameter error = " + exception.getClass().getSimpleName());
System.err.println("Error message = " + exception.getMessage());
}
}
}
if (!this.state.isError())
{
this.state = ModelState.PARAMETERS;
}
}
/**
* Send the SimStart message FM.4.
* @param federationName the name of the federation
* @throws Sim0MQException on error
* @throws SerializationException on serialization problem
*/
private void sendSimStart(final String federationName) throws Sim0MQException, SerializationException
{
long messageNumber = this.messageCount.get();
byte[] fm4Message =
new FM4SimStartMessage(federationName, "FM", this.modelName, this.messageCount.getAndIncrement()).createByteArray();
this.modelSocket.send(fm4Message);
byte[] reply = this.modelSocket.recv(0);
try
{
Object[] replyFields = Sim0MQMessage.decode(reply).createObjectArray();
System.out.println("Received\n" + Sim0MQMessage.print(replyFields));
MC2AckNakMessage message = new MC2AckNakMessage(replyFields);
if (message.getStatus() && (Long) message.getReplyToId() == messageNumber)
{
this.state = ModelState.SIMULATORSTARTED;
}
else
{
this.state = ModelState.ERROR;
System.err.println("Model start error -- status = " + message.getStatus());
System.err.println("Error message = " + message.getError());
}
}
catch (Exception exception)
{
this.state = ModelState.ERROR;
System.err.println("Model start error = " + exception.getClass().getSimpleName());
System.err.println("Error message = " + exception.getMessage());
}
}
/**
* Wait for simulation to end using status polling with message FM.5.
* @param federationName the name of the federation
* @throws Sim0MQException on error
* @throws SerializationException on serialization problem
*/
private void waitForSimEnded(final String federationName) throws Sim0MQException, SerializationException
{
while (!this.state.isSimulatorEnded() && !this.state.isError())
{
long messageNumber = this.messageCount.get();
byte[] fm5Message =
new FM5RequestStatus(federationName, "FM", this.modelName, this.messageCount.getAndIncrement()).createByteArray();
this.modelSocket.send(fm5Message);
byte[] reply = this.modelSocket.recv(0);
try
{
Object[] replyFields = Sim0MQMessage.decode(reply).createObjectArray();
System.out.println("Received\n" + Sim0MQMessage.print(replyFields));
MC1StatusMessage message = new MC1StatusMessage(replyFields);
if (!message.getStatus().equals("error") && !message.getStatus().equals("started")
&& (Long) message.getReplyToId() == messageNumber)
{
if (message.getStatus().equals("ended"))
{
this.state = ModelState.SIMULATORENDED;
}
else
{
// wait a second
try
{
Thread.sleep(1000);
}
catch (InterruptedException ie)
{
// ignore
}
}
}
else
{
this.state = ModelState.ERROR;
System.err.println("Model poll status error -- status = " + message.getStatus());
System.err.println("Error message = " + message.getError());
}
}
catch (Exception exception)
{
this.state = ModelState.ERROR;
System.err.println("Model poll status error = " + exception.getClass().getSimpleName());
System.err.println("Error message = " + exception.getMessage());
}
}
}
/**
* Request statistics with message FM.6.
* @param federationName the name of the federation
* @throws Sim0MQException on error
* @throws SerializationException on serialization problem
*/
private void requestStatistics(final String federationName) throws Sim0MQException, SerializationException
{
List<String> stats = new ArrayList<>();
stats.add("dN.average");
stats.add("qN.max");
stats.add("uN.average");
for (String statName : stats)
{
if (!this.state.isError())
{
byte[] fm6Message = new FM6RequestStatisticsMessage(federationName, "FM", this.modelName,
this.messageCount.getAndIncrement(), statName).createByteArray();
this.modelSocket.send(fm6Message);
byte[] reply = this.modelSocket.recv(0);
try
{
Object[] replyFields = Sim0MQMessage.decode(reply).createObjectArray();
System.out.println("Received\n" + Sim0MQMessage.print(replyFields));
if (replyFields[5].toString().equals("MC.3"))
{
MC3StatisticsMessage message = new MC3StatisticsMessage(replyFields);
if (message.getVariableName().equals(statName))
{
System.out.println(
"Received statistic for " + statName + " = " + message.getVariableValue().toString());
this.statistics.put(statName, (Number) message.getVariableValue());
}
else
{
this.state = ModelState.ERROR;
System.err.println("Statistics Error: Stat variable expected = " + statName + ", got: "
+ message.getVariableName());
}
}
else if (replyFields[5].toString().equals("MC.4"))
{
MC4StatisticsErrorMessage message = new MC4StatisticsErrorMessage(replyFields);
this.state = ModelState.ERROR;
System.err.println("Statistics Error: Stat variable = " + message.getVariableName());
System.err.println("Error message = " + message.getError());
}
}
catch (Exception exception)
{
this.state = ModelState.ERROR;
System.err.println("Model get statistics error = " + exception.getClass().getSimpleName());
System.err.println("Error message = " + exception.getMessage());
}
}
}
if (!this.state.isError())
{
this.state = ModelState.STATISTICSGATHERED;
}
}
/**
* Send the FM.8 message to the FederateStarter to kill the MM1 model.
* @param federationName the name of the federation
* @throws Sim0MQException on error
* @throws SerializationException on serialization problem
*/
private void killFederate(final String federationName) throws Sim0MQException, SerializationException
{
byte[] fm8Message =
new FM8KillFederateMessage(federationName, "FM", "FS", this.messageCount.getAndIncrement(), this.modelName)
.createByteArray();
this.fsSocket.send(fm8Message);
byte[] reply = this.fsSocket.recv(0);
try
{
Object[] replyFields = Sim0MQMessage.decode(reply).createObjectArray();
System.out.println("Received\n" + Sim0MQMessage.print(replyFields));
FS4FederateKilledMessage message = new FS4FederateKilledMessage(replyFields);
if (message.isStatus() && message.getInstanceId().equals(this.modelName))
{
this.state = ModelState.TERMINATED;
}
else
{
this.state = ModelState.ERROR;
System.err.println("Model not killed correctly, model = " + this.modelName);
System.err.println("Error message = " + message.getError());
}
}
catch (Exception exception)
{
this.state = ModelState.ERROR;
System.err.println("Model not killed correctly, error = " + exception.getClass().getSimpleName());
System.err.println("Error message = " + exception.getMessage());
}
}
/**
* @return statistics
*/
public final Map<String, Number> getStatistics()
{
return this.statistics;
}
}
}