1 package org.sim0mq.demo;
2
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import org.zeromq.ZMQ;
7
8 /**
9 * Example from http://stackoverflow.com/questions/20944140/zeromq-route-req-java-example-does-not-work. Every send is now
10 * checked for success, and a monitor thread has been added that detects whether one of the conversations hangs. The lazy
11 * pirate pattern from the 0mq guide has been added to deal with possible timeouts.
12 * <p>
13 * BSD-style license. See <a href="http://opentrafficsim.org/docs/current/license.html">OpenTrafficSim License</a>.
14 * </p>
15 * $LastChangedDate: 2015-07-24 02:58:59 +0200 (Fri, 24 Jul 2015) $, @version $Revision: 1147 $, by $Author: averbraeck $,
16 * initial version Apr 20, 2017 <br>
17 * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
18 */
19 public class RouterToReqExample
20 {
21 /** shared random stream; the workers use it to pick the duration of their simulated work. */
22 static Random rand = new Random();
23
24 /** number of workers that have started a request to the broker and have not yet finished handling its reply. */
25 static AtomicInteger staticWorkerRecv = new AtomicInteger();
26
27 /** incremented just before the broker receives an identity frame and decremented right after it has been received. */
28 static AtomicInteger staticBrokerIdRecv = new AtomicInteger();
29
30 /** incremented just before the broker receives the delimiter and request frames and decremented right after. */
31 static AtomicInteger staticBrokerMsgRecv = new AtomicInteger();
32
33 /** number of workers that received the "Fired!" reply and ended their work loop. */
34 static AtomicInteger completed = new AtomicInteger();
35
36 /** number of worker threads that main starts, and the number of "Fired!" replies after which the broker stops. */
37 private static final int NBR_WORKERS = 100;
38
39 /** a worker thread... */
40 private static class Worker extends Thread
41 {
42 /** the worker id. */
43 private String workerId;
44
45 /**
46 * Construct a worker.
47 * @param workerId worker id
48 */
49 Worker(final String workerId)
50 {
51 this.workerId = workerId;
52 }
53
54 @Override
55 public void run()
56 {
57 long TIMEOUT = 100; // ms
58 int RETRIES = 3;
59 String ENDPOINT = "tcp://localhost:5671";
60
61 ZMQ.Context context = ZMQ.context(1);
62 ZMQ.Socket worker = context.socket(ZMQ.REQ);
63 worker.setIdentity(this.workerId.getBytes());
64
65 worker.connect(ENDPOINT);
66
67 int total = 0;
68 while (true)
69 {
70 staticWorkerRecv.incrementAndGet();
71 String message = "Hi Boss";
72 String workloadResponse = "";
73 int retriesLeft = RETRIES;
74 boolean ok = false;
75 while (retriesLeft > 0 && !ok)
76 {
77 // Tell the broker we're ready for work
78 if (!worker.send(message))
79 {
80 System.err.println("worker " + this.workerId + " failed to send...");
81 }
82 while (workloadResponse == null || workloadResponse.isEmpty())
83 {
84 ZMQ.Poller poller = context.poller(1);
85 poller.register(worker, ZMQ.Poller.POLLIN);
86 int signalled = poller.poll(TIMEOUT);
87 poller.unregister(worker);
88 if (signalled == 1)
89 {
90 workloadResponse = worker.recvStr();
91 // }
92 //
93 // // Poll socket for a reply, with timeout
94 // ZMQ.PollItem items[] = { new ZMQ.PollItem(worker, ZMQ.Poller.POLLIN) };
95 // int rc = ZMQ.poll(items, TIMEOUT);
96 // if (rc == -1)
97 // {
98 // break; // Interrupted
99 // }
100 //
101 // // Here we process a server reply and exit our loop if the reply is valid. If we didn't a reply
102 // we close
103 // // the client socket and resend the request. We try a number of times before finally abandoning:
104 // if (items[0].isReadable())
105 // {
106 // workloadResponse = worker.recvStr();
107 if (workloadResponse == null)
108 break; // Interrupted
109 if (workloadResponse.equals("Work harder") || workloadResponse.equals("Fired!"))
110 {
111 retriesLeft = RETRIES;
112 ok = true;
113 }
114 else
115 System.err.printf("E: malformed reply from server: %s\n", workloadResponse);
116
117 }
118 else if (--retriesLeft == 0)
119 {
120 System.err.println("E: server seems to be offline, abandoning\n");
121 break;
122 }
123 else
124 {
125 System.err.println("W: no response from server, retrying\n");
126 // Old socket is confused; close it and open a new one
127 worker.close();
128 System.err.println("I: reconnecting to server\n");
129 worker = context.socket(ZMQ.REQ);
130 worker.setIdentity(this.workerId.getBytes());
131 worker.connect(ENDPOINT);
132 // Send message again, on new socket
133 worker.send(message);
134 }
135 }
136 }
137 staticWorkerRecv.decrementAndGet();
138 boolean finished = workloadResponse.equals("Fired!");
139 if (finished)
140 {
141 completed.incrementAndGet();
142 System.out.printf(this.workerId + " completed: %d tasks\n", total);
143 break;
144 }
145 total++;
146
147 // Do some random work
148 try
149 {
150 Thread.sleep(rand.nextInt(10) + 1);
151 }
152 catch (InterruptedException e)
153 {
154 // ignore
155 }
156 }
157
158 worker.close();
159 context.term();
160 }
161 }
162
163 /**
164 * While this example runs in a single process, that is just to make it easier to start and stop the example. Each thread
165 * has its own context and conceptually acts as a separate process.
166 * @param args args, can be empty
167 * @throws Exception on error
168 */
169 public static void main(String[] args) throws Exception
170 {
171 ZMQ.Context context = ZMQ.context(1);
172 ZMQ.Socket broker = context.socket(ZMQ.ROUTER);
173 broker.bind("tcp://*:5671");
174
175 System.out.println("Recv buf size = " + broker.getReceiveBufferSize());
176 System.out.println("Send buf size = " + broker.getSendBufferSize());
177 System.out.println("Recv HWM = " + broker.getRcvHWM());
178 System.out.println("Send HWM = " + broker.getSndHWM());
179
180 // starting all workers
181 for (int workerNbr = 0; workerNbr < NBR_WORKERS; workerNbr++)
182 {
183 Thread worker = new Worker("worker-" + workerNbr);
184 worker.start();
185 }
186
187 // start a monitoring thread of 6 seconds to check hanging program...
188 new Thread()
189 {
190 @Override
191 public void run()
192 {
193 try
194 {
195 Thread.sleep(6000);
196 System.err.println("staticWorkerRecv = " + staticWorkerRecv);
197 System.err.println("staticBrokerIdRecv = " + staticBrokerIdRecv);
198 System.err.println("staticBrokerMsgRecv = " + staticBrokerMsgRecv);
199 System.exit(-1);
200 }
201 catch (InterruptedException exception)
202 {
203 // ignore
204 }
205 }
206 }.start();
207
208 // Run for five seconds and then tell workers to end
209 long endTime = System.currentTimeMillis() + 5000;
210 int workersFired = 0;
211 while (true)
212 {
213 // Next message gives us least recently used worker
214 staticBrokerIdRecv.incrementAndGet();
215 String identity = broker.recvStr(); // recvStringWithTimeout(broker, 100, "Work harder");
216 staticBrokerIdRecv.decrementAndGet();
217 if (!broker.sendMore(identity))
218 {
219 System.err.println("broker failed to send identity...");
220 }
221 if (!identity.equals("FAILED"))
222 {
223 staticBrokerMsgRecv.incrementAndGet();
224 broker.recvStr(); // Envelope delimiter
225 broker.recvStr(); // Response from worker
226 staticBrokerMsgRecv.decrementAndGet();
227 if (!broker.sendMore(""))
228 {
229 System.err.println("broker failed to send delimiter...");
230 }
231
232 // Encourage workers until it's time to fire them
233 if (System.currentTimeMillis() < endTime)
234 {
235 if (!broker.send("Work harder"))
236 {
237 System.err.println("broker failed to send work...");
238 }
239 }
240 else
241 {
242 if (!broker.send("Fired!"))
243 {
244 System.err.println("broker failed to send fired...");
245 }
246 if (++workersFired == NBR_WORKERS)
247 {
248 break;
249 }
250 }
251 }
252 }
253
254 try
255 {
256 Thread.sleep(500);
257 }
258 catch (InterruptedException exception)
259 {
260 // ignore
261 }
262
263 System.out.println("completed = " + completed);
264 System.err.println("staticWorkerRecv = " + staticWorkerRecv);
265 System.err.println("staticBrokerIdRecv = " + staticBrokerIdRecv);
266 System.err.println("staticBrokerMsgRecv = " + staticBrokerMsgRecv);
267
268 broker.close();
269 context.term();
270 System.exit(0);
271 }
272
273 /**
274 * @param context the context
275 * @param socket the socket
276 * @param timeoutMs timeout in milliseconds
277 * @param resend string to resend if it fails
278 * @return the read string after potential resending of the request or even reconnecting
279 */
280 static String recvStringWithTimeout(final ZMQ.Context context, final ZMQ.Socket socket, final long timeoutMs,
281 final String resend)
282 {
283 for (int i = 0; i < 5; i++)
284 {
285 ZMQ.Poller poller = context.poller(1);
286 poller.register(socket, ZMQ.Poller.POLLIN);
287 int signalled = poller.poll(timeoutMs);
288 poller.unregister(socket);
289 if (signalled == 1)
290 {
291 return socket.recvStr();
292 }
293 System.err.println("RETRY... " + resend);
294 if (!socket.send(resend))
295 {
296 System.err.println("broker failed to resend string");
297 }
298 }
299 System.err.println("FAILED");
300 return "FAILED";
301 }
302 }