In this chapter, you’ll take a look at the most commonly used strategies for designing the entry point for a topology (a spout) and how to make spouts fault-tolerant.
When designing a topology, one important thing to keep in mind is message reliability. If a message can’t be processed, you need to decide what to do with the individual message and what to do with the topology as a whole. For example, when processing bank deposits, it is important not to lose a single transaction message. But if you’re processing millions of tweets looking for some statistical metric, and one tweet gets lost, you can assume that the metric will still be fairly accurate.
In Storm, it is the author’s responsibility to guarantee message reliability according to the needs of each topology. This involves a trade-off. A reliable topology must manage lost messages, which requires more resources. A less reliable topology may lose some messages, but is less resource-intensive. Whatever the chosen reliability strategy, Storm provides the tools to implement it.
To manage reliability at the spout, you can include a message ID with the tuple at emit time (collector.emit(new Values(…), tupleId)). The methods ack and fail are called when a tuple is processed correctly or fails, respectively. Tuple processing succeeds when the tuple is processed by all target bolts and all anchored bolts (you will learn how to anchor a bolt to a tuple in Chapter 5).
Tuple processing fails when:
collector.fail(tuple) is called by the target bolt
processing time exceeds the configured timeout (set as shown below)
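The timeout is set through the topology configuration. A minimal sketch (the 60-second value here is just an illustration):

import backtype.storm.Config;

Config conf = new Config();
// Tuples not fully acked within 60 seconds are considered failed,
// and the spout's fail method is called with their message IDs
conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 60);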
Let’s take a look at an example. Imagine you are processing bank transactions, and you have the following requirements:
If a transaction fails, resend the message.
If the transaction fails too many times, terminate the topology.
You’ll create a spout that sends 100 random transaction IDs, and a bolt that fails for 80% of tuples received (you can find the complete example at ch04-spout examples). You’ll implement the spout using a Map to emit transaction message tuples so that it’s easy to resend messages.
public void nextTuple() {
    if (!toSend.isEmpty()) {
        for (Map.Entry<Integer, String> transactionEntry : toSend.entrySet()) {
            Integer transactionId = transactionEntry.getKey();
            String transactionMessage = transactionEntry.getValue();
            collector.emit(new Values(transactionMessage), transactionId);
        }
        toSend.clear();
    }
}
If there are messages waiting to be sent, get each transaction message and its associated ID and emit them as a tuple, then clear the message queue. Note that it’s safe to call clear on the map, because nextTuple, fail, and ack are the only methods that modify it, and they all run in the same thread.
Maintain two maps to keep track of transaction messages waiting to be sent and the number of times each transaction has failed. The ack method simply removes the transaction from each map.
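The excerpts don’t show the spout’s fields; here is a plausible sketch of how they might be declared and seeded in open (the map names come from the example code, but the random-seeding logic is an assumption):

private Map<Integer, String> messages;                 // all transaction messages, by ID
private Map<Integer, String> toSend;                   // messages waiting to be (re)sent
private Map<Integer, Integer> transactionFailureCount; // failures per transaction
private SpoutOutputCollector collector;

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    this.collector = collector;
    messages = new HashMap<Integer, String>();
    toSend = new HashMap<Integer, String>();
    transactionFailureCount = new HashMap<Integer, Integer>();
    Random random = new Random();
    for (int i = 0; i < 100; i++) {
        // Seed 100 random transaction messages, as described above
        messages.put(i, "transaction_" + random.nextInt());
        transactionFailureCount.put(i, 0);
    }
    toSend.putAll(messages);
}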
public void ack(Object msgId) {
    messages.remove(msgId);
    transactionFailureCount.remove(msgId);
}
The fail method decides whether to resend a transaction message or fail if it has failed too many times.
Warning
If you are using an all grouping in your topology and any instance of the bolt fails, the fail method of the spout will be called as well.
public void fail(Object msgId) {
    Integer transactionId = (Integer) msgId;
    // Check the number of times the transaction has failed
    Integer failures = transactionFailureCount.get(transactionId) + 1;
    if (failures >= MAX_FAILS) {
        // If the number of failures is too high, terminate the topology
        throw new RuntimeException("Error, transaction id [" + transactionId +
                "] has had too many errors [" + failures + "]");
    }
    // If the number of failures is less than the maximum, save the count and resend the message
    transactionFailureCount.put(transactionId, failures);
    toSend.put(transactionId, messages.get(transactionId));
    LOG.info("Re-sending message [" + msgId + "]");
}
First, check the number of times the transaction has failed. If a transaction fails too many times, throw a RuntimeException to terminate the worker where it is running. Otherwise, save the failure count and put the transaction message in the toSend queue so that it will be resent when nextTuple is called.
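For completeness, here is a plausible sketch of the failure-injecting bolt described earlier, which fails roughly 80% of the tuples it receives (the class name and the use of BaseRichBolt are illustrative; only the 80% behavior comes from the example description):

import java.util.Map;
import java.util.Random;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class RandomFailureBolt extends BaseRichBolt {
    private OutputCollector collector;
    private Random random;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.random = new Random();
    }

    public void execute(Tuple tuple) {
        if (random.nextDouble() < 0.8) {
            collector.fail(tuple); // triggers the spout's fail method
        } else {
            collector.ack(tuple);  // triggers the spout's ack method
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This bolt emits nothing; it only acks or fails incoming tuples
    }
}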
Warning
Storm nodes do not maintain state, so if you store information in memory (as in this example) and the node goes down, you will lose all stored information.
Storm is a fail-fast system. If an exception is thrown, the topology will go down, but Storm will restart the process in a consistent state so that it can recover correctly.
Next, you’ll take a look at some common techniques for designing spouts that collect data efficiently from multiple sources.
In a direct connection architecture, the spout connects directly to a message emitter (see Figure 4-1).
This architecture is simple to implement, particularly when the message emitter is a well-known device or a well-known device group. A well-known device is one that is known at startup and remains the same throughout the life of the topology. An unknown device is one that is added after the topology is already running. A well-known device group is one in which all devices in the group are known at start time.
As an example, create a spout to read the Twitter stream using the Twitter streaming API. The spout will connect directly to the API, which serves as the message emitter. Filter the stream to get all public tweets that match the track parameter (as documented on the Twitter dev page). The complete example can be found on the Twitter Example GitHub page.
The spout gets the connection parameters from the configuration object (track, user, and password) and creates a connection to the API (in this case, using the DefaultHttpClient from Apache). It reads the connection one line at a time, parses the line from JSON format into a Java object, and emits it.
public void nextTuple() {
    //Create the client call
    client = new DefaultHttpClient();
    client.setCredentialsProvider(credentialProvider);
    HttpGet get = new HttpGet(STREAMING_API_URL + track);
    HttpResponse response;
    try {
        //Execute
        response = client.execute(get);
        StatusLine status = response.getStatusLine();
        if (status.getStatusCode() == 200) {
            InputStream inputStream = response.getEntity().getContent();
            BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
            String in;
            //Read line by line
            while ((in = reader.readLine()) != null) {
                try {
                    //Parse and emit
                    Object json = jsonParser.parse(in);
                    collector.emit(new Values(track, json));
                } catch (ParseException e) {
                    LOG.error("Error parsing message from twitter", e);
                }
            }
        }
    } catch (IOException e) {
        LOG.error("Error in communication with twitter api [" +
                get.getURI().toString() + "], sleeping 10s");
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e1) {
        }
    }
}
Tip
Here you are blocking inside the nextTuple method, so the ack and fail methods are never executed. In a real application, we recommend that you do the blocking in a separate thread and use an internal queue to exchange information (you’ll learn how to do that in the next example, Enqueued Messages).
This is great! You’re reading the Twitter stream with a single spout. If you parallelize the topology, you’ll have several spouts reading different partitions of the same stream, which doesn’t make sense. So how do you parallelize processing when you have several streams to read? One interesting feature of Storm is that you can access the TopologyContext from any component (spouts/bolts). Using this feature, you can divide the streams between your spout instances.
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    //Get the spout size from the context
    int spoutsSize = context.getComponentTasks(context.getThisComponentId()).size();
    //Get the id of this spout
    int myIdx = context.getThisTaskIndex();
    String[] tracks = ((String) conf.get("track")).split(",");
    StringBuffer tracksBuffer = new StringBuffer();
    for (int i = 0; i < tracks.length; i++) {
        //Check if this spout must read the track word
        if (i % spoutsSize == myIdx) {
            tracksBuffer.append(",");
            tracksBuffer.append(tracks[i]);
        }
    }
    if (tracksBuffer.length() == 0) {
        throw new RuntimeException("No track found for spout [spoutsSize:" +
                spoutsSize + ", tracks:" + tracks.length +
                "] the number of tracks must be greater than the spout parallelism");
    }
    this.track = tracksBuffer.substring(1);
    ...
}
Using this technique, you can distribute collectors evenly across data sources. The same technique can be applied in other situations—for example, for collecting log files from web servers. See Figure 4-2.
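As a usage sketch, wiring this spout into a topology might look like the following (the spout class name TwitterStreamingSpout is illustrative, and the track list is arbitrary):

Config conf = new Config();
// Four tracks shared among two spout tasks: each task reads two of them
conf.put("track", "storm,bigdata,hadoop,java");

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("twitter-spout", new TwitterStreamingSpout(), 2);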
In the previous example, you connected the spout to a well-known device. You can use the same approach to connect to unknown devices using a coordinating system to maintain the device list. The coordinator detects changes to the list and creates and destroys connections. For example, when collecting log files from web servers, the list of web servers may change over time. When a web server is added, the coordinator detects the change and creates a new spout for it. See Figure 4-3.
Tip
It’s recommended to create connections from spouts to message emitters, rather than the other way around. If the machine on which a spout is running goes down, Storm will restart it on another machine, so it’s easier for the spout to locate the message emitter than for the message emitter to keep track of which machine the spout is on.
The second approach is to connect your spouts to a queue system that receives messages from the message emitters and leaves them available for consumption by the spouts. The advantage of using a queue system is that it can serve as middleware between the spouts and the data sources; in many cases, you can make the topology reliable by taking advantage of the message-replay capability that many queue systems offer. This means you don’t need to know anything about the message emitters, and adding and removing emitters is easier than with a direct connection. The drawback of this architecture is that the queue becomes a point of failure, and you are adding a new layer to your processing flow.
Figure 4-4 shows the architecture schema.
Tip
You can parallelize processing through queues by dividing the messages between many spouts, either pulling from the queue round-robin or using hashed queues (partitioning the messages by hash across the spouts, or creating many queues), as sketched below.
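A sketch of the hashed-queues variant on the producer side (the naming scheme is an assumption; each emitter picks one of several Redis lists by hashing a message key):

// Illustrative producer-side hashing: pick one of numQueues Redis lists
// by hashing a message key, so related messages land on the same queue
static String queueFor(String key, int numQueues) {
    return "queue-" + ((key.hashCode() & Integer.MAX_VALUE) % numQueues);
}

// Usage: jedis.lpush(queueFor(sourceId, 4), message);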
You’ll create an example using Redis as your queue system and its Java client library, Jedis. In this example, you’ll create a log processor that collects logs from an unknown source, using the command lpush to insert messages into the queue and blpop to wait for a message. If you have many processes, using blpop will distribute the messages among them in round-robin fashion.
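On the emitter side, any process can enqueue a log line with lpush; a minimal sketch using Jedis (the host, port, and queue name are arbitrary):

Jedis jedis = new Jedis("localhost", 6379);
// Each emitter pushes log lines onto the shared list; whichever
// spout's blpop is waiting will consume them in round-robin order
jedis.lpush("log-queue", "127.0.0.1 - GET /index.html 200");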
To retrieve messages from Redis, you’ll use a thread created in the spout’s open method (using a thread to avoid blocking the main loop where the nextTuple method runs):
new Thread(new Runnable() {
    @Override
    public void run() {
        while (true) {
            try {
                Jedis client = new Jedis(redisHost, redisPort);
                List<String> res = client.blpop(Integer.MAX_VALUE, queues);
                messages.offer(res.get(1));
            } catch (Exception e) {
                LOG.error("Error reading queues from redis", e);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e1) {
                }
            }
        }
    }
}).start();
The only purpose of this thread is to create the connection and execute the blpop command. When a message is received, it is added to an internal queue of messages that will be consumed by the nextTuple method. Here the source is the Redis queue, and you know neither which processes emit the messages nor how many there are.
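Because the reader thread and nextTuple run concurrently, the internal queue must be thread-safe. A minimal sketch of the relevant fields (the names come from the excerpts; the concrete queue type is an assumption):

import java.util.concurrent.LinkedBlockingQueue;

// The reader thread calls messages.offer(...) while nextTuple calls
// messages.poll(), so a concurrent queue implementation is required
private LinkedBlockingQueue<String> messages = new LinkedBlockingQueue<String>();
private String redisHost;
private int redisPort;
private String[] queues; // Redis list keys passed to blpop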
Tip
We recommend not creating many threads in a spout, because each spout already runs in its own thread. Instead of creating many threads, it is better to increase the parallelism; this will create more threads, distributed across the Storm cluster.
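Increasing the parallelism is just a matter of the parallelism hint when declaring the spout (a fragment assuming a TopologyBuilder named builder; the component name and RedisSpout class are illustrative):

// Four spout tasks; Storm distributes them across the cluster's workers
builder.setSpout("redis-spout", new RedisSpout(), 4);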
In your nextTuple method, the only thing you’ll do is receive the messages and emit them again.
public void nextTuple() {
    while (!messages.isEmpty()) {
        collector.emit(new Values(messages.poll()));
    }
}
Tip
You could modify this spout to replay messages from Redis, transforming this topology into a reliable one.
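A sketch of one possible design (an assumption, not from the example code): emit each message with an ID, remember it until it is acked, and re-enqueue it locally when it fails.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

private AtomicLong nextId = new AtomicLong();
private Map<Long, String> pending = new ConcurrentHashMap<Long, String>();

public void nextTuple() {
    String msg;
    while ((msg = messages.poll()) != null) {
        long id = nextId.incrementAndGet();
        pending.put(id, msg);                // remember until acked
        collector.emit(new Values(msg), id); // emit with a message ID
    }
}

public void ack(Object msgId) {
    pending.remove(msgId);
}

public void fail(Object msgId) {
    String msg = pending.remove(msgId);
    if (msg != null) {
        messages.offer(msg); // re-enqueue locally for a later nextTuple call
    }
}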
DRPCSpout is a spout implementation that receives a function invocation stream from the DRPC server and processes it (see the example in Chapter 3). In the most common cases, backtype.storm.drpc.DRPCSpout will be enough, but it’s possible to create your own implementation using the DRPC classes included with the Storm package.
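As a rough sketch of wiring a DRPCSpout by hand (EchoBolt is hypothetical; a real bolt at that position must emit the result together with the return-info field the spout provides):

LocalDRPC drpc = new LocalDRPC();
TopologyBuilder builder = new TopologyBuilder();
// The DRPCSpout emits one tuple per invocation of the "echo" function
builder.setSpout("drpc-input", new DRPCSpout("echo", drpc));
// EchoBolt (hypothetical) would emit [result, return-info]
builder.setBolt("process", new EchoBolt()).shuffleGrouping("drpc-input");
// ReturnResults sends each result back to the waiting DRPC client
builder.setBolt("return", new ReturnResults()).shuffleGrouping("process");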
You’ve seen the common spout implementation patterns, their advantages, and how to make messages reliable. It’s important to choose the spout communication architecture based on the problem you are working on; there is no one architecture that fits all topologies. If you know the sources, or you can control them, you can use a direct connection; if you need the capacity to add unknown sources or to receive messages from a variety of sources, it’s better to use a queued connection. If you need an online process, you will need to use DRPCSpouts or implement something similar. Although you have learned the three main types of connections, there are infinite variations depending on your needs.