View Javadoc
1   package org.sim0mq.demo;
2   
3   import java.util.Random;
4   import java.util.concurrent.atomic.AtomicInteger;
5   
6   import org.zeromq.ZMQ;
7   
8   /**
9    * Example from http://stackoverflow.com/questions/20944140/zeromq-route-req-java-example-does-not-work. Added testing of send
10   * messages for correctly sent messages, plus a monitor that checks whether one of the conversations hangs. The lazy pirate
11   * pattern from the 0mq guide has been added to deal with possible timeouts.
12   * <p>
13   * BSD-style license. See <a href="http://opentrafficsim.org/docs/current/license.html">OpenTrafficSim License</a>.
14   * </p>
15   * $LastChangedDate: 2015-07-24 02:58:59 +0200 (Fri, 24 Jul 2015) $, @version $Revision: 1147 $, by $Author: averbraeck $,
16   * initial version Apr 20, 2017 <br>
17   * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
18   */
19  public class RouterToReqExample
20  {
21      /** random stream. */
22      static Random rand = new Random();
23  
24      /** static counter for worker. */
25      static AtomicInteger staticWorkerRecv = new AtomicInteger();
26  
27      /** static counter for broker identity. */
28      static AtomicInteger staticBrokerIdRecv = new AtomicInteger();
29  
30      /** static counter for broker Message. */
31      static AtomicInteger staticBrokerMsgRecv = new AtomicInteger();
32  
33      /** completed. */
34      static AtomicInteger completed = new AtomicInteger();
35  
36      /** how many worker threads? */
37      private static final int NBR_WORKERS = 100;
38  
39      /** a worker thread... */
40      private static class Worker extends Thread
41      {
42          /** the worker id. */
43          private String workerId;
44  
45          /**
46           * Construct a worker.
47           * @param workerId worker id
48           */
49          Worker(final String workerId)
50          {
51              this.workerId = workerId;
52          }
53  
54          @Override
55          public void run()
56          {
57              long TIMEOUT = 100; // ms
58              int RETRIES = 3;
59              String ENDPOINT = "tcp://localhost:5671";
60  
61              ZMQ.Context context = ZMQ.context(1);
62              ZMQ.Socket worker = context.socket(ZMQ.REQ);
63              worker.setIdentity(this.workerId.getBytes());
64  
65              worker.connect(ENDPOINT);
66  
67              int total = 0;
68              while (true)
69              {
70                  staticWorkerRecv.incrementAndGet();
71                  String message = "Hi Boss";
72                  String workloadResponse = "";
73                  int retriesLeft = RETRIES;
74                  boolean ok = false;
75                  while (retriesLeft > 0 && !ok)
76                  {
77                      // Tell the broker we're ready for work
78                      if (!worker.send(message))
79                      {
80                          System.err.println("worker " + this.workerId + " failed to send...");
81                      }
82                      while (workloadResponse == null || workloadResponse.isEmpty())
83                      {
84                          ZMQ.Poller poller = context.poller(1);
85                          poller.register(worker, ZMQ.Poller.POLLIN);
86                          int signalled = poller.poll(TIMEOUT);
87                          poller.unregister(worker);
88                          if (signalled == 1)
89                          {
90                              workloadResponse = worker.recvStr();
91                              // }
92                              //
93                              // // Poll socket for a reply, with timeout
94                              // ZMQ.PollItem items[] = { new ZMQ.PollItem(worker, ZMQ.Poller.POLLIN) };
95                              // int rc = ZMQ.poll(items, TIMEOUT);
96                              // if (rc == -1)
97                              // {
98                              // break; // Interrupted
99                              // }
100                             //
101                             // // Here we process a server reply and exit our loop if the reply is valid. If we didn't a reply
102                             // we close
103                             // // the client socket and resend the request. We try a number of times before finally abandoning:
104                             // if (items[0].isReadable())
105                             // {
106                             // workloadResponse = worker.recvStr();
107                             if (workloadResponse == null)
108                                 break; // Interrupted
109                             if (workloadResponse.equals("Work harder") || workloadResponse.equals("Fired!"))
110                             {
111                                 retriesLeft = RETRIES;
112                                 ok = true;
113                             }
114                             else
115                                 System.err.printf("E: malformed reply from server: %s\n", workloadResponse);
116 
117                         }
118                         else if (--retriesLeft == 0)
119                         {
120                             System.err.println("E: server seems to be offline, abandoning\n");
121                             break;
122                         }
123                         else
124                         {
125                             System.err.println("W: no response from server, retrying\n");
126                             // Old socket is confused; close it and open a new one
127                             worker.close();
128                             System.err.println("I: reconnecting to server\n");
129                             worker = context.socket(ZMQ.REQ);
130                             worker.setIdentity(this.workerId.getBytes());
131                             worker.connect(ENDPOINT);
132                             // Send message again, on new socket
133                             worker.send(message);
134                         }
135                     }
136                 }
137                 staticWorkerRecv.decrementAndGet();
138                 boolean finished = workloadResponse.equals("Fired!");
139                 if (finished)
140                 {
141                     completed.incrementAndGet();
142                     System.out.printf(this.workerId + " completed: %d tasks\n", total);
143                     break;
144                 }
145                 total++;
146 
147                 // Do some random work
148                 try
149                 {
150                     Thread.sleep(rand.nextInt(10) + 1);
151                 }
152                 catch (InterruptedException e)
153                 {
154                     // ignore
155                 }
156             }
157 
158             worker.close();
159             context.term();
160         }
161     }
162 
163     /**
164      * While this example runs in a single process, that is just to make it easier to start and stop the example. Each thread
165      * has its own context and conceptually acts as a separate process.
166      * @param args args, can be empty
167      * @throws Exception on error
168      */
169     public static void main(String[] args) throws Exception
170     {
171         ZMQ.Context context = ZMQ.context(1);
172         ZMQ.Socket broker = context.socket(ZMQ.ROUTER);
173         broker.bind("tcp://*:5671");
174 
175         System.out.println("Recv buf size = " + broker.getReceiveBufferSize());
176         System.out.println("Send buf size = " + broker.getSendBufferSize());
177         System.out.println("Recv HWM      = " + broker.getRcvHWM());
178         System.out.println("Send HWM      = " + broker.getSndHWM());
179 
180         // starting all workers
181         for (int workerNbr = 0; workerNbr < NBR_WORKERS; workerNbr++)
182         {
183             Thread worker = new Worker("worker-" + workerNbr);
184             worker.start();
185         }
186 
187         // start a monitoring thread of 6 seconds to check hanging program...
188         new Thread()
189         {
190             @Override
191             public void run()
192             {
193                 try
194                 {
195                     Thread.sleep(6000);
196                     System.err.println("staticWorkerRecv    = " + staticWorkerRecv);
197                     System.err.println("staticBrokerIdRecv  = " + staticBrokerIdRecv);
198                     System.err.println("staticBrokerMsgRecv = " + staticBrokerMsgRecv);
199                     System.exit(-1);
200                 }
201                 catch (InterruptedException exception)
202                 {
203                     // ignore
204                 }
205             }
206         }.start();
207 
208         // Run for five seconds and then tell workers to end
209         long endTime = System.currentTimeMillis() + 5000;
210         int workersFired = 0;
211         while (true)
212         {
213             // Next message gives us least recently used worker
214             staticBrokerIdRecv.incrementAndGet();
215             String identity = broker.recvStr(); // recvStringWithTimeout(broker, 100, "Work harder");
216             staticBrokerIdRecv.decrementAndGet();
217             if (!broker.sendMore(identity))
218             {
219                 System.err.println("broker failed to send identity...");
220             }
221             if (!identity.equals("FAILED"))
222             {
223                 staticBrokerMsgRecv.incrementAndGet();
224                 broker.recvStr(); // Envelope delimiter
225                 broker.recvStr(); // Response from worker
226                 staticBrokerMsgRecv.decrementAndGet();
227                 if (!broker.sendMore(""))
228                 {
229                     System.err.println("broker failed to send delimiter...");
230                 }
231 
232                 // Encourage workers until it's time to fire them
233                 if (System.currentTimeMillis() < endTime)
234                 {
235                     if (!broker.send("Work harder"))
236                     {
237                         System.err.println("broker failed to send work...");
238                     }
239                 }
240                 else
241                 {
242                     if (!broker.send("Fired!"))
243                     {
244                         System.err.println("broker failed to send fired...");
245                     }
246                     if (++workersFired == NBR_WORKERS)
247                     {
248                         break;
249                     }
250                 }
251             }
252         }
253 
254         try
255         {
256             Thread.sleep(500);
257         }
258         catch (InterruptedException exception)
259         {
260             // ignore
261         }
262 
263         System.out.println("completed = " + completed);
264         System.err.println("staticWorkerRecv    = " + staticWorkerRecv);
265         System.err.println("staticBrokerIdRecv  = " + staticBrokerIdRecv);
266         System.err.println("staticBrokerMsgRecv = " + staticBrokerMsgRecv);
267 
268         broker.close();
269         context.term();
270         System.exit(0);
271     }
272 
273     /**
274      * @param context the context
275      * @param socket the socket
276      * @param timeoutMs timeout in milliseconds
277      * @param resend string to resend if it fails
278      * @return the read string after potential resending of the request or even reconnecting
279      */
280     static String recvStringWithTimeout(final ZMQ.Context context, final ZMQ.Socket socket, final long timeoutMs,
281             final String resend)
282     {
283         for (int i = 0; i < 5; i++)
284         {
285             ZMQ.Poller poller = context.poller(1);
286             poller.register(socket, ZMQ.Poller.POLLIN);
287             int signalled = poller.poll(timeoutMs);
288             poller.unregister(socket);
289             if (signalled == 1)
290             {
291                 return socket.recvStr();
292             }
293             System.err.println("RETRY... " + resend);
294             if (!socket.send(resend))
295             {
296                 System.err.println("broker failed to resend string");
297             }
298         }
299         System.err.println("FAILED");
300         return "FAILED";
301     }
302 }