Chapter 4. Spouts

In this chapter, you’ll take a look at the most commonly used strategies for designing the entry point for a topology (a spout) and how to make spouts fault-tolerant.

Reliable versus Unreliable Messages

When designing a topology, one important thing to keep in mind is message reliability. If a message can’t be processed, you need to decide what to do with the individual message and what to do with the topology as a whole. For example, when processing bank deposits, it is important not to lose a single transaction message. But if you’re processing millions of tweets looking for some statistical metric, and one tweet gets lost, you can assume that the metric will still be fairly accurate.

In Storm, it is the author’s responsibility to guarantee message reliability according to the needs of each topology. This involves a trade-off. A reliable topology must manage lost messages, which requires more resources. A less reliable topology may lose some messages, but is less resource-intensive. Whatever the chosen reliability strategy, Storm provides the tools to implement it.

To manage reliability at the spout, you can include a message ID with the tuple at emit time (collector.emit(new Values(…),tupleId)). The methods ack and fail are called when a tuple is processed correctly or fails respectively. Tuple processing succeeds when the tuple is processed by all target bolts and all anchored bolts (you will learn how to anchor a bolt to a tuple in the Chapter 5).

Tuple processing fails when:

  • collector.fail(tuple) is called by the target spout

  • processing time exceeds the configured timeout

Let’s take a look at an example. Imagine you are processing bank transactions, and you have the following requirements:

  • If a transaction fails, resend the message.

  • If the transaction fails too many times, terminate the topology.

You’ll create a spout that sends 100 random transaction IDs, and a bolt that fails for 80% of tuples received (you can find the complete example at ch04-spout examples). You’ll implement the spout using a Map to emit transaction message tuples so that it’s easy to resend messages.

public void nextTuple() {
    if(!toSend.isEmpty()){
        for(Map.Entry<Integer, String> transactionEntry : toSend.entrySet()){
            Integer transactionId = transactionEntry.getKey();
            String transactionMessage = transactionEntry.getValue();
            collector.emit(new Values(transactionMessage),transactionId);
        }
        toSend.clear();
    }
}

If there are messages waiting to be sent, get each transaction message and its associated ID and emit them as a tuple, then clear the message queue. Note that it’s safe to call clear on the map, because nextTuple, fail, and ack are the only methods that modify the map, and they all run in the same thread.

Maintain two maps to keep track of transaction messages waiting to be sent, and the number of times each transaction has failed. The ack method simply removes the transaction message from each list.

public void ack(Object msgId) {
    messages.remove(msgId);
    failCounterMessages.remove(msgId);
}

The fail method decides whether to resend a transaction message or fail if it has failed too many times.

Warning

If you are using an all grouping in your topology and any instance of the bolt fails, the fail method of the spout will be called as well.

public void fail(Object msgId) {
    Integer transactionId = (Integer) msgId;
    // Check the number of times the transaction has failed
    Integer failures = transactionFailureCount.get(transactionId) + 1;

    if(fails >= MAX_FAILS){
        // If the number of failures is too high, terminate the topology
        throw new RuntimeException("Error, transaction id ["+
        transactionId+"] has had too many errors ["+failures+"]");
    }

    // If the number of failures is less than the maximum, save the number and re-send the message
    transactionFailureCount.put(transactionId, failures);
    toSend.put(transactionId,messages.get(transactionId));
    LOG.info("Re-sending message ["+msgId+"]");
}

First, check the number of times the transaction has failed. If a transaction fails too many times, throw a RuntimeException to terminate the worker where it is running. Otherwise, save the failure count and put the transaction message in the toSend queue so that it will be resent when nextTuple is called.

Warning

Storm nodes do not maintain state, so if you store information in memory (as in this example) and the node goes down, you will lose all stored information.

Storm is a fast-fail system. If an exception is thrown, the topology will go down, but Storm will restart the process in a consistent state so that it can recover correctly.

Getting Data

Here you’ll take a look at some common techniques for designing spouts that collect data efficiently from multiple sources.

Direct Connection

In a direct connection architecture, the spout connects directly to a message emitter (see Figure 4-1).

Direct connection spout

Figure 4-1. Direct connection spout

This architecture is simple to implement, particularly when the message emitter is a well-known device or a well-known device group. A well-known device is one that is known at startup and remains the same throughout the life of the topology. An unknown device is one that is added after the topology is already running. A well-known device group is one in which all devices in the group are known at start time.

As an example, create a spout to read the Twitter stream using the Twitter streaming API. The spout will connect directly to the API, which serves as the message emitter. Filter the stream to get all public tweets that match the track parameter (as documented on the Twitter dev page). The complete example can be found at Twitter Example github page.

The spout gets the connection parameters from the configuration object (track, user, and password) and creates a connection to the API (in this case, using the DefaultHttpClient from Apache). It reads the connection one line at a time, parses the line from JSON format into a Java object, and emits it.

public void nextTuple() {
  //Create the client call
  client = new DefaultHttpClient();
  client.setCredentialsProvider(credentialProvider);
  HttpGet get = new HttpGet(STREAMING_API_URL+track);
  HttpResponse response;
  try {
   //Execute
   response = client.execute(get);
   StatusLine status = response.getStatusLine();
   if(status.getStatusCode() == 200){
    InputStream inputStream = response.getEntity().getContent();
    BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
    String in;
    //Read line by line
    while((in = reader.readLine())!=null){
     try{
      //Parse and emit
      Object json = jsonParser.parse(in);
      collector.emit(new Values(track,json));
     }catch (ParseException e) {
      LOG.error("Error parsing message from twitter",e);
     }
    }
   }
  } catch (IOException e) {
   LOG.error("Error in communication with twitter api ["+get.getURI().toString()+"], sleeping 10s");
   try {
    Thread.sleep(10000);
   } catch (InterruptedException e1) {
   }
  } 
 }
      

