O'Reilly logo

Using Flume by Hari Shreedharan

Stay ahead with the world's most comprehensive technology and business learning platform.

With Safari, you learn the way you learn best. Get unlimited access to videos, live online training, learning paths, books, tutorials, and more.

Start Free Trial

No credit card required

Chapter 4. Channels

Channels are buffers that sit in between sources and sinks. As such, channels allow sources and sinks to operate at different rates. Channels are key to Flume’s guarantees of not losing data (of course, when configured properly). Sources write data to one or more channels, which are read by one or more sinks. A sink can read only from one channel, while multiple sinks can read from the same channel for better performance. Channels have transactional semantics that allow Flume to provide explicit guarantees about the data written in a channel.

Having a channel operating as a buffer between sources and sinks has several advantages. The channel allows sources operating on the same channel to have their own threading models without being worried about the sinks reading from the channel, and vice versa. Having a buffer in between the sources and the sinks also allows them to operate at different rates, since the writes happen at the tail of the buffer and reads happen off the head. This also allows the Flume agents to handle “peak hour” loads from the sources, even if the sinks are unable to drain the channels immediately.

Channels allow multiple sources and sinks to operate on them. Channels are transactional in nature. Each write to a channel and each read from a channel happens within the context of a transaction. Only once a write transaction is committed will the events from that transaction be readable by any sinks. Also, if a sink has successfully taken an event, the event is not available for other sinks to take until and unless the sink rolls back the transaction.

Temporary load spikes are common in most real-time applications, and Flume is designed to handle these cases. The events will be buffered in the channel until the sinks remove them, allowing the agent to handle changes in incoming load. How much additional data each agent can handle depends on the capacity of the channel. The capacity of the channel should be allocated based on the expected combined maximum peak load of all sources writing to the channel and the combined rate of drain of all sinks. This design also allows the sources and sinks to have retry logic on failure. On failure, sources can reattempt writing to the channel and sinks can reattempt reads.

Transaction Workflow

As discussed in “Transactions in Flume Channels”, Flume channels are transactional. Transactions are essentially batches of events written into a channel atomically. Either all or none of the events in the batch are present in the channel. Transactions are important to provide guarantees of exactly when an event is written to or removed from the channel. For example, a sink could take an event from the channel and attempt to write it to HDFS and fail. In this case, the event should go back to the channel and be available for this sink or another one to take and write to HDFS.

Making sure that takes cause events to be removed only on transaction commit guarantees that events are not lost even if the write fails once, at which point the sink can just roll back this transaction. Transactions could have one event or many events, but for performance reasons it is always recommended to have a reasonably large number of events per transaction.

It is important to batch writes to channels, especially durable channels. Durable channels guarantee no data loss even in the event of agent or machine restarts, so they have to flush and sync all buffered event data to disk during a transaction commit, which occurs once per batch. Syncing to disk is an expensive and time-consuming operation and should be done only when a reasonably large chunk of data has been written to the page cache. Also, time taken to sync to disk includes the nontrivial cost of making a system call before the actual sync, which adds up over time. Each such batch is also represented by a transaction, making transactions important for performance as well as reliability.

Each channel can have several sources and sinks, respectively writing to and reading from the channel. Sources and sinks operate in slightly different ways with respect to transactions. Sources do not directly work with transactions; instead, a source’s channel processor handles the transactions on its behalf. The way the channel processor works with transactions is almost identical to the way sinks do (except that sinks take from the channels, while channel processors put data into the channels).

The sink initiates a transaction with the channel by calling the channel’s getTransaction method, which returns an instance of a Transaction. The sink then calls begin on the transaction object, which allows the channel to set up any internal state it requires for the transaction. Usually, this includes creation of queues for temporarily holding the events until the transaction is completed.

Once the transaction is started, the sink calls take (put in the case of channel processors) on the channel, until the sink is ready to commit the transaction. Once a sink takes an event, it will not be available for the same or another sink to take unless the transaction is rolled back.

Sinks (and channel processors) usually batch several events into one transaction for performance reasons. Once the sink has completed its batch, the sink calls commit on the transaction. Once a sink-side transaction (a transaction with only takes) is committed, the events in that transaction are marked as deleted by the channel and will not be available to any sink again. Once a source-side transaction (owned by the channel processor) is committed, the events are safely in the channel. This means that the events will be deleted from the channel only when a sink takes the events and commits.

