View Javadoc
1   package org.sim0mq.demo;
2   
3   import java.util.Random;
4   import java.util.concurrent.atomic.AtomicInteger;
5   
6   import org.zeromq.SocketType;
7   import org.zeromq.ZContext;
8   import org.zeromq.ZMQ;
9   
10  /**
11   * Example from http://stackoverflow.com/questions/20944140/zeromq-route-req-java-example-does-not-work. Added testing of send
12   * messages for correctly sent messages, plus a monitor that checks whether one of the conversations hangs. The lazy pirate
13   * pattern from the 0mq guide has been added to deal with possible timeouts.
14   * <p>
15   * BSD-style license. See <a href="http://opentrafficsim.org/docs/current/license.html">OpenTrafficSim License</a>.
16   * </p>
17   * $LastChangedDate: 2015-07-24 02:58:59 +0200 (Fri, 24 Jul 2015) $, @version $Revision: 1147 $, by $Author: averbraeck $,
18   * initial version Apr 20, 2017 <br>
19   * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
20   */
21  public final class RouterToReqExample
22  {
23      /** random stream. */
24      @SuppressWarnings("checkstyle:visibilitymodifier")
25      static Random rand = new Random();
26  
27      /** static counter for worker. */
28      @SuppressWarnings("checkstyle:visibilitymodifier")
29      static AtomicInteger staticWorkerRecv = new AtomicInteger();
30  
31      /** static counter for broker identity. */
32      @SuppressWarnings("checkstyle:visibilitymodifier")
33      static AtomicInteger staticBrokerIdRecv = new AtomicInteger();
34  
35      /** static counter for broker Message. */
36      @SuppressWarnings("checkstyle:visibilitymodifier")
37      static AtomicInteger staticBrokerMsgRecv = new AtomicInteger();
38  
39      /** completed. */
40      @SuppressWarnings("checkstyle:visibilitymodifier")
41      static AtomicInteger completed = new AtomicInteger();
42  
43      /** how many worker threads? */
44      private static final int NBR_WORKERS = 100;
45  
46      /** */
47      private RouterToReqExample()
48      {
49          // Utility clss
50      }
51  
52      /** a worker thread... */
53      private static class Worker extends Thread
54      {
55          /** the worker id. */
56          private String workerId;
57  
58          /**
59           * Construct a worker.
60           * @param workerId worker id
61           */
62          Worker(final String workerId)
63          {
64              this.workerId = workerId;
65          }
66  
67          /** */
68          private static final long TIMEOUT = 100; // ms
69          
70          /** */
71          private static final int RETRIES = 3;
72  
73          @Override
74          public void run()
75          {
76              String endpoint = "tcp://localhost:5671";
77  
78              ZContext context = new ZContext(1);
79              ZMQ.Socket worker = context.createSocket(SocketType.REQ);
80              worker.setIdentity(this.workerId.getBytes());
81  
82              worker.connect(endpoint);
83  
84              int total = 0;
85              while (true)
86              {
87                  staticWorkerRecv.incrementAndGet();
88                  String message = "Hi Boss";
89                  String workloadResponse = "";
90                  int retriesLeft = RETRIES;
91                  boolean ok = false;
92                  while (retriesLeft > 0 && !ok)
93                  {
94                      // Tell the broker we're ready for work
95                      if (!worker.send(message))
96                      {
97                          System.err.println("worker " + this.workerId + " failed to send...");
98                      }
99                      while (workloadResponse == null || workloadResponse.isEmpty())
100                     {
101                         ZMQ.Poller poller = context.createPoller(1);
102                         poller.register(worker, ZMQ.Poller.POLLIN);
103                         int signalled = poller.poll(TIMEOUT);
104                         poller.unregister(worker);
105                         if (signalled == 1)
106                         {
107                             workloadResponse = worker.recvStr();
108                             // }
109                             //
110                             // // Poll socket for a reply, with timeout
111                             // ZMQ.PollItem items[] = { new ZMQ.PollItem(worker, ZMQ.Poller.POLLIN) };
112                             // int rc = ZMQ.poll(items, TIMEOUT);
113                             // if (rc == -1)
114                             // {
115                             // break; // Interrupted
116                             // }
117                             //
118                             // // Here we process a server reply and exit our loop if the reply is valid. If we didn't a reply
119                             // we close
120                             // // the client socket and resend the request. We try a number of times before finally abandoning:
121                             // if (items[0].isReadable())
122                             // {
123                             // workloadResponse = worker.recvStr();
124                             if (workloadResponse == null)
125                             {
126                                 break; // Interrupted
127                             }
128                             if (workloadResponse.equals("Work harder") || workloadResponse.equals("Fired!"))
129                             {
130                                 retriesLeft = RETRIES;
131                                 ok = true;
132                             }
133                             else
134                             {
135                                 System.err.printf("E: malformed reply from server: %s\n", workloadResponse);
136                             }
137 
138                         }
139                         else if (--retriesLeft == 0)
140                         {
141                             System.err.println("E: server seems to be offline, abandoning\n");
142                             break;
143                         }
144                         else
145                         {
146                             System.err.println("W: no response from server, retrying\n");
147                             // Old socket is confused; close it and open a new one
148                             worker.close();
149                             System.err.println("I: reconnecting to server\n");
150                             worker = context.createSocket(SocketType.REQ);
151                             worker.setIdentity(this.workerId.getBytes());
152                             worker.connect(endpoint);
153                             // Send message again, on new socket
154                             worker.send(message);
155                         }
156                     }
157                 }
158                 staticWorkerRecv.decrementAndGet();
159                 boolean finished = workloadResponse.equals("Fired!");
160                 if (finished)
161                 {
162                     completed.incrementAndGet();
163                     System.out.printf(this.workerId + " completed: %d tasks\n", total);
164                     break;
165                 }
166                 total++;
167 
168                 // Do some random work
169                 try
170                 {
171                     Thread.sleep(rand.nextInt(10) + 1);
172                 }
173                 catch (InterruptedException e)
174                 {
175                     // ignore
176                 }
177             }
178 
179             worker.close();
180             context.destroy();
181             context.close();
182         }
183     }
184 
185     /**
186      * While this example runs in a single process, that is just to make it easier to start and stop the example. Each thread
187      * has its own context and conceptually acts as a separate process.
188      * @param args args, can be empty
189      * @throws Exception on error
190      */
191     public static void main(final String[] args) throws Exception
192     {
193         ZContext context = new ZContext(1);
194         ZMQ.Socket broker = context.createSocket(SocketType.ROUTER);
195         broker.bind("tcp://*:5671");
196 
197         System.out.println("Recv buf size = " + broker.getReceiveBufferSize());
198         System.out.println("Send buf size = " + broker.getSendBufferSize());
199         System.out.println("Recv HWM      = " + broker.getRcvHWM());
200         System.out.println("Send HWM      = " + broker.getSndHWM());
201 
202         // starting all workers
203         for (int workerNbr = 0; workerNbr < NBR_WORKERS; workerNbr++)
204         {
205             Thread worker = new Worker("worker-" + workerNbr);
206             worker.start();
207         }
208 
209         // start a monitoring thread of 6 seconds to check hanging program...
210         new Thread()
211         {
212             @Override
213             public void run()
214             {
215                 try
216                 {
217                     Thread.sleep(6000);
218                     System.err.println("staticWorkerRecv    = " + staticWorkerRecv);
219                     System.err.println("staticBrokerIdRecv  = " + staticBrokerIdRecv);
220                     System.err.println("staticBrokerMsgRecv = " + staticBrokerMsgRecv);
221                     System.exit(-1);
222                 }
223                 catch (InterruptedException exception)
224                 {
225                     // ignore
226                 }
227             }
228         }.start();
229 
230         // Run for five seconds and then tell workers to end
231         long endTime = System.currentTimeMillis() + 5000;
232         int workersFired = 0;
233         while (true)
234         {
235             // Next message gives us least recently used worker
236             staticBrokerIdRecv.incrementAndGet();
237             String identity = broker.recvStr(); // recvStringWithTimeout(broker, 100, "Work harder");
238             staticBrokerIdRecv.decrementAndGet();
239             if (!broker.sendMore(identity))
240             {
241                 System.err.println("broker failed to send identity...");
242             }
243             if (!identity.equals("FAILED"))
244             {
245                 staticBrokerMsgRecv.incrementAndGet();
246                 broker.recvStr(); // Envelope delimiter
247                 broker.recvStr(); // Response from worker
248                 staticBrokerMsgRecv.decrementAndGet();
249                 if (!broker.sendMore(""))
250                 {
251                     System.err.println("broker failed to send delimiter...");
252                 }
253 
254                 // Encourage workers until it's time to fire them
255                 if (System.currentTimeMillis() < endTime)
256                 {
257                     if (!broker.send("Work harder"))
258                     {
259                         System.err.println("broker failed to send work...");
260                     }
261                 }
262                 else
263                 {
264                     if (!broker.send("Fired!"))
265                     {
266                         System.err.println("broker failed to send fired...");
267                     }
268                     if (++workersFired == NBR_WORKERS)
269                     {
270                         break;
271                     }
272                 }
273             }
274         }
275 
276         try
277         {
278             Thread.sleep(500);
279         }
280         catch (InterruptedException exception)
281         {
282             // ignore
283         }
284 
285         System.out.println("completed = " + completed);
286         System.err.println("staticWorkerRecv    = " + staticWorkerRecv);
287         System.err.println("staticBrokerIdRecv  = " + staticBrokerIdRecv);
288         System.err.println("staticBrokerMsgRecv = " + staticBrokerMsgRecv);
289 
290         broker.close();
291         context.destroy();
292         context.close();
293         System.exit(0);
294     }
295 
296     /**
297      * @param context the context
298      * @param socket the socket
299      * @param timeoutMs timeout in milliseconds
300      * @param resend string to resend if it fails
301      * @return the read string after potential resending of the request or even reconnecting
302      */
303     static String recvStringWithTimeout(final ZContext context, final ZMQ.Socket socket, final long timeoutMs,
304             final String resend)
305     {
306         for (int i = 0; i < 5; i++)
307         {
308             ZMQ.Poller poller = context.createPoller(1);
309             poller.register(socket, ZMQ.Poller.POLLIN);
310             int signalled = poller.poll(timeoutMs);
311             poller.unregister(socket);
312             if (signalled == 1)
313             {
314                 return socket.recvStr();
315             }
316             System.err.println("RETRY... " + resend);
317             if (!socket.send(resend))
318             {
319                 System.err.println("broker failed to resend string");
320             }
321         }
322         System.err.println("FAILED");
323         return "FAILED";
324     }
325 }