PushPullThreads.java

package org.sim0mq.test.threads;

import java.util.LinkedHashMap;
import java.util.Map;

import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

/**
 * Play with three event producing threads that need to send their message via a shared method to a receiving thread that is
 * listening. The PUSH-PULL over the inproc protocol with a synchronized send method is used to implement this. Messages are
 * sent without waiting to not block the sending threads. Therefore, the HWM is set considerably higher to not lose any
 * messages. A map of thread id to socket is used to create one socket per sending thread.
 * <p>
 * Copyright (c) 2013-2020 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
 * BSD-style license. See <a href="http://sim0mq.org/docs/current/license.html">Sim0MQ License</a>.
 * </p>
 * $LastChangedDate: 2015-07-24 02:58:59 +0200 (Fri, 24 Jul 2015) $, @version $Revision: 1147 $, by $Author: averbraeck $,
 * initial version 30 Apr 2020 <br>
 * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
 */
public class PushPullThreads
{
    /** the context for this program. */
    private ZContext ctx;
    
    /** map of thread ids to inproc sockets. */
    private Map<Long, ZMQ.Socket> socketMap = new LinkedHashMap<>(); 
    
    /** Total number of push threads. */
    private final int totalPushThreads;

    /**
     * @param args empty
     */
    public static void main(final String[] args)
    {
        new PushPullThreads();
    }

    /**
     * Constructor.
     */
    public PushPullThreads()
    {
        this.totalPushThreads = 1000;
        this.ctx = new ZContext(1);
        for (int i = 0; i < this.totalPushThreads; i++)
        {
            new ProducerThread(this, i);
        }
        new ConsumerThread(this.ctx).start();
    }

    /**
     * Process message and push to central thread.
     * @param message the message to send
     */
    public synchronized void call(final String message)
    {
        long threadId = Thread.currentThread().getId();
        ZMQ.Socket pushSocket = this.socketMap.get(threadId);
        if (pushSocket == null)
        {
            pushSocket = this.ctx.createSocket(SocketType.PUSH);
            pushSocket.setHWM(100000);
            pushSocket.connect("inproc://bus");
            this.socketMap.put(threadId, pushSocket);
            System.out.println("Socket added for thread " + threadId);
        }
        pushSocket.send(message, ZMQ.DONTWAIT); // don't block the sending thread
    }

    /** */
    class ProducerThread extends Thread
    {
        /** the thread number. */
        private int threadNr;

        /** the calling program. */
        private PushPullThreads program;

        /**
         * @param program the calling program with the notify() method
         * @param threadNr the thread number
         */
        ProducerThread(final PushPullThreads program, final int threadNr)
        {
            this.program = program;
            this.threadNr = threadNr;
            start();
        }

        /** {@inheritDoc} */
        @Override
        public void run()
        {
            for (int i = 0; i < 1000; i++)
            {
                this.program.call("Message from thread " + this.threadNr + " #" + i);
            }
            this.program.call("STOP");
        }
    }

    /** */
    class ConsumerThread extends Thread
    {
        /** the context. Should be the same for inproc messages. */
        private ZContext ctx;

        /**
         * Constructor.
         * @param ctx the context
         */
        ConsumerThread(final ZContext ctx)
        {
            this.ctx = ctx;
        }

        /** {@inheritDoc} */
        @Override
        public void run()
        {
            ZMQ.Socket pullSocket = this.ctx.createSocket(SocketType.PULL);
            pullSocket.bind("inproc://bus");
            int stopCount = 0;
            int msgCount = 0;
            while (true)
            {
                String msg = pullSocket.recvStr(0);
                if ("STOP".equals(msg))
                {
                    stopCount++;
                    if (stopCount == totalPushThreads)
                    {
                        break;
                    }
                }
                else
                {
                    msgCount++;
                    System.out.println(msg);
                }
            }
            System.out.println("# messages received = " + msgCount);
        }

    }
}