Be aware that if a sink has taken an event, that event is no longer available to any sink unless and until this sink rolls back the transaction, causing the events in it to be available for takes again. This is specifically designed to avoid duplicates when multiple sinks operate on the same channel. Every event in the channel can be taken and committed exactly once, after which the event is removed from the channel.

Depending on the specific channel used, the events may be available in the channel even if the machine or the JVM restarts. It is also likely that the sink may have failed to write all the events to wherever it was supposed to and hence has to retry. In this case, the sink rolls back the entire transaction using the rollback method in the transaction. Once a transaction is rolled back on the sink side, the channel restores the events to the channel and makes them available for sinks to take. In the case of a source-side transaction rollback, it is as if the transaction never happened, and the events written during that transaction are not written into the channel. Rollbacks could potentially cause duplicates when they are caused by timeouts or other failures where the events may have been committed to the next hop’s channel.

After the transaction is committed or rolled back, it is closed by calling the close method to clear up any resources to be used by the transaction. Figure 4-1 illustrates the workflow for a transaction.

TxnWorkFlow
Figure 4-1. Transaction workflow

A single transaction cannot put and take events. This ensures that sources can only put events into the channel and sinks can only take events from the channel.

Channels Bundled with Flume

Flume comes bundled with two channels: the Memory Channel and the File Channel. Both channels work on the same basic principles explained here. Both channels are fully thread-safe and can operate with multiple sources and sinks. As the names imply, the Memory Channel stores committed events in main memory while the File Channel writes out events to files on disk. In this section, we will discuss both of these channels and the different factors to consider while selecting between the two.

Memory Channel

The Memory Channel is an in-memory channel that stores events written to it on the heap. For all practical purposes, the Memory Channel is an in-memory queue—the sources write to its tail and sinks read off its head. The Memory Channel supports very high throughput, as it holds all data in memory. As mentioned earlier, the channel is thread-safe and can handle writes from several sources and reads from several sinks at the same time. The Memory Channel should be used when data loss is not a concern, since the channel does not persist the data to disk. If data loss is a concern, then the Memory Channel should not be used, since process death or machine crashes or restarts can cause data to be lost.

The Memory Channel supports Flume’s transactional model and maintains separate queues for each transaction in progress. Once a source-side transaction is committed, the events in the queue for that transaction are moved to the channel’s main queue atomically. If the commit successfully completes, the events in the transaction will be available for the sinks to take. If it fails, the transaction has to be rolled back by the source and the events will be discarded. For sink-side transactions, the events are moved to the transaction’s queue each time the sink does a take. This ensures that exactly one sink “takes” an event. When the sink commits the transaction, the transaction queue is discarded and the events are dereferenced, to be garbage-collected. Therefore, the sink implementation must be careful to commit the transaction if and only if the events have been successfully written to the destination.

If the transaction fails, the events are reinserted at the head of the channel in the reverse order, so the events are available to be “taken” again in the same order as they were originally inserted. In this way, although Flume does not guarantee ordering, the Memory Channel does make the events available for takes in the order they were written. However, when certain transactions are rolled back, it is possible that events written after the events in those transactions will get written out to their destination earlier (since another sink may have committed a transaction containing events that are “newer” than the ones in the rolled-back transactions).

The Memory Channel can be configured without much effort and is one of the easiest Flume components to configure. Table 4-1 lists the configuration parameters for the channel.

Table 4-1. Memory Channel configuration
Config parameter Default value Description

type

-

The alias for the Memory Channel is memory. The FQCN is org.apache.flume.channel.MemoryChannel (case sensitive).

capacity

100

The maximum number of committed events the channel can hold.

transactionCapacity

100

The maximum number of events that can be put or taken in a single transaction.

byteCapacity

80% of the total amount of heap space available to the process

The maximum amount of heap space (in bytes) this channel is allowed to use.

byteCapacityBufferPercentage

20

The percent of byteCapacity to consider keeping as a buffer between the byte capacity of the channel and the total size of the bodies of all events currently in the channel.

keep-alive

3

The maximum period of time (in seconds) each put or take should wait for completion.

