Extending Apache Pig with Python UDFs
Introduction
Apache Pig is a popular system for executing complex Hadoop map-reduce based data-flows. It adds a layer of abstraction on top of Hadoop's map-reduce mechanisms in order to allow developers to take a high-level view of the data and operations on that data. Pig lets you say what you want to do with your data instead of spelling out the map-reduce plumbing yourself. For example, you can join two or more data sources (much like an SQL join) in a single statement; writing the same join as map and reduce functions is a bit of a drag and it's usually worth avoiding. So Pig is great because it simplifies complex tasks - it provides a high-level scripting language that allows users to take more of a big-picture view of their data flow.
Pig is especially great because it is extensible. This tutorial will focus on its extensibility. By the end of this tutorial, you will be able to write PigLatin scripts that execute Python code as a part of a larger map-reduce workflow. Pig can be extended with other languages too, but for now we'll stick to Python.
Before we continue
This tutorial relies on a bunch of knowledge. It'll be very useful if you know a little Python and PigLatin. It'll also be useful to know a bit about how map-reduce works in the context of Hadoop.
User Defined Functions (UDFs)
A Pig UDF is a function that is accessible to Pig, but written in a language that isn't PigLatin. Pig allows you to register UDFs for use within a PigLatin script. A UDF needs to fit a specific prototype - you can't just write your function however you want because then Pig won't know how to call your function, it won't know what kinds of arguments it needs, and it won't know what kind of return value to expect. There are a couple of basic UDF types:
Eval UDFs
This is the most common type of UDF. It's used in FOREACH type statements. Here's an example of an eval function in action:
users = LOAD 'user_data' AS (name: chararray);
upper_users = FOREACH users GENERATE my_udfs.to_upper_case(name);
This code is fairly simple - Pig doesn't really do string processing so we introduce a UDF that does. There are some missing pieces that I'll get to later, specifically how Pig knows what my_udfs means and suchlike.
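To make this concrete, here is one way the Python side of to_upper_case might look. This is just a sketch - the function name comes from the example above, but the body and schema are my own assumptions (registering the file so Pig can find it is covered below):

from pig_util import outputSchema

@outputSchema('upper_name:chararray')
def to_upper_case(name):
    # Pig hands a chararray to the UDF as a Python string (or None for a null field)
    if name is None:
        return None
    return name.upper()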
Aggregation UDFs
These are just a special case of an eval UDF. An Aggregate function is usually applied to grouped data. For example:
user_sales = LOAD 'user_sales' AS (name: chararray, price: float);
grouped_sales = GROUP user_sales BY name;
number_of_sales = FOREACH grouped_sales GENERATE group, COUNT(user_sales);
In other words, an aggregate UDF is a UDF that is used to combine multiple pieces of information. Here we are aggregating sales data to show how many purchases were made by each user.
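COUNT is one of Pig's built-in functions, but an aggregate UDF can just as well be written in Python: the grouped data simply arrives at the function as a bag (an iterable of tuples). Here is a sketch of a hypothetical total_sales UDF for the user_sales data above; it isn't part of the original example, and you would call it in the FOREACH as something like my_udfs.total_sales(user_sales):

from pig_util import outputSchema

@outputSchema('total:double')
def total_sales(bag):
    # the bag holds one (name, price) tuple per sale in the group
    return sum(price for _, price in bag)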
Filter UDFs
A filter UDF returns a boolean value. If you have a data source that has a bunch of rows and only a portion of those rows are useful for the current analysis then a filter function of some kind would be useful. An example of a filter function in action follows:
user_messages = LOAD 'user_twits' AS (name:chararray, message:chararray);
rude_messages = FILTER user_messages by my_udfs.contains_naughty_words(message);
Enough talk, let's code
In this section we'll be writing a couple of Python UDFs and making them accessible within PigLatin scripts.
Here's about the simplest Python UDF you can write:
from pig_util import outputSchema
@outputSchema('word:chararray')
def hi_world():
return "hello world"
The data output from a function has a specific form. Pig likes it if you specify the schema of the data because then it knows what it can do with that data. That's what the outputSchema decorator is for. There are a bunch of different ways to specify a schema; we'll get to that in a little bit.
Now if that were saved in a file called "my_udfs.py" you would be able to make use of it in a PigLatin script like so:
-- first register it to make it available
REGISTER 'my_udfs.py' using jython as my_special_udfs;
users = LOAD 'user_data' AS (name: chararray);
hello_users = FOREACH users GENERATE name, my_special_udfs.hi_world();
Specifying the UDF output schema
Now a UDF has input and output. This little section is all about the outputs. Here we'll go over the different ways you can specify the output format of a Python UDF through use of the outputSchema decorator. We have a few options, here they are:
# our original udf
# it returns a single chararray (that's PigLatin for String)
@outputSchema('word:chararray')
def hi_world():
return "hello world"
# this one returns a Python tuple. Pig recognises the first element
# of the tuple as a chararray like before, and the next one as a
# long (a kind of integer)
@outputSchema("word:chararray,number:long")
def hi_everyone():
return "hi there", 15
#we can use outputSchema to define nested schemas too, here is a bag of tuples
@outputSchema('some_bag:bag{t:(field_1:chararray, field_2:int)}')
def bag_udf():
    return [
        ('hi', 1000),
        ('there', 2000),
        ('bill', 0)
    ]
#and here is a map
@outputSchema('something_nice:map[]')
def my_map_maker():
return {"a":"b", "c":"d", "e","f"}
So outputSchema can be used to indicate that a function outputs one or a combination of basic types. Those types are:
- chararray: like a string
- bytearray: a bunch of bytes in a row. Like a string but not as human friendly
- long: long integer
- int: normal integer
- double: floating point number
- datetime
- boolean
If no schema is specified then Pig assumes that the UDF outputs a bytearray.
UDF arguments
Not only does a UDF have outputs but inputs as well! This sentence should be filed under 'duh'. I reserved it for a separate section so as not to clutter the discussion on output schemas. This part is fairly straightforward so I'm just going to breeze through it...
First some UDFs:
def deal_with_a_string(s1):
    return s1 + " for the win!"

def deal_with_two_strings(s1, s2):
    return s1 + " " + s2

def square_a_number(i):
    return i * i

def now_for_a_bag(lBag):
    # the bag arrives as an iterable of tuples; prepend each tuple's position
    lOut = []
    for i, l in enumerate(lBag):
        lNew = [i] + list(l)
        lOut.append(lNew)
    return lOut
And here we make use of those UDFs in a PigLatin script:
REGISTER 'my_udfs.py' using jython as myudfs;
users = LOAD 'user_data' AS (firstname: chararray, lastname:chararray,some_integer:int);
winning_users = FOREACH users GENERATE myudfs.deal_with_a_string(firstname);
full_names = FOREACH users GENERATE myudfs.deal_with_two_strings(firstname,lastname);
squared_integers = FOREACH users GENERATE myudfs.square_a_number(some_integer);
users_by_number = GROUP users by some_integer;
indexed_users_by_number = FOREACH users_by_number GENERATE group,myudfs.now_for_a_bag(users);
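Since none of these UDFs declare an outputSchema, Pig treats their results as bytearrays, as described in the previous section. The bag handling in now_for_a_bag is the least obvious bit, so it can help to sanity-check it with plain Python before running it through Pig. The following is a hypothetical local test (the sample rows are made up) that you could tack onto the bottom of the UDF file; it only runs when the file is executed directly, and it assumes the file contains just the undecorated functions above (pig_util is only available when Pig runs the script):

if __name__ == '__main__':
    # each tuple mimics a row of user_data: (firstname, lastname, some_integer)
    sample_bag = [('alice', 'anderson', 3), ('bob', 'burger', 3)]
    print(now_for_a_bag(sample_bag))
    # expected: [[0, 'alice', 'anderson', 3], [1, 'bob', 'burger', 3]]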
Beyond Standard Python UDFs
There are a couple of gotchas to using Python in the form of a UDF. Firstly, even though we are writing our UDFs in Python, Pig executes them in Jython. Jython is an implementation of Python that runs on the Java Virtual Machine (JVM). Most of the time this is not an issue as Jython strives to implement all of the same features as CPython, but there are some libraries that you can't use. For example, you can't use numpy from Jython because it relies on C extensions.
Besides that, Pig doesn't really allow for Python Filter UDFs. You can only do stuff like this:
user_messages = LOAD 'user_twits' AS (name:chararray, message:chararray);
--add a field that says whether it is naughty (1) or not (0)
messages_with_rudeness = FOREACH user_messages GENERATE name,message,my_udfs.contains_naughty_words(message) as naughty;
--then filter by the naughty field
filtered_messages = FILTER messages_with_rudeness by (naughty==1);
-- and finally strip away the naughty field
rude_messages = FOREACH filtered_messages GENERATE name,message;
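The Python side of contains_naughty_words is then just an ordinary eval UDF that returns an integer flag instead of a boolean. Here is a sketch of what it might look like - the word list and the exact matching logic are made-up placeholders:

from pig_util import outputSchema

NAUGHTY_WORDS = set(['heck', 'darn'])  # hypothetical word list

@outputSchema('naughty:int')
def contains_naughty_words(message):
    if message is None:
        return 0
    words = set(message.lower().split())
    # 1 if any naughty word appears in the message, 0 otherwise
    return 1 if words & NAUGHTY_WORDS else 0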
Python Streaming UDFs
Pig allows you to hook into the Hadoop Streaming API, which lets us get around the Jython issue when we need to. If you haven't heard of Hadoop Streaming before, here is the low-down: Hadoop allows you to write mappers and reducers in any language that gives you access to stdin and stdout. So that's pretty much any language you want. Like Python 3 or even Cow. Since this is a Python tutorial the examples that follow will all be in Python, but you can plug in whatever you want.
Here's a simple Python streaming script; let's call it simple_stream.py:
#! /usr/bin/env python
import sys

for line in sys.stdin:
    line = line.strip()
    if len(line) == 0:
        continue
    l = line.split()  # split the line by whitespace
    for i, s in enumerate(l):
        # emit a tab-separated key-value pair for each word in the line
        sys.stdout.write("{key}\t{value}\n".format(key=i, value=s))
The aim is to get Hadoop to run the script on each node. That means that the hash-bang line (#!) needs to be valid on every node, all the import statements must be valid on every node (any packages imported must be installed on each node), and any other system-level files or resources accessed within the Python script must be accessible in the same way on every node.
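A missing package on a single node can show up as a confusing partial failure, so one simple safeguard (just a suggestion, not something Pig requires) is to fail loudly at the top of the streaming script: wrap the import in a try/except and exit with a message on stderr, which ends up in the Hadoop task logs:

#! /usr/bin/env python
import sys

try:
    import some_package  # 'some_package' stands in for whatever third-party dependency you need
except ImportError:
    # a non-zero exit code makes the streaming task fail visibly instead of silently misbehaving
    sys.stderr.write("missing dependency 'some_package' on this node\n")
    sys.exit(1)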
Ok, onto the Pig stuff...
To make the streaming UDF accessible to Pig we make use of the DEFINE statement. You can read all about it in Pig's official documentation. Here is how we can use it with our simple_stream script:
DEFINE stream_alias `simple_stream.py` SHIP('simple_stream.py');
user_messages = LOAD 'user_twits' AS (name:chararray, message:chararray);
just_messages = FOREACH user_messages generate message;
streamed = STREAM just_messages THROUGH stream_alias;
DUMP streamed;
Let's look at that DEFINE statement a little closer. The general format we are using is:
DEFINE alias `command` SHIP('files');
The alias is the name we use to access our streaming function from within our PigLatin script. The command is the system command Pig will call when it needs to use our streaming function. And finally SHIP tells Pig which files and dependencies Pig needs to distribute to the Hadoop nodes for the command to be able to work.
Then once we have the resources we want to pass through our streaming function, we just use the STREAM command as above.
And that's it
Well, sort of. PigLatin is quite a big thing; this tutorial just barely scraped the surface of its capabilities. If all the LOADing and FOREACHing and suchlike didn't make sense to you then I would suggest checking out a more introductory PigLatin tutorial before coming back here. This tutorial should be enough to get you started in using Python from within Pig jobs.
Python is also quite a big thing. Understanding the Python import system is really worthwhile if you want to use Python on a Hadoop cluster. It's also worthwhile understanding some little details like how Python decorators work.
There are also some more technical ways of calling Python from Pig; this tutorial aimed to be an introduction to UDFs, not a definitive guide. For more examples and more in-depth discussions of the different decorators and suchlike that Pig makes available to Jython-based UDFs, I would suggest taking a look at Pig's official documentation.
Another topic only touched on briefly was Hadoop Streaming. It is a powerful technology in its own right and actually pretty easy to use once you get started - I've made use of the Streaming API many times without needing anything as complicated as PigLatin, so it's worthwhile being able to use that API as a standalone thing.