View Javadoc
1   package org.sim0mq.demo;
2   
3   import java.util.Random;
4   import java.util.concurrent.atomic.AtomicInteger;
5   
6   import org.zeromq.ZContext;
7   import org.zeromq.ZMQ;
8   
9   /**
10   * Example from http://stackoverflow.com/questions/20944140/zeromq-route-req-java-example-does-not-work. Added testing of send
11   * messages for correctly sent messages, plus a monitor that checks whether one of the conversations hangs. The lazy pirate
12   * pattern from the 0mq guide has been added to deal with possible timeouts.
13   * <p>
14   * BSD-style license. See <a href="http://opentrafficsim.org/docs/current/license.html">OpenTrafficSim License</a>.
15   * </p>
16   * $LastChangedDate: 2015-07-24 02:58:59 +0200 (Fri, 24 Jul 2015) $, @version $Revision: 1147 $, by $Author: averbraeck $,
17   * initial version Apr 20, 2017 <br>
18   * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
19   */
20  public final class RouterToReqExample
21  {
22      /** random stream. */
23      @SuppressWarnings("checkstyle:visibilitymodifier")
24      static Random rand = new Random();
25  
26      /** static counter for worker. */
27      @SuppressWarnings("checkstyle:visibilitymodifier")
28      static AtomicInteger staticWorkerRecv = new AtomicInteger();
29  
30      /** static counter for broker identity. */
31      @SuppressWarnings("checkstyle:visibilitymodifier")
32      static AtomicInteger staticBrokerIdRecv = new AtomicInteger();
33  
34      /** static counter for broker Message. */
35      @SuppressWarnings("checkstyle:visibilitymodifier")
36      static AtomicInteger staticBrokerMsgRecv = new AtomicInteger();
37  
38      /** completed. */
39      @SuppressWarnings("checkstyle:visibilitymodifier")
40      static AtomicInteger completed = new AtomicInteger();
41  
42      /** how many worker threads? */
43      private static final int NBR_WORKERS = 100;
44  
45      /** */
46      private RouterToReqExample()
47      {
48          // Utility clss
49      }
50  
51      /** a worker thread... */
52      private static class Worker extends Thread
53      {
54          /** the worker id. */
55          private String workerId;
56  
57          /**
58           * Construct a worker.
59           * @param workerId worker id
60           */
61          Worker(final String workerId)
62          {
63              this.workerId = workerId;
64          }
65  
66          /** */
67          private static final long TIMEOUT = 100; // ms
68          
69          /** */
70          private static final int RETRIES = 3;
71  
72          @Override
73          public void run()
74          {
75              String endpoint = "tcp://localhost:5671";
76  
77              ZContext context = new ZContext(1);
78              ZMQ.Socket worker = context.createSocket(ZMQ.REQ);
79              worker.setIdentity(this.workerId.getBytes());
80  
81              worker.connect(endpoint);
82  
83              int total = 0;
84              while (true)
85              {
86                  staticWorkerRecv.incrementAndGet();
87                  String message = "Hi Boss";
88                  String workloadResponse = "";
89                  int retriesLeft = RETRIES;
90                  boolean ok = false;
91                  while (retriesLeft > 0 && !ok)
92                  {
93                      // Tell the broker we're ready for work
94                      if (!worker.send(message))
95                      {
96                          System.err.println("worker " + this.workerId + " failed to send...");
97                      }
98                      while (workloadResponse == null || workloadResponse.isEmpty())
99                      {
100                         ZMQ.Poller poller = context.createPoller(1);
101                         poller.register(worker, ZMQ.Poller.POLLIN);
102                         int signalled = poller.poll(TIMEOUT);
103                         poller.unregister(worker);
104                         if (signalled == 1)
105                         {
106                             workloadResponse = worker.recvStr();
107                             // }
108                             //
109                             // // Poll socket for a reply, with timeout
110                             // ZMQ.PollItem items[] = { new ZMQ.PollItem(worker, ZMQ.Poller.POLLIN) };
111                             // int rc = ZMQ.poll(items, TIMEOUT);
112                             // if (rc == -1)
113                             // {
114                             // break; // Interrupted
115                             // }
116                             //
117                             // // Here we process a server reply and exit our loop if the reply is valid. If we didn't a reply
118                             // we close
119                             // // the client socket and resend the request. We try a number of times before finally abandoning:
120                             // if (items[0].isReadable())
121                             // {
122                             // workloadResponse = worker.recvStr();
123                             if (workloadResponse == null)
124                             {
125                                 break; // Interrupted
126                             }
127                             if (workloadResponse.equals("Work harder") || workloadResponse.equals("Fired!"))
128                             {
129                                 retriesLeft = RETRIES;
130                                 ok = true;
131                             }
132                             else
133                             {
134                                 System.err.printf("E: malformed reply from server: %s\n", workloadResponse);
135                             }
136 
137                         }
138                         else if (--retriesLeft == 0)
139                         {
140                             System.err.println("E: server seems to be offline, abandoning\n");
141                             break;
142                         }
143                         else
144                         {
145                             System.err.println("W: no response from server, retrying\n");
146                             // Old socket is confused; close it and open a new one
147                             worker.close();
148                             System.err.println("I: reconnecting to server\n");
149                             worker = context.createSocket(ZMQ.REQ);
150                             worker.setIdentity(this.workerId.getBytes());
151                             worker.connect(endpoint);
152                             // Send message again, on new socket
153                             worker.send(message);
154                         }
155                     }
156                 }
157                 staticWorkerRecv.decrementAndGet();
158                 boolean finished = workloadResponse.equals("Fired!");
159                 if (finished)
160                 {
161                     completed.incrementAndGet();
162                     System.out.printf(this.workerId + " completed: %d tasks\n", total);
163                     break;
164                 }
165                 total++;
166 
167                 // Do some random work
168                 try
169                 {
170                     Thread.sleep(rand.nextInt(10) + 1);
171                 }
172                 catch (InterruptedException e)
173                 {
174                     // ignore
175                 }
176             }
177 
178             worker.close();
179             context.destroy();
180             context.close();
181         }
182     }
183 
184     /**
185      * While this example runs in a single process, that is just to make it easier to start and stop the example. Each thread
186      * has its own context and conceptually acts as a separate process.
187      * @param args args, can be empty
188      * @throws Exception on error
189      */
190     public static void main(final String[] args) throws Exception
191     {
192         ZContext context = new ZContext(1);
193         ZMQ.Socket broker = context.createSocket(ZMQ.ROUTER);
194         broker.bind("tcp://*:5671");
195 
196         System.out.println("Recv buf size = " + broker.getReceiveBufferSize());
197         System.out.println("Send buf size = " + broker.getSendBufferSize());
198         System.out.println("Recv HWM      = " + broker.getRcvHWM());
199         System.out.println("Send HWM      = " + broker.getSndHWM());
200 
201         // starting all workers
202         for (int workerNbr = 0; workerNbr < NBR_WORKERS; workerNbr++)
203         {
204             Thread worker = new Worker("worker-" + workerNbr);
205             worker.start();
206         }
207 
208         // start a monitoring thread of 6 seconds to check hanging program...
209         new Thread()
210         {
211             @Override
212             public void run()
213             {
214                 try
215                 {
216                     Thread.sleep(6000);
217                     System.err.println("staticWorkerRecv    = " + staticWorkerRecv);
218                     System.err.println("staticBrokerIdRecv  = " + staticBrokerIdRecv);
219                     System.err.println("staticBrokerMsgRecv = " + staticBrokerMsgRecv);
220                     System.exit(-1);
221                 }
222                 catch (InterruptedException exception)
223                 {
224                     // ignore
225                 }
226             }
227         }.start();
228 
229         // Run for five seconds and then tell workers to end
230         long endTime = System.currentTimeMillis() + 5000;
231         int workersFired = 0;
232         while (true)
233         {
234             // Next message gives us least recently used worker
235             staticBrokerIdRecv.incrementAndGet();
236             String identity = broker.recvStr(); // recvStringWithTimeout(broker, 100, "Work harder");
237             staticBrokerIdRecv.decrementAndGet();
238             if (!broker.sendMore(identity))
239             {
240                 System.err.println("broker failed to send identity...");
241             }
242             if (!identity.equals("FAILED"))
243             {
244                 staticBrokerMsgRecv.incrementAndGet();
245                 broker.recvStr(); // Envelope delimiter
246                 broker.recvStr(); // Response from worker
247                 staticBrokerMsgRecv.decrementAndGet();
248                 if (!broker.sendMore(""))
249                 {
250                     System.err.println("broker failed to send delimiter...");
251                 }
252 
253                 // Encourage workers until it's time to fire them
254                 if (System.currentTimeMillis() < endTime)
255                 {
256                     if (!broker.send("Work harder"))
257                     {
258                         System.err.println("broker failed to send work...");
259                     }
260                 }
261                 else
262                 {
263                     if (!broker.send("Fired!"))
264                     {
265                         System.err.println("broker failed to send fired...");
266                     }
267                     if (++workersFired == NBR_WORKERS)
268                     {
269                         break;
270                     }
271                 }
272             }
273         }
274 
275         try
276         {
277             Thread.sleep(500);
278         }
279         catch (InterruptedException exception)
280         {
281             // ignore
282         }
283 
284         System.out.println("completed = " + completed);
285         System.err.println("staticWorkerRecv    = " + staticWorkerRecv);
286         System.err.println("staticBrokerIdRecv  = " + staticBrokerIdRecv);
287         System.err.println("staticBrokerMsgRecv = " + staticBrokerMsgRecv);
288 
289         broker.close();
290         context.destroy();
291         context.close();
292         System.exit(0);
293     }
294 
295     /**
296      * @param context the context
297      * @param socket the socket
298      * @param timeoutMs timeout in milliseconds
299      * @param resend string to resend if it fails
300      * @return the read string after potential resending of the request or even reconnecting
301      */
302     static String recvStringWithTimeout(final ZContext context, final ZMQ.Socket socket, final long timeoutMs,
303             final String resend)
304     {
305         for (int i = 0; i < 5; i++)
306         {
307             ZMQ.Poller poller = context.createPoller(1);
308             poller.register(socket, ZMQ.Poller.POLLIN);
309             int signalled = poller.poll(timeoutMs);
310             poller.unregister(socket);
311             if (signalled == 1)
312             {
313                 return socket.recvStr();
314             }
315             System.err.println("RETRY... " + resend);
316             if (!socket.send(resend))
317             {
318                 System.err.println("broker failed to resend string");
319             }
320         }
321         System.err.println("FAILED");
322         return "FAILED";
323     }
324 }