We will now consider some of the more complex and most critical parts of Pig: data input and output. Operating on huge data sets is inherently I/O-intensive. Hadoop's massive parallelism and its movement of processing to the data mitigate this but do not remove it. Having efficient methods to load and store data is therefore critical. Pig provides default load and store functions for text data and for HBase, but many users find they need to write their own load and store functions to handle the data formats and storage mechanisms they use.
As with evaluation functions, the design goal for load and store functions was to make easy things easy and hard things possible. Also, we wanted to make load and store functions a thin wrapper over Hadoop's InputFormat and OutputFormat. The intention is that once you have an input format and output format for your data, the additional work of creating and storing Pig tuples is minimal. In the same way evaluation functions were implemented, more complex features such as schema management and projection push down are done via separate interfaces to avoid cluttering the base interface. Pig's load and store functions were completely rewritten between versions 0.6 and 0.7. This chapter will cover only the interfaces for 0.7 and later releases.
One other important design goal for load and store functions is not to assume that the input sources and output sinks are HDFS. In the examples throughout this book, A = load 'foo'; has implied that foo is a file, but there is no need for that to be the case. foo is a resource locator that makes sense to your load function. It could be an HDFS file, an HBase table, a database JDBC connection string, or a web service URL. Because reading from HDFS is the most common case, many defaults and helper functions are provided for it.
In this chapter we will walk through writing a load function and a store function for JSON data on HDFS, JsonLoader and JsonStorage, respectively. These are located in the example code in udfs/java/com/acme/io. They use the Jackson JSON library, which is included in your Pig distribution. However, the Jackson JAR is not shipped to the backend by Pig, so when using these UDFs in your script, you will need to register the Jackson JAR in addition to the acme examples JAR:
register 'acme.jar';
register 'src/pig/trunk/build/ivy/lib/Pig/jackson-core-asl-1.6.0.jar';
These UDFs will serve as helpful examples, but they will not cover all of the functionality of load and store functions. For those sections not shown in these examples, we will look at other existing load and store functions.
Pig's load function is built on top of a Hadoop InputFormat, the class that Hadoop uses to read data. InputFormat serves two purposes: it determines how input will be split between map tasks, and it provides a RecordReader that produces key-value pairs as input to those map tasks. The load function takes these key-value pairs and returns a Pig Tuple.
The base class for the load function is LoadFunc. This is an abstract class, which allows it to provide helper functions and default implementations. Many load functions will only need to extend LoadFunc.
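To make the shape of the class concrete, here is a minimal sketch (not from the book's example code) of a loader that extends LoadFunc and implements only its four abstract methods, returning each line of a text file as a single-field tuple:

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class SimpleTextLoader extends LoadFunc {
    private RecordReader reader;

    @Override
    public void setLocation(String location, Job job) throws IOException {
        // Tell the input format where to read from.
        FileInputFormat.setInputPaths(job, location);
    }

    @Override
    public InputFormat getInputFormat() throws IOException {
        return new TextInputFormat();
    }

    @Override
    public void prepareToRead(RecordReader reader, PigSplit split)
            throws IOException {
        this.reader = reader;
    }

    @Override
    public Tuple getNext() throws IOException {
        try {
            if (!reader.nextKeyValue()) return null;  // input exhausted
            Text line = (Text) reader.getCurrentValue();
            // Return the whole line as a one-field tuple; toString()
            // copies the data out of the reused Text object.
            return TupleFactory.getInstance().newTuple(line.toString());
        } catch (InterruptedException ie) {
            throw new IOException(ie);
        }
    }
}

Each of these methods is discussed in detail in the sections that follow.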
Load functions' operations are split between Pig's frontend and backend. On the frontend, Pig does job planning and optimization, and load functions participate in this in several ways that we will discuss later. On the backend, load functions get each record from the RecordReader, convert it to a tuple, and pass it on to Pig's map task. Load functions also need to be able to pass data between the frontend and backend invocations so they can maintain state.
For all load functions, Pig must do three things as part of frontend planning: 1) it needs to know the input format it should use to read the data; 2) it needs to be sure that the load function understands where its data is located; and 3) it needs to know how to cast bytearrays returned from the load function.
Pig needs to know which InputFormat to use for reading your input. It calls getInputFormat to get an instance of the input format. It gets an instance rather than the class itself so that your load function can control the instantiation: any generic parameters, constructor arguments, etc. For our example load function, this method is very simple. It uses TextInputFormat, an input format that reads text data from HDFS files:
// JsonLoader.java
public InputFormat getInputFormat() throws IOException {
    return new TextInputFormat();
}
Pig communicates the location string provided by the user to the load function via setLocation. So, if the load operator in Pig Latin is A = load 'input';, "input" is the location string. This method is called on both the frontend and backend, possibly multiple times; thus, you need to take care that it does not do anything that will cause problems if called more than once. Your load function should communicate the location to its input format. For example, JsonLoader passes the filename via a helper method on FileInputFormat (a superclass of TextInputFormat):
// JsonLoader.java
public void setLocation(String location, Job job) throws IOException {
    FileInputFormat.setInputPaths(job, location);
}
The Hadoop Job is passed along with the location because that is where input formats usually store their configuration information.
setLocation is called on both the frontend and backend because input formats store their location in the Job object, as shown in the preceding example. For MapReduce jobs, which always have only one input, this works. For Pig jobs, where the same input format might be used to load multiple different inputs (such as in the join or union case), one instance of the input path would overwrite another in the Job object. To work around this, Pig remembers the location in an input-specific parameter and calls setLocation again on the backend so that the input format can get itself set up properly before reading.
For files on HDFS, the location provided by the user might be relative rather than absolute. To deal with this, Pig needs to resolve these to absolute locations based on the current working directory at the time of the load. Consider the following Pig Latin:
cd /user/joe;
input1 = load 'input';
cd /user/fred;
input2 = load 'input';
These two load statements should load different files. But Pig cannot assume it understands how to turn a relative path into an absolute path, because it does not know what that input is. It could be an HDFS path, a database table name, etc. So it leaves this to the load function. Before calling setLocation, Pig passes the location string to relativeToAbsolutePath to do any necessary conversion. Because most loaders read from HDFS, the default implementation in LoadFunc handles the HDFS case. If your loader will never need to do this conversion, it should override this method and return the location string it is passed.
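For a loader whose locations are, say, table names rather than file paths, a sketch of such an override can be as simple as:

// Sketch: locations are not file paths, so skip HDFS path resolution
// and return the string untouched.
public String relativeToAbsolutePath(String location, Path curDir)
        throws IOException {
    return location;
}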
Some Pig functions, such as PigStorage and HBaseStorage, load data by default without understanding its type information, and place the data unchanged in DataByteArray objects. At a later time, when Pig needs to cast that data to another type, it does not know how to because it does not understand how the data is represented in the bytearray. Therefore, it relies on the load function to provide a method to cast from bytearray to the appropriate type.
Pig determines which set of casting functions to use by calling getLoadCaster on the load function. This should return either null, which indicates that your load function does not expect to do any bytearray casts, or an implementation of the LoadCaster interface, which will be used to do the casts. We will look at the methods of LoadCaster in Casting bytearrays.
Our example loader returns null because it provides typed data based on the stored schema and, therefore, does not expect to be casting data. Any bytearrays in its data are binary data that should not be cast.
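That method is not reprinted in the text, but given the description above it amounts to:

// JsonLoader returns null: its data is already typed, so no
// bytearray casts are expected.
public LoadCaster getLoadCaster() throws IOException {
    return null;
}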
As with evaluation functions, load functions can make use of UDFContext to pass information from frontend invocations to backend invocations. For details on UDFContext, see UDFContext.
One significant difference between using UDFContext in evaluation and load functions is determining the instance-specific signature of the function. In evaluation functions, constructor arguments were suggested as a way to do this. For load functions, the input location usually will be the differentiating factor. However, LoadFunc does not guarantee that it will call setLocation before other methods where you might want to use UDFContext.
To work around this, setUDFContextSignature is provided. It provides an instance-unique signature that you can use when calling getUDFProperties. This method is guaranteed to be called before any other methods on LoadFunc in both the frontend and backend. Your UDF can then store this signature and use it when getting its property object:
// JsonLoader.java
private String udfcSignature = null;

public void setUDFContextSignature(String signature) {
    udfcSignature = signature;
}
setLocation is the only method in the load function that is guaranteed to be called on the frontend. It is therefore the best candidate for storing needed information to UDFContext. You might need to check that the data you are writing is available and nonnull, to avoid overwriting your values when setLocation is called on the backend.
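A guarded write along these lines avoids clobbering frontend state; this is a hypothetical sketch, and "my.loader.location" is an invented property name:

// Hypothetical sketch; the property name is invented for illustration.
public void setLocation(String location, Job job) throws IOException {
    FileInputFormat.setInputPaths(job, location);
    Properties p = UDFContext.getUDFContext()
        .getUDFProperties(this.getClass(), new String[] {udfcSignature});
    if (p.getProperty("my.loader.location") == null) {
        // Only set it the first time; the backend call leaves it alone.
        p.setProperty("my.loader.location", location);
    }
}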
On the backend, your load function takes the key-value pairs produced by its input format and produces Pig Tuples.
Before reading any data, Pig gives your load function a chance to set itself up by calling prepareToRead. This is called in each map task and passes a copy of the RecordReader, which your load function will need later to read records from the input. RecordReader is a class that InputFormat uses to read records from an input split. Pig obtains the record reader it passes to prepareToRead by calling getRecordReader on the input format that your load function returned from getInputFormat. Pig also passes an instance of PigSplit, which contains the Hadoop InputSplit corresponding to the partition of input this instance of your load function will read. If you need split-specific information, you can get it from here.
Our example loader, beyond storing the record reader, also reads the schema file that was stored into UDFContext on the frontend so that it knows how to parse the input file. Notice how it uses the signature passed in setUDFContextSignature to access the appropriate properties object. Finally, it creates a JsonFactory object that is used to generate a parser for each line:
// JsonLoader.java
public void prepareToRead(RecordReader reader, PigSplit split)
        throws IOException {
    this.reader = reader;

    // Get the schema string from the UDFContext object.
    UDFContext udfc = UDFContext.getUDFContext();
    Properties p =
        udfc.getUDFProperties(this.getClass(), new String[]{udfcSignature});
    String strSchema = p.getProperty("pig.jsonloader.schema");
    if (strSchema == null) {
        throw new IOException("Could not find schema in UDF context");
    }

    // Parse the schema from the string stored in the properties object.
    ResourceSchema schema =
        new ResourceSchema(Utils.getSchemaFromString(strSchema));
    fields = schema.getFields();

    jsonFactory = new JsonFactory();
}
Now we have reached the meat of your load function: reading records from its record reader and returning tuples to Pig. Pig will call getNext and place the resulting tuple into its processing pipeline. It will keep doing this until getNext returns null, which indicates that the input for this split has been fully read.
Pig does not copy the tuple that results from this method, but instead feeds it directly to its pipeline to avoid the copy overhead. This means this method cannot reuse objects, and instead must create a new tuple and contents for each record it reads. On the other hand, record readers may choose to reuse their key and value objects from record to record; most standard implementations do. So, before writing a loader that tries to be efficient and wraps the keys and values from the record reader directly into the tuple to avoid a copy, you must make sure you understand how the record reader is managing its data.
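For example, TextInputFormat's record reader reuses its Text value object from record to record, so a loader must copy the data out rather than wrap the Text object itself. A sketch of the safe pattern inside getNext, in the context of a loader like JsonLoader:

// Inside getNext(): the reader will overwrite this Text object on the
// next call to nextKeyValue(), so copy its contents into the tuple.
Text val = (Text) reader.getCurrentValue();
Tuple t = tupleFactory.newTuple(1);
t.set(0, val.toString());  // toString() makes a fresh String copy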
For information on creating the appropriate Java objects when constructing tuples for Pig, see Interacting with Pig values.
Our sample load function's implementation of getNext reads the value from the Hadoop record (the key is ignored), constructs a JsonParser to parse it, parses the fields, and returns the resulting tuple. If there are parse errors, it does not throw an exception. Instead, it returns a tuple with null fields where the data could not be parsed. This prevents bad lines from causing the whole job to fail. Warnings are issued so that users can see which records were ignored:
// JsonLoader.java
public Tuple getNext() throws IOException {
    Text val = null;
    try {
        // Read the next key-value pair from the record reader.  If it's
        // finished, return null.
        if (!reader.nextKeyValue()) return null;

        // Get the current value.  We don't use the key.
        val = (Text)reader.getCurrentValue();
    } catch (InterruptedException ie) {
        throw new IOException(ie);
    }

    // Create a parser specific for this input line.  This might not be the
    // most efficient approach.
    ByteArrayInputStream bais = new ByteArrayInputStream(val.getBytes());
    JsonParser p = jsonFactory.createJsonParser(bais);

    // Create the tuple we will be returning.  We create it with the right
    // number of fields, as the Tuple object is optimized for this case.
    Tuple t = tupleFactory.newTuple(fields.length);

    // Read the start object marker.  Throughout this file if the parsing
    // isn't what we expect, we return a tuple with null fields rather than
    // throwing an exception.  That way a few mangled lines don't fail the job.
    if (p.nextToken() != JsonToken.START_OBJECT) {
        log.warn("Bad record, could not find start of record " +
            val.toString());
        return t;
    }

    // Read each field in the record.
    for (int i = 0; i < fields.length; i++) {
        t.set(i, readField(p, fields[i], i));
    }

    if (p.nextToken() != JsonToken.END_OBJECT) {
        log.warn("Bad record, could not find end of record " +
            val.toString());
        return t;
    }
    p.close();
    return t;
}

private Object readField(JsonParser p,
                         ResourceFieldSchema field,
                         int fieldnum) throws IOException {
    // Read the next token.
    JsonToken tok = p.nextToken();
    if (tok == null) {
        log.warn("Early termination of record, expected " + fields.length +
            " fields but found " + fieldnum);
        return null;
    }

    // Check to see if this value was null.
    if (tok == JsonToken.VALUE_NULL) return null;

    // Read based on our expected type.
    switch (field.getType()) {
    case DataType.INTEGER:
        // Read the field name.
        p.nextToken();
        return p.getValueAsInt();

    case DataType.LONG:
        p.nextToken();
        return p.getValueAsLong();

    case DataType.FLOAT:
        p.nextToken();
        return (float)p.getValueAsDouble();

    case DataType.DOUBLE:
        p.nextToken();
        return p.getValueAsDouble();

    case DataType.BYTEARRAY:
        p.nextToken();
        byte[] b = p.getBinaryValue();
        // Use the DBA constructor that copies the bytes so that we own
        // the memory.
        return new DataByteArray(b, 0, b.length);

    case DataType.CHARARRAY:
        p.nextToken();
        return p.getText();

    case DataType.MAP:
        // Should be a start of the map object.
        if (p.nextToken() != JsonToken.START_OBJECT) {
            log.warn("Bad map field, could not find start of object, field "
                + fieldnum);
            return null;
        }
        Map<String, String> m = new HashMap<String, String>();
        while (p.nextToken() != JsonToken.END_OBJECT) {
            String k = p.getCurrentName();
            String v = p.getText();
            m.put(k, v);
        }
        return m;

    case DataType.TUPLE:
        if (p.nextToken() != JsonToken.START_OBJECT) {
            log.warn("Bad tuple field, could not find start of object, " +
                "field " + fieldnum);
            return null;
        }
        ResourceSchema s = field.getSchema();
        ResourceFieldSchema[] fs = s.getFields();
        Tuple t = tupleFactory.newTuple(fs.length);
        for (int j = 0; j < fs.length; j++) {
            t.set(j, readField(p, fs[j], j));
        }
        if (p.nextToken() != JsonToken.END_OBJECT) {
            log.warn("Bad tuple field, could not find end of object, " +
                "field " + fieldnum);
            return null;
        }
        return t;

    case DataType.BAG:
        if (p.nextToken() != JsonToken.START_ARRAY) {
            log.warn("Bad bag field, could not find start of array, " +
                "field " + fieldnum);
            return null;
        }
        s = field.getSchema();
        fs = s.getFields();
        // Drill down the next level to the tuple's schema.
        s = fs[0].getSchema();
        fs = s.getFields();
        DataBag bag = bagFactory.newDefaultBag();
        JsonToken innerTok;
        while ((innerTok = p.nextToken()) != JsonToken.END_ARRAY) {
            if (innerTok != JsonToken.START_OBJECT) {
                log.warn("Bad bag tuple field, could not find start of " +
                    "object, field " + fieldnum);
                return null;
            }
            t = tupleFactory.newTuple(fs.length);
            for (int j = 0; j < fs.length; j++) {
                t.set(j, readField(p, fs[j], j));
            }
            if (p.nextToken() != JsonToken.END_OBJECT) {
                log.warn("Bad bag tuple field, could not find end of " +
                    "object, field " + fieldnum);
                return null;
            }
            bag.add(t);
        }
        return bag;

    default:
        throw new IOException("Unknown type in input schema: " +
            field.getType());
    }
}
Your load function can provide more complex features by implementing additional interfaces. (Implementation of these interfaces is optional.)
Many data storage mechanisms can record the schema along with the data. Pig does not assume the ability to store schemas, but if your storage can hold the schema, it can be very useful. This frees script writers from needing to specify the field names and types as part of the load operator in Pig Latin. This is user-friendly and less error-prone, and avoids the need to rewrite scripts when the schema of your data changes.
Some types of data storage also partition the data. If Pig understands this partitioning, it can load only those partitions that are needed for a particular script. Both of these capabilities are enabled by implementing the LoadMetadata interface.
getSchema in the LoadMetadata interface gives your load function a chance to provide a schema. It is passed the location string the user provides as well as the Hadoop Job object, in case it needs information in this object to open the schema. It is expected to return a ResourceSchema, which represents the data that will be returned. ResourceSchema is very similar to the Schema class used by evaluation functions. (See Input and Output Schemas for details.) There is one important difference, however. In ResourceFieldSchema, the schema object associated with a bag always has one field, which is a tuple. The schema for the tuples in the bag is described by that tuple's ResourceFieldSchema.
Our example load and store functions keep the schema in a side file[28] named _schema in HDFS. Our implementation of getSchema reads this file and also serializes the schema into UDFContext so that it is available on the backend:
// JsonLoader.java
public ResourceSchema getSchema(String location, Job job)
        throws IOException {
    // Open the schema file and read the schema.
    // Get an HDFS handle.
    FileSystem fs = FileSystem.get(job.getConfiguration());
    DataInputStream in = fs.open(new Path(location + "/_schema"));
    String line = in.readLine();
    in.close();

    // Parse the schema.
    ResourceSchema s = new ResourceSchema(Utils.getSchemaFromString(line));
    if (s == null) {
        throw new IOException("Unable to parse schema found in file " +
            location + "/_schema");
    }

    // Now that we have determined the schema, store it in our
    // UDFContext properties object so we have it when we need it on the
    // backend.
    UDFContext udfc = UDFContext.getUDFContext();
    Properties p =
        udfc.getUDFProperties(this.getClass(), new String[]{udfcSignature});
    p.setProperty("pig.jsonloader.schema", line);

    return s;
}
Once your loader implements getSchema, load statements that use your loader do not need to declare their schemas in order for the field names to be used in the script. For example, if we had data with a schema of user:chararray, age:int, gpa:double, the following Pig Latin will compile and run:
register 'acme.jar';
register 'src/pig/trunk/build/ivy/lib/Pig/jackson-core-asl-1.6.0.jar';
A = load 'input' using com.acme.io.JsonLoader();
B = foreach A generate user;
dump B;
LoadMetadata also includes a getStatistics method. Pig does not yet make use of statistics in job planning; this method is for future use.
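Implementers of LoadMetadata must still provide the method; until Pig consumes statistics, a stub that returns null is reasonable (a sketch):

public ResourceStatistics getStatistics(String location, Job job)
        throws IOException {
    return null;  // Pig ignores statistics for now
}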
Some types of storage partition their data, allowing you to read only the relevant sections for a given job. The LoadMetadata interface also provides methods for working with partitions in your data. In order for Pig to request the relevant partitions, it must know how the data is partitioned. Pig determines this by calling getPartitionKeys. If this returns null or the LoadMetadata interface is not implemented by your loader, Pig will assume it needs to read the entire input.
Pig expects getPartitionKeys to return an array of strings, where each string represents one field name. Those fields are the keys used to partition the data. Pig will look for a filter statement immediately following the load statement that includes one or more of these fields. If such a statement is found, it will be passed to setPartitionFilter. If the filter includes both partition and nonpartition keys and it can be split,[29] Pig will split it and pass just the partition-key-related expression to setPartitionFilter.
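Our example loader does not implement these methods (as noted below), but a partition-aware loader might sketch them as follows; MetadataClient is a hypothetical stand-in for whatever service knows the partitioning:

// Hedged sketch; MetadataClient is invented for illustration.
public String[] getPartitionKeys(String location, Job job)
        throws IOException {
    return MetadataClient.get(job).getPartitionKeys(location);
}

public void setPartitionFilter(Expression partitionFilter)
        throws IOException {
    // Remember the pushed-down predicate, e.g. (date == '20110614'),
    // so the loader reads only the matching partitions.
    this.partitionFilter = partitionFilter;
}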
As an example, consider an HCatalog[30] table, web_server_logs, that is partitioned by two fields, date and colo:
logs = load 'web_server_logs' using HCatLoader();
cleaned = filter logs by date = '20110614' and NotABot(user_id);
...
Pig will call getPartitionKeys, and HCatLoader will return two key names, date and colo. Pig will find the date field in the filter statement and rewrite the filter as shown in the following example, pushing the date = '20110614' predicate down to HCatLoader via setPartitionFilter:
logs = load 'web_server_logs' using HCatLoader();
cleaned = filter logs by NotABot(user_id);
...
It is now up to the HCatalog loader to ensure that it returns only data from web_server_logs where date is 20110614.
The one exception to this is fields used in eval funcs or filter funcs. Pig assumes that loaders do not understand how to invoke UDFs, so Pig will not push these expressions.
Our example loader works on file data, so it does not implement getPartitionKeys or setPartitionFilter. For an example implementation of these methods, see the HCatalog code at http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java?view=markup.
If you need to control how binary data that your loader loads is cast to other data types, you can implement the LoadCaster interface. Because this interface contains a lot of methods, implementers often implement it as a separate class. This also allows load functions to share implementations of LoadCaster, since Java does not support multiple inheritance.
The interface consists of a series of methods: bytesToInteger, bytesToLong, etc. These will be called to convert a bytearray to the appropriate type. Starting in 0.9, there are two bytesToMap methods. You should implement the one that takes a ResourceFieldSchema; the other one is for backward compatibility. The bytesToBag, bytesToTuple, and bytesToMap methods take a ResourceFieldSchema that describes the field being converted. Calling getSchema on this object will return a schema that describes this bag, tuple, or map, if one exists. If Pig does not know the intended structure of the object, getSchema will return null. Keep in mind that the schema of the bag will be one field, a tuple, which in turn will have a schema describing the contents of that tuple.
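As an illustration, a caster for UTF8-encoded text might implement bytesToInteger along these lines (a sketch, not Pig's actual Utf8StorageConverter code):

public Integer bytesToInteger(byte[] b) throws IOException {
    if (b == null || b.length == 0) return null;
    try {
        return Integer.valueOf(new String(b, "UTF-8"));
    } catch (NumberFormatException nfe) {
        // Mirror Pig's behavior of yielding null when a cast fails.
        return null;
    }
}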
A default load caster, Utf8StorageConverter, is provided. It handles converting UTF8-encoded text to Pig types. Scalar conversions are done in a straightforward way. Maps are expected to be surrounded by [] (square brackets), with keys separated from values by # (hash) and key-value pairs separated by , (commas). Tuples are surrounded by () (parentheses) and have fields separated by , (commas). Bags are surrounded by {} (braces) and have tuples separated by , (commas). There is no ability to escape these special characters.
Often a Pig Latin script will need to read only a few fields in the input. Some types of storage formats store their data by fields instead of by records (for example, Hive’s RCFile). For these types of formats, there is a significant performance gain to be had by loading only those fields that will be used in the script. Even for record-oriented storage formats, it can be useful to skip deserializing fields that will not be used.
As part of its optimizations, Pig analyzes Pig Latin scripts and determines which fields in an input it needs at each step in the script. It uses this information to aggressively drop fields it no longer needs. If the loader implements the LoadPushDown interface, Pig can go a step further and provide this information to the loader.
Once Pig knows the fields it needs, it assembles them in a RequiredFieldList and passes that to pushProjection. The load function's reply indicates whether it can meet the request. It responds with a RequiredFieldResponse, which is a fancy wrapper around a Boolean. If the Boolean is true, Pig will assume that only the required fields are being returned from getNext. If it is false, Pig will assume that all fields are being returned by getNext, and it will handle dropping the extra ones itself.
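A loader that can prune columns might respond along these lines; the bookkeeping of the required fields is left to the loader (a sketch):

public List<OperatorSet> getFeatures() {
    // Projection is currently the only operation LoadPushDown covers.
    return Arrays.asList(OperatorSet.PROJECTION);
}

public RequiredFieldResponse pushProjection(
        RequiredFieldList requiredFieldList) throws FrontendException {
    // Record which columns getNext should produce, and tell Pig that
    // only those fields will be returned.
    this.requiredFields = requiredFieldList.getFields();
    return new RequiredFieldResponse(true);
}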
The RequiredField class used to describe which fields are required is slightly complex. Beyond allowing a user to specify whether a given field is required, it provides the ability to specify which subfields of that field are required. For example, for maps, certain keys can be listed as required. For tuples and bags, certain fields can be listed as required.
Load functions that implement LoadPushDown should not modify the schema object returned by getSchema. This should always be the schema of the full input. Pig will manage the translation between the schema having all of the fields and the results of getNext having only some.
Our example loader does not implement LoadPushDown. For an example of a loader that does, see HCatLoader at http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java?view=markup.
Pig's store function is, in many ways, a mirror image of the load function. It is built on top of Hadoop's OutputFormat. It takes Pig Tuples and creates key-value pairs that its associated output format writes to storage.
StoreFunc is an abstract class, which allows it to provide default implementations for some methods. However, some functions implement both load and store functionality; PigStorage is one example. Because Java does not support multiple inheritance, the interface StoreFuncInterface is provided. These dual load/store functions can implement this interface rather than extending StoreFunc.
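As a point of comparison with the load side, here is a minimal sketch (not from the book's example code) of a store function that extends StoreFunc and implements only its four abstract methods, writing each tuple as a tab-delimited line:

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.pig.StoreFunc;
import org.apache.pig.data.Tuple;

public class SimpleTextStorer extends StoreFunc {
    private RecordWriter writer;

    @Override
    public OutputFormat getOutputFormat() throws IOException {
        return new TextOutputFormat<LongWritable, Text>();
    }

    @Override
    public void setStoreLocation(String location, Job job)
            throws IOException {
        FileOutputFormat.setOutputPath(job, new Path(location));
    }

    @Override
    public void prepareToWrite(RecordWriter writer) throws IOException {
        this.writer = writer;
    }

    @Override
    public void putNext(Tuple t) throws IOException {
        try {
            // Write a null key; TextOutputFormat then emits just the value.
            writer.write(null, new Text(t.toDelimitedString("\t")));
        } catch (InterruptedException ie) {
            throw new IOException(ie);
        }
    }
}

Each of these methods is discussed in detail in the sections that follow.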
Store function operations are split between the frontend and backend of Pig. Pig does planning and optimization on the frontend. Store functions have an opportunity at this time to check that a valid schema is being used and to set up the storage location. On the backend, store functions take a tuple from Pig, convert it to a key-value pair, and pass it to a Hadoop RecordWriter. Store functions can pass information from frontend invocations to backend invocations via UDFContext.
Store functions have three tasks to fulfill on the frontend:

- Instantiate the OutputFormat they will use to store data.
- Check the schema of the data being stored.
- Record the location where the data will be stored.
Pig calls getOutputFormat to get an instance of the output format that your store function will use to store records. This method returns an instance rather than the classname or the class itself, which allows your store function to control how the class is instantiated. The example store function JsonStorage uses TextOutputFormat. This is an output format that stores text data in HDFS. We have to instantiate this with a key of LongWritable and a value of Text to match the expectations of TextInputFormat:
// JsonStorage.java
public OutputFormat getOutputFormat() throws IOException {
    return new TextOutputFormat<LongWritable, Text>();
}
Pig calls setStoreLocation to communicate the location string the user provides to your store function. Given the Pig Latin store Z into 'output';, "output" is the location string. This method, called on both the frontend and the backend, could be called multiple times; consequently, it should not have any side effects that will cause a problem if this happens. Your store function will need to communicate the location to its output format. Our example store function uses the FileOutputFormat utility function setOutputPath to do this:
// JsonStorage.java
public void setStoreLocation(String location, Job job) throws IOException {
    FileOutputFormat.setOutputPath(job, new Path(location));
}
The Hadoop Job is passed to this function as well. Most output formats store the location information in the job.
Pig calls setStoreLocation on both the frontend and backend because output formats usually store their location in the job, as we see in our example store function. This works for MapReduce jobs, where a single output format is guaranteed. But due to the split operator, Pig can have more than one instance of the same store function in a job. If multiple instances of a store function call FileOutputFormat.setOutputPath, whichever instance calls it last will overwrite the others. Pig avoids this by keeping output-specific information and calling setStoreLocation again on the backend so that it can properly configure the output format.
For HDFS files, the user might provide a relative path. Pig needs to resolve these to absolute paths using the current working directory at the time the store is called. To accomplish this, Pig calls relToAbsPathForStoreLocation with the user-provided location string before calling setStoreLocation. This method translates between relative and absolute paths. For store functions writing to HDFS, the default implementation in StoreFunc handles the conversion. If you are writing a store function that does not use file paths (e.g., HBase), you should override this method to return the string it is passed.
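A sketch of such an override for a non-file store:

// Sketch: the location is a table name, not a path, so return it
// untouched instead of resolving it against the working directory.
public String relToAbsPathForStoreLocation(String location, Path curDir)
        throws IOException {
    return location;
}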
As part of frontend planning, Pig gives your store function a chance to check the schema of the data to be stored by calling checkSchema. If you are storing data to a system that expects a certain schema for the output (such as an RDBMS) or you cannot store certain data types, this is the place to perform those checks. Oddly enough, this method returns void rather than a Boolean, so if you detect an issue with the schema, you must throw an IOException.
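For instance, a store function writing to a system that cannot represent Pig's complex types might reject them here (a hedged sketch, not part of JsonStorage):

public void checkSchema(ResourceSchema s) throws IOException {
    for (ResourceFieldSchema f : s.getFields()) {
        if (f.getType() == DataType.BAG || f.getType() == DataType.MAP) {
            throw new IOException("Cannot store field " + f.getName() +
                ": this store function does not support bags or maps");
        }
    }
}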
Our example store function does not have limitations on the schemas it can store. However, it uses this function as a place to serialize the schema into UDFContext so that it can be used on the backend when writing data:
// JsonStorage.java
public void checkSchema(ResourceSchema s) throws IOException {
    UDFContext udfc = UDFContext.getUDFContext();
    Properties p =
        udfc.getUDFProperties(this.getClass(), new String[]{udfcSignature});
    p.setProperty("pig.jsonstorage.schema", s.toString());
}
Store functions work with UDFContext exactly as load functions do, with one exception: the signature is passed to the store function via setStoreFuncUDFContextSignature. See Passing Information from the Frontend to the Backend for a discussion of how load functions work with UDFContext. Our example store function stores the signature in a member variable for later use:
// JsonStorage.java
public void setStoreFuncUDFContextSignature(String signature) {
    udfcSignature = signature;
}
During backend processing, the store function is first initialized, and then takes Pig tuples and converts them to key-value pairs to be written to storage.
Pig calls your store function's prepareToWrite method in each map or reduce task before writing any data. This call passes a RecordWriter instance to use when writing data. RecordWriter is a class that OutputFormat uses to write individual records. Pig gets the record writer it passes to your store function by calling getRecordWriter on the output format your store function returned from getOutputFormat. Your store function will need to keep this reference so that it can be used in putNext.
The example store function JsonStorage also uses this method to read the schema out of the UDFContext. It will use this schema when storing data. Finally, it creates a JsonFactory for use in putNext:
// JsonStorage.java
public void prepareToWrite(RecordWriter writer) throws IOException {
    // Store the record writer reference so we can use it when it's time
    // to write tuples.
    this.writer = writer;

    // Get the schema string from the UDFContext object.
    UDFContext udfc = UDFContext.getUDFContext();
    Properties p =
        udfc.getUDFProperties(this.getClass(), new String[]{udfcSignature});
    String strSchema = p.getProperty("pig.jsonstorage.schema");
    if (strSchema == null) {
        throw new IOException("Could not find schema in UDF context");
    }

    // Parse the schema from the string stored in the properties object.
    ResourceSchema schema =
        new ResourceSchema(Utils.getSchemaFromString(strSchema));
    fields = schema.getFields();

    // Build a Json factory.
    jsonFactory = new JsonFactory();
    jsonFactory.configure(
        JsonGenerator.Feature.WRITE_NUMBERS_AS_STRINGS, false);
}
putNext is the core method in the store function class. Pig calls this method for every tuple it needs to store. Your store function needs to take these tuples and produce the key-value pairs that its output format expects. For information on the Java objects in which the data will be stored and how to extract them, see Interacting with Pig values.
JsonStorage encodes the contents of the tuple in JSON format and writes the resulting string into the value field of TextOutputFormat. The key field is left null:
// JsonStorage.java
public void putNext(Tuple t) throws IOException {
    // Build a ByteArrayOutputStream to write the JSON into.
    ByteArrayOutputStream baos = new ByteArrayOutputStream(BUF_SIZE);
    // Build the generator.
    JsonGenerator json =
        jsonFactory.createJsonGenerator(baos, JsonEncoding.UTF8);

    // Write the beginning of the top-level tuple object.
    json.writeStartObject();
    for (int i = 0; i < fields.length; i++) {
        writeField(json, fields[i], t.get(i));
    }
    json.writeEndObject();
    json.close();

    // Hand a null key and our string to Hadoop.
    try {
        writer.write(null, new Text(baos.toByteArray()));
    } catch (InterruptedException ie) {
        throw new IOException(ie);
    }
}

private void writeField(JsonGenerator json,
                        ResourceFieldSchema field,
                        Object d) throws IOException {
    // If the field is missing or the value is null, write a null.
    if (d == null) {
        json.writeNullField(field.getName());
        return;
    }

    // Based on the field's type, write it out.
    switch (field.getType()) {
    case DataType.INTEGER:
        json.writeNumberField(field.getName(), (Integer)d);
        return;

    case DataType.LONG:
        json.writeNumberField(field.getName(), (Long)d);
        return;

    case DataType.FLOAT:
        json.writeNumberField(field.getName(), (Float)d);
        return;

    case DataType.DOUBLE:
        json.writeNumberField(field.getName(), (Double)d);
        return;

    case DataType.BYTEARRAY:
        json.writeBinaryField(field.getName(), ((DataByteArray)d).get());
        return;

    case DataType.CHARARRAY:
        json.writeStringField(field.getName(), (String)d);
        return;

    case DataType.MAP:
        json.writeFieldName(field.getName());
        json.writeStartObject();
        for (Map.Entry<String, Object> e :
                ((Map<String, Object>)d).entrySet()) {
            json.writeStringField(e.getKey(), e.getValue().toString());
        }
        json.writeEndObject();
        return;

    case DataType.TUPLE:
        json.writeFieldName(field.getName());
        json.writeStartObject();
        ResourceSchema s = field.getSchema();
        if (s == null) {
            throw new IOException("Schemas must be fully specified to use " +
                "this storage function.  No schema found for field " +
                field.getName());
        }
        ResourceFieldSchema[] fs = s.getFields();
        for (int j = 0; j < fs.length; j++) {
            writeField(json, fs[j], ((Tuple)d).get(j));
        }
        json.writeEndObject();
        return;

    case DataType.BAG:
        json.writeFieldName(field.getName());
        json.writeStartArray();
        s = field.getSchema();
        if (s == null) {
            throw new IOException("Schemas must be fully specified to use " +
                "this storage function.  No schema found for field " +
                field.getName());
        }
        fs = s.getFields();
        if (fs.length != 1 || fs[0].getType() != DataType.TUPLE) {
            throw new IOException("Found a bag without a tuple inside!");
        }
        // Drill down the next level to the tuple's schema.
        s = fs[0].getSchema();
        if (s == null) {
            throw new IOException("Schemas must be fully specified to use " +
                "this storage function.  No schema found for field " +
                field.getName());
        }
        fs = s.getFields();
        for (Tuple t : (DataBag)d) {
            json.writeStartObject();
            for (int j = 0; j < fs.length; j++) {
                writeField(json, fs[j], t.get(j));
            }
            json.writeEndObject();
        }
        json.writeEndArray();
        return;
    }
}
When jobs fail after execution has started, your store function may need to clean up partially stored results. Pig will call cleanupOnFailure to give your store function an opportunity to do this. It passes the location string and the job object so that your store function knows what it should clean up. In the HDFS case, the default implementation handles removing any output files created by the store function. You need to implement this method only if you are storing data somewhere other than HDFS.
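For example, a store function writing to some other system might implement it like this; MyStorageClient is an invented stand-in for that system's API:

// Hypothetical sketch; MyStorageClient is invented for illustration.
public void cleanupOnFailure(String location, Job job) throws IOException {
    MyStorageClient client =
        MyStorageClient.connect(job.getConfiguration());
    client.deleteIfExists(location);  // drop any partially written output
}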
If your storage format can store schemas in addition to data, your store function can implement the interface StoreMetadata. This provides a storeSchema method that is called by Pig as part of its frontend operations. Pig passes storeSchema a ResourceSchema, the location string, and the job object so that it can connect to its storage. The ResourceSchema is very similar to the Schema class described in Input and Output Schemas. There is one important difference, however. In ResourceFieldSchema, the schema object associated with a bag always has one field, which is a tuple. The schema for the tuples in the bag is described by that tuple's ResourceFieldSchema.
The example store function JsonStorage stores the schema in a side file named _schema in the same directory as the data. The schema is stored as a string, using the toString method provided by the class:
// JsonStorage.java
public void storeSchema(ResourceSchema schema, String location, Job job)
        throws IOException {
    // Store the schema in a side file in the same directory.  MapReduce
    // does not include files starting with "_" when reading data for a job.
    FileSystem fs = FileSystem.get(job.getConfiguration());
    DataOutputStream out = fs.create(new Path(location + "/_schema"));
    out.writeBytes(schema.toString());
    out.writeByte('\n');
    out.close();
}
StoreMetadata also has a storeStatistics function, but Pig does not use this yet.
[28] A file in the same directory that is not a part file. Side files start with an underscore character. MapReduce's FileInputFormat knows to ignore them when reading input for a job.
[29] Meaning that the filter can be broken into two filters (one that contains the partition keys and one that does not) and produce the same end result. This is possible when the expressions are connected by and, but not when they are connected by or.
[30] HCatalog is a table-management service for Hadoop. It includes Pig load and store functions. See Metadata in Hadoop for more information on HCatalog.