FederateStarter.java

package org.sim0mq.federatestarter;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;

import org.djutils.io.URLResource;
import org.djutils.serialization.SerializationException;
import org.sim0mq.Sim0MQException;
import org.sim0mq.message.Sim0MQMessage;
import org.sim0mq.message.federatestarter.FS1RequestStatusMessage;
import org.sim0mq.message.federatestarter.FS2FederateStartedMessage;
import org.sim0mq.message.federatestarter.FS4FederateKilledMessage;
import org.sim0mq.message.federationmanager.FM1StartFederateMessage;
import org.sim0mq.message.federationmanager.FM8KillFederateMessage;
import org.sim0mq.message.modelcontroller.MC1StatusMessage;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMQException;

/**
 * The FederateStarter starts listening on the given port for messages to start components. It reports back to the sender on
 * the status of the started components. If necessary, the FederateStarter can also forcefully stop a started (sub)process.
 * <p>
 * Copyright (c) 2016-2024 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
 * BSD-style license. See <a href="http://sim0mq.org/docs/current/license.html">Sim0MQ License</a>.
 * </p>
 * $LastChangedDate: 2015-07-24 02:58:59 +0200 (Fri, 24 Jul 2015) $, @version $Revision: 1147 $, by $Author: averbraeck $,
 * initial version Mar 1, 2017 <br>
 * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
 */
public class FederateStarter
{
    /** the port number to listen on. */
    private final int fsPort;

    /** the first port to be used for the models, inclusive. */
    private final int startPort;

    /** the last port to be used for the models, inclusive. */
    private final int endPort;

    /**
     * the running programs this FederateStarter started. The key is the instance id taken from the FM.1 StartFederate
     * message that started the process.
     */
    @SuppressWarnings("checkstyle:visibilitymodifier")
    protected Map<Object, Process> runningProcessMap = Collections.synchronizedMap(new LinkedHashMap<>());

    /**
     * the ports where the models listen. The key is the instance id taken from the FM.1 StartFederate message; the values
     * are also used to skip ports that have already been handed out when looking for a free port.
     */
    private Map<Object, Integer> modelPortMap = Collections.synchronizedMap(new LinkedHashMap<>());

    /** the StartFederate messages, stored per instance id so the cleanup options can be consulted when a federate is killed. */
    private Map<Object, FM1StartFederateMessage> startFederateMessages = Collections.synchronizedMap(new LinkedHashMap<>());

    /** the software properties; maps a software alias from the StartFederate message onto the executable to start. */
    @SuppressWarnings("checkstyle:visibilitymodifier")
    final Properties softwareProperties;

    /** the 0mq ROUTER socket on which the FederateStarter receives requests and sends its replies. */
    private ZMQ.Socket fsSocket;

    /** the 0mq context; also used to create the sockets that talk to the started models. */
    private ZContext fsContext;

    /** message count; incremented for every message this FederateStarter sends and used as the message id. */
    private long messageCount = 0;

    /** does the Federate Starter concern models with an MC or just processes? */
    private final boolean modelController;

    /**
     * @param fsPort the port number to listen on
     * @param softwareProperties the software properties to use
     * @param startPort first port to be used for the models, inclusive
     * @param endPort last port to be used for the models, inclusive
     * @param modelController does the Federate Starter concern models with an MC or just processes?
     * @throws Sim0MQException on error
     * @throws SerializationException on error
     */
    public FederateStarter(final int fsPort, final Properties softwareProperties, final int startPort, final int endPort,
            final boolean modelController) throws Sim0MQException, SerializationException
    {
        this.softwareProperties = softwareProperties;
        this.fsPort = fsPort;
        this.startPort = startPort;
        this.endPort = endPort;
        this.modelController = modelController;

        this.fsContext = new ZContext(1);

        this.fsSocket = this.fsContext.createSocket(SocketType.ROUTER);
        this.fsSocket.bind("tcp://*:" + this.fsPort);

        while (!Thread.currentThread().isInterrupted())
        {
            // Wait for next request from the client -- first the identity (String) and the delimiter (#0)
            try
            {
                String identity = this.fsSocket.recvStr();
                this.fsSocket.recvStr();

                byte[] request = this.fsSocket.recv(0);
                Sim0MQMessage message = Sim0MQMessage.decode(request);
                String messageTypeId = message.getMessageTypeId().toString();
                String receiverId = message.getReceiverId().toString();

                System.out.println("Received " + Sim0MQMessage.print(message.createObjectArray()));

                if (receiverId.equals("FS"))
                {
                    switch (messageTypeId)
                    {
                        case "FM.1":
                            processStartFederate(identity, message);
                            break;

                        case "FM.8":
                            processKillFederate(identity, message);
                            break;

                        case "FM.9":
                            // processKillAllFederates(senderId, uniqueId);
                            break;

                        default:
                            // wrong message
                            System.err.println("Received unknown message -- not processed: " + messageTypeId);
                    }
                }
                else
                {
                    // wrong receiver
                    System.err.println("Received message not intended for FS but for " + receiverId + " -- not processed: ");
                }
            }
            catch (ZMQException e)
            {
                System.err.println(e.getMessage());
            }
        }

        try
        {
            this.fsSocket.close();
            this.fsContext.destroy();
        }
        catch (Exception e)
        {
            System.err.println(e.getMessage());
        }
    }

