The Simple Pirate queue pattern works pretty well, especially because it’s just a combination of two existing patterns. Still, it does have some weaknesses:
It’s not robust in the face of a queue crash and restart. The client will recover, but the workers won’t. While ØMQ will reconnect workers’ sockets automatically, as far as the newly started queue is concerned the workers haven’t signaled ready, so they don’t exist. To fix this we have to do heartbeating from queue to worker so that the worker can detect when the queue has gone away.
The queue does not detect worker failure, so if a worker dies while idle, the queue can’t remove it from its worker queue until the queue sends it a request. The client waits and retries for nothing. It’s not a critical problem, but it’s not nice. To make this work properly, we need to do heartbeating from worker to queue, so that the queue can detect a lost worker at any stage.
We’ll fix these issues in a properly pedantic Paranoid Pirate pattern.
We previously used a REQ socket for the worker. For the Paranoid Pirate worker, we’ll switch to a DEALER socket (Figure 4-3). This has the advantage of letting us send and receive messages at any time, rather than the lock-step send/receive that REQ imposes. The downside of DEALER is that we have to do our own envelope management (re-read Chapter 3 for background on this concept).
We’re still using the Lazy Pirate client. The Paranoid Pirate queue proxy is shown in Example 4-6.
Example 4-6. Paranoid Pirate queue (ppqueue.c)
//
//  Paranoid Pirate queue
//
#include "czmq.h"

#define HEARTBEAT_LIVENESS  3       //  3-5 is reasonable
#define HEARTBEAT_INTERVAL  1000    //  msec

//  Paranoid Pirate Protocol constants
#define PPP_READY       "\001"      //  Signals worker is ready
#define PPP_HEARTBEAT   "\002"      //  Signals worker heartbeat
Example 4-7 defines the worker class: a structure and a set of functions that act as constructor, destructor, and methods on worker objects.
Example 4-7. Paranoid Pirate queue (ppqueue.c): worker class structure
typedef struct {
    zframe_t *identity;         //  Identity of worker
    char *id_string;            //  Printable identity
    int64_t expiry;             //  Expires at this time
} worker_t;

//  Construct new worker
static worker_t *
s_worker_new (zframe_t *identity)
{
    worker_t *self = (worker_t *) zmalloc (sizeof (worker_t));
    self->identity = identity;
    self->id_string = zframe_strdup (identity);
    self->expiry = zclock_time ()
                 + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS;
    return self;
}

//  Destroy specified worker object, including identity frame
static void
s_worker_destroy (worker_t **self_p)
{
    assert (self_p);
    if (*self_p) {
        worker_t *self = *self_p;
        zframe_destroy (&self->identity);
        free (self->id_string);
        free (self);
        *self_p = NULL;
    }
}
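The expiry arithmetic in the constructor is easy to check in isolation. Here is a minimal sketch, assuming the clock value is passed in by the caller rather than read from zclock_time(). The names expiry_for and is_expired are hypothetical helpers for illustration and are not part of ppqueue.c.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define HEARTBEAT_LIVENESS  3       //  Same values as ppqueue.c
#define HEARTBEAT_INTERVAL  1000    //  msec

//  Hypothetical helper: deadline for a worker last heard from at now_msec
static int64_t
expiry_for (int64_t now_msec)
{
    assert (now_msec >= 0);
    return now_msec + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS;
}

//  Hypothetical helper: a worker counts as alive only while now < expiry
static bool
is_expired (int64_t expiry, int64_t now_msec)
{
    return now_msec >= expiry;
}
```

With the constants above, a worker that was last heard from at time 10000 is kept until time 13000, that is, for three missed heartbeat intervals.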
The ready method (Example 4-8) puts a worker at the end of the ready list, first removing any existing entry that has the same identity.
Example 4-8. Paranoid Pirate queue (ppqueue.c): worker ready method
static void
s_worker_ready (worker_t *self, zlist_t *workers)
{
    worker_t *worker = (worker_t *) zlist_first (workers);
    while (worker) {
        if (streq (self->id_string, worker->id_string)) {
            zlist_remove (workers, worker);
            s_worker_destroy (&worker);
            break;
        }
        worker = (worker_t *) zlist_next (workers);
    }
    zlist_append (workers, self);
}
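The "remove the old entry, then append" idea does not depend on CZMQ. The following is a rough sketch using a fixed-size array of printable identities in place of a zlist_t. The type ready_list_t, the function ready_list_mark, and the limits MAX_WORKERS and ID_LEN are all assumptions made up for this illustration.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define MAX_WORKERS 16      //  Hypothetical limits for this sketch
#define ID_LEN      32

//  Simplified ready list: printable identities only, oldest first
typedef struct {
    char ids [MAX_WORKERS][ID_LEN];
    int count;
} ready_list_t;

//  Move id to the end of the list, dropping any earlier entry for it.
//  Returns 0 on success, -1 if the id is too long or the list is full.
static int
ready_list_mark (ready_list_t *list, const char *id)
{
    assert (list && id);
    if (strlen (id) >= ID_LEN)
        return -1;
    //  Remove an existing entry with the same identity, if any
    for (int i = 0; i < list->count; i++) {
        if (strcmp (list->ids [i], id) == 0) {
            memmove (list->ids + i, list->ids + i + 1,
                     (size_t) (list->count - i - 1) * ID_LEN);
            list->count--;
            break;
        }
    }
    if (list->count == MAX_WORKERS)
        return -1;
    strcpy (list->ids [list->count++], id);
    return 0;
}
```

Marking workers A, B, and then A again leaves the list as B, A. The most recently heard-from worker is always last, which is the ordering the purge step relies on.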
The next method, shown in Example 4-9, returns the next available worker’s identity.
Example 4-9. Paranoid Pirate queue (ppqueue.c): get next available worker method
static zframe_t *
s_workers_next (zlist_t *workers)
{
    worker_t *worker = zlist_pop (workers);
    assert (worker);
    zframe_t *frame = worker->identity;
    worker->identity = NULL;
    s_worker_destroy (&worker);
    return frame;
}
The purge method (Example 4-10) looks for and kills expired workers. We hold workers from oldest to most recent, so we stop at the first live worker.
Example 4-10. Paranoid Pirate queue (ppqueue.c): purge expired workers method
static void
s_workers_purge (zlist_t *workers)
{
    worker_t *worker = (worker_t *) zlist_first (workers);
    while (worker) {
        if (zclock_time () < worker->expiry)
            break;              //  Worker is alive, we're done here

        zlist_remove (workers, worker);
        s_worker_destroy (&worker);
        worker = (worker_t *) zlist_first (workers);
    }
}
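Because the list is ordered oldest first, the purge only ever has to look at the front. As a sketch of that early-exit logic, assume the worker list is reduced to a plain array of expiry times sorted oldest first, and that the current time is supplied by the caller. The function purge_expired is a hypothetical stand-in, not code from ppqueue.c.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

//  Hypothetical stand-in for the worker list: expiry times, oldest first.
//  Drops expired entries from the front and returns how many remain.
static size_t
purge_expired (int64_t *expiry, size_t count, int64_t now_msec)
{
    assert (expiry || count == 0);
    size_t dead = 0;
    //  Stop at the first live entry; everything after it is newer
    while (dead < count && now_msec >= expiry [dead])
        dead++;
    if (dead > 0 && dead < count)
        memmove (expiry, expiry + dead, (count - dead) * sizeof (int64_t));
    return count - dead;
}
```

The cost is proportional to the number of dead workers rather than to the size of the list, which is why keeping the list in arrival order is worth the extra work in the ready method.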
The main task is a load balancer with heartbeating on workers so we can detect crashed or blocked worker tasks, as shown in Example 4-11.
Example 4-11. Paranoid Pirate queue (ppqueue.c): main task
int main (void)
{
    zctx_t *ctx = zctx_new ();
    void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
    void *backend = zsocket_new (ctx, ZMQ_ROUTER);
    zsocket_bind (frontend, "tcp://*:5555");    //  For clients
    zsocket_bind (backend, "tcp://*:5556");     //  For workers

    //  List of available workers
    zlist_t *workers = zlist_new ();

    //  Send out heartbeats at regular intervals
    uint64_t heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;

    while (true) {
        zmq_pollitem_t items [] = {
            { backend, 0, ZMQ_POLLIN, 0 },
            { frontend, 0, ZMQ_POLLIN, 0 }
        };
        //  Poll frontend only if we have available workers
        int rc = zmq_poll (items, zlist_size (workers) ? 2 : 1,
                           HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
        if (rc == -1)
            break;              //  Interrupted

        //  Handle worker activity on backend
        if (items [0].revents & ZMQ_POLLIN) {
            //  Use worker identity for load balancing
            zmsg_t *msg = zmsg_recv (backend);
            if (!msg)
                break;          //  Interrupted

            //  Any sign of life from worker means it's ready
            zframe_t *identity = zmsg_unwrap (msg);
            worker_t *worker = s_worker_new (identity);
            s_worker_ready (worker, workers);

            //  Validate control message, or return reply to client
            if (zmsg_size (msg) == 1) {
                zframe_t *frame = zmsg_first (msg);
                if (memcmp (zframe_data (frame), PPP_READY, 1)
                &&  memcmp (zframe_data (frame), PPP_HEARTBEAT, 1)) {
                    printf ("E: invalid message from worker\n");
                    zmsg_dump (msg);
                }
                zmsg_destroy (&msg);
            }
            else
                zmsg_send (&msg, frontend);
        }
        if (items [1].revents & ZMQ_POLLIN) {
            //  Now get next client request, route to next worker
            zmsg_t *msg = zmsg_recv (frontend);
            if (!msg)
                break;          //  Interrupted
            zmsg_push (msg, s_workers_next (workers));
            zmsg_send (&msg, backend);
        }
We handle heartbeating after any socket activity. As shown in Example 4-12, we first send heartbeats to any idle workers if it’s time, then we purge any dead workers.
Example 4-12. Paranoid Pirate queue (ppqueue.c): handle heartbeating
        if (zclock_time () >= heartbeat_at) {
            worker_t *worker = (worker_t *) zlist_first (workers);
            while (worker) {
                zframe_send (&worker->identity, backend,
                             ZFRAME_REUSE + ZFRAME_MORE);
                zframe_t *frame = zframe_new (PPP_HEARTBEAT, 1);
                zframe_send (&frame, backend, 0);
                worker = (worker_t *) zlist_next (workers);
            }
            heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
        }
        s_workers_purge (workers);
    }
    //  When we're done, clean up properly
    while (zlist_size (workers)) {
        worker_t *worker = (worker_t *) zlist_pop (workers);
        s_worker_destroy (&worker);
    }
    zlist_destroy (&workers);
    zctx_destroy (&ctx);
    return 0;
}
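The queue's control-message check compares only the first byte of a one-frame message against the two PPP commands. As a sketch of the same test written as a standalone predicate, assume the frame is handed over as a raw byte buffer and a length. The helper ppp_is_control is hypothetical, and it is deliberately stricter than the memcmp test in ppqueue.c: it also rejects empty frames and frames longer than one byte.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define PPP_READY       "\001"      //  Signals worker is ready
#define PPP_HEARTBEAT   "\002"      //  Signals worker heartbeat

//  Hypothetical helper: true if the frame body is exactly one byte
//  and that byte is a known PPP command
static bool
ppp_is_control (const unsigned char *data, size_t size)
{
    assert (data || size == 0);
    if (size != 1)
        return false;
    return data [0] == (unsigned char) PPP_READY [0]
        || data [0] == (unsigned char) PPP_HEARTBEAT [0];
}
```

Checking the length before touching the data avoids reading from a zero-length frame, which the byte comparison on its own does not guard against.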
The queue extends the load-balancing pattern with heartbeating of workers. Heartbeating is one of those “simple” things that can be difficult to get right. I’ll explain more about that in the next section; for now, back to the code.
Take a look at the Paranoid Pirate worker in Example 4-13.
Example 4-13. Paranoid Pirate worker (ppworker.c)
//
//  Paranoid Pirate worker
//
#include "czmq.h"

#define HEARTBEAT_LIVENESS  3       //  3-5 is reasonable
#define HEARTBEAT_INTERVAL  1000    //  msec
#define INTERVAL_INIT       1000    //  Initial reconnect
#define INTERVAL_MAX       32000    //  After exponential backoff

//  Paranoid Pirate Protocol constants
#define PPP_READY       "\001"      //  Signals worker is ready
#define PPP_HEARTBEAT   "\002"      //  Signals worker heartbeat

//  Helper function that returns a new configured socket
//  connected to the Paranoid Pirate queue
static void *
s_worker_socket (zctx_t *ctx)
{
    void *worker = zsocket_new (ctx, ZMQ_DEALER);
    zsocket_connect (worker, "tcp://localhost:5556");

    //  Tell queue we're ready for work
    printf ("I: worker ready\n");
    zframe_t *frame = zframe_new (PPP_READY, 1);
    zframe_send (&frame, worker, 0);

    return worker;
}
We have a single task that implements the worker side of the Paranoid Pirate Protocol (PPP). The heartbeating code in Example 4-14 lets the worker detect if the queue has died, and vice versa.
Example 4-14. Paranoid Pirate worker (ppworker.c): main task
int main (void)
{
    zctx_t *ctx = zctx_new ();
    void *worker = s_worker_socket (ctx);

    //  If liveness hits zero, queue is considered disconnected
    size_t liveness = HEARTBEAT_LIVENESS;
    size_t interval = INTERVAL_INIT;

    //  Send out heartbeats at regular intervals
    uint64_t heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;

    srandom ((unsigned) time (NULL));
    int cycles = 0;
    while (true) {
        zmq_pollitem_t items [] = { { worker, 0, ZMQ_POLLIN, 0 } };
        int rc = zmq_poll (items, 1, HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
        if (rc == -1)
            break;              //  Interrupted

        if (items [0].revents & ZMQ_POLLIN) {
            //  Get message
            //  - 3-part envelope + content -> request
            //  - 1-part HEARTBEAT -> heartbeat
            zmsg_t *msg = zmsg_recv (worker);
            if (!msg)
                break;          //  Interrupted
To test the robustness of the queue implementation, we simulate various typical problems, such as the worker crashing or running very slowly. We do this after a few cycles so that the architecture can get up and running first. The problem simulation code is in Example 4-15.
Example 4-15. Paranoid Pirate worker (ppworker.c): simulating problems
            //  A 3-part message is a client request
            if (zmsg_size (msg) == 3) {
                cycles++;
                if (cycles > 3 && randof (5) == 0) {
                    printf ("I: simulating a crash\n");
                    zmsg_destroy (&msg);
                    break;
                }
                else
                if (cycles > 3 && randof (5) == 0) {
                    printf ("I: simulating CPU overload\n");
                    sleep (3);
                    if (zctx_interrupted)
                        break;
                }
                printf ("I: normal reply\n");
                zmsg_send (&msg, worker);
                liveness = HEARTBEAT_LIVENESS;
                sleep (1);              //  Do some heavy work
                if (zctx_interrupted)
                    break;
            }
            else
When we get a heartbeat message from the queue, it means the queue is (or rather, was recently) alive, so we must reset our liveness indicator. The code in Example 4-16 handles the heartbeats.
Example 4-16. Paranoid Pirate worker (ppworker.c): handle heartbeats
            //  A 1-part message should be a heartbeat
            if (zmsg_size (msg) == 1) {
                zframe_t *frame = zmsg_first (msg);
                if (memcmp (zframe_data (frame), PPP_HEARTBEAT, 1) == 0)
                    liveness = HEARTBEAT_LIVENESS;
                else {
                    printf ("E: invalid message\n");
                    zmsg_dump (msg);
                }
                zmsg_destroy (&msg);
            }
            else {
                printf ("E: invalid message\n");
                zmsg_dump (msg);
                zmsg_destroy (&msg);    //  Don't leak the rejected message
            }
            interval = INTERVAL_INIT;
        }
        else
If the queue hasn’t sent us heartbeats in a while, we destroy the socket and reconnect, as shown in Example 4-17. This is the simplest and most brutal way of discarding any messages we might have sent in the meantime.
Example 4-17. Paranoid Pirate worker (ppworker.c): detecting a dead queue
        //  No message this cycle; count it against the queue
        if (--liveness == 0) {
            printf ("W: heartbeat failure, can't reach queue\n");
            printf ("W: reconnecting in %zu msec...\n", interval);
            zclock_sleep (interval);

            if (interval < INTERVAL_MAX)
                interval *= 2;
            zsocket_destroy (ctx, worker);
            worker = s_worker_socket (ctx);
            liveness = HEARTBEAT_LIVENESS;
        }
        //  Send heartbeat to queue if it's time
        if (zclock_time () > heartbeat_at) {
            heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
            printf ("I: worker heartbeat\n");
            zframe_t *frame = zframe_new (PPP_HEARTBEAT, 1);
            zframe_send (&frame, worker, 0);
        }
    }
    zctx_destroy (&ctx);
    return 0;
}
Some comments about this example:
The code includes simulation of failures, as before. This makes it (a) very hard to debug, and (b) dangerous to reuse. When you want to debug this code, disable the failure simulation.
The worker uses a reconnect strategy similar to the one we designed for the Lazy Pirate client, with two major differences: it does an exponential backoff, and it retries indefinitely (whereas the client retries a few times before reporting a failure).
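The worker's two pieces of reconnect state, the liveness countdown and the doubling interval, can be pulled out and exercised without any sockets. This is only a sketch under that assumption: next_interval and silent_cycle are hypothetical helper names, and the constants are copied from ppworker.c.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define HEARTBEAT_LIVENESS  3       //  Same values as ppworker.c
#define INTERVAL_INIT       1000    //  msec
#define INTERVAL_MAX       32000    //  msec

//  Hypothetical helper: reconnect delay to use after the current one.
//  Doubles until the cap is reached, then stays there.
static size_t
next_interval (size_t interval)
{
    assert (interval > 0);
    return interval < INTERVAL_MAX ? interval * 2 : interval;
}

//  Hypothetical helper: account for one poll cycle with no message.
//  Returns true when the queue should be treated as disconnected.
static bool
silent_cycle (size_t *liveness)
{
    assert (liveness && *liveness > 0);
    return --*liveness == 0;
}
```

Starting from INTERVAL_INIT, the delays run 1000, 2000, 4000, 8000, 16000, and 32000 msec, and then stay at 32000. With a liveness of 3, the third consecutive silent poll cycle triggers a reconnect. Any message from the queue resets both values in the real worker.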
You can try the client, queue, and workers by using a script like this:
ppqueue &
for i in 1 2 3 4; do
    ppworker &
    sleep 1
done
lpclient &
You should see the workers die one by one as they simulate a crash, and the client eventually give up. You can stop and restart the queue, and both the client and the workers will reconnect and carry on. And no matter what you do to queues and workers, the client will never get an out-of-order reply: either the whole chain works, or the client abandons.