1 package org.sim0mq.test.threads;
2
3 import java.util.LinkedHashMap;
4 import java.util.Map;
5
6 import org.zeromq.SocketType;
7 import org.zeromq.ZContext;
8 import org.zeromq.ZMQ;
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 public class PushPullThreads
24 {
25
26 private ZContext ctx;
27
28
29 private Map<Long, ZMQ.Socket> socketMap = new LinkedHashMap<>();
30
31
32 private final int totalPushThreads;
33
34
35
36
37 public static void main(final String[] args)
38 {
39 new PushPullThreads();
40 }
41
42
43
44
45 public PushPullThreads()
46 {
47 this.totalPushThreads = 1000;
48 this.ctx = new ZContext(1);
49 for (int i = 0; i < this.totalPushThreads; i++)
50 {
51 new ProducerThread(this, i);
52 }
53 new ConsumerThread(this.ctx).start();
54 }
55
56
57
58
59
60 public synchronized void call(final String message)
61 {
62 long threadId = Thread.currentThread().getId();
63 ZMQ.Socket pushSocket = this.socketMap.get(threadId);
64 if (pushSocket == null)
65 {
66 pushSocket = this.ctx.createSocket(SocketType.PUSH);
67 pushSocket.setHWM(100000);
68 pushSocket.connect("inproc://bus");
69 this.socketMap.put(threadId, pushSocket);
70 System.out.println("Socket added for thread " + threadId);
71 }
72 pushSocket.send(message, ZMQ.DONTWAIT);
73 }
74
75
76 class ProducerThread extends Thread
77 {
78
79 private int threadNr;
80
81
82 private PushPullThreads program;
83
84
85
86
87
88 ProducerThread(final PushPullThreads program, final int threadNr)
89 {
90 this.program = program;
91 this.threadNr = threadNr;
92 start();
93 }
94
95
96 @Override
97 public void run()
98 {
99 for (int i = 0; i < 1000; i++)
100 {
101 this.program.call("Message from thread " + this.threadNr + " #" + i);
102 }
103 this.program.call("STOP");
104 }
105 }
106
107
108 class ConsumerThread extends Thread
109 {
110
111 private ZContext ctx;
112
113
114
115
116
117 ConsumerThread(final ZContext ctx)
118 {
119 this.ctx = ctx;
120 }
121
122
123 @Override
124 public void run()
125 {
126 ZMQ.Socket pullSocket = this.ctx.createSocket(SocketType.PULL);
127 pullSocket.bind("inproc://bus");
128 int stopCount = 0;
129 int msgCount = 0;
130 while (true)
131 {
132 String msg = pullSocket.recvStr(0);
133 if ("STOP".equals(msg))
134 {
135 stopCount++;
136 if (stopCount == totalPushThreads)
137 {
138 break;
139 }
140 }
141 else
142 {
143 msgCount++;
144 System.out.println(msg);
145 }
146 }
147 System.out.println("# messages received = " + msgCount);
148 }
149
150 }
151 }