O'Reilly logo

ZeroMQ by Pieter Hintjens

Stay ahead with the world's most comprehensive technology and business learning platform.

With Safari, you learn the way you learn best. Get unlimited access to videos, live online training, learning paths, books, tutorials, and more.

Start Free Trial

No credit card required

Client-Side Reliability (Lazy Pirate Pattern)

We can get very simple, reliable request-reply with some changes to the client. We call this the Lazy Pirate pattern (Figure 4-1). Rather than doing a blocking receive, we:

  • Poll the REQ socket and receive from it only when it’s sure a reply has arrived.

  • Resend a request, if no reply has arrived within a timeout period.

  • Abandon the transaction if there is still no reply after several requests.

The Lazy Pirate pattern

Figure 4-1. The Lazy Pirate pattern

If we try to use a REQ socket in anything other than a strict send/receive fashion, we’ll get an error (technically, the REQ socket implements a small finite-state machine to enforce the send/receive ping-pong, so the error code is called “EFSM”). This is slightly annoying when we want to use REQ in a Pirate pattern, because we may send several requests before getting a reply, as you can see in Example 4-1. The pretty good brute-force solution is to close and reopen the REQ socket after an error.

Example 4-1. Lazy Pirate client (lpclient.c)

//
//  Lazy Pirate client
//  Use zmq_poll to do a safe request-reply
//  To run, start lpserver and then randomly kill/restart it
//
#include "czmq.h"

#define REQUEST_TIMEOUT     2500    //  msec (> 1000!)
#define REQUEST_RETRIES     3       //  Before we abandon
#define SERVER_ENDPOINT     "tcp://localhost:5555"

int main (void)
{
    zctx_t *ctx = zctx_new ();
    printf ("I: connecting to server...\n");
    void *client = zsocket_new (ctx, ZMQ_REQ);
    assert (client);
    zsocket_connect (client, SERVER_ENDPOINT);

    int sequence = 0;
    int retries_left = REQUEST_RETRIES;
    while (retries_left && !zctx_interrupted) {
        //  We send a request, then we work to get a reply
        char request [10];
        sprintf (request, "%d", ++sequence);
        zstr_send (client, request);

        int expect_reply = 1;
        while (expect_reply) {
            //  Poll socket for a reply, with timeout
            zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 } };
            int rc = zmq_poll (items, 1, REQUEST_TIMEOUT * ZMQ_POLL_MSEC);
            if (rc == -1)
                break;          //  Interrupted

Example 4-2 shows how we process a server reply and exit our loop if the reply is valid. If we didn’t receive a reply, we close the client socket and resend the request. We try a number of times before finally abandoning.

Example 4-2. Lazy Pirate client (lpclient.c): process server reply

            if (items [0].revents & ZMQ_POLLIN) {
                //  We got a reply from the server, must match sequence
                char *reply = zstr_recv (client);
                if (!reply)
                    break;      //  Interrupted
                if (atoi (reply) == sequence) {
                    printf ("I: server replied OK (%s)\n", reply);
                    retries_left = REQUEST_RETRIES;
                    expect_reply = 0;
                }
                else
                    printf ("E: malformed reply from server: %s\n",
                        reply);

                free (reply);
            }
            else
            if (--retries_left == 0) {
                printf ("E: server seems to be offline, abandoning\n");
                break;
            }
            else {
                printf ("W: no response from server, retrying...\n");
                //  Old socket is confused; close it and open a new one
                zsocket_destroy (ctx, client);
                printf ("I: reconnecting to server...\n");
                client = zsocket_new (ctx, ZMQ_REQ);
                zsocket_connect (client, SERVER_ENDPOINT);
                //  Send request again, on new socket
                zstr_send (client, request);
            }
        }
    }
    zctx_destroy (&ctx);
    return 0;
}

We run this together with the matching server, shown in Example 4-3.

Example 4-3. Lazy Pirate server (lpserver.c)

//
//  Lazy Pirate server
//  Binds REQ socket to tcp://*:5555
//  Like hwserver except:
//   - echoes request as-is
//   - randomly runs slowly, or exits to simulate a crash.
//
#include "zhelpers.h"

int main (void)
{
    srandom ((unsigned) time (NULL));

    void *context = zmq_ctx_new ();
    void *server = zmq_socket (context, ZMQ_REP);
    zmq_bind (server, "tcp://*:5555");

    int cycles = 0;
    while (1) {
        char *request = s_recv (server);
        cycles++;

        //  Simulate various problems, after a few cycles
        if (cycles > 3 && randof (3) == 0) {
            printf ("I: simulating a crash\n");
            break;
        }
        else
        if (cycles > 3 && randof (3) == 0) {
            printf ("I: simulating CPU overload\n");
            sleep (2);
        }
        printf ("I: normal request (%s)\n", request);
        sleep (1);              //  Do some heavy work
        s_send (server, request);
        free (request);
    }
    zmq_close (server);
    zmq_ctx_destroy (context);
    return 0;
}

To run this test case, start the client and the server in two console windows. The server will randomly misbehave after a few messages. You can check the client’s response. Here is typical output from the server:

I: normal request (1)
I: normal request (2)
I: normal request (3)
I: simulating CPU overload
I: normal request (4)
I: simulating a crash

And here is the client’s response:

I: connecting to server...
I: server replied OK (1)
I: server replied OK (2)
I: server replied OK (3)
W: no response from server, retrying...
I: connecting to server...
W: no response from server, retrying...
I: connecting to server...
E: server seems to be offline, abandoning

The client sequences each message and checks that replies come back exactly in order: that no requests or replies are lost, and no replies come back more than once or out of order. Run the test a few times until you’re convinced that this mechanism actually works. You don’t need sequence numbers in a production application; they just help us trust our design.

The client uses a REQ socket, and it does the brute-force close/reopen because REQ sockets impose that strict send/receive cycle. You might be tempted to use a DEALER instead, but it would not be a good decision. First, it would mean emulating the secret sauce that REQ does with envelopes (if you’ve forgotten what that is, it’s a good sign you don’t want to have to do it). Second, it would mean potentially getting back replies that you didn’t expect.

Handling failures only at the client works when we have a set of clients talking to a single server. This design can handle a server crash, but only if recovery means restarting that same server. If there’s a permanent error, such as a dead power supply on the server hardware, this approach won’t work. Because the application code in servers is usually the biggest source of failures in any architecture, depending on a single server is not a great idea.

So, the pros and cons are:

  • Pro: simple to understand and implement.

  • Pro: works easily with existing client and server application code.

  • Pro: ØMQ automatically retries the actual reconnection until it works.

  • Con: doesn’t do failover to backup or alternate servers.

With Safari, you learn the way you learn best. Get unlimited access to videos, live online training, learning paths, books, interactive tutorials, and more.

Start Free Trial

No credit card required