1 package org.sim0mq.demo.reqrep;
2
3 import java.util.concurrent.atomic.AtomicLong;
4
5 import org.sim0mq.Sim0MQException;
6 import org.sim0mq.message.MessageStatus;
7 import org.sim0mq.message.SimulationMessage;
8 import org.zeromq.ZContext;
9 import org.zeromq.ZMQ;
10
11
12
13
14
15
16
17
18
19
20 public class Req
21 {
22
23 @SuppressWarnings("checkstyle:visibilitymodifier")
24 protected AtomicLong counter = new AtomicLong(0);
25
26
27
28
29
30 protected Req(final String[] args) throws Sim0MQException
31 {
32 long time = System.currentTimeMillis();
33
34 if (args.length < 3)
35 {
36 System.err.println("Use as Req startport #threads #calls/thread [#contexts]");
37 System.exit(-1);
38 }
39
40 int startport = Integer.parseInt(args[0]);
41 long numthreads = Integer.parseInt(args[1]);
42 long numcalls = Integer.parseInt(args[2]);
43 int numcontexts = args.length > 3 ? Integer.parseInt(args[3]) : 1;
44
45 ZContext context = new ZContext(numcontexts);
46
47 for (int i = 0; i < numthreads; i++)
48 {
49 new ReqThread(context, startport + i, numcalls).start();
50 }
51
52
53 while (this.counter.get() < numcalls * numthreads)
54 {
55 try
56 {
57 Thread.sleep(1000);
58 }
59 catch (InterruptedException exception)
60 {
61
62 }
63 System.out.println("REQ=" + this.counter.get() + " < " + numcalls * numthreads);
64 }
65
66 context.destroy();
67
68 long delta = System.currentTimeMillis() - time;
69 System.out.println("RUNTIME = " + delta + " ms");
70 System.out.println("Transactions/second = " + 1000.0 * numcalls * numthreads / delta + " tps");
71 System.out.println("Messages/second (req + rep) = " + 2000.0 * numcalls * numthreads / delta + " mps");
72 }
73
74
75
76
77
78 public static void main(final String[] args) throws Sim0MQException
79 {
80 new Req(args);
81 }
82
83
84 protected class ReqThread extends Thread
85 {
86
87 private final ZContext context;
88
89
90 private final int port;
91
92
93 private final long numcalls;
94
95
96
97
98
99
100 public ReqThread(final ZContext context, final int port, final long numcalls)
101 {
102 super();
103 this.context = context;
104 this.port = port;
105 this.numcalls = numcalls;
106 }
107
108
109 @Override
110 public void run()
111 {
112
113 System.out.println("REQ: Connecting to server with thread on port " + this.port);
114
115 ZMQ.Socket socket = this.context.createSocket(ZMQ.REQ);
116 socket.connect("tcp://127.0.0.1:" + this.port);
117 String runId = "RUN01";
118 String senderId = "REQ." + this.port;
119 String receiverId = "REP." + this.port;
120
121 for (int i = 0; i < this.numcalls; i++)
122 {
123
124 Object[] request = new Object[] { this.port, i };
125 try
126 {
127 byte[] message =
128 SimulationMessage.encodeUTF8(runId, senderId, receiverId, "TEST", i, MessageStatus.NEW, request);
129 boolean ok = socket.send(message, 0);
130 if (!ok)
131 {
132 System.err.println("send message " + i + " for port " + this.port + " returned FALSE");
133 }
134
135 byte[] reply = socket.recv(0);
136 if (reply == null)
137 {
138 System.err.println("receive message " + i + " for port " + this.port + " returned NULL");
139 }
140
141 Object[] replyMessage = SimulationMessage.decode(reply);
142 if (!replyMessage[3].toString().equals(senderId))
143 {
144 System.err.println(SimulationMessage.print(replyMessage));
145 System.err.println("receive message " + i + " for port " + this.port + ", receiver = "
146 + replyMessage[3].toString() + ", expected " + senderId);
147 }
148 if (((Number) replyMessage[7]).intValue() == 0)
149 {
150 System.err.println(SimulationMessage.print(replyMessage));
151 System.err.println("receive message " + i + " for port " + this.port + ", #fields = 0");
152 }
153 else if (((Number) replyMessage[5]).intValue() != i)
154 {
155 System.err.println(SimulationMessage.print(replyMessage));
156 System.err
157 .println("receive message " + i + " for port " + this.port + ", payload# = " + replyMessage[5]);
158 }
159 Req.this.counter.incrementAndGet();
160 }
161 catch (Sim0MQException exception)
162 {
163 exception.printStackTrace();
164 }
165 }
166
167
168 try
169 {
170 byte[] message =
171 SimulationMessage.encodeUTF8(runId, senderId, receiverId, "STOP", -1, MessageStatus.NEW, new Object[] {});
172 boolean ok = socket.send(message, 0);
173 if (!ok)
174 {
175 System.err.println("send message STOP for port " + this.port + " returned FALSE");
176 }
177
178
179 byte[] reply = socket.recv(0);
180 if (reply == null)
181 {
182 System.err.println("receive message STOP for port " + this.port + " returned NULL");
183 }
184 }
185 catch (Sim0MQException exception)
186 {
187 exception.printStackTrace();
188 }
189
190 socket.close();
191 }
192
193 }
194
195 }