Wednesday, October 6, 2010

Google's MapReduce in 98 Lines of Python

MapReduce is the magic sauce that makes Google run. Not just search, but a large part of Google's infrastructure is programmed in this paradigm. If you want to see how it can be implemented in Python, read on.

Lately I've been not only learning more Python but also learning about the MapReduce algorithm. Naturally I started with the many freely available web resources, from brief overviews to instructive video tutorials to detailed Google Research articles on MapReduce.

This is all well and good, but I wanted to know how to actually write MapReduce applications in Python, and to get a better understanding of the magic behind the algorithm itself. I found a small multi-server Python implementation of MapReduce named mincemeat, another called octopy, as well as interfaces to large non-Python systems such as Hadoop. However, I was still unable to find a quick and dirty implementation of MapReduce that was high-level, concise, easy to run, easy to understand, and relevant. So I wrote one.

To fulfill my requirement of high-level, I wanted to use Python-native lists, tuples, queues, and threading. I didn't want to get stuck with low-level socket communication and endless writing and parsing of intermediate flat files.

To fulfill my requirement of concise, I wanted the whole MapReduce algorithm implemented in a single Python class of 100 lines of code or fewer, and we're talking properly formatted Python code, not a series of Perl one-liners.

To fulfill my requirement that it be easy to run, I should be able to put the whole thing in one Python file and call it from the command line with a Python interpreter. It should use only the Python 2.6+ standard library, so there is nothing to install if you already have Python. Nor should you need to set up a multi-server environment: you should be able to run it on a single machine while still observing multiple simultaneous map and reduce tasks.

To fulfill my requirement that it be easy to understand, it should accurately and clearly implement the major steps of MapReduce, with comments where necessary. One should be able to insert print statements at any intermediate step of the computation and see exactly what is going on.

To fulfill my requirement of relevant code, not just an academic exercise, I wanted actual Python source code examples running the MapReduce class on real data.

That being said, it is important to know what this implementation of MapReduce in Python is not. It is not web-scale without significant changes; in 100 lines of code there is only so much one can do. It does not implement some significant reliability provisions Google uses, such as distributed multiple-copy data and monitored task restarts. It does not handle mapper-reducer pairing and partitioning for grouped calculations. It ignores the important principle of locality, which dispatches workers to the machines where the relevant data already resides. It skips combiners, a key efficiency step that speeds up reduce tasks. It loads the data directly from the internet or local disk and stores intermediate results in memory instead of using a distributed filesystem like GFS or HDFS. It doesn't support differing numbers of parse, map, merge, sort, reduce, and output functions with varying resource allocations to each. And last but not least, it bases its multiple-worker implementation on Python threading, with the Thread class and the synchronized Queue class coordinating the work. In theory nothing prevents an interface-identical implementation of these standard Python classes that is distributed across thousands of machines, but as written the code runs all the workers on a single machine.

But enough talk.  It's time to show the goods.  Let's see some code.
import threading
import Queue
import operator
import urllib
import re

class MapReduce:
    ''' MapReduce - to use, subclass by defining these functions,
                    then call self.map_reduce():
        parse_fn(self, data) => [(k, v), ...]
        map_fn(self, k, v) => [(k, v1), (k, v2), ...]
        reduce_fn(self, k, [v1, v2, ...]) => [(k, v)]
        output_fn(self, [(k, v), ...])
    '''
    def __init__(self):
        self.data = None
        self.num_worker_threads = 5

    class SynchronizedDict(): # we need this for merging
        def __init__(self):
            self.lock = threading.Lock()
            self.d = {}
        def isin(self, k):
            with self.lock:
                if k in self.d:
                    return True
                else:
                    return False
        def get(self, k):
            with self.lock:
                return self.d[k]
        def set(self, k, v): # we don't need del
            with self.lock:
                self.d[k] = v
        def set_append(self, k, v): # thread-safe append; creates the list if k is new
            with self.lock:
                self.d.setdefault(k, []).append(v)
        def items(self):
            with self.lock:
                return self.d.items()

    def create_queue(self, input_list): # helper fn for queues
        output_queue = Queue.Queue()
        for value in input_list:
            output_queue.put(value)
        return output_queue

    def create_list(self, input_queue): # helper fn for queues
        output_list = []
        while not input_queue.empty():
            item = input_queue.get()
            output_list.append(item)
            input_queue.task_done()
        return output_list

    def merge_fn(self, k, v, merge_dict): # helper fn for merge
        # Append under a single lock acquisition. A separate isin() check
        # followed by set() would let two threads both see a new key as
        # missing and overwrite each other's list.
        merge_dict.set_append(k, v)

    def process_queue(self, input_queue, fn_selector): # helper fn
        output_queue = Queue.Queue()
        if fn_selector == 'merge':
            merge_dict = self.SynchronizedDict()
        def worker():
            while not input_queue.empty():
                (k, v) = input_queue.get()
                if fn_selector in ['map', 'reduce']:
                    if fn_selector == 'map':
                        result_list = self.map_fn(k, v)
                    elif fn_selector == 'reduce':
                        result_list = self.reduce_fn(k, v)
                    for result_tuple in result_list: # flatten
                        output_queue.put(result_tuple)
                elif fn_selector == 'merge': # merge v to same k
                    self.merge_fn(k, v, merge_dict)
                else:
                    raise Exception("Bad fn_selector=" + fn_selector)
                input_queue.task_done()
        for i in range(self.num_worker_threads): # start threads
            worker_thread = threading.Thread(target=worker)
            worker_thread.daemon = True
            worker_thread.start()
        input_queue.join() # wait for worker threads to finish
        if fn_selector == 'merge':
            output_list = sorted(merge_dict.items(), key=operator.itemgetter(0))
            output_queue = self.create_queue(output_list)
        return output_queue

    def map_reduce(self): # the actual map-reduce algorithm
        data_list = self.parse_fn(self.data)
        data_queue = self.create_queue(data_list) # enqueue the data so we can multi-process
        map_queue = self.process_queue(data_queue, 'map') # [(k,v),...] => [(k,v1),(k,v2),...]
        merge_queue = self.process_queue(map_queue, 'merge') # [(k,v1),(k,v2),...] => [(k,[v1,v2,...]),...]
        reduce_queue = self.process_queue(merge_queue, 'reduce') # [(k,[v1,v2,...]),...] => [(k,v),...]
        output_list = self.create_list(reduce_queue) # dequeue into a list for output handling
        self.output_fn(output_list)

Well, there you have it. Google's MapReduce, with all my caveats of course, in fewer than 100 lines of Python code: 98 lines to be precise, including imports and blank lines. Of course the class by itself isn't very useful without some examples to show how it works, so I've included a WordCount example below:

class WordCount(MapReduce):

    def __init__(self):
        MapReduce.__init__(self)
        self.min_count = 1

    def parse_fn(self, data): # break string into [(k, v), ...] tuples for each line
        data_list = map(lambda line: (None, line), data.splitlines())
        return data_list

    def map_fn(self, key, line): # return (word, 1) tuples for each word, ignore key
        word_list = []
        for word in re.split(r'\W+', line.lower()):
            bare_word = re.sub(r"[^A-Za-z0-9]*", r"", word) # drop leftovers such as underscores
            if len(bare_word) > 0:
                word_list.append((bare_word, 1))
        return word_list

    def reduce_fn(self, word, count_list): # just sum the counts
        return [(word, sum(count_list))]

    def output_fn(self, output_list): # just print the resulting list
        print "Word".ljust(15), "Count".rjust(5)
        print "______________".ljust(15), "_____".rjust(5)
        sorted_list = sorted(output_list, key=operator.itemgetter(1), reverse=True)
        for (word, count) in sorted_list:
            if count > self.min_count:
                print word.ljust(15), repr(count).rjust(5)
        print

    def test_with_monty(self):
        self.data = """The Meaning of Life is:
            try and be nice to people,
            avoid eating fat,
            read a good book every now and then,
            get some walking in,
            and try and live together in peace and harmony
            with people of all creeds and nations."""
        self.map_reduce()

    def test_with_nietzsche(self):
        self.min_count = 700
        f = urllib.urlopen("http://www.gutenberg.org/cache/epub/7205/pg7205.txt")
        self.data = f.read()
        f.close()
        self.map_reduce()


Another example, which will help you better understand how to use map keys throughout the computation, is DistributedGrep. It has a rather different feel from WordCount, and I've included it below:

class DistributedGrep(MapReduce):

    def __init__(self):
        MapReduce.__init__(self)
        self.matcher = None

    def parse_fn(self, data): # one list item per line with line number
        data_list = []
        line_num = 1
        for line in data.splitlines():
            data_list.append((line_num, line))
            line_num = line_num + 1
        return data_list

    def map_fn(self, line_num, line): # return line if matches, include line num
        matcher = self.matcher
        matched_line = []
        if matcher.match(line):
            matched_line = [(line_num, line)]
        return matched_line

    def reduce_fn(self, line_num, line_list): # identity reducer
        return [(line_num, line_list[0])] # we only ever have one line in the list

    def output_fn(self, output_list): # just print the resulting list
        print "LineNum".rjust(8), "Line".ljust(70)
        print "_______".rjust(8), "____"
        for (line_num, line) in sorted(output_list, key=operator.itemgetter(0)):
            print repr(line_num).rjust(8), line.ljust(70)
        print

    def test_with_nietzsche(self):
        self.matcher = re.compile(r".*Jahre.*")
        f = urllib.urlopen("http://www.gutenberg.org/cache/epub/7205/pg7205.txt")
        self.data = f.read()
        f.close()
        self.map_reduce()

Of course none of this is useful if you can't actually run the code, so here's a main function that runs the two example classes as a test:

def main():
    wc = WordCount()
    wc.test_with_monty()
    wc.test_with_nietzsche()

    dg = DistributedGrep()
    dg.test_with_nietzsche()

if __name__ == "__main__":
    main()

You can paste all the code snippets above into one file and run it in any Python 2.6+ interpreter (the code uses Python 2 syntax), and it should output results something like the following:

$ python map_reduce.py

Word            Count
______________  _____
and                 6
in                  2
of                  2
people              2
try                 2

Word            Count
______________  _____
und              3992
der              2022
ich              1714
die              1459
ist              1179
das              1103
nicht             985
zu                947
es                872
aber              857
du                856
er                854
sie               786
ihr               769
den               751
ein               746

 LineNum Line
 _______ ____
     168 Zehn Jahre kamst du hier herauf zu meiner Höhle: du würdest deines
     209 Nicht fremd ist mir dieser Wanderer: vor manchen Jahre gieng er her
    3198 Also vergiengen dem Einsamen Monde und Jahre; seine Weisheit aber
    4585 und unverändert durch die Jahre.
    9285 von grossem Jahre: das muss sich, einer Sanduhr gleich, immer wieder
    9288 - so dass alle diese Jahre sich selber gleich sind, im Grössten und
    9289 auch im Kleinsten, - so dass wir selber in jedem grossen Jahre uns
    9816 - Und wieder liefen Monde und Jahre über Zarathustra's Seele, und er
    9931 tausend Jahren - -
   10801 Meine Liebe diente ihm lange Jahre, mein Wille gierig allem seinen


To better understand the algorithm, I'd recommend going through just the first test case for WordCount, which is small enough to see exactly what is happening at every step but real enough to see a genuinely useful calculation in progress. You can insert print statements for the various queues, lists and tuples at each step to see exactly what is going on. In fact, although I spent days studying MapReduce as an abstract algorithm and looking at code implementations, I didn't really understand it until I did this exercise of stepping through the code, which in my case was also debugging it.
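
As a rough illustration of that exercise, here is a minimal single-threaded sketch of the same parse, map, merge and reduce steps on two short lines of text. It is not part of map_reduce.py: it uses Python 3 syntax, leaves out the queues and worker threads, tokenizes with a simpler regular expression than WordCount does, and the variable names (text, parsed, mapped, merged, reduced) are only illustrative. With real worker threads the order of the mapped tuples can differ from run to run.

```python
import re
from collections import defaultdict

# Illustrative input: two short lines instead of the full Monty Python quote.
text = "try and be nice\nand try and live"

# parse: one (key, value) tuple per line; the key is unused for word counting
parsed = [(None, line) for line in text.splitlines()]

# map: emit a (word, 1) tuple for every word on every line
mapped = []
for _key, line in parsed:
    for word in re.findall(r"[a-z0-9]+", line.lower()):
        mapped.append((word, 1))

# merge: collect all values that share a key, then sort by key
groups = defaultdict(list)
for word, count in mapped:
    groups[word].append(count)
merged = sorted(groups.items())

# reduce: collapse each list of counts into a single total per word
reduced = [(word, sum(counts)) for word, counts in merged]

# Print every intermediate stage so the shape of the data is visible.
for name, stage in [("parsed", parsed), ("mapped", mapped),
                    ("merged", merged), ("reduced", reduced)]:
    print(name, "=>", stage)
```

The merged stage is the one worth staring at: it is where the flat stream of (word, 1) tuples turns into one (word, [1, 1, ...]) entry per key, which is exactly what each reduce call receives.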

Then once you've done that with the first simple test case you can try the larger WordCount corpus which is a full-size work by Nietzsche in the original German. Since the data is too large to print out all at once, I recommend only printing the first 10 items or so for each step of the process so you can see what's going on with a bigger example. Then after that you can try DistributedGrep which is an entirely different algorithm, with both a different feel and implementation, so you can move beyond the introductory word counting and see how other types of processes can be implemented in MapReduce as well.
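
One way to keep that output manageable is a small preview helper along these lines. This is a hypothetical sketch in Python 3 syntax, not something map_reduce.py contains; the name preview and its parameters are my own. It expects a plain list, so a queue from the class above would first have to be drained with create_list and, if the computation should continue, rebuilt with create_queue.

```python
def preview(label, items, limit=10):
    """Print and return a one-line summary showing at most `limit` items."""
    items = list(items)
    shown = items[:limit]
    hidden = len(items) - len(shown)
    # Only mention the remainder when something was actually cut off.
    suffix = " ... (+%d more)" % hidden if hidden > 0 else ""
    line = "%s [%d items]: %r%s" % (label, len(items), shown, suffix)
    print(line)
    return line

# Usage: a fake 'mapped' stage with 25 tuples, showing only the first 3.
summary = preview("mapped", [("und", 1)] * 25, limit=3)
short = preview("reduced", [1, 2])
```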

So that's it folks, I hope you enjoy it. Improvements, additional requests, criticisms, and flames are all equally welcome. I'm especially interested in finding more example algorithms that can be implemented in MapReduce in a page or two of Python, particularly ones outside what you might normally see in the standard corpus. If there's enough interest I'll do some additional examples myself and follow up in a subsequent post.

PS Thanks to MoinMoin for the HTML color markup program for Python.

Full source code can be downloaded here: map_reduce.py

62 comments:

  1. That's pretty cool! Just to add to things, Doug Hellmann did a version of mapreduce using multiprocessing a little while back (as opposed to using threading): http://blog.doughellmann.com/2009/04/implementing-mapreduce-with.html

  2. A great simple mapreduce example.

    I wonder if there is a possible race condition in the merge_fn. If 2 threads both call merge_fn with a new key ...

    thread 1: calls isin and returns False

    thread 2: calls isin and returns False

    thread 1: adds key to merge_dict

    thread 2: overwrites key in merge_dict


    How about an add function in the merge_dict as an alternative ...

    def add(self, k, v):
        with self.lock:
            if k in self.d:
                self.d[k].append(v)
            else:
                self.d[k] = [v]

  3. In your MapReduce example, the map workers and reduce workers can't run in parallel, because the reduce workers must wait for the map workers to complete. How does the real MapReduce handle this?

  4. Yes, you are correct: the map and reduce steps cannot run in parallel. Instead, all maps run in parallel; then, when all are finished, all reduce steps run (in order, depending on the algorithm). This is inherent to the MapReduce algorithm. As you noticed, this limits parallelization. This is perhaps one reason Google has largely abandoned MapReduce in favor of Bigtable-based processing, letting the database function as the point of control in the algorithms.

  5. Hi John,
    Apologies for commenting rather than emailing, but couldn't see an address. I'd like to use this as an example in Software Carpentry (http://software-carpentry.org); could you please give me a shout and let me know how to reach you to ask about credits, etc.?
    Thanks,
    Greg (gvwilson at third hyphen bit dot com)

  6. Sure, sorry I didn't get the comment notice until now. My email is johnarleyburns@gmail.com.

  12. Invalid implementation. The main idea of MapReduce is that we can't process the whole data set at once. In your implementation the whole data set is loaded and then distributed between threads.

    Or am I wrong?
