// MM1FederationManager20.java

package org.sim0mq.demo.mm1;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.djutils.serialization.SerializationException;
import org.sim0mq.Sim0MQException;
import org.sim0mq.federationmanager.ModelState;
import org.sim0mq.message.MessageStatus;
import org.sim0mq.message.SimulationMessage;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

/**
 * Example implementation of a FederationManager to start the MM1Queue41Application DSOL model.
 * <p>
 * Copyright (c) 2016-2017 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
 * BSD-style license. See <a href="http://sim0mq.org/docs/current/license.html">Sim0MQ License</a>.
 * </p>
 * $LastChangedDate: 2015-07-24 02:58:59 +0200 (Fri, 24 Jul 2015) $, @version $Revision: 1147 $, by $Author: averbraeck $,
 * initial version April 10, 2017 <br>
 * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
 */
public final class MM1FederationManager20
{
    /**
     * Parse the command line and start the federation manager. Expected arguments, in order: federation name, federation
     * manager port, federate starter port, and the literal <code>local</code> or <code>sk-3</code>. On any invalid argument
     * a usage message is printed to stderr and the JVM exits with status -1.
     * @param args parameters for main
     * @throws Sim0MQException on error
     */
    public static void main(final String[] args) throws Sim0MQException
    {
        if (args.length < 4)
        {
            System.err.println(
                    "Use as FederationManager federationName federationManagerPortNumber federateStarterPortNumber local/sk-3");
            System.exit(-1);
        }
        String federationName = args[0];
        int fmPort = parsePort(args[1], "fmPort");
        int fsPort = parsePort(args[2], "fsPort");

        String localSk3 = args[3];
        if (!localSk3.equals("local") && !localSk3.equals("sk-3"))
        {
            System.err.println("Use as FederationManager fedName fmPort fsPort local/sk-3, where last arg is local/sk-3");
            System.exit(-1);
        }

        new MM1FederationManager20(federationName, fmPort, fsPort, localSk3);
    }

    /**
     * Parse a TCP port number from a command line argument; print a message and exit with status -1 when the text is not
     * a number or lies outside 1..65535 (negative values included).
     * @param text the command line argument to parse
     * @param label the name of the argument, used in the error messages
     * @return the port number, between 1 and 65535
     */
    private static int parsePort(final String text, final String label)
    {
        int port = 0;
        try
        {
            port = Integer.parseInt(text);
        }
        catch (NumberFormatException nfe)
        {
            System.err.println(
                    "Use as FederationManager fedName fmPort fsPort local/sk-3, where " + label + " is a number");
            System.exit(-1);
        }
        if (port < 1 || port > 65535)
        {
            System.err.println(label + " should be between 1 and 65535");
            System.exit(-1);
        }
        return port;
    }

    /**
     * Run 20 MM1 models in parallel, each driven by its own {@link StateMachine} on a separate thread, wait for all of
     * them to finish and print the statistics gathered per model.
     * @param federationName the name of the federation
     * @param fmPort the port number to listen on (not used by this constructor)
     * @param fsPort the port where the federate starter can be reached
     * @param localSk3 local/sk-3 to indicate where the federate starter and model can be found
     * @throws Sim0MQException on error
     */
    private MM1FederationManager20(final String federationName, final int fmPort, final int fsPort, final String localSk3)
            throws Sim0MQException
    {
        final AtomicLong messageCount = new AtomicLong(0L);
        final AtomicInteger nrRunning = new AtomicInteger();
        final Map<Integer, Map<String, Number>> statMap =
                Collections.synchronizedMap(new HashMap<Integer, Map<String, Number>>());

        final List<Thread> runners = new ArrayList<>();
        for (int modelNr = 0; modelNr < 20; modelNr++)
        {
            // The loop index is the model number: unlike a value taken from the running counter it can never repeat.
            final int nr = modelNr;
            Thread runner = new Thread()
            {
                @Override
                public void run()
                {
                    nrRunning.incrementAndGet();
                    System.out.println("inc modelNr to " + nr);
                    try
                    {
                        StateMachine stateMachine = new StateMachine(messageCount, federationName, fsPort, localSk3, nr);
                        // Only reached when the state machine was constructed, so there is no null to dereference.
                        statMap.put(nr, stateMachine.getStatistics());
                    }
                    catch (Sim0MQException | SerializationException exception)
                    {
                        exception.printStackTrace();
                    }
                    finally
                    {
                        // Counter is kept for the progress output only; completion is detected with join() below.
                        System.out.println("dec modelNr to " + nrRunning.decrementAndGet());
                    }
                }
            };
            runners.add(runner);
            runner.start();
        }

        // join() cannot return before a worker has stored its result, and cannot be fooled by workers that have not
        // started yet (a counter-based wait can see 0 before the first increment).
        for (Thread runner : runners)
        {
            try
            {
                runner.join();
            }
            catch (InterruptedException ie)
            {
                Thread.currentThread().interrupt();
                break;
            }
        }

        // Iterating a synchronizedMap requires holding its lock manually.
        synchronized (statMap)
        {
            for (Map.Entry<Integer, Map<String, Number>> modelEntry : statMap.entrySet())
            {
                StringBuilder s = new StringBuilder();
                s.append(String.format("%2d  ", modelEntry.getKey()));
                for (Map.Entry<String, Number> stat : modelEntry.getValue().entrySet())
                {
                    s.append(String.format("%10s=%10.4f   ", stat.getKey(), stat.getValue().doubleValue()));
                }
                System.out.println(s.toString());
            }
        }
    }

