1 package org.sim0mq.demo;
2
3 import java.util.Random;
4 import java.util.concurrent.atomic.AtomicInteger;
5
6 import org.zeromq.ZContext;
7 import org.zeromq.ZMQ;
8
9 /**
10 * Example from http://stackoverflow.com/questions/20944140/zeromq-route-req-java-example-does-not-work. Added testing of send
11 * messages for correctly sent messages, plus a monitor that checks whether one of the conversations hangs. The lazy pirate
12 * pattern from the 0mq guide has been added to deal with possible timeouts.
13 * <p>
14 * BSD-style license. See <a href="http://opentrafficsim.org/docs/current/license.html">OpenTrafficSim License</a>.
15 * </p>
16 * $LastChangedDate: 2015-07-24 02:58:59 +0200 (Fri, 24 Jul 2015) $, @version $Revision: 1147 $, by $Author: averbraeck $,
17 * initial version Apr 20, 2017 <br>
18 * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
19 */
20 public final class RouterToReqExample
21 {
22 /** random stream. */
23 @SuppressWarnings("checkstyle:visibilitymodifier")
24 static Random rand = new Random();
25
26 /** static counter for worker. */
27 @SuppressWarnings("checkstyle:visibilitymodifier")
28 static AtomicInteger staticWorkerRecv = new AtomicInteger();
29
30 /** static counter for broker identity. */
31 @SuppressWarnings("checkstyle:visibilitymodifier")
32 static AtomicInteger staticBrokerIdRecv = new AtomicInteger();
33
34 /** static counter for broker Message. */
35 @SuppressWarnings("checkstyle:visibilitymodifier")
36 static AtomicInteger staticBrokerMsgRecv = new AtomicInteger();
37
38 /** completed. */
39 @SuppressWarnings("checkstyle:visibilitymodifier")
40 static AtomicInteger completed = new AtomicInteger();
41
42 /** how many worker threads? */
43 private static final int NBR_WORKERS = 100;
44
45 /** */
46 private RouterToReqExample()
47 {
48 // Utility clss
49 }
50
51 /** a worker thread... */
52 private static class Worker extends Thread
53 {
54 /** the worker id. */
55 private String workerId;
56
57 /**
58 * Construct a worker.
59 * @param workerId worker id
60 */
61 Worker(final String workerId)
62 {
63 this.workerId = workerId;
64 }
65
66 /** */
67 private static final long TIMEOUT = 100; // ms
68
69 /** */
70 private static final int RETRIES = 3;
71
72 @Override
73 public void run()
74 {
75 String endpoint = "tcp://localhost:5671";
76
77 ZContext context = new ZContext(1);
78 ZMQ.Socket worker = context.createSocket(ZMQ.REQ);
79 worker.setIdentity(this.workerId.getBytes());
80
81 worker.connect(endpoint);
82
83 int total = 0;
84 while (true)
85 {
86 staticWorkerRecv.incrementAndGet();
87 String message = "Hi Boss";
88 String workloadResponse = "";
89 int retriesLeft = RETRIES;
90 boolean ok = false;
91 while (retriesLeft > 0 && !ok)
92 {
93 // Tell the broker we're ready for work
94 if (!worker.send(message))
95 {
96 System.err.println("worker " + this.workerId + " failed to send...");
97 }
98 while (workloadResponse == null || workloadResponse.isEmpty())
99 {
100 ZMQ.Poller poller = context.createPoller(1);
101 poller.register(worker, ZMQ.Poller.POLLIN);
102 int signalled = poller.poll(TIMEOUT);
103 poller.unregister(worker);
104 if (signalled == 1)
105 {
106 workloadResponse = worker.recvStr();
107 // }
108 //
109 // // Poll socket for a reply, with timeout
110 // ZMQ.PollItem items[] = { new ZMQ.PollItem(worker, ZMQ.Poller.POLLIN) };
111 // int rc = ZMQ.poll(items, TIMEOUT);
112 // if (rc == -1)
113 // {
114 // break; // Interrupted
115 // }
116 //
117 // // Here we process a server reply and exit our loop if the reply is valid. If we didn't a reply
118 // we close
119 // // the client socket and resend the request. We try a number of times before finally abandoning:
120 // if (items[0].isReadable())
121 // {
122 // workloadResponse = worker.recvStr();
123 if (workloadResponse == null)
124 {
125 break; // Interrupted
126 }
127 if (workloadResponse.equals("Work harder") || workloadResponse.equals("Fired!"))
128 {
129 retriesLeft = RETRIES;
130 ok = true;
131 }
132 else
133 {
134 System.err.printf("E: malformed reply from server: %s\n", workloadResponse);
135 }
136
137 }
138 else if (--retriesLeft == 0)
139 {
140 System.err.println("E: server seems to be offline, abandoning\n");
141 break;
142 }
143 else
144 {
145 System.err.println("W: no response from server, retrying\n");
146 // Old socket is confused; close it and open a new one
147 worker.close();
148 System.err.println("I: reconnecting to server\n");
149 worker = context.createSocket(ZMQ.REQ);
150 worker.setIdentity(this.workerId.getBytes());
151 worker.connect(endpoint);
152 // Send message again, on new socket
153 worker.send(message);
154 }
155 }
156 }
157 staticWorkerRecv.decrementAndGet();
158 boolean finished = workloadResponse.equals("Fired!");
159 if (finished)
160 {
161 completed.incrementAndGet();
162 System.out.printf(this.workerId + " completed: %d tasks\n", total);
163 break;
164 }
165 total++;
166
167 // Do some random work
168 try
169 {
170 Thread.sleep(rand.nextInt(10) + 1);
171 }
172 catch (InterruptedException e)
173 {
174 // ignore
175 }
176 }
177
178 worker.close();
179 context.destroy();
180 context.close();
181 }
182 }
183
184 /**
185 * While this example runs in a single process, that is just to make it easier to start and stop the example. Each thread
186 * has its own context and conceptually acts as a separate process.
187 * @param args args, can be empty
188 * @throws Exception on error
189 */
190 public static void main(final String[] args) throws Exception
191 {
192 ZContext context = new ZContext(1);
193 ZMQ.Socket broker = context.createSocket(ZMQ.ROUTER);
194 broker.bind("tcp://*:5671");
195
196 System.out.println("Recv buf size = " + broker.getReceiveBufferSize());
197 System.out.println("Send buf size = " + broker.getSendBufferSize());
198 System.out.println("Recv HWM = " + broker.getRcvHWM());
199 System.out.println("Send HWM = " + broker.getSndHWM());
200
201 // starting all workers
202 for (int workerNbr = 0; workerNbr < NBR_WORKERS; workerNbr++)
203 {
204 Thread worker = new Worker("worker-" + workerNbr);
205 worker.start();
206 }
207
208 // start a monitoring thread of 6 seconds to check hanging program...
209 new Thread()
210 {
211 @Override
212 public void run()
213 {
214 try
215 {
216 Thread.sleep(6000);
217 System.err.println("staticWorkerRecv = " + staticWorkerRecv);
218 System.err.println("staticBrokerIdRecv = " + staticBrokerIdRecv);
219 System.err.println("staticBrokerMsgRecv = " + staticBrokerMsgRecv);
220 System.exit(-1);
221 }
222 catch (InterruptedException exception)
223 {
224 // ignore
225 }
226 }
227 }.start();
228
229 // Run for five seconds and then tell workers to end
230 long endTime = System.currentTimeMillis() + 5000;
231 int workersFired = 0;
232 while (true)
233 {
234 // Next message gives us least recently used worker
235 staticBrokerIdRecv.incrementAndGet();
236 String identity = broker.recvStr(); // recvStringWithTimeout(broker, 100, "Work harder");
237 staticBrokerIdRecv.decrementAndGet();
238 if (!broker.sendMore(identity))
239 {
240 System.err.println("broker failed to send identity...");
241 }
242 if (!identity.equals("FAILED"))
243 {
244 staticBrokerMsgRecv.incrementAndGet();
245 broker.recvStr(); // Envelope delimiter
246 broker.recvStr(); // Response from worker
247 staticBrokerMsgRecv.decrementAndGet();
248 if (!broker.sendMore(""))
249 {
250 System.err.println("broker failed to send delimiter...");
251 }
252
253 // Encourage workers until it's time to fire them
254 if (System.currentTimeMillis() < endTime)
255 {
256 if (!broker.send("Work harder"))
257 {
258 System.err.println("broker failed to send work...");
259 }
260 }
261 else
262 {
263 if (!broker.send("Fired!"))
264 {
265 System.err.println("broker failed to send fired...");
266 }
267 if (++workersFired == NBR_WORKERS)
268 {
269 break;
270 }
271 }
272 }
273 }
274
275 try
276 {
277 Thread.sleep(500);
278 }
279 catch (InterruptedException exception)
280 {
281 // ignore
282 }
283
284 System.out.println("completed = " + completed);
285 System.err.println("staticWorkerRecv = " + staticWorkerRecv);
286 System.err.println("staticBrokerIdRecv = " + staticBrokerIdRecv);
287 System.err.println("staticBrokerMsgRecv = " + staticBrokerMsgRecv);
288
289 broker.close();
290 context.destroy();
291 context.close();
292 System.exit(0);
293 }
294
295 /**
296 * @param context the context
297 * @param socket the socket
298 * @param timeoutMs timeout in milliseconds
299 * @param resend string to resend if it fails
300 * @return the read string after potential resending of the request or even reconnecting
301 */
302 static String recvStringWithTimeout(final ZContext context, final ZMQ.Socket socket, final long timeoutMs,
303 final String resend)
304 {
305 for (int i = 0; i < 5; i++)
306 {
307 ZMQ.Poller poller = context.createPoller(1);
308 poller.register(socket, ZMQ.Poller.POLLIN);
309 int signalled = poller.poll(timeoutMs);
310 poller.unregister(socket);
311 if (signalled == 1)
312 {
313 return socket.recvStr();
314 }
315 System.err.println("RETRY... " + resend);
316 if (!socket.send(resend))
317 {
318 System.err.println("broker failed to resend string");
319 }
320 }
321 System.err.println("FAILED");
322 return "FAILED";
323 }
324 }