    /**
     * Process an FM.1 (StartFederate) message and send an FS.2 (FederateStarted) message back. A free model port is
     * reserved, the working directory is created, and the program registered under the software alias is started with the
     * arguments from the message. When this FederateStarter works with model controllers, the method waits until the model
     * reports that it has started. Every failure (no free port, unknown software alias, process could not be started, model
     * reported an error) is reported in the FS.2 message with status "error".
     * @param identity reply id for REQ-ROUTER pattern
     * @param message Message; the message
     * @throws Sim0MQException on error
     * @throws SerializationException on error
     */
    private void processStartFederate(final String identity, final Sim0MQMessage message)
            throws Sim0MQException, SerializationException
    {
        FM1StartFederateMessage startFederateMessage = new FM1StartFederateMessage(message.createObjectArray());
        Object instanceId = startFederateMessage.getInstanceId();
        String softwareAlias = startFederateMessage.getSoftwareCode();
        String error = "";

        int modelPort = findFreePortNumber();

        if (modelPort == -1)
        {
            error = "No free port number";
        }
        else if (!this.softwareProperties.containsKey(softwareAlias))
        {
            // report the problem to the sender instead of claiming that the federate was started
            error = "Could not find software alias " + softwareAlias + " in software properties file";
            System.err.println(error);
        }
        else
        {
            try
            {
                ProcessBuilder pb = new ProcessBuilder();

                Path workingPath = Files.createDirectories(Paths.get(startFederateMessage.getWorkingDirectory()));
                pb.directory(workingPath.toFile());

                List<String> pbArgs = new ArrayList<>();
                pbArgs.add(this.softwareProperties.getProperty(softwareAlias));
                pbArgs.addAll(splitArguments(startFederateMessage.getArgsBefore()));
                pbArgs.add(startFederateMessage.getModelPath());
                pbArgs.addAll(
                        splitArguments(startFederateMessage.getArgsAfter().replace("%PORT%", String.valueOf(modelPort))));
                pb.command(pbArgs);

                String stdIn = startFederateMessage.getRedirectStdin();
                String stdOut = startFederateMessage.getRedirectStdout();
                String stdErr = startFederateMessage.getRedirectStderr();

                if (stdIn != null && !stdIn.isEmpty())
                {
                    // TODO working dir path if not absolute?
                    pb.redirectInput(new File(stdIn));
                }

                if (stdOut != null && !stdOut.isEmpty())
                {
                    // TODO working dir path if not absolute?
                    pb.redirectOutput(new File(stdOut));
                }

                if (stdErr != null && !stdErr.isEmpty())
                {
                    // TODO working dir path if not absolute?
                    pb.redirectError(new File(stdErr));
                }

                // ProcessBuilder.start() returns as soon as the process is launched, so it is started here rather than on a
                // separate thread: a failure to start is then reported in the FS.2 message, and the process is registered
                // before anyone can ask to kill it.
                Process process = pb.start();
                this.runningProcessMap.put(instanceId, process);
                System.err.println("Process started:" + process.isAlive());

                this.modelPortMap.put(instanceId, modelPort);
                this.startFederateMessages.put(instanceId, startFederateMessage);

                // wait till the model is ready...
                System.out.println("modelController : " + this.modelController);
                if (this.modelController)
                {
                    error = waitForModelStarted(startFederateMessage.getFederationId(), instanceId, modelPort);
                    if (error == null)
                    {
                        error = "";
                    }
                }
            }
            catch (IOException exception)
            {
                exception.printStackTrace();
                // getMessage() may be null; the FS.2 status is derived from a non-null error text
                error = exception.getMessage() != null ? exception.getMessage() : exception.toString();
            }
        }

        System.out.println("SEND MESSAGE FS.2 ABOUT MODEL " + instanceId + " @ port " + modelPort);

        // Build the reply first, so no partial multi-part message is left on the socket when building fails
        //@formatter:off
        byte[] fs2Message = new FS2FederateStartedMessage.Builder()
                .setSimulationRunId(startFederateMessage.getFederationId())
                .setInstanceId(instanceId)
                .setSenderId("FS")
                .setReceiverId(startFederateMessage.getSenderId())
                .setMessageId(++this.messageCount)
                .setStatus(error.isEmpty() ? "started" : "error")
                .setError(error)
                .setModelPort(modelPort)
                .build()
                .createByteArray();
        //@formatter:on

        // Send reply back to client
        this.fsSocket.sendMore(identity);
        this.fsSocket.sendMore("");
        this.fsSocket.send(fs2Message, 0);
    }

