Req.java
package org.sim0mq.demo.reqrep;
import java.util.concurrent.atomic.AtomicLong;
import org.djutils.serialization.SerializationException;
import org.sim0mq.Sim0MQException;
import org.sim0mq.message.MessageStatus;
import org.sim0mq.message.SimulationMessage;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
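/**
* Req is the REQ side of the Sim0MQ REQ-REP demo: it starts a number of worker threads that each connect to their own port
* on 127.0.0.1, send a fixed number of "TEST" requests followed by a "STOP" message, and wait for the reply to each message.
* <p>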
* Copyright (c) 2013-2017 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
* BSD-style license. See <a href="http://sim0mq.org/docs/current/license.html">Sim0MQ License</a>.
* </p>
* $LastChangedDate: 2015-07-24 02:58:59 +0200 (Fri, 24 Jul 2015) $, @version $Revision: 1147 $, by $Author: averbraeck $,
* initial version Aug 4, 2018 <br>
* @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
*/
public class Req
{
    /**
     * counter to see if we are ready: the number of request-reply round trips for which a reply was received and decoded,
     * summed over all worker threads. It is incremented by the workers and polled by the constructor.
     */
    @SuppressWarnings("checkstyle:visibilitymodifier")
    protected AtomicLong counter = new AtomicLong(0);

    /**
     * Run the REQ side of the demo. One worker thread is started per port, beginning at the start port; the constructor
     * prints the progress once per second until all calls have been made (or all workers have ended), waits for the workers
     * to finish, destroys the context and prints the runtime and throughput.
     * @param args command line arguments: startport, #threads, #calls/thread, and optionally a fourth integer that is
     *            passed to the ZContext constructor (default 1)
     * @throws Sim0MQException on error
     */
    protected Req(final String[] args) throws Sim0MQException
    {
        long time = System.currentTimeMillis();
        if (args.length < 3)
        {
            System.err.println("Use as Req startport #threads #calls/thread [#contexts]");
            System.exit(-1);
        }
        int startport = Integer.parseInt(args[0]);
        long numthreads = Integer.parseInt(args[1]);
        long numcalls = Integer.parseInt(args[2]);
        // NOTE(review): the argument of ZContext(int) looks like the number of I/O threads of one context rather than a
        // number of contexts -- confirm against the JeroMQ documentation before renaming anything.
        int numcontexts = args.length > 3 ? Integer.parseInt(args[3]) : 1;
        ZContext context = new ZContext(numcontexts);

        // keep a handle on every worker, so we can detect workers that ended early and join them before the cleanup
        ReqThread[] workers = new ReqThread[(int) Math.max(0L, numthreads)];
        for (int i = 0; i < workers.length; i++)
        {
            workers[i] = new ReqThread(context, startport + i, numcalls);
            workers[i].start();
        }

        // wait for all threads ready; also stop waiting when no worker is alive anymore, because in that case the counter
        // can never reach the expected total and we would wait forever
        long expected = numcalls * numthreads;
        while (this.counter.get() < expected && anyAlive(workers))
        {
            try
            {
                Thread.sleep(1000);
            }
            catch (InterruptedException exception)
            {
                // restore the interrupt status and stop waiting; continuing would make every next sleep fail at once
                Thread.currentThread().interrupt();
                break;
            }
            System.out.println("REQ=" + this.counter.get() + " < " + expected);
        }

        // The workers still exchange the STOP message after their last call, using sockets that belong to the shared
        // context. Let them finish before the context is destroyed.
        for (Thread worker : workers)
        {
            try
            {
                worker.join();
            }
            catch (InterruptedException exception)
            {
                Thread.currentThread().interrupt();
                break;
            }
        }
        context.destroy();

        long completed = this.counter.get();
        if (completed < expected)
        {
            System.err.println("REQ: only " + completed + " of " + expected + " calls were completed");
        }
        long delta = System.currentTimeMillis() - time;
        System.out.println("RUNTIME = " + delta + " ms");
        System.out.println("Transactions/second = " + 1000.0 * completed / delta + " tps");
        System.out.println("Messages/second (req + rep) = " + 2000.0 * completed / delta + " mps");
    }

    /**
     * Check whether at least one of the given threads is still running.
     * @param threads the threads to check
     * @return true when at least one thread is alive, false otherwise (also for an empty array)
     */
    private static boolean anyAlive(final Thread[] threads)
    {
        for (Thread thread : threads)
        {
            if (thread.isAlive())
            {
                return true;
            }
        }
        return false;
    }

    /**
     * Start the demo by constructing a Req instance; all work is done in the constructor.
     * @param args command line arguments
     * @throws Sim0MQException on error
     */
    public static void main(final String[] args) throws Sim0MQException
    {
        new Req(args);
    }

    /**
     * The worker thread for the REQ requests. It connects a REQ socket to tcp://127.0.0.1:port, sends numcalls "TEST"
     * messages, waiting for the reply to each of them, and ends with a "STOP" message.
     */
    protected class ReqThread extends Thread
    {
        /** the (shared) context. */
        private final ZContext context;

        /** the port to use. */
        private final int port;

        /** the number of calls to use. */
        private final long numcalls;

        /**
         * @param context the (shared) context
         * @param port the port to use
         * @param numcalls the number of calls to use
         */
        public ReqThread(final ZContext context, final int port, final long numcalls)
        {
            super();
            this.context = context;
            this.port = port;
            this.numcalls = numcalls;
        }

        /** {@inheritDoc} */
        @Override
        public void run()
        {
            // Socket to talk to server
            System.out.println("REQ: Connecting to server with thread on port " + this.port);
            ZMQ.Socket socket = this.context.createSocket(ZMQ.REQ);
            try
            {
                socket.connect("tcp://127.0.0.1:" + this.port);
                String runId = "RUN01";
                String senderId = "REQ." + this.port;
                String receiverId = "REP." + this.port;

                for (int i = 0; i < this.numcalls; i++)
                {
                    // send a request with the port and the call number as payload; the call number is also the message id
                    Object[] request = new Object[] {this.port, i};
                    try
                    {
                        byte[] message = SimulationMessage.encodeUTF8(runId, senderId, receiverId, "TEST", i,
                                MessageStatus.NEW, request);
                        boolean ok = socket.send(message, 0);
                        if (!ok)
                        {
                            System.err.println("send message " + i + " for port " + this.port + " returned FALSE");
                        }

                        byte[] reply = socket.recv(0);
                        if (reply == null)
                        {
                            System.err.println("receive message " + i + " for port " + this.port + " returned NULL");
                            // There is nothing to decode, and the request-reply sequence on this socket is broken, so
                            // this worker gives up; the finally clause closes the socket.
                            return;
                        }

                        Object[] replyMessage = SimulationMessage.decode(reply);
                        // field 3 is reported below as the receiver of the reply; it should be this sender
                        if (!replyMessage[3].toString().equals(senderId))
                        {
                            System.err.println(SimulationMessage.print(replyMessage));
                            System.err.println("receive message " + i + " for port " + this.port + ", receiver = "
                                    + replyMessage[3].toString() + ", expected " + senderId);
                        }
                        // field 7 is reported below as the number of fields, field 5 as the number that should match i
                        if (((Number) replyMessage[7]).intValue() == 0)
                        {
                            System.err.println(SimulationMessage.print(replyMessage));
                            System.err.println("receive message " + i + " for port " + this.port + ", #fields = 0");
                        }
                        else if (((Number) replyMessage[5]).intValue() != i)
                        {
                            System.err.println(SimulationMessage.print(replyMessage));
                            System.err.println(
                                    "receive message " + i + " for port " + this.port + ", payload# = " + replyMessage[5]);
                        }
                        Req.this.counter.incrementAndGet();
                    }
                    catch (Sim0MQException | SerializationException exception)
                    {
                        exception.printStackTrace();
                    }
                }

                // send stop message to REP client
                try
                {
                    byte[] message = SimulationMessage.encodeUTF8(runId, senderId, receiverId, "STOP", -1,
                            MessageStatus.NEW, new Object[] {});
                    boolean ok = socket.send(message, 0);
                    if (!ok)
                    {
                        System.err.println("send message STOP for port " + this.port + " returned FALSE");
                    }
                    // wait until stopped
                    byte[] reply = socket.recv(0);
                    if (reply == null)
                    {
                        System.err.println("receive message STOP for port " + this.port + " returned NULL");
                    }
                }
                catch (Sim0MQException | SerializationException exception)
                {
                    exception.printStackTrace();
                }
            }
            finally
            {
                // always release the socket, also when the worker ends early or with an unchecked exception
                socket.close();
            }
        }
    }
}