    /**
     * State machine to run several models in parallel.
     * <p>
     * Copyright (c) 2013-2017 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved.
     * <br>
     * BSD-style license. See <a href="http://sim0mq.org/docs/current/license.html">Sim0MQ License</a>.
     * </p>
     * $LastChangedDate: 2015-07-24 02:58:59 +0200 (Fri, 24 Jul 2015) $, @version $Revision: 1147 $, by $Author: averbraeck $,
     * initial version May 5, 2017 <br>
     * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
     */
    static class StateMachine
    {
        /** the state of the started model. */
        private ModelState state;

        /** the model socket. */
        private ZMQ.Socket modelSocket;

        /** the model name. */
        private String modelName;

        /** the federate starter socket. */
        private ZMQ.Socket fsSocket;

        /** the context. */
        private ZContext fmContext;

        /** the message counter. */
        private AtomicLong messageCount;

        /** statistics. */
        private Map<String, Number> statistics = new HashMap<>();

        /**
         * Create the sockets and run the complete life cycle of one model: start it through the FederateStarter, send
         * run control and parameters, start the simulation, poll until it has ended, collect the statistics and kill the
         * federate. The constructor only returns when the life cycle has finished; sockets and context are released even
         * when one of the steps throws.
         * @param messageCount AtomicLong; message counter
         * @param federationName the name of the federation
         * @param fsPort FederateStarter port number
         * @param localSk3 local/sk-3 to indicate where the federate starter and model can be found
         * @param modelNr sequence number of the model to run
         * @throws Sim0MQException on error
         * @throws SerializationException on serialization problem
         */
        StateMachine(final AtomicLong messageCount, final String federationName, final int fsPort, final String localSk3,
                final int modelNr) throws Sim0MQException, SerializationException
        {
            this.fmContext = new ZContext(1);

            this.fsSocket = this.fmContext.createSocket(SocketType.REQ);
            this.fsSocket.setIdentity(UUID.randomUUID().toString().getBytes());

            this.modelName = "MM1." + modelNr;
            this.messageCount = messageCount;

            this.modelSocket = this.fmContext.createSocket(SocketType.REQ);
            this.modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());

            try
            {
                this.state = ModelState.NOT_STARTED;
                boolean ready = false;
                while (!ready)
                {
                    System.out.println(this.state);

                    switch (this.state)
                    {
                        case NOT_STARTED:
                            startModel(federationName, fsPort, localSk3);
                            break;

                        case STARTED:
                            sendSimRunControl(federationName);
                            break;

                        case RUNCONTROL:
                            setParameters(federationName);
                            break;

                        case PARAMETERS:
                            sendSimStart(federationName);
                            break;

                        case SIMULATORSTARTED:
                            waitForSimEnded(federationName);
                            break;

                        case SIMULATORENDED:
                            requestStatistics(federationName);
                            break;

                        case STATISTICSGATHERED:
                        case ERROR:
                            // Both the normal and the failure path end by asking the FederateStarter to kill the model.
                            killFederate(federationName);
                            ready = true;
                            break;

                        default:
                            // No transition is defined for this state; stop instead of spinning forever.
                            System.err.println("Unexpected model state " + this.state + " for " + this.modelName);
                            ready = true;
                            break;
                    }
                }
            }
            finally
            {
                // Release the ZeroMQ resources on every exit path, including exceptions thrown by a step.
                this.fsSocket.close();
                this.modelSocket.close();
                this.fmContext.destroy();
                this.fmContext.close();
            }
        }

