View Javadoc
1   package org.sim0mq.demo.reqrep;
2   
3   import org.sim0mq.Sim0MQException;
4   import org.sim0mq.message.MessageStatus;
5   import org.sim0mq.message.SimulationMessage;
6   import org.zeromq.ZContext;
7   import org.zeromq.ZMQ;
8   
9   /**
10   * <p>
11   * Copyright (c) 2013-2017 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
12   * BSD-style license. See <a href="http://sim0mq.org/docs/current/license.html">Sim0MQ License</a>.
13   * </p>
14   * $LastChangedDate: 2015-07-24 02:58:59 +0200 (Fri, 24 Jul 2015) $, @version $Revision: 1147 $, by $Author: averbraeck $,
15   * initial version Aug 4, 2018 <br>
16   * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
17   */
18  public class Rep
19  {
20      /**
21       * @param args command line arguments
22       * @throws Sim0MQException on error
23       */
24      protected Rep(final String[] args) throws Sim0MQException
25      {
26          if (args.length < 2)
27          {
28              System.err.println("Use as Rep startport #threads [#contexts]");
29              System.exit(-1);
30          }
31  
32          int startport = Integer.parseInt(args[0]);
33          long numthreads = Integer.parseInt(args[1]);
34          int numcontexts = args.length > 2 ? Integer.parseInt(args[2]) : 1;
35  
36          ZContext context = new ZContext(numcontexts);
37  
38          for (int i = 0; i < numthreads; i++)
39          {
40              new RepThread(context, startport + i).start();
41          }
42  
43          context.destroy();
44      }
45  
46      /**
47       * @param args command line arguments
48       * @throws Sim0MQException on error
49       */
50      public static void main(final String[] args) throws Sim0MQException
51      {
52          new Rep(args);
53      }
54  
55      /** The worker thread for the REP requests. */
56      protected class RepThread extends Thread
57      {
58          /** the (shared) context. */
59          private final ZContext context;
60  
61          /** the port to use. */
62          private final int port;
63  
64          /**
65           * @param context the (shared) context
66           * @param port the port to use
67           */
68          public RepThread(final ZContext context, final int port)
69          {
70              super();
71              this.context = context;
72              this.port = port;
73          }
74  
75          /** {@inheritDoc} */
76          @Override
77          public void run()
78          {
79              // Socket to talk to server
80              System.out.println("REP: Connecting to server with thread on port " + this.port);
81  
82              // Socket to talk to clients
83              ZMQ.Socket socket = this.context.createSocket(ZMQ.REP);
84              socket.bind("tcp://*:" + this.port);
85  
86              String senderId = "REP." + this.port;
87              String receiverId = "REQ." + this.port;
88  
89              int messagenr = 0;
90  
91              while (true)
92              {
93                  try
94                  {
95                      // Wait for next request from the client
96                      byte[] request = socket.recv(0);
97                      Object[] message = SimulationMessage.decode(request);
98  
99                      if (message[4].toString().equals("STOP"))
100                     {
101                         // send a reply
102                         Object[] reply = new Object[] { "STOPPED" };
103                         socket.send(SimulationMessage.encodeUTF8(message[1], senderId, receiverId, "STOPPED", messagenr,
104                                 MessageStatus.NEW, reply), 0);
105                         break;
106                     }
107 
108                     // check the message
109                     if (!message[3].toString().equals(senderId))
110                     {
111                         System.err.println(SimulationMessage.print(message));
112                         System.err.println("receive message " + messagenr + " for port " + this.port + ", receiver = "
113                                 + message[3].toString() + ", expected " + senderId);
114                     }
115                     if (((Number) message[7]).intValue() == 0)
116                     {
117                         System.err.println(SimulationMessage.print(message));
118                         System.err.println("receive message " + messagenr + " for port " + this.port + ", #fields = 0");
119                     }
120                     else if (((Number) message[5]).intValue() != messagenr)
121                     {
122                         System.err.println(SimulationMessage.print(message));
123                         System.err.println(
124                                 "receive message " + messagenr + " for port " + this.port + ", payload# = " + message[5]);
125                     }
126 
127                     // send a reply
128                     Object[] reply = new Object[] { message[8], message[9] };
129                     socket.send(SimulationMessage.encodeUTF8(message[1], senderId, receiverId, "REPLY", messagenr,
130                             MessageStatus.NEW, reply), 0);
131                 }
132                 catch (Sim0MQException exception)
133                 {
134                     exception.printStackTrace();
135                 }
136                 
137                 // increase the messagenr
138                 messagenr++;
139             }
140 
141             socket.close();
142 
143         }
144 
145     }
146 
147 }