Tuesday, July 31, 2012

Simple mapreduce with python (curse you dos2unix!!)

So after several days of messing around with Cloudera's training VM, I finally got my hands on a real six-node cluster.

Our team has been discussing how we want to handle programming in this environment.  While we all have programming backgrounds, none of us has ever worked with Java, so to get this project up and running as quickly as possible, we decided to pursue the severely under-documented Python streaming method.

So to get started I reviewed a great blog post by Michael Noll, who goes over the basics:  http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/

Now in our environment, Hadoop will be consuming vast amounts of serialized or delimited data, so we're fortunate enough to have some "structure" on the input.  This also means we can easily deserialize in the map phase.  So for my first test I wanted to take a simple pipe (|) delimited file and get a count of the values in one of the columns, in this case the 6th.
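To make that concrete, here's a quick sketch of what the map step does to a single row (the row and its field values are invented purely for illustration):

# A hypothetical pipe-delimited input row
row = "1001|2012-07-31|US|web|checkout|jsmith"

# Split on '|' and take the 6th column (index 5) as the key
key = row.split('|')[5]

# Prints "jsmith" and 1 separated by a tab
print('%s\t%s' % (key, 1))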

So I first constructed my super simple mapper.py and reducer.py files:

Mapper.py

#!/usr/bin/env python
import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into fields
    words = line.split('|')
    # we only care about the 6th column
    word = words[5]
    # emit the column value with a count of 1
    print '%s\t%s' % (word, 1)
Reducer.py

#!/usr/bin/env python
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print '%s\t%s' % (current_word, current_count)

So I loaded my test data up to HDFS (~50 gigs worth oughta do it).  I then put my Python scripts in a local directory, made them executable (chmod +x), and ran a quick test against a small file (same layout, obviously, but far fewer records) to make sure they worked:
cat test.txt | ./mapper.py | sort | ./reducer.py

(Note the sort in the middle: the reducer expects its input grouped by key, which is something Hadoop does for you between the map and reduce phases.)

This returned some data, so I was ready to go.

So I ran the following command to kick off a streaming MapReduce job (the -file flags ship the local scripts out to the cluster, while -mapper and -reducer tell streaming how to invoke them):
/usr/lib/hadoop/bin/hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.0.1.jar \
    -file /home/bgriffith/character/mapper.py -mapper mapper.py \
    -file /home/bgriffith/character/reducer.py -reducer reducer.py \
    -input /tmp/test/character/input/* \
    -output /tmp/test/character/output/

And..... FAILURE.  sigh

Yup... the error output was not very helpful.  However, after some digging around, I discovered what the issue was.

dos2unix

/facepalm

Gotta love mixed environments!  The scripts had been edited on Windows, so every line ended in CRLF; the stray carriage return tacked onto the shebang line meant the cluster nodes were trying to launch an interpreter literally named "python\r", which of course doesn't exist.
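If dos2unix isn't handy, a few lines of Python will do the same scrub.  This is just a sketch, using the filenames from this post:

# Sketch: detect and strip DOS-style CRLF line endings in place
for name in ('mapper.py', 'reducer.py'):
    with open(name, 'rb') as f:
        data = f.read()
    if b'\r' in data:
        with open(name, 'wb') as f:
            f.write(data.replace(b'\r\n', b'\n'))
        print('fixed DOS line endings in %s' % name)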
Anyway, after running a simple "dos2unix mapper.py" and "dos2unix reducer.py", I reran the execution code and:

Success!  While the performance wasn't incredibly awesome (~18 minutes to process 458,848,279 rows, compared to 5 minutes in SQL Server), it's a start.  Going to start messing around with more advanced Python methods and see what we can do.
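One likely first experiment, borrowed from the improved reducer in the same Michael Noll tutorial linked above, is rewriting the reducer around itertools.groupby.  This is just an untested sketch adapted to the pipe-delimited counting job in this post:

#!/usr/bin/env python
# Sketch of a groupby-based reducer (untested against our data),
# adapted from the "improved" reducer in Michael Noll's tutorial
from itertools import groupby
from operator import itemgetter
import sys

def read_input(stream):
    for line in stream:
        # rstrip also guards against stray carriage returns
        yield line.rstrip('\r\n').split('\t', 1)

data = read_input(sys.stdin)
# groupby works for the same reason the IF-switch does: Hadoop
# hands the reducer its input already sorted by key
for word, group in groupby(data, itemgetter(0)):
    try:
        total = sum(int(count) for _, count in group)
        print('%s\t%s' % (word, total))
    except ValueError:
        # count was not a number -- skip this key
        pass

Same contract on STDIN/STDOUT as the original reducer, so it should drop straight into the streaming command above.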
