What is Google? One way to put it: it is a system that indexes web pages so you can later find them by searching with keywords. That was easy. But how does Google index billions of web pages efficiently? The answer is massive parallelization through MapReduce. In this article I describe a simple problem and solve it with and without MapReduce. Finally, I show how MapReduce makes it straightforward to distribute the work across a cluster for parallel processing.

The Problem (in English)

I want to index files by word so I can later search and return all files that contain a given word together with the number of occurrences of that word. So a simple INPUT could be three files:

file1.txt => "foo foo bar cat dog dog"
file2.txt => "foo house cat cat dog"
file3.txt => "foo bird foo foo"

The OUTPUT is a hash map indexing each word to the files that contain that word, together with a counter for the number of occurrences of that word in the file. So for the three files above, we would end up with the following hash map:

bar => [ (file1.txt, 1) ]
foo => [ (file1.txt, 2), (file3.txt, 3), (file2.txt, 1) ]
cat => [ (file1.txt, 1), (file2.txt, 2) ]
bird => [ (file3.txt, 1) ]
dog => [ (file1.txt, 2), (file2.txt, 1) ]
house => [ (file2.txt, 1) ]

So from the hash map above you can quickly tell that the word “cat” appears in two files: “file1.txt” and “file2.txt”. In addition you know that it occurs twice in “file2.txt” and once in “file1.txt”. The bottom line is that by building this hash map beforehand, you can quickly perform searches without having to scan through the entire content of your web pages each time.

The Problem (in Code)

public class FileMatch {

	private final String filename;
	private int occurrences;

	public FileMatch(String filename) {
		this.filename = filename;
		this.occurrences = 0;
	}

	public void inc() {
		this.occurrences++;
	}

	public String getFilename() {
		return filename;
	}

	public int getOccurrences() {
		return occurrences;
	}

	@Override
	public String toString() {
		return "(" + filename + ", " + occurrences + ")";
	}
}

So our index represented by a hash map will be something like:

// word => filename => FileMatch
Map<String, Map<String, FileMatch>> index = new HashMap<String, Map<String, FileMatch>>();
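
To make the search side concrete, here is a minimal sketch of a lookup against an index of this shape. To keep it self-contained it stores a plain Integer count per file instead of a FileMatch, and the class name IndexLookupSketch and the search method are hypothetical names introduced only for this illustration:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: the names here are illustrative and not part of the original code.
class IndexLookupSketch {

    // Returns filename => occurrences for the word, or an empty map if the word is not indexed.
    static Map<String, Integer> search(Map<String, Map<String, Integer>> index, String word) {
        Map<String, Integer> matches = index.get(word);
        return matches == null ? Collections.<String, Integer>emptyMap() : matches;
    }

    public static void main(String[] args) {
        // word => filename => occurrences (simplified: Integer instead of FileMatch)
        Map<String, Map<String, Integer>> index = new HashMap<String, Map<String, Integer>>();

        Map<String, Integer> cat = new HashMap<String, Integer>();
        cat.put("file1.txt", 1);
        cat.put("file2.txt", 2);
        index.put("cat", cat);

        System.out.println(search(index, "cat"));   // the two files that contain "cat"
        System.out.println(search(index, "zebra")); // empty map: the word is not indexed
    }
}
```

A search is a single hash lookup on the word, which is why building the index beforehand pays off.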

Approach #1: Without MapReduce

In this first approach we just count word by word, in a loop, filling our hash map with the results.

import java.util.HashMap;
import java.util.Map;

public class WithoutMapReduce {

	public static void main(String[] args) {

		Map<String, Map<String, FileMatch>> index = new HashMap<String, Map<String, FileMatch>>();

		addToIndex("file1.txt", "foo foo bar cat dog dog", index);
		addToIndex("file2.txt", "foo house cat cat dog", index);
		addToIndex("file3.txt", "foo bird foo foo", index);

		printIndex(index);
	}

	public static void addToIndex(final String filename,
                                final String fileContents,   
                                final Map<String, Map<String, FileMatch>> index) {

		String[] words = fileContents.split("\\s+");

		for(String word: words) {

			Map<String, FileMatch> fileMatches = index.get(word);

			if (fileMatches == null) {
				fileMatches = new HashMap<String, FileMatch>();
				index.put(word, fileMatches);
			}

			FileMatch fileMatch = fileMatches.get(filename);

			if (fileMatch == null) {
				fileMatch = new FileMatch(filename);
				fileMatches.put(filename, fileMatch);
			}

			fileMatch.inc();
		}
	}

	public static void printIndex(Map<String, Map<String, FileMatch>> index) {
		// omitted for clarity...
	}
}
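
Since printIndex is omitted above, here is one possible version, offered only as a sketch and not as the original implementation. It relies on toString() of the stored values, so it would print a FileMatch as "(file1.txt, 2)". The class name PrintIndexSketch and the helper formatIndex are hypothetical, and the demo uses pre-formatted strings as stand-ins for FileMatch to stay self-contained:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the omitted printIndex; not the author's implementation.
class PrintIndexSketch {

    // Formats each entry as: word => [ match1, match2 ]
    // Uses toString() of the values, so FileMatch would appear as "(file1.txt, 2)".
    static <V> String formatIndex(Map<String, Map<String, V>> index) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Map<String, V>> entry : index.entrySet()) {
            sb.append(entry.getKey()).append(" => [ ");
            boolean first = true;
            for (V match : entry.getValue().values()) {
                if (!first) {
                    sb.append(", ");
                }
                sb.append(match);
                first = false;
            }
            sb.append(" ]\n");
        }
        return sb.toString();
    }

    static <V> void printIndex(Map<String, Map<String, V>> index) {
        System.out.print(formatIndex(index));
    }

    public static void main(String[] args) {
        // Strings stand in for FileMatch objects in this self-contained demo.
        Map<String, Map<String, String>> index = new LinkedHashMap<String, Map<String, String>>();
        Map<String, String> cat = new LinkedHashMap<String, String>();
        cat.put("file1.txt", "(file1.txt, 1)");
        cat.put("file2.txt", "(file2.txt, 2)");
        index.put("cat", cat);

        printIndex(index); // prints: cat => [ (file1.txt, 1), (file2.txt, 2) ]
    }
}
```

With a HashMap, as used in the article, the order of words and files in the printed output is not guaranteed.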

Approach #2: With MapReduce

MapReduce takes the problem above and breaks it down into two independent phases: the Map phase and the Reduce phase. In practice there is a third phase between them, called Grouping, but as we will see in approach #3, only the map and reduce phases get parallelized.

Map

// MAP:

List<MappedItem> mappedItems = new LinkedList<MappedItem>();

mappedItems.addAll(map("file1.txt", "foo foo bar cat dog dog"));
mappedItems.addAll(map("file2.txt", "foo house cat cat dog"));
mappedItems.addAll(map("file3.txt", "foo bird foo foo"));

Above you can see that we execute the Map operation on all files, creating a list of MappedItems:

public class MappedItem {

    private final String word;
    private final String file;

    public MappedItem(String word, String file) {
        this.word = word;
        this.file = file;
    }

    public String getWord() {
        return word;
    }

    public String getFile() {
        return file;
    }

    @Override
    public String toString() {
    	return "(" + word + ", " + file + ")";
    }
}

The important code is the map method, shown below:

public static List<MappedItem> map(final String filename, final String fileContents) {

    List<MappedItem> mappedItems = new LinkedList<MappedItem>();

    String[] words = fileContents.split("\\s+");

    for(String word: words) {
        mappedItems.add(new MappedItem(word, filename));
    }

    return mappedItems;
}

It is important to understand that the Map phase returns a list of key/value pairs. In our example the key is a word and the value is the file where this word was found. It is also important to notice that the list will have duplicates. For example the item (foo, file3.txt) appears three times in the list because the word “foo” appears in the file three times. Below is the OUTPUT you should expect from the mapping phase:

[(foo, file1.txt), (foo, file1.txt), (bar, file1.txt), (cat, file1.txt), (dog, file1.txt),
(dog, file1.txt), (foo, file2.txt), (house, file2.txt), (cat, file2.txt), (cat, file2.txt),
(dog, file2.txt), (foo, file3.txt), (bird, file3.txt), (foo, file3.txt), (foo, file3.txt)]

Grouping

The intermediate phase of MapReduce is the grouping phase where the map results are grouped and prepared for the reduce phase. In that phase, you go from a List<MappedItem> to a Map<String, List<String>>:

// GROUP:

Map<String, List<String>> groupedItems = group(mappedItems);

The group method is described below:

public static Map<String, List<String>> group(List<MappedItem> mappedItems) {

    Map<String, List<String>> groupedItems = new HashMap<String, List<String>>();

    Iterator<MappedItem> iter = mappedItems.iterator();

    while(iter.hasNext()) {

        MappedItem item = iter.next();

        String word = item.getWord();
        String file = item.getFile();

        List<String> list = groupedItems.get(word);

        if (list == null) {
            list = new LinkedList<String>();
            groupedItems.put(word, list);
        }

        list.add(file);
    }

    return groupedItems;
}
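
As an aside, the same grouping can be written more compactly with the Java 8 streams API. This is only a sketch under that assumption (Java 8 or later); the class name GroupWithStreamsSketch is hypothetical and the nested MappedItem is a trimmed copy of the class above so the example stands alone:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch: an alternative to the loop-based group method, assuming Java 8+.
class GroupWithStreamsSketch {

    // Trimmed copy of MappedItem so the example is self-contained.
    static class MappedItem {
        private final String word;
        private final String file;

        MappedItem(String word, String file) {
            this.word = word;
            this.file = file;
        }

        String getWord() { return word; }
        String getFile() { return file; }
    }

    // Groups (word, file) pairs by word, keeping one file entry per occurrence.
    static Map<String, List<String>> group(List<MappedItem> mappedItems) {
        return mappedItems.stream().collect(
            Collectors.groupingBy(
                MappedItem::getWord,
                Collectors.mapping(MappedItem::getFile, Collectors.toList())));
    }

    public static void main(String[] args) {
        List<MappedItem> items = Arrays.asList(
            new MappedItem("foo", "file1.txt"),
            new MappedItem("foo", "file1.txt"),
            new MappedItem("cat", "file2.txt"));

        System.out.println(group(items));
    }
}
```

The behavior matches the loop version: each word maps to a list with one filename per occurrence, in the order the pairs were produced.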

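The output of the Grouping phase is the output of the mapping phase regrouped by key: the list produced by the mapping phase becomes a map from each word to the list of files where it was found. Duplicates are kept, one entry per occurrence, because the reduce phase needs them to count occurrences, as you can see below: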

{bar=[file1.txt], foo=[file1.txt, file1.txt, file2.txt, file3.txt, file3.txt, file3.txt],
cat=[file1.txt, file2.txt, file2.txt], bird=[file3.txt], dog=[file1.txt, file1.txt, file2.txt],
house=[file2.txt]}

Reduce

In the final Reduce phase, the map entries produced by the grouping phase are reduced to the output we are looking for with the code below:

Map<String, Map<String, FileMatch>> index = new HashMap<String, Map<String, FileMatch>>();

Iterator<Entry<String, List<String>>> groupedIter = groupedItems.entrySet().iterator();

while(groupedIter.hasNext()) {

    Entry<String, List<String>> entry = groupedIter.next();

    String word = entry.getKey();
    List<String> list = entry.getValue();

    // REDUCE:

    Map<String, FileMatch> reducedMap = reduce(word, list);

    index.put(word, reducedMap);
}

And the reduce method:

public static Map<String, FileMatch> reduce(String word, List<String> list) {

  Map<String, FileMatch> reducedMap = new HashMap<String, FileMatch>();

  for (String filename : list) {

    FileMatch fileMatch = reducedMap.get(filename);

    if (fileMatch == null) {
      fileMatch = new FileMatch(filename);
      reducedMap.put(filename, fileMatch);
    }

    fileMatch.inc();
  }

  return reducedMap;
}
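
At its core the reduce step is just counting how often each filename appears in the list for one word. A minimal sketch of that counting with Java 8 streams, assuming Java 8 or later and using a plain Long counter instead of FileMatch (the class name ReduceWithStreamsSketch is hypothetical):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical sketch: the counting done by reduce, assuming Java 8+.
class ReduceWithStreamsSketch {

    // Counts how many times each filename appears in the grouped list of one word.
    static Map<String, Long> reduce(List<String> filenames) {
        return filenames.stream().collect(
            Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    public static void main(String[] args) {
        // The grouped list for "foo" from the example above.
        List<String> foo = Arrays.asList(
            "file1.txt", "file1.txt", "file2.txt", "file3.txt", "file3.txt", "file3.txt");

        System.out.println(reduce(foo));
    }
}
```

For the word "foo" this yields a count of 2 for file1.txt, 1 for file2.txt and 3 for file3.txt, the same numbers the FileMatch version accumulates through inc().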

Note that the reduce method creates an entry for each word in our final hash map (i.e. index). That said, after the map, grouping and reduce phases, our final OUTPUT is the same as before:

bar => [ (file1.txt, 1) ]
foo => [ (file1.txt, 2), (file3.txt, 3), (file2.txt, 1) ]
cat => [ (file1.txt, 1), (file2.txt, 2) ]
bird => [ (file3.txt, 1) ]
dog => [ (file1.txt, 2), (file2.txt, 1) ]
house => [ (file2.txt, 1) ]

Approach #3: MapReduce with Parallelization

So why go through the trouble of MapReduce? The answer is massive parallelization. If you take a look at our previous MapReduce solution, you will notice that the map and reduce work can easily be broken down into independent jobs and distributed across a cluster of machines that perform the work in parallel. Let’s change our code to make this point clear and introduce two callback interfaces that will allow us to be notified by the cluster when the work is ready:

public interface MapCallback {

    public void mapDone(String filename, List<MappedItem> values);
}

public interface ReduceCallback {

    public void reduceDone(String word, Map<String, FileMatch> reducedMap);
}

Modifying our map and reduce methods to use the callbacks above, we have:

public static Thread map(final String filename, final String fileContents,
                                            final MapCallback mapCallback) {

  Thread t = new Thread(new Runnable() {

    @Override
    public void run() {

      List<MappedItem> mappedItems = new LinkedList<MappedItem>();

      String[] words = fileContents.split("\\s+");

      for(String word: words) {
        mappedItems.add(new MappedItem(word, filename));
      }

      mapCallback.mapDone(filename, mappedItems);

    }
  });

  t.start();

  return t;
}

public static Thread reduce(final String word, final List<String> list,
                                    final ReduceCallback reduceCallback) {

  Thread t = new Thread(new Runnable() {

    @Override
    public void run() {

      Map<String, FileMatch> reducedMap = new HashMap<String, FileMatch>();

      for (String filename : list) {

        FileMatch fileMatch = reducedMap.get(filename);

        if (fileMatch == null) {
          fileMatch = new FileMatch(filename);
          reducedMap.put(filename, fileMatch);
        }

        fileMatch.inc();
      }

      reduceCallback.reduceDone(word, reducedMap);
    }
  });

  t.start();

  return t;
}

Note that to simulate a cluster of machines we are using threads to process the work independently and in parallel. Once each thread finishes its work, it uses the callback to deliver the results back to the caller of the map and reduce methods.

To wait for all threads to complete, we can use the join() method:

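public static void waitForAllThreadsToFinish(List<Thread> threads) {
  try {
    for(Thread t: threads) t.join();
  } catch(InterruptedException e) {
    // restore the interrupt flag so callers can still see the interruption
    Thread.currentThread().interrupt();
    throw new RuntimeException(e);
  }
}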
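
If you would rather not manage threads and join() by hand, the standard java.util.concurrent.ExecutorService offers an alternative. The following is only a sketch of the map phase under that approach, not the code used in this article: invokeAll blocks until every job has completed, so no callbacks are needed. The class name ExecutorMapSketch and the methods mapJob and runMapPhase are hypothetical, and each pair is a plain string instead of a MappedItem to keep the example self-contained:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: the map phase on a thread pool instead of raw threads.
class ExecutorMapSketch {

    // One map job: splits the contents and emits "(word, filename)" strings.
    static Callable<List<String>> mapJob(final String filename, final String fileContents) {
        return new Callable<List<String>>() {
            @Override
            public List<String> call() {
                List<String> pairs = new ArrayList<String>();
                for (String word : fileContents.split("\\s+")) {
                    pairs.add("(" + word + ", " + filename + ")");
                }
                return pairs;
            }
        };
    }

    // Runs all jobs in parallel and concatenates their results in submission order.
    static List<String> runMapPhase(List<Callable<List<String>>> jobs) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            List<String> mappedItems = new ArrayList<String>();
            // invokeAll blocks until every job is done and returns futures in job order.
            for (Future<List<String>> result : pool.invokeAll(jobs)) {
                mappedItems.addAll(result.get());
            }
            return mappedItems;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Callable<List<String>>> jobs = new ArrayList<Callable<List<String>>>();
        jobs.add(mapJob("file1.txt", "foo foo bar cat dog dog"));
        jobs.add(mapJob("file2.txt", "foo house cat cat dog"));
        jobs.add(mapJob("file3.txt", "foo bird foo foo"));

        System.out.println(runMapPhase(jobs));
    }
}
```

Because the results are collected on the calling thread, the shared list needs no synchronization in this variant.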

So now the MapReduce flow, with parallelization through threads, becomes:

public static void main(String[] args) {

    // MAP:

    final List<MappedItem> mappedItems = new LinkedList<MappedItem>();

    MapCallback mapCallback = new MapCallback() {

        @Override
        public synchronized void mapDone(String filename, List<MappedItem> values) {
            mappedItems.addAll(values);
        }
    };

    List<Thread> mapThreads = new LinkedList<Thread>();

    mapThreads.add(map("file1.txt", "foo foo bar cat dog dog", mapCallback));
    mapThreads.add(map("file2.txt", "foo house cat cat dog", mapCallback));
    mapThreads.add(map("file3.txt", "foo bird foo foo", mapCallback));

    waitForAllThreadsToFinish(mapThreads); // blocking call...

    System.out.println(mappedItems);

    // GROUP:

    Map<String, List<String>> groupedItems = group(mappedItems);

    System.out.println(groupedItems);

    final Map<String, Map<String, FileMatch>> index = new HashMap<String,
                                        Map<String, FileMatch>>();

    ReduceCallback reduceCallback = new ReduceCallback() {

        @Override
        public synchronized void reduceDone(String word,
                            Map<String, FileMatch> reducedMap) {
            index.put(word, reducedMap);
        }
    };

    List<Thread> reduceThreads = new LinkedList<Thread>();

    Iterator<Entry<String, List<String>>> groupedIter = groupedItems.entrySet().iterator();

    while(groupedIter.hasNext()) {

        Entry<String, List<String>> entry = groupedIter.next();

        String word = entry.getKey();
        List<String> list = entry.getValue();

        // REDUCE:

        reduceThreads.add(reduce(word, list, reduceCallback));
    }

    waitForAllThreadsToFinish(reduceThreads); // blocking call...

    printIndex(index);
}

Notice that each job sent to a thread has a unique identifier: the filename for the map phase and the word for the reduce phase. It would not be hard to simulate a cluster node failure by timing out a thread that is taking too long and then resending the job to another thread. That is the kind of fault tolerance that frameworks like Hadoop and MongoDB provide.
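
The timeout-and-resend idea can be sketched inside a single JVM with a thread pool and Future.get with a timeout. This is only a simulation under my own assumptions and says nothing about how Hadoop or MongoDB implement fault tolerance internally. The names RetryOnTimeoutSketch, runWithRetry and flakyWordCount are hypothetical, and the "failed node" is just a job that sleeps on its first attempt:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: simulating a node failure with a timeout and a resubmission.
class RetryOnTimeoutSketch {

    // Submits the job; if it does not finish within timeoutMillis, cancels it
    // and resubmits it, up to maxAttempts times.
    static <T> T runWithRetry(ExecutorService pool, Callable<T> job,
                              long timeoutMillis, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            Future<T> future = pool.submit(job);
            try {
                return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                future.cancel(true); // interrupt the stalled attempt, then try again
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (ExecutionException e) {
                throw new RuntimeException(e.getCause());
            }
        }
        throw new IllegalStateException("job did not finish after " + maxAttempts + " attempts");
    }

    // Demo job: the first attempt hangs like an unresponsive node, later attempts succeed.
    static Callable<Integer> flakyWordCount(final String contents, final AtomicInteger attempts) {
        return new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                if (attempts.incrementAndGet() == 1) {
                    Thread.sleep(5000); // simulated stall, ended early by cancel(true)
                }
                return contents.split("\\s+").length;
            }
        };
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger attempts = new AtomicInteger();
        try {
            int words = runWithRetry(pool, flakyWordCount("foo bird foo foo", attempts), 500, 3);
            System.out.println(words + " words after " + attempts.get() + " attempts");
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Resending a job is safe here because map and reduce jobs are independent and identified by their key, so running one twice gives the same result.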

Note: The complete source code is available at https://github.com/saoj/mapreduce

Conclusion

MapReduce breaks the process of indexing data into two steps: map and reduce. The map step needs to be completed before the reduce step, but each step can be broken down into small pieces that are executed in parallel. When you have a large data set, the ability to use a cluster and scale horizontally becomes crucial. Frameworks like Hadoop and MongoDB can manage the execution of a MapReduce operation in a cluster of computers with support for fault tolerance. The complexity is hidden from the developer, who only has to worry about implementing the map and reduce functions to index the data set in whatever way they want.