    /**
     * Split a whitespace separated argument string into separate command line arguments, leaving out empty arguments.
     * Quoting is not interpreted; an argument that contains whitespace cannot be expressed.
     * @param arguments the argument string; may be null or empty
     * @return the separate arguments; an empty list when there are none
     */
    private static List<String> splitArguments(final String arguments)
    {
        List<String> result = new ArrayList<>();
        if (arguments != null)
        {
            for (String argument : arguments.trim().split("\\s+"))
            {
                if (!argument.isEmpty())
                {
                    result.add(argument);
                }
            }
        }
        return result;
    }

    /**
     * Look for a port in the range startPort - endPort (both inclusive) that has not been handed out to a model yet and to
     * which a test socket can actually be bound on the loopback interface.
     * @return the first free port number in the range, or -1 when no port in the range is free
     */
    private int findFreePortNumber()
    {
        for (int candidate = this.startPort; candidate <= this.endPort; candidate++)
        {
            if (this.modelPortMap.containsValue(candidate))
            {
                // already handed out to a model
                continue;
            }

            // probe whether the port can really be bound
            final String endpoint = "tcp://127.0.0.1:" + candidate;
            ZMQ.Socket probe = null;
            try
            {
                probe = this.fsContext.createSocket(SocketType.REP);
                probe.bind(endpoint);
                probe.unbind(endpoint);
                probe.close();
                return candidate;
            }
            catch (Exception notFree)
            {
                // the port could not be used; release the probe socket and try the next port
                if (probe == null)
                {
                    continue;
                }
                try
                {
                    probe.close();
                }
                catch (Exception ignored)
                {
                    // ignore.
                }
            }
        }
        return -1;
    }

