View Javadoc
1   package org.sim0mq.demo.reqrep;
2   
3   import java.util.concurrent.atomic.AtomicLong;
4   
5   import org.sim0mq.Sim0MQException;
6   import org.sim0mq.message.MessageStatus;
7   import org.sim0mq.message.SimulationMessage;
8   import org.zeromq.ZContext;
9   import org.zeromq.ZMQ;
10  
11  /**
12   * <p>
13   * Copyright (c) 2013-2017 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
14   * BSD-style license. See <a href="http://sim0mq.org/docs/current/license.html">Sim0MQ License</a>.
15   * </p>
16   * $LastChangedDate: 2015-07-24 02:58:59 +0200 (Fri, 24 Jul 2015) $, @version $Revision: 1147 $, by $Author: averbraeck $,
17   * initial version Aug 4, 2018 <br>
18   * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
19   */
20  public class Req
21  {
22      /** counter to see if we are ready. */
23      @SuppressWarnings("checkstyle:visibilitymodifier")
24      protected AtomicLong counter = new AtomicLong(0);
25  
26      /**
27       * @param args command line arguments
28       * @throws Sim0MQException on error
29       */
30      protected Req(final String[] args) throws Sim0MQException
31      {
32          long time = System.currentTimeMillis();
33  
34          if (args.length < 3)
35          {
36              System.err.println("Use as Req startport #threads #calls/thread [#contexts]");
37              System.exit(-1);
38          }
39  
40          int startport = Integer.parseInt(args[0]);
41          long numthreads = Integer.parseInt(args[1]);
42          long numcalls = Integer.parseInt(args[2]);
43          int numcontexts = args.length > 3 ? Integer.parseInt(args[3]) : 1;
44  
45          ZContext context = new ZContext(numcontexts);
46  
47          for (int i = 0; i < numthreads; i++)
48          {
49              new ReqThread(context, startport + i, numcalls).start();
50          }
51  
52          // wait for all threads ready
53          while (this.counter.get() < numcalls * numthreads)
54          {
55              try
56              {
57                  Thread.sleep(1000);
58              }
59              catch (InterruptedException exception)
60              {
61                  //
62              }
63              System.out.println("REQ=" + this.counter.get() + " < " + numcalls * numthreads);
64          }
65  
66          context.destroy();
67  
68          long delta = System.currentTimeMillis() - time;
69          System.out.println("RUNTIME = " + delta + " ms");
70          System.out.println("Transactions/second = " + 1000.0 * numcalls * numthreads / delta + " tps");
71          System.out.println("Messages/second (req + rep) = " + 2000.0 * numcalls * numthreads / delta + " mps");
72      }
73  
74      /**
75       * @param args command line arguments
76       * @throws Sim0MQException on error
77       */
78      public static void main(final String[] args) throws Sim0MQException
79      {
80          new Req(args);
81      }
82  
83      /** The worker thread for the REQ requests. */
84      protected class ReqThread extends Thread
85      {
86          /** the (shared) context. */
87          private final ZContext context;
88  
89          /** the port to use. */
90          private final int port;
91  
92          /** the number of calls to use. */
93          private final long numcalls;
94  
95          /**
96           * @param context the (shared) context
97           * @param port the port to use
98           * @param numcalls the number of calls to use
99           */
100         public ReqThread(final ZContext context, final int port, final long numcalls)
101         {
102             super();
103             this.context = context;
104             this.port = port;
105             this.numcalls = numcalls;
106         }
107 
108         /** {@inheritDoc} */
109         @Override
110         public void run()
111         {
112             // Socket to talk to server
113             System.out.println("REQ: Connecting to server with thread on port " + this.port);
114 
115             ZMQ.Socket socket = this.context.createSocket(ZMQ.REQ);
116             socket.connect("tcp://127.0.0.1:" + this.port);
117             String runId = "RUN01";
118             String senderId = "REQ." + this.port;
119             String receiverId = "REP." + this.port;
120 
121             for (int i = 0; i < this.numcalls; i++)
122             {
123                 // send a request
124                 Object[] request = new Object[] { this.port, i };
125                 try
126                 {
127                     byte[] message =
128                             SimulationMessage.encodeUTF8(runId, senderId, receiverId, "TEST", i, MessageStatus.NEW, request);
129                     boolean ok = socket.send(message, 0);
130                     if (!ok)
131                     {
132                         System.err.println("send message " + i + " for port " + this.port + " returned FALSE");
133                     }
134 
135                     byte[] reply = socket.recv(0);
136                     if (reply == null)
137                     {
138                         System.err.println("receive message " + i + " for port " + this.port + " returned NULL");
139                     }
140 
141                     Object[] replyMessage = SimulationMessage.decode(reply);
142                     if (!replyMessage[3].toString().equals(senderId))
143                     {
144                         System.err.println(SimulationMessage.print(replyMessage));
145                         System.err.println("receive message " + i + " for port " + this.port + ", receiver = "
146                                 + replyMessage[3].toString() + ", expected " + senderId);
147                     }
148                     if (((Number) replyMessage[7]).intValue() == 0)
149                     {
150                         System.err.println(SimulationMessage.print(replyMessage));
151                         System.err.println("receive message " + i + " for port " + this.port + ", #fields = 0");
152                     }
153                     else if (((Number) replyMessage[5]).intValue() != i)
154                     {
155                         System.err.println(SimulationMessage.print(replyMessage));
156                         System.err
157                                 .println("receive message " + i + " for port " + this.port + ", payload# = " + replyMessage[5]);
158                     }
159                     Req.this.counter.incrementAndGet();
160                 }
161                 catch (Sim0MQException exception)
162                 {
163                     exception.printStackTrace();
164                 }
165             }
166 
167             // send stop message to REP client
168             try
169             {
170                 byte[] message =
171                         SimulationMessage.encodeUTF8(runId, senderId, receiverId, "STOP", -1, MessageStatus.NEW, new Object[] {});
172                 boolean ok = socket.send(message, 0);
173                 if (!ok)
174                 {
175                     System.err.println("send message STOP for port " + this.port + " returned FALSE");
176                 }
177 
178                 // wait until stopped
179                 byte[] reply = socket.recv(0);
180                 if (reply == null)
181                 {
182                     System.err.println("receive message STOP for port " + this.port + " returned NULL");
183                 }
184             }
185             catch (Sim0MQException exception)
186             {
187                 exception.printStackTrace();
188             }
189 
190             socket.close();
191         }
192 
193     }
194 
195 }