ZmqChat.java
package org.sim0mq.test;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMQException;
import org.zeromq.ZThread;
import zmq.ZError;
import java.util.Scanner;
/**
* <p>
* Copyright (c) 2013-2017 Delft University of Technology, PO Box 5, 2600 AA, Delft, the Netherlands. All rights reserved. <br>
* BSD-style license. See <a href="http://sim0mq.org/docs/current/license.html">Sim0MQ License</a>.
* </p>
* $LastChangedDate: 2015-07-24 02:58:59 +0200 (Fri, 24 Jul 2015) $, @version $Revision: 1147 $, by $Author: averbraeck $,
* initial version 6 Mar 2020 <br>
* @author <a href="http://www.tbm.tudelft.nl/averbraeck">Alexander Verbraeck</a>
*/
public class ZmqChat
{
/**
* Decentralized chat example
* @param args first arg is chat server, 2nd is us, 3rd is id
*/
public static void main(String[] args)
{
if (args.length < 3)
{
System.out.println("Usage: ZmqChat ipaddress interface username");
System.out.println("Example: ZmqChat 192.168.55.123 localhost joe");
// System.exit(0);
args = new String[] {"localhost", "localhost", "tud"};
}
ZContext ctx = new ZContext();
// cut string after dot
String addressWithoutLastPart = args[0].substring(0, args[0].lastIndexOf('.'));
ZThread.fork(ctx, new ListenerTask(), addressWithoutLastPart);
ZMQ.Socket broadcaster = ctx.createSocket(ZMQ.PUB);
broadcaster.bind(String.format("tcp://%s:9000", args[1]));
Scanner scanner = new Scanner(System.in);
while (!Thread.currentThread().isInterrupted())
{
String line = scanner.nextLine();
if (line.isEmpty())
break;
broadcaster.send(String.format("%s: %s", args[2], line));
}
ctx.destroy();
}
static class ListenerTask implements ZThread.IAttachedRunnable
{
@Override
public void run(Object[] args, ZContext ctx, ZMQ.Socket pipe)
{
ZMQ.Socket listener = ctx.createSocket(ZMQ.SUB);
int address;
for (address = 1; address < 255; address++)
listener.connect(String.format("tcp://%s.%d:9000", args[0], address));
listener.subscribe(ZMQ.SUBSCRIPTION_ALL);
while (!Thread.currentThread().isInterrupted())
{
String message;
try
{
message = listener.recvStr();
}
catch (ZMQException e)
{
if (e.getErrorCode() == ZError.ETERM)
break;
e.printStackTrace();
break;
}
if (!message.isEmpty())
System.out.println(message);
}
}
}
}