Asynchronous Majordomo Pattern

The Majordomo implementation in the previous section is simple and stupid. The client is just the original Simple Pirate, wrapped up in a sexy API. When I fire up a client, broker, and worker on a test box, it can process 100,000 requests in about 14 seconds. That is partially due to the code, which cheerfully copies message frames around as if CPU cycles were free. But the real problem is that we’re doing network round-trips. ØMQ disables Nagle’s algorithm, but round-tripping is still slow.

Theory is great in theory, but in practice, practice is better. Let’s measure the actual cost of round-tripping with a simple test program. This sends a bunch of messages, first waiting for a reply to each message, and second as a batch, reading all the replies back as a batch. Both approaches do the same work, but they give very different results. We mock up a client, broker, and worker. The client task is shown in Example 4-46.

Example 4-46. Round-trip demonstrator (tripping.c)

//
//  Round-trip demonstrator
//
//  While this example runs in a single process, that is just to make
//  it easier to start and stop the example. The client task signals to
//  main when it's ready.
//
#include "czmq.h"

static void
client_task (void *args, zctx_t *ctx, void *pipe)
{
    void *client = zsocket_new (ctx, ZMQ_DEALER);
    zsocket_connect (client, "tcp://localhost:5555");
    printf ("Setting up test...\n");
    zclock_sleep (100);

    int requests;
    int64_t start;

    printf ("Synchronous round-trip test...\n");
    start = zclock_time ();
    for (requests = 0; requests < 10000; requests++) {
        zstr_send (client, "hello");
        char *reply = zstr_recv (client);
        free (reply);
    }
    printf (" %d calls/second\n",
        (1000 * 10000) / (int) (zclock_time () - start));

    printf ("Asynchronous round-trip test...\n");
    start = zclock_time ();
    for (requests = 0; requests < 100000; requests++)
        zstr_send (client, "hello");
    for (requests = 0; requests < 100000; requests++) {
        char *reply = zstr_recv (client);
        free (reply);
    }
    printf (" %d calls/second\n",
        (1000 * 100000) / (int) (zclock_time () - start));
    zstr_send (pipe, "done");
}

The worker task is in Example 4-47. All it does is receive a message, and bounce it back the way it came.

Example 4-47. Round-trip demonstrator (tripping.c): worker task

static void *
worker_task (void *args)
{
    zctx_t *ctx = zctx_new ();
    void *worker = zsocket_new (ctx, ZMQ_DEALER);
    zsocket_connect (worker, "tcp://localhost:5556");
    
    while (true) {
        zmsg_t *msg = zmsg_recv (worker);
        if (!msg)
            break;              //  Interrupted, e.g., by Ctrl-C
        zmsg_send (&msg, worker);
    }
    zctx_destroy (&ctx);
    return NULL;
}

Example 4-48 shows the broker task. It uses the zmq_proxy() function to switch messages between the frontend and backend.

Example 4-48. Round-trip demonstrator (tripping.c): broker task

static void *
broker_task (void *args)
{
    //  Prepare our context and sockets
    zctx_t *ctx = zctx_new ();
    void *frontend = zsocket_new (ctx, ZMQ_DEALER);
    zsocket_bind (frontend, "tcp://*:5555");
    void *backend = zsocket_new (ctx, ZMQ_DEALER);
    zsocket_bind (backend, "tcp://*:5556");
    zmq_proxy (frontend, backend, NULL);
    zctx_destroy (&ctx);
    return NULL;
}

Finally, Example 4-49 presents the main task, which starts the client, worker, and broker, and then runs until the client signals it to stop.

Example 4-49. Round-trip demonstrator (tripping.c): main task

int main (void)
{
    //  Create threads
    zctx_t *ctx = zctx_new ();
    void *client = zthread_fork (ctx, client_task, NULL);
    zthread_new (worker_task, NULL);
    zthread_new (broker_task, NULL);

    //  Wait for signal on client pipe
    char *signal = zstr_recv (client);
    free (signal);

    zctx_destroy (&ctx);
    return 0;
}

On my development box, running this program results in:

Setting up test...
Synchronous round-trip test...
 9057 calls/second
Asynchronous round-trip test...
 173010 calls/second

Note that the client thread does a small pause before starting. This is to get around one of the “features” of the router socket: if you send a message with the address of a peer that’s not yet connected, the message gets discarded. In this example we don’t use the load-balancing mechanism, so without the sleep, if the worker thread is too slow to connect it will lose messages, making a mess of our test.
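
The sleep is the simplest fix for a test program. For real code, libzmq 3.2 and later let you make a ROUTER socket fail loudly instead of silently dropping unroutable messages. This sketch is not part of the example; it just shows the option:

//  Sketch, not used in this example: tell a ROUTER socket to return
//  an error for unroutable messages instead of silently dropping them
//  (requires libzmq 3.2 or later)
void *router = zsocket_new (ctx, ZMQ_ROUTER);
int mandatory = 1;
zmq_setsockopt (router, ZMQ_ROUTER_MANDATORY,
    &mandatory, sizeof (mandatory));
//  A send to a peer that isn't connected now fails with
//  errno == EHOSTUNREACH rather than vanishing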

As we see, round-tripping in the simplest case is 20 times slower than the asynchronous, “shove it down the pipe as fast as it’ll go” approach. Let’s see if we can apply this to Majordomo to make it faster.

First, we modify the client API to send and receive in two separate methods:

mdcli_t *mdcli_new     (char *broker, int verbose);
void     mdcli_destroy (mdcli_t **self_p);
int      mdcli_send    (mdcli_t *self, char *service, zmsg_t **request_p);
zmsg_t  *mdcli_recv    (mdcli_t *self);

It’s literally a few minutes’ work to refactor the synchronous client API to become asynchronous, as shown in Example 4-50.

Example 4-50. Majordomo asynchronous client API (mdcliapi2.c)

/*  =====================================================================
 *  mdcliapi2.c - Majordomo Protocol Client API
 *  Implements the MDP/Client spec at http://rfc.zeromq.org/spec:7.
 *  ===================================================================== */

#include "mdcliapi2.h"

//  Structure of our class
//  We access these properties only via class methods

struct _mdcli_t {
    zctx_t *ctx;                //  Our context
    char *broker;               //  Broker endpoint
    void *client;               //  Socket to broker
    int verbose;                //  Print activity to stdout
    int timeout;                //  Request timeout
};

//  ---------------------------------------------------------------------
//  Connect or reconnect to broker. In this asynchronous class, we use a
//  DEALER socket instead of a REQ socket; this lets us send any number
//  of requests without waiting for a reply.

void s_mdcli_connect_to_broker (mdcli_t *self)
{
    if (self->client)
        zsocket_destroy (self->ctx, self->client);
    self->client = zsocket_new (self->ctx, ZMQ_DEALER);
    zmq_connect (self->client, self->broker);
    if (self->verbose)
        zclock_log ("I: connecting to broker at %s...", self->broker);
}


//  The constructor and destructor are the same as in mdcliapi, except
//  we don't do retries, so there's no retries property.
...
...

The differences are:

  • We use a DEALER socket instead of REQ, so we emulate REQ with an empty delimiter frame before each request and each response.

  • We don’t retry requests; if the application needs to retry, it can do this itself.

  • We break the synchronous send method into separate send and recv methods, sketched after this list.

  • The send method is asynchronous and returns immediately after sending. The caller can thus send a number of messages before getting a response.

  • The recv method waits for (with a timeout) one response and returns that to the caller.
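
To make the REQ emulation concrete, here is a sketch of how the two methods can look, with error handling reduced to asserts. MDPC_CLIENT is the MDP client protocol header constant from mdp.h:

//  Send one request; returns immediately without waiting for a reply
int
mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p)
{
    assert (self);
    assert (request_p);
    zmsg_t *request = *request_p;

    //  Prefix request with protocol frames
    //  Frame 0: empty frame (REQ emulation)
    //  Frame 1: MDPC_CLIENT header
    //  Frame 2: service name
    zmsg_pushstr (request, service);
    zmsg_pushstr (request, MDPC_CLIENT);
    zmsg_pushstr (request, "");
    zmsg_send (&request, self->client);
    return 0;
}

//  Wait (with a timeout) for one reply; returns NULL on timeout or
//  interrupt, else the reply body with the protocol frames stripped
zmsg_t *
mdcli_recv (mdcli_t *self)
{
    assert (self);
    zmq_pollitem_t items [] = { { self->client, 0, ZMQ_POLLIN, 0 } };
    int rc = zmq_poll (items, 1, self->timeout * ZMQ_POLL_MSEC);
    if (rc == -1)
        return NULL;            //  Interrupted

    if (items [0].revents & ZMQ_POLLIN) {
        zmsg_t *msg = zmsg_recv (self->client);
        if (!msg)
            return NULL;        //  Interrupted
        //  Strip the empty frame, protocol header, and service name
        assert (zmsg_size (msg) >= 4);
        zframe_t *empty = zmsg_pop (msg);
        assert (zframe_streq (empty, ""));
        zframe_destroy (&empty);
        zframe_t *header = zmsg_pop (msg);
        assert (zframe_streq (header, MDPC_CLIENT));
        zframe_destroy (&header);
        zframe_t *service = zmsg_pop (msg);
        zframe_destroy (&service);
        return msg;             //  Success
    }
    return NULL;                //  Timed out, no reply
}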

The corresponding client test program, which sends 100,000 messages and then receives 100,000 back, is shown in Example 4-51.

Example 4-51. Majordomo client application (mdclient2.c)

//
//  Majordomo Protocol client example - asynchronous
//  Uses the mdcli API to hide all MDP aspects
//
//  Lets us build this source without creating a library
#include "mdcliapi2.c"

int main (int argc, char *argv [])
{
    int verbose = (argc > 1 && streq (argv [1], "-v"));
    mdcli_t *session = mdcli_new ("tcp://localhost:5555", verbose);

    int count;
    for (count = 0; count < 100000; count++) {
        zmsg_t *request = zmsg_new ();
        zmsg_pushstr (request, "Hello world");
        mdcli_send (session, "echo", &request);
    }
    for (count = 0; count < 100000; count++) {
        zmsg_t *reply = mdcli_recv (session);
        if (reply)
            zmsg_destroy (&reply);
        else
            break;              //  Interrupted by Ctrl-C
    }
    printf ("%d replies received\n", count);
    mdcli_destroy (&session);
    return 0;
}

The broker and worker are unchanged because we haven’t modified the protocol at all. We see an immediate improvement in performance. Here’s the synchronous client chugging through 100K request-reply cycles:

$ time mdclient
100000 requests/replies processed

real    0m14.088s
user    0m1.310s
sys     0m2.670s

And here’s the asynchronous client, with a single worker:

$ time mdclient2
100000 replies received

real    0m8.730s
user    0m0.920s
sys     0m1.550s

Almost twice as fast. Not bad, but let’s fire up 10 workers and see how it handles the traffic:

$ time mdclient2
100000 replies received

real    0m3.863s
user    0m0.730s
sys     0m0.470s

It isn’t fully asynchronous because workers get their messages on a strict last-used basis, but it will scale better with more workers. On my PC, after eight or so workers, it doesn’t get any faster. Four cores only stretches so far. But we got a 4x improvement in throughput with just a few minutes’ work. The broker is still unoptimized. It spends most of its time copying message frames around, instead of doing zero-copy, which it could. But we’re getting 25K reliable request-reply calls a second, with pretty low effort.
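
As a point of reference, the raw libzmq API supports zero-copy sends via zmq_msg_init_data(), which wraps an existing buffer rather than copying it. A minimal sketch, assuming libzmq 3.2 or later (this is not what the Majordomo broker currently does):

//  Zero-copy send sketch: the message takes ownership of the buffer,
//  and libzmq calls the free function once the data has been sent
static void
s_free_buffer (void *data, void *hint)
{
    free (data);
}

static int
s_send_zero_copy (void *socket, void *buffer, size_t size)
{
    zmq_msg_t msg;
    zmq_msg_init_data (&msg, buffer, size, s_free_buffer, NULL);
    return zmq_msg_send (&msg, socket, 0);
}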

However, the asynchronous Majordomo pattern isn’t all roses. It has a fundamental weakness, namely that it cannot survive a broker crash without more work. If you look at the mdcliapi2 code you’ll see it does not attempt to reconnect after a failure. A proper reconnect would require the following:

  • A number on every request and a matching number on every reply, which would ideally require a change to the protocol to enforce

  • Tracking and holding onto all outstanding requests in the client API (i.e., those for which no reply has yet been received)

  • In case of failover, the client API resending all outstanding requests to the broker (sketched below)
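
A minimal sketch of that bookkeeping on the client side, assuming a new sequence-number frame (the frame, the s_send_tracked helper, and the s_outstanding hash are illustrative, not part of MDP):

//  Sketch only: tag each request with a sequence number and keep a
//  copy until its reply arrives, so outstanding requests can be
//  resent after a reconnect. Assumes s_outstanding = zhash_new ()
//  has been called at startup.
static unsigned int s_sequence = 0;
static zhash_t *s_outstanding;      //  Sequence number -> request copy

static void
s_send_tracked (mdcli_t *self, char *service, zmsg_t **request_p)
{
    char key [16];
    sprintf (key, "%u", ++s_sequence);
    zmsg_pushstr (*request_p, key);
    zhash_insert (s_outstanding, key, zmsg_dup (*request_p));
    mdcli_send (self, service, request_p);
}

//  After failover, resend a fresh duplicate of everything still in
//  s_outstanding. When a reply tagged with 'key' arrives, drop the
//  stored copy:
//      zmsg_t *request = (zmsg_t *) zhash_lookup (s_outstanding, key);
//      zmsg_destroy (&request);
//      zhash_delete (s_outstanding, key);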

It’s not a deal breaker, but it does show that performance often means complexity. Is this worth doing for Majordomo? It depends on your use case. For a name lookup service you call once per session, no. For a web frontend serving thousands of clients, probably yes.