The Memory Channel holds all events in memory—therefore, the channel’s capacity is limited and is defined by the capacity parameter. This parameter defines the total number of committed events that the channel can hold at any given time. The difference between the total number of events committed into the channel and the number of events taken out of the channel (and committed) at any time should be less than or equal to the capacity of the channel. If the channel is at capacity, any more attempts to insert events into the channel will fail with a ChannelException until at least an equivalent number of events are taken from the channel.

The maximum number of events that can be put or taken in a transaction is controlled by the transactionCapacity parameter. This parameter is also a good defense against rogue clients pushing a huge number of events to a source, causing the agent to run out of memory. This parameter forces batches to be of limited size and thus limits the number of events per RPC call, and is a simple defense against denial of service (DoS) attacks.

The total amount of memory that the events in the channel use can be restricted by the byteCapacity parameter. The byteCapacityBufferPercentage parameter represents the percentage of the byte capacity that is reserved for the event headers. When an event is about to be committed to the channel, the event is inserted into the channel if and only if the combined size of the bodies of all events in the channel plus the size of the body of the current event is less than or equal to the amount of memory available that is not reserved for the event headers.

When an event is about to be inserted into the channel, it is possible that the channel is full at that time. In this case, the thread inserting the events will wait for a maximum of keep-alive seconds before failing. The thread taking events from the channel will also wait for keep-alive seconds for an event to be available in the channel. In most cases, there should be no need to set this parameter. It is a safety valve to throttle the write or read rate. Throttling the rate is useful when the take rate is much faster than the put rate, or vice versa.

The following configuration shows a Memory Channel configured to hold up to 100,000 events, with each transaction being able to hold up to 1,000 events. The total memory occupied by all events in the channel can be a maximum of approximately 5 GB of space. Of this 5 GB, the channel considers 10% to be reserved for event headers (as defined by the byteCapacityBufferPercentage parameter), making 4.5 GB available for event bodies:

agent.channels = mc
agent.sources = sq

agent.channels.mc.type = memory
agent.channels.mc.capacity = 100000
agent.channels.mc.transactionCapacity = 1000
agent.channels.mc.byteCapacity = 5000000000
agent.channels.mc.byteCapacityBufferPercentage = 10

agent.sources.sq.type = seq
agent.sources.sq.channels = mc

File Channel

The File Channel is Flume’s persistent channel. It writes out all events to disk and thus does not lose data on process or machine shutdown or crash. The File Channel ensures that any events committed into the channel are removed from the channel only when a sink takes the events and commits the transaction, even if the machine or agent crashed and was restarted. It is designed to be highly concurrent and to handle several sources and sinks at the same time. The File Channel’s design is roughly based on the paper about log-structured file systems by Rosenblum and Ousterhout [lfs-paper]. The design is discussed in more detail later in this section.

The File Channel is designed to be used in situations where data durability is required and data loss cannot be tolerated. Since the channel writes data to disk, it does not lose data on crash or failure. An additional bonus due to the fact that it writes data to disk is that the channel can have a very large capacity, especially compared to the Memory Channel.

As long as disk space is available, the File Channel can have an extremely large capacity, up to tens or hundreds of millions of events. This is especially useful when it is expected that the sinks taking from the channel will not be able to keep up with a limited peak period, and a large backlog of events is possible. The File Channel as a result can also handle much longer downstream downtimes, if configured correctly. Since the channel does not keep the events in memory once they’ve been committed, it requires much less heap space than a Memory Channel of equivalent capacity.

The File Channel guarantees that every event written to it will be available through agent and machine failures or restarts. It does this by writing out every event put to the channel to disk. Once a transaction is committed, the events in that transaction are made available for takes. The events are read from disk and passed to the sink when they are taken from the channel, and are completely dereferenced and eligible for removal once the take transaction is committed. More details on the implementation will be discussed later in this section.

The File Channel allows the user to configure the use of multiple disks by having them mounted at different mountpoints. When configured to use multiple disks, the channel round-robins between the disks, thus allowing the channel to perform better when more disks are available to it. It is recommended (though not required) to use a separate disk for the File Channel checkpoint. The checkpoint reflects the exact state of the channel at the instant the checkpoint was written out. The File Channel uses the checkpoint to restart quickly without having to read all the data files. It writes out the checkpoint to disk periodically while it is in operation. On restart, the channel loads the last checkpoint written out and only replays the puts and takes that happened after this checkpoint, allowing the channel to start up quickly and be ready for normal operation. The interval between two consecutive checkpoints is set to 30 seconds by default, though it is configurable.

