We used a mailman analogy earlier to describe Node’s event loop. If the mailman were to arrive at a closed gate, he would be unable to deliver his message; but imagine an elderly and kind groundskeeper was in the process of opening the gate so the mailman could pass through. Being elderly and somewhat frail from his years of service, it takes the groundskeeper some time to clear the way—time during which the mailman is unable to deliver any messages.
This situation is a blocking process, but it is not a permanent state. Eventually the groundskeeper will manage to get the gate open, and the mailman will go about his business. Every house the mailman reaches with a similar gate-opening process will slow down the overall route. In the context of a Node application, this type of block will seriously degrade performance.
In the computer realm, similar situations may be caused by sending a user email during a registration process, by lots of math that needs to be done as a result of user input, or by any situation in which the time it takes to complete a task exceeds a user’s normally expected wait times. Node’s event-driven design handles the majority of these situations for you by using asynchronous functions and callbacks, but when an event is particularly “heavy” to process, it doesn’t make sense to process it inside Node. Node should only take care of handling results and fast operations.
By way of example, consider a generic user registration process. When a user registers herself, the application saves a new record in the database, sends an email to that user, and perhaps records some statistics about the registration process, such as the number of steps completed or amount of time taken. It probably doesn’t make sense to perform all of those actions right away when the user hits the Submit button on your web page. For one thing, the email process could take several seconds (or if you’re unlucky, minutes) to complete, the database call may not need to finish before the user is welcomed, and the statistics are probably separate from your main application flow. In this case, you might choose to generate a message that notifies other parts of your application instead—perhaps running on a different machine entirely—that a user has registered. This is known as a publish-subscribe pattern.
Another example: suppose you have a cluster of machines running Node.js. When a new machine is added to the cluster, it issues a message requesting configuration information. A configuration server responds to the message with a list of configuration information the new machine needs to integrate into the cluster. This is known as a request-reply pattern.
Message queues allow programmers to publish events and move on, enabling improved performance through parallel processing and higher levels of scalability through inter-process communication channels.
RabbitMQ is a message broker that supports the advanced message queueing protocol (AMQP). It is useful in situations where data needs to be communicated between different servers, or between different processes on the same server. Written in Erlang, RabbitMQ is capable of clustering for high availability, and is fairly straightforward to install and begin using.
If you’re using Linux, RabbitMQ is available in package form for most distributions. Anyone can download the software from http://www.rabbitmq.com and compile it from source.
Once RabbitMQ has been installed and is running, use npm to retrieve Node's AMQP drivers:

npm install amqp
RabbitMQ communicates using the standardized protocol AMQP. AMQP comes from the financial services industry, where reliable messaging is a matter of life or death. It provides a vendor-neutral and abstract specification for generic (not just financial) middleware messaging and is intended to solve the problem of communicating between different types of systems. AMQP is conceptually similar to email: email messages have specifications for headers and format, but their contents can be anything from text to photos and video. Just as two companies don’t need to run the same email server software to communicate, AMQP allows messaging between different platforms. For example, a publisher written in PHP can send a message to a consumer written in JavaScript.
Example 6-35 shows the most basic elements of RabbitMQ programming.
Example 6-35. AMQP/RabbitMQ usage
var connection = require('amqp').createConnection();
connection.on('ready', function() {
  console.log('Connected to ' + connection.serverProperties.product);
  var e = connection.exchange('up-and-running');
  var q = connection.queue('up-and-running-queue');
  q.on('queueDeclareOk', function(args) {
    console.log('Queue opened');
    q.bind(e, '#');
    q.on('queueBindOk', function() {
      console.log('Queue bound');
      q.on('basicConsumeOk', function() {
        console.log("Consumer has subscribed, publishing message.");
        e.publish('routingKey', {hello:'world'});
      });
    });
    q.subscribe(function(msg) {
      console.log('Message received:');
      console.log(msg);
      connection.end();
    });
  });
});
The output is:
Connected to RabbitMQ
Queue opened
Queue bound
Consumer has subscribed, publishing message.
Message received:
{ hello: 'world' }
The createConnection command opens a connection to the RabbitMQ message broker, which in this case defaults (as per AMQP) to localhost on port 5672. If necessary, this command can be overloaded; for example:

createConnection({host: 'dev.mycompany.com', port: 5555})
Next, a queue and exchange are defined. This step is not strictly required, because AMQP brokers are required to provide a default exchange, but by specifying up-and-running as the exchange name, you insulate your application from other exchanges that could be running on the server. An exchange is an entity that receives messages and passes them forward to attached queues.

The queue doesn't do anything by itself; it must be bound to an exchange before it will receive messages. The command q.bind(e, '#') instructs AMQP to attach the queue named up-and-running-queue to the exchange named up-and-running, and to listen for all messages passed to the exchange (the '#' parameter). You could easily change the # to some specific key to filter out messages.
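To illustrate how that filtering works, here is a hypothetical helper that mimics AMQP topic matching, where words are separated by dots, '*' matches exactly one word, and '#' matches any sequence of words. This is a simplified sketch (real AMQP '#' can also match zero words across a dot boundary), not the library's own implementation:

```javascript
// Simplified sketch of AMQP topic-pattern matching: '*' matches exactly
// one dot-separated word, '#' matches any run of words. Hypothetical
// helper for illustration only.
function matchesRoutingKey(pattern, key) {
  var regex = pattern
    .split('.')
    .map(function(part) {
      if (part === '#') return '[\\w.]*'; // any sequence of words
      if (part === '*') return '\\w+';    // exactly one word
      return part;                        // literal word
    })
    .join('\\.');
  return new RegExp('^' + regex + '$').test(key);
}

console.log(matchesRoutingKey('#', 'user.registered'));      // true
console.log(matchesRoutingKey('user.*', 'user.registered')); // true
console.log(matchesRoutingKey('user.*', 'order.created'));   // false
```

Binding with a pattern like 'user.#' instead of '#' would let the queue receive only user-related messages.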
Once the queue and exchange have been declared, an event is set up for basicConsumeOk, which is an event generated by the AMQP library when a client subscribes to a queue. When that happens, Node will publish a "hello world" message to the exchange under a filtering key of routingKey. In this example, the filter key doesn't matter, because the queue is bound to all keys (via the bind('#') command), but a central tenet of AMQP is that the publisher is never aware of which subscribers (if any) are connected, so a routing key is supplied in any case.
Finally, the subscribe command is issued. The callback function that is passed as its argument is called every time an eligible message is received by the exchange and passed through to the queue. In this case, the callback causes the program to end, which is good for demonstration purposes, but in "real" applications it's unlikely you would do this. When the subscribe command is successful, AMQP dispatches the basicConsumeOk event, which triggers the publishing of the "hello world" message and subsequently ends the demonstration program.
Queues are useful when long-running tasks take longer than is acceptable to the user (such as during a web page load) or when the task would otherwise block the application. Using RabbitMQ, it is possible to split tasks among multiple workers and ensure that tasks are completed even if the first worker that handles them dies mid-process (Example 6-36).
Example 6-36. Publishing long jobs with AMQP
var connection = require('amqp').createConnection();
var count = 0;
connection.on('ready', function() {
  console.log('Connected to ' + connection.serverProperties.product);
  var e = connection.exchange('up-and-running');
  var q = connection.queue('up-and-running-queue');
  q.on('queueDeclareOk', function(args) {
    console.log('Queue opened');
    q.bind(e, '#');
    q.on('queueBindOk', function() {
      console.log('Queue bound');
      setInterval(function() {
        console.log('Publishing message #' + ++count);
        e.publish('routingKey', {count:count});
      }, 1000);
    });
  });
});
This example is a modified version of the straight publish-subscribe example from the previous section, but it is just a publisher, so the event listener for subscribing is gone. In its place is an interval timer that publishes a message to the queue every 1,000 milliseconds (that is, every second). The message contains a count variable that is incremented on each publish. This code can be used to implement a simple worker application. Example 6-37 shows the corresponding client.
Example 6-37. Processing long jobs with AMQP
var connection = require('amqp').createConnection();

function sleep(milliseconds) {
  var start = new Date().getTime();
  while (new Date().getTime() < start + milliseconds);
}

connection.on('ready', function() {
  console.log('Connected to ' + connection.serverProperties.product);
  var e = connection.exchange('up-and-running');
  var q = connection.queue('up-and-running-queue');
  q.on('queueDeclareOk', function(args) {
    q.bind(e, '#');
    q.subscribe({ack:true}, function(msg) {
      console.log('Message received:');
      console.log(msg.count);
      sleep(5000);
      console.log('Processed. Waiting for next message.');
      q.shift();
    });
  });
});
The client works by taking a message from the queue, processing it (in this example, sleeping for 5 seconds), and then taking the next message from the queue and repeating. Although there is no “sleep” function in Node, you can fake it with a blocking loop, as done here.
There is a problem. Recall that the publisher posts a message to the queue every second. Because the client takes 5 seconds to process each message, it will very quickly fall far behind the publisher. The solution? Open another window and run a second client, and now the messages are processed twice as fast. That still isn't quick enough to handle the volume produced by the publisher, but adding more clients spreads the load further and keeps the backlog of unprocessed messages from growing. This setup is referred to as worker queues.
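The round-robin behavior can be sketched without a broker at all; here a plain function stands in for how RabbitMQ alternates deliveries among connected workers:

```javascript
// Sketch of round-robin delivery: the broker hands each successive
// message to the next worker in turn. A plain array stands in for the
// RabbitMQ queue; this is an illustration, not the broker's code.
function distribute(messages, workerCount) {
  var assignments = [];
  for (var i = 0; i < workerCount; i++) {
    assignments.push([]);
  }
  messages.forEach(function(msg, index) {
    // Worker 0 gets message 0, worker 1 gets message 1, and so on.
    assignments[index % workerCount].push(msg);
  });
  return assignments;
}

console.log(distribute([1, 2, 3, 4, 5, 6], 2)); // [ [ 1, 3, 5 ], [ 2, 4, 6 ] ]
```

With two workers each handling every other message, the effective processing rate doubles, which is why adding clients in the example above helps.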
Worker queues function by round-robining message delivery between clients connected to a named queue. The {ack:true} parameter to the subscribe command instructs AMQP to wait for the consumer to acknowledge that processing has been completed for a message. The shift method provides that acknowledgment by shifting the message off the queue and removing it from service. This way, if the worker happens to die while processing a message, the RabbitMQ broker will send the message to the next available client. There is no timeout; as long as the client is connected, the message will be removed from play. Only when the client disconnects without acknowledging a message will it be sent to the next client.
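Those acknowledgment semantics can be modeled with a few lines of plain JavaScript; this hypothetical single-worker simulation is only an illustration of the requeue-on-disconnect behavior, not RabbitMQ's implementation:

```javascript
// Hypothetical sketch of acknowledgment semantics: messages delivered
// but never acknowledged go back on the queue when the worker disconnects.
var queue = {
  ready: ['a', 'b'],   // messages waiting for delivery
  unacked: []          // delivered to the worker, not yet acknowledged
};

function deliver() {
  var msg = queue.ready.shift();
  if (msg !== undefined) queue.unacked.push(msg);
  return msg;
}

function ack() {
  queue.unacked.shift(); // like q.shift(): removes the message from service
}

function disconnect() {
  // Everything unacknowledged is requeued for the next worker.
  queue.ready = queue.unacked.concat(queue.ready);
  queue.unacked = [];
}

deliver();                // 'a' is delivered but not acknowledged
disconnect();             // 'a' returns to the front of the queue
console.log(queue.ready); // [ 'a', 'b' ]
```

Had the worker called ack() before disconnecting, 'a' would have been removed for good, which is exactly the behavior the warning below is about.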
Warning

A common "gotcha" occurs when developers forget to use the q.shift() command. If you forget it, your program will continue to function as normal, but as soon as your client disconnects, the server will place all of the messages the client processed back onto the queue.
Another side effect is that the memory usage by RabbitMQ will gradually rise. This is because, although the messages are removed from active duty on the queue, they are kept in memory until they are acknowledged and deleted by the client.