1 package org.sim0mq.test.dealerrouter;
2
3 import java.util.Random;
4
5 import org.zeromq.SocketType;
6 import org.zeromq.ZContext;
7 import org.zeromq.ZFrame;
8 import org.zeromq.ZMQ;
9 import org.zeromq.ZMQ.Poller;
10 import org.zeromq.ZMQ.Socket;
11 import org.zeromq.ZMsg;
12
13 /**
14 * 0MQ DEALER-ROUTER example. Based on <a href="http://zguide.zeromq.org/java:asyncsrv">
15 * http://zguide.zeromq.org/java:asyncsrv</a> or in the JeroMQ example at
16 * <a href= "https://github.com/zeromq/jeromq/blob/master/src/test/java/guide/asyncsrv.java">
17 * https://github.com/zeromq/jeromq/blob/master/src/test/java/guide/asyncsrv.java</a>.<br>
18 * In the 0MQ manual this example can be found at
19 * <a href="http://zguide.zeromq.org/page:all#The-Asynchronous-Client-Server-Pattern">
20 * http://zguide.zeromq.org/page:all#The-Asynchronous-Client-Server-Pattern</a>.
21 * <p>
22 * Copyright (c) 2013-2017 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
23 * BSD-style license. See <a href="http://sim0mq.org/docs/current/license.html">Sim0MQ License</a>.
24 * </p>
25 * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
26 */
27 public final class DealerRouter
28 {
29
30 /** */
31 private DealerRouter()
32 {
33 // utility class
34 }
35
36 /** random stream. */
37 @SuppressWarnings("checkstyle:visibilitymodifier")
38 static Random rand = new Random(System.nanoTime());
39
40 /**
41 * This is our client task. It connects to the server, and then sends a request once per second. It collects responses as
42 * they arrive, and it prints them out. We will run several client tasks in parallel, each with a different random ID.
43 */
44 static class ClientTask implements Runnable
45 {
46 /** {@inheritDoc} */
47 @Override
48 public void run()
49 {
50 try (ZContext ctx = new ZContext())
51 {
52 Socket client = ctx.createSocket(SocketType.DEALER);
53
54 // Set random identity to make tracing easier
55 String identity = String.format("%04X-%04X", rand.nextInt(), rand.nextInt());
56 client.setIdentity(identity.getBytes(ZMQ.CHARSET));
57 client.connect("tcp://localhost:5570");
58
59 Poller poller = ctx.createPoller(1);
60 poller.register(client, Poller.POLLIN);
61
62 int requestNr = 0;
63 while (!Thread.currentThread().isInterrupted())
64 {
65 // Tick once per second, pulling in arriving messages
66 for (int centitick = 0; centitick < 100; centitick++)
67 {
68 poller.poll(10);
69 if (poller.pollin(0))
70 {
71 ZMsg msg = ZMsg.recvMsg(client);
72 msg.getLast().print(identity);
73 msg.destroy();
74 }
75 }
76 client.send(String.format("request #%d", ++requestNr), 0);
77 }
78 }
79 }
80 }
81
82 /**
83 * This is our server task. It uses the multithreaded server model to deal requests out to a pool of workers and route
84 * replies back to clients. One worker can handle one request at a time but one client can talk to multiple workers at once.
85 */
86 static class ServerTask implements Runnable
87 {
88 /** {@inheritDoc} */
89 @Override
90 public void run()
91 {
92 try (ZContext ctx = new ZContext())
93 {
94 // Frontend socket talks to clients over TCP
95 Socket frontend = ctx.createSocket(SocketType.ROUTER);
96 frontend.bind("tcp://*:5570");
97
98 // Backend socket talks to workers over inproc
99 Socket backend = ctx.createSocket(SocketType.DEALER);
100 backend.bind("inproc://backend");
101
102 // Launch pool of worker threads, precise number is not critical
103 for (int threadNbr = 0; threadNbr < 5; threadNbr++)
104 {
105 new Thread(new ServerWorker(ctx)).start();
106 }
107
108 // Connect backend to frontend via a proxy
109 ZMQ.proxy(frontend, backend, null);
110 }
111 }
112 }
113
114 /**
115 * Each worker task works on one request at a time and sends a random number of replies back, with random delays between
116 * replies.
117 */
118 private static class ServerWorker implements Runnable
119 {
120 /** context. */
121 private ZContext ctx;
122
123 /**
124 * Construct a worker.
125 * @param ctx the 0MQ context to use
126 */
127 ServerWorker(final ZContext ctx)
128 {
129 this.ctx = ctx;
130 }
131
132 /** {@inheritDoc} */
133 @Override
134 public void run()
135 {
136 Socket worker = this.ctx.createSocket(SocketType.DEALER);
137 worker.connect("inproc://backend");
138
139 while (!Thread.currentThread().isInterrupted())
140 {
141 // The DEALER socket gives us the address envelope and message
142 ZMsg msg = ZMsg.recvMsg(worker);
143 ZFrame address = msg.pop();
144 ZFrame content = msg.pop();
145 assert (content != null);
146 msg.destroy();
147
148 // Send 0..4 replies back
149 int replies = rand.nextInt(5);
150 for (int reply = 0; reply < replies; reply++)
151 {
152 // Sleep for some fraction of a second
153 try
154 {
155 Thread.sleep(rand.nextInt(1000) + 1);
156 }
157 catch (InterruptedException e)
158 {
159 // ignore
160 }
161 address.send(worker, ZFrame.REUSE + ZFrame.MORE);
162 content.send(worker, ZFrame.REUSE);
163 }
164 address.destroy();
165 content.destroy();
166 }
167 this.ctx.destroy();
168 }
169 }
170
171 /**
172 * The main thread simply starts several clients, and a server, and then waits for the server to finish.
173 * @param args unused
174 * @throws Exception on error
175 */
176 public static void main(final String[] args) throws Exception
177 {
178 new Thread(new ClientTask()).start();
179 new Thread(new ClientTask()).start();
180 new Thread(new ClientTask()).start();
181 new Thread(new ServerTask()).start();
182
183 // Run for 20 seconds then quit
184 Thread.sleep(20 * 1000);
185
186 System.exit(0);
187 }
188 }