1 package org.sim0mq.demo.reqrep;
2
3 import org.sim0mq.Sim0MQException;
4 import org.sim0mq.message.MessageStatus;
5 import org.sim0mq.message.SimulationMessage;
6 import org.zeromq.ZContext;
7 import org.zeromq.ZMQ;
8
9
10
11
12
13
14
15
16
17
18 public class Rep
19 {
20
21
22
23
24 protected Rep(final String[] args) throws Sim0MQException
25 {
26 if (args.length < 2)
27 {
28 System.err.println("Use as Rep startport #threads [#contexts]");
29 System.exit(-1);
30 }
31
32 int startport = Integer.parseInt(args[0]);
33 long numthreads = Integer.parseInt(args[1]);
34 int numcontexts = args.length > 2 ? Integer.parseInt(args[2]) : 1;
35
36 ZContext context = new ZContext(numcontexts);
37
38 for (int i = 0; i < numthreads; i++)
39 {
40 new RepThread(context, startport + i).start();
41 }
42
43 context.destroy();
44 }
45
46
47
48
49
50 public static void main(final String[] args) throws Sim0MQException
51 {
52 new Rep(args);
53 }
54
55
56 protected class RepThread extends Thread
57 {
58
59 private final ZContext context;
60
61
62 private final int port;
63
64
65
66
67
68 public RepThread(final ZContext context, final int port)
69 {
70 super();
71 this.context = context;
72 this.port = port;
73 }
74
75
76 @Override
77 public void run()
78 {
79
80 System.out.println("REP: Connecting to server with thread on port " + this.port);
81
82
83 ZMQ.Socket socket = this.context.createSocket(ZMQ.REP);
84 socket.bind("tcp://*:" + this.port);
85
86 String senderId = "REP." + this.port;
87 String receiverId = "REQ." + this.port;
88
89 int messagenr = 0;
90
91 while (true)
92 {
93 try
94 {
95
96 byte[] request = socket.recv(0);
97 Object[] message = SimulationMessage.decode(request);
98
99 if (message[4].toString().equals("STOP"))
100 {
101
102 Object[] reply = new Object[] { "STOPPED" };
103 socket.send(SimulationMessage.encodeUTF8(message[1], senderId, receiverId, "STOPPED", messagenr,
104 MessageStatus.NEW, reply), 0);
105 break;
106 }
107
108
109 if (!message[3].toString().equals(senderId))
110 {
111 System.err.println(SimulationMessage.print(message));
112 System.err.println("receive message " + messagenr + " for port " + this.port + ", receiver = "
113 + message[3].toString() + ", expected " + senderId);
114 }
115 if (((Number) message[7]).intValue() == 0)
116 {
117 System.err.println(SimulationMessage.print(message));
118 System.err.println("receive message " + messagenr + " for port " + this.port + ", #fields = 0");
119 }
120 else if (((Number) message[5]).intValue() != messagenr)
121 {
122 System.err.println(SimulationMessage.print(message));
123 System.err.println(
124 "receive message " + messagenr + " for port " + this.port + ", payload# = " + message[5]);
125 }
126
127
128 Object[] reply = new Object[] { message[8], message[9] };
129 socket.send(SimulationMessage.encodeUTF8(message[1], senderId, receiverId, "REPLY", messagenr,
130 MessageStatus.NEW, reply), 0);
131 }
132 catch (Sim0MQException exception)
133 {
134 exception.printStackTrace();
135 }
136
137
138 messagenr++;
139 }
140
141 socket.close();
142
143 }
144
145 }
146
147 }