    /**
     * Wait for a started model to report that it is running, by polling it with FS.1 (RequestStatus) messages and reading
     * the MC.1 (Status) replies. Polling stops when the status is "started" (success), when the status is "error" or
     * "ended", when a reply does not belong to the request that was sent, when no reply is delivered, or when the thread is
     * interrupted. Note that receiving a reply blocks without a timeout.
     * @param federationRunId the name of the federation
     * @param modelId the String id of the model
     * @param modelPort port on which the model is listening
     * @return empty String for no error, filled String for error
     * @throws Sim0MQException on error
     * @throws SerializationException on error
     */
    private String waitForModelStarted(final Object federationRunId, final Object modelId, final int modelPort)
            throws Sim0MQException, SerializationException
    {
        ZMQ.Socket modelSocket = null;
        boolean interrupted = false;
        try
        {
            try
            {
                modelSocket = this.fsContext.createSocket(SocketType.REQ);
                modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
                modelSocket.connect("tcp://127.0.0.1:" + modelPort);
            }
            catch (Exception exception)
            {
                exception.printStackTrace();
                // never return null or an empty text for a failure: the caller treats an empty text as success
                return exception.getMessage() != null && !exception.getMessage().isEmpty() ? exception.getMessage()
                        : exception.toString();
            }

            while (true)
            {
                long requestId = ++this.messageCount;
                byte[] fs1Message = new FS1RequestStatusMessage(federationRunId, "FS", modelId, requestId).createByteArray();
                modelSocket.send(fs1Message, 0);
                System.out.println("Sent: FS.1 to " + modelId + ", waiting on MC1");

                byte[] reply = modelSocket.recv(0);
                if (reply == null)
                {
                    return "No status reply received from model " + modelId;
                }
                Object[] objectArray = Sim0MQMessage.decodeToArray(reply);
                System.out.println("Received\n" + Sim0MQMessage.print(objectArray));
                MC1StatusMessage replyMessage = new MC1StatusMessage(objectArray);

                String status = replyMessage.getStatus();
                Object replyToId = replyMessage.getReplyToId();
                boolean replyMatches = replyToId instanceof Number && ((Number) replyToId).longValue() == requestId;

                if ("error".equals(status) || "ended".equals(status) || !replyMatches)
                {
                    String error = replyMessage.getError();
                    System.err.println("Simulation start error -- status = " + status);
                    System.err.println("Error message = " + error);
                    if (error == null || error.isEmpty())
                    {
                        // the model gave no error text; still return a filled String so the failure is reported
                        error = "Model " + modelId + " did not start -- status = " + status
                                + (replyMatches ? "" : ", reply does not belong to request " + requestId);
                    }
                    return error;
                }

                if ("started".equals(status))
                {
                    return "";
                }

                // not started yet: wait 100 ms before asking again
                try
                {
                    Thread.sleep(100);
                }
                catch (InterruptedException ie)
                {
                    // stop waiting; the interrupt flag is restored after the socket has been closed
                    interrupted = true;
                    return "Interrupted while waiting for model " + modelId + " to start";
                }
            }
        }
        finally
        {
            if (modelSocket != null)
            {
                modelSocket.close();
            }
            if (interrupted)
            {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Process FM.8 message and send FS.4 message back. The model is asked to stop with an FS.3 message (best effort), the
     * process is destroyed forcibly when it is still alive, and the files and working directory are removed when the
     * StartFederate message asked for that. An FS.4 message is always sent back, also when the model is unknown to this
     * FederateStarter.
     * @param identity reply id for REQ-ROUTER pattern
     * @param message the message
     * @throws Sim0MQException on error
     * @throws SerializationException on error
     */
    private void processKillFederate(final String identity, final Sim0MQMessage message)
            throws Sim0MQException, SerializationException
    {
        boolean status = true;
        String error = "";
        boolean interrupted = false;

        Object federationRunId = message.getFederationId();
        Object senderId = message.getSenderId();

        FM8KillFederateMessage killMessage = new FM8KillFederateMessage(message.createObjectArray());
        Object modelId = killMessage.getInstanceId();

        try
        {
            Integer modelPort = this.modelPortMap.remove(modelId);
            if (modelPort == null)
            {
                status = false;
                error = "model " + modelId + " unknown -- this model is unknown to the FederateStarter";
            }
            else
            {
                Process process = this.runningProcessMap.remove(modelId);
                // remove (not get) the start message, so the administration does not grow with every started federate
                FM1StartFederateMessage sfm = this.startFederateMessages.remove(modelId);

                try
                {
                    // ask the model to stop; this is best effort, because the process is destroyed below anyway
                    ZMQ.Socket modelSocket = null;
                    try
                    {
                        modelSocket = this.fsContext.createSocket(SocketType.REQ);
                        modelSocket.setIdentity(UUID.randomUUID().toString().getBytes());
                        modelSocket.connect("tcp://127.0.0.1:" + modelPort);

                        byte[] fs3Message =
                                Sim0MQMessage.encodeUTF8(true, federationRunId, "FS", modelId, "FS.3", ++this.messageCount);
                        modelSocket.send(fs3Message, 0);
                    }
                    catch (Exception exception)
                    {
                        exception.printStackTrace();
                        error = exception.getMessage() != null ? exception.getMessage() : exception.toString();
                    }
                    finally
                    {
                        if (modelSocket != null)
                        {
                            try
                            {
                                modelSocket.close();
                            }
                            catch (Exception closeException)
                            {
                                closeException.printStackTrace();
                            }
                        }
                    }

                    // give the model a little time to stop by itself
                    try
                    {
                        Thread.sleep(100);
                    }
                    catch (InterruptedException ie)
                    {
                        // finish the kill and the reply first; the interrupt flag is restored at the end
                        interrupted = true;
                    }

                    if (process != null && process.isAlive())
                    {
                        process.destroyForcibly();
                        try
                        {
                            // wait (bounded) for the process to end, so it no longer holds the files deleted below
                            process.waitFor(1, java.util.concurrent.TimeUnit.SECONDS);
                        }
                        catch (InterruptedException ie)
                        {
                            interrupted = true;
                        }
                    }

                    if (sfm != null)
                    {
                        if (sfm.isDeleteStdout() && sfm.getRedirectStdout().length() > 0)
                        {
                            deleteOrWarn(new File(sfm.getRedirectStdout()));
                        }

                        if (sfm.isDeleteStderr() && sfm.getRedirectStderr().length() > 0)
                        {
                            deleteOrWarn(new File(sfm.getRedirectStderr()));
                        }

                        if (sfm.isDeleteWorkingDirectory())
                        {
                            // note: a directory can only be deleted this way when it is empty
                            deleteOrWarn(new File(sfm.getWorkingDirectory()));
                        }
                    }
                }
                catch (Exception exception)
                {
                    exception.printStackTrace();
                    status = false;
                    error = exception.getMessage() != null ? exception.getMessage() : exception.toString();
                }
            }

            // always answer, so a requester is never left waiting for a reply
            byte[] fs4Message =
                    new FS4FederateKilledMessage(federationRunId, "FS", senderId, ++this.messageCount, modelId, status, error)
                            .createByteArray();
            this.fsSocket.sendMore(identity);
            this.fsSocket.sendMore("");
            this.fsSocket.send(fs4Message, 0);
        }
        finally
        {
            if (interrupted)
            {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Delete a file or (empty) directory, and print a warning when it exists but could not be deleted.
     * @param file the file or directory to delete
     */
    private static void deleteOrWarn(final File file)
    {
        if (file.exists() && !file.delete())
        {
            System.err.println("Could not delete " + file);
        }
    }

    /**
     * Return whether this FederateStarter concerns models with a model controller (MC) or just processes. When true, the
     * FederateStarter waits for a started model to report its status before it confirms the start.
     * @return the modelController flag that was given to the constructor
     */
    public boolean isModelController()
    {
        return this.modelController;
    }

    /**
     * Start listening on the given port for messages to start components. Report back to the sender on the status of the
     * started components. If necessary, the FederateStarter can also forcefully stop a started (sub)process. The program
     * exits with code -1 when an argument is missing or invalid, or when the software properties cannot be read.
     * @param args the port on which the FederateStarter is listening, the name of the software properties file, and the first
     *            and last port (inclusive) that can be used for the models
     * @throws Sim0MQException on error
     * @throws SerializationException on error
     */
    public static void main(final String[] args) throws Sim0MQException, SerializationException
    {
        if (args.length < 4)
        {
            System.err.println("Use as FederateStarter portNumber software_properties_file startPort endPort");
            System.exit(-1);
        }

        int port = parsePortOrExit(args[0], "Use as FederateStarter portNumber, where portNumber is a number",
                "PortNumber should be between 1 and 65535");

        String propertiesFile = args[1];
        Properties softwareProperties = new Properties();
        // try-with-resources closes the stream after loading; a null resource is allowed and simply not closed
        try (InputStream propertiesStream = URLResource.getResourceAsStream(propertiesFile))
        {
            if (propertiesStream == null)
            {
                System.err.println("Could not find or read software properties file " + propertiesFile);
                System.exit(-1);
            }
            softwareProperties.load(propertiesStream);
        }
        catch (IOException e)
        {
            System.err.println("Could not find or read software properties file " + propertiesFile);
            System.exit(-1);
        }

        int startPort =
                parsePortOrExit(args[2], "Use as FederateStarter pn file startPort endPort, where startPort is a number",
                        "startPort should be between 1 and 65535");

        int endPort = parsePortOrExit(args[3], "Use as FederateStarter pn file startPort endPort, where endPort is a number",
                "endPort should be between 1 and 65535");

        if (startPort > endPort)
        {
            // an empty range would make every start request fail with "No free port number"
            System.err.println("startPort should not be larger than endPort");
            System.exit(-1);
        }

        new FederateStarter(port, softwareProperties, startPort, endPort, true);
    }

    /**
     * Parse a port number from a command line argument. The program exits with code -1 when the argument is not a number
     * or is not in the range 1 - 65535 (inclusive).
     * @param value the command line argument to parse
     * @param notANumberMessage the message to print when the argument is not a number
     * @param outOfRangeMessage the message to print when the number is not a valid port number
     * @return the port number
     */
    private static int parsePortOrExit(final String value, final String notANumberMessage, final String outOfRangeMessage)
    {
        int parsed = 0;
        try
        {
            parsed = Integer.parseInt(value);
        }
        catch (NumberFormatException nfe)
        {
            System.err.println(notANumberMessage);
            System.exit(-1);
        }
        if (parsed < 1 || parsed > 65535)
        {
            System.err.println(outOfRangeMessage);
            System.exit(-1);
        }
        return parsed;
    }

}