Tip

Here you are locking the nextTuple method, so you never execute the ack and fail methods. In a real application, we recommend that you do the locking into a separate thread and use an internal queue to exchange information (you’ll learn how to do that in the next example, Enqueued Messages).

This is great!

You’re reading the Twitter stream with a single spout. If you parallelize the topology, you’ll have several spouts reading different partitions of the same stream, which doesn’t make sense. So how do you parallelize processing if you have several streams to read? One interesting feature of Storm is that you can access the TopologyContext from any component (spouts/bolts). Using this feature, you can divide the streams between your spout instances.

    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {

       //Get the spout size from the context
        int spoutsSize = context.getComponentTasks(context.getThisComponentId()).size();

        //Get the id of this spout
        int myIdx = context.getThisTaskIndex();

        String[] tracks = ((String) conf.get("track")).split(",");
        StringBuffer tracksBuffer = new StringBuffer();
        for(int i=0; i< tracks.length;i++){

            //Check if this spout must read the track word
            if( i % spoutsSize == myIdx){
                tracksBuffer.append(",");
                tracksBuffer.append(tracks[i]);
            }
        }
        if(tracksBuffer.length() == 0) {
            throw new RuntimeException("No track found for spout" +
                    " [spoutsSize:"+spoutsSize+", tracks:"+tracks.length+"] the amount" +
                    " of tracks must be more then the spout paralellism");
        this.track =tracksBuffer.substring(1).toString();
        }
        ...

   }

Using this technique, you can distribute collectors evenly across data sources. The same technique can be applied in other situations—for example, for collecting log files from web servers. See Figure 4-2.

Direct connection hashing

Figure 4-2. Direct connection hashing

In the previous example, you connected the spout to a well-known device. You can use the same approach to connect to unknown devices using a coordinating system to maintain the device list. The coordinator detects changes to the list and creates and destroys connections. For example, when collecting log files from web servers, the list of web servers may change over time. When a web server is added, the coordinator detects the change and creates a new spout for it. See Figure 4-3.

Direct connection coordinator

Figure 4-3. Direct connection coordinator

Tip

It’s recommended to create connections from spouts to message emitters, rather than the other way around. If the machine on which a spout is running goes down, Storm will restart it on another machine, so it’s easier for the spout to locate the message emitter than for the message emitter to keep track of which machine the spout is on.

Enqueued Messages

The second approach is to connect your spouts to a queue system that will receive the messages from the message emitters and will leave the messages available for consumption by the spouts. The advantage of using a queue system is that it can serve as middleware between the spouts and data source; in many cases, you can use the queue to be reliable using the capability of replay messages of many queue systems. This means you don’t need to know anything about message emitters, and the process of adding and removing emitters will be easier than with direct connection. The problem with this architecture is that the queue will be your point of failure, and you’ll be adding a new layer to your processing flow.

Figure 4-4 shows the architecture schema.

Using a queue system

Figure 4-4. Using a queue system

Tip

You can use round-robin pull or hashing queues (divide the queue messages by hash to send it to the spouts or create many queues) to parallelize the processing through queues, dividing the messages between many spouts.

You’ll create an example using Redis as your queue system and their Java library, Jedis. In this example, you’ll create a log processor to collect logs from an unknown source using the command lpush to insert messages into the queue and blpop to allow you to wait for a message. If you have many processes, using blpop will let you receive the messages in round-robin fashion.

To retrieve messages from Redis, you’ll use a thread created at the open spout (using a thread to avoid locking the main loop where the nextTuple method is):

    new Thread(new Runnable() {
            @Override
            public void run() {
                while(true){
                    try{
                        Jedis client = new Jedis(redisHost, redisPort);
                        List<String> res = client.blpop(Integer.MAX_VALUE, queues);
                        messages.offer(res.get(1));
                    }catch(Exception e){
                        LOG.error("Error reading queues from redis",e);
                        try {
                            Thread.sleep(100);
                        } catch (InterruptedException e1) {}
                    }
                }

            }
     }).start()

The only purpose of this thread is to create the connection and execute the blpop command. When a message is received, it is added to an internal queue of messages that will be consumed by the nextTuple method. Here you can see that the source is the Redis queue and you don’t know which are the message emitters nor their quantity.

Tip

We recommend that you not create many threads with spout, because each spout runs in a different thread. Instead of creating many threads, it is better to increase the parallelism. This will create more threads in a distributed fashion through the Storm cluster.

In your nextTuple method, the only thing that you’ll do is receive the messages and emit them again.

    public void nextTuple() {
        while(!messages.isEmpty()){
            collector.emit(new Values(messages.poll()));
        }
    }

Tip

You could transform this spout for the possibility of replaying messages from Redis to transform this topology into a reliable topology.

DRPC

DRPCSpout is a spout implementation that receives a function invocation stream from the DRPC server and processes it (see the example in Chapter 3). In the most common cases, using the backtype.storm.drpc.DRPCSpout will be enough, but it’s possible to create your own implementation using the DRPC classes included with the Storm package.

Conclusion

You’ve seen the common spout implementation patterns, their advantages, and how to make the messages reliable. It’s important to define spout communication based on the problem that you are working on. There is no one architecture that fits all topologies. If you know the sources or you can control these sources, then you can use a direct connection, while if you need the capacity to add unknown sources or receive messages from variety sources, it’s better to use a queued connection. If you need an online process, you will need to use DRPCSpouts or implement something similar.

Although you have learned the three main types of connections, there are infinite ways to do it depending on your needs.

Get Getting Started with Storm now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.