The File Channel allows users to pass in several configuration parameters, allowing them to fine-tune the channel’s performance based on the hardware. The configuration parameters for the File Channel are described in Table 4-2.

Table 4-2. File Channel configuration
Config parameter Default value Description

type

-

The alias for the File Channel is file. The FQCN is org.apache.flume.channel.file.FileChannel (case sensitive).

capacity

1000000

The maximum number of committed events the channel can hold.

transactionCapacity

1000

The maximum number of events that can be put or taken in a single transaction.

checkpointDir

~/flume/filechannel/checkpoint

The directory to which the channel should write out the checkpoint.

dataDirs

~/flume/filechannel/data

A comma-separated list of directories to use to write the events to. Configuring multiple directories, each mounting a different disk, can dramatically improve performance by writing to the disks in parallel.

useDualCheckpoints

false

Tells the channel whether to back up the checkpoint once it has completely been written out. This must be either true or false. If this is set to true, backupCheckpointDir must be set.

backupCheckpointDir

-

The directory to back up the checkpoint to. If the primary checkpoint is corrupt or incomplete, the channel can recover from the backup, thus avoiding a full replay of the data files. This parameter should point to a directory different to checkpointDir.

checkpointInterval

30

The time period (in seconds) between consecutive checkpoints.

maxFileSize

1623195647

The maximum size (in bytes) of each data file. Once the file reaches this size (or will reach it once the next event is written to it), the file is rolled and a new data file is created in that directory. If this value is set to higher than the default value, the channel still uses the default as the maximum value.

minimumRequiredSpace

524288000

The minimum amount of space (in bytes) required on each volume for the channel to continue operation. If any one of the volumes on which the data directories is mounted has only this much space remaining, the channel will stop operation to prevent corruption and avoid incomplete data being written out. The minimum possible value for this parameter is 1048576 (1 MB).

keep-alive

3

The maximum period of time (in seconds) each put or take should wait for completion.

The File Channel, being Flume’s main persistent channel, often dictates the performance of the agent as a whole. It is possible to fine-tune several aspects of the channel through configuration. The File Channel has capacity and transactionCapacity parameters, which are exactly the same as those of the Memory Channel, though they default to higher values of 1000000 and 1000, respectively.

As discussed earlier, the File Channel can write data to multiple disks, though the channel is not directly aware of this. Different disks can be mounted at different mountpoints and the channel can be configured to write data in a round-robin fashion to these directories. The channel will always append to exactly one file per data directory, though it will read from all of the files as required. Since multiple sources can write to the channel (and each source can write from multiple threads), the channel will write to different data directories in parallel from different threads (each source and sink runs at least one thread, and in many cases, like the Avro Source, they may run more than one thread), thus parallelizing disk usage, resulting in better performance. Therefore, if you give the File Channel more disks to work with, it is likely that the performance of the channel will improve.

The File Channel takes a comma-separated list of data directories as the value of the dataDirs parameter. The default value should only be used for testing and is not recommended for production use. Even for a single disk or a limited number of disks, multiple data directories can be used per disk for better performance, though the number of directories that can be used such that the performance improves will vary by disk.

The File Channel writes out a checkpoint periodically to make restart or recovery faster. The checkpoint is written to the directory specified as the value of the checkpointDir parameter. If the channel is stopped while it is checkpointing, the checkpoint may be incomplete or corrupt. A corrupt or incomplete checkpoint could make the restart of the channel extremely slow, as the channel would need to read and replay all data files.

To avoid this problem, it is recommended that the useDualCheckpoints parameter be set to true and that backupCheckpointDir be set. It is recommended that this directory be on a different disk than the one where the original checkpoint is stored. When these parameters are set, Flume will back up the checkpoint to the backupCheckpointDir as soon as it is completed. This ensures that once the channel has been in operation for a brief period of time (enough time for the first checkpoint to be written out and backed up), it will be able to restart from a checkpoint, even if the newest one is corrupt or incomplete, reducing the restart time drastically. The time period between consecutive checkpoints is controlled by the checkpointInterval parameter.

