1 package org.sim0mq.test;
2
3 import org.zeromq.ZContext;
4 import org.zeromq.ZMQ;
5 import org.zeromq.ZMQException;
6 import org.zeromq.ZThread;
7 import zmq.ZError;
8
9 import java.util.Scanner;
10
11
12
13
14
15
16
17
18
19
20 public class ZmqChat
21 {
22
23
24
25
26 public static void main(String[] args)
27 {
28 if (args.length < 3)
29 {
30 System.out.println("Usage: ZmqChat ipaddress interface username");
31 System.out.println("Example: ZmqChat 192.168.55.123 localhost joe");
32
33 args = new String[] {"localhost", "localhost", "tud"};
34 }
35 ZContext ctx = new ZContext();
36
37
38 String addressWithoutLastPart = args[0].substring(0, args[0].lastIndexOf('.'));
39 ZThread.fork(ctx, new ListenerTask(), addressWithoutLastPart);
40 ZMQ.Socket broadcaster = ctx.createSocket(ZMQ.PUB);
41 broadcaster.bind(String.format("tcp://%s:9000", args[1]));
42 Scanner scanner = new Scanner(System.in);
43 while (!Thread.currentThread().isInterrupted())
44 {
45 String line = scanner.nextLine();
46 if (line.isEmpty())
47 break;
48 broadcaster.send(String.format("%s: %s", args[2], line));
49 }
50 ctx.destroy();
51 }
52
53 static class ListenerTask implements ZThread.IAttachedRunnable
54 {
55
56 @Override
57 public void run(Object[] args, ZContext ctx, ZMQ.Socket pipe)
58 {
59 ZMQ.Socket listener = ctx.createSocket(ZMQ.SUB);
60 int address;
61 for (address = 1; address < 255; address++)
62 listener.connect(String.format("tcp://%s.%d:9000", args[0], address));
63
64 listener.subscribe(ZMQ.SUBSCRIPTION_ALL);
65 while (!Thread.currentThread().isInterrupted())
66 {
67 String message;
68 try
69 {
70 message = listener.recvStr();
71 }
72 catch (ZMQException e)
73 {
74 if (e.getErrorCode() == ZError.ETERM)
75 break;
76 e.printStackTrace();
77 break;
78 }
79 if (!message.isEmpty())
80 System.out.println(message);
81 }
82 }
83 }
84
85 }