View Javadoc
1   package org.sim0mq.test;
2   
3   import org.zeromq.ZContext;
4   import org.zeromq.ZMQ;
5   import org.zeromq.ZMQException;
6   import org.zeromq.ZThread;
7   import zmq.ZError;
8   
9   import java.util.Scanner;
10  
11  /**
12   * <p>
13   * Copyright (c) 2013-2017 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
14   * BSD-style license. See <a href="http://sim0mq.org/docs/current/license.html">Sim0MQ License</a>.
15   * </p>
16   * $LastChangedDate: 2015-07-24 02:58:59 +0200 (Fri, 24 Jul 2015) $, @version $Revision: 1147 $, by $Author: averbraeck $,
17   * initial version 6 Mar 2020 <br>
18   * @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
19   */
20  public class ZmqChat
21  {
22      /**
23       * Decentralized chat example
24       * @param args first arg is chat server, 2nd is us, 3rd is id
25       */
26      public static void main(String[] args)
27      {
28          if (args.length < 3)
29          {
30              System.out.println("Usage: ZmqChat ipaddress interface username");
31              System.out.println("Example: ZmqChat 192.168.55.123 localhost joe");
32              // System.exit(0);
33              args = new String[] {"localhost", "localhost", "tud"}; 
34          }
35          ZContext ctx = new ZContext();
36  
37          // cut string after dot
38          String addressWithoutLastPart = args[0].substring(0, args[0].lastIndexOf('.'));
39          ZThread.fork(ctx, new ListenerTask(), addressWithoutLastPart);
40          ZMQ.Socket broadcaster = ctx.createSocket(ZMQ.PUB);
41          broadcaster.bind(String.format("tcp://%s:9000", args[1]));
42          Scanner scanner = new Scanner(System.in);
43          while (!Thread.currentThread().isInterrupted())
44          {
45              String line = scanner.nextLine();
46              if (line.isEmpty())
47                  break;
48              broadcaster.send(String.format("%s: %s", args[2], line));
49          }
50          ctx.destroy();
51      }
52  
53      static class ListenerTask implements ZThread.IAttachedRunnable
54      {
55  
56          @Override
57          public void run(Object[] args, ZContext ctx, ZMQ.Socket pipe)
58          {
59              ZMQ.Socket listener = ctx.createSocket(ZMQ.SUB);
60              int address;
61              for (address = 1; address < 255; address++)
62                  listener.connect(String.format("tcp://%s.%d:9000", args[0], address));
63  
64              listener.subscribe(ZMQ.SUBSCRIPTION_ALL);
65              while (!Thread.currentThread().isInterrupted())
66              {
67                  String message;
68                  try
69                  {
70                      message = listener.recvStr();
71                  }
72                  catch (ZMQException e)
73                  {
74                      if (e.getErrorCode() == ZError.ETERM)
75                          break;
76                      e.printStackTrace();
77                      break;
78                  }
79                  if (!message.isEmpty())
80                      System.out.println(message);
81              }
82          }
83      }
84  
85  }