1 package org.sim0mq.demo;
2
3 import java.util.Random;
4 import java.util.concurrent.atomic.AtomicInteger;
5
6 import org.zeromq.SocketType;
7 import org.zeromq.ZContext;
8 import org.zeromq.ZMQ;
9
10
11
12
13
14
15
16
17
18
19
20
21 public final class RouterToReqExample
22 {
23
24 @SuppressWarnings("checkstyle:visibilitymodifier")
25 static Random rand = new Random();
26
27
28 @SuppressWarnings("checkstyle:visibilitymodifier")
29 static AtomicInteger staticWorkerRecv = new AtomicInteger();
30
31
32 @SuppressWarnings("checkstyle:visibilitymodifier")
33 static AtomicInteger staticBrokerIdRecv = new AtomicInteger();
34
35
36 @SuppressWarnings("checkstyle:visibilitymodifier")
37 static AtomicInteger staticBrokerMsgRecv = new AtomicInteger();
38
39
40 @SuppressWarnings("checkstyle:visibilitymodifier")
41 static AtomicInteger completed = new AtomicInteger();
42
43
44 private static final int NBR_WORKERS = 100;
45
46
47 private RouterToReqExample()
48 {
49
50 }
51
52
53 private static class Worker extends Thread
54 {
55
56 private String workerId;
57
58
59
60
61
62 Worker(final String workerId)
63 {
64 this.workerId = workerId;
65 }
66
67
68 private static final long TIMEOUT = 100;
69
70
71 private static final int RETRIES = 3;
72
73 @Override
74 public void run()
75 {
76 String endpoint = "tcp://localhost:5671";
77
78 ZContext context = new ZContext(1);
79 ZMQ.Socket worker = context.createSocket(SocketType.REQ);
80 worker.setIdentity(this.workerId.getBytes());
81
82 worker.connect(endpoint);
83
84 int total = 0;
85 while (true)
86 {
87 staticWorkerRecv.incrementAndGet();
88 String message = "Hi Boss";
89 String workloadResponse = "";
90 int retriesLeft = RETRIES;
91 boolean ok = false;
92 while (retriesLeft > 0 && !ok)
93 {
94
95 if (!worker.send(message))
96 {
97 System.err.println("worker " + this.workerId + " failed to send...");
98 }
99 while (workloadResponse == null || workloadResponse.isEmpty())
100 {
101 ZMQ.Poller poller = context.createPoller(1);
102 poller.register(worker, ZMQ.Poller.POLLIN);
103 int signalled = poller.poll(TIMEOUT);
104 poller.unregister(worker);
105 if (signalled == 1)
106 {
107 workloadResponse = worker.recvStr();
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124 if (workloadResponse == null)
125 {
126 break;
127 }
128 if (workloadResponse.equals("Work harder") || workloadResponse.equals("Fired!"))
129 {
130 retriesLeft = RETRIES;
131 ok = true;
132 }
133 else
134 {
135 System.err.printf("E: malformed reply from server: %s\n", workloadResponse);
136 }
137
138 }
139 else if (--retriesLeft == 0)
140 {
141 System.err.println("E: server seems to be offline, abandoning\n");
142 break;
143 }
144 else
145 {
146 System.err.println("W: no response from server, retrying\n");
147
148 worker.close();
149 System.err.println("I: reconnecting to server\n");
150 worker = context.createSocket(SocketType.REQ);
151 worker.setIdentity(this.workerId.getBytes());
152 worker.connect(endpoint);
153
154 worker.send(message);
155 }
156 }
157 }
158 staticWorkerRecv.decrementAndGet();
159 boolean finished = workloadResponse.equals("Fired!");
160 if (finished)
161 {
162 completed.incrementAndGet();
163 System.out.printf(this.workerId + " completed: %d tasks\n", total);
164 break;
165 }
166 total++;
167
168
169 try
170 {
171 Thread.sleep(rand.nextInt(10) + 1);
172 }
173 catch (InterruptedException e)
174 {
175
176 }
177 }
178
179 worker.close();
180 context.destroy();
181 context.close();
182 }
183 }
184
185
186
187
188
189
190
191 public static void main(final String[] args) throws Exception
192 {
193 ZContext context = new ZContext(1);
194 ZMQ.Socket broker = context.createSocket(SocketType.ROUTER);
195 broker.bind("tcp://*:5671");
196
197 System.out.println("Recv buf size = " + broker.getReceiveBufferSize());
198 System.out.println("Send buf size = " + broker.getSendBufferSize());
199 System.out.println("Recv HWM = " + broker.getRcvHWM());
200 System.out.println("Send HWM = " + broker.getSndHWM());
201
202
203 for (int workerNbr = 0; workerNbr < NBR_WORKERS; workerNbr++)
204 {
205 Thread worker = new Worker("worker-" + workerNbr);
206 worker.start();
207 }
208
209
210 new Thread()
211 {
212 @Override
213 public void run()
214 {
215 try
216 {
217 Thread.sleep(6000);
218 System.err.println("staticWorkerRecv = " + staticWorkerRecv);
219 System.err.println("staticBrokerIdRecv = " + staticBrokerIdRecv);
220 System.err.println("staticBrokerMsgRecv = " + staticBrokerMsgRecv);
221 System.exit(-1);
222 }
223 catch (InterruptedException exception)
224 {
225
226 }
227 }
228 }.start();
229
230
231 long endTime = System.currentTimeMillis() + 5000;
232 int workersFired = 0;
233 while (true)
234 {
235
236 staticBrokerIdRecv.incrementAndGet();
237 String identity = broker.recvStr();
238 staticBrokerIdRecv.decrementAndGet();
239 if (!broker.sendMore(identity))
240 {
241 System.err.println("broker failed to send identity...");
242 }
243 if (!identity.equals("FAILED"))
244 {
245 staticBrokerMsgRecv.incrementAndGet();
246 broker.recvStr();
247 broker.recvStr();
248 staticBrokerMsgRecv.decrementAndGet();
249 if (!broker.sendMore(""))
250 {
251 System.err.println("broker failed to send delimiter...");
252 }
253
254
255 if (System.currentTimeMillis() < endTime)
256 {
257 if (!broker.send("Work harder"))
258 {
259 System.err.println("broker failed to send work...");
260 }
261 }
262 else
263 {
264 if (!broker.send("Fired!"))
265 {
266 System.err.println("broker failed to send fired...");
267 }
268 if (++workersFired == NBR_WORKERS)
269 {
270 break;
271 }
272 }
273 }
274 }
275
276 try
277 {
278 Thread.sleep(500);
279 }
280 catch (InterruptedException exception)
281 {
282
283 }
284
285 System.out.println("completed = " + completed);
286 System.err.println("staticWorkerRecv = " + staticWorkerRecv);
287 System.err.println("staticBrokerIdRecv = " + staticBrokerIdRecv);
288 System.err.println("staticBrokerMsgRecv = " + staticBrokerMsgRecv);
289
290 broker.close();
291 context.destroy();
292 context.close();
293 System.exit(0);
294 }
295
296
297
298
299
300
301
302
303 static String recvStringWithTimeout(final ZContext context, final ZMQ.Socket socket, final long timeoutMs,
304 final String resend)
305 {
306 for (int i = 0; i < 5; i++)
307 {
308 ZMQ.Poller poller = context.createPoller(1);
309 poller.register(socket, ZMQ.Poller.POLLIN);
310 int signalled = poller.poll(timeoutMs);
311 poller.unregister(socket);
312 if (signalled == 1)
313 {
314 return socket.recvStr();
315 }
316 System.err.println("RETRY... " + resend);
317 if (!socket.send(resend))
318 {
319 System.err.println("broker failed to resend string");
320 }
321 }
322 System.err.println("FAILED");
323 return "FAILED";
324 }
325 }