1 package org.sim0mq.demo.reqrep;
2
3 import java.util.concurrent.atomic.AtomicLong;
4
5 import org.djutils.serialization.SerializationException;
6 import org.sim0mq.Sim0MQException;
7 import org.sim0mq.message.Sim0MQMessage;
8 import org.zeromq.SocketType;
9 import org.zeromq.ZContext;
10 import org.zeromq.ZMQ;
11
12
13
14
15
16
17
18
19
20
21 public class Req
22 {
23
24 @SuppressWarnings("checkstyle:visibilitymodifier")
25 protected AtomicLong counter = new AtomicLong(0);
26
27
28
29
30
31 protected Req(final String[] args) throws Sim0MQException
32 {
33 long time = System.currentTimeMillis();
34
35 if (args.length < 3)
36 {
37 System.err.println("Use as Req startport #threads #calls/thread [#contexts]");
38 System.exit(-1);
39 }
40
41 int startport = Integer.parseInt(args[0]);
42 long numthreads = Integer.parseInt(args[1]);
43 long numcalls = Integer.parseInt(args[2]);
44 int numcontexts = args.length > 3 ? Integer.parseInt(args[3]) : 1;
45
46 ZContext context = new ZContext(numcontexts);
47
48 for (int i = 0; i < numthreads; i++)
49 {
50 new ReqThread(context, startport + i, numcalls).start();
51 }
52
53
54 while (this.counter.get() < numcalls * numthreads)
55 {
56 try
57 {
58 Thread.sleep(1000);
59 }
60 catch (InterruptedException exception)
61 {
62
63 }
64 System.out.println("REQ=" + this.counter.get() + " < " + numcalls * numthreads);
65 }
66
67 context.destroy();
68
69 long delta = System.currentTimeMillis() - time;
70 System.out.println("RUNTIME = " + delta + " ms");
71 System.out.println("Transactions/second = " + 1000.0 * numcalls * numthreads / delta + " tps");
72 System.out.println("Messages/second (req + rep) = " + 2000.0 * numcalls * numthreads / delta + " mps");
73 }
74
75
76
77
78
79 public static void main(final String[] args) throws Sim0MQException
80 {
81 new Req(args);
82 }
83
84
85 protected class ReqThread extends Thread
86 {
87
88 private final ZContext context;
89
90
91 private final int port;
92
93
94 private final long numcalls;
95
96
97
98
99
100
101 public ReqThread(final ZContext context, final int port, final long numcalls)
102 {
103 this.context = context;
104 this.port = port;
105 this.numcalls = numcalls;
106 }
107
108
109 @Override
110 public void run()
111 {
112
113 System.out.println("REQ: Connecting to server with thread on port " + this.port);
114
115 ZMQ.Socket socket = this.context.createSocket(SocketType.REQ);
116 socket.connect("tcp://127.0.0.1:" + this.port);
117 String runId = "RUN01";
118 String senderId = "REQ." + this.port;
119 String receiverId = "REP." + this.port;
120
121 for (int i = 0; i < this.numcalls; i++)
122 {
123
124 Object[] request = new Object[] {this.port, i};
125 try
126 {
127 byte[] message =
128 Sim0MQMessage.encodeUTF8(true, runId, senderId, receiverId, "TEST", i, request);
129 boolean ok = socket.send(message, 0);
130 if (!ok)
131 {
132 System.err.println("send message " + i + " for port " + this.port + " returned FALSE");
133 }
134
135 byte[] reply = socket.recv(0);
136 if (reply == null)
137 {
138 System.err.println("receive message " + i + " for port " + this.port + " returned NULL");
139 }
140
141 Object[] replyMessage = Sim0MQMessage.decode(reply).createObjectArray();
142 if (!replyMessage[3].toString().equals(senderId))
143 {
144 System.err.println(Sim0MQMessage.print(replyMessage));
145 System.err.println("receive message " + i + " for port " + this.port + ", receiver = "
146 + replyMessage[3].toString() + ", expected " + senderId);
147 }
148 if (((Number) replyMessage[7]).intValue() == 0)
149 {
150 System.err.println(Sim0MQMessage.print(replyMessage));
151 System.err.println("receive message " + i + " for port " + this.port + ", #fields = 0");
152 }
153 else if (((Number) replyMessage[5]).intValue() != i)
154 {
155 System.err.println(Sim0MQMessage.print(replyMessage));
156 System.err
157 .println("receive message " + i + " for port " + this.port + ", payload# = " + replyMessage[5]);
158 }
159 Req.this.counter.incrementAndGet();
160 }
161 catch (Sim0MQException | SerializationException exception)
162 {
163 exception.printStackTrace();
164 }
165 }
166
167
168 try
169 {
170 byte[] message = Sim0MQMessage.encodeUTF8(true, runId, senderId, receiverId, "STOP", -1,
171 new Object[] {});
172 boolean ok = socket.send(message, 0);
173 if (!ok)
174 {
175 System.err.println("send message STOP for port " + this.port + " returned FALSE");
176 }
177
178
179 byte[] reply = socket.recv(0);
180 if (reply == null)
181 {
182 System.err.println("receive message STOP for port " + this.port + " returned NULL");
183 }
184 }
185 catch (Sim0MQException | SerializationException exception)
186 {
187 exception.printStackTrace();
188 }
189
190 socket.close();
191 }
192
193 }
194
195 }