The remaining parameters are meant to fine-tune the File Channel’s performance and disk usage. Flume appends each event to the end of a data file and retains that file as long as the file contains events that have not yet been taken and committed, or is still being written to. The maximum size the file should grow to before Flume rolls it and considers it read-only is controlled by the maxFileSize parameter. This parameter defaults to the equivalent of about 1.6 GB, which is also the maximum value for this parameter. If this is set to higher than the default value, the file will still be rolled once it reaches the default size. It must be noted that each file is deleted if and only if all of the events written to the channel are taken and committed (in fact, the file is deleted only at the time of the checkpoint following the last event being taken and committed). If the files are to be deleted sooner, this parameter should be set to a lower value, so that all events get taken out from individual files faster (since the files are smaller, they will contain fewer events than a larger file). Keeping this value too small can lead to too many files being created on the disks being used, though, so it is better to not reduce this value from the default. The channel is also conservative with regard to deletion of files. It will always retain two files per data directory, even if the files do not have any events to be taken and committed. The channel will also not delete the file currently being written to.

To ensure that the channel does not write to disks with low disk space, the minimumRequiredSpace parameter can be configured. Once the disk space on a specific disk goes down to the value set by this parameter (or 1 MB, whichever is higher), the channel ceases operation. To conserve system resources and not affect performance, the channel does not check the disk space on every write, but does it periodically, maintaining counters internally to calculate the space available on disk. This makes the assumption that no other channel or process is writing to this disk, which would make the available disk space decrease faster.

As discussed earlier, the File Channel writes a checkpoint out to disk periodically. As the checkpoint reflects the state of the channel at the time it was written, normal operations cannot be performed while the checkpoint is being written. The keep-alive parameter works similarly to the one in the Memory Channel, specifying the time to wait for capacity to be available or the checkpoint to complete for an event to be put or an event to be available for taking from the channel.

Typically, the channel performs better when more disks are available to it. If enough disk space is available, the channel should be configured to a capacity high enough to accommodate downstream failures or traffic spikes. Based on the expected throughput per channel and expected maximum downtime of downstream agents or destinations, the channel capacity can be configured to handle big backlogs. The channel is fast enough to be able to clear off tens of millions of events within a few minutes, though performance to a large extent will depend on the underlying hardware. 

The following configuration file shows a File Channel named fc configured to be able to hold one million events. This channel stores data to three disks, writing to them in round-robin order. The channel is also configured to back up the checkpoint to a different directory to recover from failure quickly. It is configured to support transactions of up to 10,000 events. To garbage-collect files faster, the channel also sets the maximum file size of the data files to approximately 900 MB:

agent.channels = fc
agent.sources = sq

agent.channels.fc.type = file
agent.channels.fc.capacity = 1000000
agent.channels.fc.transactionCapacity = 10000
agent.channels.fc.checkpointDir = /data1/fc/checkpoint
agent.channels.fc.dataDirs = /data1/fc/data,/data2/fc/data,/data3/fc/data
agent.channels.fc.useDualCheckpoints = true
agent.channels.fc.backupCheckpointDir = /data4/fc/backup
agent.channels.fc.maxFileSize = 900000000

agent.sources.sq.type = seq
agent.sources.sq.channels = fc

Best Practices with the File Channel

You can have the File Channel write to multiple disks by specifying several data directories in the configuration. Adding more disks directly improves performance, since the File Channel round-robins writes between disks.

The File Channel can back up the checkpoint and start from the backup if the checkpoint itself is incomplete or corrupt (this might happen if the machine or agent crashes while the channel is checkpointing). Enabling checkpoint backup allows the channel to restart fast even if the checkpoint itself is corrupt.

The File Channel can lose data if the disk it is writing to fails. Even if just one of the many disks the channel is writing data to fails, the channel may not be able to recover any of the data—even events that are on disks that have not failed. To avoid such a situation, it is a good idea to used RAID-ed disks with the File Channel.

Using NFS-mounted disks with the File Channel is not a good idea, since NFS does not provide the same guarantees as local disks. Specifically, the fsync system call does not guarantee that data is persisted onto a physical disk, so it is possible that the File Channel data might be lost if the machine or agent crashes.

