Wednesday, October 6, 2010

Google's MapReduce in 98 Lines of Python

MapReduce is the magic sauce that makes Google run.  Not just search but a large part of their infrastructure is programmed in this paradigm.  If you want to see how this can be implemented in python, read on.

Lately I've been not only learning more python but also learning about the MapReduce algorithm. Naturally I started with the many freely available web resources, from brief overviews to instructive video tutorials to detailed Google Research articles on MapReduce.

This is all well and good, but I wanted to know how to actually write MapReduce applications in python, and to obtain a better understanding of the magic behind the algorithm itself.  I found a small python multi-server implementation of MapReduce named mincemeat, another called octopy, as well as interfaces to large non-python systems such as Hadoop.  However, I was still unable to find a quick and dirty implementation of MapReduce that was high-level, concise, easy to run, easy to understand, and relevant.  So I wrote one.

To fulfill my requirement of high-level, I wanted to actually use python-native lists, tuples, queues, and threading.  I didn't want to get stuck with low-level socket communication and endless writing and parsing of intermediate flat files.  To fulfill my requirement of concise, I wanted the whole MapReduce algorithm implemented in a single python class of 100 lines of code or less, and we're talking properly formatted python code, not a series of Perl one-liners.  To fulfill my requirement of easy to run, I should be able to put the whole thing in one python file and call it from the command line with a python interpreter.  It should use only the python 2.6+ standard library, so there is nothing to install if you already have python.  Nor should you need to set up multiple servers on a network; you should be able to run it on a single machine while still observing multiple simultaneous map and reduce work tasks.  To fulfill my requirement of easy to understand, it should accurately and clearly implement the major steps of MapReduce, with comments where necessary.  One should be able to insert print statements of the intermediate steps at any point in the computation and see exactly what is going on.  To fulfill my requirement of relevant code, not just an academic exercise, I wanted actual python source code examples running the MapReduce class on real data.

That being said, it is important to know what this implementation of MapReduce in python is not.  It is not web-scale without significant changes - in 100 lines of code there is only so much one can do.  It does not implement the significant reliability provisions Google uses, such as multiple distributed copies of the data and monitored task restarts.  It does not handle mapper-reducer pairing and partitioning for grouped calculations.  It ignores the important principle of locality, which dispatches particular workers to machines where the applicable data already resides.  It bypasses the key efficiency step of combiners, which pre-aggregate map output to speed up the reduce tasks (a small sketch of the idea follows below).  It loads the data directly from the internet or local disk and stores intermediate results in memory instead of using a distributed filesystem like GFS or HDFS.  It doesn't support differing numbers of parse, map, merge, sort, reduce, and output functions with varying resource allocations to each.  And last but not least, it bases its multiple-worker implementation on python threading, with the Thread and synchronized Queue coordinating classes.  This means that while in theory there is nothing to prevent an interface-identical implementation of these python standard classes distributed across thousands of machines, as written the code runs all the workers on a single machine.
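
To make the combiner idea concrete, here is a minimal sketch of what a word-count combiner might do: pre-aggregate the (word, 1) pairs emitted by a single map task before they travel to the merge and reduce steps.  The combine_fn name and its usage are my own illustration and are not part of the class below.

def combine_fn(mapped_pairs): # locally pre-aggregate one map task's output
    local_counts = {}
    for (word, count) in mapped_pairs:
        local_counts[word] = local_counts.get(word, 0) + count
    return local_counts.items() # far fewer pairs cross the network to the reducers

# e.g. combine_fn([('a', 1), ('b', 1), ('a', 1)]) => [('a', 2), ('b', 1)] (order may vary)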

But enough talk.  It's time to show the goods.  Let's see some code.
import threading
import Queue
import operator
import urllib
import re

class MapReduce(object):
    ''' MapReduce - to use, subclass by defining these functions,
                    then call self.map_reduce():
        parse_fn(self, data) => [(k, v), ...]
        map_fn(self, k, v) => [(k, v1), (k, v2), ...]
        reduce_fn(self, k, [v1, v2, ...]) => [(k, v)]
        output_fn(self, [(k, v), ...])
    '''
    def __init__(self):
        self.data = None
        self.num_worker_threads = 5

    class SynchronizedDict(object): # we need this for merging
        def __init__(self):
            self.lock = threading.Lock()
            self.d = {}
        def isin(self, k):
            with self.lock:
                return k in self.d
        def get(self, k):
            with self.lock:
                return self.d[k]
        def set(self, k, v): # we don't need del
            with self.lock:
                self.d[k] = v
        def set_append(self, k, v): # thread-safe append; creates the list if the key is new
            with self.lock:
                self.d.setdefault(k, []).append(v)
        def items(self):
            with self.lock:
                return self.d.items()

    def create_queue(self, input_list): # helper fn for queues
        output_queue = Queue.Queue()
        for value in input_list:
            output_queue.put(value)
        return output_queue

    def create_list(self, input_queue): # helper fn for queues
        output_list = []
        while not input_queue.empty():
            item = input_queue.get()
            output_list.append(item)
            input_queue.task_done()
        return output_list

    def merge_fn(self, k, v, merge_dict): # helper fn for merge
        merge_dict.set_append(k, v) # one atomic call, so two threads can't race on a new key

    def process_queue(self, input_queue, fn_selector): # helper fn
        output_queue = Queue.Queue()
        if fn_selector == 'merge':
            merge_dict = self.SynchronizedDict()
        def worker():
            while True:
                try: # non-blocking get; checking empty() alone can race against other workers
                    (k, v) = input_queue.get(block=False)
                except Queue.Empty:
                    return
                if fn_selector in ['map', 'reduce']:
                    if fn_selector == 'map':
                        result_list = self.map_fn(k, v)
                    elif fn_selector == 'reduce':
                        result_list = self.reduce_fn(k, v)
                    for result_tuple in result_list: # flatten
                        output_queue.put(result_tuple)
                elif fn_selector == 'merge': # merge v to same k
                    self.merge_fn(k, v, merge_dict)
                else:
                    raise Exception("Bad fn_selector=" + fn_selector)
                input_queue.task_done()
        for i in range(self.num_worker_threads): # start threads
            worker_thread = threading.Thread(target=worker)
            worker_thread.daemon = True
            worker_thread.start()
        input_queue.join() # wait for worker threads to finish
        if fn_selector == 'merge':
            output_list = sorted(merge_dict.items(), key=operator.itemgetter(0))
            output_queue = self.create_queue(output_list)
        return output_queue

    def map_reduce(self): # the actual map-reduce algorithm
        data_list = self.parse_fn(self.data)
        data_queue = self.create_queue(data_list) # enqueue the data so we can multi-process
        map_queue = self.process_queue(data_queue, 'map') # [(k,v),...] => [(k,v1),(k,v2),...]
        merge_queue = self.process_queue(map_queue, 'merge') # [(k,v1),(k,v2),...] => [(k,[v1,v2,...]),...]
        reduce_queue = self.process_queue(merge_queue, 'reduce') # [(k,[v1,v2,...]),...] => [(k,v),...]
        output_list = self.create_list(reduce_queue) # deque into list for output handling
        self.output_fn(output_list)

Well, there you have it. Google's MapReduce, with all my caveats of course, in under 100 lines of python code, imports and blank lines included. Of course the class by itself isn't very useful without some examples showing how it works, so I've included a WordCount example below:

class WordCount(MapReduce):

    def __init__(self):
        MapReduce.__init__(self)
        self.min_count = 1

    def parse_fn(self, data): # break string into [(k, v), ...] tuples for each line
        data_list = map(lambda line: (None, line), data.splitlines())
        return data_list

    def map_fn(self, key, line): # return (word, 1) tuples for each word, ignore key
        word_list = []
        for word in re.split(r'\W+', line.lower()):
            bare_word = re.sub(r"[^A-Za-z0-9]*", r"", word)
            if len(bare_word) > 0:
                word_list.append((bare_word, 1))
        return word_list

    def reduce_fn(self, word, count_list): # just sum the counts
        return [(word, sum(count_list))]

    def output_fn(self, output_list): # just print the resulting list
        print "Word".ljust(15), "Count".rjust(5)
        print "______________".ljust(15), "_____".rjust(5)
        sorted_list = sorted(output_list, key=operator.itemgetter(1), reverse=True)
        for (word, count) in sorted_list:
            if count > self.min_count:
                print word.ljust(15), repr(count).rjust(5)
        print

    def test_with_monty(self):
        self.data = """The Meaning of Life is:
            try and be nice to people,
            avoid eating fat,
            read a good book every now and then,
            get some walking in,
            and try and live together in peace and harmony
            with people of all creeds and nations."""
        self.map_reduce()

    def test_with_nietzsche(self):
        self.min_count = 700
        f = urllib.urlopen("http://www.gutenberg.org/cache/epub/7205/pg7205.txt")
        self.data = f.read()
        f.close()
        self.map_reduce()


Another example, one that will help you understand how keys are used throughout the computation, is DistributedGrep. It has a very different feel from WordCount, and I've included it below:

class DistributedGrep(MapReduce):

    def __init__(self):
        MapReduce.__init__(self)
        self.matcher = None

    def parse_fn(self, data): # one list item per line with line number
        data_list = []
        for line_num, line in enumerate(data.splitlines(), 1):
            data_list.append((line_num, line))
        return data_list

    def map_fn(self, line_num, line): # return line if matches, include line num
        matcher = self.matcher
        matched_line = []
        if matcher.match(line):
            matched_line = [(line_num, line)]
        return matched_line

    def reduce_fn(self, line_num, line_list): # identity reducer
        return [(line_num, line_list[0])] # we only ever have one line in the list

    def output_fn(self, output_list): # just print the resulting list
        print "LineNum".rjust(8), "Line".ljust(70)
        print "_______".rjust(8), "____"
        for (line_num, line) in sorted(output_list, key=operator.itemgetter(0)):
            print repr(line_num).rjust(8), line.ljust(70)
        print

    def test_with_nietzsche(self):
        self.matcher = re.compile(r".*Jahre.*")
        f = urllib.urlopen("http://www.gutenberg.org/cache/epub/7205/pg7205.txt")
        self.data = f.read()
        f.close()
        self.map_reduce()

Of course none of this is useful if you can't actually run the code, so here's a main function that runs the two example classes as a test:

def main():
    wc = WordCount()
    wc.test_with_monty()
    wc.test_with_nietzsche()

    dg = DistributedGrep()
    dg.test_with_nietzsche()

if __name__ == "__main__":
    main()

You can paste all of the code snippets above into one file and run it with any python 2.6+ interpreter, and it should output results something like the following:

$ python map_reduce.py

Word            Count
______________  _____
and                 6
in                  2
of                  2
people              2
try                 2

Word            Count
______________  _____
und              3992
der              2022
ich              1714
die              1459
ist              1179
das              1103
nicht             985
zu                947
es                872
aber              857
du                856
er                854
sie               786
ihr               769
den               751
ein               746

 LineNum Line
 _______ ____
     168 Zehn Jahre kamst du hier herauf zu meiner Höhle: du würdest deines
     209 Nicht fremd ist mir dieser Wanderer: vor manchen Jahre gieng er her
    3198 Also vergiengen dem Einsamen Monde und Jahre; seine Weisheit aber
    4585 und unverändert durch die Jahre.
    9285 von grossem Jahre: das muss sich, einer Sanduhr gleich, immer wieder
    9288 - so dass alle diese Jahre sich selber gleich sind, im Grössten und
    9289 auch im Kleinsten, - so dass wir selber in jedem grossen Jahre uns
    9816 - Und wieder liefen Monde und Jahre über Zarathustra's Seele, und er
    9931 tausend Jahren - -
   10801 Meine Liebe diente ihm lange Jahre, mein Wille gierig allem seinen


To better understand the algorithm, I'd recommend going through just the first test case for WordCount, which is small enough to see exactly what is happening at every step but real enough to show a genuinely useful calculation in progress. You can insert print statements for the various queues, lists and tuples at each step to see exactly what is going on. In fact, although I spent days studying MapReduce as an abstract algorithm and looking at code implementations, I didn't really understand it until I did this exercise of stepping through the code, which in my case was also debugging it.

Then once you've done that with the first simple test case, you can try the larger WordCount corpus, which is a full-size work by Nietzsche in the original German. Since the data is too large to print out all at once, I recommend printing only the first 10 items or so at each step of the process so you can see what's going on with a bigger example; one way to do that is sketched below. After that you can try DistributedGrep, which is an entirely different algorithm with both a different feel and a different implementation, so you can move beyond introductory word counting and see how other types of processes can be implemented in MapReduce as well.
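
If you want a concrete starting point for that kind of instrumentation, here is one possible sketch; the peek helper and the DebugWordCount name are my own additions, not part of the code above.

def peek(queue, label, n=10): # drain, print the first n items, then restore the queue
    items = []
    while not queue.empty():
        items.append(queue.get())
        queue.task_done()
    print label, items[:n]
    for item in items: # put everything back so the next stage sees the full queue
        queue.put(item)
    return queue

class DebugWordCount(WordCount):
    def map_reduce(self): # the same steps as MapReduce.map_reduce, with prints added
        data_queue = self.create_queue(self.parse_fn(self.data))
        map_queue = peek(self.process_queue(data_queue, 'map'), 'mapped: ')
        merge_queue = peek(self.process_queue(map_queue, 'merge'), 'merged: ')
        reduce_queue = peek(self.process_queue(merge_queue, 'reduce'), 'reduced:')
        self.output_fn(self.create_list(reduce_queue))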

So that's it, folks; I hope you enjoy it. Improvements, additional requests, criticisms, and flames are all equally welcome. I'm especially interested in finding some more python example algorithms that can be implemented in MapReduce in a page or two, particularly ones outside what you might normally see in the standard corpus. If there's enough interest I'll do some additional examples myself and follow up in a subsequent post.
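
In the meantime, here's a rough sketch of one classic candidate from Google's original paper, the inverted index, to show how little scaffolding a new example needs. This is my own illustration; the class, its functions, and the toy data are not part of the code above.

class InvertedIndex(MapReduce):
    def parse_fn(self, data): # here data is already a list of (doc_name, text) pairs
        return data

    def map_fn(self, doc_name, text): # emit (word, doc_name) for every word in a document
        return [(word, doc_name) for word in re.split(r'\W+', text.lower()) if word]

    def reduce_fn(self, word, doc_list): # collapse to a sorted list of unique documents
        return [(word, sorted(set(doc_list)))]

    def output_fn(self, output_list): # print each word with the documents containing it
        for (word, doc_names) in sorted(output_list):
            print word.ljust(15), ', '.join(doc_names)

ii = InvertedIndex()
ii.data = [("doc1", "the cat sat on the mat"), ("doc2", "the dog sat")]
ii.map_reduce()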

PS Thanks to MoinMoin for the html color markup program for python.

Full source code can be downloaded here: map_reduce.py

Comments:

  1. That's pretty cool! Just to add to things, Doug Hellmann did a version of mapreduce using multiprocessing a little while back (as opposed to using threading): http://blog.doughellmann.com/2009/04/implementing-mapreduce-with.html

  2. A great simple mapreduce example.

    I wonder if there is a possible race condition in the merge_fn. If 2 threads both call merge_fn with a new key ...

    thread 1: calls isin and returns False

    thread 2: calls isin and returns False

    thread 1: adds key to merge_dict

    thread 2: overwrites key in merge_dict


    How about an add function in the merge_dict as an alternative ...

        def add(self, k, v):
            with self.lock:
                if k in self.d:
                    self.d[k].append(v)
                else:
                    self.d[k] = [v]

  3. In your mapreduce example, the map workers and reduce workers can't run in parallel, because the reduce workers must wait for the map workers to complete. How does the real MapReduce handle this?

  4. Yes, you are correct: the map and reduce steps cannot run in parallel. Instead, all maps run in parallel, and when all are finished, all reduce steps run, in order where the algorithm requires it. This is inherent to the map-reduce algorithm. As you noticed, this limits parallelization. This is perhaps one reason Google has largely abandoned map reduce in favor of Bigtable-based processing, letting the database function as the point of control in the algorithms.

  5. Hi John,
    Apologies for commenting rather than emailing, but couldn't see an address. I'd like to use this as an example in Software Carpentry (http://software-carpentry.org); could you please give me a shout and let me know how to reach you to ask about credits, etc.?
    Thanks,
    Greg (gvwilson at third hyphen bit dot com)

  6. Sure, sorry I didn't get the comment notification until now. My email is johnarleyburns@gmail.com.

  7. Invalid realization. The main idea of MapReduce is that the whole data set can't be processed in one place; in your realization the whole data set is loaded into memory and then distributed between threads.

    Or am I wrong?
