It is time to dig into Pig Latin. This chapter provides you with the basics of Pig Latin, enough to write your first useful scripts. More advanced features of Pig Latin are covered in Chapter 6.
Pig Latin is a dataflow language. Each processing step results in a new data set, or relation. In input = load 'data', input is the name of the relation that results from loading the data set data. A relation name is referred to as an alias. Relation names look like variables, but they are not. Once made, an assignment is permanent. It is possible to reuse relation names; for example, this is legitimate:
A = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
A = filter A by dividends > 0;
A = foreach A generate UPPER(symbol);
However, it is not recommended. It looks here as if you are reassigning A, but really you are creating new relations called A and losing track of the old relations called A. Pig is smart enough to keep up, but it is still not a good practice. It leads to confusion when trying to read your programs (which A am I referring to?) and when reading error messages.
In addition to relation names, Pig Latin also has field names, which name a field (or column) in a relation. In the previous snippet of Pig Latin, dividends and symbol are examples of field names. These are somewhat like variables in that they will contain a different value for each record as it passes through the pipeline, but you cannot assign values to them.
Both relation and field names must start with an alphabetic character, and then they can have zero or more alphabetic, numeric, or _ (underscore) characters. All characters in the name must be ASCII. For example, daily_2011 is a legal name, while 2011_daily (starts with a digit) and daily-2011 (contains a hyphen) are not.
Unfortunately, Pig Latin cannot decide whether it is case-sensitive. Keywords in Pig Latin are not case-sensitive; for example, LOAD is equivalent to load. But relation and field names are, so A = load 'foo'; is not equivalent to a = load 'foo';. UDF names are also case-sensitive; thus, COUNT is not the same UDF as count.
Before you can do anything of interest, you need to be able to add inputs and outputs to your data flows.
The first step in any data flow is to specify your input. In Pig Latin this is done with the load statement. By default, load looks for your data on HDFS in a tab-delimited file using the default load function, PigStorage. The statement divs = load '/data/examples/NYSE_dividends'; will look for a file called NYSE_dividends in the directory /data/examples. You can also specify relative pathnames. By default, your Pig jobs will run in your home directory on HDFS, /user/yourlogin. Unless you change directories, all relative paths will be evaluated from there. You can also specify a full URL for the path, for example, hdfs://nn.acme.com/data/examples/NYSE_dividends to read the file from the HDFS instance that has nn.acme.com as a NameNode.
In practice, most of your data will not be in tab-separated text files. You also might be loading data from storage systems other than HDFS. Pig allows you to specify the function for loading your data with the using clause. For example, if you wanted to load your data from HBase, you would use the loader for HBase:
divs = load 'NYSE_dividends' using HBaseStorage();
If you do not specify a load function, the built-in function PigStorage will be used. You can also pass arguments to your load function via the using clause. For example, if you are reading comma-separated text data, PigStorage takes an argument to indicate which character to use as a separator:
divs = load 'NYSE_dividends' using PigStorage(',');
The load statement also can have an as clause, which allows you to specify the schema of the data you are loading. (The syntax and semantics of declaring schemas in Pig Latin are discussed in Schemas.)
divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
When specifying a “file” to read from HDFS, you can specify a directory. In this case, Pig will find all files under that directory and use them as input for the load statement. So, if you had a directory input with two datafiles, today and yesterday, under it, and you specified input as your file to load, Pig will read both today and yesterday as input. If the directory you specify contains other directories, files in those directories will be included as well.
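For example, a minimal sketch assuming the hypothetical layout just described, with datafiles input/today and input/yesterday:

files = load 'input'; -- reads both input/today and input/yesterday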
PigStorage and TextLoader, the two built-in Pig load functions that operate on HDFS files, support globs.[6] With globs, you can read multiple files that are not under the same directory, or read some but not all files under a directory. Table 5-1 describes the globs that are valid in Hadoop 0.20. Be aware that glob meaning is determined by HDFS underneath Pig, so the globs that will work for you depend on your version of HDFS. Also, if you are issuing Pig Latin commands from a Unix shell command line, you will need to escape many of the glob characters to prevent your shell from expanding them.
Table 5-1. Globs in Hadoop 0.20

Glob      Meaning
?         Matches any single character.
*         Matches zero or more characters.
[abc]     Matches a single character from the character set {a, b, c}.
[a-z]     Matches a single character from the character range a to z, inclusive. The first character must be lexicographically less than or equal to the second.
[^abc]    Matches a single character that is not in the character set {a, b, c}. The ^ must occur immediately to the right of the opening bracket.
[^a-z]    Matches a single character that is not from the character range a to z, inclusive. The ^ must occur immediately to the right of the opening bracket.
\c        Removes (escapes) any special meaning of the character c.
{ab,cd}   Matches a string from the string set {ab, cd}.
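Globs make it easy to pick up several related inputs in one load statement. A minimal sketch, assuming hypothetical files NYSE_dividends_2009 and NYSE_dividends_2010 under /data/examples:

divs = load '/data/examples/NYSE_dividends_{2009,2010}';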
After you have finished processing your data, you will want to write it out somewhere. Pig provides the store statement for this purpose. In many ways it is the mirror image of the load statement. By default, Pig stores your data on HDFS in a tab-delimited file using PigStorage:[7]
store processed into '/data/examples/processed';
Pig will write the results of your processing into a directory processed in the directory /data/examples. You can specify relative path names, as well as a full URL for the path, such as hdfs://nn.acme.com/data/examples/processed.
If you do not specify a store function, PigStorage will be used. You can specify a different store function with a using clause:
store processed into 'processed' using HBaseStorage();
You can also pass arguments to your store function. For example, if you want to store your data as comma-separated text data, PigStorage takes an argument to indicate which character to use as a separator:
store processed into 'processed' using PigStorage(',');
As noted in Running Pig, when writing to a filesystem, processed will be a directory with part files rather than a single file. But how many part files will be created? That depends on the parallelism of the last job before the store. If that job has a reduce phase, the number of part files will be determined by the parallel level set for the job. See Parallel for information on how this is determined. If it is a map-only job, the number of part files will be determined by the number of maps, which is controlled by Hadoop and not Pig.
In most cases you will want to store your data somewhere when you are done processing it. But occasionally you will want to see it on the screen. This is particularly useful during debugging and prototyping sessions. It can also be useful for quick ad hoc jobs. dump directs the output of your script to your screen:
dump processed;
Up through version 0.7, the output of dump matches the format of constants in Pig Latin: longs are followed by an L, floats by an F, and maps are surrounded by [] (brackets), tuples by () (parentheses), and bags by {} (braces). Starting with version 0.8, the L for longs and the F for floats have been dropped, though the markers for the complex types have been kept. Nulls are indicated by missing values, and fields are separated by commas. Because each record in the output is a tuple, it is surrounded by ().
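For example, in 0.8 and later, a hypothetical dividend record whose date field is null would be dumped like this, with the null showing up as an empty slot between commas:

(NYSE,CPO,,0.14)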
Relational operators are the main tools Pig Latin provides to operate on your data. They allow you to transform it by sorting, grouping, joining, projecting, and filtering. This section covers the basic relational operators. More advanced features of these operators, as well as advanced relational operators, are covered in Advanced Relational Operations. What is covered here will be enough to get you started programming in Pig Latin.
foreach takes a set of expressions and applies them to every record in the data pipeline, hence the name foreach. From these expressions it generates new records to send down the pipeline to the next operator. For those familiar with database terminology, it is Pig’s projection operator. For example, the following code loads an entire record, but then removes all but the user and id fields from each record:
A = load 'input' as (user:chararray, id:long, address:chararray, phone:chararray, preferences:map[]);
B = foreach A generate user, id;
foreach supports an array of expressions. The simplest are constants and field references. The syntax for constants has already been discussed in Types. Field references can be by name (as shown in the preceding example) or by position. Positional references are preceded by a $ (dollar sign) and start from 0:
prices = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close, volume, adj_close);
gain = foreach prices generate close - open;
gain2 = foreach prices generate $6 - $3;
Relations gain and gain2 will contain the same values. Positional-style references are useful in situations where the schema is unknown or undeclared.
In addition to using names and positions, you can refer to all fields using * (asterisk), which produces a tuple that contains all the fields. Beginning in version 0.9, you can also refer to ranges of fields using .. (two periods). This is particularly useful when you have many fields and do not want to repeat them all in your foreach command:
prices = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close, volume, adj_close);
beginning = foreach prices generate ..open; -- produces exchange, symbol, date, open
middle = foreach prices generate open..close; -- produces open, high, low, close
end = foreach prices generate volume..; -- produces volume, adj_close
Standard arithmetic operators for integers and floating-point numbers are supported: + for addition, - for subtraction, * for multiplication, and / for division. These operators return values of their own type, so 5/2 is 2, whereas 5.0/2.0 is 2.5. In addition, for integers the modulo operator % is supported. The unary negation operator (-) is also supported for both integers and floating-point numbers. Pig Latin obeys the standard mathematical precedence rules. For information on what happens when arithmetic operators are applied across different types (for example, 5/2.0), see Casts.
Null values are viral for all arithmetic operators; that is, x + null = null for all values of x.
Pig also provides a binary condition operator, often referred to as bincond. It begins with a Boolean test, followed by a ?, then the value to return if the test is true, then a :, and finally the value to return if the test is false. If the test returns null, bincond returns null. Both value arguments of the bincond must return the same type:
2 == 2 ? 1 : 4 -- returns 1
2 == 3 ? 1 : 4 -- returns 4
null == 2 ? 1 : 4 -- returns null
2 == 2 ? 1 : 'fred' -- type error; both values must be of the same type
To extract data from complex types, use the projection operators. For maps this is # (the pound or hash sign), followed by the name of the key as a string. Keep in mind that the value associated with a key may be of any type. If you reference a key that does not exist in the map, the result is a null:
bball = load 'baseball' as (name:chararray, team:chararray, position:bag{t:(p:chararray)}, bat:map[]);
avg = foreach bball generate bat#'batting_average';
Tuple projection is done with . (the dot operator). As with top-level records, the field can be referenced by name (if you have a schema for the tuple) or by position. Referencing a nonexistent positional field in the tuple will return null. Referencing a field name that does not exist in the tuple will produce an error:
A = load 'input' as (t:tuple(x:int, y:int));
B = foreach A generate t.x, t.$1;
Bag projection is not as straightforward as map and tuple projection. Bags do not guarantee that their tuples are stored in any order, so allowing a projection of the tuple inside the bag would not be meaningful. Instead, when you project fields in a bag, you are creating a new bag with only those fields:
A = load 'input' as (b:bag{t:(x:int, y:int)});
B = foreach A generate b.x;
This will produce a new bag whose tuples have only the field x in them. You can project multiple fields in a bag by surrounding the fields with parentheses and separating them by commas:
A = load 'input' as (b:bag{t:(x:int, y:int)});
B = foreach A generate b.(x, y);
This seemingly pedantic distinction that b.x is a bag and not a scalar value has consequences. Consider the following Pig Latin, which will not work:
A = load 'foo' as (x:chararray, y:int, z:int);
B = group A by x; -- produces bag A containing all the records for a given value of x
C = foreach B generate SUM(A.y + A.z);
It is clear what the programmer is trying to do here. But because A.y and A.z are bags and the addition operator is not defined on bags, this will produce an error.[8] The correct way to do this calculation in Pig Latin is:
A = load 'foo' as (x:chararray, y:int, z:int);
A1 = foreach A generate x, y + z as yz;
B = group A1 by x;
C = foreach B generate SUM(A1.yz);
User Defined Functions (UDFs) can be invoked in foreach. These are called evaluation functions, or eval funcs. Because they are part of a foreach statement, these UDFs take one record at a time and produce one output. Keep in mind that either the input or the output can be a bag, so this one record can contain a bag of records:
-- udf_in_foreach.pig
divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
-- make sure all strings are uppercase
upped = foreach divs generate UPPER(symbol) as symbol, dividends;
grpd = group upped by symbol; -- output a bag upped for each value of symbol
-- take a bag of dividends, produce one result for each group
sums = foreach grpd generate group, SUM(upped.dividends);
In addition, eval funcs can take * as an argument, which passes the entire record to the function. They can also be invoked with no arguments at all.
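A minimal sketch of both invocation styles, reusing divs from the previous example and assuming a hypothetical, already registered UDF com.acme.Score that accepts a whole record (RANDOM is a built-in that takes no arguments):

scored = foreach divs generate com.acme.Score(*), RANDOM();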
For a complete list of UDFs that are provided with Pig, see Appendix A. For a discussion of how to invoke UDFs not distributed as part of Pig, see User Defined Functions.
The result of each foreach statement is a new tuple, usually with a different schema than the tuple that was input to foreach. Pig can infer the data types of the fields in this schema from the foreach statement, but it cannot always infer the names of those fields. For fields that are simple projections with no other operators applied, Pig keeps the same name as before:
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
date:chararray, dividends:float);
sym = foreach divs generate symbol;
describe sym;
sym: {symbol: chararray}
Once any expression beyond simple projection is applied, Pig does not assign a name to the field. If you do not explicitly assign a name, the field will be nameless and will be addressable only via a positional parameter, for example, $0. You can assign a name with the as clause:
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
date:chararray, dividends:float);
in_cents = foreach divs generate dividends * 100.0 as dividend, dividends * 100.0;
describe in_cents;
in_cents: {dividend: double,double}
Notice that in foreach the as is attached to each expression. This is different from load, where it is attached to the entire statement. The reason for this will become clear when we discuss flatten in flatten.
The filter statement allows you to select which records will be retained in your data pipeline. A filter contains a predicate. If that predicate evaluates to true for a given record, that record will be passed down the pipeline. Otherwise, it will not.
Predicates can contain the comparison operators you expect: == to test equality, != to test inequality, and >, >=, <, and <=. These comparators can be used on any scalar data type. == and != can also be applied to maps and tuples. To use these with two tuples, both tuples must have either the same schema or no schema. None of the equality operators can be applied to bags.
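For example, a minimal sketch that keeps only records with a positive dividend, reusing the NYSE_dividends file from earlier examples:

divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends:float);
positive = filter divs by dividends > 0.0;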
Pig Latin follows the operator precedence that is standard in most programming languages, where arithmetic operators have precedence over equality operators. So, x + y == a + b is equivalent to (x + y) == (a + b).
For chararrays, users can test to see whether the chararray matches a regular expression:
-- filter_matches.pig
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);
startswithcm = filter divs by symbol matches 'CM.*';
Note
Pig uses Java’s regular expression format. This format requires the entire chararray to match, not just a portion as in Perl-style regular expressions. For example, if you are looking for all fields that contain the string “fred”, you must say '.*fred.*' and not 'fred'. The latter will match only the chararray fred.
You can find chararrays that do not match a regular expression by preceding the test with not:
-- filter_not_matches.pig
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);
notstartswithcm = filter divs by not symbol matches 'CM.*';
You can combine multiple predicates into one by using the Boolean operators and and or, and you can reverse the outcome of any predicate by using the Boolean not operator. As is standard, the precedence of the Boolean operators, from highest to lowest, is not, and, or. Thus a and b or not c is equivalent to (a and b) or (not c).
Pig will short-circuit Boolean operations when possible. If the first (left) predicate of an and is false, the second (right) will not be evaluated. So in 1 == 2 and udf(x), the UDF will never be invoked. Similarly, if the first predicate of an or is true, the second predicate will not be evaluated: 1 == 1 or udf(x) will never invoke the UDF.
For Boolean operators, nulls follow SQL’s ternary logic. Thus, x == null results in a value of null, not true (even when x is null also) or false. Filters pass through only those values that are true. So, for a field that had the three values 2, null, and 4, applying the filter x == 2 would pass through only the first record, where the value is 2. Likewise, x != 2 would return only the last record, where the value is 4. The way to look for null values is to use the is null operator, which returns true whenever the value is null. To find values that are not null, use is not null.
Likewise, null neither matches nor fails to match any regular expression value.
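Putting these together, a minimal sketch that screens out null dividends before any comparison is applied:

divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends:float);
nonnull = filter divs by dividends is not null;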
Just as there are UDFs to be used in evaluation expressions, there are UDFs specifically for filtering records, called filter funcs. These are eval funcs that return a Boolean value and can be invoked in the filter statement. Filter funcs cannot be used in foreach statements.
The group statement collects together records with the same key. It is the first operator we have looked at that shares its syntax with SQL, but it is important to understand that the grouping operator in Pig Latin is fundamentally different from the one in SQL. In SQL the group by clause creates a group that must feed directly into one or more aggregate functions. In Pig Latin there is no direct connection between group and aggregate functions. Instead, group does exactly what it says: it collects all records with the same value for the provided key together into a bag. You can then pass this to an aggregate function if you want, or do other things with it:
-- count.pig
daily = load 'NYSE_daily' as (exchange, stock);
grpd = group daily by stock;
cnt = foreach grpd generate group, COUNT(daily);
That example groups records by the key stock and then counts them. It is just as legitimate to group them and then store them for processing at a later time:
-- group.pig
daily = load 'NYSE_daily' as (exchange, stock);
grpd = group daily by stock;
store grpd into 'by_group';
The records coming out of the group by statement have two fields: the key and the bag of collected records. The key field is named group.[9] The bag is named for the alias that was grouped, so in the previous examples it will be named daily and have the same schema as the relation daily. If the relation daily has no schema, the bag daily will have no schema. For each record in the group, the entire record (including the key) is in the bag. Changing the last line of the previous script from store grpd... to describe grpd; will produce:
grpd: {group: bytearray,daily: {exchange: bytearray,stock: bytearray}}
You can also group on multiple keys, but the keys must be surrounded by parentheses. The resulting records still have two fields. In this case, the group field is a tuple with a field for each key:
--twokey.pig
daily = load 'NYSE_daily' as (exchange, stock, date, dividends);
grpd = group daily by (exchange, stock);
avg = foreach grpd generate group, AVG(daily.dividends);
describe grpd;

grpd: {group: (exchange: bytearray,stock: bytearray),daily: {exchange: bytearray, stock: bytearray,date: bytearray,dividends: bytearray}}
You can also use all to group together all of the records in your pipeline:
--countall.pig
daily = load 'NYSE_daily' as (exchange, stock);
grpd = group daily all;
cnt = foreach grpd generate COUNT(daily);
The record coming out of group all has the chararray literal all as a key. Usually this does not matter because you will pass the bag directly to an aggregate function such as COUNT. But if you plan to store the record or use it for another purpose, you might want to project out the artificial key first.
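A minimal sketch of projecting away that key before storing, reusing grpd from the preceding example:

justbag = foreach grpd generate daily;
store justbag into 'all_records';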
group is the first operator we have looked at that usually will force a reduce phase. Grouping means collecting all records where the key has the same value. If the pipeline is in a map phase, this will force it to shuffle and then reduce. If the pipeline is already in a reduce, this will force it to pass through map, shuffle, and reduce phases.
Because grouping collects together all records with the same value for the key, you often get skewed results. That is, just because you have specified that your job have 100 reducers, there is no reason to expect that the number of values per key will be distributed evenly. The values might have a Gaussian or power law distribution.[10] For example, suppose you have an index of web pages and you group by the base URL. Certain values, such as yahoo.com, are going to have far more entries than most, which means that some reducers will get far more data than others. Because your MapReduce job is not finished (and any subsequent ones cannot start) until all your reducers have finished, this skew will significantly slow your processing. In some cases it will also be impossible for one reducer to manage that much data.
Pig has a number of ways that it tries to manage this skew to balance out the load across your reducers. The one that applies to grouping is Hadoop’s combiner. For details of how Hadoop’s combiner works, see Combiner Phase. This does not remove all skew, but it places a bound on it. And because for most jobs the number of mappers will be at most in the tens of thousands, even if the reducers get a skewed number of records, the absolute number of records per reducer will be small enough that the reducers can handle them quickly.
Unfortunately, not all calculations can be done using the combiner. Calculations that can be decomposed into any number of steps, such as sum, are called distributive. These fit nicely into the combiner. Calculations that can be decomposed into an initial step, any number of intermediate steps, and a final step are referred to as algebraic. Count is an example of such a function, where the initial step is a count and the intermediate and final steps are sums. Distributive is a special case of algebraic, where the initial, intermediate, and final steps are all the same. Session analysis, where you want to track a user’s actions on a website, is an example of a calculation that is not algebraic. You must have all the records sorted by timestamp before you can start analyzing their interaction with the site.
Pig’s operators and built-in UDFs use the combiner whenever possible, because of its skew-reducing features and because early aggregation greatly reduces the amount of data shipped over the network and written to disk, thus speeding performance significantly. UDFs can indicate when they can work with the combiner by implementing the Algebraic interface. For information on how to make your UDFs use the combiner, see Algebraic Interface.
For information on how to determine the level of parallelism when executing your group operation, see Parallel. Also, keep in mind that when using group all, you are necessarily serializing your pipeline. That is, this step and any steps after it, until you split out the single bag now containing all of your records, will not be done in parallel.
Finally, group handles nulls in the same way that SQL handles them: by collecting all records with a null key into the same group. Note that this is in direct contradiction to the way expressions handle nulls (remember that neither null == null nor null != null is true) and to the way join (see Join) handles nulls.
The order statement sorts your data for you, producing a total order of your output data. Total order means that not only is the data sorted in each partition of your data, it is also guaranteed that all records in partition n are less than all records in partition n + 1 for all n. When your data is stored on HDFS, where each partition is a part file, this means that cat will output your data in order.
The syntax of order is similar to that of group. You indicate a key or set of keys by which you wish to order your data. One glaring difference is that there are no parentheses around the keys when multiple keys are indicated in order:
--order.pig
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray, date:chararray, open:float, high:float, low:float, close:float, volume:int, adj_close:float);
bydate = order daily by date;

--order2key.pig
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray, date:chararray, open:float, high:float, low:float, close:float, volume:int, adj_close:float);
bydatensymbol = order daily by date, symbol;
It is also possible to reverse the order of the sort by appending desc to a key in the sort. In order statements with multiple keys, desc applies only to the key it immediately follows. Other keys will still be sorted in ascending order:
--orderdesc.pig
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray, date:chararray, open:float, high:float, low:float, close:float, volume:int, adj_close:float);
byclose = order daily by close desc, open;
dump byclose; -- open still sorted in ascending order
Data is sorted based on the types of the indicated fields: numeric values are sorted numerically, chararray fields are sorted lexically, and bytearray fields are sorted lexically using byte values rather than character values. Sorting by maps, tuples, or bags produces errors. For all data types, nulls are taken to be smaller than all possible values for that type, and thus will always appear first (or last when desc is used).
As discussed earlier in Group, skew of the values in data is very common. This affects order just as it does group, causing some reducers to take significantly longer than others. To address this, Pig balances the output across reducers. It does this by first sampling the input of the order statement to get an estimate of the key distribution. Based on this sample, it then builds a partitioner that produces a balanced total order (for details on what a partitioner is, see Shuffle Phase). For example, suppose you are ordering on a chararray field with the values a, b, e, e, e, e, e, e, m, q, r, z, and you have three reducers. The partitioner in this case would decide to partition your data such that values a-e go to reducer 1, e goes to reducer 2, and m-z go to reducer 3. Notice that the value e can be sent to either reducer 1 or 2; some records with key e will be sent to reducer 1 and some to 2. This allows the partitioner to distribute the data evenly. In practice, we rarely see variance in reducer time exceed 10% when using this algorithm.
An important side effect of the way Pig distributes records to minimize skew is that it breaks the MapReduce convention that all instances of a given key are sent to the same partition. If you have other processing that depends on this convention, do not use Pig’s order statement to sort data for it.
order always causes your data pipeline to go through a reduce phase. This is necessary to collect all equal records together. Also, Pig adds an additional MapReduce job to your pipeline to do the sampling. Because this sampling is very lightweight (it reads only the first record of every block), it generally takes less than 5% of the total job time.
The distinct statement is very simple: it removes duplicate records. It works only on entire records, not on individual fields:
--distinct.pig
-- find a distinct list of ticker symbols for each exchange
-- This load will truncate the records, picking up just the first two fields.
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray);
uniq = distinct daily;
Because it needs to collect like records together in order to determine whether they are duplicates, distinct forces a reduce phase. It does make use of the combiner to eliminate whatever duplicate records it can in the map phase.
The use of distinct shown here is equivalent to select distinct x in SQL. To learn how to do the equivalent of select count(distinct x), see Nested foreach.
join is one of the workhorses of data processing, and it is likely to appear in many of your Pig Latin scripts. join selects records from one input to put together with records from another input. This is done by indicating keys for each input. When those keys are equal,[11] the two rows are joined. Records for which no match is found are dropped:
--join.pig
daily = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close, volume, adj_close);
divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
jnd = join daily by symbol, divs by symbol;
You can also join on multiple keys. In all cases you must have the same number of keys, and they must be of the same or compatible types (where compatible means that an implicit cast can be inserted; see Casts):
-- join2key.pig
daily = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close, volume, adj_close);
divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
jnd = join daily by (symbol, date), divs by (symbol, date);
Like foreach, join preserves the names of the fields of the inputs passed to it. It also prepends the name of the relation the field came from, followed by ::. Adding describe jnd; to the end of the previous example produces:
jnd: {daily::exchange: bytearray,daily::symbol: bytearray,daily::date: bytearray, daily::open: bytearray,daily::high: bytearray,daily::low: bytearray, daily::close: bytearray,daily::volume: bytearray,daily::adj_close: bytearray, divs::exchange: bytearray,divs::symbol: bytearray,divs::date: bytearray, divs::dividends: bytearray}
The daily:: prefix needs to be used only when the field name is no longer unique in the record. In this example, you will need to use daily::date or divs::date if you wish to refer to one of the date fields after the join, but fields such as open and dividends do not need a prefix because there is no ambiguity.
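For instance, a sketch of projecting one of the duplicated fields along with two unique ones from jnd above:

dates = foreach jnd generate daily::date, open, dividends;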
Pig also supports outer joins. In outer joins, records that do not have a match on the other side are included, with null values being filled in for the missing fields. Outer joins can be left, right, or full. A left outer join means records from the left side will be included even when they do not have a match on the right side. Likewise, a right outer join means records from the right side will be included even when they do not have a match on the left side. A full outer join means records from both sides are taken even when they do not have matches:
--leftjoin.pig
daily = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close, volume, adj_close);
divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
jnd = join daily by (symbol, date) left outer, divs by (symbol, date);
outer is a noise word and can be omitted. Unlike in some SQL implementations, full is not a noise word: C = join A by x outer, B by u; will generate a syntax error, not a full outer join.
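For comparison, a minimal sketch of a full outer join of the same two inputs (as the next paragraph explains, both sides must have declared schemas for this to work):

jnd = join daily by (symbol, date) full outer, divs by (symbol, date);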
Outer joins are supported only when Pig knows the schema of the data on the side(s) for which it might need to fill in nulls. Thus for left outer joins, it must know the schema of the right side; for right outer joins, it must know the schema of the left side; and for full outer joins, it must know both. This is because, without the schema, Pig will not know how many null values to fill in.[12]
As in SQL, null values for keys do not match anything, even null values from the other input. So, for inner joins, all records with null key values are dropped. For outer joins, they will be retained but will not match any records from the other input.
Pig can also do multiple joins in a single operation, as long as they are all being joined on the same key(s). This can be done only for inner joins:
A = load 'input1' as (x, y);
B = load 'input2' as (u, v);
C = load 'input3' as (e, f);
alpha = join A by x, B by u, C by e;
Self joins are supported, though the data must be loaded twice:
--selfjoin.pig
-- For each stock, find all dividends that increased between two dates
divs1 = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends);
divs2 = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends);
jnd = join divs1 by symbol, divs2 by symbol;
increased = filter jnd by divs1::date < divs2::date and divs1::dividends < divs2::dividends;
If the preceding code were changed to the following, it would fail:
--selfjoin.pig
-- For each stock, find all dividends that increased between two dates
divs1 = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends);
jnd = join divs1 by symbol, divs1 by symbol;
increased = filter jnd by divs1::date < divs2::date and divs1::dividends < divs2::dividends;
It seems like this ought to work, since Pig could split the divs1 data set and send it to the join twice. But the problem is that the field names would be ambiguous after the join, so the load statement must be written twice. The next best thing would be for Pig to figure out that these two load statements are loading the same input and then run the load only once, but it does not do that currently.
Pig does these joins in MapReduce by using the map phase to annotate each record with which input it came from. It then uses the join key as the shuffle key. Thus join forces a new reduce phase. Once all of the records with the same value for the key are collected together, Pig does a cross product between the records from both inputs. To minimize memory usage, it has MapReduce order the records coming into the reducer using the input annotation it added in the map phase. Thus all of the records for the left input arrive first, and Pig caches these in memory. All of the records for the right input arrive second. As each of these records arrives, it is crossed with each record from the left side to produce an output record. In a multiway join, the left n - 1 inputs are held in memory, and the nth is streamed through. It is important to keep this in mind when writing joins in your Pig queries if you know that one of your inputs has more records per value of the chosen key. Placing that input on the right side of your join will lower memory usage and possibly increase your script’s performance.
Sometimes you want to see only a limited number of results. limit allows you to do this:
--limit.pig
divs = load 'NYSE_dividends';
first10 = limit divs 10;
The example here will return at most 10 lines (if your input has fewer than 10 lines total, it will return them all). Note that for all operators except order, Pig does not guarantee the order in which records are produced. Thus, because NYSE_dividends has more than 10 records, the example script could return different results on every run. Putting an order immediately before the limit will guarantee that the same results are returned every time.
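A minimal sketch of such a deterministic top 10, sorting on dividends before limiting:

divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends:float);
sorted = order divs by dividends desc;
top10 = limit sorted 10;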
limit causes an additional reduce phase, since it needs to collect the records together to count how many it is returning. It does optimize this phase by limiting the output of each map and then applying the limit again in the reducer. In the case where limit is combined with order, the two are done together on the map and reduce sides. That is, on the map side, the records are sorted by MapReduce and the limit is applied in the combiner. They are sorted again by MapReduce as part of the shuffle, and Pig applies the limit again in the reducer.
One possible optimization that Pig does not do is terminate reading of the input early once it has reached the number of records specified by limit. So, in the example, if you hoped to use this to read just a tiny slice of your input, you will be disappointed: Pig will still read it all.
sample offers a simple way to get a sample of your data. It reads through all of your data but returns only a percentage of rows. What percentage it returns is expressed as a double value between 0 and 1. So, in the following example, 0.1 indicates 10%:
--sample.pig
divs = load 'NYSE_dividends';
some = sample divs 0.1;
Currently the sampling algorithm is very simple: sample A 0.1 is rewritten to filter A by random() <= 0.1. Obviously this is nondeterministic, so results of a script with sample will vary with every run. Also, the percentage will not be an exact match, but it will be close. There has been discussion about adding more sophisticated sampling techniques, but it has not been done yet.
One of Pig’s core claims is that it provides a language for parallel data processing. One of the tenets of Pig’s philosophy is that Pigs are domestic animals (see Pig Philosophy), so Pig prefers that you tell it how parallel to be. To do this, it provides the parallel clause.
The parallel clause can be attached to any relational operator in Pig Latin. However, it controls only reduce-side parallelism, so it makes sense only for operators that force a reduce phase. These are: group*, order, distinct, join*, limit, cogroup*, and cross. Operators marked with an asterisk have multiple implementations, some of which force a reduce and some of which do not. For details on this and on operators not covered in this chapter, see Chapter 6. parallel is ignored in local mode because all operations happen serially in local mode:
--parallel.pig
daily = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close, volume, adj_close);
bysymbl = group daily by symbol parallel 10;
In this example, parallel will cause the MapReduce job spawned by Pig to have 10 reducers. parallel clauses apply only to the statement to which they are attached; they do not carry through the script. So if this group were followed by an order, parallel would need to be set for that order separately. Most likely the group will reduce your data size significantly and you will want to change the parallelism:
--parallel.pig
daily = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close, volume, adj_close);
bysymbl = group daily by symbol parallel 10;
average = foreach bysymbl generate group, AVG(daily.close) as avg;
sorted = order average by avg desc parallel 2;
If, however, you do not want to set parallel separately for every reduce-invoking operator in your script, you can set a script-wide value using the set command:
--defaultparallel.pig
set default_parallel 10;
daily = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close, volume, adj_close);
bysymbl = group daily by symbol;
average = foreach bysymbl generate group, AVG(daily.close) as avg;
sorted = order average by avg desc;
In this script, all MapReduce jobs will be done with 10 reducers. When you set a default parallel level, you can still add a parallel clause to any statement to override the default value. Thus it can be helpful to set a default value as a base to use in most cases, and add a parallel clause only to those operators that need a different value.
All of this is rather static, however. What happens if you run the same script across different inputs that have different characteristics? Or what if your input data varies significantly sometimes? You do not want to have to edit your script each time. Using parameter substitution, you can write your parallel clauses with variables, providing values for those variables at runtime. See Parameter Substitution for details.
So far we have assumed that you know what your parallel value should be. See Select the Right Level of Parallelism for information on how to determine that.
Finally, what happens if you do not specify a parallel level? Before version 0.8, Pig let MapReduce set the parallelism in that case. The MapReduce default parallelism is controlled by your cluster configuration. The installation default value is one, and most people do not change that. This most likely means that you will be running with only one reducer. This is rarely what you want.
To avoid this situation, Pig added a heuristic in 0.8 to do a gross estimate of what the parallelism should be set to if it is not set. It looks at the initial input size, assumes there will be no data size changes, and then allocates a reducer for every 1G of data. It must be emphasized that this is not a good algorithm. It is provided only to prevent mistakes that result in scripts running very slowly, and, in some extreme cases, mistakes that cause MapReduce itself to have problems. This is a safety net, not an optimizer.
Much of the power of Pig lies in its ability to let users combine its operators with their own or others’ code via UDFs. Up through version 0.7, all UDFs had to be written in Java, and they are implemented as Java classes.[13] This makes it very easy to add new UDFs to Pig by writing a Java class and telling Pig about your JAR file.
As of version 0.8, UDFs can also be written in Python. Pig uses Jython to execute Python UDFs, so they must be compatible with Python 2.5 and cannot use Python 3 features.
Pig itself comes packaged with some UDFs. Prior to version 0.8, this was a very limited set, including only the standard SQL aggregate functions and a few others. In 0.8, a large number of standard string-processing, math, and complex-type UDFs were added. For a complete list and description of built-in UDFs, see Built-in UDFs.
Piggybank is a collection of user-contributed UDFs that is packaged and released along with Pig. Piggybank UDFs are not included in the Pig JAR, and thus you have to register them manually in your script. See Piggybank for more information.
Of course you can also write your own UDFs or use those written by other users. For details of how to write your own, see Chapter 10. Finally, you can use some static Java functions as UDFs as well.
When you use a UDF that is not already built into Pig, you have to tell Pig where to look for that UDF. This is done via the register command. For example, let’s say you want to use the Reverse UDF provided in Piggybank (for information on where to find the Piggybank JAR, see Piggybank):
--register.pig
register 'your_path_to_piggybank/piggybank.jar';
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);
backwards = foreach divs generate org.apache.pig.piggybank.evaluation.string.Reverse(symbol);
This example tells Pig that it needs to include code from your_path_to_piggybank/piggybank.jar when it produces a JAR to send to Hadoop. Pig opens all of the registered JARs, takes out the files, and places them in the JAR that it sends to Hadoop to run your jobs.
In this example, we have to give Pig the full package and class name of the UDF. This verbosity can be alleviated in two ways. The first option is to use the define command (see define and UDFs). The second option is to include a set of paths on the command line for Pig to search when looking for UDFs. So if, instead of invoking Pig as pig register.pig, we change our invocation to pig -Dudf.import.list=org.apache.pig.piggybank.evaluation.string register.pig, we can change our script to:
register 'your_path_to_piggybank/piggybank.jar';
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);
backwards = foreach divs generate Reverse(symbol);
Using yet another property, we can get rid of the register command as well. If we add -Dpig.additional.jars=/usr/local/pig/piggybank/piggybank.jar to our command line, the register command is no longer necessary.
In many cases it is better to deal with registration and definition issues explicitly in the script via the register and define commands than to use these properties. Otherwise, everyone who runs your script has to know how to configure the command line. However, in some situations your scripts will always use the same set of JARs and always look in the same places for them. For instance, you might have a set of JARs used by everyone in your company. In this case, placing these properties in a shared properties file and using that with your Pig scripts will make sharing those UDFs easier and ensure that everyone is using the correct versions of them.
In 0.8 and later versions, the register command can also take HDFS paths. If your JARs are stored in HDFS, you could then say register 'hdfs://user/jar/acme.jar';.
Starting in 0.9, register accepts globs, so if all of the JARs you need are stored in one directory, you could include them all with register '/usr/local/share/pig/udfs/*.jar';.
register is also used to locate resources for Python UDFs that you use in your Pig Latin scripts. In this case you do not register a JAR, but rather a Python script that contains your UDF. The Python script must be in your current directory. Using the examples provided in the example code, copying udfs/python/production.py to the data directory looks like this:
--batting_production.pig
register 'production.py' using jython as bballudfs;
players = load 'baseball' as (name:chararray, team:chararray, pos:bag{t:(p:chararray)}, bat:map[]);
nonnull = filter players by bat#'slugging_percentage' is not null and bat#'on_base_percentage' is not null;
calcprod = foreach nonnull generate name, bballudfs.production((float)bat#'slugging_percentage', (float)bat#'on_base_percentage');
The important differences here are the using jython and as bballudfs portions of the register statement. using jython tells Pig that this UDF is written in Python, not Java, and that it should use Jython to compile the UDF. Pig does not know where on your system the Jython interpreter is, so you must include jython.jar in your classpath when invoking Pig. This can be done by setting the PIG_CLASSPATH environment variable. as bballudfs defines a namespace that UDFs from this file are placed in. All UDFs from this file must now be invoked as bballudfs.udfname. Each Python file you load should be given a separate namespace. This avoids naming collisions when you register two Python scripts with duplicate function names.
One caveat: Pig does not trace dependencies inside your Python scripts and send the needed Python modules to your Hadoop cluster. You are required to make sure the modules you need reside on the task nodes in your cluster and that the PYTHONPATH environment variable is set on those nodes such that your UDFs will be able to find them for import. This issue has been fixed after 0.9, but as of this writing the fix is not yet released.
As was alluded to earlier, define can be used to provide an alias so that you do not have to use full package names for your Java UDFs. It can also be used to provide constructor arguments to your UDFs. define is also used in defining streaming commands, but this section covers only its UDF-related features. For information on using define with streaming, see stream. The following provides an example of using define to provide an alias for org.apache.pig.piggybank.evaluation.string.Reverse:
--define.pig
register 'your_path_to_piggybank/piggybank.jar';
define reverse org.apache.pig.piggybank.evaluation.string.Reverse();
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);
backwards = foreach divs generate reverse(symbol);
Eval and filter functions can also take one or more strings as constructor arguments. If you are using a UDF that takes constructor arguments, define is the place to provide those arguments. For example, consider a method CurrencyConverter that takes two constructor arguments, the first indicating which currency you are converting from and the second which currency you are converting to:
--define_constructor_args.pig
register 'acme.jar';
define convert com.acme.financial.CurrencyConverter('dollar', 'euro');
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);
converted = foreach divs generate convert(dividends);
Java has a rich collection of utilities and libraries. Because Pig is implemented in Java, some of these functions can be exposed to Pig users. Starting in version 0.8, Pig offers invoker methods that allow you to treat certain static Java functions as if they were Pig UDFs.
Any public static Java function that takes no arguments or some combination of int, long, float, double, String, or arrays thereof,[14] and returns int, long, float, double, or String can be invoked in this way.
Because Pig Latin does not support overloading on return types, there is an invoker for each return type: InvokeForInt, InvokeForLong, InvokeForFloat, InvokeForDouble, and InvokeForString. You must pick the appropriate invoker for the type you wish to return. An invoker takes two constructor arguments. The first is the full package, classname, and method name. The second is a space-separated list of the parameters the Java function expects; only the types of the parameters are given. If a parameter is an array, [] (square brackets) are appended to the type name. If the method takes no parameters, the second constructor argument is omitted.
For example, if you wanted to use Java’s Integer class to translate decimal values to hexadecimal values, you could do:
--invoker.pig
define hex InvokeForString('java.lang.Integer.toHexString', 'int');
divs = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close, volume, adj_close);
nonnull = filter divs by volume is not null;
inhex = foreach nonnull generate symbol, hex((int)volume);
If your method takes an array of types, Pig will expect to pass it a bag where each tuple has a single field of that type. So if you had a Java method com.acme.Stats.stdev that took an array of doubles, you could use it like this:
define stdev InvokeForDouble('com.acme.Stats.stdev', 'double[]');
A = load 'input' as (id: int, dp:double);
B = group A by id;
C = foreach B generate group, stdev(A.dp);
Warning
Invokers do not use the Accumulator or Algebraic interfaces, and are thus likely to be much slower and to use much more memory than UDFs written specifically for Pig. This means that before you pass an array argument to an invoked method, you should think carefully about whether those inefficiencies are acceptable. For more information on these interfaces, see Accumulator Interface and Algebraic Interface.
Invoking Java functions in this way does have a small cost because reflection is used to find and invoke the methods.
Invoker functions throw a Java IllegalArgumentException when they are passed null input. You should place a filter before the invocation to prevent this, as the filter on volume is not null in the preceding example does.
[6] Any loader that uses FileInputFormat as its InputFormat will support globs. Most loaders that load data from HDFS use this InputFormat.
[7] A single function can be both a load and store function, as PigStorage is.
[8] You might object and say that Pig could figure out what is intended here and do it, since SUM(A.y + A.z) could be decomposed to “foreach record in A, add y and z and then take the sum.” This is true. But when we change the group to a cogroup so that there are two bags A and B involved (see cogroup) and change the sum to SUM(A.y + B.z), then, because neither A nor B guarantees any ordering, this is not a well-defined operation. In designing the language, we thought it better to be consistent and always say that bags could not be added rather than allow it in some instances and not others.
[9] Thus the keyword group is overloaded in Pig Latin. This is unfortunate and confusing, but also hard to change now.
[10] In my experience, the vast majority of data tracking human activity follows a power law distribution.
[11] Actually, joins can be on any condition, not just equality, but Pig only supports joins on equality (called equi-joins). See cross for information on how to do non-equi-joins in Pig.
[12] You may object that Pig could determine this by looking at other records in the join and inferring the correct number of fields. However, this does not work for two reasons. First, when no schema is present, Pig does not enforce a semantic that every record has the same schema. So, assuming Pig can infer one record from another is not valid. Second, there might be no records in the join that match, and thus Pig might have no record to infer from.
[13] This is why UDF names are case-sensitive in Pig.
[14] For int, long, float, and double, invoker methods can call Java functions that take the scalar types but not the associated Java classes (so int but not Integer, etc.).