Getting the Message Out

The second classic pattern is one-way data distribution, in which a server pushes updates to a set of clients. Let’s look at an example that pushes out weather updates consisting of a zip code, temperature, and relative humidity. We’ll generate random values, just like the real weather stations do.

Example 1-6 shows the code for the server. We’ll use port 5556 for this application.

Example 1-6. Weather update server (wuserver.c)

//
//  Weather update server
//  Binds PUB socket to tcp://*:5556
//  Publishes random weather updates
//
#include "zhelpers.h"

int main (void)
{
    //  Prepare our context and publisher
    void *context = zmq_ctx_new ();
    void *publisher = zmq_socket (context, ZMQ_PUB);
    int rc = zmq_bind (publisher, "tcp://*:5556");
    assert (rc == 0);
    rc = zmq_bind (publisher, "ipc://weather.ipc");
    assert (rc == 0);

    //  Initialize random number generator
    srandom ((unsigned) time (NULL));
    while (1) {
        //  Get values that will fool the boss
        int zipcode, temperature, relhumidity;
        zipcode     = randof (100000);
        temperature = randof (215) - 80;
        relhumidity = randof (50) + 10;

        //  Send message to all subscribers
        char update [20];
        sprintf (update, "%05d %d %d", zipcode, temperature, relhumidity);
        s_send (publisher, update);
    }
    zmq_close (publisher);
    zmq_ctx_destroy (context);
    return 0;
}

There’s no start and no end to this stream of updates; it’s like a never-ending broadcast (Figure 1-4).

Publish-subscribe

Figure 1-4. Publish-subscribe

Example 1-7 shows the client application, which listens to the stream of updates and grabs anything to do with a specified zip code (by default, New York City, because that’s a great place to start any adventure).

Example 1-7. Weather update client (wuclient.c)

//
//  Weather update client
//  Connects SUB socket to tcp://localhost:5556
//  Collects weather updates and finds avg temp in zipcode
//
#include "zhelpers.h"

int main (int argc, char *argv [])
{
    void *context = zmq_ctx_new ();

    //  Socket to talk to server
    printf ("Collecting updates from weather server...\n");
    void *subscriber = zmq_socket (context, ZMQ_SUB);
    int rc = zmq_connect (subscriber, "tcp://localhost:5556");
    assert (rc == 0);

    //  Subscribe to zipcode, default is NYC, 10001
    char *filter = (argc > 1)? argv [1]: "10001 ";
    rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, filter, strlen (filter));
    assert (rc == 0);

    //  Process 100 updates
    int update_nbr;
    long total_temp = 0;
    for (update_nbr = 0; update_nbr < 100; update_nbr++) {
        char *string = s_recv (subscriber);

        int zipcode, temperature, relhumidity;
        sscanf (string, "%d %d %d",
            &zipcode, &temperature, &relhumidity);
        total_temp += temperature;
        free (string);
    }
    printf ("Average temperature for zipcode '%s' was %dF\n",
        filter, (int) (total_temp / update_nbr));

    zmq_close (subscriber);
    zmq_ctx_destroy (context);
    return 0;
}

Note that when you use a SUB socket you must set a subscription using zmq_setsockopt() and SUBSCRIBE, as in this code. If you don’t set any subscription, you won’t get any messages. It’s a common mistake for beginners. The subscriber can set many subscriptions, which are added together. That is, if an update matches any subscription, the subscriber receives it. The subscriber can also cancel specific subscriptions. A subscription is often but not necessarily a printable string. See zmq_setsockopt() for how this works.

The PUB-SUB socket pair is asynchronous. The client does zmq_msg_recv(), in a loop (or once if that’s all it needs). Trying to send a message to a SUB socket will cause an error. Similarly, the service does zmq_msg_send() as often as it needs to, but must not do zmq_msg_recv() on a PUB socket.

In theory, with ØMQ sockets it does not matter which end connects and which end binds. However, in practice there are undocumented differences that I’ll come to later. For now, bind the PUB and connect the SUB, unless your network design makes that impossible.

There is one more important thing to know about PUB-SUB sockets: you do not know precisely when a subscriber starts to get messages. Even if you start a subscriber, wait a while, and then start the publisher, the subscriber will always miss the first messages that the publisher sends. This is because as the subscriber connects to the publisher (something that takes a small but nonzero amount of time), the publisher may already be sending messages out.

This “slow joiner” symptom hits enough people, often enough, that we’re going to explain it in detail. Remember that ØMQ does asynchronous I/O (i.e., in the background). Say you have two nodes doing this, in this order:

  • Subscriber connects to an endpoint and receives and counts messages.

  • Publisher binds to an endpoint and immediately sends 1,000 messages.

The subscriber will most likely not receive anything. You’ll blink, check that you set a correct filter, and try again, and the subscriber will still not receive anything.

Making a TCP connection involves to and from handshaking that can take several milliseconds (msec), depending on your network and the number of hops between peers. In that time, ØMQ can send very many messages. For the sake of argument, assume it takes 5 msec to establish a connection, and that same link can handle 1M messages per second. During the 5 msec that the subscriber is connecting to the publisher, it takes the publisher only 1 msec to send out those 1K messages.

In Chapter 2, we’ll explain how to synchronize a publisher and subscribers so that you don’t start to publish data until the subscribers really are connected and ready. There is a simple (and stupid) way to delay the publisher, which is to sleep. Don’t do this in a real application, though, because it is extremely fragile as well as inelegant and slow. Use sleeps to prove to yourself what’s happening, and then read Chapter 2 to see how to do this right.

The alternative to synchronization is to simply assume that the published data stream is infinite and has no start and no end. One also assumes that the subscriber doesn’t care what transpired before it started up. This is how we built our weather client example.

So, the client subscribes to its chosen zip code and collects a thousand updates for that zip code. That means about 10 million updates from the server, if zip codes are randomly distributed. You can start the client, and then the server, and the client will keep working. You can stop and restart the server as often as you like, and the client will keep working. When the client has collected its thousand updates, it calculates the average, prints it, and exits.

Some points about the publish-subscribe pattern:

  • A subscriber can connect to more than one publisher, using one connect call each time. Data will then arrive and be interleaved (“fair queued”) so that no single publisher drowns out the others.

  • If a publisher has no connected subscribers, then it will simply drop all messages.

  • If you’re using TCP and a subscriber is slow, messages will queue up on the publisher. We’ll look at how to protect publishers against this by using the “high-water mark” in the next chapter.

  • From ØMQ v3.x, filtering happens on the publisher’s side when using a connected protocol (tcp or ipc). Using the epgm protocol, filtering happens on the subscriber’s side. In ØMQ v2.x, all filtering happened on the subscriber’s side.

This is how long it takes to receive and filter 10M messages on my laptop, which is a 2011-era Intel I7—fast, but nothing special:

ph@nb201103:~/work/git/zguide/examples/c$ time wuclient
Collecting updates from weather server...
Average temperature for zipcode '10001 ' was 28F

real    0m4.470s
user    0m0.000s
sys     0m0.008s

Get ZeroMQ now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.