RouterToReqExample.java

package org.sim0mq.demo;

import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

import org.zeromq.ZMQ;

/**
 * Example from http://stackoverflow.com/questions/20944140/zeromq-route-req-java-example-does-not-work. Added testing of send
 * messages for correctly sent messages, plus a monitor that checks whether one of the conversations hangs. The lazy pirate
 * pattern from the 0mq guide has been added to deal with possible timeouts.
 * <p>
 * BSD-style license. See <a href="http://opentrafficsim.org/docs/current/license.html">OpenTrafficSim License</a>.
 * </p>
 * $LastChangedDate: 2015-07-24 02:58:59 +0200 (Fri, 24 Jul 2015) $, @version $Revision: 1147 $, by $Author: averbraeck $,
 * initial version Apr 20, 2017 <br>
 * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
 */
public class RouterToReqExample
{
    /** random stream. */
    static Random rand = new Random();

    /** static counter for worker. */
    static AtomicInteger staticWorkerRecv = new AtomicInteger();

    /** static counter for broker identity. */
    static AtomicInteger staticBrokerIdRecv = new AtomicInteger();

    /** static counter for broker Message. */
    static AtomicInteger staticBrokerMsgRecv = new AtomicInteger();

    /** completed. */
    static AtomicInteger completed = new AtomicInteger();

    /** how many worker threads? */
    private static final int NBR_WORKERS = 100;

    /** a worker thread... */
    private static class Worker extends Thread
    {
        /** the worker id. */
        private String workerId;

        /**
         * Construct a worker.
         * @param workerId worker id
         */
        Worker(final String workerId)
        {
            this.workerId = workerId;
        }

        @Override
        public void run()
        {
            long TIMEOUT = 100; // ms
            int RETRIES = 3;
            String ENDPOINT = "tcp://localhost:5671";

            ZMQ.Context context = ZMQ.context(1);
            ZMQ.Socket worker = context.socket(ZMQ.REQ);
            worker.setIdentity(this.workerId.getBytes());

            worker.connect(ENDPOINT);

            int total = 0;
            while (true)
            {
                staticWorkerRecv.incrementAndGet();
                String message = "Hi Boss";
                String workloadResponse = "";
                int retriesLeft = RETRIES;
                boolean ok = false;
                while (retriesLeft > 0 && !ok)
                {
                    // Tell the broker we're ready for work
                    if (!worker.send(message))
                    {
                        System.err.println("worker " + this.workerId + " failed to send...");
                    }
                    while (workloadResponse == null || workloadResponse.isEmpty())
                    {
                        ZMQ.Poller poller = context.poller(1);
                        poller.register(worker, ZMQ.Poller.POLLIN);
                        int signalled = poller.poll(TIMEOUT);
                        poller.unregister(worker);
                        if (signalled == 1)
                        {
                            workloadResponse = worker.recvStr();
                            // }
                            //
                            // // Poll socket for a reply, with timeout
                            // ZMQ.PollItem items[] = { new ZMQ.PollItem(worker, ZMQ.Poller.POLLIN) };
                            // int rc = ZMQ.poll(items, TIMEOUT);
                            // if (rc == -1)
                            // {
                            // break; // Interrupted
                            // }
                            //
                            // // Here we process a server reply and exit our loop if the reply is valid. If we didn't a reply
                            // we close
                            // // the client socket and resend the request. We try a number of times before finally abandoning:
                            // if (items[0].isReadable())
                            // {
                            // workloadResponse = worker.recvStr();
                            if (workloadResponse == null)
                                break; // Interrupted
                            if (workloadResponse.equals("Work harder") || workloadResponse.equals("Fired!"))
                            {
                                retriesLeft = RETRIES;
                                ok = true;
                            }
                            else
                                System.err.printf("E: malformed reply from server: %s\n", workloadResponse);

                        }
                        else if (--retriesLeft == 0)
                        {
                            System.err.println("E: server seems to be offline, abandoning\n");
                            break;
                        }
                        else
                        {
                            System.err.println("W: no response from server, retrying\n");
                            // Old socket is confused; close it and open a new one
                            worker.close();
                            System.err.println("I: reconnecting to server\n");
                            worker = context.socket(ZMQ.REQ);
                            worker.setIdentity(this.workerId.getBytes());
                            worker.connect(ENDPOINT);
                            // Send message again, on new socket
                            worker.send(message);
                        }
                    }
                }
                staticWorkerRecv.decrementAndGet();
                boolean finished = workloadResponse.equals("Fired!");
                if (finished)
                {
                    completed.incrementAndGet();
                    System.out.printf(this.workerId + " completed: %d tasks\n", total);
                    break;
                }
                total++;

                // Do some random work
                try
                {
                    Thread.sleep(rand.nextInt(10) + 1);
                }
                catch (InterruptedException e)
                {
                    // ignore
                }
            }

            worker.close();
            context.term();
        }
    }