        /**
         * Send the FM.1 message to the FederateStarter to start the MM1 model, and connect the model socket to the port
         * reported in the FS.2 reply. Sets the state to STARTED on success and to ERROR otherwise.
         * @param federationName the name of the federation
         * @param fsPort the port where the federate starter can be reached
         * @param localSk3 local/sk-3 to indicate where the federate starter and model can be found
         * @throws Sim0MQException on error
         * @throws SerializationException on serialization problem
         */
        private void startModel(final String federationName, final int fsPort, final String localSk3)
                throws Sim0MQException, SerializationException
        {
            // Host and installation directory differ between the sk-3 server and a local run; the message is the same.
            final boolean onSk3 = localSk3.equals("sk-3");
            final String host = onSk3 ? "130.161.3.179" : "127.0.0.1";
            final String baseDir = onSk3 ? "/home/alexandv/sim0mq/MM1" : "e:/sim0mq/MM1";

            // Start model mm1.jar
            byte[] fm1Message = SimulationMessage.encodeUTF8(federationName, "FM", "FS", "FM.1",
                    this.messageCount.getAndIncrement(), MessageStatus.NEW, this.modelName, "java8+", "-jar",
                    baseDir + "/mm1.jar", this.modelName + " %PORT%", baseDir, "",
                    baseDir + "/out_" + this.modelName + ".txt", baseDir + "/err_" + this.modelName + ".txt", false, true,
                    true);
            this.fsSocket.connect("tcp://" + host + ":" + fsPort);
            this.fsSocket.send(fm1Message);

            byte[] reply = this.fsSocket.recv(0);
            Object[] replyMessage = SimulationMessage.decode(reply);
            System.out.println("Received\n" + SimulationMessage.print(replyMessage));

            if (replyMessage[4].toString().equals("FS.2") && replyMessage[9].toString().equals("started")
                    && replyMessage[8].toString().equals(this.modelName))
            {
                this.state = ModelState.STARTED;
                // The model is launched by the federate starter, so it is reached on the starter's host rather than
                // always on localhost. NOTE(review): assumes the model binds on an externally reachable interface.
                this.modelSocket.connect("tcp://" + host + ":" + replyMessage[10].toString());
            }
            else
            {
                this.state = ModelState.ERROR;
                System.err.println("Model not started correctly -- state = " + replyMessage[9]);
                System.err.println("Started model = " + replyMessage[8]);
                System.err.println("Error message = " + replyMessage[10]);
            }
        }

        /**
         * Send the SimRunControl message FM.2 and wait for the MC.2 acknowledgement. Sets the state to RUNCONTROL when the
         * model acknowledges this message positively and to ERROR otherwise.
         * @param federationName the name of the federation
         * @throws Sim0MQException on error
         * @throws SerializationException on serialization problem
         */
        private void sendSimRunControl(final String federationName) throws Sim0MQException, SerializationException
        {
            // The counter is shared between all model threads: reserve the id in one atomic step so that the id we send
            // is the id we expect back (a separate get() could observe another thread's value).
            final long messageNumber = this.messageCount.getAndIncrement();
            byte[] fm2Message = SimulationMessage.encodeUTF8(federationName, "FM", this.modelName, "FM.2", messageNumber,
                    MessageStatus.NEW, 100.0, 0.0, 0.0, Double.POSITIVE_INFINITY, 1, 0);
            this.modelSocket.send(fm2Message);

            byte[] reply = this.modelSocket.recv(0);
            Object[] replyMessage = SimulationMessage.decode(reply);
            System.out.println("Received\n" + SimulationMessage.print(replyMessage));

            if (replyMessage[4].toString().equals("MC.2") && (boolean) replyMessage[9]
                    && ((Long) replyMessage[8]).longValue() == messageNumber)
            {
                this.state = ModelState.RUNCONTROL;
            }
            else
            {
                this.state = ModelState.ERROR;
                System.err.println("Run control error -- status = " + replyMessage[9]);
                System.err.println("Error message = " + replyMessage[10]);
            }
        }

