1 package org.sim0mq.demo.reqrep;
2
3 import java.util.concurrent.atomic.AtomicLong;
4
5 import org.djutils.serialization.SerializationException;
6 import org.sim0mq.Sim0MQException;
7 import org.sim0mq.message.MessageStatus;
8 import org.sim0mq.message.SimulationMessage;
9 import org.zeromq.ZContext;
10 import org.zeromq.ZMQ;
11
12
13
14
15
16
17
18
19
20
21 public class Req
22 {
23
24 @SuppressWarnings("checkstyle:visibilitymodifier")
25 protected AtomicLong counter = new AtomicLong(0);
26
27
28
29
30
31 protected Req(final String[] args) throws Sim0MQException
32 {
33 long time = System.currentTimeMillis();
34
35 if (args.length < 3)
36 {
37 System.err.println("Use as Req startport #threads #calls/thread [#contexts]");
38 System.exit(-1);
39 }
40
41 int startport = Integer.parseInt(args[0]);
42 long numthreads = Integer.parseInt(args[1]);
43 long numcalls = Integer.parseInt(args[2]);
44 int numcontexts = args.length > 3 ? Integer.parseInt(args[3]) : 1;
45
46 ZContext context = new ZContext(numcontexts);
47
48 for (int i = 0; i < numthreads; i++)
49 {
50 new ReqThread(context, startport + i, numcalls).start();
51 }
52
53
54 while (this.counter.get() < numcalls * numthreads)
55 {
56 try
57 {
58 Thread.sleep(1000);
59 }
60 catch (InterruptedException exception)
61 {
62
63 }
64 System.out.println("REQ=" + this.counter.get() + " < " + numcalls * numthreads);
65 }
66
67 context.destroy();
68
69 long delta = System.currentTimeMillis() - time;
70 System.out.println("RUNTIME = " + delta + " ms");
71 System.out.println("Transactions/second = " + 1000.0 * numcalls * numthreads / delta + " tps");
72 System.out.println("Messages/second (req + rep) = " + 2000.0 * numcalls * numthreads / delta + " mps");
73 }
74
75
76
77
78
79 public static void main(final String[] args) throws Sim0MQException
80 {
81 new Req(args);
82 }
83
84
85 protected class ReqThread extends Thread
86 {
87
88 private final ZContext context;
89
90
91 private final int port;
92
93
94 private final long numcalls;
95
96
97
98
99
100
101 public ReqThread(final ZContext context, final int port, final long numcalls)
102 {
103 super();
104 this.context = context;
105 this.port = port;
106 this.numcalls = numcalls;
107 }
108
109
110 @Override
111 public void run()
112 {
113
114 System.out.println("REQ: Connecting to server with thread on port " + this.port);
115
116 ZMQ.Socket socket = this.context.createSocket(ZMQ.REQ);
117 socket.connect("tcp://127.0.0.1:" + this.port);
118 String runId = "RUN01";
119 String senderId = "REQ." + this.port;
120 String receiverId = "REP." + this.port;
121
122 for (int i = 0; i < this.numcalls; i++)
123 {
124
125 Object[] request = new Object[] {this.port, i};
126 try
127 {
128 byte[] message =
129 SimulationMessage.encodeUTF8(runId, senderId, receiverId, "TEST", i, MessageStatus.NEW, request);
130 boolean ok = socket.send(message, 0);
131 if (!ok)
132 {
133 System.err.println("send message " + i + " for port " + this.port + " returned FALSE");
134 }
135
136 byte[] reply = socket.recv(0);
137 if (reply == null)
138 {
139 System.err.println("receive message " + i + " for port " + this.port + " returned NULL");
140 }
141
142 Object[] replyMessage = SimulationMessage.decode(reply);
143 if (!replyMessage[3].toString().equals(senderId))
144 {
145 System.err.println(SimulationMessage.print(replyMessage));
146 System.err.println("receive message " + i + " for port " + this.port + ", receiver = "
147 + replyMessage[3].toString() + ", expected " + senderId);
148 }
149 if (((Number) replyMessage[7]).intValue() == 0)
150 {
151 System.err.println(SimulationMessage.print(replyMessage));
152 System.err.println("receive message " + i + " for port " + this.port + ", #fields = 0");
153 }
154 else if (((Number) replyMessage[5]).intValue() != i)
155 {
156 System.err.println(SimulationMessage.print(replyMessage));
157 System.err
158 .println("receive message " + i + " for port " + this.port + ", payload# = " + replyMessage[5]);
159 }
160 Req.this.counter.incrementAndGet();
161 }
162 catch (Sim0MQException | SerializationException exception)
163 {
164 exception.printStackTrace();
165 }
166 }
167
168
169 try
170 {
171 byte[] message = SimulationMessage.encodeUTF8(runId, senderId, receiverId, "STOP", -1, MessageStatus.NEW,
172 new Object[] {});
173 boolean ok = socket.send(message, 0);
174 if (!ok)
175 {
176 System.err.println("send message STOP for port " + this.port + " returned FALSE");
177 }
178
179
180 byte[] reply = socket.recv(0);
181 if (reply == null)
182 {
183 System.err.println("receive message STOP for port " + this.port + " returned NULL");
184 }
185 }
186 catch (Sim0MQException | SerializationException exception)
187 {
188 exception.printStackTrace();
189 }
190
191 socket.close();
192 }
193
194 }
195
196 }