MQ Protocols

We used a mailman analogy earlier to describe Node’s event loop. If the mailman were to arrive at a closed gate, he would be unable to deliver his message; but imagine an elderly and kind groundskeeper was in the process of opening the gate so the mailman could pass through. Being elderly and somewhat frail from his years of service, it takes the groundskeeper some time to clear the way—time during which the mailman is unable to deliver any messages.

This situation is a blocking process, but it is not a permanent state. Eventually the groundskeeper will manage to get the gate open, and the mailman will go about his business. Every house the mailman reaches with a similar gate-opening process will slow down the overall route. In the context of a Node application, this type of block will seriously degrade performance.

In the computer realm, similar situations may be caused by sending a user an email during a registration process, by heavy computation triggered by user input, or by any situation in which the time it takes to complete a task exceeds the wait a user would normally tolerate. Node’s event-driven design handles the majority of these situations for you by using asynchronous functions and callbacks, but when an event is particularly “heavy” to process, it doesn’t make sense to process it inside Node. Node should only take care of handling results and fast operations.

By way of example, consider a generic user registration process. When a user registers herself, the application saves a new record in the database, sends an email to that user, and perhaps records some statistics about the registration process, such as the number of steps completed or amount of time taken. It probably doesn’t make sense to perform all of those actions right away when the user hits the Submit button on your web page. For one thing, the email process could take several seconds (or if you’re unlucky, minutes) to complete, the database call may not need to finish before the user is welcomed, and the statistics are probably separate from your main application flow. In this case, you might choose to generate a message that notifies other parts of your application instead—perhaps running on a different machine entirely—that a user has registered. This is known as a publish-subscribe pattern.
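To make that concrete, here is a minimal sketch of the publishing side, using the amqp module introduced later in this section and the exchange-open callback form of connection.exchange. The exchange name 'registrations', the routing key 'user.registered', and the message fields are invented for illustration; the mailer and statistics consumers would be separate subscriber processes, possibly on other machines.

var connection = require('amqp').createConnection();

connection.on('ready', function() {
  // 'registrations' and 'user.registered' are made-up names for this sketch.
  connection.exchange('registrations', {}, function(e) {

    // Called from the registration handler once the user record is saved.
    // The welcome email and statistics are handled elsewhere, by whoever
    // has subscribed to this exchange.
    function announceRegistration(user) {
      e.publish('user.registered', {email: user.email, steps: user.steps});
    }

    announceRegistration({email: 'jane@example.com', steps: 3});
  });
});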

Another example: suppose you have a cluster of machines running Node.js. When a new machine is added to the cluster, it issues a message requesting configuration information. A configuration server responds to the message with a list of configuration information the new machine needs to integrate into the cluster. This is known as a request-reply pattern.
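A bare-bones version of that conversation, with both sides squeezed into one script for brevity, might look like the following sketch. All of the names here ('cluster-config', 'config-requests', 'node-42', and the configuration fields) are invented, and carrying the reply key inside the message body is just one simple approach; AMQP also defines dedicated reply-to message properties for this purpose.

var connection = require('amqp').createConnection();

connection.on('ready', function() {
  // A direct exchange routes each message to queues bound with the exact key.
  var e = connection.exchange('cluster-config', {type: 'direct'});

  // Configuration server side (normally a separate process): answer requests.
  var requests = connection.queue('config-requests');
  requests.on('queueDeclareOk', function() {
    requests.bind(e, 'config.request');

    requests.on('queueBindOk', function() {
      requests.subscribe(function(msg) {
        // Reply on whatever routing key the requester asked for.
        e.publish(msg.replyTo, {dbHost: 'db.internal', logLevel: 'info'});
      });

      // New machine side: listen on its own reply key, then ask for config.
      var replies = connection.queue('node-42-replies');
      replies.on('queueDeclareOk', function() {
        replies.bind(e, 'node-42');

        replies.on('queueBindOk', function() {
          replies.subscribe(function(msg) {
            console.log('Received configuration:');
            console.log(msg);
            connection.end();
          });
          e.publish('config.request', {replyTo: 'node-42'});
        });
      });
    });
  });
});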

Message queues allow programmers to publish events and move on, enabling improved performance through parallel processing and higher levels of scalability through inter-process communication channels.

RabbitMQ

RabbitMQ is a message broker that supports the advanced message queueing protocol (AMQP). It is useful in situations where data needs to be communicated between different servers, or between different processes on the same server. Written in Erlang, RabbitMQ is capable of clustering for high availability, and is fairly straightforward to install and begin using.

Installing RabbitMQ

If you’re using Linux, RabbitMQ is available in package form for most distributions. Alternatively, you can download the source from http://www.rabbitmq.com and compile it yourself.

Once RabbitMQ has been installed and is running, use npm to retrieve Node’s AMQP drivers:

npm install amqp

Publish and subscribe

RabbitMQ communicates using the standardized protocol AMQP. AMQP comes from the financial services industry, where reliable messaging is a matter of life or death. It provides a vendor-neutral and abstract specification for generic (not just financial) middleware messaging and is intended to solve the problem of communicating between different types of systems. AMQP is conceptually similar to email: email messages have specifications for headers and format, but their contents can be anything from text to photos and video. Just as two companies don’t need to run the same email server software to communicate, AMQP allows messaging between different platforms. For example, a publisher written in PHP can send a message to a consumer written in JavaScript.

Example 6-35 shows the most basic elements of RabbitMQ programming.

Example 6-35. AMQP/RabbitMQ usage

var connection = require('amqp').createConnection();

connection.on('ready', function() {
  console.log('Connected to ' + connection.serverProperties.product);
  var e = connection.exchange('up-and-running');

  var q = connection.queue('up-and-running-queue');

  q.on('queueDeclareOk', function(args) {
    console.log('Queue opened');
    q.bind(e, '#');

    q.on('queueBindOk', function() {
      console.log('Queue bound');

      q.on('basicConsumeOk', function() {
        console.log("Consumer has subscribed, publishing message.");
        e.publish('routingKey', {hello:'world'});
      });
    });

    q.subscribe(function(msg) {
      console.log('Message received:');
      console.log(msg);
      connection.end();
    });
  });
});

The output is:

Connected to RabbitMQ
Queue opened
Queue bound
Consumer has subscribed, publishing message.
Message received:
{ hello: 'world' }

The createConnection command opens a connection to the RabbitMQ message broker, which in this case defaults (as per AMQP) to localhost on port 5672. If necessary, you can pass an options object to override these defaults; for example:

createConnection({host: 'dev.mycompany.com', port: 5555})

Next, a queue and exchange are defined. This step is not strictly required, because AMQP brokers are required to provide a default exchange, but by specifying up-and-running as the exchange name, you insulate your application from other exchanges that could be running on the server. An exchange is an entity that receives messages and passes them forward to attached queues.

The queue doesn’t do anything by itself; it must be bound to an exchange before it will receive messages. The command q.bind(e, '#') instructs AMQP to attach the queue named up-and-running-queue to the exchange named up-and-running, and to listen for all messages passed to the exchange (the '#' parameter). You could easily change the # to a specific key to filter out messages.
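For example, if the exchange were declared as a topic exchange, rebinding the queue from Example 6-35 with a specific key would deliver only matching messages; the 'user.registered' key below is an arbitrary, made-up example:

// Only messages published under the 'user.registered' routing key reach q;
// anything published under other keys is ignored by this queue.
q.bind(e, 'user.registered');

e.publish('user.registered', {email: 'jane@example.com'});  // delivered to q
e.publish('user.deleted', {email: 'jane@example.com'});     // not delivered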

Once the queue and exchange have been declared, an event is set up for basicConsumeOk, which is an event generated by the AMQP library when a client subscribes to a queue. When that happens, Node will publish a “hello world” message to the exchange under a filtering key of routingKey. In this example, the filter key doesn’t matter, because the queue is bound to all keys (via the q.bind(e, '#') command), but a central tenet of AMQP is that the publisher is never aware of which subscribers (if any) are connected, so a routing key is supplied in any case.

Finally, the subscribe command is issued. The callback function that is passed as its argument is called every time an eligible message is received by the exchange and passed through to the queue. In this case, the callback causes the program to end, which is good for demonstration purposes, but in “real” applications it’s unlikely you would do this. When the subscribe command is successful, AMQP dispatches the basicConsumeOk event, which triggers the publishing of the “hello world” message and subsequently ends the demonstration program.

Work queues

Queues are useful when long-running tasks take longer than is acceptable to the user (such as during a web page load) or when the task would otherwise block the application. Using RabbitMQ, it is possible to split tasks among multiple workers and ensure that tasks are completed even if the first worker that handles them dies mid-process (Example 6-36).

Example 6-36. Publishing long jobs with AMQP

var connection = require('amqp').createConnection();
var count = 0;

connection.on('ready', function() {
  console.log('Connected to ' + connection.serverProperties.product);
  var e = connection.exchange('up-and-running');

  var q = connection.queue('up-and-running-queue');

  q.on('queueDeclareOk', function(args) {
    console.log('Queue opened');
    q.bind(e, '#');

    q.on('queueBindOk', function() {
      console.log('Queue bound');

      setInterval(function(){
        console.log('Publishing message #' + ++count);
        e.publish('routingKey', {count:count});
      }, 1000);
    });
  });
});

This example is a modified version of the straight publish-subscribe example from the previous section, but it is just a publisher, so the event listener for subscribing is gone. In its place is an interval timer that publishes a message to the exchange every 1,000 milliseconds (that is, every second). The message contains a count variable that is incremented on each publish. This code forms the publishing side of a simple worker application. Example 6-37 shows the corresponding client.

Example 6-37. Processing long jobs with AMQP

var connection = require('amqp').createConnection();

function sleep(milliseconds)
{
  var start = new Date().getTime();
  while (new Date().getTime() < start + milliseconds);
}

connection.on('ready', function() {
  console.log('Connected to ' + connection.serverProperties.product);

  var e = connection.exchange('up-and-running');
  var q = connection.queue('up-and-running-queue');

  q.on('queueDeclareOk', function(args) {
    q.bind(e, '#');

    q.subscribe({ack: true}, function(msg) {
      console.log('Message received:');
      console.log(msg.count);
      sleep(5000);
      console.log('Processed. Waiting for next message.');
      q.shift();
    });
  });
});

The client works by taking a message from the queue, processing it (in this example, sleeping for 5 seconds), and then taking the next message from the queue and repeating. Although there is no “sleep” function in Node, you can fake it with a blocking loop, as done here.

There is a problem. Recall that the publisher posts a message to the queue every second. Because the client takes 5 seconds to process each message, it will very quickly fall far behind the publisher. The solution? Open another window and run a second client, and now the messages are processed twice as fast. It’s still not quick enough to handle the volume produced by the publisher, but adding more clients further spreads the load and keeps the unprocessed messages from piling up. This setup is referred to as worker queues.

Worker queues function by round-robining message delivery among the clients connected to a named queue. The {ack:true} parameter to the subscribe command instructs AMQP to wait for the client to acknowledge that processing has been completed for a message. The shift method provides that acknowledgment by shifting the message off the queue and removing it from service. This way, if a worker happens to die while processing a message, the RabbitMQ broker will send the message to the next available client. There is no timeout; as long as the client remains connected, the message stays assigned to it. Only when the client disconnects without acknowledging a message will it be sent to the next client.

Warning

A common “gotcha” occurs when developers forget to use the q.shift() command. If you forget it, your program will continue to function as normal, but as soon as your client disconnects, the server will place all of the messages the client processed back onto the queue.

Another side effect is that the memory usage by RabbitMQ will gradually rise. This is because, although the messages are removed from active duty on the queue, they are kept in memory until they are acknowledged and deleted by the client.
