1 package org.sim0mq.demo.reqrep;
2
3 import org.djutils.serialization.SerializationException;
4 import org.sim0mq.Sim0MQException;
5 import org.sim0mq.message.Sim0MQMessage;
6 import org.zeromq.SocketType;
7 import org.zeromq.ZContext;
8 import org.zeromq.ZMQ;
9
10
11
12
13
14
15
16
17
18
19 public class Rep
20 {
21
22
23
24
25 protected Rep(final String[] args) throws Sim0MQException
26 {
27 if (args.length < 2)
28 {
29 System.err.println("Use as Rep startport #threads [#contexts]");
30 System.exit(-1);
31 }
32
33 int startport = Integer.parseInt(args[0]);
34 long numthreads = Integer.parseInt(args[1]);
35 int numcontexts = args.length > 2 ? Integer.parseInt(args[2]) : 1;
36
37 ZContext context = new ZContext(numcontexts);
38
39 for (int i = 0; i < numthreads; i++)
40 {
41 new RepThread(context, startport + i).start();
42 }
43
44 context.destroy();
45 }
46
47
48
49
50
51 public static void main(final String[] args) throws Sim0MQException
52 {
53 new Rep(args);
54 }
55
56
57 protected class RepThread extends Thread
58 {
59
60 private final ZContext context;
61
62
63 private final int port;
64
65
66
67
68
69 public RepThread(final ZContext context, final int port)
70 {
71 this.context = context;
72 this.port = port;
73 }
74
75
76 @Override
77 public void run()
78 {
79
80 System.out.println("REP: Connecting to server with thread on port " + this.port);
81
82
83 ZMQ.Socket socket = this.context.createSocket(SocketType.REP);
84 socket.bind("tcp://*:" + this.port);
85
86 String senderId = "REP." + this.port;
87 String receiverId = "REQ." + this.port;
88
89 int messagenr = 0;
90
91 while (true)
92 {
93 try
94 {
95
96 byte[] request = socket.recv(0);
97 Object[] message = Sim0MQMessage.decode(request).createObjectArray();
98
99 if (message[4].toString().equals("STOP"))
100 {
101
102 Object[] reply = new Object[] { "STOPPED" };
103 socket.send(Sim0MQMessage.encodeUTF8(true, message[1], senderId, receiverId, "STOPPED", messagenr,
104 reply), 0);
105 break;
106 }
107
108
109 if (!message[3].toString().equals(senderId))
110 {
111 System.err.println(Sim0MQMessage.print(message));
112 System.err.println("receive message " + messagenr + " for port " + this.port + ", receiver = "
113 + message[3].toString() + ", expected " + senderId);
114 }
115 if (((Number) message[7]).intValue() == 0)
116 {
117 System.err.println(Sim0MQMessage.print(message));
118 System.err.println("receive message " + messagenr + " for port " + this.port + ", #fields = 0");
119 }
120 else if (((Number) message[5]).intValue() != messagenr)
121 {
122 System.err.println(Sim0MQMessage.print(message));
123 System.err.println(
124 "receive message " + messagenr + " for port " + this.port + ", payload# = " + message[5]);
125 }
126
127
128 Object[] reply = new Object[] { message[8], message[9] };
129 socket.send(Sim0MQMessage.encodeUTF8(true, message[1], senderId, receiverId, "REPLY", messagenr,
130 reply), 0);
131 }
132 catch (Sim0MQException | SerializationException exception)
133 {
134 exception.printStackTrace();
135 }
136
137
138 messagenr++;
139 }
140
141 socket.close();
142
143 }
144
145 }
146
147 }