View Javadoc
1   package org.sim0mq.test.threads;
2   
3   import java.util.LinkedHashMap;
4   import java.util.Map;
5   
6   import org.zeromq.SocketType;
7   import org.zeromq.ZContext;
8   import org.zeromq.ZMQ;
9   
10  /**
11   * Play with many event producing threads (1000 in this example) that need to send their message via a shared method to a receiving thread that is
12   * listening. The PUSH-PULL over the inproc protocol with a synchronized send method is used to implement this. Messages are
13   * sent without waiting to not block the sending threads. Therefore, the HWM is set considerably higher to not lose any
14   * messages. A map of thread id to socket is used to create one socket per sending thread.
15   * <p>
16   * Copyright (c) 2013-2020 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
17   * BSD-style license. See <a href="http://sim0mq.org/docs/current/license.html">Sim0MQ License</a>.
18   * </p>
19   * $LastChangedDate: 2015-07-24 02:58:59 +0200 (Fri, 24 Jul 2015) $, @version $Revision: 1147 $, by $Author: averbraeck $,
20   * initial version 30 Apr 2020 <br>
21   * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
22   */
23  public class PushPullThreads
24  {
25      /** the context for this program. */
26      private ZContext ctx;
27      
28      /** map of thread ids to inproc sockets. */
29      private Map<Long, ZMQ.Socket> socketMap = new LinkedHashMap<>(); 
30      
31      /** Total number of push threads. */
32      private final int totalPushThreads;
33  
34      /**
35       * @param args empty
36       */
37      public static void main(final String[] args)
38      {
39          new PushPullThreads();
40      }
41  
42      /**
43       * Constructor.
44       */
45      public PushPullThreads()
46      {
47          this.totalPushThreads = 1000;
48          this.ctx = new ZContext(1);
49          for (int i = 0; i < this.totalPushThreads; i++)
50          {
51              new ProducerThread(this, i);
52          }
53          new ConsumerThread(this.ctx).start();
54      }
55  
56      /**
57       * Process message and push to central thread.
58       * @param message the message to send
59       */
60      public synchronized void call(final String message)
61      {
62          long threadId = Thread.currentThread().getId();
63          ZMQ.Socket pushSocket = this.socketMap.get(threadId);
64          if (pushSocket == null)
65          {
66              pushSocket = this.ctx.createSocket(SocketType.PUSH);
67              pushSocket.setHWM(100000);
68              pushSocket.connect("inproc://bus");
69              this.socketMap.put(threadId, pushSocket);
70              System.out.println("Socket added for thread " + threadId);
71          }
72          pushSocket.send(message, ZMQ.DONTWAIT); // don't block the sending thread
73      }
74  
75      /** */
76      class ProducerThread extends Thread
77      {
78          /** the thread number. */
79          private int threadNr;
80  
81          /** the calling program. */
82          private PushPullThreads program;
83  
84          /**
85           * @param program the calling program with the notify() method
86           * @param threadNr the thread number
87           */
88          ProducerThread(final PushPullThreads program, final int threadNr)
89          {
90              this.program = program;
91              this.threadNr = threadNr;
92              start();
93          }
94  
95          /** {@inheritDoc} */
96          @Override
97          public void run()
98          {
99              for (int i = 0; i < 1000; i++)
100             {
101                 this.program.call("Message from thread " + this.threadNr + " #" + i);
102             }
103             this.program.call("STOP");
104         }
105     }
106 
107     /** */
108     class ConsumerThread extends Thread
109     {
110         /** the context. Should be the same for inproc messages. */
111         private ZContext ctx;
112 
113         /**
114          * Constructor.
115          * @param ctx the context
116          */
117         ConsumerThread(final ZContext ctx)
118         {
119             this.ctx = ctx;
120         }
121 
122         /** {@inheritDoc} */
123         @Override
124         public void run()
125         {
126             ZMQ.Socket pullSocket = this.ctx.createSocket(SocketType.PULL);
127             pullSocket.bind("inproc://bus");
128             int stopCount = 0;
129             int msgCount = 0;
130             while (true)
131             {
132                 String msg = pullSocket.recvStr(0);
133                 if ("STOP".equals(msg))
134                 {
135                     stopCount++;
136                     if (stopCount == totalPushThreads)
137                     {
138                         break;
139                     }
140                 }
141                 else
142                 {
143                     msgCount++;
144                     System.out.println(msg);
145                 }
146             }
147             System.out.println("# messages received = " + msgCount);
148         }
149 
150     }
151 }