Chapter 4. Hadoop I/O
Hadoop comes with a set of primitives for data I/O. Some of these are techniques that are more general than Hadoop, such as data integrity and compression, but deserve special consideration when dealing with multiterabyte datasets. Others are Hadoop tools or APIs that form the building blocks for developing distributed systems, such as serialization frameworks and on-disk data structures.
Data Integrity
Users of Hadoop rightly expect that no data will be lost or corrupted during storage or processing. However, every I/O operation on the disk or network carries with it a small chance of introducing errors into the data that it is reading or writing, and when the volumes of data flowing through the system are as large as the ones Hadoop is capable of handling, the chance of data corruption occurring is high.
The usual way of detecting corrupted data is by computing a checksum for the data when it first enters the system, and then whenever it is transmitted across a channel that is unreliable and hence capable of corrupting the data. The data is deemed to be corrupt if the newly generated checksum doesn’t exactly match the original. This technique doesn’t offer any way to fix the data—merely error detection. (And this is a reason for not using low-end hardware; in particular, be sure to use ECC memory.) Note that it is possible that it’s the checksum that is corrupt, not the data, but this is very unlikely, since the checksum is much smaller than the data.
A commonly used error-detecting code is CRC-32 (cyclic redundancy check), which computes a 32-bit integer checksum for input of any size.
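As a point of reference, here is a minimal sketch (not a Hadoop API, and the class name is just for illustration) of computing a CRC-32 checksum in plain Java with java.util.zip.CRC32, the same kind of check Hadoop performs for you:

import java.util.zip.CRC32;

public class Crc32Sketch {
  public static void main(String[] args) throws Exception {
    byte[] data = "some bytes to protect".getBytes("UTF-8");

    CRC32 crc = new CRC32();
    crc.update(data, 0, data.length);
    long checksum = crc.getValue(); // 32-bit value held in a long

    // Recomputing over the same bytes must give the same value;
    // a mismatch after transmission or storage indicates corruption.
    System.out.printf("CRC-32 = %08x%n", checksum);
  }
}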
Data Integrity in HDFS
HDFS transparently checksums all data written to it and by
default verifies checksums when reading data. A separate checksum is
created for every io.bytes.per.checksum
bytes of data. The
default is 512 bytes, and since a CRC-32 checksum is 4 bytes long, the
storage overhead is less than 1%.
Datanodes are responsible for verifying the data they receive
before storing the data and its checksum. This applies to data that
they receive from clients and from other datanodes during replication.
A client writing data sends it to a pipeline of datanodes (as
explained in Chapter 3), and the last datanode in the
pipeline verifies the checksum. If it detects an error, the client
receives a ChecksumException
, a
subclass of IOException
.
When clients read data from datanodes, they verify checksums as well, comparing them with the ones stored at the datanode. Each datanode keeps a persistent log of checksum verifications, so it knows the last time each of its blocks was verified. When a client successfully verifies a block, it tells the datanode, which updates its log. Keeping statistics such as these is valuable in detecting bad disks.
Aside from block verification on client reads, each datanode
runs a DataBlockScanner
in a
background thread that periodically verifies all the blocks stored on
the datanode. This is to guard against corruption due to “bit rot” in
the physical storage media. See Datanode block scanner
for details on how to access the scanner reports.
Since HDFS stores replicas of blocks, it can “heal” corrupted
blocks by copying one of the good replicas to produce a new, uncorrupt
replica. The way this works is that if a client detects an error when
reading a block, it reports the bad block and the datanode it was
trying to read from to the namenode before throwing a ChecksumException
. The namenode marks the
block replica as corrupt, so it doesn’t direct clients to it, or try
to copy this replica to another datanode. It then schedules a copy of
the block to be replicated on another datanode, so its replication
factor is back at the expected level. Once this has happened, the
corrupt replica is deleted.
It is possible to disable verification of checksums by passing
false
to the setVerifyChecksum()
method on FileSystem
, before using the open()
method to read a file. The same
effect is possible from the shell by using the -ignoreCrc
option with the -get
or the equivalent -copyToLocal
command. This feature is useful
if you have a corrupt file that you want to inspect so you can decide
what to do with it. For example, you might want to see whether it can
be salvaged before you delete it.
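Here is a minimal sketch of the programmatic route; the URI and path are hypothetical, and the imports (org.apache.hadoop.conf.Configuration, org.apache.hadoop.fs.*, org.apache.hadoop.io.IOUtils, java.io.InputStream, java.net.URI) are assumed:

Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create("hdfs://namenode/"), conf); // hypothetical URI
fs.setVerifyChecksum(false); // skip client-side checksum verification
InputStream in = fs.open(new Path("/possibly/corrupt/file")); // hypothetical path
IOUtils.copyBytes(in, System.out, 4096, false); // salvage whatever bytes are readable
IOUtils.closeStream(in);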
LocalFileSystem
The Hadoop LocalFileSystem
performs client-side checksumming. This means that when you write a
file called filename, the
filesystem client transparently creates a hidden file, .filename.crc, in the same directory
containing the checksums for each chunk of the file. As in HDFS, the
chunk size is controlled by the io.bytes.per.checksum
property, which
defaults to 512 bytes. The chunk size is stored as metadata in the
.crc file, so the file can be
read back correctly even if the setting for the chunk size has
changed. Checksums are verified when the file is read, and if an error
is detected, LocalFileSystem
throws a ChecksumException
.
Checksums are fairly cheap to compute (in Java, they are
implemented in native code), typically adding a few percent overhead
to the time to read or write a file.
For most applications, this is an acceptable price to pay for data
integrity. It is, however, possible to disable checksums: the use case
here is when the underlying filesystem supports checksums natively.
This is accomplished by using RawLocalFileSystem
in place of LocalFileSystem
. To do this globally in an
application, it suffices to remap the implementation for
file
URIs by setting the property fs.file.impl
to the value org.apache.hadoop.fs.RawLocalFileSystem
.
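A configuration-based sketch of this global remapping (the property name is as given above):

Configuration conf = new Configuration();
// Map file:// URIs to the raw (non-checksummed) local filesystem
conf.set("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem");
FileSystem fs = FileSystem.get(URI.create("file:///"), conf);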
Alternatively, you can directly create a RawLocalFileSystem
instance, which may be
useful if you want to disable checksum verification for only some
reads; for example:
Configuration conf = ...
FileSystem fs = new RawLocalFileSystem();
fs.initialize(null, conf);
ChecksumFileSystem
LocalFileSystem
uses ChecksumFileSystem
to do its work, and this
class makes it easy to add checksumming to other (nonchecksummed)
filesystems, as ChecksumFileSystem
is just a wrapper around FileSystem
. The general idiom is as
follows:
FileSystem rawFs = ...
FileSystem checksummedFs = new ChecksumFileSystem(rawFs);
The underlying filesystem is called the
raw filesystem, and may be retrieved using the
getRawFileSystem()
method on
ChecksumFileSystem
. ChecksumFileSystem
has a few more useful
methods for working with checksums, such as getChecksumFile()
for getting the path of a
checksum file for any file. Check the documentation for the
others.
If an error is detected by ChecksumFileSystem
when reading a file, it
will call its reportChecksumFailure()
method. The
default implementation does nothing, but LocalFileSystem
moves the offending file and
its checksum to a side directory on the same device called bad_files. Administrators should
periodically check for these bad files and take action on
them.
Compression
File compression brings two major benefits: it reduces the space needed to store files, and it speeds up data transfer across the network, or to or from disk. When dealing with large volumes of data, both of these savings can be significant, so it pays to carefully consider how to use compression in Hadoop.
There are many different compression formats, tools and algorithms, each with different characteristics. Table 4-1 lists some of the more common ones that can be used with Hadoop.[30]
Compression format | Tool | Algorithm | Filename extension | Multiple files | Splittable |
DEFLATE[a] | N/A | DEFLATE | .deflate | No | No |
gzip | gzip | DEFLATE | .gz | No | No |
ZIP | zip | DEFLATE | .zip | Yes | Yes, at file boundaries |
bzip2 | bzip2 | bzip2 | .bz2 | No | Yes |
LZO | lzop | LZO | .lzo | No | No |
[a] DEFLATE is a compression algorithm whose standard implementation is zlib. There is no commonly available command-line tool for producing files in DEFLATE format, as gzip is normally used. (Note that the gzip file format is DEFLATE with extra headers and a footer.) The .deflate filename extension is a Hadoop convention. |
All compression algorithms exhibit a space/time trade-off: faster compression and decompression speeds usually come at the expense of space savings. All of the tools listed in Table 4-1 give some control over this trade-off at compression time by offering nine different options: -1 means optimize for speed, and -9 means optimize for space. For example, the
following command creates a compressed file file.gz using the fastest compression
method:
gzip -1 file
The different tools have very different compression characteristics. Both gzip and ZIP are general-purpose compressors, and sit in the middle of the space/time trade-off. Bzip2 compresses more effectively than gzip or ZIP, but is slower. Bzip2’s decompression speed is faster than its compression speed, but it is still slower than the other formats. LZO, on the other hand, optimizes for speed: it is faster than gzip or ZIP (or any other compression or decompression tool[31]), but compresses slightly less effectively.
The “Splittable” column in Table 4-1 indicates whether the compression format supports splitting; that is, whether you can seek to any point in the stream and start reading from some point further on. Splittable compression formats are especially suitable for MapReduce; see Compression and Input Splits for further discussion.
Codecs
A codec is the implementation of a
compression-decompression algorithm. In Hadoop, a codec is represented
by an implementation of the CompressionCodec
interface. So, for example,
GzipCodec
encapsulates the
compression and decompression algorithm for gzip. Table 4-2 lists the codecs that are available for
Hadoop.
Compression format | Hadoop CompressionCodec |
DEFLATE | org.apache.hadoop.io.compress.DefaultCodec |
gzip | org.apache.hadoop.io.compress.GzipCodec |
bzip2 | org.apache.hadoop.io.compress.BZip2Codec |
LZO | com.hadoop.compression.lzo.LzopCodec |
The LZO libraries are GPL-licensed and may not be included in
Apache distributions, so the Hadoop codecs must be
downloaded separately from http://code.google.com/p/hadoop-gpl-compression/. The
LzopCodec
is compatible with the
lzop
tool, which is essentially the LZO format with
extra headers, and is the one you normally want. There is also a
LzoCodec
for the pure LZO format,
which uses the .lzo_deflate
filename extension (by analogy with DEFLATE, which is gzip without the
headers).
Compressing and decompressing streams with CompressionCodec
CompressionCodec
has two
methods that allow you to easily compress or decompress data. To
compress data being written to an output stream, use the createOutputStream(OutputStream out)
method to create a CompressionOutputStream
to which you write
your uncompressed data to have it written in compressed form to the
underlying stream. Conversely, to decompress data being read from an
input stream, call createInputStream(InputStream in)
to
obtain a CompressionInputStream
,
which allows you to read uncompressed data from the underlying
stream.
CompressionOutputStream
and
CompressionInputStream
are
similar to java.util.zip.DeflaterOutputStream
and
java.util.zip.DeflaterInputStream
, except
that both of the former provide the ability to reset their
underlying compressor or decompressor, which is important for
applications that compress sections of the data stream as separate
blocks, such as SequenceFile
,
described in SequenceFile.
Example 4-1 illustrates how to use the API to compress data read from standard input and write it to standard output.
public class StreamCompressor {

  public static void main(String[] args) throws Exception {
    String codecClassname = args[0];
    Class<?> codecClass = Class.forName(codecClassname);
    Configuration conf = new Configuration();
    CompressionCodec codec = (CompressionCodec)
      ReflectionUtils.newInstance(codecClass, conf);

    CompressionOutputStream out = codec.createOutputStream(System.out);
    IOUtils.copyBytes(System.in, out, 4096, false);
    out.finish();
  }
}
The application expects the fully qualified name of the
CompressionCodec
implementation
as the first command-line argument. We use ReflectionUtils
to construct a new
instance of the codec, then obtain a compression wrapper around
System.out
. Then we call the
utility method copyBytes()
on
IOUtils
to copy the input to the
output, which is compressed by the CompressionOutputStream
. Finally, we call
finish()
on CompressionOutputStream
, which tells
the compressor to finish writing to the compressed stream, but
doesn’t close the stream. We can try it out with the following
command line, which compresses the string “Text” using the StreamCompressor
program with the GzipCodec
, then decompresses it from
standard input using gunzip:
%echo "Text" | hadoop StreamCompressor org.apache.hadoop.io.compress.GzipCodec \
| gunzip -
Text
Inferring CompressionCodecs using CompressionCodecFactory
If you are reading a compressed file, you can normally infer
the codec to use by looking at its filename extension. A file ending
in .gz can be read with GzipCodec
, and so on. The extension for
each compression format is listed in Table 4-1.
CompressionCodecFactory
provides a way of mapping a filename extension to a CompressionCodec
using its getCodec()
method,
which takes a Path
object for the
file in question. Example 4-2 shows an
application that uses this feature to decompress files.
public class FileDecompressor {

  public static void main(String[] args) throws Exception {
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);

    Path inputPath = new Path(uri);
    CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    CompressionCodec codec = factory.getCodec(inputPath);
    if (codec == null) {
      System.err.println("No codec found for " + uri);
      System.exit(1);
    }

    String outputUri =
      CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension());

    InputStream in = null;
    OutputStream out = null;
    try {
      in = codec.createInputStream(fs.open(inputPath));
      out = fs.create(new Path(outputUri));
      IOUtils.copyBytes(in, out, conf);
    } finally {
      IOUtils.closeStream(in);
      IOUtils.closeStream(out);
    }
  }
}
Once the codec has been found, it is used to strip off the
file suffix to form the output filename (via the removeSuffix()
static method of CompressionCodecFactory
). In this way, a
file named file.gz is
decompressed to file by
invoking the program as follows:
% hadoop FileDecompressor file.gz
CompressionCodecFactory
finds codecs from a list defined by the io.compression.codecs
configuration
property. By default, this lists all the codecs provided by Hadoop
(see Table 4-3), so you would
need to alter it only if you have a custom codec that you wish to
register (such as the externally hosted LZO codecs). Each codec
knows its default filename extension, thus permitting CompressionCodecFactory
to search through
the registered codecs to find a match for a given extension (if
any).
Property name | Type | Default value | Description |
io.compression.codecs | Comma-separated class names | org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.BZip2Codec | A list of the CompressionCodec classes for compression/decompression. |
Native libraries
For performance, it is preferable to use a native library for compression and decompression. For example, in one test, using the native gzip libraries reduced decompression times by up to 50% and compression times by around 10% (compared to the built-in Java implementation). Table 4-4 shows the availability of Java and native implementations for each compression format. Not all formats have native implementations (bzip2, for example), whereas others are only available as a native implementation (LZO, for example).
Compression format | Java implementation | Native implementation |
DEFLATE | Yes | Yes |
gzip | Yes | Yes |
bzip2 | Yes | No |
LZO | No | Yes |
Hadoop comes with prebuilt native compression libraries for 32- and 64-bit Linux, which you can find in the lib/native directory. For other platforms, you will need to compile the libraries yourself, following the instructions on the Hadoop wiki at http://wiki.apache.org/hadoop/NativeHadoop.
The native libraries are picked up using the Java system
property java.library.path
. The
hadoop script in the bin directory sets this property for you,
but if you don’t use this script, you will need to set the property
in your application.
By default Hadoop looks for native libraries for the platform
it is running on, and loads them automatically if they are found.
This means you don’t have to change any configuration settings to
use the native libraries. In some circumstances, however, you may
wish to disable use of native libraries, such as when you are
debugging a compression-related problem. You can achieve this by
setting the property hadoop.native.lib
to false
, which ensures that the built-in
Java equivalents will be used (if they are available).
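In code, this is a one-line configuration change (a sketch):

Configuration conf = new Configuration();
conf.setBoolean("hadoop.native.lib", false); // fall back to the built-in Java codecs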
CodecPool
If you are using a native library and you are doing a lot of
compression or decompression in your application, consider using
CodecPool
, which allows you to
reuse compressors and decompressors, thereby amortizing the cost
of creating these objects.
The code in Example 4-3 shows
the API, although in this program, which only creates a single
Compressor
, there is really no
need to use a pool.
public class PooledStreamCompressor {

  public static void main(String[] args) throws Exception {
    String codecClassname = args[0];
    Class<?> codecClass = Class.forName(codecClassname);
    Configuration conf = new Configuration();
    CompressionCodec codec = (CompressionCodec)
      ReflectionUtils.newInstance(codecClass, conf);
    Compressor compressor = null;
    try {
      compressor = CodecPool.getCompressor(codec);
      CompressionOutputStream out =
        codec.createOutputStream(System.out, compressor);
      IOUtils.copyBytes(System.in, out, 4096, false);
      out.finish();
    } finally {
      CodecPool.returnCompressor(compressor);
    }
  }
}
We retrieve a Compressor
instance from the pool for a given CompressionCodec
, which we use in the
codec’s overloaded createOutputStream()
method. By using a finally
block, we ensure that the compressor is returned to the pool even
if there is an IOException
while copying the bytes
between the streams.
Compression and Input Splits
When considering how to compress data that will be processed by MapReduce, it is important to understand whether the compression format supports splitting. Consider an uncompressed file stored in HDFS whose size is 1 GB. With an HDFS block size of 64 MB, the file will be stored as 16 blocks, and a MapReduce job using this file as input will create 16 input splits, each processed independently as input to a separate map task.
Imagine now the file is a gzip-compressed file whose compressed size is 1 GB. As before, HDFS will store the file as 16 blocks. However, creating a split for each block won’t work since it is impossible to start reading at an arbitrary point in the gzip stream, and therefore impossible for a map task to read its split independently of the others. The gzip format uses DEFLATE to store the compressed data, and DEFLATE stores data as a series of compressed blocks. The problem is that the start of each block is not distinguished in any way that would allow a reader positioned at an arbitrary point in the stream to advance to the beginning of the next block, thereby synchronizing itself with the stream. For this reason, gzip does not support splitting.
In this case, MapReduce will do the right thing, and not try to split the gzipped file, since it knows that the input is gzip-compressed (by looking at the filename extension) and that gzip does not support splitting. This will work, but at the expense of locality: a single map will process the 16 HDFS blocks, most of which will not be local to the map. Also, with fewer maps, the job is less granular, and so may take longer to run.
If the file in our hypothetical example were an LZO file, we would have the same problem since the underlying compression format does not provide a way for a reader to synchronize itself with the stream. A bzip2 file, however, does provide a synchronization marker between blocks (a 48-bit approximation of pi), so it does support splitting. (Table 4-1 lists whether each compression format supports splitting.)
For collections of files, the issues are slightly different. ZIP is an archive format, so it can combine multiple files into a single ZIP archive. Each file is compressed separately, and the locations of all the files in the archive are stored in a central directory at the end of the ZIP file. This property means that ZIP files support splitting at file boundaries, with each split containing one or more files from the ZIP archive. At the time of this writing, however, Hadoop does not support ZIP files as an input format.
Using Compression in MapReduce
As described in Inferring CompressionCodecs using CompressionCodecFactory, if your input files are compressed, they will be automatically decompressed as they are read by MapReduce, using the filename extension to determine the codec to use.
To compress the output of a MapReduce job, in the job
configuration, set the mapred.output.compress
property to true
, and the mapred.output.compression.codec
property to
the classname of the compression codec you want to use, as shown in
Example 4-4.
public class MaxTemperatureWithCompression {

  public static void main(String[] args) throws IOException {
    if (args.length != 2) {
      System.err.println("Usage: MaxTemperatureWithCompression <input path> " +
          "<output path>");
      System.exit(-1);
    }

    JobConf conf = new JobConf(MaxTemperatureWithCompression.class);
    conf.setJobName("Max temperature with output compression");

    FileInputFormat.addInputPath(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    conf.setBoolean("mapred.output.compress", true);
    conf.setClass("mapred.output.compression.codec", GzipCodec.class,
        CompressionCodec.class);

    conf.setMapperClass(MaxTemperatureMapper.class);
    conf.setCombinerClass(MaxTemperatureReducer.class);
    conf.setReducerClass(MaxTemperatureReducer.class);

    JobClient.runJob(conf);
  }
}
We run the program over compressed input (which doesn’t have to use the same compression format as the output, although it does in this example) as follows:
% hadoop MaxTemperatureWithCompression input/ncdc/sample.txt.gz output
Each part of the final output is compressed; in this case, there is a single part:
% gunzip -c output/part-00000.gz
1949 111
1950 22
If you are emitting sequence files for your output, then you can
set the mapred.output.compression.type
property to
control the type of compression to use. The default is RECORD
, which compresses individual records.
Changing this to BLOCK
, which
compresses groups of records, is
recommended since it compresses better (see The SequenceFile Format).
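For example, in a job configuration like the one used elsewhere in this chapter, a sketch of requesting block-compressed sequence file output looks like this (SequenceFileOutputFormat also has a setOutputCompressionType() convenience method to the same effect):

conf.setBoolean("mapred.output.compress", true);
conf.setClass("mapred.output.compression.codec", GzipCodec.class,
    CompressionCodec.class);
conf.set("mapred.output.compression.type", "BLOCK"); // compress groups of records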
Compressing map output
Even if your MapReduce application reads and writes uncompressed data, it may benefit from compressing the intermediate output of the map phase. Since the map output is written to disk and transferred across the network to the reducer nodes, by using a fast compressor such as LZO, you can get performance gains simply because the volume of data to transfer is reduced. The configuration properties to enable compression for map outputs and to set the compression format are shown in Table 4-5.
Property name | Type | Default value | Description |
mapred.compress.map.output | boolean | false | Compress map outputs. |
mapred.map.output.compression.codec | Class | org.apache.hadoop.io.compress.DefaultCodec | The compression codec to use for map outputs. |
Here are the lines to add to enable gzip map output compression in your job:
conf.setCompressMapOutput(true);
conf.setMapOutputCompressorClass(GzipCodec.class);
Serialization
Serialization is the process of turning structured objects into a byte stream for transmission over a network or for writing to persistent storage. Deserialization is the process of turning a byte stream back into a series of structured objects.
Serialization appears in two quite distinct areas of distributed data processing: for interprocess communication and for persistent storage.
In Hadoop, interprocess communication between nodes in the system is implemented using remote procedure calls (RPCs). The RPC protocol uses serialization to render the message into a binary stream to be sent to the remote node, which then deserializes the binary stream into the original message. In general, it is desirable that an RPC serialization format is:
- Compact
A compact format makes the best use of network bandwidth, which is the most scarce resource in a data center.
- Fast
Interprocess communication forms the backbone for a distributed system, so it is essential that there is as little performance overhead as possible for the serialization and deserialization process.
- Extensible
Protocols change over time to meet new requirements, so it should be straightforward to evolve the protocol in a controlled manner for clients and servers. For example, it should be possible to add a new argument to a method call, and have the new servers accept messages in the old format (without the new argument) from old clients.
- Interoperable
For some systems, it is desirable to be able to support clients that are written in different languages to the server, so the format needs to be designed to make this possible.
On the face of it, the data format chosen for persistent storage would have different requirements from a serialization framework. After all, the lifespan of an RPC is less than a second, whereas persistent data may be read years after it was written. As it turns out, the four desirable properties of an RPC’s serialization format are also crucial for a persistent storage format. We want the storage format to be compact (to make efficient use of storage space), fast (so the overhead in reading or writing terabytes of data is minimal), extensible (so we can transparently read data written in an older format), and interoperable (so we can read or write persistent data using different languages).
Hadoop uses its own serialization format, Writables, which is certainly compact and fast (but not so easy to extend, or use from languages other than Java). Since Writables are central to Hadoop (MapReduce programs use them for their key and value types), we look at them in some depth in the next section, before briefly turning to other well-known serialization frameworks, like Apache Thrift and Google Protocol Buffers.
The Writable Interface
The Writable interface defines two methods: one for writing its
state to a DataOutput
binary
stream, and one for reading its state from a DataInput
binary stream:
package org.apache.hadoop.io;

import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOException;

public interface Writable {
  void write(DataOutput out) throws IOException;
  void readFields(DataInput in) throws IOException;
}
Let’s look at a particular Writable
to see what we can do with it. We
will use IntWritable
, a wrapper for a Java
int
. We can create one and set its
value using the set()
method:
IntWritable writable = new IntWritable();
writable.set(163);
Equivalently, we can use the constructor that takes the integer value:
IntWritable writable = new IntWritable(163);
To examine the serialized form of the IntWritable
, we write a small helper method
that wraps a java.io.ByteArrayOutputStream
in a java.io.DataOutputStream
(an implementation
of java.io.DataOutput
) to capture
the bytes in the serialized stream:
public static byte[] serialize(Writable writable) throws IOException {
  ByteArrayOutputStream out = new ByteArrayOutputStream();
  DataOutputStream dataOut = new DataOutputStream(out);
  writable.write(dataOut);
  dataOut.close();
  return out.toByteArray();
}
An integer is written using four bytes (as we see using JUnit 4 assertions):
byte[] bytes = serialize(writable);
assertThat(bytes.length, is(4));
The bytes are written in big-endian order (so the most significant byte is written to the stream first; this is dictated by the java.io.DataOutput interface),
and we can see their hexadecimal representation by using a method on
Hadoop’s StringUtils
:
assertThat(StringUtils.byteToHexString(bytes), is("000000a3"));
Let’s try deserialization. Again, we create a helper method to
read a Writable
object from a byte
array:
public static byte[] deserialize(Writable writable, byte[] bytes)
    throws IOException {
  ByteArrayInputStream in = new ByteArrayInputStream(bytes);
  DataInputStream dataIn = new DataInputStream(in);
  writable.readFields(dataIn);
  dataIn.close();
  return bytes;
}
We construct a new, value-less, IntWritable
, then call deserialize()
to read from the output data
that we just wrote. Then we check that its value, retrieved using the
get()
method, is the original
value, 163:
IntWritable newWritable = new IntWritable();
deserialize(newWritable, bytes);
assertThat(newWritable.get(), is(163));
WritableComparable and comparators
IntWritable
implements the
WritableComparable
interface,
which is just a subinterface of the Writable
and java.lang.Comparable
interfaces:
package org.apache.hadoop.io;

public interface WritableComparable<T> extends Writable, Comparable<T> {
}
Comparison of types is crucial for MapReduce, where there is a
sorting phase during which keys are compared with one another. One
optimization that Hadoop provides is the RawComparator
extension of Java’s Comparator
:
package org.apache.hadoop.io;

import java.util.Comparator;

public interface RawComparator<T> extends Comparator<T> {
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}
This interface permits implementors to compare records read
from a stream without deserializing them into objects, thereby
avoiding any overhead of object creation. For example, the
comparator for IntWritable
s
implements the raw compare()
method by reading an integer from each of the byte arrays b1
and b2
and comparing them directly, from the
given start positions (s1
and
s2
) and lengths (l1
and l2
).
WritableComparator
is a
general-purpose implementation of RawComparator
for WritableComparable
classes. It
provides two main functions. First, it provides a default
implementation of the raw compare()
method that deserializes the
objects to be compared from the stream and invokes the object
compare()
method. Second, it acts
as a factory for RawComparator
instances (that Writable
implementations have registered). For example, to obtain a
comparator for IntWritable
, we
just use:
RawComparator<IntWritable> comparator = WritableComparator.get(IntWritable.class);
The comparator can be used to compare two IntWritable
objects:
IntWritable w1 = new IntWritable(163);
IntWritable w2 = new IntWritable(67);
assertThat(comparator.compare(w1, w2), greaterThan(0));
or their serialized representations:
byte[] b1 = serialize(w1);
byte[] b2 = serialize(w2);
assertThat(comparator.compare(b1, 0, b1.length, b2, 0, b2.length), greaterThan(0));
Writable Classes
Hadoop comes with a large selection of Writable
classes in the org.apache.hadoop.io
package. They form the
class hierarchy shown in Figure 4-1.
Writable wrappers for Java primitives
There are Writable
wrappers
for all the Java primitive types (see Table 4-6) except short
and char
(both of which can be stored in an
IntWritable
). All have a get()
and a set()
method for retrieving and storing
the wrapped value.
Java primitive | Writable implementation | Serialized size (bytes) |
boolean | BooleanWritable | 1 |
byte | ByteWritable | 1 |
int | IntWritable | 4 |
int | VIntWritable | 1–5 |
float | FloatWritable | 4 |
long | LongWritable | 8 |
long | VLongWritable | 1–9 |
double | DoubleWritable | 8 |
When it comes to encoding integers there is a choice between
the fixed-length formats (IntWritable
and LongWritable
) and the variable-length
formats (VIntWritable
and
VLongWritable
). The
variable-length formats use only a single byte to encode the value
if it is small enough (between –112 and 127, inclusive); otherwise,
they use the first byte to indicate whether the value is positive or
negative, and how many bytes follow. For example, 163 requires two
bytes:
byte[] data = serialize(new VIntWritable(163));
assertThat(StringUtils.byteToHexString(data), is("8fa3"));
How do you choose between a fixed-length and a variable-length
encoding? Fixed-length encodings are good when the distribution of
values is fairly uniform across the whole value space, such as a
(well-designed) hash function. Most numeric variables tend to have
nonuniform distributions, and on average the variable-length
encoding will save space. Another advantage of variable-length
encodings is that you can switch from VIntWritable
to VLongWritable
, since their encodings are
actually the same. So by choosing a variable-length representation,
you have room to grow without committing to an 8-byte long
representation from the
beginning.
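To make the trade-off concrete, here is a small sketch, reusing the serialize() helper from earlier in the chapter, that compares serialized sizes:

// 163 needs only two bytes as a VIntWritable, but always takes four as an IntWritable
assertThat(serialize(new IntWritable(163)).length, is(4));
assertThat(serialize(new VIntWritable(163)).length, is(2));

// A value between -112 and 127 fits in a single byte
assertThat(serialize(new VIntWritable(1)).length, is(1));

// The largest values need the full eight bytes plus the length-prefix byte
assertThat(serialize(new VLongWritable(Long.MAX_VALUE)).length, is(9));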
Text
Text
is a Writable
for UTF-8 sequences. It can be
thought of as the Writable
equivalent of java.lang.String
.
Text
is a replacement for the
UTF8
class, which was deprecated
because it didn’t support strings whose encoding was over 32,767
bytes, and because it used Java’s modified UTF-8.
The Text
class uses an
int
(with a variable-length
encoding) to store the number of bytes in the string encoding, so
the maximum value is 2 GB. Furthermore, Text
uses standard UTF-8, which makes it
potentially easier to interoperate with other tools that understand
UTF-8.
Indexing
Because of its emphasis on using standard UTF-8, there are
some differences between Text
and the Java String
class.
Indexing for the Text
class is
in terms of position in the encoded byte sequence, not the Unicode
character in the string, or the Java char
code unit (as it is for String
). For ASCII strings, these three
concepts of index position coincide. Here is an example to
demonstrate the use of the charAt()
method:
Text t = new Text("hadoop"); assertThat(t.getLength(), is(6)); assertThat(t.getBytes().length, is(6)); assertThat(t.charAt(2), is((int) 'd')); assertThat("Out of bounds", t.charAt(100), is(-1));
Notice that charAt()
returns an int
representing a
Unicode code point, unlike the String
variant that returns a char
. Text
also has a find()
method, which is analogous to
String
’s indexOf()
:
Text t = new Text("hadoop"); assertThat("Find a substring", t.find("do"), is(2)); assertThat("Finds first 'o'", t.find("o"), is(3)); assertThat("Finds 'o' from position 4 or later", t.find("o", 4), is(4)); assertThat("No match", t.find("pig"), is(-1));
Unicode
When we start using characters that are encoded with more
than a single byte, the differences between Text
and String
become clear. Consider the
Unicode characters shown in Table 4-7.[32]
Unicode code point | U+0041 | U+00DF | U+6771 | U+10400 |
Name | LATIN CAPITAL LETTER A | LATIN SMALL LETTER SHARP S | N/A (a unified Han ideograph) | DESERET CAPITAL LETTER LONG I |
UTF-8 code units | 41 | c3 9f | e6 9d b1 | f0 90 90 80 |
Java representation | \u0041 | \u00DF | \u6771 | \uD801\uDC00 |
All but the last character in the table, U+10400, can be
expressed using a single Java char
. U+10400 is a supplementary
character and is represented by two Java char
s, known as a surrogate pair. The
tests in Example 4-5 show the
differences between String
and
Text
when processing a string
of the four characters from Table 4-7.
public class StringTextComparisonTest {

  @Test
  public void string() throws UnsupportedEncodingException {
    String s = "\u0041\u00DF\u6771\uD801\uDC00";

    assertThat(s.length(), is(5));
    assertThat(s.getBytes("UTF-8").length, is(10));

    assertThat(s.indexOf("\u0041"), is(0));
    assertThat(s.indexOf("\u00DF"), is(1));
    assertThat(s.indexOf("\u6771"), is(2));
    assertThat(s.indexOf("\uD801\uDC00"), is(3));

    assertThat(s.charAt(0), is('\u0041'));
    assertThat(s.charAt(1), is('\u00DF'));
    assertThat(s.charAt(2), is('\u6771'));
    assertThat(s.charAt(3), is('\uD801'));
    assertThat(s.charAt(4), is('\uDC00'));

    assertThat(s.codePointAt(0), is(0x0041));
    assertThat(s.codePointAt(1), is(0x00DF));
    assertThat(s.codePointAt(2), is(0x6771));
    assertThat(s.codePointAt(3), is(0x10400));
  }

  @Test
  public void text() {
    Text t = new Text("\u0041\u00DF\u6771\uD801\uDC00");

    assertThat(t.getLength(), is(10));

    assertThat(t.find("\u0041"), is(0));
    assertThat(t.find("\u00DF"), is(1));
    assertThat(t.find("\u6771"), is(3));
    assertThat(t.find("\uD801\uDC00"), is(6));

    assertThat(t.charAt(0), is(0x0041));
    assertThat(t.charAt(1), is(0x00DF));
    assertThat(t.charAt(3), is(0x6771));
    assertThat(t.charAt(6), is(0x10400));
  }
}
The test confirms that the length of a String
is the number of char
code units it contains (5, one from
each of the first three characters in the string, and a surrogate
pair from the last), whereas the length of a Text
object is the number of bytes in
its UTF-8 encoding (10 = 1+2+3+4). Similarly, the indexOf()
method in String
returns an index in char
code units, and find()
for Text
is a byte offset.
The charAt()
method in
String
returns the char
code unit for the given index,
which in the case of a surrogate pair will not represent a whole
Unicode character. The codePointAt()
method, indexed by
char
code unit, is needed to
retrieve a single Unicode character represented as an int
. In fact, the charAt()
method in Text
is more like the codePointAt()
method than its namesake
in String
. The only difference
is that it is indexed by byte offset.
Iteration
Iterating over the Unicode characters in Text
is complicated by the use of byte
offsets for indexing, since you can’t just increment the index.
The idiom for iteration is a little obscure (see Example 4-6): turn the Text
object into a java.nio.ByteBuffer
, then repeatedly
call the bytesToCodePoint()
static method on Text
with the
buffer. This method extracts the next code point as an int
and updates the position in the
buffer. The end of the string is detected when bytesToCodePoint()
returns –1.
public class TextIterator {

  public static void main(String[] args) {
    Text t = new Text("\u0041\u00DF\u6771\uD801\uDC00");

    ByteBuffer buf = ByteBuffer.wrap(t.getBytes(), 0, t.getLength());
    int cp;
    while (buf.hasRemaining() && (cp = Text.bytesToCodePoint(buf)) != -1) {
      System.out.println(Integer.toHexString(cp));
    }
  }
}
Running the program prints the code points for the four characters in the string:
% hadoop TextIterator
41
df
6771
10400
Mutability
Another difference with String
is that Text
is mutable (like all Writable
implementations in Hadoop, except NullWritable
, which is a singleton). You
can reuse a Text
instance by
calling one of the set()
methods on it. For example:
Text t = new Text("hadoop"); t.set("pig"); assertThat(t.getLength(), is(3)); assertThat(t.getBytes().length, is(3));
Warning
In some situations, the byte array returned by the
getBytes()
method may be
longer than the length returned by getLength()
:
Text t = new Text("hadoop"); t.set(new Text("pig")); assertThat(t.getLength(), is(3)); assertThat("Byte length not shortened", t.getBytes().length, is(6));
This shows why it is imperative that you always call
getLength()
when calling
getBytes()
, so you know how
much of the byte array is valid data.
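If you just need a correctly sized copy of the data, one way (assuming java.util.Arrays is imported) is to trim the backing array yourself:

Text t = new Text("hadoop");
t.set(new Text("pig"));

// Copy only the valid prefix of the backing array
byte[] valid = Arrays.copyOf(t.getBytes(), t.getLength());
assertThat(valid.length, is(3));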
BytesWritable
BytesWritable
is a wrapper
for an array of binary data. Its serialized format is an integer
field (4 bytes) that specifies the number of bytes to follow,
followed by the bytes themselves. For example, the byte array of
length two with values 3 and 5 is serialized as a 4-byte integer
(00000002
) followed by the two
bytes from the array (03
and
05
):
BytesWritable b = new BytesWritable(new byte[] { 3, 5 });
byte[] bytes = serialize(b);
assertThat(StringUtils.byteToHexString(bytes), is("000000020305"));
BytesWritable
is mutable,
and its value may be changed by calling its set()
method. As with Text
, the size of the byte array returned
from the getBytes()
method for
BytesWritable
—the capacity—may
not reflect the actual size of the data stored in the BytesWritable
. You can determine the size
of the BytesWritable
by calling
getLength()
. To
demonstrate:
b.setCapacity(11);
assertThat(b.getLength(), is(2));
assertThat(b.getBytes().length, is(11));
NullWritable
NullWritable
is a special
type of Writable
, as it has a
zero-length serialization. No bytes are written to, or read from,
the stream. It is used as a placeholder; for example, in MapReduce,
a key or a value can be declared as a NullWritable
when you don’t need to use
that position—it effectively stores a constant empty value. NullWritable
can also be useful as a key
in SequenceFile
when you want to
store a list of values, as opposed to key-value pairs. It is an
immutable singleton: the instance can be retrieved by calling
NullWritable.get()
.
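Its empty serialized form is easy to verify with the serialize() helper from earlier in the chapter:

NullWritable nw = NullWritable.get(); // the singleton instance
assertThat(serialize(nw).length, is(0));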
ObjectWritable and GenericWritable
ObjectWritable
is a
general-purpose wrapper for the following: Java primitives, String
, enum
, Writable
, null
, or arrays of any of these types. It
is used in Hadoop RPC to marshal and unmarshal method arguments and
return types.
ObjectWritable
is useful
when a field can be of more than one type: for example, if the
values in a SequenceFile
have
multiple types, then you can declare the value type as an ObjectWritable
and wrap each type in an
ObjectWritable
. Being a
general-purpose mechanism, it’s fairly wasteful of space since it
writes the classname of the wrapped type every time it is
serialized. In cases where the number of types is small and known
ahead of time, this can be improved by having a static array of
types, and using the index into the array as the serialized
reference to the type. This is the approach that GenericWritable
takes, and you have to
subclass it to specify the types to support.
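A minimal sketch of such a subclass, modeled on the example in the GenericWritable documentation (the two value types here are purely illustrative):

public class MyGenericWritable extends GenericWritable {

  // The serialized form stores an index into this array instead of a classname
  private static Class[] TYPES = {
    IntWritable.class,
    Text.class
  };

  @Override
  protected Class[] getTypes() {
    return TYPES;
  }
}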
Writable collections
There are four Writable
collection types in the org.apache.hadoop.io
package: ArrayWritable
, TwoDArrayWritable
, MapWritable
, and SortedMapWritable
.
ArrayWritable
and TwoDArrayWritable
are Writable
implementations for arrays and
two-dimensional arrays (array of arrays) of Writable
instances. All the elements of an
ArrayWritable
or a TwoDArrayWritable
must be instances of the
same class, which is specified at construction, as follows:
ArrayWritable writable = new ArrayWritable(Text.class);
In contexts where the Writable
is defined by type, such as in
SequenceFile
keys or values, or
as input to MapReduce in general, you need to subclass ArrayWritable
(or TwoDArrayWritable
, as appropriate) to set
the type statically. For example:
public class TextArrayWritable extends ArrayWritable {
  public TextArrayWritable() {
    super(Text.class);
  }
}
ArrayWritable
and TwoDArrayWritable
both have get()
and set()
methods, as well as a toArray()
method, which creates a shallow
copy of the array (or 2D array).
MapWritable
and SortedMapWritable
are implementations of
java.util.Map<Writable,
Writable>
and java.util.SortedMap<WritableComparable,
Writable>
, respectively. The type of each key and value
field is a part of the serialization format for that field. The type
is stored as a single byte that acts as an index into an array of
types. The array is populated with the standard types in the
org.apache.hadoop.io
package, but
custom Writable
types are
accommodated, too, by writing a header that encodes the type array
for nonstandard types. As they are implemented, MapWritable
and SortedMapWritable
use positive byte
values for custom types, so a maximum
of 127 distinct nonstandard Writable
classes can be used in any
particular MapWritable
or
SortedMapWritable
instance.
Here’s a demonstration of using a MapWritable
with different types for keys
and values:
MapWritable src = new MapWritable();
src.put(new IntWritable(1), new Text("cat"));
src.put(new VIntWritable(2), new LongWritable(163));

MapWritable dest = new MapWritable();
WritableUtils.cloneInto(dest, src);
assertThat((Text) dest.get(new IntWritable(1)), is(new Text("cat")));
assertThat((LongWritable) dest.get(new VIntWritable(2)), is(new LongWritable(163)));
Conspicuous by their absence are Writable
collection implementations for
sets and lists. A set can be emulated by using a MapWritable
(or a SortedMapWritable
for a sorted set), with
NullWritable
values. For lists of
a single type of Writable
,
ArrayWritable
is adequate, but to
store different types of Writable
in a single list, you can use GenericWritable
to wrap the elements
in an ArrayWritable
.
Alternatively, you could write a general ListWritable
using the ideas from MapWritable
.
Implementing a Custom Writable
Hadoop comes with a useful set of Writable
implementations that serve most
purposes; however, on occasion, you may need to write your own custom
implementation. With a custom Writable
, you have full control over the
binary representation and the sort order. Because Writable
s are at the heart of the MapReduce
data path, tuning the binary representation can have a significant
effect on performance. The stock Writable
implementations that come with
Hadoop are well-tuned, but for more elaborate structures, it is often
better to create a new Writable
type, rather than compose the stock types.
To demonstrate how to create a custom Writable
, we shall write an implementation
that represents a pair of strings, called TextPair
. The basic implementation is shown
in Example 4-7.
import java.io.*;

import org.apache.hadoop.io.*;

public class TextPair implements WritableComparable<TextPair> {

  private Text first;
  private Text second;

  public TextPair() {
    set(new Text(), new Text());
  }

  public TextPair(String first, String second) {
    set(new Text(first), new Text(second));
  }

  public TextPair(Text first, Text second) {
    set(first, second);
  }

  public void set(Text first, Text second) {
    this.first = first;
    this.second = second;
  }

  public Text getFirst() {
    return first;
  }

  public Text getSecond() {
    return second;
  }

  @Override
  public void write(DataOutput out) throws IOException {
    first.write(out);
    second.write(out);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    first.readFields(in);
    second.readFields(in);
  }

  @Override
  public int hashCode() {
    return first.hashCode() * 163 + second.hashCode();
  }

  @Override
  public boolean equals(Object o) {
    if (o instanceof TextPair) {
      TextPair tp = (TextPair) o;
      return first.equals(tp.first) && second.equals(tp.second);
    }
    return false;
  }

  @Override
  public String toString() {
    return first + "\t" + second;
  }

  @Override
  public int compareTo(TextPair tp) {
    int cmp = first.compareTo(tp.first);
    if (cmp != 0) {
      return cmp;
    }
    return second.compareTo(tp.second);
  }
}
The first part of the implementation is straightforward: there
are two Text
instance variables,
first
and second
, and associated constructors,
getters, and setters. All Writable
implementations must have a
default constructor so that the MapReduce framework can instantiate
them, then populate their fields by calling
readFields()
. Writable instances are mutable
and often reused, so you should take care to avoid allocating objects
in the write()
or
readFields()
methods.
TextPair
’s
write()
method serializes each Text
object in turn to the output stream, by
delegating to the Text
objects
themselves. Similarly, readFields()
deserializes the bytes from the input stream by delegating to each
Text
object. The DataOutput
and DataInput
interfaces have a
rich set of methods for serializing and deserializing Java primitives,
so, in general, you have complete control over the wire format of your
Writable
object.
Just as you would for any value object you write in Java, you
should override the hashCode()
,
equals()
, and
toString()
methods from java.lang.Object
. The
hashCode()
method is used by the HashPartitioner
(the default partitioner in
MapReduce) to choose a reduce partition, so you should make sure that
you write a good hash function that mixes well to ensure reduce
partitions are of a similar size.
Warning
If you ever plan to use your custom Writable
with TextOutputFormat
, then you must implement
its toString()
method. TextOutputFormat
calls
toString()
on keys and values for their
output representation. For TextPair
, we write the underlying Text
objects as strings separated by a tab
character.
TextPair
is an implementation
of WritableComparable
, so it
provides an implementation of the compareTo()
method that imposes the ordering you would expect: it sorts by the
first string followed by the second. Notice that TextPair
differs from TextArrayWritable
from the previous section
(apart from the number of Text
objects it can store), since TextArrayWritable
is only a Writable
, not a WritableComparable
.
Implementing a RawComparator for speed
The code for TextPair
in
Example 4-7 will work as it stands; however, there
is a further optimization we can make. As explained in WritableComparable and comparators, when TextPair
is being used as a key in
MapReduce, it will have to be deserialized into an object for the
compareTo()
method to be invoked. What if
it were possible to compare two TextPair
objects just by looking at their
serialized representations?
It turns out that we can do this, since TextPair
is the concatenation of two
Text
objects, and the binary
representation of a Text
object
is a variable-length integer containing the number of bytes in the
UTF-8 representation of the string, followed by the UTF-8 bytes
themselves. The trick is to read the initial length, so we know how
long the first Text
object’s byte
representation is; then we can delegate to Text
’s RawComparator
, and invoke it with the
appropriate offsets for the first or second string. Example 4-8 gives the details (note that this
code is nested in the TextPair
class).
public static class Comparator extends WritableComparator {

  private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();

  public Comparator() {
    super(TextPair.class);
  }

  @Override
  public int compare(byte[] b1, int s1, int l1,
                     byte[] b2, int s2, int l2) {

    try {
      int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
      int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
      int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
      if (cmp != 0) {
        return cmp;
      }
      return TEXT_COMPARATOR.compare(b1, s1 + firstL1, l1 - firstL1,
                                     b2, s2 + firstL2, l2 - firstL2);
    } catch (IOException e) {
      throw new IllegalArgumentException(e);
    }
  }
}

static {
  WritableComparator.define(TextPair.class, new Comparator());
}
We actually subclass WritableComparator
rather than implement
RawComparator
directly, since it
provides some convenience methods and default implementations. The
subtle part of this code is calculating firstL1
and firstL2
, the lengths of the first Text
field in each byte stream. Each is
made up of the length of the variable-length integer (returned by
decodeVIntSize()
on WritableUtils
), and the value it is
encoding (returned by readVInt()
).
The static block registers the raw comparator so that whenever
MapReduce sees the TextPair
class, it knows to use the raw comparator as its default
comparator.
Custom comparators
As we can see with TextPair
, writing raw comparators takes
some care, since you have to deal with details at the byte level. It
is worth looking at some of the implementations of Writable
in the org.apache.hadoop.io
package for further
ideas, if you need to write your own. The utility methods on
WritableUtils
are very handy
too.
Custom comparators should also be written to be RawComparator
s, if possible. These are
comparators that implement a different sort order to the natural
sort order defined by the default comparator. Example 4-9 shows a comparator for TextPair
, called FirstComparator
, that considers only the
first string of the pair. Note that we override the
compare()
method that takes objects so both
compare()
methods have the same semantics.
We will make use of this comparator in Chapter 8, when we look at joins and secondary sorting in MapReduce (see Joins).
public static class FirstComparator extends WritableComparator {

  private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();

  public FirstComparator() {
    super(TextPair.class);
  }

  @Override
  public int compare(byte[] b1, int s1, int l1,
                     byte[] b2, int s2, int l2) {

    try {
      int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
      int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
      return TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
    } catch (IOException e) {
      throw new IllegalArgumentException(e);
    }
  }

  @Override
  public int compare(WritableComparable a, WritableComparable b) {
    if (a instanceof TextPair && b instanceof TextPair) {
      return ((TextPair) a).first.compareTo(((TextPair) b).first);
    }
    return super.compare(a, b);
  }
}
Serialization Frameworks
Although most MapReduce programs use Writable
key and value types, this isn’t
mandated by the MapReduce API. In fact, any types can be used; the
only requirement is that there be a mechanism that translates to and
from a binary representation of each type.
To support this, Hadoop has an API for pluggable serialization
frameworks. A serialization framework is represented by an
implementation of Serialization
(in
the org.apache.hadoop.io.serializer
package).
WritableSerialization
, for example,
is the implementation of Serialization
for Writable
types.
A Serialization
defines a
mapping from types to Serializer
instances (for turning an object into a byte stream) and Deserializer
instances (for turning a byte
stream into an object).
Set the io.serializations
property to a comma-separated list of classnames to register Serialization
implementations. Its default
value is org.apache.hadoop.io.serializer.WritableSerialization
,
which means that only Writable
objects can be serialized or
deserialized out of the box.
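For example, to also register Java Object Serialization (described next), you could set the property as follows (a sketch; whether you want to is another question):

Configuration conf = new Configuration();
conf.set("io.serializations",
    "org.apache.hadoop.io.serializer.WritableSerialization," +
    "org.apache.hadoop.io.serializer.JavaSerialization");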
Hadoop includes a class called JavaSerialization that uses Java Object Serialization. Although it makes it convenient to use standard Java types in MapReduce programs, like Integer or String, Java Object Serialization is not as efficient as Writables, so it's not worth making this trade-off (see the sidebar).
Serialization IDL
There are a number of other serialization frameworks that approach the problem in a different way: rather than defining types through code, you define them in a language-neutral, declarative fashion, using an interface description language (IDL). The system can then generate types for different languages, which is good for interoperability. They also typically define versioning schemes that make type evolution straightforward.
Hadoop’s own Record I/O (found in the org.apache.hadoop.record
package) has an
IDL that is compiled into Writable objects, which makes it
convenient for generating types that are compatible with MapReduce.
For whatever reason, however, Record I/O is not widely used.
Apache Thrift and Google Protocol Buffers are both popular serialization frameworks, and they are commonly used as a format for persistent binary data. There is limited support for these as MapReduce formats;[33] however, Thrift is used in parts of Hadoop to provide cross-language APIs, such as the “thriftfs” contrib module, where it is used to expose an API to Hadoop filesystems (see Thrift).
Finally, Avro is a new (at the time of this writing) Hadoop subproject that defines a serialization format. The goal is to migrate Hadoop’s RPC mechanism to use Avro. Avro will also be suitable as a data format for large files.
File-Based Data Structures
For some applications, you need a specialized data structure to hold your data. For doing MapReduce-based processing, putting each blob of binary data into its own file doesn’t scale, so Hadoop developed a number of higher-level containers for these situations.
SequenceFile
Imagine a logfile, where each log record is a new line of text.
If you want to log binary types, plain text isn’t a suitable format.
Hadoop’s SequenceFile
class fits
the bill in this situation, providing a persistent data structure for
binary key-value pairs. To use it as a logfile format, you would
choose a key, such as timestamp represented by a LongWritable
, and the value is a Writable
that represents the quantity being
logged.
SequenceFile
s also work well
as containers for smaller files. HDFS and MapReduce are optimized for
large files, so packing files into a SequenceFile
makes storing and processing
the smaller files more efficient. (Processing a whole file as a record contains a program to pack files into
a SequenceFile
.[34])
Writing a SequenceFile
To create a SequenceFile, use one of its createWriter() static methods, which returns a SequenceFile.Writer instance. There are several overloaded versions, but they all require you to specify a stream to write to (either an FSDataOutputStream or a FileSystem and Path pairing), a Configuration object, and the key and value types. Optional arguments include the compression type and codec, a Progressable callback to be informed of write progress, and a Metadata instance to be stored in the SequenceFile header.
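For example, the following sketch uses one of the overloads that takes an explicit compression type and codec to create a block-compressed, gzip-encoded file. The overload shown follows the older FileSystem-and-Path style of API, so check the signatures in the SequenceFile documentation for your Hadoop version:
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;

public class CompressedSequenceFileWriteSketch {
  public static void main(String[] args) throws Exception {
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
    Path path = new Path(uri);
    // Instantiate the codec via ReflectionUtils so it picks up the configuration.
    GzipCodec codec = ReflectionUtils.newInstance(GzipCodec.class, conf);
    // This overload takes the compression type and codec explicitly.
    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path,
        IntWritable.class, Text.class,
        SequenceFile.CompressionType.BLOCK, codec);
    try {
      writer.append(new IntWritable(1), new Text("One, two, buckle my shoe"));
    } finally {
      writer.close();
    }
  }
}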
The keys and values stored in a SequenceFile do not necessarily need to be Writable. Any types that can be serialized and deserialized by a Serialization may be used.
Once you have a SequenceFile.Writer, you write key-value pairs using the append() method. When you’ve finished, you call the close() method (SequenceFile.Writer implements java.io.Closeable).
Example 4-10 shows a short program to write some key-value pairs to a SequenceFile, using the API just described.
public class SequenceFileWriteDemo {

  private static final String[] DATA = {
    "One, two, buckle my shoe",
    "Three, four, shut the door",
    "Five, six, pick up sticks",
    "Seven, eight, lay them straight",
    "Nine, ten, a big fat hen"
  };

  public static void main(String[] args) throws IOException {
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
    Path path = new Path(uri);

    IntWritable key = new IntWritable();
    Text value = new Text();
    SequenceFile.Writer writer = null;
    try {
      writer = SequenceFile.createWriter(fs, conf, path,
          key.getClass(), value.getClass());
      for (int i = 0; i < 100; i++) {
        key.set(100 - i);
        value.set(DATA[i % DATA.length]);
        System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
        writer.append(key, value);
      }
    } finally {
      IOUtils.closeStream(writer);
    }
  }
}
The keys in the sequence file are integers counting down from 100 to 1, represented as IntWritable objects. The values are Text objects. Before each record is appended to the SequenceFile.Writer, we call the getLength() method to discover the current position in the file. (We will use this information about record boundaries in the next section, when we read the file nonsequentially.) We write the position out to the console, along with the key and value pairs. The result of running it is shown here:
% hadoop SequenceFileWriteDemo numbers.seq
[128] 100 One, two, buckle my shoe
[173] 99 Three, four, shut the door
[220] 98 Five, six, pick up sticks
[264] 97 Seven, eight, lay them straight
[314] 96 Nine, ten, a big fat hen
[359] 95 One, two, buckle my shoe
[404] 94 Three, four, shut the door
[451] 93 Five, six, pick up sticks
[495] 92 Seven, eight, lay them straight
[545] 91 Nine, ten, a big fat hen
...
[1976] 60 One, two, buckle my shoe
[2021] 59 Three, four, shut the door
[2088] 58 Five, six, pick up sticks
[2132] 57 Seven, eight, lay them straight
[2182] 56 Nine, ten, a big fat hen
...
[4557] 5 One, two, buckle my shoe
[4602] 4 Three, four, shut the door
[4649] 3 Five, six, pick up sticks
[4693] 2 Seven, eight, lay them straight
[4743] 1 Nine, ten, a big fat hen
Reading a SequenceFile
Reading sequence files from beginning to end is a matter of creating an instance of SequenceFile.Reader and iterating over records by repeatedly invoking one of the next() methods. Which one you use depends on the serialization framework you are using. If you are using Writable types, you can use the next() method that takes a key and a value argument, and reads the next key and value in the stream into these variables:
public boolean next(Writable key, Writable val)
The return value is true if a key-value pair was read and false if the end of the file has been reached.
For other, non-Writable serialization frameworks (such as Apache Thrift), you should use these two methods:
public Object next(Object key) throws IOException
public Object getCurrentValue(Object val) throws IOException
In this case, you need to make sure that the serialization you want to use has been set in the io.serializations property; see Serialization Frameworks.
If the next() method returns a non-null object, a key-value pair was read from the stream and the value can be retrieved using the getCurrentValue() method. Otherwise, if next() returns null, the end of the file has been reached.
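Putting these two methods together, a reading loop for a non-Writable framework might look like the following sketch (it assumes reader is an open SequenceFile.Reader and that the relevant Serialization has been registered as described above):
// A sketch of reading with a non-Writable serialization framework.
Object key = null;
Object value = null;
while ((key = reader.next(key)) != null) {
  value = reader.getCurrentValue(value);
  System.out.printf("%s\t%s\n", key, value);
}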
The program in Example 4-11 demonstrates how to read a sequence file that has Writable keys and values. Note how the types are discovered from the SequenceFile.Reader via calls to getKeyClass() and getValueClass(), then ReflectionUtils is used to create an instance for the key and an instance for the value. By using this technique, the program can be used with any sequence file that has Writable keys and values.
public class SequenceFileReadDemo {

  public static void main(String[] args) throws IOException {
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
    Path path = new Path(uri);

    SequenceFile.Reader reader = null;
    try {
      reader = new SequenceFile.Reader(fs, path, conf);
      Writable key = (Writable)
          ReflectionUtils.newInstance(reader.getKeyClass(), conf);
      Writable value = (Writable)
          ReflectionUtils.newInstance(reader.getValueClass(), conf);
      long position = reader.getPosition();
      while (reader.next(key, value)) {
        String syncSeen = reader.syncSeen() ? "*" : "";
        System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value);
        position = reader.getPosition(); // beginning of next record
      }
    } finally {
      IOUtils.closeStream(reader);
    }
  }
}
Another feature of the program is that it displays the positions of the sync points in the sequence file. A sync point is a point in the stream that can be used to resynchronize with a record boundary if the reader is “lost”—for example, after seeking to an arbitrary position in the stream. Sync points are recorded by SequenceFile.Writer, which inserts a special entry to mark the sync point every few records as a sequence file is being written. Such entries are small enough to incur only a modest storage overhead—less than 1%. Sync points always align with record boundaries.
Running the program in Example 4-11 shows the sync points in the sequence file as asterisks. The first one occurs at position 2021 (the second one occurs at position 4075, but is not shown in the output):
% hadoop SequenceFileReadDemo numbers.seq
[128] 100 One, two, buckle my shoe
[173] 99 Three, four, shut the door
[220] 98 Five, six, pick up sticks
[264] 97 Seven, eight, lay them straight
[314] 96 Nine, ten, a big fat hen
[359] 95 One, two, buckle my shoe
[404] 94 Three, four, shut the door
[451] 93 Five, six, pick up sticks
[495] 92 Seven, eight, lay them straight
[545] 91 Nine, ten, a big fat hen
[590] 90 One, two, buckle my shoe
...
[1976] 60 One, two, buckle my shoe
[2021*] 59 Three, four, shut the door
[2088] 58 Five, six, pick up sticks
[2132] 57 Seven, eight, lay them straight
[2182] 56 Nine, ten, a big fat hen
...
[4557] 5 One, two, buckle my shoe
[4602] 4 Three, four, shut the door
[4649] 3 Five, six, pick up sticks
[4693] 2 Seven, eight, lay them straight
[4743] 1 Nine, ten, a big fat hen
There are two ways to seek to a given position in a sequence file. The first is the seek() method, which positions the reader at the given point in the file. For example, seeking to a record boundary works as expected:
reader.seek(359);
assertThat(reader.next(key, value), is(true));
assertThat(((IntWritable) key).get(), is(95));
But if the position in the file is not at a record boundary, the reader fails when the next() method is called:
reader.seek(360);
reader.next(key, value); // fails with IOException
The second way to find a record boundary makes use of sync points. The sync(long position) method on SequenceFile.Reader positions the reader at the next sync point after position. (If there are no sync points in the file after this position, then the reader will be positioned at the end of the file.) Thus, we can call sync() with any position in the stream—a nonrecord boundary, for example—and the reader will reestablish itself at the next sync point so reading can continue:
reader.sync(360);
assertThat(reader.getPosition(), is(2021L));
assertThat(reader.next(key, value), is(true));
assertThat(((IntWritable) key).get(), is(59));
Warning
SequenceFile.Writer has a method called sync() for inserting a sync point at the current position in the stream. This is not to be confused with the identically named but otherwise unrelated sync() method defined by the Syncable interface for synchronizing buffers to the underlying device.
Sync points come into their own when using sequence files as input to MapReduce, since they permit the file to be split, so different portions of it can be processed independently by separate map tasks. See SequenceFileInputFormat.
Displaying a SequenceFile with the command-line interface
The hadoop fs command has a -text option to display sequence files in textual form. It looks at a file’s magic number so that it can attempt to detect the type of the file and appropriately convert it to text. It can recognize gzipped files and sequence files; otherwise, it assumes the input is plain text.
For sequence files, this command is really useful only if the keys and values have a meaningful string representation (as defined by the toString() method). Also, if you have your own key or value classes, then you will need to make sure they are on Hadoop’s classpath.
Running it on the sequence file we created in the previous section gives the following output:
% hadoop fs -text numbers.seq | head
100 One, two, buckle my shoe
99 Three, four, shut the door
98 Five, six, pick up sticks
97 Seven, eight, lay them straight
96 Nine, ten, a big fat hen
95 One, two, buckle my shoe
94 Three, four, shut the door
93 Five, six, pick up sticks
92 Seven, eight, lay them straight
91 Nine, ten, a big fat hen
Sorting and merging SequenceFiles
The most powerful way of sorting (and merging) one or more sequence files is to use MapReduce. MapReduce is inherently parallel and will let you specify the number of reducers to use, which determines the number of output partitions. For example, by specifying one reducer, you get a single output file. We can use the sort example that comes with Hadoop by specifying that the input and output are sequence files, and by setting the key and value types:
% hadoop jar $HADOOP_INSTALL/hadoop-*-examples.jar sort -r 1 \
-inFormat org.apache.hadoop.mapred.SequenceFileInputFormat \
-outFormat org.apache.hadoop.mapred.SequenceFileOutputFormat \
-outKey org.apache.hadoop.io.IntWritable \
-outValue org.apache.hadoop.io.Text \
numbers.seq sorted
% hadoop fs -text sorted/part-00000 | head
1	Nine, ten, a big fat hen
2	Seven, eight, lay them straight
3	Five, six, pick up sticks
4	Three, four, shut the door
5	One, two, buckle my shoe
6	Nine, ten, a big fat hen
7	Seven, eight, lay them straight
8	Five, six, pick up sticks
9	Three, four, shut the door
10	One, two, buckle my shoe
Sorting is covered in more detail in Sorting.
As an alternative to using MapReduce for sort/merge, there is a SequenceFile.Sorter class that has a number of sort() and merge() methods. These functions predate MapReduce and are lower-level than MapReduce (for example, to get parallelism, you need to partition your data manually), so in general MapReduce is the preferred approach to sort and merge sequence files.
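For completeness, here is a sketch of a single-threaded sort using SequenceFile.Sorter. It assumes the fs and conf objects from the earlier examples, and the constructor and sort() arguments shown reflect the older API, so verify them against your release:
// A sketch of sorting numbers.seq into numbers.sorted without MapReduce.
SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs,
    IntWritable.class, Text.class, conf);
sorter.sort(new Path[] { new Path("numbers.seq") },
    new Path("numbers.sorted"), false); // false: don't delete the input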
The SequenceFile Format
A sequence file consists of a header followed by one or more records (see Figure 4-2). The first three bytes of a sequence file are the bytes SEQ, which act as a magic number, followed by a single byte representing the version number. The header contains other fields, including the names of the key and value classes, compression details, user-defined metadata, and the sync marker.[35] Recall that the sync marker is used to allow a reader to synchronize to a record boundary from any position in the file. Each file has a randomly generated sync marker, whose value is stored in the header. Sync markers appear between records in the sequence file. They are designed to incur less than a 1% storage overhead, so they don’t necessarily appear between every pair of records (such is the case for short records).
The internal format of the records depends on whether compression is enabled, and if it is, whether it is record compression or block compression.
If no compression is enabled (the default), then each record is made up of the record length (in bytes), the key length, the key, and then the value. The length fields are written as four-byte integers adhering to the contract of the writeInt() method of java.io.DataOutput. Keys and values are serialized using the Serialization defined for the class being written to the sequence file.
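To make the framing concrete, the following sketch writes a single uncompressed record by hand. It illustrates only the length-prefixed layout just described (record length, then key length, then the key and value bytes) and deliberately ignores the header and sync markers, so it is not a substitute for SequenceFile.Writer:
// Illustrative only: the layout of one uncompressed record.
void writeRecord(java.io.DataOutputStream out, byte[] keyBytes, byte[] valueBytes)
    throws java.io.IOException {
  out.writeInt(keyBytes.length + valueBytes.length); // record length
  out.writeInt(keyBytes.length);                     // key length
  out.write(keyBytes);                               // serialized key
  out.write(valueBytes);                             // serialized value
}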
The format for record compression is almost identical to no compression, except the value bytes are compressed using the codec defined in the header. Note that keys are not compressed.
Block compression compresses multiple records at once; it is therefore more compact than and should generally be preferred over record compression because it has the opportunity to take advantage of similarities between records. (See Figure 4-3.) Records are added to a block until it reaches a minimum size in bytes, defined by the io.seqfile.compress.blocksize property: the default is 1,000,000 bytes. A sync marker is written before the start of every block. The format of a block is a field indicating the number of records in the block, followed by four compressed fields: the key lengths, the keys, the value lengths, and the values.
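If you want larger compression blocks (at the cost of buffering more records in the writer before each block is flushed), you can raise this threshold in the configuration; the value below is arbitrary and just for illustration:
// Compress in roughly 4 MB blocks instead of the default 1,000,000 bytes.
conf.setInt("io.seqfile.compress.blocksize", 4 * 1000 * 1000);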
MapFile
A MapFile is a sorted SequenceFile with an index to permit lookups by key. MapFile can be thought of as a persistent form of java.util.Map (although it doesn’t implement this interface), which is able to grow beyond the size of a Map that is kept in memory.
Writing a MapFile
Writing a MapFile is similar to writing a SequenceFile: you create an instance of MapFile.Writer, then call the append() method to add entries in order. (Attempting to add entries out of order will result in an IOException.) Keys must be instances of WritableComparable, and values must be Writable—contrast this to SequenceFile, which can use any serialization framework for its entries.
The program in Example 4-12 creates a MapFile, and writes some entries to it. It is very similar to the program in Example 4-10 for creating a SequenceFile.
public class MapFileWriteDemo {

  private static final String[] DATA = {
    "One, two, buckle my shoe",
    "Three, four, shut the door",
    "Five, six, pick up sticks",
    "Seven, eight, lay them straight",
    "Nine, ten, a big fat hen"
  };

  public static void main(String[] args) throws IOException {
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);

    IntWritable key = new IntWritable();
    Text value = new Text();
    MapFile.Writer writer = null;
    try {
      writer = new MapFile.Writer(conf, fs, uri,
          key.getClass(), value.getClass());
      for (int i = 0; i < 1024; i++) {
        key.set(i + 1);
        value.set(DATA[i % DATA.length]);
        writer.append(key, value);
      }
    } finally {
      IOUtils.closeStream(writer);
    }
  }
}
Let’s use this program to build a MapFile:
% hadoop MapFileWriteDemo numbers.map
If we look at the MapFile, we see it’s actually a directory containing two files called data and index:
% ls -l numbers.map
total 104
-rw-r--r-- 1 tom tom 47898 Jul 29 22:06 data
-rw-r--r-- 1 tom tom 251 Jul 29 22:06 index
Both files are SequenceFiles. The data file contains all of the entries, in order:
% hadoop fs -text numbers.map/data | head
1 One, two, buckle my shoe
2 Three, four, shut the door
3 Five, six, pick up sticks
4 Seven, eight, lay them straight
5 Nine, ten, a big fat hen
6 One, two, buckle my shoe
7 Three, four, shut the door
8 Five, six, pick up sticks
9 Seven, eight, lay them straight
10 Nine, ten, a big fat hen
The index file contains a fraction of the keys, and contains a mapping from the key to that key’s offset in the data file:
% hadoop fs -text numbers.map/index
1 128
129 6079
257 12054
385 18030
513 24002
641 29976
769 35947
897 41922
As we can see from the output, by default only every 128th key is included in the index, although you can change this value either by setting the io.map.index.interval property or by calling the setIndexInterval() method on the MapFile.Writer instance. A reason to increase the index interval would be to decrease the amount of memory that the MapFile needs to store the index. Conversely, you might decrease the interval to improve the time for random access (since fewer records need to be skipped on average) at the expense of memory usage.
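For example, here is a sketch of halving the index density at write time using the instance method mentioned above (the interval of 256 is just an illustration; call it before appending any entries):
// Index every 256th key instead of every 128th, halving the
// size of the in-memory index needed when reading the MapFile.
MapFile.Writer writer = new MapFile.Writer(conf, fs, uri,
    IntWritable.class, Text.class);
writer.setIndexInterval(256);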
Since the index is only a partial index of keys, MapFile is not able to provide methods to enumerate, or even count, all the keys it contains. The only way to perform these operations is to read the whole file.
Reading a MapFile
Iterating through the entries in order in a MapFile is similar to the procedure for a SequenceFile: you create a MapFile.Reader, then call the next() method until it returns false, signifying that no entry was read because the end of the file was reached:
public boolean next(WritableComparable key, Writable val) throws IOException
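Put together, an in-order scan of the file we just wrote might look like the following sketch (it assumes the fs and conf objects from the earlier examples, and IntWritable keys and Text values as in Example 4-12):
// A sketch of scanning a MapFile in key order.
MapFile.Reader reader = new MapFile.Reader(fs, "numbers.map", conf);
try {
  IntWritable key = new IntWritable();
  Text value = new Text();
  while (reader.next(key, value)) {
    System.out.printf("%s\t%s\n", key, value);
  }
} finally {
  reader.close();
}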
A random access lookup can be performed by calling the get() method:
public Writable get(WritableComparable key, Writable val) throws IOException
The return value is used to determine if an entry was found in the MapFile; if it’s null, then no value exists for the given key. If the key was found, then the value for that key is read into val, as well as being returned from the method call.
It might be helpful to understand how this is implemented. Here is a snippet of code that retrieves an entry for the MapFile we created in the previous section:
Text value = new Text();
reader.get(new IntWritable(496), value);
assertThat(value.toString(), is("One, two, buckle my shoe"));
For this operation, the MapFile.Reader reads the index file into memory (this is cached so that subsequent random access calls will use the same in-memory index). The reader then performs a binary search on the in-memory index to find the key in the index that is less than or equal to the search key, 496. In this example, the index key found is 385, with value 18030, which is the offset in the data file. Next, the reader seeks to this offset in the data file and reads entries until the key is greater than or equal to the search key, 496. In this case, a match is found and the value is read from the data file. Overall, a lookup takes a single disk seek and a scan through up to 128 entries on disk. For a random-access read, this is actually very efficient.
The getClosest() method is like get(), except it returns the “closest” match to the specified key rather than returning null on no match. More precisely, if the MapFile contains the specified key, then that is the entry returned; otherwise, the key in the MapFile that is immediately after (or before, according to a boolean argument) the specified key is returned.
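As a sketch, here is a lookup with the overload that takes the boolean described above (check the exact signature for your release); the file we built contains the keys 1 to 1024, so a search for 0, which is not present, should return the first entry:
// With the boolean set to false, the entry whose key is the smallest key
// greater than or equal to the search key is returned.
Text closestValue = new Text();
WritableComparable closestKey =
    reader.getClosest(new IntWritable(0), closestValue, false);
assertThat(((IntWritable) closestKey).get(), is(1));
assertThat(closestValue.toString(), is("One, two, buckle my shoe"));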
A very large MapFile’s index can take up a lot of memory. Rather than reindex to change the index interval, it is possible to load only a fraction of the index keys into memory when reading the MapFile by setting the io.map.index.skip property. This property is normally 0, which means no index keys are skipped; a value of 1 means skip one key for every key in the index (so every other key ends up in the index), 2 means skip two keys for every key in the index (so one third of the keys end up in the index), and so on. Larger skip values save memory but at the expense of lookup time, since more entries have to be scanned on disk, on average.
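For instance, here is a sketch of keeping only every other index key in memory on the read side (the property must be set in the configuration used to open the reader):
// Load every other index key, halving the memory used by the index.
Configuration skipConf = new Configuration();
skipConf.setInt("io.map.index.skip", 1);
MapFile.Reader reader = new MapFile.Reader(fs, "numbers.map", skipConf);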
Converting a SequenceFile to a MapFile
One way of looking at a MapFile is as an indexed and sorted SequenceFile. So it’s quite natural to want to be able to convert a SequenceFile into a MapFile. We covered how to sort a SequenceFile in Sorting and merging SequenceFiles, so here we look at how to create an index for a SequenceFile. The program in Example 4-13 hinges around the static utility method fix() on MapFile, which recreates the index for a MapFile.
public class MapFileFixer {

  public static void main(String[] args) throws Exception {
    String mapUri = args[0];

    Configuration conf = new Configuration();

    FileSystem fs = FileSystem.get(URI.create(mapUri), conf);
    Path map = new Path(mapUri);
    Path mapData = new Path(map, MapFile.DATA_FILE_NAME);

    // Get key and value types from data sequence file
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, mapData, conf);
    Class keyClass = reader.getKeyClass();
    Class valueClass = reader.getValueClass();
    reader.close();

    // Create the map file index file
    long entries = MapFile.fix(fs, map, keyClass, valueClass, false, conf);
    System.out.printf("Created MapFile %s with %d entries\n", map, entries);
  }
}
The fix() method is usually used for recreating corrupted indexes, but since it creates a new index from scratch, it’s exactly what we need here. The recipe is as follows:
1. Sort the sequence file numbers.seq into a new directory called numbers.map that will become the MapFile. (If the sequence file is already sorted, then you can skip this step. Instead, copy it to a file numbers.map/data, then go to step 3.)
% hadoop jar $HADOOP_INSTALL/hadoop-*-examples.jar sort -r 1 \
-inFormat org.apache.hadoop.mapred.SequenceFileInputFormat \
-outFormat org.apache.hadoop.mapred.SequenceFileOutputFormat \
-outKey org.apache.hadoop.io.IntWritable \
-outValue org.apache.hadoop.io.Text \
numbers.seq numbers.map
2. Rename the MapReduce output to be the data file:
% hadoop fs -mv numbers.map/part-00000 numbers.map/data
3. Create the index file:
% hadoop MapFileFixer numbers.map
Created MapFile numbers.map with 100 entries
[30] At the time of this writing, Hadoop’s ZIP integration is incomplete. See https://issues.apache.org/jira/browse/HADOOP-1824.
[31] Jeff Gilchrist’s Archive Comparison Test at http://compression.ca/act/act-summary.html contains benchmarks for compression and decompression speed, and compression ratio for a wide range of tools.
[32] This example is based on one from the article Supplementary Characters in the Java Platform.
[33] You can find the latest status for a Thrift Serialization at https://issues.apache.org/jira/browse/HADOOP-3787, and a Protocol Buffers Serialization at https://issues.apache.org/jira/browse/HADOOP-3788.
[34] In a similar vein, the blog post “A Million Little Files” by Stuart Sierra includes code for converting a tar file into a SequenceFile: http://stuartsierra.com/2008/04/24/a-million-little-files.
[35] Full details of the format of these fields may be found in SequenceFile’s documentation and source code.