View Javadoc
1   package org.sim0mq.demo.reqrep;
2   
3   import org.djutils.serialization.SerializationException;
4   import org.sim0mq.Sim0MQException;
5   import org.sim0mq.message.MessageStatus;
6   import org.sim0mq.message.SimulationMessage;
7   import org.zeromq.ZContext;
8   import org.zeromq.ZMQ;
9   
10  /**
11   * <p>
12   * Copyright (c) 2013-2017 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
13   * BSD-style license. See <a href="http://sim0mq.org/docs/current/license.html">Sim0MQ License</a>.
14   * </p>
15   * $LastChangedDate: 2015-07-24 02:58:59 +0200 (Fri, 24 Jul 2015) $, @version $Revision: 1147 $, by $Author: averbraeck $,
16   * initial version Aug 4, 2018 <br>
17   * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
18   */
19  public class Rep
20  {
21      /**
22       * @param args command line arguments
23       * @throws Sim0MQException on error
24       */
25      protected Rep(final String[] args) throws Sim0MQException
26      {
27          if (args.length < 2)
28          {
29              System.err.println("Use as Rep startport #threads [#contexts]");
30              System.exit(-1);
31          }
32  
33          int startport = Integer.parseInt(args[0]);
34          long numthreads = Integer.parseInt(args[1]);
35          int numcontexts = args.length > 2 ? Integer.parseInt(args[2]) : 1;
36  
37          ZContext context = new ZContext(numcontexts);
38  
39          for (int i = 0; i < numthreads; i++)
40          {
41              new RepThread(context, startport + i).start();
42          }
43  
44          context.destroy();
45      }
46  
47      /**
48       * @param args command line arguments
49       * @throws Sim0MQException on error
50       */
51      public static void main(final String[] args) throws Sim0MQException
52      {
53          new Rep(args);
54      }
55  
56      /** The worker thread for the REP requests. */
57      protected class RepThread extends Thread
58      {
59          /** the (shared) context. */
60          private final ZContext context;
61  
62          /** the port to use. */
63          private final int port;
64  
65          /**
66           * @param context the (shared) context
67           * @param port the port to use
68           */
69          public RepThread(final ZContext context, final int port)
70          {
71              super();
72              this.context = context;
73              this.port = port;
74          }
75  
76          /** {@inheritDoc} */
77          @Override
78          public void run()
79          {
80              // Socket to talk to server
81              System.out.println("REP: Connecting to server with thread on port " + this.port);
82  
83              // Socket to talk to clients
84              ZMQ.Socket socket = this.context.createSocket(ZMQ.REP);
85              socket.bind("tcp://*:" + this.port);
86  
87              String senderId = "REP." + this.port;
88              String receiverId = "REQ." + this.port;
89  
90              int messagenr = 0;
91  
92              while (true)
93              {
94                  try
95                  {
96                      // Wait for next request from the client
97                      byte[] request = socket.recv(0);
98                      Object[] message = SimulationMessage.decode(request);
99  
100                     if (message[4].toString().equals("STOP"))
101                     {
102                         // send a reply
103                         Object[] reply = new Object[] { "STOPPED" };
104                         socket.send(SimulationMessage.encodeUTF8(message[1], senderId, receiverId, "STOPPED", messagenr,
105                                 MessageStatus.NEW, reply), 0);
106                         break;
107                     }
108 
109                     // check the message
110                     if (!message[3].toString().equals(senderId))
111                     {
112                         System.err.println(SimulationMessage.print(message));
113                         System.err.println("receive message " + messagenr + " for port " + this.port + ", receiver = "
114                                 + message[3].toString() + ", expected " + senderId);
115                     }
116                     if (((Number) message[7]).intValue() == 0)
117                     {
118                         System.err.println(SimulationMessage.print(message));
119                         System.err.println("receive message " + messagenr + " for port " + this.port + ", #fields = 0");
120                     }
121                     else if (((Number) message[5]).intValue() != messagenr)
122                     {
123                         System.err.println(SimulationMessage.print(message));
124                         System.err.println(
125                                 "receive message " + messagenr + " for port " + this.port + ", payload# = " + message[5]);
126                     }
127 
128                     // send a reply
129                     Object[] reply = new Object[] { message[8], message[9] };
130                     socket.send(SimulationMessage.encodeUTF8(message[1], senderId, receiverId, "REPLY", messagenr,
131                             MessageStatus.NEW, reply), 0);
132                 }
133                 catch (Sim0MQException | SerializationException exception)
134                 {
135                     exception.printStackTrace();
136                 }
137                 
138                 // increase the messagenr
139                 messagenr++;
140             }
141 
142             socket.close();
143 
144         }
145 
146     }
147 
148 }