1 package org.sim0mq.test.dealerrouter;
2
3 import java.util.Random;
4
5 import org.zeromq.SocketType;
6 import org.zeromq.ZContext;
7 import org.zeromq.ZFrame;
8 import org.zeromq.ZMQ;
9 import org.zeromq.ZMQ.Poller;
10 import org.zeromq.ZMQ.Socket;
11 import org.zeromq.ZMsg;
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 public final class DealerRouter
28 {
29
30
31 private DealerRouter()
32 {
33
34 }
35
36
37 @SuppressWarnings("checkstyle:visibilitymodifier")
38 static Random rand = new Random(System.nanoTime());
39
40
41
42
43
44 static class ClientTask implements Runnable
45 {
46
47 @Override
48 public void run()
49 {
50 try (ZContext ctx = new ZContext())
51 {
52 Socket client = ctx.createSocket(SocketType.DEALER);
53
54
55 String identity = String.format("%04X-%04X", rand.nextInt(), rand.nextInt());
56 client.setIdentity(identity.getBytes(ZMQ.CHARSET));
57 client.connect("tcp://localhost:5570");
58
59 Poller poller = ctx.createPoller(1);
60 poller.register(client, Poller.POLLIN);
61
62 int requestNr = 0;
63 while (!Thread.currentThread().isInterrupted())
64 {
65
66 for (int centitick = 0; centitick < 100; centitick++)
67 {
68 poller.poll(10);
69 if (poller.pollin(0))
70 {
71 ZMsg msg = ZMsg.recvMsg(client);
72 msg.getLast().print(identity);
73 msg.destroy();
74 }
75 }
76 client.send(String.format("request #%d", ++requestNr), 0);
77 }
78 }
79 }
80 }
81
82
83
84
85
86 static class ServerTask implements Runnable
87 {
88
89 @Override
90 public void run()
91 {
92 try (ZContext ctx = new ZContext())
93 {
94
95 Socket frontend = ctx.createSocket(SocketType.ROUTER);
96 frontend.bind("tcp://*:5570");
97
98
99 Socket backend = ctx.createSocket(SocketType.DEALER);
100 backend.bind("inproc://backend");
101
102
103 for (int threadNbr = 0; threadNbr < 5; threadNbr++)
104 {
105 new Thread(new ServerWorker(ctx)).start();
106 }
107
108
109 ZMQ.proxy(frontend, backend, null);
110 }
111 }
112 }
113
114
115
116
117
118 private static class ServerWorker implements Runnable
119 {
120
121 private ZContext ctx;
122
123
124
125
126
127 ServerWorker(final ZContext ctx)
128 {
129 this.ctx = ctx;
130 }
131
132
133 @Override
134 public void run()
135 {
136 Socket worker = this.ctx.createSocket(SocketType.DEALER);
137 worker.connect("inproc://backend");
138
139 while (!Thread.currentThread().isInterrupted())
140 {
141
142 ZMsg msg = ZMsg.recvMsg(worker);
143 ZFrame address = msg.pop();
144 ZFrame content = msg.pop();
145 assert (content != null);
146 msg.destroy();
147
148
149 int replies = rand.nextInt(5);
150 for (int reply = 0; reply < replies; reply++)
151 {
152
153 try
154 {
155 Thread.sleep(rand.nextInt(1000) + 1);
156 }
157 catch (InterruptedException e)
158 {
159
160 }
161 address.send(worker, ZFrame.REUSE + ZFrame.MORE);
162 content.send(worker, ZFrame.REUSE);
163 }
164 address.destroy();
165 content.destroy();
166 }
167 this.ctx.destroy();
168 }
169 }
170
171
172
173
174
175
176 public static void main(final String[] args) throws Exception
177 {
178 new Thread(new ClientTask()).start();
179 new Thread(new ClientTask()).start();
180 new Thread(new ClientTask()).start();
181 new Thread(new ServerTask()).start();
182
183
184 Thread.sleep(20 * 1000);
185
186 System.exit(0);
187 }
188 }