View Javadoc
1   package org.sim0mq.demo.reqrep;
2   
3   import java.util.concurrent.atomic.AtomicLong;
4   
5   import org.djutils.serialization.SerializationException;
6   import org.sim0mq.Sim0MQException;
7   import org.sim0mq.message.MessageStatus;
8   import org.sim0mq.message.SimulationMessage;
9   import org.zeromq.ZContext;
10  import org.zeromq.ZMQ;
11  
12  /**
13   * <p>
14   * Copyright (c) 2013-2017 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
15   * BSD-style license. See <a href="http://sim0mq.org/docs/current/license.html">Sim0MQ License</a>.
16   * </p>
17   * $LastChangedDate: 2015-07-24 02:58:59 +0200 (Fri, 24 Jul 2015) $, @version $Revision: 1147 $, by $Author: averbraeck $,
18   * initial version Aug 4, 2018 <br>
19   * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
20   */
21  public class Req
22  {
23      /** counter to see if we are ready. */
24      @SuppressWarnings("checkstyle:visibilitymodifier")
25      protected AtomicLong counter = new AtomicLong(0);
26  
27      /**
28       * @param args command line arguments
29       * @throws Sim0MQException on error
30       */
31      protected Req(final String[] args) throws Sim0MQException
32      {
33          long time = System.currentTimeMillis();
34  
35          if (args.length < 3)
36          {
37              System.err.println("Use as Req startport #threads #calls/thread [#contexts]");
38              System.exit(-1);
39          }
40  
41          int startport = Integer.parseInt(args[0]);
42          long numthreads = Integer.parseInt(args[1]);
43          long numcalls = Integer.parseInt(args[2]);
44          int numcontexts = args.length > 3 ? Integer.parseInt(args[3]) : 1;
45  
46          ZContext context = new ZContext(numcontexts);
47  
48          for (int i = 0; i < numthreads; i++)
49          {
50              new ReqThread(context, startport + i, numcalls).start();
51          }
52  
53          // wait for all threads ready
54          while (this.counter.get() < numcalls * numthreads)
55          {
56              try
57              {
58                  Thread.sleep(1000);
59              }
60              catch (InterruptedException exception)
61              {
62                  //
63              }
64              System.out.println("REQ=" + this.counter.get() + " < " + numcalls * numthreads);
65          }
66  
67          context.destroy();
68  
69          long delta = System.currentTimeMillis() - time;
70          System.out.println("RUNTIME = " + delta + " ms");
71          System.out.println("Transactions/second = " + 1000.0 * numcalls * numthreads / delta + " tps");
72          System.out.println("Messages/second (req + rep) = " + 2000.0 * numcalls * numthreads / delta + " mps");
73      }
74  
75      /**
76       * @param args command line arguments
77       * @throws Sim0MQException on error
78       */
79      public static void main(final String[] args) throws Sim0MQException
80      {
81          new Req(args);
82      }
83  
84      /** The worker thread for the REQ requests. */
85      protected class ReqThread extends Thread
86      {
87          /** the (shared) context. */
88          private final ZContext context;
89  
90          /** the port to use. */
91          private final int port;
92  
93          /** the number of calls to use. */
94          private final long numcalls;
95  
96          /**
97           * @param context the (shared) context
98           * @param port the port to use
99           * @param numcalls the number of calls to use
100          */
101         public ReqThread(final ZContext context, final int port, final long numcalls)
102         {
103             super();
104             this.context = context;
105             this.port = port;
106             this.numcalls = numcalls;
107         }
108 
109         /** {@inheritDoc} */
110         @Override
111         public void run()
112         {
113             // Socket to talk to server
114             System.out.println("REQ: Connecting to server with thread on port " + this.port);
115 
116             ZMQ.Socket socket = this.context.createSocket(ZMQ.REQ);
117             socket.connect("tcp://127.0.0.1:" + this.port);
118             String runId = "RUN01";
119             String senderId = "REQ." + this.port;
120             String receiverId = "REP." + this.port;
121 
122             for (int i = 0; i < this.numcalls; i++)
123             {
124                 // send a request
125                 Object[] request = new Object[] {this.port, i};
126                 try
127                 {
128                     byte[] message =
129                             SimulationMessage.encodeUTF8(runId, senderId, receiverId, "TEST", i, MessageStatus.NEW, request);
130                     boolean ok = socket.send(message, 0);
131                     if (!ok)
132                     {
133                         System.err.println("send message " + i + " for port " + this.port + " returned FALSE");
134                     }
135 
136                     byte[] reply = socket.recv(0);
137                     if (reply == null)
138                     {
139                         System.err.println("receive message " + i + " for port " + this.port + " returned NULL");
140                     }
141 
142                     Object[] replyMessage = SimulationMessage.decode(reply);
143                     if (!replyMessage[3].toString().equals(senderId))
144                     {
145                         System.err.println(SimulationMessage.print(replyMessage));
146                         System.err.println("receive message " + i + " for port " + this.port + ", receiver = "
147                                 + replyMessage[3].toString() + ", expected " + senderId);
148                     }
149                     if (((Number) replyMessage[7]).intValue() == 0)
150                     {
151                         System.err.println(SimulationMessage.print(replyMessage));
152                         System.err.println("receive message " + i + " for port " + this.port + ", #fields = 0");
153                     }
154                     else if (((Number) replyMessage[5]).intValue() != i)
155                     {
156                         System.err.println(SimulationMessage.print(replyMessage));
157                         System.err
158                                 .println("receive message " + i + " for port " + this.port + ", payload# = " + replyMessage[5]);
159                     }
160                     Req.this.counter.incrementAndGet();
161                 }
162                 catch (Sim0MQException | SerializationException exception)
163                 {
164                     exception.printStackTrace();
165                 }
166             }
167 
168             // send stop message to REP client
169             try
170             {
171                 byte[] message = SimulationMessage.encodeUTF8(runId, senderId, receiverId, "STOP", -1, MessageStatus.NEW,
172                         new Object[] {});
173                 boolean ok = socket.send(message, 0);
174                 if (!ok)
175                 {
176                     System.err.println("send message STOP for port " + this.port + " returned FALSE");
177                 }
178 
179                 // wait until stopped
180                 byte[] reply = socket.recv(0);
181                 if (reply == null)
182                 {
183                     System.err.println("receive message STOP for port " + this.port + " returned NULL");
184                 }
185             }
186             catch (Sim0MQException | SerializationException exception)
187             {
188                 exception.printStackTrace();
189             }
190 
191             socket.close();
192         }
193 
194     }
195 
196 }