The Majordomo implementation in the previous section is simple and stupid. The client is just the original Simple Pirate, wrapped up in a sexy API. When I fire up a client, broker, and worker on a test box, it can process 100,000 requests in about 14 seconds. That is partially due to the code, which cheerfully copies message frames around as if CPU cycles were free. But the real problem is that we're doing network round-trips. ØMQ disables Nagle's algorithm, but round-tripping is still slow.
Theory is great in theory, but in practice, practice is better. Let's measure the actual cost of round-tripping with a simple test program. This sends a bunch of messages, first waiting for a reply to each message, and second as a batch, reading all the replies back as a batch. Both approaches do the same work, but they give very different results. We mock up a client, broker, and worker. The client task is shown in Example 4-46.
Example 4-46. Round-trip demonstrator (tripping.c)
//
//  Round-trip demonstrator
//
//  While this example runs in a single process, that is just to make
//  it easier to start and stop the example. The client task signals to
//  main when it's ready.
//
#include "czmq.h"

static void
client_task (void *args, zctx_t *ctx, void *pipe)
{
    void *client = zsocket_new (ctx, ZMQ_DEALER);
    zsocket_connect (client, "tcp://localhost:5555");
    printf ("Setting up test...\n");
    zclock_sleep (100);

    int requests;
    int64_t start;

    printf ("Synchronous round-trip test...\n");
    start = zclock_time ();
    for (requests = 0; requests < 10000; requests++) {
        zstr_send (client, "hello");
        char *reply = zstr_recv (client);
        free (reply);
    }
    printf (" %d calls/second\n",
        (1000 * 10000) / (int) (zclock_time () - start));

    printf ("Asynchronous round-trip test...\n");
    start = zclock_time ();
    for (requests = 0; requests < 100000; requests++)
        zstr_send (client, "hello");
    for (requests = 0; requests < 100000; requests++) {
        char *reply = zstr_recv (client);
        free (reply);
    }
    printf (" %d calls/second\n",
        (1000 * 100000) / (int) (zclock_time () - start));
    zstr_send (pipe, "done");
}
The worker task is in Example 4-47. All it does is receive a message, and bounce it back the way it came.
Example 4-47. Round-trip demonstrator (tripping.c): worker task
static void *
worker_task (void *args)
{
    zctx_t *ctx = zctx_new ();
    void *worker = zsocket_new (ctx, ZMQ_DEALER);
    zsocket_connect (worker, "tcp://localhost:5556");

    while (true) {
        zmsg_t *msg = zmsg_recv (worker);
        zmsg_send (&msg, worker);
    }
    zctx_destroy (&ctx);
    return NULL;
}
Example 4-48 shows the broker task. It uses the zmq_proxy() function to switch messages between the frontend and backend.
Example 4-48. Round-trip demonstrator (tripping.c): broker task
static void *
broker_task (void *args)
{
    //  Prepare our context and sockets
    zctx_t *ctx = zctx_new ();
    void *frontend = zsocket_new (ctx, ZMQ_DEALER);
    zsocket_bind (frontend, "tcp://*:5555");
    void *backend = zsocket_new (ctx, ZMQ_DEALER);
    zsocket_bind (backend, "tcp://*:5556");
    zmq_proxy (frontend, backend, NULL);
    zctx_destroy (&ctx);
    return NULL;
}
Finally, Example 4-49 presents the main task, which starts the client, worker, and broker, and then runs until the client signals it to stop.
Example 4-49. Round-trip demonstrator (tripping.c): main task
int main (void)
{
    //  Create threads
    zctx_t *ctx = zctx_new ();
    void *client = zthread_fork (ctx, client_task, NULL);
    zthread_new (worker_task, NULL);
    zthread_new (broker_task, NULL);

    //  Wait for signal on client pipe
    char *signal = zstr_recv (client);
    free (signal);

    zctx_destroy (&ctx);
    return 0;
}
On my development box, running this program results in:
Setting up test...
Synchronous round-trip test...
 9057 calls/second
Asynchronous round-trip test...
 173010 calls/second
Note that the client thread does a small pause before starting. This is to get around one of the "features" of the router socket: if you send a message with the address of a peer that's not yet connected, the message gets discarded. In this example we don't use the load-balancing mechanism, so without the sleep, if the worker thread is too slow to connect it will lose messages, making a mess of our test.
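If that silent discard is a concern in your own code, one alternative to a fixed sleep is to make the drop visible. The following is a hypothetical helper, not part of tripping.c, assuming libzmq 3.2 or later: with ZMQ_ROUTER_MANDATORY set on a ROUTER socket, sending to a peer that hasn't connected yet fails with EHOSTUNREACH instead of being discarded, so the sender can wait and retry.
//  Hypothetical helper, not part of the example: with ZMQ_ROUTER_MANDATORY
//  set, sending to an unknown peer returns -1 with EHOSTUNREACH rather
//  than silently dropping the message, so we can wait and retry.
#include "czmq.h"
#include <errno.h>

static void
s_send_to_peer (void *router, const char *identity, const char *body)
{
    int mandatory = 1;
    zmq_setsockopt (router, ZMQ_ROUTER_MANDATORY, &mandatory, sizeof (mandatory));
    while (true) {
        //  The identity frame selects the peer; if it isn't connected yet,
        //  the send fails immediately and we try again shortly afterwards.
        if (zmq_send (router, identity, strlen (identity), ZMQ_SNDMORE) == -1
        &&  zmq_errno () == EHOSTUNREACH) {
            zclock_sleep (10);
            continue;
        }
        zmq_send (router, body, strlen (body), 0);
        break;
    }
}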
As we see, round-tripping in the simplest case is 20 times slower than the asynchronous, "shove it down the pipe as fast as it'll go" approach. Let's see if we can apply this to Majordomo to make it faster.
First, we modify the client API to send and receive in two separate methods:
mdcli_t *mdcli_new (char *broker);
void mdcli_destroy (mdcli_t **self_p);
int mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p);
zmsg_t *mdcli_recv (mdcli_t *self);
It's literally a few minutes' work to refactor the synchronous client API to become asynchronous, as shown in Example 4-50.
Example 4-50. Majordomo asynchronous client API (mdcliapi2.c)
/*  =====================================================================
 *  mdcliapi2.c - Majordomo Protocol Client API
 *  Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.
 *  ===================================================================== */

#include "mdcliapi2.h"

//  Structure of our class
//  We access these properties only via class methods
struct _mdcli_t {
    zctx_t *ctx;                //  Our context
    char *broker;
    void *client;               //  Socket to broker
    int verbose;                //  Print activity to stdout
    int timeout;                //  Request timeout
};

//  ---------------------------------------------------------------------
//  Connect or reconnect to broker. In this asynchronous class, we use a
//  DEALER socket instead of a REQ socket; this lets us send any number
//  of requests without waiting for a reply.

void s_mdcli_connect_to_broker (mdcli_t *self)
{
    if (self->client)
        zsocket_destroy (self->ctx, self->client);
    self->client = zsocket_new (self->ctx, ZMQ_DEALER);
    zmq_connect (self->client, self->broker);
    if (self->verbose)
        zclock_log ("I: connecting to broker at %s...", self->broker);
}

//  The constructor and destructor are the same as in mdcliapi, except
//  we don't do retries, so there's no retries property.
...
The differences are:
We use a DEALER socket instead of REQ, so we emulate REQ with an empty delimiter frame before each request and each response.
We don't retry requests; if the application needs to retry, it can do this itself.
We break the synchronous send method into separate send and recv methods.
The send method is asynchronous and returns immediately after sending. The caller can thus send a number of messages before getting a response.
The recv method waits (with a timeout) for one response and returns it to the caller. A rough sketch of both methods follows this list.
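The full mdcliapi2.c is elided above, so the following is only an indicative sketch (details differ in the real code): the send method prefixes the request with the MDP client header, the service name, and the empty delimiter frame that a REQ socket would otherwise add, while the recv method polls with the timeout and strips those frames from the reply.
//  Indicative sketch only; see the full mdcliapi2.c for the real code.
//  MDPC_CLIENT is the MDP/Client protocol header string from mdp.h.
int
mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p)
{
    zmsg_t *request = *request_p;
    //  Prefix request with protocol frames:
    //  Frame 1: empty frame (REQ emulation)
    //  Frame 2: MDP/Client header
    //  Frame 3: service name
    zmsg_pushstr (request, service);
    zmsg_pushstr (request, MDPC_CLIENT);
    zmsg_pushstr (request, "");
    return zmsg_send (&request, self->client);
}

zmsg_t *
mdcli_recv (mdcli_t *self)
{
    //  Poll the socket for a reply, using the configured timeout
    zmq_pollitem_t items [] = { { self->client, 0, ZMQ_POLLIN, 0 } };
    if (zmq_poll (items, 1, self->timeout * ZMQ_POLL_MSEC) == -1)
        return NULL;            //  Interrupted
    if (items [0].revents & ZMQ_POLLIN) {
        zmsg_t *msg = zmsg_recv (self->client);
        //  Strip the empty delimiter, protocol header, and service frames
        zframe_t *frame;
        frame = zmsg_pop (msg);  zframe_destroy (&frame);
        frame = zmsg_pop (msg);  zframe_destroy (&frame);
        frame = zmsg_pop (msg);  zframe_destroy (&frame);
        return msg;             //  Remaining frames are the reply body
    }
    return NULL;                //  No reply within the timeout
}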
The corresponding client test program, which sends 100,000 messages and then receives 100,000 back, is shown in Example 4-51.
Example 4-51. Majordomo client application (mdclient2.c)
//
//  Majordomo Protocol client example - asynchronous
//  Uses the mdcli API to hide all MDP aspects
//
//  Lets us build this source without creating a library
#include "mdcliapi2.c"

int main (int argc, char *argv [])
{
    int verbose = (argc > 1 && streq (argv [1], "-v"));
    mdcli_t *session = mdcli_new ("tcp://localhost:5555", verbose);

    int count;
    for (count = 0; count < 100000; count++) {
        zmsg_t *request = zmsg_new ();
        zmsg_pushstr (request, "Hello world");
        mdcli_send (session, "echo", &request);
    }
    for (count = 0; count < 100000; count++) {
        zmsg_t *reply = mdcli_recv (session);
        if (reply)
            zmsg_destroy (&reply);
        else
            break;              //  Interrupted by Ctrl-C
    }
    printf ("%d replies received\n", count);
    mdcli_destroy (&session);
    return 0;
}
The broker and worker are unchanged because we haven't modified the protocol at all. We see an immediate improvement in performance. Here's the synchronous client chugging through 100K request-reply cycles:
$ time mdclient
100000 requests/replies processed
real 0m14.088s
user 0m1.310s
sys 0m2.670s
And here's the asynchronous client, with a single worker:
$ time mdclient2
100000 replies received
real 0m8.730s
user 0m0.920s
sys 0m1.550s
Twice as fast. Not bad, but let's fire up 10 workers and see how it handles the traffic:
$ time mdclient2
100000 replies received
real 0m3.863s
user 0m0.730s
sys 0m0.470s
It isn't fully asynchronous because workers get their messages on a strict last-used basis, but it will scale better with more workers. On my PC, after eight or so workers, it doesn't get any faster. Four cores only stretch so far. But we got a 4x improvement in throughput with just a few minutes' work. The broker is still unoptimized. It spends most of its time copying message frames around, instead of doing zero-copy, which it could. But we're getting 25K reliable request-reply calls a second, with pretty low effort.
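For reference, here is a minimal sketch, not taken from the Majordomo broker, of what copy-free forwarding looks like at the libzmq level: the same zmq_msg_t that was received is handed straight to the send call, which takes ownership of it, so the payload bytes are never duplicated in user space.
//  Sketch only: forward one multipart message between two sockets without
//  copying payload bytes; zmq_msg_send() takes ownership of each frame.
static void
s_forward_message (void *from, void *to)
{
    while (true) {
        zmq_msg_t frame;
        zmq_msg_init (&frame);
        if (zmq_msg_recv (&frame, from, 0) == -1) {
            zmq_msg_close (&frame);
            break;              //  Interrupted
        }
        int more = zmq_msg_more (&frame);
        zmq_msg_send (&frame, to, more? ZMQ_SNDMORE: 0);
        if (!more)
            break;              //  Last frame of the message
    }
}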
However, the asynchronous Majordomo pattern isn't all roses. It has a fundamental weakness, namely that it cannot survive a broker crash without more work. If you look at the mdcliapi2 code you'll see it does not attempt to reconnect after a failure. A proper reconnect would require the following (a rough sketch follows this list):
A number on every request and a matching number on every reply, which would ideally require a change to the protocol to enforce
Tracking and holding onto all outstanding requests in the client API (i.e., those for which no reply has yet been received)
In case of failover, for the client API to resend all outstanding requests to the broker
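None of this exists in mdcliapi2. As a rough sketch of the idea (hypothetical names and structure, not part of the Majordomo code), the client API would stamp each request with a sequence number, keep the request keyed by that number until the matching reply arrives, and replay whatever is still outstanding after reconnecting.
//  Hypothetical sketch only, not part of mdcliapi2: number each request,
//  remember it until its reply arrives, and resend leftovers on failover.
typedef struct {
    void *client;               //  DEALER socket to the broker
    int sequence;               //  Last sequence number issued
    zhash_t *outstanding;       //  Unanswered requests, keyed by number
} tracked_client_t;

static void
s_send_tracked (tracked_client_t *self, zmsg_t **request_p)
{
    char key [20];
    sprintf (key, "%d", ++self->sequence);
    zmsg_pushstr (*request_p, key);             //  Number the request
    zmsg_t *copy = zmsg_dup (*request_p);
    zmsg_send (&copy, self->client);            //  Send a copy now
    zhash_insert (self->outstanding, key, *request_p); //  Keep the original
    *request_p = NULL;
}

static void
s_on_reply (tracked_client_t *self, zmsg_t *reply)
{
    char *key = zmsg_popstr (reply);            //  Echoed sequence number
    if (key) {
        zmsg_t *request = (zmsg_t *) zhash_lookup (self->outstanding, key);
        zhash_delete (self->outstanding, key);  //  Answered; forget it
        zmsg_destroy (&request);
        free (key);
    }
}

static void
s_after_reconnect (tracked_client_t *self)
{
    //  Resend every request that never got a reply
    zmsg_t *request = (zmsg_t *) zhash_first (self->outstanding);
    while (request) {
        zmsg_t *copy = zmsg_dup (request);
        zmsg_send (&copy, self->client);
        request = (zmsg_t *) zhash_next (self->outstanding);
    }
}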
It's not a deal breaker, but it does show that performance often means complexity. Is this worth doing for Majordomo? It depends on your use case. For a name lookup service you call once per session, no. For a web frontend serving thousands of clients, probably yes.