Lately I've been learning not only more python but also the MapReduce algorithm. Naturally I started with the many freely available web resources, from brief overviews to instructive video tutorials to detailed Google Research articles on MapReduce.
This is all well and good, but I wanted to know how to actually write MapReduce applications in python, and to get a better understanding of the magic behind the algorithm itself. I found a small python multi-server implementation of MapReduce named mincemeat, another called octopy, and interfaces to large non-python systems such as Hadoop. However, I was still unable to find a quick and dirty implementation of MapReduce that was high-level, concise, easy to run, easy to understand, and relevant. So I wrote one.
To be high-level, it should use python-native lists, tuples, queues, and threading. I didn't want to get stuck with low-level socket communication and endless writing and parsing of intermediate flat files.
To be concise, the whole MapReduce algorithm should fit in a single python class of 100 lines of code or fewer, and we're talking properly formatted python code, not a series of Perl one-liners.
To be easy to run, I should be able to put the whole thing in one python file and call it from the command line with a python interpreter. It should use only the python 2.6+ standard library, so there is nothing to install if you already have python. Nor should you need to set up multiple servers on a network: you should be able to run it on a single machine while still observing multiple simultaneous map and reduce work tasks.
To be easy to understand, it should accurately and clearly implement the major steps of MapReduce, with comments where necessary. One should be able to insert print statements at any point in the computation and see exactly what is going on in the intermediate steps.
To be relevant, and not just an academic exercise, it should come with actual python source code examples that run the MapReduce class on real data.
That being said, it is important to know what this implementation of MapReduce in python is not. It is not web-scale without significant changes; in 100 lines of code there is only so much one can do. It does not implement some significant reliability provisions Google uses, such as distributed multiple-copy data and monitored task restarts. It does not handle mapper-reducer pairing and partitioning for grouped calculations. It ignores the important principle of locality, which dispatches workers to the machines where the relevant data already resides. It skips combiners, a key efficiency step that speeds up the reduce tasks. It loads the data directly from the internet or local disk and stores intermediate results in memory instead of using a distributed filesystem like GFS or HDFS. It doesn't support differing numbers of parse, map, merge, sort, reduce, and output functions with varying resource allocations to each. And last but not least, it bases its multiple-worker implementation on python threading, with the Thread class and the synchronized Queue class coordinating the work. This means that while in theory nothing prevents an interface-identical implementation of these python standard classes that is distributed across thousands of machines, as written the code runs all the workers on a single machine.
But enough talk. It's time to show the goods. Let's see some code.
import threading
import Queue
import operator
import urllib
import re
class MapReduce:
    ''' MapReduce - to use, subclass by defining these functions,
        then set self.data and call self.map_reduce():
            parse_fn(self, data) => [(k, v), ...]
            map_fn(self, k, v) => [(k, v1), (k, v2), ...]
            reduce_fn(self, k, [v1, v2, ...]) => [(k, v)]
            output_fn(self, [(k, v), ...])
    '''
    def __init__(self):
        self.data = None
        self.num_worker_threads = 5

    class SynchronizedDict(): # we need this for merging
        def __init__(self):
            self.lock = threading.Lock()
            self.d = {}
        def isin(self, k):
            with self.lock:
                return k in self.d
        def get(self, k):
            with self.lock:
                return self.d[k]
        def set(self, k, v): # we don't need del
            with self.lock:
                self.d[k] = v
        def set_append(self, k, v): # thread-safe list append, creates the list if k is new
            with self.lock:
                self.d.setdefault(k, []).append(v)
        def items(self):
            with self.lock:
                return self.d.items()

    def create_queue(self, input_list): # helper fn for queues
        output_queue = Queue.Queue()
        for value in input_list:
            output_queue.put(value)
        return output_queue

    def create_list(self, input_queue): # helper fn for queues
        output_list = []
        while not input_queue.empty():
            item = input_queue.get()
            output_list.append(item)
            input_queue.task_done()
        return output_list

    def merge_fn(self, k, v, merge_dict): # helper fn for merge
        # one locked call: checking isin() and then calling set() separately
        # would let two threads both create the list for k and lose a value
        merge_dict.set_append(k, v)

    def process_queue(self, input_queue, fn_selector): # helper fn
        if fn_selector not in ('map', 'reduce', 'merge'): # fail before starting threads
            raise ValueError("Bad fn_selector=" + fn_selector)
        output_queue = Queue.Queue()
        if fn_selector == 'merge':
            merge_dict = self.SynchronizedDict()
        def worker():
            while True:
                try: # get_nowait: don't block if another worker took the last item
                    (k, v) = input_queue.get_nowait()
                except Queue.Empty:
                    return
                if fn_selector == 'map':
                    result_list = self.map_fn(k, v)
                elif fn_selector == 'reduce':
                    result_list = self.reduce_fn(k, v)
                else: # 'merge': merge v to same k, nothing to emit yet
                    self.merge_fn(k, v, merge_dict)
                    result_list = []
                for result_tuple in result_list: # flatten
                    output_queue.put(result_tuple)
                input_queue.task_done()
        for i in range(self.num_worker_threads): # start threads
            worker_thread = threading.Thread(target=worker)
            worker_thread.daemon = True
            worker_thread.start()
        input_queue.join() # wait for worker threads to finish
        if fn_selector == 'merge':
            output_list = sorted(merge_dict.items(), key=operator.itemgetter(0))
            output_queue = self.create_queue(output_list)
        return output_queue

    def map_reduce(self): # the actual map-reduce algorithm
        data_list = self.parse_fn(self.data)
        data_queue = self.create_queue(data_list) # enqueue the data so we can multi-process
        map_queue = self.process_queue(data_queue, 'map') # [(k,v),...] => [(k,v1),(k,v2),...]
        merge_queue = self.process_queue(map_queue, 'merge') # [(k,v1),(k,v2),...] => [(k,[v1,v2,...]),...]
        reduce_queue = self.process_queue(merge_queue, 'reduce') # [(k,[v1,v2,...]),...] => [(k,v),...]
        output_list = self.create_list(reduce_queue) # dequeue into list for output handling
        self.output_fn(output_list)
Well, there you have it. Google's MapReduce, with all my caveats of course, in fewer than 100 lines of python code: 98 lines to be precise, including imports and spacing lines. Of course the class by itself isn't very useful without some examples to show how it works, so I've included a WordCount example below:
class WordCount(MapReduce):
    def __init__(self):
        MapReduce.__init__(self)
        self.min_count = 1

    def parse_fn(self, data): # break string into [(k, v), ...] tuples for each line
        return [(None, line) for line in data.splitlines()]

    def map_fn(self, key, line): # return (word, 1) tuples for each word, ignore key
        word_list = []
        for word in re.split(r'\W+', line.lower()):
            bare_word = re.sub(r"[^A-Za-z0-9]*", r"", word) # also drops underscores
            if len(bare_word) > 0: # re.split can leave empty strings behind
                word_list.append((bare_word, 1))
        return word_list

    def reduce_fn(self, word, count_list): # just sum the counts
        return [(word, sum(count_list))]

    def output_fn(self, output_list): # print words whose count exceeds min_count
        print "Word".ljust(15), "Count".rjust(5)
        print "______________".ljust(15), "_____".rjust(5)
        sorted_list = sorted(output_list, key=operator.itemgetter(1), reverse=True)
        for (word, count) in sorted_list:
            if count > self.min_count:
                print word.ljust(15), repr(count).rjust(5)
        print

    def test_with_monty(self):
        self.data = """The Meaning of Life is:
try and be nice to people,
avoid eating fat,
read a good book every now and then,
get some walking in,
and try and live together in peace and harmony
with people of all creeds and nations."""
        self.map_reduce()

    def test_with_nietzsche(self):
        self.min_count = 700
        f = urllib.urlopen("http://www.gutenberg.org/cache/epub/7205/pg7205.txt")
        self.data = f.read()
        f.close()
        self.map_reduce()
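To see what the map step emits for a single line, the tokenizing logic of map_fn can be run in isolation. The sketch below is a standalone function, and the name tokenize_line is a placeholder rather than part of the class. It also prints what re.split returns on its own: a line ending in punctuation leaves an empty string at the end, which is why the length check is there.

```python
import re

def tokenize_line(line):
    """Hypothetical standalone copy of the tokenizing logic in WordCount.map_fn."""
    pairs = []
    for word in re.split(r'\W+', line.lower()):
        bare_word = re.sub(r"[^A-Za-z0-9]*", "", word)
        if len(bare_word) > 0:  # skip the empty strings re.split leaves behind
            pairs.append((bare_word, 1))
    return pairs

raw_tokens = re.split(r'\W+', "avoid eating fat,")
pairs = tokenize_line("avoid eating fat,")
print("raw tokens: %r" % (raw_tokens,))
print("map output: %r" % (pairs,))
```

The print calls take a single formatted string so the snippet runs unchanged on python 2.6+ and python 3.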
Another example, DistributedGrep, will help you better understand how to use map keys throughout the computation. It has a really different feel from WordCount, and I've included it below:
class DistributedGrep(MapReduce):
    def __init__(self):
        MapReduce.__init__(self)
        self.matcher = None

    def parse_fn(self, data): # one list item per line with line number
        data_list = []
        line_num = 1
        for line in data.splitlines():
            data_list.append((line_num, line))
            line_num = line_num + 1
        return data_list

    def map_fn(self, line_num, line): # return line if matches, include line num
        matcher = self.matcher
        matched_line = []
        if matcher.match(line):
            matched_line = [(line_num, line)]
        return matched_line

    def reduce_fn(self, line_num, line_list): # identity reducer
        return [(line_num, line_list[0])] # we only ever have one line in the list

    def output_fn(self, output_list): # just print the resulting list
        print "LineNum".rjust(8), "Line".ljust(70)
        print "_______".rjust(8), "____"
        for (line_num, line) in sorted(output_list, key=operator.itemgetter(0)):
            print repr(line_num).rjust(8), line.ljust(70)
        print

    def test_with_nietzsche(self):
        self.matcher = re.compile(r".*Jahre.*")
        f = urllib.urlopen("http://www.gutenberg.org/cache/epub/7205/pg7205.txt")
        self.data = f.read()
        f.close()
        self.map_reduce()
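One detail of the matcher is worth spelling out: a compiled pattern's match() only tests from the start of the string, which is why the pattern above is wrapped in .* on both sides. The sketch below compares that with search(), which scans the whole line. The three sample lines are short ASCII fragments chosen for illustration only, and swapping in search() is a suggestion, not what the class does.

```python
import re

# Sample lines for illustration only, not the full corpus.
lines = [
    "Zehn Jahre kamst du hier herauf",  # "Jahre" in the middle of the line
    "tausend Jahren - -",               # "Jahre" inside the word "Jahren"
    "Also sprach Zarathustra",          # no occurrence
]

bare = re.compile(r"Jahre")
wrapped = re.compile(r".*Jahre.*")      # the form used in test_with_nietzsche

match_bare = [bare.match(s) is not None for s in lines]        # anchored at position 0
match_wrapped = [wrapped.match(s) is not None for s in lines]  # .* absorbs the prefix
search_bare = [bare.search(s) is not None for s in lines]      # scans the whole line

print("match,  bare pattern:    %r" % (match_bare,))
print("match,  wrapped pattern: %r" % (match_wrapped,))
print("search, bare pattern:    %r" % (search_bare,))
```

With search(), the matcher could be given the bare word and map_fn would need only that one call changed.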
Of course none of this is useful if you can't actually run the code, so here's a main function that runs the two example classes as tests:
def main():
    wc = WordCount()
    wc.test_with_monty()
    wc.test_with_nietzsche()
    dg = DistributedGrep()
    dg.test_with_nietzsche()

if __name__ == "__main__":
    main()
You can paste all the code snippets above into one file and run it in any python 2 interpreter, version 2.6 or later, and it should output results something like the following:
$ python map_reduce.py
Word            Count
______________  _____
and                 6
in                  2
of                  2
people              2
try                 2

Word            Count
______________  _____
und              3992
der              2022
ich              1714
die              1459
ist              1179
das              1103
nicht             985
zu                947
es                872
aber              857
du                856
er                854
sie               786
ihr               769
den               751
ein               746

 LineNum Line
 _______ ____
     168 Zehn Jahre kamst du hier herauf zu meiner Höhle: du würdest deines
     209 Nicht fremd ist mir dieser Wanderer: vor manchen Jahre gieng er her
    3198 Also vergiengen dem Einsamen Monde und Jahre; seine Weisheit aber
    4585 und unverändert durch die Jahre.
    9285 von grossem Jahre: das muss sich, einer Sanduhr gleich, immer wieder
    9288 - so dass alle diese Jahre sich selber gleich sind, im Grössten und
    9289 auch im Kleinsten, - so dass wir selber in jedem grossen Jahre uns
    9816 - Und wieder liefen Monde und Jahre über Zarathustra's Seele, und er
    9931 tausend Jahren - -
   10801 Meine Liebe diente ihm lange Jahre, mein Wille gierig allem seinen
To better understand the algorithm, I'd recommend going through just the first test case for WordCount, which is small enough to see exactly what is happening at every step but real enough to see a genuinely useful calculation in progress. You can insert print statements for the various queues, lists and tuples at each step to see exactly what is going on. In fact, although I spent days studying MapReduce as an abstract algorithm and looking at code implementations, I didn't really understand it until I did this exercise of stepping through the code, which in my case was also debugging it.
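As a starting point for that exercise, here is a minimal single-threaded sketch of the same stages with a print after each one. It is not the class above: the function names (parse, map_words, merge, reduce_counts) are placeholders, there are no queues or threads, and the tokenizer is simplified. It only reproduces the data shapes on two lines of the Monty text so they are easy to read.

```python
import re

def parse(data):
    # one (key, value) pair per line; the key is unused, as in WordCount
    return [(None, line) for line in data.splitlines()]

def map_words(key, line):
    # emit (word, 1) for every non-empty token on the line
    return [(w, 1) for w in re.split(r'\W+', line.lower()) if w]

def merge(pairs):
    # group values under their key, then sort by key
    merged = {}
    for k, v in pairs:
        merged.setdefault(k, []).append(v)
    return sorted(merged.items())

def reduce_counts(word, counts):
    return [(word, sum(counts))]

data = "try and be nice to people,\nand try and live together in peace"

parsed = parse(data)
print("parsed:  %r" % (parsed,))
mapped = [pair for (k, v) in parsed for pair in map_words(k, v)]
print("mapped:  %r" % (mapped,))
merged = merge(mapped)
print("merged:  %r" % (merged,))
reduced = [pair for (k, vs) in merged for pair in reduce_counts(k, vs)]
print("reduced: %r" % (reduced,))
```

The merged list is the step that usually makes things click: each word appears once, paired with a list of ones, and the reducer only has to sum that list.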
Then once you've done that with the first simple test case you can try the larger WordCount corpus which is a full-size work by Nietzsche in the original German. Since the data is too large to print out all at once, I recommend only printing the first 10 items or so for each step of the process so you can see what's going on with a bigger example. Then after that you can try DistributedGrep which is an entirely different algorithm, with both a different feel and implementation, so you can move beyond the introductory word counting and see how other types of processes can be implemented in MapReduce as well.
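One way to keep that output manageable is a small helper that prints only the first few items of a stage. The sketch below is an assumption about how you might do it: the name peek and its parameters are hypothetical, and it expects a list or other iterable. If you drain a queue into a list to look at it, you have to put the items back into a queue before passing it to the next stage.

```python
from itertools import islice

def peek(label, items, n=10):
    """Print and return the first n items of an iterable, with a label."""
    head = list(islice(items, n))
    print("%s (first %d): %r" % (label, len(head), head))
    return head

# Stand-in data for a large intermediate result.
pairs = [("word%d" % i, 1) for i in range(1000)]
head = peek("mapped", pairs)
```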
So that's it folks, I hope you enjoy it. Improvements, additional requests, criticisms, and flames are all equally welcome. I'm especially interested in finding some more python example algorithms that can be implemented in MapReduce in a page or two, particularly ones outside what you might normally see in the standard corpus. If there's enough interest I'll do some additional examples myself and follow up in a subsequent post.
PS Thanks to MoinMoin for the html color markup program for python.
Full source code can be downloaded here: map_reduce.py
