1 package org.sim0mq.demo;
2
3 import java.util.Random;
4 import java.util.concurrent.atomic.AtomicInteger;
5
6 import org.zeromq.ZContext;
7 import org.zeromq.ZMQ;
8
9
10
11
12
13
14
15
16
17
18
19
20 public final class RouterToReqExample
21 {
22
23 @SuppressWarnings("checkstyle:visibilitymodifier")
24 static Random rand = new Random();
25
26
27 @SuppressWarnings("checkstyle:visibilitymodifier")
28 static AtomicInteger staticWorkerRecv = new AtomicInteger();
29
30
31 @SuppressWarnings("checkstyle:visibilitymodifier")
32 static AtomicInteger staticBrokerIdRecv = new AtomicInteger();
33
34
35 @SuppressWarnings("checkstyle:visibilitymodifier")
36 static AtomicInteger staticBrokerMsgRecv = new AtomicInteger();
37
38
39 @SuppressWarnings("checkstyle:visibilitymodifier")
40 static AtomicInteger completed = new AtomicInteger();
41
42
43 private static final int NBR_WORKERS = 100;
44
45
46 private RouterToReqExample()
47 {
48
49 }
50
51
52 private static class Worker extends Thread
53 {
54
55 private String workerId;
56
57
58
59
60
61 Worker(final String workerId)
62 {
63 this.workerId = workerId;
64 }
65
66
67 private static final long TIMEOUT = 100;
68
69
70 private static final int RETRIES = 3;
71
72 @Override
73 public void run()
74 {
75 String endpoint = "tcp://localhost:5671";
76
77 ZContext context = new ZContext(1);
78 ZMQ.Socket worker = context.createSocket(ZMQ.REQ);
79 worker.setIdentity(this.workerId.getBytes());
80
81 worker.connect(endpoint);
82
83 int total = 0;
84 while (true)
85 {
86 staticWorkerRecv.incrementAndGet();
87 String message = "Hi Boss";
88 String workloadResponse = "";
89 int retriesLeft = RETRIES;
90 boolean ok = false;
91 while (retriesLeft > 0 && !ok)
92 {
93
94 if (!worker.send(message))
95 {
96 System.err.println("worker " + this.workerId + " failed to send...");
97 }
98 while (workloadResponse == null || workloadResponse.isEmpty())
99 {
100 ZMQ.Poller poller = context.createPoller(1);
101 poller.register(worker, ZMQ.Poller.POLLIN);
102 int signalled = poller.poll(TIMEOUT);
103 poller.unregister(worker);
104 if (signalled == 1)
105 {
106 workloadResponse = worker.recvStr();
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123 if (workloadResponse == null)
124 {
125 break;
126 }
127 if (workloadResponse.equals("Work harder") || workloadResponse.equals("Fired!"))
128 {
129 retriesLeft = RETRIES;
130 ok = true;
131 }
132 else
133 {
134 System.err.printf("E: malformed reply from server: %s\n", workloadResponse);
135 }
136
137 }
138 else if (--retriesLeft == 0)
139 {
140 System.err.println("E: server seems to be offline, abandoning\n");
141 break;
142 }
143 else
144 {
145 System.err.println("W: no response from server, retrying\n");
146
147 worker.close();
148 System.err.println("I: reconnecting to server\n");
149 worker = context.createSocket(ZMQ.REQ);
150 worker.setIdentity(this.workerId.getBytes());
151 worker.connect(endpoint);
152
153 worker.send(message);
154 }
155 }
156 }
157 staticWorkerRecv.decrementAndGet();
158 boolean finished = workloadResponse.equals("Fired!");
159 if (finished)
160 {
161 completed.incrementAndGet();
162 System.out.printf(this.workerId + " completed: %d tasks\n", total);
163 break;
164 }
165 total++;
166
167
168 try
169 {
170 Thread.sleep(rand.nextInt(10) + 1);
171 }
172 catch (InterruptedException e)
173 {
174
175 }
176 }
177
178 worker.close();
179 context.destroy();
180 context.close();
181 }
182 }
183
184
185
186
187
188
189
190 public static void main(final String[] args) throws Exception
191 {
192 ZContext context = new ZContext(1);
193 ZMQ.Socket broker = context.createSocket(ZMQ.ROUTER);
194 broker.bind("tcp://*:5671");
195
196 System.out.println("Recv buf size = " + broker.getReceiveBufferSize());
197 System.out.println("Send buf size = " + broker.getSendBufferSize());
198 System.out.println("Recv HWM = " + broker.getRcvHWM());
199 System.out.println("Send HWM = " + broker.getSndHWM());
200
201
202 for (int workerNbr = 0; workerNbr < NBR_WORKERS; workerNbr++)
203 {
204 Thread worker = new Worker("worker-" + workerNbr);
205 worker.start();
206 }
207
208
209 new Thread()
210 {
211 @Override
212 public void run()
213 {
214 try
215 {
216 Thread.sleep(6000);
217 System.err.println("staticWorkerRecv = " + staticWorkerRecv);
218 System.err.println("staticBrokerIdRecv = " + staticBrokerIdRecv);
219 System.err.println("staticBrokerMsgRecv = " + staticBrokerMsgRecv);
220 System.exit(-1);
221 }
222 catch (InterruptedException exception)
223 {
224
225 }
226 }
227 }.start();
228
229
230 long endTime = System.currentTimeMillis() + 5000;
231 int workersFired = 0;
232 while (true)
233 {
234
235 staticBrokerIdRecv.incrementAndGet();
236 String identity = broker.recvStr();
237 staticBrokerIdRecv.decrementAndGet();
238 if (!broker.sendMore(identity))
239 {
240 System.err.println("broker failed to send identity...");
241 }
242 if (!identity.equals("FAILED"))
243 {
244 staticBrokerMsgRecv.incrementAndGet();
245 broker.recvStr();
246 broker.recvStr();
247 staticBrokerMsgRecv.decrementAndGet();
248 if (!broker.sendMore(""))
249 {
250 System.err.println("broker failed to send delimiter...");
251 }
252
253
254 if (System.currentTimeMillis() < endTime)
255 {
256 if (!broker.send("Work harder"))
257 {
258 System.err.println("broker failed to send work...");
259 }
260 }
261 else
262 {
263 if (!broker.send("Fired!"))
264 {
265 System.err.println("broker failed to send fired...");
266 }
267 if (++workersFired == NBR_WORKERS)
268 {
269 break;
270 }
271 }
272 }
273 }
274
275 try
276 {
277 Thread.sleep(500);
278 }
279 catch (InterruptedException exception)
280 {
281
282 }
283
284 System.out.println("completed = " + completed);
285 System.err.println("staticWorkerRecv = " + staticWorkerRecv);
286 System.err.println("staticBrokerIdRecv = " + staticBrokerIdRecv);
287 System.err.println("staticBrokerMsgRecv = " + staticBrokerMsgRecv);
288
289 broker.close();
290 context.destroy();
291 context.close();
292 System.exit(0);
293 }
294
295
296
297
298
299
300
301
302 static String recvStringWithTimeout(final ZContext context, final ZMQ.Socket socket, final long timeoutMs,
303 final String resend)
304 {
305 for (int i = 0; i < 5; i++)
306 {
307 ZMQ.Poller poller = context.createPoller(1);
308 poller.register(socket, ZMQ.Poller.POLLIN);
309 int signalled = poller.poll(timeoutMs);
310 poller.unregister(socket);
311 if (signalled == 1)
312 {
313 return socket.recvStr();
314 }
315 System.err.println("RETRY... " + resend);
316 if (!socket.send(resend))
317 {
318 System.err.println("broker failed to resend string");
319 }
320 }
321 System.err.println("FAILED");
322 return "FAILED";
323 }
324 }