View Javadoc
1   package org.sim0mq.demo.reqrep;
2   
3   import org.djutils.serialization.SerializationException;
4   import org.sim0mq.Sim0MQException;
5   import org.sim0mq.message.Sim0MQMessage;
6   import org.zeromq.SocketType;
7   import org.zeromq.ZContext;
8   import org.zeromq.ZMQ;
9   
10  /**
11   * <p>
12   * Copyright (c) 2013-2020 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
13   * BSD-style license. See <a href="http://sim0mq.org/docs/current/license.html">Sim0MQ License</a>.
14   * </p>
15   * $LastChangedDate: 2015-07-24 02:58:59 +0200 (Fri, 24 Jul 2015) $, @version $Revision: 1147 $, by $Author: averbraeck $,
16   * initial version Aug 4, 2018 <br>
17   * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
18   */
19  public class Rep
20  {
21      /**
22       * @param args command line arguments
23       * @throws Sim0MQException on error
24       */
25      protected Rep(final String[] args) throws Sim0MQException
26      {
27          if (args.length < 2)
28          {
29              System.err.println("Use as Rep startport #threads [#contexts]");
30              System.exit(-1);
31          }
32  
33          int startport = Integer.parseInt(args[0]);
34          long numthreads = Integer.parseInt(args[1]);
35          int numcontexts = args.length > 2 ? Integer.parseInt(args[2]) : 1;
36  
37          ZContext context = new ZContext(numcontexts);
38  
39          for (int i = 0; i < numthreads; i++)
40          {
41              new RepThread(context, startport + i).start();
42          }
43  
44          context.destroy();
45      }
46  
47      /**
48       * @param args command line arguments
49       * @throws Sim0MQException on error
50       */
51      public static void main(final String[] args) throws Sim0MQException
52      {
53          new Rep(args);
54      }
55  
56      /** The worker thread for the REP requests. */
57      protected class RepThread extends Thread
58      {
59          /** the (shared) context. */
60          private final ZContext context;
61  
62          /** the port to use. */
63          private final int port;
64  
65          /**
66           * @param context the (shared) context
67           * @param port the port to use
68           */
69          public RepThread(final ZContext context, final int port)
70          {
71              this.context = context;
72              this.port = port;
73          }
74  
75          /** {@inheritDoc} */
76          @Override
77          public void run()
78          {
79              // Socket to talk to server
80              System.out.println("REP: Connecting to server with thread on port " + this.port);
81  
82              // Socket to talk to clients
83              ZMQ.Socket socket = this.context.createSocket(SocketType.REP);
84              socket.bind("tcp://*:" + this.port);
85  
86              String senderId = "REP." + this.port;
87              String receiverId = "REQ." + this.port;
88  
89              int messagenr = 0;
90  
91              while (true)
92              {
93                  try
94                  {
95                      // Wait for next request from the client
96                      byte[] request = socket.recv(0);
97                      Object[] message = Sim0MQMessage.decode(request).createObjectArray();
98  
99                      if (message[4].toString().equals("STOP"))
100                     {
101                         // send a reply
102                         Object[] reply = new Object[] { "STOPPED" };
103                         socket.send(Sim0MQMessage.encodeUTF8(true, message[1], senderId, receiverId, "STOPPED", messagenr,
104                                 reply), 0);
105                         break;
106                     }
107 
108                     // check the message
109                     if (!message[3].toString().equals(senderId))
110                     {
111                         System.err.println(Sim0MQMessage.print(message));
112                         System.err.println("receive message " + messagenr + " for port " + this.port + ", receiver = "
113                                 + message[3].toString() + ", expected " + senderId);
114                     }
115                     if (((Number) message[7]).intValue() == 0)
116                     {
117                         System.err.println(Sim0MQMessage.print(message));
118                         System.err.println("receive message " + messagenr + " for port " + this.port + ", #fields = 0");
119                     }
120                     else if (((Number) message[5]).intValue() != messagenr)
121                     {
122                         System.err.println(Sim0MQMessage.print(message));
123                         System.err.println(
124                                 "receive message " + messagenr + " for port " + this.port + ", payload# = " + message[5]);
125                     }
126 
127                     // send a reply
128                     Object[] reply = new Object[] { message[8], message[9] };
129                     socket.send(Sim0MQMessage.encodeUTF8(true, message[1], senderId, receiverId, "REPLY", messagenr,
130                             reply), 0);
131                 }
132                 catch (Sim0MQException | SerializationException exception)
133                 {
134                     exception.printStackTrace();
135                 }
136                 
137                 // increase the messagenr
138                 messagenr++;
139             }
140 
141             socket.close();
142 
143         }
144 
145     }
146 
147 }