As a final example (you are surely getting tired of juicy code and want to delve back into philological discussions about comparative abstractive norms), let's do a little supercomputing. Then, coffee. Our supercomputing application is a fairly typical parallel processing model (Figure 1-5). We have:
A ventilator that produces tasks that can be done in parallel
A set of workers that processes tasks
A sink that collects results back from the worker processes
In reality, workers run on superfast boxes, perhaps using GPUs (graphics processing units) to do the hard math. Example 1-8 shows the code for the ventilator. It generates 100 tasks, each one a message telling the worker to sleep for some number of milliseconds.
Example 1-8. Parallel task ventilator (taskvent.c)
//
//  Task ventilator
//  Binds PUSH socket to tcp://*:5557
//  Sends batch of tasks to workers via that socket
//
#include "zhelpers.h"

int main (void)
{
    void *context = zmq_ctx_new ();

    //  Socket to send messages on
    void *sender = zmq_socket (context, ZMQ_PUSH);
    zmq_bind (sender, "tcp://*:5557");

    //  Socket to send start of batch message on
    void *sink = zmq_socket (context, ZMQ_PUSH);
    zmq_connect (sink, "tcp://localhost:5558");

    printf ("Press Enter when the workers are ready: ");
    getchar ();
    printf ("Sending tasks to workers...\n");

    //  The first message is "0" and signals start of batch
    s_send (sink, "0");

    //  Initialize random number generator
    srandom ((unsigned) time (NULL));

    //  Send 100 tasks
    int task_nbr;
    int total_msec = 0;     //  Total expected cost in msec
    for (task_nbr = 0; task_nbr < 100; task_nbr++) {
        int workload;
        //  Random workload from 1 to 100 msec
        workload = randof (100) + 1;
        total_msec += workload;
        char string [10];
        sprintf (string, "%d", workload);
        s_send (sender, string);
    }
    printf ("Total expected cost: %d msec\n", total_msec);
    sleep (1);              //  Give 0MQ time to deliver

    zmq_close (sink);
    zmq_close (sender);
    zmq_ctx_destroy (context);
    return 0;
}
The code for the worker application is in Example 1-9. It receives a message, sleeps for that number of milliseconds, and then signals that it's finished.
Example 1-9. Parallel task worker (taskwork.c)
//
//  Task worker
//  Connects PULL socket to tcp://localhost:5557
//  Collects workloads from ventilator via that socket
//  Connects PUSH socket to tcp://localhost:5558
//  Sends results to sink via that socket
//
#include "zhelpers.h"

int main (void)
{
    void *context = zmq_ctx_new ();

    //  Socket to receive messages on
    void *receiver = zmq_socket (context, ZMQ_PULL);
    zmq_connect (receiver, "tcp://localhost:5557");

    //  Socket to send messages to
    void *sender = zmq_socket (context, ZMQ_PUSH);
    zmq_connect (sender, "tcp://localhost:5558");

    //  Process tasks forever
    while (1) {
        char *string = s_recv (receiver);
        if (!string)
            break;              //  Interrupted
        //  Simple progress indicator for the viewer
        printf ("%s.", string);
        fflush (stdout);

        //  Do the work: sleep for the workload, in msec
        s_sleep (atoi (string));
        free (string);

        //  Send results to sink
        s_send (sender, "");
    }
    zmq_close (receiver);
    zmq_close (sender);
    zmq_ctx_destroy (context);
    return 0;
}
Finally, Example 1-10 shows the sink application. It collects the 100 messages and then calculates how long the overall processing took, so we can confirm that the workers really were running in parallel if there is more than one of them.
Example 1-10. Parallel task sink (tasksink.c)
//
//  Task sink
//  Binds PULL socket to tcp://*:5558
//  Collects results from workers via that socket
//
#include "zhelpers.h"

int main (void)
{
    //  Prepare our context and socket
    void *context = zmq_ctx_new ();
    void *receiver = zmq_socket (context, ZMQ_PULL);
    zmq_bind (receiver, "tcp://*:5558");

    //  Wait for start of batch
    char *string = s_recv (receiver);
    free (string);

    //  Start our clock now
    int64_t start_time = s_clock ();

    //  Process 100 confirmations
    int task_nbr;
    for (task_nbr = 0; task_nbr < 100; task_nbr++) {
        char *string = s_recv (receiver);
        free (string);
        if (task_nbr % 10 == 0)
            printf (":");       //  Mark every tenth result
        else
            printf (".");
        fflush (stdout);
    }
    //  Calculate and report duration of batch
    printf ("Total elapsed time: %d msec\n",
        (int) (s_clock () - start_time));

    zmq_close (receiver);
    zmq_ctx_destroy (context);
    return 0;
}
The average cost of a batch is five seconds. When we start one, two, and four workers, we get results like this from the sink:
# 1 worker
Total elapsed time: 5034 msec
# 2 workers
Total elapsed time: 2421 msec
# 4 workers
Total elapsed time: 1018 msec
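The five-second figure comes from the workload range. Assume each workload is drawn uniformly from 1 to 100 msec and the workers split the batch perfectly evenly, with no network or startup overhead. The expected per-task cost, the expected batch cost, and the ideal time with $N$ workers are then:

```latex
\mathbb{E}[w] = \frac{1 + 100}{2} = 50.5\ \text{msec}, \qquad
\mathbb{E}[T_1] = 100 \times 50.5 = 5050\ \text{msec}, \qquad
T_N \approx \frac{T_1}{N}
\;\Rightarrow\; T_2 \approx 2525\ \text{msec},\; T_4 \approx 1262.5\ \text{msec}
```

These are idealized estimates only. Each real run draws its own random workloads, so its batch total varies around 5050 msec.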
Let's look at some aspects of this code in more detail:
The workers connect upstream to the ventilator, and downstream to the sink. This means you can add workers arbitrarily. If the workers bound to their endpoints, you would need (a) more endpoints and (b) to modify the ventilator and/or the sink each time you added a worker. We say that the ventilator and sink are stable parts of our architecture and the workers are dynamic parts of it.
We have to synchronize the start of the batch with all workers being up and running. This is a fairly common gotcha in ØMQ, and there is no easy solution. The connect method takes a certain amount of time, so when a set of workers connect to the ventilator, the first one to successfully connect will get a whole load of messages in that short time while the others are still connecting. If you don't synchronize the start of the batch somehow, the system won't run in parallel at all. Try removing the wait in the ventilator, and see what happens.
The ventilator's PUSH socket distributes tasks evenly to workers (assuming they are all connected before the batch starts going out). This is called load balancing, and it's something we'll look at again in more detail.
The sinkâs PULL socket collects results from workers evenly. This is called fair queuing (Figure 1-6).
The pipeline pattern also exhibits the "slow joiner" syndrome, leading to accusations that PUSH sockets don't load-balance properly. If you are using PUSH and PULL and one of your workers gets way more messages than the others, it's because that PULL socket has joined faster than the others and grabs a lot of messages before the others manage to connect.