View Javadoc
1   package org.sim0mq.test.dealerrouter;
2   
3   import java.util.Random;
4   
5   import org.zeromq.SocketType;
6   import org.zeromq.ZContext;
7   import org.zeromq.ZFrame;
8   import org.zeromq.ZMQ;
9   import org.zeromq.ZMQ.Poller;
10  import org.zeromq.ZMQ.Socket;
11  import org.zeromq.ZMsg;
12  
13  /**
14   * 0MQ DEALER-ROUTER example. Based on <a href="http://zguide.zeromq.org/java:asyncsrv">
15   * http://zguide.zeromq.org/java:asyncsrv</a> or in the JeroMQ example at
16   * <a href= "https://github.com/zeromq/jeromq/blob/master/src/test/java/guide/asyncsrv.java">
17   * https://github.com/zeromq/jeromq/blob/master/src/test/java/guide/asyncsrv.java</a>.<br>
18   * In the 0MQ manual this example can be found at
19   * <a href="http://zguide.zeromq.org/page:all#The-Asynchronous-Client-Server-Pattern">
20   * http://zguide.zeromq.org/page:all#The-Asynchronous-Client-Server-Pattern</a>.
21   * <p>
22   * Copyright (c) 2013-2017 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
23   * BSD-style license. See <a href="http://sim0mq.org/docs/current/license.html">Sim0MQ License</a>.
24   * </p>
25   * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
26   */
27  public final class DealerRouter
28  {
29  
30      /** */
31      private DealerRouter()
32      {
33          // utility class
34      }
35  
36      /** random stream. */
37      @SuppressWarnings("checkstyle:visibilitymodifier")
38      static Random rand = new Random(System.nanoTime());
39  
40      /**
41       * This is our client task. It connects to the server, and then sends a request once per second. It collects responses as
42       * they arrive, and it prints them out. We will run several client tasks in parallel, each with a different random ID.
43       */
44      static class ClientTask implements Runnable
45      {
46          /** {@inheritDoc} */
47          @Override
48          public void run()
49          {
50              try (ZContext ctx = new ZContext())
51              {
52                  Socket client = ctx.createSocket(SocketType.DEALER);
53  
54                  // Set random identity to make tracing easier
55                  String identity = String.format("%04X-%04X", rand.nextInt(), rand.nextInt());
56                  client.setIdentity(identity.getBytes(ZMQ.CHARSET));
57                  client.connect("tcp://localhost:5570");
58  
59                  Poller poller = ctx.createPoller(1);
60                  poller.register(client, Poller.POLLIN);
61  
62                  int requestNr = 0;
63                  while (!Thread.currentThread().isInterrupted())
64                  {
65                      // Tick once per second, pulling in arriving messages
66                      for (int centitick = 0; centitick < 100; centitick++)
67                      {
68                          poller.poll(10);
69                          if (poller.pollin(0))
70                          {
71                              ZMsg msg = ZMsg.recvMsg(client);
72                              msg.getLast().print(identity);
73                              msg.destroy();
74                          }
75                      }
76                      client.send(String.format("request #%d", ++requestNr), 0);
77                  }
78              }
79          }
80      }
81  
82      /**
83       * This is our server task. It uses the multithreaded server model to deal requests out to a pool of workers and route
84       * replies back to clients. One worker can handle one request at a time but one client can talk to multiple workers at once.
85       */
86      static class ServerTask implements Runnable
87      {
88          /** {@inheritDoc} */
89          @Override
90          public void run()
91          {
92              try (ZContext ctx = new ZContext())
93              {
94                  // Frontend socket talks to clients over TCP
95                  Socket frontend = ctx.createSocket(SocketType.ROUTER);
96                  frontend.bind("tcp://*:5570");
97  
98                  // Backend socket talks to workers over inproc
99                  Socket backend = ctx.createSocket(SocketType.DEALER);
100                 backend.bind("inproc://backend");
101 
102                 // Launch pool of worker threads, precise number is not critical
103                 for (int threadNbr = 0; threadNbr < 5; threadNbr++)
104                 {
105                     new Thread(new ServerWorker(ctx)).start();
106                 }
107 
108                 // Connect backend to frontend via a proxy
109                 ZMQ.proxy(frontend, backend, null);
110             }
111         }
112     }
113 
114     /**
115      * Each worker task works on one request at a time and sends a random number of replies back, with random delays between
116      * replies.
117      */
118     private static class ServerWorker implements Runnable
119     {
120         /** context. */
121         private ZContext ctx;
122 
123         /**
124          * Construct a worker.
125          * @param ctx the 0MQ context to use
126          */
127         ServerWorker(final ZContext ctx)
128         {
129             this.ctx = ctx;
130         }
131 
132         /** {@inheritDoc} */
133         @Override
134         public void run()
135         {
136             Socket worker = this.ctx.createSocket(SocketType.DEALER);
137             worker.connect("inproc://backend");
138 
139             while (!Thread.currentThread().isInterrupted())
140             {
141                 // The DEALER socket gives us the address envelope and message
142                 ZMsg msg = ZMsg.recvMsg(worker);
143                 ZFrame address = msg.pop();
144                 ZFrame content = msg.pop();
145                 assert (content != null);
146                 msg.destroy();
147 
148                 // Send 0..4 replies back
149                 int replies = rand.nextInt(5);
150                 for (int reply = 0; reply < replies; reply++)
151                 {
152                     // Sleep for some fraction of a second
153                     try
154                     {
155                         Thread.sleep(rand.nextInt(1000) + 1);
156                     }
157                     catch (InterruptedException e)
158                     {
159                         // ignore
160                     }
161                     address.send(worker, ZFrame.REUSE + ZFrame.MORE);
162                     content.send(worker, ZFrame.REUSE);
163                 }
164                 address.destroy();
165                 content.destroy();
166             }
167             this.ctx.destroy();
168         }
169     }
170 
171     /**
172      * The main thread simply starts several clients, and a server, and then waits for the server to finish.
173      * @param args unused
174      * @throws Exception on error
175      */
176     public static void main(final String[] args) throws Exception
177     {
178         new Thread(new ClientTask()).start();
179         new Thread(new ClientTask()).start();
180         new Thread(new ClientTask()).start();
181         new Thread(new ServerTask()).start();
182 
183         // Run for 20 seconds then quit
184         Thread.sleep(20 * 1000);
185 
186         System.exit(0);
187     }
188 }