        /**
         * Send the Parameters messages FM.3, one message per parameter (iat, servicetime and seed), waiting for the MC.2
         * acknowledgement of each. Sets the state to PARAMETERS when all parameters were accepted; sets it to ERROR and
         * stops at the first parameter that is not acknowledged.
         * @param federationName the name of the federation
         * @throws Sim0MQException on error
         * @throws SerializationException on serialization problem
         */
        private void setParameters(final String federationName) throws Sim0MQException, SerializationException
        {
            if (this.state.isError())
            {
                return;
            }

            Map<String, Object> parameters = new HashMap<>();
            parameters.put("iat", Double.valueOf(1.0));
            parameters.put("servicetime", Double.valueOf(0.85));
            parameters.put("seed", Math.abs(this.modelName.hashCode()));

            for (Map.Entry<String, Object> parameter : parameters.entrySet())
            {
                // Reserve the id atomically: the counter is shared with the other model threads.
                final long messageNumber = this.messageCount.getAndIncrement();
                byte[] fm3Message = SimulationMessage.encodeUTF8(federationName, "FM", this.modelName, "FM.3",
                        messageNumber, MessageStatus.NEW, parameter.getKey(), parameter.getValue());
                this.modelSocket.send(fm3Message);

                byte[] reply = this.modelSocket.recv(0);
                Object[] replyMessage = SimulationMessage.decode(reply);
                System.out.println("Received\n" + SimulationMessage.print(replyMessage));

                boolean acknowledged = replyMessage[4].toString().equals("MC.2") && (boolean) replyMessage[9]
                        && ((Long) replyMessage[8]).longValue() == messageNumber;
                if (!acknowledged)
                {
                    this.state = ModelState.ERROR;
                    System.err.println("Model parameter error -- status = " + replyMessage[9]);
                    System.err.println("Error message = " + replyMessage[10]);
                    return;
                }
            }
            this.state = ModelState.PARAMETERS;
        }

        /**
         * Send the SimStart message FM.4 and wait for the MC.2 acknowledgement. Sets the state to SIMULATORSTARTED when the
         * model acknowledges this message positively and to ERROR otherwise.
         * @param federationName the name of the federation
         * @throws Sim0MQException on error
         * @throws SerializationException on serialization problem
         */
        private void sendSimStart(final String federationName) throws Sim0MQException, SerializationException
        {
            // Reserve the id atomically: the counter is shared with the other model threads, so a get() followed by a
            // getAndIncrement() could yield two different numbers.
            final long messageNumber = this.messageCount.getAndIncrement();
            byte[] fm4Message = SimulationMessage.encodeUTF8(federationName, "FM", this.modelName, "FM.4", messageNumber,
                    MessageStatus.NEW);
            this.modelSocket.send(fm4Message);

            byte[] reply = this.modelSocket.recv(0);
            Object[] replyMessage = SimulationMessage.decode(reply);
            System.out.println("Received\n" + SimulationMessage.print(replyMessage));

            if (replyMessage[4].toString().equals("MC.2") && (boolean) replyMessage[9]
                    && ((Long) replyMessage[8]).longValue() == messageNumber)
            {
                this.state = ModelState.SIMULATORSTARTED;
            }
            else
            {
                this.state = ModelState.ERROR;
                System.err.println("Simulation start error -- status = " + replyMessage[9]);
                System.err.println("Error message = " + replyMessage[10]);
            }
        }

        /**
         * Wait for simulation to end using status polling with message FM.5. The model is polled once per second until it
         * reports "ended" (state becomes SIMULATORENDED). A reply that is not an MC.1 for this request, or that reports
         * "error" or "started", sets the state to ERROR and ends the polling.
         * @param federationName the name of the federation
         * @throws Sim0MQException on error
         * @throws SerializationException on serialization problem
         */
        private void waitForSimEnded(final String federationName) throws Sim0MQException, SerializationException
        {
            while (!this.state.isSimulatorEnded() && !this.state.isError())
            {
                // Reserve the id atomically: the counter is shared with the other model threads.
                final long messageNumber = this.messageCount.getAndIncrement();
                byte[] fm5Message = SimulationMessage.encodeUTF8(federationName, "FM", this.modelName, "FM.5",
                        messageNumber, MessageStatus.NEW);
                this.modelSocket.send(fm5Message);

                byte[] reply = this.modelSocket.recv(0);
                Object[] replyMessage = SimulationMessage.decode(reply);
                System.out.println("Received\n" + SimulationMessage.print(replyMessage));

                boolean validStatus = replyMessage[4].toString().equals("MC.1")
                        && !replyMessage[9].toString().equals("error") && !replyMessage[9].toString().equals("started")
                        && ((Long) replyMessage[8]).longValue() == messageNumber;
                if (!validStatus)
                {
                    this.state = ModelState.ERROR;
                    System.err.println("Simulation status error -- status = " + replyMessage[9]);
                    System.err.println("Error message = " + replyMessage[10]);
                }
                else if (replyMessage[9].toString().equals("ended"))
                {
                    this.state = ModelState.SIMULATORENDED;
                }
                else
                {
                    // wait a second before polling again
                    try
                    {
                        Thread.sleep(1000);
                    }
                    catch (InterruptedException ie)
                    {
                        // ignore: the loop simply polls again
                    }
                }
            }
        }

