1 package org.sim0mq.demo;
2
3 import java.util.Random;
4 import java.util.concurrent.atomic.AtomicInteger;
5
6 import org.zeromq.ZMQ;
7
8
9
10
11
12
13
14
15
16
17
18
19 public class RouterToReqExample
20 {
21
22 static Random rand = new Random();
23
24
25 static AtomicInteger staticWorkerRecv = new AtomicInteger();
26
27
28 static AtomicInteger staticBrokerIdRecv = new AtomicInteger();
29
30
31 static AtomicInteger staticBrokerMsgRecv = new AtomicInteger();
32
33
34 static AtomicInteger completed = new AtomicInteger();
35
36
37 private static final int NBR_WORKERS = 100;
38
39
40 private static class Worker extends Thread
41 {
42
43 private String workerId;
44
45
46
47
48
49 Worker(final String workerId)
50 {
51 this.workerId = workerId;
52 }
53
54 @Override
55 public void run()
56 {
57 long TIMEOUT = 100;
58 int RETRIES = 3;
59 String ENDPOINT = "tcp://localhost:5671";
60
61 ZMQ.Context context = ZMQ.context(1);
62 ZMQ.Socket worker = context.socket(ZMQ.REQ);
63 worker.setIdentity(this.workerId.getBytes());
64
65 worker.connect(ENDPOINT);
66
67 int total = 0;
68 while (true)
69 {
70 staticWorkerRecv.incrementAndGet();
71 String message = "Hi Boss";
72 String workloadResponse = "";
73 int retriesLeft = RETRIES;
74 boolean ok = false;
75 while (retriesLeft > 0 && !ok)
76 {
77
78 if (!worker.send(message))
79 {
80 System.err.println("worker " + this.workerId + " failed to send...");
81 }
82 while (workloadResponse == null || workloadResponse.isEmpty())
83 {
84 ZMQ.Poller poller = context.poller(1);
85 poller.register(worker, ZMQ.Poller.POLLIN);
86 int signalled = poller.poll(TIMEOUT);
87 poller.unregister(worker);
88 if (signalled == 1)
89 {
90 workloadResponse = worker.recvStr();
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107 if (workloadResponse == null)
108 break;
109 if (workloadResponse.equals("Work harder") || workloadResponse.equals("Fired!"))
110 {
111 retriesLeft = RETRIES;
112 ok = true;
113 }
114 else
115 System.err.printf("E: malformed reply from server: %s\n", workloadResponse);
116
117 }
118 else if (--retriesLeft == 0)
119 {
120 System.err.println("E: server seems to be offline, abandoning\n");
121 break;
122 }
123 else
124 {
125 System.err.println("W: no response from server, retrying\n");
126
127 worker.close();
128 System.err.println("I: reconnecting to server\n");
129 worker = context.socket(ZMQ.REQ);
130 worker.setIdentity(this.workerId.getBytes());
131 worker.connect(ENDPOINT);
132
133 worker.send(message);
134 }
135 }
136 }
137 staticWorkerRecv.decrementAndGet();
138 boolean finished = workloadResponse.equals("Fired!");
139 if (finished)
140 {
141 completed.incrementAndGet();
142 System.out.printf(this.workerId + " completed: %d tasks\n", total);
143 break;
144 }
145 total++;
146
147
148 try
149 {
150 Thread.sleep(rand.nextInt(10) + 1);
151 }
152 catch (InterruptedException e)
153 {
154
155 }
156 }
157
158 worker.close();
159 context.term();
160 }
161 }
162
163
164
165
166
167
168
169 public static void main(String[] args) throws Exception
170 {
171 ZMQ.Context context = ZMQ.context(1);
172 ZMQ.Socket broker = context.socket(ZMQ.ROUTER);
173 broker.bind("tcp://*:5671");
174
175 System.out.println("Recv buf size = " + broker.getReceiveBufferSize());
176 System.out.println("Send buf size = " + broker.getSendBufferSize());
177 System.out.println("Recv HWM = " + broker.getRcvHWM());
178 System.out.println("Send HWM = " + broker.getSndHWM());
179
180
181 for (int workerNbr = 0; workerNbr < NBR_WORKERS; workerNbr++)
182 {
183 Thread worker = new Worker("worker-" + workerNbr);
184 worker.start();
185 }
186
187
188 new Thread()
189 {
190 @Override
191 public void run()
192 {
193 try
194 {
195 Thread.sleep(6000);
196 System.err.println("staticWorkerRecv = " + staticWorkerRecv);
197 System.err.println("staticBrokerIdRecv = " + staticBrokerIdRecv);
198 System.err.println("staticBrokerMsgRecv = " + staticBrokerMsgRecv);
199 System.exit(-1);
200 }
201 catch (InterruptedException exception)
202 {
203
204 }
205 }
206 }.start();
207
208
209 long endTime = System.currentTimeMillis() + 5000;
210 int workersFired = 0;
211 while (true)
212 {
213
214 staticBrokerIdRecv.incrementAndGet();
215 String identity = broker.recvStr();
216 staticBrokerIdRecv.decrementAndGet();
217 if (!broker.sendMore(identity))
218 {
219 System.err.println("broker failed to send identity...");
220 }
221 if (!identity.equals("FAILED"))
222 {
223 staticBrokerMsgRecv.incrementAndGet();
224 broker.recvStr();
225 broker.recvStr();
226 staticBrokerMsgRecv.decrementAndGet();
227 if (!broker.sendMore(""))
228 {
229 System.err.println("broker failed to send delimiter...");
230 }
231
232
233 if (System.currentTimeMillis() < endTime)
234 {
235 if (!broker.send("Work harder"))
236 {
237 System.err.println("broker failed to send work...");
238 }
239 }
240 else
241 {
242 if (!broker.send("Fired!"))
243 {
244 System.err.println("broker failed to send fired...");
245 }
246 if (++workersFired == NBR_WORKERS)
247 {
248 break;
249 }
250 }
251 }
252 }
253
254 try
255 {
256 Thread.sleep(500);
257 }
258 catch (InterruptedException exception)
259 {
260
261 }
262
263 System.out.println("completed = " + completed);
264 System.err.println("staticWorkerRecv = " + staticWorkerRecv);
265 System.err.println("staticBrokerIdRecv = " + staticBrokerIdRecv);
266 System.err.println("staticBrokerMsgRecv = " + staticBrokerMsgRecv);
267
268 broker.close();
269 context.term();
270 System.exit(0);
271 }
272
273
274
275
276
277
278
279
280 static String recvStringWithTimeout(final ZMQ.Context context, final ZMQ.Socket socket, final long timeoutMs,
281 final String resend)
282 {
283 for (int i = 0; i < 5; i++)
284 {
285 ZMQ.Poller poller = context.poller(1);
286 poller.register(socket, ZMQ.Poller.POLLIN);
287 int signalled = poller.poll(timeoutMs);
288 poller.unregister(socket);
289 if (signalled == 1)
290 {
291 return socket.recvStr();
292 }
293 System.err.println("RETRY... " + resend);
294 if (!socket.send(resend))
295 {
296 System.err.println("broker failed to resend string");
297 }
298 }
299 System.err.println("FAILED");
300 return "FAILED";
301 }
302 }