The second classic pattern is one-way data distribution, in which a server pushes updates to a set of clients. Let's look at an example that pushes out weather updates consisting of a zip code, temperature, and relative humidity. We'll generate random values, just like the real weather stations do.
Example 1-6 shows the code for the server. We'll use port 5556 for this application.
Example 1-6. Weather update server (wuserver.c)
//
// Weather update server
// Binds PUB socket to tcp://*:5556
// Publishes random weather updates
//
#include "zhelpers.h"

int main (void)
{
    // Prepare our context and publisher
    void *context = zmq_ctx_new ();
    void *publisher = zmq_socket (context, ZMQ_PUB);
    int rc = zmq_bind (publisher, "tcp://*:5556");
    assert (rc == 0);
    rc = zmq_bind (publisher, "ipc://weather.ipc");
    assert (rc == 0);

    // Initialize random number generator
    srandom ((unsigned) time (NULL));
    while (1) {
        // Get values that will fool the boss
        int zipcode, temperature, relhumidity;
        zipcode     = randof (100000);
        temperature = randof (215) - 80;
        relhumidity = randof (50) + 10;

        // Send message to all subscribers
        char update [20];
        sprintf (update, "%05d %d %d", zipcode, temperature, relhumidity);
        s_send (publisher, update);
    }
    zmq_close (publisher);
    zmq_ctx_destroy (context);
    return 0;
}
There's no start and no end to this stream of updates; it's like a never-ending broadcast (Figure 1-4).
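The update format can be checked on its own, without any sockets. The sketch below builds and parses an update string using the same "%05d %d %d" layout that the server writes and the client reads; format_update() and parse_update() are hypothetical helper names, and no ZeroMQ calls are involved.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: build an update in the same "%05d %d %d" layout
   that wuserver.c uses. Returns the length snprintf reports. */
int format_update(char *buf, size_t len, int zipcode, int temperature, int relhumidity)
{
    assert(buf != NULL && len > 0);
    /* %05d zero-pads the zip code to five digits, so 501 becomes "00501"
       and every update starts with a fixed-width zip code field. */
    return snprintf(buf, len, "%05d %d %d", zipcode, temperature, relhumidity);
}

/* Hypothetical helper: parse an update back into its fields, as wuclient.c
   does with sscanf(). Returns 1 only if all three fields were read. */
int parse_update(const char *buf, int *zipcode, int *temperature, int *relhumidity)
{
    assert(buf != NULL);
    return sscanf(buf, "%d %d %d", zipcode, temperature, relhumidity) == 3;
}
```

With the padding, zip code 501 goes out as "00501 72 45", and the longest update the server can produce, "99999 -80 59", needs 12 characters plus the terminating null, which fits in the server's 20-byte buffer.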
Example 1-7 shows the client application, which listens to the stream of updates and grabs anything to do with a specified zip code (by default, New York City, because that's a great place to start any adventure).
Example 1-7. Weather update client (wuclient.c)
//
// Weather update client
// Connects SUB socket to tcp://localhost:5556
// Collects weather updates and finds avg temp in zipcode
//
#include "zhelpers.h"

int main (int argc, char *argv [])
{
    void *context = zmq_ctx_new ();

    // Socket to talk to server
    printf ("Collecting updates from weather server...\n");
    void *subscriber = zmq_socket (context, ZMQ_SUB);
    int rc = zmq_connect (subscriber, "tcp://localhost:5556");
    assert (rc == 0);

    // Subscribe to zipcode, default is NYC, 10001
    const char *filter = (argc > 1)? argv [1]: "10001 ";
    rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, filter, strlen (filter));
    assert (rc == 0);

    // Process 100 updates
    int update_nbr;
    long total_temp = 0;
    for (update_nbr = 0; update_nbr < 100; update_nbr++) {
        char *string = s_recv (subscriber);
        int zipcode, temperature, relhumidity;
        sscanf (string, "%d %d %d", &zipcode, &temperature, &relhumidity);
        total_temp += temperature;
        free (string);
    }
    printf ("Average temperature for zipcode '%s' was %dF\n",
        filter, (int) (total_temp / update_nbr));

    zmq_close (subscriber);
    zmq_ctx_destroy (context);
    return 0;
}
Note that when you use a SUB socket you must set a subscription using zmq_setsockopt() and ZMQ_SUBSCRIBE, as in this code. If you don't set any subscription, you won't get any messages. It's a common mistake for beginners. The subscriber can set many subscriptions, which are added together. That is, if an update matches any subscription (begins with that subscription's bytes), the subscriber receives it. The subscriber can also cancel specific subscriptions with ZMQ_UNSUBSCRIBE. A subscription is often, but not necessarily, a printable string. See zmq_setsockopt() for how this works.
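The matching rule itself is small enough to model directly. The sketch below is a simplified, hypothetical model, not libzmq's code: each subscription is a byte prefix, a message is delivered if it begins with any of them, and a zero-length prefix matches everything. In a real program each prefix would be registered with zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, prefix, size) and withdrawn with ZMQ_UNSUBSCRIBE; struct subscription and matches_any() are illustrative names only.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical model of one subscription: a run of bytes that an incoming
   message must start with. The bytes need not be printable. */
struct subscription {
    const void *prefix;   /* bytes to match */
    size_t size;          /* number of prefix bytes; 0 matches everything */
};

/* Returns 1 if the message starts with ANY subscribed prefix (subscriptions
   are added together), 0 if there are none or none match. */
int matches_any(const void *msg, size_t msg_size,
                const struct subscription *subs, size_t nsubs)
{
    assert(subs != NULL || nsubs == 0);
    for (size_t i = 0; i < nsubs; i++) {
        if (subs[i].size <= msg_size &&
            memcmp(msg, subs[i].prefix, subs[i].size) == 0)
            return 1;   /* one matching subscription is enough */
    }
    return 0;           /* no subscription at all means no messages */
}
```

Because matching is by prefix, a filter such as "1000" would match zip codes 10000 through 10009; the trailing space in the client's default "10001 " marks the end of the zip code field.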
The PUB-SUB socket pair is asynchronous. The client does zmq_msg_recv() in a loop (or once if that's all it needs). Trying to send a message to a SUB socket will cause an error. Similarly, the server does zmq_msg_send() as often as it needs to, but must not do zmq_msg_recv() on a PUB socket.
In theory, with ØMQ sockets it does not matter which end connects and which end binds. However, in practice there are undocumented differences that I'll come to later. For now, bind the PUB and connect the SUB, unless your network design makes that impossible.
There is one more important thing to know about PUB-SUB sockets: you do not know precisely when a subscriber starts to get messages. Even if you start a subscriber, wait a while, and then start the publisher, the subscriber will always miss the first messages that the publisher sends. This is because as the subscriber connects to the publisher (something that takes a small but nonzero amount of time), the publisher may already be sending messages out.
This "slow joiner" symptom hits enough people, often enough, that we're going to explain it in detail. Remember that ØMQ does asynchronous I/O (i.e., in the background). Say you have two nodes doing this, in this order:
Subscriber connects to an endpoint and receives and counts messages.
Publisher binds to an endpoint and immediately sends 1,000 messages.
The subscriber will most likely not receive anything. You'll blink, check that you set a correct filter, and try again, and the subscriber will still not receive anything.
Making a TCP connection involves to-and-from handshaking that can take several milliseconds (msec), depending on your network and the number of hops between peers. In that time, ØMQ can send many messages. For the sake of argument, assume it takes 5 msec to establish a connection, and that the same link can handle 1M messages per second. During the 5 msec that the subscriber is connecting to the publisher, it takes the publisher only 1 msec to send out those 1K messages.
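The arithmetic behind that window can be written out with two hypothetical helper functions. The figures plugged in (a 5 msec handshake, 1M messages per second) are the paragraph's illustrative assumptions, not measurements.

```c
#include <assert.h>

/* Messages a publisher can emit while a subscriber's connection is still
   being set up: rate (messages/sec) times handshake time (msec) / 1000. */
long messages_during_handshake(long msgs_per_sec, double handshake_msec)
{
    assert(msgs_per_sec >= 0 && handshake_msec >= 0);
    return (long)(msgs_per_sec * handshake_msec / 1000.0);
}

/* Milliseconds the publisher needs to send a burst of n messages
   at the given rate. */
double msec_to_send(long n, long msgs_per_sec)
{
    assert(msgs_per_sec > 0);
    return n * 1000.0 / msgs_per_sec;
}
```

With those numbers the publisher could push about 5,000 messages before the connection is up, so a 1,000-message burst is finished (in 1 msec) long before the subscriber is listening.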
In Chapter 2, we'll explain how to synchronize a publisher and subscribers so that you don't start to publish data until the subscribers really are connected and ready. There is a simple (and stupid) way to delay the publisher, which is to sleep. Don't do this in a real application, though, because it is extremely fragile as well as inelegant and slow. Use sleeps to prove to yourself what's happening, and then read Chapter 2 to see how to do this right.
The alternative to synchronization is to simply assume that the published data stream is infinite and has no start and no end. One also assumes that the subscriber doesn't care what transpired before it started up. This is how we built our weather client example.
So, the client subscribes to its chosen zip code and collects 100 updates for that zip code. That means about 10 million updates from the server, if zip codes are randomly distributed: the server picks among 100,000 zip codes, so any one of them turns up roughly once per 100,000 updates. You can start the client, and then the server, and the client will keep working. You can stop and restart the server as often as you like, and the client will keep working. When the client has collected its 100 updates, it calculates the average, prints it, and exits.
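The 10-million estimate is simply the number of wanted matches times the number of possible zip codes. A one-line hypothetical helper makes that explicit, assuming zip codes are drawn uniformly, as the server's randof(100000) does approximately.

```c
#include <assert.h>

/* Expected number of updates the server must publish before a subscriber
   filtering on ONE zip code has seen `wanted` matches, assuming each of
   `zip_space` codes is equally likely (one hit per zip_space updates). */
long long expected_server_updates(long wanted, long zip_space)
{
    assert(wanted >= 0 && zip_space > 0);
    return (long long)wanted * zip_space;
}
```

For the weather client, expected_server_updates(100, 100000) gives 10,000,000.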
Some points about the publish-subscribe pattern:
A subscriber can connect to more than one publisher, using one connect call each time. Data will then arrive and be interleaved ("fair queued") so that no single publisher drowns out the others.
If a publisher has no connected subscribers, then it will simply drop all messages.
If you're using TCP and a subscriber is slow, messages will queue up on the publisher. We'll look at how to protect publishers against this by using the "high-water mark" in the next chapter.
From ØMQ v3.x, filtering happens on the publisher's side when using a connected protocol (tcp or ipc). Using the epgm protocol, filtering happens on the subscriber's side. In ØMQ v2.x, all filtering happened on the subscriber's side.
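The fair-queuing point in the list above can be pictured as a round-robin over per-publisher inbound queues. The sketch below is a simplified model under that assumption, not libzmq's implementation; struct inbound and fair_queue() are hypothetical names, and messages are plain ints to keep it short.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical model of the messages waiting from one publisher. */
struct inbound {
    const int *msgs;   /* messages waiting from one publisher (ints for brevity) */
    size_t count;      /* number of messages waiting */
    size_t next;       /* index of the next message not yet delivered */
};

/* Deliver queued messages into out[] in round-robin order, taking at most
   one message from each non-empty queue per pass, so a busy publisher
   cannot starve a quiet one. Returns the number of messages written. */
size_t fair_queue(struct inbound *pubs, size_t npubs, int *out, size_t out_cap)
{
    assert(pubs != NULL || npubs == 0);
    size_t n = 0;
    int progress = 1;
    while (progress && n < out_cap) {
        progress = 0;                      /* set to 1 if any queue had a message */
        for (size_t i = 0; i < npubs && n < out_cap; i++) {
            if (pubs[i].next < pubs[i].count) {
                out[n++] = pubs[i].msgs[pubs[i].next++];
                progress = 1;
            }
        }
    }
    return n;
}
```

With one publisher holding {1, 2, 3, 4} and another holding {10, 20}, the merged order is 1, 10, 2, 20, 3, 4: the busier publisher only gets consecutive turns once the quieter one has nothing left.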
This is how long it takes to receive and filter 10M messages on my laptop, which is a 2011-era Intel i7, fast but nothing special:
ph@nb201103:~/work/git/zguide/examples/c$ time wuclient
Collecting updates from weather server...
Average temperature for zipcode '10001 ' was 28F
real 0m4.470s
user 0m0.000s
sys 0m0.008s
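Dividing the message count by the real (wall-clock) time gives a rough average rate for that run. The helper below is a trivial, hypothetical sketch; the 10M count and 4.470 s come from the run above, and the average covers the whole run, including connection setup.

```c
#include <assert.h>

/* Average messages per second over a run: message count divided by
   wall-clock ("real") seconds as reported by time(1). */
double msgs_per_sec(double messages, double real_seconds)
{
    assert(real_seconds > 0);
    return messages / real_seconds;
}
```

For the run above, 10,000,000 / 4.470 s works out to roughly 2.24 million messages per second.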