1 package org.sim0mq.demo.reqrep;
2
3 import org.djutils.serialization.SerializationException;
4 import org.sim0mq.Sim0MQException;
5 import org.sim0mq.message.MessageStatus;
6 import org.sim0mq.message.SimulationMessage;
7 import org.zeromq.ZContext;
8 import org.zeromq.ZMQ;
9
10
11
12
13
14
15
16
17
18
19 public class Rep
20 {
21
22
23
24
25 protected Rep(final String[] args) throws Sim0MQException
26 {
27 if (args.length < 2)
28 {
29 System.err.println("Use as Rep startport #threads [#contexts]");
30 System.exit(-1);
31 }
32
33 int startport = Integer.parseInt(args[0]);
34 long numthreads = Integer.parseInt(args[1]);
35 int numcontexts = args.length > 2 ? Integer.parseInt(args[2]) : 1;
36
37 ZContext context = new ZContext(numcontexts);
38
39 for (int i = 0; i < numthreads; i++)
40 {
41 new RepThread(context, startport + i).start();
42 }
43
44 context.destroy();
45 }
46
47
48
49
50
51 public static void main(final String[] args) throws Sim0MQException
52 {
53 new Rep(args);
54 }
55
56
57 protected class RepThread extends Thread
58 {
59
60 private final ZContext context;
61
62
63 private final int port;
64
65
66
67
68
69 public RepThread(final ZContext context, final int port)
70 {
71 super();
72 this.context = context;
73 this.port = port;
74 }
75
76
77 @Override
78 public void run()
79 {
80
81 System.out.println("REP: Connecting to server with thread on port " + this.port);
82
83
84 ZMQ.Socket socket = this.context.createSocket(ZMQ.REP);
85 socket.bind("tcp://*:" + this.port);
86
87 String senderId = "REP." + this.port;
88 String receiverId = "REQ." + this.port;
89
90 int messagenr = 0;
91
92 while (true)
93 {
94 try
95 {
96
97 byte[] request = socket.recv(0);
98 Object[] message = SimulationMessage.decode(request);
99
100 if (message[4].toString().equals("STOP"))
101 {
102
103 Object[] reply = new Object[] { "STOPPED" };
104 socket.send(SimulationMessage.encodeUTF8(message[1], senderId, receiverId, "STOPPED", messagenr,
105 MessageStatus.NEW, reply), 0);
106 break;
107 }
108
109
110 if (!message[3].toString().equals(senderId))
111 {
112 System.err.println(SimulationMessage.print(message));
113 System.err.println("receive message " + messagenr + " for port " + this.port + ", receiver = "
114 + message[3].toString() + ", expected " + senderId);
115 }
116 if (((Number) message[7]).intValue() == 0)
117 {
118 System.err.println(SimulationMessage.print(message));
119 System.err.println("receive message " + messagenr + " for port " + this.port + ", #fields = 0");
120 }
121 else if (((Number) message[5]).intValue() != messagenr)
122 {
123 System.err.println(SimulationMessage.print(message));
124 System.err.println(
125 "receive message " + messagenr + " for port " + this.port + ", payload# = " + message[5]);
126 }
127
128
129 Object[] reply = new Object[] { message[8], message[9] };
130 socket.send(SimulationMessage.encodeUTF8(message[1], senderId, receiverId, "REPLY", messagenr,
131 MessageStatus.NEW, reply), 0);
132 }
133 catch (Sim0MQException | SerializationException exception)
134 {
135 exception.printStackTrace();
136 }
137
138
139 messagenr++;
140 }
141
142 socket.close();
143
144 }
145
146 }
147
148 }