        /**
         * Request statistics with message FM.6, one request per statistic (dN.average, qN.max, uN.average), and store each
         * returned value in the statistics map. Sets the state to STATISTICSGATHERED when all values were received; sets
         * it to ERROR and stops at the first reply that is not a matching, numeric MC.3 answer.
         * @param federationName the name of the federation
         * @throws Sim0MQException on error
         * @throws SerializationException on serialization problem
         */
        private void requestStatistics(final String federationName) throws Sim0MQException, SerializationException
        {
            if (this.state.isError())
            {
                return;
            }

            final String[] statNames = {"dN.average", "qN.max", "uN.average"};
            for (String statName : statNames)
            {
                byte[] fm6Message = SimulationMessage.encodeUTF8(federationName, "FM", this.modelName, "FM.6",
                        this.messageCount.getAndIncrement(), MessageStatus.NEW, statName);
                this.modelSocket.send(fm6Message);

                byte[] reply = this.modelSocket.recv(0);
                Object[] replyMessage = SimulationMessage.decode(reply);
                System.out.println("Received\n" + SimulationMessage.print(replyMessage));

                String messageType = replyMessage[4].toString();
                if (messageType.equals("MC.3"))
                {
                    if (!replyMessage[9].toString().equals(statName))
                    {
                        this.state = ModelState.ERROR;
                        // Report the field that was actually compared with the requested name.
                        System.err.println(
                                "Statistics Error: Stat variable expected = " + statName + ", got: " + replyMessage[9]);
                        return;
                    }
                    if (!(replyMessage[10] instanceof Number))
                    {
                        // Go to ERROR (so the federate is still killed) instead of failing on the cast below.
                        this.state = ModelState.ERROR;
                        System.err.println(
                                "Statistics Error: Non-numeric value for " + statName + ": " + replyMessage[10]);
                        return;
                    }
                    System.out.println("Received statistic for " + statName + " = " + replyMessage[10].toString());
                    this.statistics.put(statName, (Number) replyMessage[10]);
                }
                else if (messageType.equals("MC.4"))
                {
                    this.state = ModelState.ERROR;
                    System.err.println("Statistics Error: Stat variable = " + replyMessage[8]);
                    System.err.println("Error message = " + replyMessage[9]);
                    return;
                }
                else
                {
                    this.state = ModelState.ERROR;
                    System.err.println("Statistics Error: Received unknown message as reply to FM6: " + replyMessage[4]);
                    return;
                }
            }
            this.state = ModelState.STATISTICSGATHERED;
        }

        /**
         * Ask the FederateStarter, with an FM.8 message, to kill the MM1 model and evaluate the answer. The state becomes
         * TERMINATED when an FS.4 reply confirms the kill of this model, and ERROR in every other case.
         * @param federationName the name of the federation
         * @throws Sim0MQException on error
         * @throws SerializationException on serialization problem
         */
        private void killFederate(final String federationName) throws Sim0MQException, SerializationException
        {
            final byte[] killRequest = SimulationMessage.encodeUTF8(federationName, "FM", "FS", "FM.8",
                    this.messageCount.getAndIncrement(), MessageStatus.NEW, this.modelName);
            this.fsSocket.send(killRequest);

            final Object[] answer = SimulationMessage.decode(this.fsSocket.recv(0));
            System.out.println("Received\n" + SimulationMessage.print(answer));

            // Evaluated left to right with short-circuiting: type first, then the success flag, then the model name.
            final boolean killed = answer[4].toString().equals("FS.4") && (boolean) answer[9]
                    && answer[8].toString().equals(this.modelName);
            if (!killed)
            {
                this.state = ModelState.ERROR;
                System.err.println("Model not killed correctly");
                System.err.println("Tried to kill model = " + answer[8]);
                System.err.println("Error message = " + answer[10]);
                return;
            }
            this.state = ModelState.TERMINATED;
        }

        /**
         * Return the statistics gathered from the model, keyed by statistic name. The result is a read-only view of the
         * internal map, so callers cannot alter the state of this StateMachine.
         * @return statistics
         */
        public final Map<String, Number> getStatistics()
        {
            return Collections.unmodifiableMap(this.statistics);
        }

    }

}