Our team has been discussing how we want to handle programming in this environment. While we all have programming backgrounds, none of us has ever worked with Java, so to get this project up and running as quickly as possible we decided to pursue the severely under-documented Python streaming method.
So to get started I reviewed a great blog post by Michael Noll that covers the basics: http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/
Now in our environment, Hadoop will be consuming vast amounts of serialized or delimited data, so we're fortunate enough to have some "structure" on the input. This also means we can easily deserialize during the map phase. For my first test I wanted to take a simple pipe (|) delimited file and get a count of the values in one of the columns, in this case the 6th column.
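To make that concrete (with a completely made-up row, not our actual data): an input line like a|b|c|d|e|foo|g would cause the mapper below to emit the 6th-column value foo and a count of 1, separated by a tab, and the reducer would then sum those counts for each distinct value.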
So I first constructed my super simple mapper.py and reducer.py files:
Mapper.py
#!/usr/bin/env python

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into columns on the pipe delimiter
    words = line.split('|')
    # we only care about the 6th column
    word = words[5]
    # emit the value with a count of 1; the reducer will sum these
    print '%s\t%s' % (word, 1)
Reducer.py
#!/usr/bin/env python

import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print '%s\t%s' % (current_word, current_count)
So I loaded my test data up to HDFS (~50 gigs worth oughta do it). I then copied my Python scripts to my local directory and ran a quick test against a small file (same layout, obviously, but far fewer records) to make sure the scripts worked.
cat test.txt | ./mapper.py | sort | ./reducer.py
This returned some data, so I was ready to go. (The sort in the middle mimics the sort Hadoop performs on the map output before it reaches the reducer, which the reducer's logic depends on.)
So I ran the following command to kick off a streaming MapReduce job:
/usr/lib/hadoop/bin/hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.0.1.jar \
    -file /home/bgriffith/character/mapper.py -mapper mapper.py \
    -file /home/bgriffith/character/reducer.py -reducer reducer.py \
    -input /tmp/test/character/input/* \
    -output /tmp/test/character/output/
And..... FAILURE. sigh
Yup... the error output was not very helpful. However, after some digging around, I discovered what the issue was.
dos2unix
/facepalm
Gotta love mixed environments! The scripts had apparently been saved with Windows (CRLF) line endings, which on the Linux nodes makes the shebang line effectively point at "python\r" and kills the streaming tasks. Anyway, after running a simple "dos2unix mapper.py" and "dos2unix reducer.py", I reran the job and:
Success! While the performance wasn't incredibly awesome (~18 minutes to process 458,848,279 rows, compared to 5 minutes in SQL Server), it's a start. Going to start messing around with more advanced Python methods and see what we can do.
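One idea I want to try first (just a sketch, not something we've run against the full dataset yet): aggregating counts inside the mapper with a dict, so each map task emits one line per distinct value instead of one line per row, and skipping malformed rows while we're at it. That should shrink the data that has to be shuffled to the reducers, and the reducer above works unchanged since it just sums whatever counts it receives for each key.

#!/usr/bin/env python

import sys

counts = {}

for line in sys.stdin:
    words = line.strip().split('|')
    # skip rows that don't have at least 6 columns
    if len(words) < 6:
        continue
    word = words[5]
    counts[word] = counts.get(word, 0) + 1

# emit one (value, count) pair per distinct value seen by this map task
for word, count in counts.items():
    print '%s\t%s' % (word, count)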