View Javadoc
1   package org.sim0mq.demo.reqrep;
2   
3   import java.util.concurrent.atomic.AtomicLong;
4   
5   import org.djutils.serialization.SerializationException;
6   import org.sim0mq.Sim0MQException;
7   import org.sim0mq.message.Sim0MQMessage;
8   import org.zeromq.SocketType;
9   import org.zeromq.ZContext;
10  import org.zeromq.ZMQ;
11  
12  /**
13   * <p>
14   * Copyright (c) 2013-2020 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
15   * BSD-style license. See <a href="http://sim0mq.org/docs/current/license.html">Sim0MQ License</a>.
16   * </p>
17   * $LastChangedDate: 2015-07-24 02:58:59 +0200 (Fri, 24 Jul 2015) $, @version $Revision: 1147 $, by $Author: averbraeck $,
18   * initial version Aug 4, 2018 <br>
19   * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
20   */
21  public class Req
22  {
23      /** counter to see if we are ready. */
24      @SuppressWarnings("checkstyle:visibilitymodifier")
25      protected AtomicLong counter = new AtomicLong(0);
26  
27      /**
28       * @param args command line arguments
29       * @throws Sim0MQException on error
30       */
31      protected Req(final String[] args) throws Sim0MQException
32      {
33          long time = System.currentTimeMillis();
34  
35          if (args.length < 3)
36          {
37              System.err.println("Use as Req startport #threads #calls/thread [#contexts]");
38              System.exit(-1);
39          }
40  
41          int startport = Integer.parseInt(args[0]);
42          long numthreads = Integer.parseInt(args[1]);
43          long numcalls = Integer.parseInt(args[2]);
44          int numcontexts = args.length > 3 ? Integer.parseInt(args[3]) : 1;
45  
46          ZContext context = new ZContext(numcontexts);
47  
48          for (int i = 0; i < numthreads; i++)
49          {
50              new ReqThread(context, startport + i, numcalls).start();
51          }
52  
53          // wait for all threads ready
54          while (this.counter.get() < numcalls * numthreads)
55          {
56              try
57              {
58                  Thread.sleep(1000);
59              }
60              catch (InterruptedException exception)
61              {
62                  //
63              }
64              System.out.println("REQ=" + this.counter.get() + " < " + numcalls * numthreads);
65          }
66  
67          context.destroy();
68  
69          long delta = System.currentTimeMillis() - time;
70          System.out.println("RUNTIME = " + delta + " ms");
71          System.out.println("Transactions/second = " + 1000.0 * numcalls * numthreads / delta + " tps");
72          System.out.println("Messages/second (req + rep) = " + 2000.0 * numcalls * numthreads / delta + " mps");
73      }
74  
75      /**
76       * @param args command line arguments
77       * @throws Sim0MQException on error
78       */
79      public static void main(final String[] args) throws Sim0MQException
80      {
81          new Req(args);
82      }
83  
84      /** The worker thread for the REQ requests. */
85      protected class ReqThread extends Thread
86      {
87          /** the (shared) context. */
88          private final ZContext context;
89  
90          /** the port to use. */
91          private final int port;
92  
93          /** the number of calls to use. */
94          private final long numcalls;
95  
96          /**
97           * @param context the (shared) context
98           * @param port the port to use
99           * @param numcalls the number of calls to use
100          */
101         public ReqThread(final ZContext context, final int port, final long numcalls)
102         {
103             this.context = context;
104             this.port = port;
105             this.numcalls = numcalls;
106         }
107 
108         /** {@inheritDoc} */
109         @Override
110         public void run()
111         {
112             // Socket to talk to server
113             System.out.println("REQ: Connecting to server with thread on port " + this.port);
114 
115             ZMQ.Socket socket = this.context.createSocket(SocketType.REQ);
116             socket.connect("tcp://127.0.0.1:" + this.port);
117             String runId = "RUN01";
118             String senderId = "REQ." + this.port;
119             String receiverId = "REP." + this.port;
120 
121             for (int i = 0; i < this.numcalls; i++)
122             {
123                 // send a request
124                 Object[] request = new Object[] {this.port, i};
125                 try
126                 {
127                     byte[] message =
128                             Sim0MQMessage.encodeUTF8(true, runId, senderId, receiverId, "TEST", i, request);
129                     boolean ok = socket.send(message, 0);
130                     if (!ok)
131                     {
132                         System.err.println("send message " + i + " for port " + this.port + " returned FALSE");
133                     }
134 
135                     byte[] reply = socket.recv(0);
136                     if (reply == null)
137                     {
138                         System.err.println("receive message " + i + " for port " + this.port + " returned NULL");
139                     }
140 
141                     Object[] replyMessage = Sim0MQMessage.decode(reply).createObjectArray();
142                     if (!replyMessage[3].toString().equals(senderId))
143                     {
144                         System.err.println(Sim0MQMessage.print(replyMessage));
145                         System.err.println("receive message " + i + " for port " + this.port + ", receiver = "
146                                 + replyMessage[3].toString() + ", expected " + senderId);
147                     }
148                     if (((Number) replyMessage[7]).intValue() == 0)
149                     {
150                         System.err.println(Sim0MQMessage.print(replyMessage));
151                         System.err.println("receive message " + i + " for port " + this.port + ", #fields = 0");
152                     }
153                     else if (((Number) replyMessage[5]).intValue() != i)
154                     {
155                         System.err.println(Sim0MQMessage.print(replyMessage));
156                         System.err
157                                 .println("receive message " + i + " for port " + this.port + ", payload# = " + replyMessage[5]);
158                     }
159                     Req.this.counter.incrementAndGet();
160                 }
161                 catch (Sim0MQException | SerializationException exception)
162                 {
163                     exception.printStackTrace();
164                 }
165             }
166 
167             // send stop message to REP client
168             try
169             {
170                 byte[] message = Sim0MQMessage.encodeUTF8(true, runId, senderId, receiverId, "STOP", -1,
171                         new Object[] {});
172                 boolean ok = socket.send(message, 0);
173                 if (!ok)
174                 {
175                     System.err.println("send message STOP for port " + this.port + " returned FALSE");
176                 }
177 
178                 // wait until stopped
179                 byte[] reply = socket.recv(0);
180                 if (reply == null)
181                 {
182                     System.err.println("receive message STOP for port " + this.port + " returned NULL");
183                 }
184             }
185             catch (Sim0MQException | SerializationException exception)
186             {
187                 exception.printStackTrace();
188             }
189 
190             socket.close();
191         }
192 
193     }
194 
195 }