It is time to turn our attention to how you can
extend Pig. So far we have looked at the operators and functions Pig
provides. But Pig also makes it easy for you to add your own processing
logic via User Defined Functions (UDFs). These are written in Java and,
starting with version 0.8, in Python.[24] This chapter will walk through how you can build
evaluation functions, UDFs that operate on single
elements of data or collections of data. It will also cover how to write
filter functions, UDFs that can be used
as part of filter
statements.
UDFs are powerful tools, and thus the interfaces are somewhat complex. In designing Pig, our goal was to make easy things easy and hard things possible. So, the simplest UDFs can be implemented in a single method, but you will have to implement a few more methods to take advantage of more advanced features. We will cover both cases in this chapter.
Throughout this chapter we will use several running examples of UDFs. Some of these are built-in Pig UDFs, which can be found in your Pig distribution at src/org/apache/pig/builtin/. The others can be found on GitHub with the other example UDFs, in the directory udfs.
Pig and Hadoop are implemented in Java, and so it is natural to implement UDFs in Java. This allows UDFs access to the Hadoop APIs and to many of Pig’s facilities.
Before diving into the details, it is worth considering names. Pig locates a UDF by looking for a Java class that exactly matches the UDF name in the script. For details on where it looks, see Registering UDFs and define and UDFs. There is not an accepted standard on whether UDF names should be all uppercase, camelCased (e.g., MyUdf), or all lowercase. Even the built-in UDFs provided by Pig vary in this regard. Keep in mind that, whatever you choose, you and all of the users of your UDF will have a better user experience if you make the name short, easy to remember, and easy to type.
Writing code that will run in a parallel system presents challenges. A separate instance of your UDF will be constructed and run in each map or reduce task. It is not possible to share state across these instances because they may not all be running at the same time. There will be only one instance of your UDF per map or reduce task, so you can share state within that context.[25]
When writing code for a parallel system, you must remember the power of parallelism. Operations that are acceptable in serial programs may no longer be advisable. Consider a UDF that, when it first starts, connects to a database server to download a translation table. In a serial or low-parallelism environment, this is a reasonable approach. But if you have 10,000 map tasks in your job and they all connect to your database at once, you will most likely hear from your DBA, and the conversation is unlikely to be pleasant.
In addition to an instance in each task, Pig will construct an instance of your UDF on the frontend during the planning stage. It does this for a couple of reasons. One, it wants to test early that it can construct your UDF; it would rather fail during planning than at runtime. Two, as we will cover later in this chapter, it will ask your UDF some questions about schemas and types it accepts as part of the execution planning. It will also give your UDF a chance to store information it wants to make available to the instances of itself that will be run in the backend.
All evaluation functions extend the Java class
org.apache.pig.EvalFunc
. This class uses Java
generics. It is parameterized by the return type of your UDF. The core
method in this class is exec
. It takes one record and
returns one result, and it will be invoked for every record that passes
through your execution pipeline. As input it takes a tuple, which
contains all of the fields the script passes to your UDF. It returns the
type by which you parameterized EvalFunc
. For
simple UDFs, this is the only method you need to implement. The
following code gives an example of a UDF that raises an integer to an
integral power and returns a long result:
// java/com/acme/math/Pow.java
/**
 * A simple UDF that takes a value and raises it to the power of a second
 * value. It can be used in a Pig Latin script as Pow(x, y), where x and y
 * are both expected to be ints.
 */
public class Pow extends EvalFunc<Long> {

    public Long exec(Tuple input) throws IOException {
        try {
            /* Rather than give you explicit arguments, UDFs are always handed
             * a tuple. The UDF must know the arguments it expects and pull
             * them out of the tuple. These next two lines get the first and
             * second fields out of the input tuple that was handed in. Since
             * Tuple.get returns Objects, we must cast them to Integers. If
             * the cast fails, an exception will be thrown.
             */
            int base = (Integer)input.get(0);
            int exponent = (Integer)input.get(1);
            long result = 1;

            /* Probably not the most efficient method...*/
            for (int i = 0; i < exponent; i++) {
                long preresult = result;
                result *= base;
                if (preresult > result) {
                    // We overflowed. Give a warning, but do not throw an
                    // exception.
                    warn("Overflow!", PigWarning.TOO_LARGE_FOR_INT);
                    // Returning null will indicate to Pig that we failed but
                    // we want to continue execution.
                    return null;
                }
            }
            return result;
        } catch (Exception e) {
            // Throwing an exception will cause the task to fail.
            throw new IOException("Something bad happened!", e);
        }
    }
}
EvalFunc
is also used to
implement aggregation functions. Because the group
operator returns a record for each
group, with a bag containing all the records in that group, your eval
func still takes one record and returns one record. As an example of
this, let’s take a look at the implementation of exec
in
Pig’s COUNT
function. Some of the
error-handling code has been removed for ease of reading:
// src/org/apache/pig/builtin/COUNT.java
public Long exec(Tuple input) throws IOException {
    try {
        // The data bag is passed to the UDF as the first element of the
        // tuple.
        DataBag bag = (DataBag)input.get(0);
        Iterator it = bag.iterator();
        long cnt = 0;
        while (it.hasNext()){
            Tuple t = (Tuple)it.next();
            // Don't count nulls or empty tuples
            if (t != null && t.size() > 0 && t.get(0) != null) {
                cnt++;
            }
        }
        return cnt;
    } catch (Exception e) {
        ...
    }
}
Just as UDFs can take complex types as input, they also can return complex types
as output. You could, for example, create a SetIntersection
UDF that took two bags as input and returned a bag as output.
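As a concrete illustration, a minimal sketch of such a SetIntersection UDF might look like the following. This is not one of Pig's built-in functions; the class name and the choice to compare whole tuples are assumptions made here for the example. It uses the BagFactory class, described later in this chapter, to build the result bag:

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;

/**
 * A hypothetical SetIntersection UDF: takes two bags and returns a bag
 * containing the tuples that appear in both.
 */
public class SetIntersection extends EvalFunc<DataBag> {
    private BagFactory bagFactory = BagFactory.getInstance();

    public DataBag exec(Tuple input) throws IOException {
        if (input == null || input.size() != 2) return null;
        DataBag left = (DataBag)input.get(0);
        DataBag right = (DataBag)input.get(1);
        if (left == null || right == null) return null;

        // Remember the tuples in the first bag. This relies on the tuples
        // implementing equals/hashCode, which Pig's default tuples do, and
        // assumes the first bag fits in memory.
        Set<Tuple> seen = new HashSet<Tuple>();
        for (Tuple t : left) seen.add(t);

        // Keep only the tuples from the second bag that were also in the first.
        DataBag result = bagFactory.newDefaultBag();
        for (Tuple t : right) {
            if (seen.contains(t)) result.add(t);
        }
        return result;
    }
}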
UDFs can also be handed the entire record by
passing * to the UDF. You might expect that in this case the input
Tuple argument passed to the UDF would contain all the fields passed into the
operator the UDF is in. But it does not. Instead, it contains one field,
which is a tuple that contains all those fields. Consider a Pig Latin
script like this:
data = load 'input' as (x, y, z);
processed = foreach data generate myudf(*);
In this case, myudf.exec
will get a
tuple with one field, which will be a tuple that will have three fields:
x
, y
, and z
.
To access the y
field of data
, you will need to call
t.get(0).get(1)
.
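Written out in Java, where Tuple.get returns Object and the inner tuple therefore has to be cast, that access looks roughly like this sketch (the return value is made up for illustration):

public String exec(Tuple input) throws IOException {
    // input has a single field, which is itself a tuple holding x, y, and z.
    Tuple record = (Tuple)input.get(0);
    Object y = record.get(1);   // the t.get(0).get(1) access described above
    return y == null ? null : y.toString();
}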
Evaluation functions and other UDFs are
exposed to the internals of how Pig represents data types. This means
that when you read a field and expect it to be an integer, you need to
know that it will be an instance of
java.lang.Integer
. For a complete list of Pig
types and how they are represented in Java, see Types. For most of these types, you construct the
appropriate Java objects in the normal way. However, this is not the
case for tuples and bags. These are interfaces, and they do not have
direct constructors. Instead, you must use factory classes for each of
these. This was done so that users and developers could build their
own implementations of tuple and bag and instruct Pig to use
them.
TupleFactory
is an abstract singleton
class that you must use to create tuples. You can also configure which
TupleFactory
is used, since users who provide
their own tuples will need to provide their own factory to produce
them. To get an instance of TupleFactory
to
construct tuples, call the static method
TupleFactory.getInstance()
.
You can now create new tuples with either
newTuple()
or newTuple(int size)
. Whenever
possible you should use the second method, which preallocates the
tuple with the right number of fields. This avoids the need to
dynamically grow the tuple later and is much more efficient. The
method creates a tuple with size
number of fields, all of
which are null. You can now set the fields using the
Tuple
’s set(int fieldNum, Object
val)
method. As an example, we can look at how the example load function we
will build in the next chapter creates tuples:
// JsonLoader.java
private TupleFactory tupleFactory = TupleFactory.getInstance();

private Object readField(JsonParser p,
                         ResourceFieldSchema field,
                         int fieldnum) throws IOException {
    ...
    ResourceSchema s = field.getSchema();
    ResourceFieldSchema[] fs = s.getFields();
    Tuple t = tupleFactory.newTuple(fs.length);
    for (int j = 0; j < fs.length; j++) {
        t.set(j, readField(p, fs[j], j));
    }
    ...
}
If you do not know the number of fields in
the tuple when it is constructed, you can use newTuple()
.
You can then add fields using Tuple
’s
append(Object val)
method, which will append the field to
the end of the tuple.
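For example, a quick sketch of both styles of construction (the values are made up, and since Tuple.set throws ExecException, this fragment would live inside a method that handles or declares it):

TupleFactory tupleFactory = TupleFactory.getInstance();

// Preferred when the number of fields is known: preallocate, then set.
Tuple known = tupleFactory.newTuple(2);
known.set(0, "pasadena");
known.set(1, 91101);

// When the number of fields is not known up front: start empty and append.
Tuple unknown = tupleFactory.newTuple();
unknown.append("pasadena");
unknown.append(91101);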
To read data from tuples, use the
get(int fieldNum)
method. This returns a Java
Object
because the tuple does not have a schema
instance and does not know what type this field is. You must either
cast the result to the appropriate type or use the utility methods in
org.apache.pig.data.DataType
to determine the
type.
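A sketch of the second approach, inspecting the field with DataType before using it (this fragment would sit inside your exec method, where the warn method described later in this chapter is available):

Object field = t.get(0);
byte type = DataType.findType(field);
if (type == DataType.INTEGER) {
    int i = (Integer)field;
    // ... use the integer ...
} else if (type == DataType.CHARARRAY) {
    String s = (String)field;
    // ... use the string ...
} else {
    warn("Unexpected type: " + DataType.findTypeName(type),
        PigWarning.UDF_WARNING_1);
}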
Similar to tuples, BagFactory
must be used to construct
bags. You can get an instance using
BagFactory.getInstance()
. To get a new, empty bag, call
newDefaultBag()
. You can then add tuples to it as you
construct them using DataBag
’s add(Tuple
t)
method. You should do this rather than constructing a list
of tuples and then passing it using
newDefaultBag(List<Tuple> listOfTuples)
, because
bags know how to spill to disk when they grow so large that they
cannot fit into memory. Again we can look at JsonLoader
to see an example of constructing bags:
// JsonLoader.java
private BagFactory bagFactory = BagFactory.getInstance();

private Object readField(JsonParser p,
                         ResourceFieldSchema field,
                         int fieldnum) throws IOException {
    ...
    DataBag bag = bagFactory.newDefaultBag();
    JsonToken innerTok;
    while ((innerTok = p.nextToken()) != JsonToken.END_ARRAY) {
        t = tupleFactory.newTuple(fs.length);
        for (int j = 0; j < fs.length; j++) {
            t.set(j, readField(p, fs[j], j));
        }
        p.nextToken(); // read end of object
        bag.add(t);
    }
    ...
}
To read data from a bag, use the iterator
provided by iterator()
. This also implements Java’s
Iterable
, so you can use the
construct for (Tuple t : bag)
.
Pig typechecks a script before running it.
EvalFunc
includes a method to allow you to turn
on type checking for your UDF as well, both for input and output.
When your UDF returns a simple type, Pig uses Java reflection to determine the return type. However,
because exec
takes a tuple, Pig has no way to determine
what input you expect your UDF to take. You can check this at runtime,
of course, but your development and testing will go more smoothly if you
check it at compile time instead. For example, we could use the
Pow
UDF example in the previous section like this:
register 'acme.jar';
A = load 'input' as (x:chararray, y:int);
B = foreach A generate y, com.acme.math.Pow(x, 2);
dump B;
Pig will start a job and run your tasks. All the
tasks will fail, and you will get an error message ERROR 2078: Caught error from UDF: com.acme.math.Pow
[Something bad happened!]
. Runtime exceptions like this are particularly expensive in
Hadoop, both because scheduling can take a while on a busy cluster and
because each task is tried three times before the whole job is declared
a failure. Let’s fix this UDF so it checks up front that it was given
reasonable input.
The method to declare the input your UDF expects is
outputSchema
. The method is called this because it returns
the schema that describes the UDF’s output. If your UDF does not
override this method, Pig will attempt to ascertain your return type
from the return type of your implementation of
EvalFunc
, and pass your UDF whatever input the
script indicates. If your UDF does implement this method, Pig will pass
it the schema of the input that the script has indicated to pass into
the UDF. This is also your UDF’s opportunity to throw an error if it
receives an input schema that does not match its expectations. An
implementation of this method for Pow
looks like
this:
// java/com/acme/math/Pow.java
public Schema outputSchema(Schema input) {
    // Check that we were passed two fields
    if (input.size() != 2) {
        throw new RuntimeException(
            "Expected (int, int), input does not have 2 fields");
    }

    try {
        // Get the types for both columns and check them. If they are
        // wrong, figure out what types were passed and give a good error
        // message.
        if (input.getField(0).type != DataType.INTEGER ||
                input.getField(1).type != DataType.INTEGER) {
            String msg = "Expected input (int, int), received schema (";
            msg += DataType.findTypeName(input.getField(0).type);
            msg += ", ";
            msg += DataType.findTypeName(input.getField(1).type);
            msg += ")";
            throw new RuntimeException(msg);
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    }

    // Construct our output schema, which is one field that is a long
    return new Schema(new FieldSchema(null, DataType.LONG));
}
With this method added to
Pow
, when we invoke the previous script that
mistakenly tries to pass a chararray to Pow
, it now fails
almost immediately with java.lang.RuntimeException: Expected input (int,
int), received schema (chararray, int).
Pig’s Schema
is a
complicated class, and we will not delve into all its complexities here.
The following summary will be enough to help you build your own schemas
for outputSchema
. At its core,
Schema
is a list of
FieldSchema
s and a mapping of aliases to
FieldSchema
s. Each
FieldSchema
contains an alias and a type. The
types are stored as Java bytes, with constants for each type defined in the class
org.apache.pig.data.DataType
.
Schema
is a recursive structure. Each
FieldSchema
also has a
Schema
member. This member is nonnull only when
the type is complex. In the case of tuples, it defines the
schema of the tuple. In the case of bags, it defines the schema of the
tuples in the bag. Starting in 0.9, if a schema is present for a map, it
indicates the data type of values in the map. Before 0.9, maps did not have schemas:
public class Schema implements Serializable, Cloneable {

    // List of all fields in the schema.
    private List<FieldSchema> mFields;

    // Map of alias names to field schemas, so that lookup can be done by alias.
    private Map<String, FieldSchema> mAliases;

    // A FieldSchema represents a schema for one field.
    public static class FieldSchema implements Serializable, Cloneable {
        // Alias for this field.
        public String alias;

        // Datatype, using codes from org.apache.pig.data.DataType.
        public byte type;

        // If this is a tuple itself, it can have a schema. Otherwise, this field
        // must be null.
        public Schema schema;

        /**
         * Constructor for any type.
         * @param a Alias, if known. If unknown, leave null.
         * @param t Type, using codes from org.apache.pig.data.DataType.
         */
        public FieldSchema(String a, byte t) {
            ...
        }
    }

    /**
     * Create a schema with more than one field.
     * @param fields List of field schemas that describes the fields.
     */
    public Schema(List<FieldSchema> fields) {
        ...
    }

    /**
     * Create a schema with only one field.
     * @param fieldSchema field to put in this schema.
     */
    public Schema(FieldSchema fieldSchema) {
        ...
    }

    /**
     * Given an alias name, find the associated FieldSchema.
     * @param alias Alias to look up.
     * @return FieldSchema, or null if no such alias is in this tuple.
     */
    public FieldSchema getField(String alias) throws FrontendException {
        // some error checking omitted.
        return mAliases.get(alias);
    }

    /**
     * Given a field number, find the associated FieldSchema.
     *
     * @param fieldNum Field number to look up.
     * @return FieldSchema for this field.
     */
    public FieldSchema getField(int fieldNum) throws FrontendException {
        // some error checking omitted.
        return mFields.get(fieldNum);
    }
}
As mentioned earlier, when your UDF returns a
scalar type, Pig can use reflection to figure out that return type. When your UDF
returns a bag or a tuple, however, you will need to implement
outputSchema
if you want Pig to understand the contents of
that bag or tuple.
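As a sketch, a UDF that returns a bag of tuples with a single chararray field might build its output schema like this (the alias name is arbitrary):

public Schema outputSchema(Schema input) {
    try {
        // Describe the tuples in the bag: one chararray field.
        Schema tupleSchema = new Schema(
            new Schema.FieldSchema("value", DataType.CHARARRAY));
        // Wrap that tuple schema in a field of type BAG, and return a
        // schema containing just that field.
        return new Schema(
            new Schema.FieldSchema(null, tupleSchema, DataType.BAG));
    } catch (FrontendException e) {
        throw new RuntimeException(e);
    }
}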
Our previous examples have given some hints of how to deal
with errors. When your UDF encounters an error, you have a couple of
choices on how to handle it. The most common case is to issue a warning
and return a null. This tells Pig that your UDF failed and its output
should be viewed as unknown.[26] We saw an example of this when the Pow
function detected overflow:
for (int i = 0; i < exponent; i++) {
    long preresult = result;
    result *= base;
    if (preresult > result) {
        // We overflowed. Give a warning, but do not throw an
        // exception.
        warn("Overflow!", PigWarning.TOO_LARGE_FOR_INT);
        // Returning null will indicate to Pig that we failed but
        // we want to continue execution.
        return null;
    }
}
warn
, a method of
EvalFunc
, takes a message that you provide as
well as a warning code. The warning codes are in
org.apache.pig.PigWarning
, including several
user-defined codes that you can use if none of the provided codes
matches your situation. These warnings are aggregated by Pig and
reported to the user at the end of the job.
Warning and returning null is convenient because it allows your job to continue. When you are processing billions of records, you do not want your job to fail because one record out of all those billions had a chararray where you expected an int. Given enough data, the odds are overwhelming that a few records will be bad, and most calculations will be fine if a few data points are missing.
For errors that are not tolerable, your UDF can throw an exception. If Pig catches an exception, it will assume that you are asking to stop everything, and it will cause the task to fail. Hadoop will then restart your task. If any particular task fails three times, Hadoop will not restart it again. Instead, it will kill all the other tasks and declare the job a failure.
When you have concluded that you do need an
exception, you should also issue a log message so that you can read the task logs later and
get more context to determine what happened.
EvalFunc
has a member log
that is an
instance of org.apache.commons.logging.Log
.
Hadoop prints any log messages into logfiles on the task machine, which
are available from the JobTracker UI. See MapReduce Job Status for
details. You can also print info messages into the log to help you with
debugging.
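A minimal sketch of what that might look like; the process helper here is hypothetical and stands in for your UDF's real work:

public Long exec(Tuple input) throws IOException {
    try {
        return process(input);   // hypothetical helper doing the real work
    } catch (Exception e) {
        // Write some context into the task log before failing the task.
        log.error("Could not process record: " + input, e);
        throw new IOException("Giving up on this record", e);
    }
}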
In addition to error reporting, some UDFs will
need to report progress. Hadoop listens to its tasks to make sure they
are making progress. If it does not hear from a task for five minutes,
it concludes that the task died or went into an infinite loop. It then
kills the task if it is still running, cleans up its resources, and
restarts the task elsewhere. Pig reports progress to Hadoop on a regular
basis. However, if you have a UDF that is very compute-intensive and a
single invocation of it might run for more than five minutes, you should
also report progress. To do this, EvalFunc
provides a member reporter
. By invoking
reporter.progress() or reporter.progress(String msg)
(where msg
can say whatever you want) at least
every five minutes, your UDF will avoid being viewed as a
timeout.
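A sketch of a compute-intensive exec that keeps Hadoop informed; the per-record work is elided, and the batch size of 10,000 is an arbitrary choice:

public Long exec(Tuple input) throws IOException {
    DataBag bag = (DataBag)input.get(0);
    long n = 0;
    for (Tuple t : bag) {
        // ... expensive per-record work here ...
        n++;
        // Tell Hadoop we are still alive every so often.
        if (n % 10000 == 0 && reporter != null) {
            reporter.progress("processed " + n + " records");
        }
    }
    return n;
}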
Our discussion so far assumes that your UDF knows everything it needs to know at development time. This is not always the case. Consider a UDF that needs to read a lookup table from HDFS. You would like to be able to declare the filename when you use the UDF. You can do that by defining a nondefault constructor for your UDF.
By default, EvalFunc
s
have a no-argument constructor, but you can provide a constructor that
takes one or more String
arguments. This alternate
constructor is then referenced in Pig Latin by using the define
statement to define the UDF; see define and UDFs for details.
As an example, we will look at a new UDF,
MetroResolver
. This UDF takes a
city name as input and returns the name of the larger metropolitan area
that city is part of. For example, given Pasadena, it will return Los
Angeles. Based on which country the input cities are in, a different
lookup table will be needed. The name of a file in HDFS that contains
this lookup table can be provided as a constructor argument. The class
declaration, members, and constructor for our UDF look like this:
// java/com/acme/marketing/MetroResolver.java
/**
 * A lookup UDF that maps cities to metropolitan areas.
 */
public class MetroResolver extends EvalFunc<String> {

    String lookupFile;
    HashMap<String, String> lookup = null;

    /*
     * @param file - File that contains a lookup table mapping cities to metro
     * areas. The file must be located on the filesystem where this UDF will
     * run.
     */
    public MetroResolver(String file) {
        // Just store the filename. Don't load the lookup table, since we may
        // be on the frontend or the backend.
        lookupFile = file;
    }
}
The UDF can now be invoked in a Pig Latin script like this:
register 'acme.jar';
define MetroResolver com.acme.marketing.MetroResolver('/user/you/cities/us');
A = load 'input' as (city:chararray);
B = foreach A generate city, MetroResolver(city);
dump B;
The filename /user/you/cities/us will be passed to
MetroResolver
every time Pig constructs it.
However, our UDF is not yet complete because we have not constructed the
lookup table. In fact, we explicitly set it to null. It does not make
sense to construct it in the constructor, because the constructor will
be invoked on both the frontend and backend. There are forms of dark
magic that will allow the UDF to figure out whether it is being invoked
on the frontend or backend, but I cannot recommend them, because
they are not guaranteed to work the same between releases. It is much
better to do the lookup table construction in a method that we know will
be called only in the backend.
EvalFunc
does not provide
an initialize method that it calls on the backend before it begins
processing. You can work around this by keeping a flag to determine
whether you have initialized your UDF in a given task. The
exec
function for MetroResolver
does
this by tracking whether lookup
is null:
public String exec(Tuple input) throws IOException {
    if (lookup == null) {
        // We have not been initialized yet, so do it now.
        lookup = new HashMap<String, String>();

        // Get an instance of the HDFS FileSystem class so
        // we can read a file from HDFS. We need a copy of
        // our configuration to do that.
        // Read the configuration from the UDFContext.
        FileSystem fs =
            FileSystem.get(UDFContext.getUDFContext().getJobConf());

        DataInputStream in = fs.open(new Path(lookupFile));
        String line;
        while ((line = in.readLine()) != null) {
            String[] toks = new String[2];
            toks = line.split(":", 2);
            lookup.put(toks[0], toks[1]);
        }
        in.close();
    }
    return lookup.get((String)input.get(0));
}
This initialization section handles opening the
file and reading it. In order to open the file, it must first connect to
HDFS. This is accomplished by FileSystem.get
.
This method in turn needs a JobConf
object, which
is where Hadoop stores all its job information. The
JobConf
object can be obtained using
UDFContext
, which we will cover in more detail
later. Note that obtaining JobConf
in this way
works only on the backend, as no job configuration exists on the
frontend.
Once we are connected to HDFS, we open the file
and read it as we would any other file. It is parsed into two fields and
put into the hash table. All subsequent calls to exec
will
just be lookups in the hash table.
Our MetroResolver
UDF opens and
reads its lookup file from HDFS, which you will often want. However,
having hundreds or thousands of map tasks open the same file on HDFS
at the same time puts significant load on the NameNode and the DataNodes that host the file’s blocks. To avoid this
situation, Hadoop provides the distributed cache, which allows users to preload HDFS
files locally onto the nodes their tasks will run on. For details, see
Distributed Cache.
Let’s write a second version of
MetroResolver
that uses the distributed cache. Beginning in version 0.9,
EvalFunc
provides a method
getCacheFiles
that is called on the frontend. Your UDF
returns a list of files from this method that it wants in the
distributed cache. The format of each file is client_file#task_file, where
client_file is the path to the file on your client, and task_file is the name
the file will be given on your task node. task_file is relative to your UDF's
working directory on the backend. You should place any files in your working
directory rather than using an absolute path. task_file will be a local file
on the task node and should be read using standard Java file utilities, not
HDFS's FileSystem:
// java/com/acme/marketing/MetroResolverV2.java
/**
 * A lookup UDF that maps cities to metropolitan areas, this time using the
 * distributed cache.
 */
public class MetroResolverV2 extends EvalFunc<String> {

    String lookupFile;
    HashMap<String, String> lookup = null;

    /*
     * @param file - File that contains a lookup table mapping cities to metro
     * areas. The file must be located on the filesystem where this UDF will
     * run.
     */
    public MetroResolverV2(String file) {
        // Just store the filename. Don't load the lookup table, since we may
        // be on the frontend or the backend.
        lookupFile = file;
    }

    public String exec(Tuple input) throws IOException {
        if (lookup == null) {
            // We have not been initialized yet, so do it now.
            lookup = new HashMap<String, String>();

            // Open the file as a local file.
            FileReader fr = new FileReader("./mrv2_lookup");
            BufferedReader d = new BufferedReader(fr);
            String line;
            while ((line = d.readLine()) != null) {
                String[] toks = new String[2];
                toks = line.split(":", 2);
                lookup.put(toks[0], toks[1]);
            }
            fr.close();
        }
        return lookup.get((String)input.get(0));
    }

    public List<String> getCacheFiles() {
        List<String> list = new ArrayList<String>(1);
        // We were passed the name of the file on HDFS. Append a
        // name for the file on the task node.
        list.add(lookupFile + "#mrv2_lookup");
        return list;
    }
}
Constructor arguments work as a way to pass
information into your UDF, if you know the data at the time the script
is written. You can extend this using parameter substitution (see
Parameter Substitution) so that data can be passed when the
script is run. But some information you want to pass from frontend to
backend cannot be known when the script is run, or it might not be
accessible in String
form on the command line. An
example is collecting properties from the environment and passing
them.
To allow UDFs to pass data from the frontend
to the backend, starting in version 0.8, Pig provides a singleton class, UDFContext
. Your UDF obtains a
reference to it by calling getUDFContext
. We have already seen that
UDFs can use UDFContext
to obtain a copy of the
JobConf
. Beginning in version 0.9,
UDFContext
also captures the
System
properties on the client and carries
them to the backend. Your UDF can then obtain them by calling
getClientSystemProperties
.
UDFContext
also
provides mechanisms for you to pass a properties object explicitly for
your UDF. You can either pass a properties object for all UDFs of the
same class or pass a specific object for each instance of your UDF. To
use the same one for all instances of your UDF, call
getUDFProperties(this.getClass())
. This will return a
Properties
object that is a reference to a
properties object kept by UDFContext
.
UDFContext
will capture and transmit to the backend
any changes made in this object. You can call this in
outputSchema
, which is guaranteed to be called in the
frontend. When you want to read the data, call the same method again
in your exec
method. When using the object in the
exec
method, keep in mind that any changes made to the
returned Properties
will not be transmitted to
other instances of the UDF on the backend, unless you happen to have
another instance of the same UDF in the same task. This is a mechanism
for sending information from the frontend to the backend, not between
instances in the backend.
Sometimes you will want to transmit different
data to different instances of the same UDF. By different instances I
mean different invocations in your Pig Latin script, not different
instantiations in various map and reduce tasks. To support this,
UDFContext
provides
getUDFProperties(Class, String[])
. The constructor
arguments to your UDF are a good candidate to be passed as the array
of String
. This allows each instance of the UDF to
differentiate itself. If your UDF does not take constructor arguments,
or all arguments have the same value, you can add one unused argument
that is solely to distinguish separate instances of the UDF.
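A sketch of how this fits together, stashing a per-instance property on the frontend and reading it back on the backend; the class, property name, and constructor argument are all made up for the example:

public class TableLookup extends EvalFunc<String> {
    private String table;              // constructor argument
    private Properties myProps = null;

    public TableLookup(String table) {
        this.table = table;
    }

    public Schema outputSchema(Schema input) {
        // Called on the frontend. Key the properties by this instance's
        // constructor argument so each use of the UDF gets its own object.
        Properties p = UDFContext.getUDFContext()
            .getUDFProperties(this.getClass(), new String[] {table});
        p.setProperty("lookup.table", table);
        return new Schema(new Schema.FieldSchema(null, DataType.CHARARRAY));
    }

    public String exec(Tuple input) throws IOException {
        if (myProps == null) {
            // The same call on the backend returns the values stored above.
            myProps = UDFContext.getUDFContext()
                .getUDFProperties(this.getClass(), new String[] {table});
        }
        String whichTable = myProps.getProperty("lookup.table");
        // ... do the actual lookup against whichTable ...
        return whichTable;
    }
}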
Consider a UDF that has its own properties file, which might be useful if you want to pass different properties to different UDFs, or if you have many UDF-specific properties that you want to change without changing your Pig properties file. Let’s write a second version of the stock analyzer UDF that we used in Chapter 6:
// java/com/acme/financial/AnalyzeStockV2.java
/**
 * This UDF takes a bag of information about a stock and
 * produces a floating-point score between 1 and 100,
 * 1 being sell, 100 being buy.
 */
public class AnalyzeStockV2 extends EvalFunc<Float> {

    Random r = new Random();
    Properties myProperties = null;

    @Override
    public Float exec(Tuple input) throws IOException {
        if (myProperties == null) {
            // Retrieve our class-specific properties from UDFContext.
            myProperties =
                UDFContext.getUDFContext().getUDFProperties(this.getClass());
        }

        // Make sure the input isn't null and is of the right size.
        if (input == null || input.size() != 1) return null;

        DataBag b = (DataBag)input.get(0);
        for (Tuple t : b) {
            // Do some magic analysis, using properties from myProperties to
            // decide how ...
        }
        return r.nextFloat() * 100;
    }

    @Override
    public Schema outputSchema(Schema input) {
        try {
            // Read our properties file.
            Properties prop = new Properties();
            prop.load(new FileInputStream("/tmp/stock.properties"));
            // Get a properties object specific to this UDF class.
            UDFContext context = UDFContext.getUDFContext();
            Properties udfProp = context.getUDFProperties(this.getClass());
            // Copy our properties into it. There is no need to pass it
            // back to UDFContext.
            for (Map.Entry<Object, Object> e : prop.entrySet()) {
                udfProp.setProperty((String)e.getKey(), (String)e.getValue());
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return new Schema(new Schema.FieldSchema(null, DataType.FLOAT));
    }
}
Sometimes you want different UDF implementations depending
on the data type the UDF is processing. For example, MIN(long)
should return a long, whereas
MIN(int)
should return an int. To enable this,
EvalFunc
provides the method
getArgToFuncMapping
. If this method returns a null, Pig
will use the current UDF. To provide a list of alternate UDFs based on
the input types, this function returns a list of
FuncSpec
s. A FuncSpec
is a
Pig class that describes a UDF. Each of these
FuncSpec
s describes a set of expected input
arguments and the UDF, as a Java class, that should be used to handle them.
Pig’s typechecker will use this list to determine which Java
class to place in the execution pipeline (more on this later). The
getArgToFuncMapping
of Pig’s built-in
MIN
function looks like this:
// src/org/apache/pig/builtin/MIN.java
public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
    List<FuncSpec> funcList = new ArrayList<FuncSpec>();

    // The first element in the list is this class itself, which is built to
    // handle the case where the input is a bytearray. So we return our own
    // classname and a schema that indicates this function expects a BAG with
    // tuples that have one field, which is a bytearray. generateNestedSchema is a
    // helper method that generates schemas of bags that have tuples with one
    // field.
    funcList.add(new FuncSpec(this.getClass().getName(),
        Schema.generateNestedSchema(DataType.BAG, DataType.BYTEARRAY)));

    // If our input schema is a bag with tuples with one field that is a double,
    // then we use the class DoubleMin instead of MIN to implement min.
    funcList.add(new FuncSpec(DoubleMin.class.getName(),
        Schema.generateNestedSchema(DataType.BAG, DataType.DOUBLE)));

    // and so on...
    funcList.add(new FuncSpec(FloatMin.class.getName(),
        Schema.generateNestedSchema(DataType.BAG, DataType.FLOAT)));
    funcList.add(new FuncSpec(IntMin.class.getName(),
        Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER)));
    funcList.add(new FuncSpec(LongMin.class.getName(),
        Schema.generateNestedSchema(DataType.BAG, DataType.LONG)));
    funcList.add(new FuncSpec(StringMin.class.getName(),
        Schema.generateNestedSchema(DataType.BAG, DataType.CHARARRAY)));
    return funcList;
}
Pig’s typechecker goes through a set of steps to
determine which FuncSpec
is the closest match,
and thus which Java class it should place in this job’s execution
pipeline. At each step, if it finds a match, it uses that match. If it
finds more than one match at a given step, it will return an error that
gives all the matching possibilities. If it finds no match in the whole
list, it will also give an error. As an example of this, let’s consider
another version of the Pow
UDF we built above. We will call
this one PowV2
. It takes either two longs or two doubles as
input. Its getArgToFuncMapping
looks like the following:
// java/com/acme/math/PowV2.java
public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
    List<FuncSpec> funcList = new ArrayList<FuncSpec>();

    Schema s = new Schema();
    s.add(new Schema.FieldSchema(null, DataType.DOUBLE));
    s.add(new Schema.FieldSchema(null, DataType.DOUBLE));
    funcList.add(new FuncSpec(this.getClass().getName(), s));

    s = new Schema();
    s.add(new Schema.FieldSchema(null, DataType.LONG));
    s.add(new Schema.FieldSchema(null, DataType.LONG));
    funcList.add(new FuncSpec(LongPow.class.getName(), s));
    return funcList;
}
In the typechecker’s search for the best UDF to
use, step one is to look for an exact match, where all of the expected
input declared by the UDF is matched by the actual input passed in Pig
Latin. Pow(2.0, 3.1415)
passes two doubles, so Pig Latin
will choose PowV2
. Pow(2L, 3L)
passes two longs, so LongPow
will be used.
Step two is to look for bytearrays that are
passed into the UDF and see whether a match can be made by inserting
casts for those bytearrays. For example, Pig will rewrite Pow(x,
2L)
, where x
is a bytearray, as Pow((long)x,
2L)
and use LongPow
. This rule can confuse
Pig when all arguments are bytearrays, because bytearrays can be cast to
any type. Pow(x, y)
, where both x
and
y
are bytearrays, results in an error message:
Multiple matching functions for com.acme.math.PowV2 with input schema: ({double,double}, {long,long}). Please use an explicit cast.
Step three is to look for an implicit cast that
will match one of the provided schemas. The implicit cast that is
“closest” will be used. Implicit casting of numeric types
goes from int to long to float to double, and by closest I mean the cast
that requires the least steps in that list. So, Pow(2, 2)
will use LongPow
, whereas Pow(2.0,
2)
will use PowV2
.
Step four is to look for a working combination
of steps two and three, bytearray casts plus implicit casts.
Pow(x, 3.14f)
, where x
is a bytearray, will
use PowV2
by promoting 3.14f
to a double and casting x
to a double.
If after all these steps Pig still has not found
a suitable method, it will fail and say it cannot determine which method
to use. Pow('hello', 2)
gives an error message:
Could not infer the matching function for com.acme.math.PowV2 as multiple or none of them fit. Please use an explicit cast.
Some operations you will perform in your UDFs will require more memory than is available. As an example, you might want to build a UDF that calculates the cumulative sum of a set of inputs. This will return a bag of values because, for each input, it needs to return the intermediate sum at that input.
Pig’s bags handle spilling data to disk automatically when they pass a certain size threshold or when only a certain amount of heap space remains. Spilling to disk is expensive and should be avoided whenever possible. But if you must store large amounts of data in a bag, Pig will manage it.
Bags are the only Pig data type that knows how to spill. Tuples and maps must fit into memory. However, a bag that is too large to fit in memory can still be referenced by a tuple or a map; the bag's contents are not counted against the memory that the containing tuple or map must fit into.
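A sketch of the cumulative-sum UDF described above, which leans on the bag's ability to spill if a group is large; the class name and the assumption that the input is a bag of single long fields are made up for the example:

public class CumulativeSum extends EvalFunc<DataBag> {
    private TupleFactory tupleFactory = TupleFactory.getInstance();
    private BagFactory bagFactory = BagFactory.getInstance();

    public DataBag exec(Tuple input) throws IOException {
        try {
            DataBag values = (DataBag)input.get(0);
            // The output bag may grow very large, but Pig's default bag will
            // spill to disk on its own if it passes the memory threshold.
            DataBag running = bagFactory.newDefaultBag();
            long sum = 0;
            for (Tuple t : values) {
                sum += (Long)t.get(0);
                Tuple out = tupleFactory.newTuple(1);
                out.set(0, sum);
                running.add(out);
            }
            return running;
        } catch (Exception e) {
            throw new IOException("Failed to compute cumulative sum", e);
        }
    }
}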
I have already mentioned in a number of other places that there are significant advantages to using Hadoop’s combiner whenever possible. It lowers skew in your reduce tasks, as well as the amount of data sent over the network between map and reduce tasks. For details on the combiner and when it is run, see Combiner Phase.
Use of the combiner is interesting when you are working with sets of
data, usually sets you intend to aggregate and then compute a single or
small set of values for. There are two classes of functions that fit
nicely into the combiner: distributive and algebraic. A function is distributive
if the same result is obtained by 1) dividing its input set into subsets,
applying the function to those subsets, and then applying the function to
those results; or 2) applying the function to the original set. SUM
is an example of this. A function is said
to be algebraic if it can be divided into initial, intermediate, and final
functions (possibly different from the initial function), where the
initial function is applied to subsets of the input set, the intermediate
function is applied to results of the initial function, and the final
function is applied to all of the results of the intermediate function.
COUNT
is an example of an algebraic
function, with count being used as the initial function and sum as the
intermediate and final functions. A distributive function is a special
case of an algebraic function, where the initial, intermediate, and final
functions are all identical to the original function.
An EvalFunc
can declare
itself to be algebraic by implementing the Java interface
Algebraic
. Algebraic
provides three methods that allow your UDF to declare Java classes that
implement its initial, intermediate, and final functionality. These
classes must extend EvalFunc
:
// src/org/apache/pig/Algebraic.java
public interface Algebraic {

    /**
     * Get the initial function.
     * @return A function name of f_init. f_init should be an eval func.
     */
    public String getInitial();

    /**
     * Get the intermediate function.
     * @return A function name of f_intermed. f_intermed should be an eval func.
     */
    public String getIntermed();

    /**
     * Get the final function.
     * @return A function name of f_final. f_final should be an eval func
     * parameterized by the same datum as the eval func implementing this interface.
     */
    public String getFinal();
}
Each of these methods returns a name of a Java
class, which should itself implement EvalFunc
. Pig
will use these UDFs to rewrite the execution of your script. Consider the
following Pig Latin script:
input = load 'data' as (x, y);
grpd = group input by x;
cnt = foreach grpd generate group, COUNT(input);
store cnt into 'result';
The execution pipeline for this script initially performs the entire COUNT in the reduce phase. After being rewritten to use the combiner, the count is split across the map, combine, and reduce phases.
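Roughly, the two plans can be sketched like this (a simplification, not Pig's exact plan output):

    before:  map:     load
             reduce:  foreach(group, COUNT), store

    after:   map:     load, foreach(group, COUNT.Initial)
             combine: foreach(group, COUNT.Intermediate)
             reduce:  foreach(group, COUNT.Final), store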
As an example, we will walk through the
implementation for COUNT
. Its algebraic functions look like
this:
// src/org/apache/pig/builtin/COUNT.java
public String getInitial() {
    return Initial.class.getName();
}

public String getIntermed() {
    return Intermediate.class.getName();
}

public String getFinal() {
    return Final.class.getName();
}
Each of these referenced classes is a static
internal class in COUNT
. The implementation of
Initial
is:
// src/org/apache/pig/builtin/COUNT.java
static public class Initial extends EvalFunc<Tuple> {

    public Tuple exec(Tuple input) throws IOException {
        // Since Initial is guaranteed to be called
        // only in the map, it will be called with an
        // input of a bag with a single tuple - the
        // count should always be 1 if the bag is nonempty.
        DataBag bag = (DataBag)input.get(0);
        Iterator it = bag.iterator();
        if (it.hasNext()){
            Tuple t = (Tuple)it.next();
            if (t != null && t.size() > 0 && t.get(0) != null)
                return mTupleFactory.newTuple(Long.valueOf(1));
        }
        return mTupleFactory.newTuple(Long.valueOf(0));
    }
}
Even though the initial function is guaranteed to
receive only one record in its input, that record will match the schema of
the original function. So, in the case of COUNT
, it
will be a bag. Thus, this initial method determines whether there is a
nonnull record in that bag. If so, it returns one; otherwise, it returns
zero. The return type of the initial function is a tuple. The contents of
that tuple are entirely up to you as the UDF implementer. In this case,
the initial returns a tuple with one long field.
COUNT
’s
Intermediate
class sums the counts seen so
far:
// src/org/apache/pig/builtin/COUNT.java
static public class Intermediate extends EvalFunc<Tuple> {

    public Tuple exec(Tuple input) throws IOException {
        try {
            return mTupleFactory.newTuple(sum(input));
        } catch (ExecException ee) {
            ...
        }
    }
}

static protected Long sum(Tuple input)
        throws ExecException, NumberFormatException {
    DataBag values = (DataBag)input.get(0);
    long sum = 0;
    for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
        Tuple t = it.next();
        sum += (Long)t.get(0);
    }
    return sum;
}
The input to the intermediate function is a bag of
tuples that were returned by the initial function. The intermediate
function may be called zero, one, or many times. So, it needs to output
tuples that match the input tuples it expects. The framework will handle
placing those tuples in bags. COUNT
’s intermediate
function returns a tuple with a long. As we now want to sum the previous
counts, this function implements SUM
rather than
COUNT
.
The final function is called in the reducer and is
guaranteed to be called only once. Its input type is a bag of tuples that
both the initial and intermediate implementations return. Its return type
needs to be the return type of the original UDF, which in this case is
long. In COUNT
’s case, this is the same operation as
the intermediate because it sums the intermediate sums:
// src/org/apache/pig/builtin/COUNT.java
static public class Final extends EvalFunc<Long> {

    public Long exec(Tuple input) throws IOException {
        try {
            return sum(input);
        } catch (Exception ee) {
            ...
        }
    }
}
Implementing Algebraic
does
not guarantee that the algebraic implementation will always be used. Pig
chooses the algebraic implementation only if all UDFs in the same
foreach
statement are algebraic. This is because our testing
has shown that using the combiner with data that cannot be combined
significantly slows down the job. And there is no way in Hadoop to route
some data to the combiner (for algebraic functions) and some straight to
the reducer (for nonalgebraic). This means that your UDF must always
implement the exec
method, even if you hope it will always be
used in algebraic mode. It is also a good reason to implement algebraic mode whenever possible: a single nonalgebraic UDF in a foreach keeps every other UDF in that statement from using the combiner.
Some calculations cannot be done in an algebraic manner. In
particular, any function that requires its records to be sorted before
beginning is not algebraic. But many of these methods still do not need to
see their entire input at once; they can work on subsets of the data as
long as they are guaranteed it is all available. This means Pig does not
have to read all of the records into memory at once. Instead, it can read
a subset of the records and pass them to the UDF. To handle these cases,
Pig provides the Accumulator
interface. Rather than
calling a UDF once with the entire input set in one bag, Pig will call it
multiple times with a subset of the records. When it has passed all the
records in, it will then ask for a result. Finally, it will give the UDF a
chance to reset its state before passing it records for the next
group:
// src/org/apache/pig/Accumulator.java
public interface Accumulator<T> {

    /**
     * Pass tuples to the UDF.
     * @param b A tuple containing a single field, which is a bag. The bag will
     * contain the set of tuples being passed to the UDF in this iteration.
     */
    public void accumulate(Tuple b) throws IOException;

    /**
     * Called when all tuples from current key have been passed to accumulate.
     * @return the value for the UDF for this key.
     */
    public T getValue();

    /**
     * Called after getValue() to prepare processing for next key.
     */
    public void cleanup();
}
As an example, let’s look at COUNT
’s implementation of the
accumulator:
// src/org/apache/pig/builtin/COUNT.java
private long intermediateCount = 0L;

public void accumulate(Tuple b) throws IOException {
    try {
        DataBag bag = (DataBag)b.get(0);
        Iterator it = bag.iterator();
        while (it.hasNext()){
            Tuple t = (Tuple)it.next();
            if (t != null && t.size() > 0 && t.get(0) != null) {
                intermediateCount += 1;
            }
        }
    } catch (Exception e) {
        ...
    }
}

public void cleanup() {
    intermediateCount = 0L;
}

public Long getValue() {
    return intermediateCount;
}
By default, Pig passes accumulate
20,000 records at once. You can modify this value by setting the property
pig.accumulative.batchsize
either on the command line or
using set
in your script.
As mentioned earlier, one major class of methods that can use the accumulator are those that require sorted input, such as session analysis. Usually such a UDF will want records within the group sorted by timestamp. As an example, let’s say you have log data from your web servers that includes the user ID, timestamp, and the URL the user viewed, and you want to do session analysis on this data:
logs = load 'serverlogs' as (id:chararray, ts:long, url:chararray);
byuser = group logs by id;
results = foreach byuser {
    sorted = order logs by ts;
    generate group, SessionAnalysis(sorted);
};
Pig can move the sort done by the
order
statement to Hadoop, to be done as part of the shuffle
phase. Thus, Pig is still able to read a subset of records at a time from
Hadoop and pass those directly to SessionAnalysis
. This
important optimization allows accumulator UDFs to work with sorted
data.
Whenever possible, Pig will choose to use the
algebraic implementation of a UDF over the accumulator. This is because
the accumulator helps avoid spilling records to disk, but it does not
reduce network cost or help balance the reducers. If all UDFs in a
foreach
implement Accumulator
and at
least one does not implement Algebraic
, Pig will
use the accumulator. If any UDF in the foreach does not implement
Accumulator, Pig will not use the accumulator at all. This is because Pig already has to read the
entire bag into memory to pass to the UDF that does not implement the
accumulator, so there is no longer any value in the accumulator.
Pig and Hadoop are implemented in Java, so Java is a natural choice for UDFs as well. But not being forced into Java would be nice. For simple UDFs of only a few lines, the cycle of write, compile, package into a JAR, and deploy is an especially heavyweight process. To allow users to write UDFs in scripting languages, we added support for UDFs in Python to Pig 0.8. We did it in such a way that supporting any scripting language that compiles down to the JVM requires only a few hundred lines of code. We hope to keep expanding the supported languages in the future.
Python UDFs consist of a single function that is
used in place of the exec
method of a Java function. They can
be annotated to indicate their schema. The more advanced features of
evaluation functions—such as overloading, constructor arguments, and
algebraic and accumulator interfaces—are not available yet.
Python UDFs are executed using the Jython framework. The benefit is that Python UDFs can be compiled to Java bytecode and run with relatively little performance penalty. The downside is that Jython is compatible with version 2.5 of Python, so Python 3 features are not available to UDF writers.
To register and define your Python UDFs in Pig
Latin, see Registering Python UDFs. In this section we
will focus on writing the UDFs themselves. Let’s take a look at the
production
UDF we used in that earlier section:
# production.py
@outputSchema("production:float")
def production(slugging_pct, onbase_pct):
    return slugging_pct + onbase_pct
The code is self-explanatory. The annotation of
@outputSchema
tells Pig that this UDF will return a float and
that the name of the field is “production”. The output schema
annotation can specify any Pig type. The syntax for tuples and bags
matches the syntax for declaring a field to be a tuple or a bag in
load
; see Schemas for details.
Sometimes schemas are variable and not statically expressible. For these cases you can provide a schema function that will define your schema. Let’s write a Python UDF that squares a number, always returning a number of the same type:
# square.py
@outputSchemaFunction("schema")
def square(num):
    return num * num

@schemaFunction("schema")
def schema(input):
    # Return whatever type we were handed
    return input
The input to the schema function is in the same
format as the one specified in @outputSchema
:
colname:type
. Its output is expected to be in
the same format.
If neither @outputSchema
nor
@outputSchemaFunction
is provided for a Python function, it
will be assumed to return a single bytearray value. Because there will be
no load function for the value, Pig will not be able to cast it to any
other type, so it will be worthless for anything but store
or
dump
.
In order to pass data between Java and Python, Pig must define a mapping of types. Table 10-1 describes the mapping between Pig and Python types.
Any value that is null in Pig will be translated
to the None
object in Python. Similarly, any time
the None
object is returned by Python, Pig will map
it to a null of the expected type.
One issue that Pig does not handle for your Python UDFs is bringing along dependent modules. If your Python file imports other modules, you will need to wrap those in a JAR and register that file as part of your Pig script.[27]
One last issue to consider is performance. What is the cost of using Python instead of Java? Of course it depends on your script, the computation you are doing, and your data. And because Python UDFs do not yet support advanced features such as algebraic mode, it can be harder to optimize them. Given all those caveats, tests have shown that Jython functions have a higher instantiation overhead. Once that is paid, they take about 1.2 times the amount of time as the equivalent Java functions. Due to the instantiation overhead, tests with few input lines (10,000 or so) took twice as long as their Java equivalents. These tests were run on simple functions that did almost no processing, so it is not a measure of Jython versus Java, but rather of Pig’s overhead in working with Jython.
Filter functions are evaluation functions that return a
Boolean value. Pig does not support Boolean as a full-fledged type, so
filter functions cannot appear in statements such as foreach
where the results are output to another operator. However, filter
functions can be used in filter
statements. Consider a
“nearness” function that, given two zip codes, returns true
or false depending on whether those two zip codes are within a certain
distance of each other:
/**
 * A filter UDF that determines whether two zip codes are within a given distance.
 */
public class CloseEnough extends FilterFunc {

    int distance;
    Random r = new Random();

    /*
     * @param miles - Distance in miles that two zip codes can be apart and
     * still be considered close enough.
     */
    public CloseEnough(String miles) {
        // UDFs can only take strings; convert to int here.
        distance = Integer.valueOf(miles);
    }

    public Boolean exec(Tuple input) throws IOException {
        // expect two strings
        String zip1 = (String)input.get(0);
        String zip2 = (String)input.get(1);
        // do some lookup on zip code tables
        return r.nextBoolean();
    }
}
[24] In 0.9, eval funcs can also be written in JavaScript, though this is experimental and has not yet been fully tested.
[25] Assuming there is one instance of your UDF in the script. Each reference to a UDF in a script becomes a separate instance on the backend, even if they are placed in the same map or reduce task.
[26] Recall that in Pig null means that the value is unknown, not that it is 0 or unset.