    /**
     * While this example runs in a single process, that is just to make it easier to start and stop the example. Each thread
     * has its own context and conceptually acts as a separate process.
     * @param args args, can be empty
     * @throws Exception on error
     */
    public static void main(String[] args) throws Exception
    {
        ZMQ.Context context = ZMQ.context(1);
        ZMQ.Socket broker = context.socket(ZMQ.ROUTER);
        broker.bind("tcp://*:5671");

        System.out.println("Recv buf size = " + broker.getReceiveBufferSize());
        System.out.println("Send buf size = " + broker.getSendBufferSize());
        System.out.println("Recv HWM      = " + broker.getRcvHWM());
        System.out.println("Send HWM      = " + broker.getSndHWM());

        // starting all workers
        for (int workerNbr = 0; workerNbr < NBR_WORKERS; workerNbr++)
        {
            Thread worker = new Worker("worker-" + workerNbr);
            worker.start();
        }

        // start a monitoring thread of 6 seconds to check hanging program...
        new Thread()
        {
            @Override
            public void run()
            {
                try
                {
                    Thread.sleep(6000);
                    System.err.println("staticWorkerRecv    = " + staticWorkerRecv);
                    System.err.println("staticBrokerIdRecv  = " + staticBrokerIdRecv);
                    System.err.println("staticBrokerMsgRecv = " + staticBrokerMsgRecv);
                    System.exit(-1);
                }
                catch (InterruptedException exception)
                {
                    // ignore
                }
            }
        }.start();

        // Run for five seconds and then tell workers to end
        long endTime = System.currentTimeMillis() + 5000;
        int workersFired = 0;
        while (true)
        {
            // Next message gives us least recently used worker
            staticBrokerIdRecv.incrementAndGet();
            String identity = broker.recvStr(); // recvStringWithTimeout(broker, 100, "Work harder");
            staticBrokerIdRecv.decrementAndGet();
            if (!broker.sendMore(identity))
            {
                System.err.println("broker failed to send identity...");
            }
            if (!identity.equals("FAILED"))
            {
                staticBrokerMsgRecv.incrementAndGet();
                broker.recvStr(); // Envelope delimiter
                broker.recvStr(); // Response from worker
                staticBrokerMsgRecv.decrementAndGet();
                if (!broker.sendMore(""))
                {
                    System.err.println("broker failed to send delimiter...");
                }

                // Encourage workers until it's time to fire them
                if (System.currentTimeMillis() < endTime)
                {
                    if (!broker.send("Work harder"))
                    {
                        System.err.println("broker failed to send work...");
                    }
                }
                else
                {
                    if (!broker.send("Fired!"))
                    {
                        System.err.println("broker failed to send fired...");
                    }
                    if (++workersFired == NBR_WORKERS)
                    {
                        break;
                    }
                }
            }
        }

        try
        {
            Thread.sleep(500);
        }
        catch (InterruptedException exception)
        {
            // ignore
        }

        System.out.println("completed = " + completed);
        System.err.println("staticWorkerRecv    = " + staticWorkerRecv);
        System.err.println("staticBrokerIdRecv  = " + staticBrokerIdRecv);
        System.err.println("staticBrokerMsgRecv = " + staticBrokerMsgRecv);

        broker.close();
        context.term();
        System.exit(0);
    }

    /**
     * @param context the context
     * @param socket the socket
     * @param timeoutMs timeout in milliseconds
     * @param resend string to resend if it fails
     * @return the read string after potential resending of the request or even reconnecting
     */
    static String recvStringWithTimeout(final ZMQ.Context context, final ZMQ.Socket socket, final long timeoutMs,
            final String resend)
    {
        for (int i = 0; i < 5; i++)
        {
            ZMQ.Poller poller = context.poller(1);
            poller.register(socket, ZMQ.Poller.POLLIN);
            int signalled = poller.poll(timeoutMs);
            poller.unregister(socket);
            if (signalled == 1)
            {
                return socket.recvStr();
            }
            System.err.println("RETRY... " + resend);
            if (!socket.send(resend))
            {
                System.err.println("broker failed to resend string");
            }
        }
        System.err.println("FAILED");
        return "FAILED";
    }
}