Design and implementation of the File Channel*

As discussed previously, the File Channel persists every event to disk and ensures that the events are available even in the event of agent or machine crashes and restarts. The File Channel also persists every operation that is performed to disk. This means the channel can replay each record in the same order as it happened to get itself back to the same state it was in when the channel shut down. When the channel has completed replaying the records, it is ready for normal operation. In this section, we will take a more detailed look at the internals of the File Channel.

The File Channel maintains two separate data structures: the Flume event queue (which will be referred to as the queue) and the write-ahead log (WAL). Every put, take, commit, and rollback is represented by a transaction event record (referred to from here on as a record), with the type of the record representing the operation—put, take, commit, or rollback. Each File Channel operation is recorded in the WAL as a record. Each time an event is put into the channel, a put record is written to the WAL, even if the transaction does not actually get committed.

Similarly, for a take, a take record is written out. Each record has a unique, monotonically increasing ID, the write ID, which is recorded when the record is written to the WAL. Each record also contains the unique ID of the transaction that the record is a part of. Since each put (or take) record contains the transaction ID, it is possible to figure out which events were committed and which ones rolled back by mapping this transaction to the corresponding commit or rollback record.

By reading the WAL and performing each operation in the order it actually happened (which can be inferred from the write IDs of the operations), we can reconstruct the file state of the channel at any point. When the channel is reconstructed by reading all data files fully, it is called a full replay. Full replays are often time-consuming and disruptive, as all data files have to be read and every operation in the WAL has to be processed. This is especially true when the WAL contains millions of put records and take records—even if the final state contains very few events, each record has to be read and stored in memory until we read the corresponding commit or rollback record.

Each time a put happens, a put record is written to disk. Using the file ID and offset of the record in the file, the channel constructs a unique Flume event pointer for that record. Each time an event is put into the channel, the pointer representing the record is stored in an in-memory queue local to the transaction. When the transaction is committed, the pointers from the local queue are copied to the tail of the File Channel’s main queue: the Flume event queue. That queue, therefore, represents the current state of the channel at any point in time.

When a sink takes an event from the channel, the head of the queue is removed and the pointer is dereferenced. The event is then stored in a queue local to the transaction. On commit, the local queue is discarded since the events are completely removed. On rollback, the events are pushed back into the queue. The queue is actually a memory-mapped file—the mapped buffer is updated and pushed to disk during a checkpoint.

At startup, Flume begins a process called replay to get the channel back to the exact state it was in when it was previously stopped. The queue is loaded back into memory at the time of startup by simply memory-mapping the checkpoint file. The data files are then read from the offset at the time of the checkpoint (this offset is recorded in the data files’ metadata at the time of the checkpoint), and the puts and takes are applied to the queue in order.

All pointers representing the current incomplete transactions (called inflights) are also written to disk so that any events taken out but not yet committed at the time of the checkpoint can be reinserted into the queue after loading the checkpoint. Put transactions that were still in progress at the time of the checkpoint are also recovered using the inflight files. (There may be events in data files that were written before the checkpoint but committed after. They are not inserted into the queue at the time of the checkpoint and thus are not replayed from the data files, as the data files are replayed only from their offset at the time of the checkpoint.)

Once the replay is completed, the channel is ready for normal operation. When no queue is present or when it is incomplete, the channel does a full replay. As discussed earlier, the channel can back up each checkpoint immediately after it is completed, so that if the current checkpoint is corrupted, or the agent is killed before it is complete, the previous one can be loaded to avoid a full replay.

Summary

In this chapter we discussed channels, which are buffers that sit between sources and sinks and hold the data brought into a Flume agent by a source, until it is removed by a sink. Channels can be in memory or on disk, with the in-memory Memory Channel giving better performance while the on-disk File Channel guarantees durability through agent and machine restarts by persisting all operations and data to disk.

In the next chapter, we will discuss how sinks are designed and the various sinks that come packaged with Flume. We will also look at how to write custom sinks.

References

With Safari, you learn the way you learn best. Get unlimited access to videos, live online training, learning paths, books, interactive tutorials, and more.

Start Free Trial

No credit card required