Details
Type: New Feature
Status: Open
Priority: Major
Resolution: Unresolved
Description
To support parallel processing, the MapReduce computation model essentially groups data by key and then applies the reduce function to each group. The current implementation of the MapReduce framework uses sort-merge to guarantee this computation model. However, sort-merge is relatively expensive when only grouping is needed, and grouping is exactly what hashing is capable of.
We propose to implement hash-based MapReduce, which uses hashing instead of sort-merge to guarantee the computation model. The technique works as follows:
1. At the map side, the hash map output collector collects the map output directly into the corresponding partition buffer (one for each reduce) without sorting (first-level partitioning). Each partition buffer can be flushed to disk when it is full or close to full. To handle disk IO efficiently when there are too many partitions (reduces), the map side can be optimized by using a shared buffer for the different partitions; a counting sort on the partition number can be performed when flushing the shared buffer.
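The map-side flush of the shared buffer can be sketched as below. This is an illustrative simplification, not the actual HashMapOutputCollector: records are plain strings, the first-level partition uses the same hash-mod rule as Hadoop's default HashPartitioner, and the flush groups records by partition with a counting sort on the partition number, which is linear time and performs no key comparisons.

```java
import java.util.*;

// Sketch of the map side: outputs accumulate in a shared buffer; on flush,
// a counting sort on the partition number lays the records out contiguously
// per reduce, so each partition can be written with sequential IO.
public class MapSideSketch {
    static class Record {
        final String key, value;
        final int partition;
        Record(String key, String value, int numReduces) {
            this.key = key;
            this.value = value;
            // First-level partition: hash of the key modulo the number of reduces.
            this.partition = (key.hashCode() & Integer.MAX_VALUE) % numReduces;
        }
    }

    // Counting sort on the partition number: stable, O(n + numReduces),
    // no key comparisons at all.
    static List<Record> countingSortByPartition(List<Record> buffer, int numReduces) {
        int[] offsets = new int[numReduces + 1];
        for (Record r : buffer) offsets[r.partition + 1]++;
        for (int p = 0; p < numReduces; p++) offsets[p + 1] += offsets[p];
        Record[] out = new Record[buffer.size()];
        for (Record r : buffer) out[offsets[r.partition]++] = r;
        return Arrays.asList(out);
    }
}
```

After the sort, all records destined for the same reduce are adjacent, so one flush writes each partition as a single run.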
2. At the reduce side, the hash shuffle fetches its own partitions from the maps as usual. While fetching, the records are further partitioned (second-level partitioning) by a universal hash function. By properly choosing the number of partitions, every single partition should fit into memory. For cases such as a heavily skewed key distribution, a partition may still be too large to fit into memory; when this happens, a parameter controls whether we simply fail the job or try to further split the large partition into smaller ones using another hash function.
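The second-level partitioning could look like the following sketch. The specific universal family ((a*x + b) mod p) mod m, the prime, and the string-keyed records are assumptions for illustration; the point is that the function is drawn at random, so no fixed input can systematically overload one sub-partition, and an oversized bucket can be re-split with a freshly drawn function.

```java
import java.util.*;

// Sketch of reduce-side second-level partitioning with a hash function
// drawn from the universal family h(x) = ((a*x + b) mod P) mod m.
public class SecondaryPartitioner {
    static final long P = 2147483647L; // the Mersenne prime 2^31 - 1
    final long a, b;
    final int numSubPartitions;

    SecondaryPartitioner(int numSubPartitions, Random rnd) {
        this.numSubPartitions = numSubPartitions;
        this.a = 1 + rnd.nextInt((int) (P - 1)); // a in [1, P-1]
        this.b = rnd.nextInt((int) P);           // b in [0, P-1]
    }

    int subPartition(String key) {
        long x = key.hashCode() & Integer.MAX_VALUE; // nonnegative, < 2^31
        return (int) (((a * x + b) % P) % numSubPartitions);
    }

    // Spread fetched keys over sub-partition buckets. A bucket that is still
    // too large could be split again with another randomly drawn
    // SecondaryPartitioner, or the job failed, depending on configuration.
    Map<Integer, List<String>> partition(List<String> keys) {
        Map<Integer, List<String>> buckets = new HashMap<>();
        for (String k : keys)
            buckets.computeIfAbsent(subPartition(k), i -> new ArrayList<>()).add(k);
        return buckets;
    }
}
```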
3. Once all the data are fetched and partitioned at the reduce side, iteration starts. A RawKeyValueIterator is wrapped to process the partitions one by one. Processing a partition means loading it into memory and building a hash table over it; an iterator wrapped around the hash table then feeds the reducer the groups of keys and values.
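The per-partition grouping step can be sketched as below. The class and method names are illustrative, not the actual iterator in the patch: one partition's records are loaded, a hash table groups values by key, and each (key, values) group is then handed to a reduce function, here a hypothetical value-counting reduce.

```java
import java.util.*;

// Sketch of the reduce-side iteration over one in-memory partition:
// build a hash table keyed by the record key, then feed each group
// (key plus its list of values) to the reduce function.
public class HashGroupIterator {
    // Build the hash table for one partition's (key, value) records,
    // each record modeled as a two-element String array.
    static Map<String, List<String>> buildGroups(List<String[]> partition) {
        Map<String, List<String>> groups = new HashMap<>();
        for (String[] kv : partition)
            groups.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        return groups;
    }

    // Apply a reduce to each group; this hypothetical reduce just counts values.
    static Map<String, Integer> reduceCounts(Map<String, List<String>> groups) {
        Map<String, Integer> out = new HashMap<>();
        for (Map.Entry<String, List<String>> e : groups.entrySet())
            out.put(e.getKey(), e.getValue().size());
        return out;
    }
}
```

Because grouping only needs key equality, not key ordering, the hash table replaces the merge without any comparisons between different keys.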
Although there are some JIRAs related to using hashing in MapReduce, the approach proposed here has some fundamental differences from them. MAPREDUCE-1639 (Grouping using hashing instead of sorting) is described as a replacement of the map-side sort only. MAPREDUCE-3247 (Add hash aggregation style data flow and/or new API) and MAPREDUCE-4039 (Sort Avoidance) mostly focus on no-sort MapReduce and do not try to guarantee the computation model at the framework level. As the process above shows, this work is a completely hash-based approach: the sort at the map side and the merge at the reduce side are entirely replaced by hashing, while the computation model of MapReduce is still guaranteed.
One potential effect of using hashing without sorting is that MapReduce users should not depend on the ordering of different keys. That ordering is implied by the sort-merge process, but is no longer implied when hashing is used to group the keys.
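The difference a job must not rely on can be seen in a small model, where a TreeMap stands in for sort-merge grouping and a HashMap for hash grouping (both names here are just analogies, not the framework's classes): the groups are identical, but only the sorted variant promises an order on the keys seen by the reducer.

```java
import java.util.*;

// Model of the ordering caveat: grouping into a sorted map yields keys in
// sorted order, grouping into a hash map yields the same groups with no
// ordering guarantee.
public class KeyOrderDemo {
    // Group (key, value) records into whichever map implementation is supplied.
    static Map<String, List<String>> group(Map<String, List<String>> target,
                                           List<String[]> records) {
        for (String[] kv : records)
            target.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        return target;
    }

    // The order in which the reducer would see the keys.
    static List<String> reducerKeyOrder(Map<String, List<String>> grouped) {
        return new ArrayList<>(grouped.keySet());
    }
}
```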
This work is implemented on top of the pluggable MapOutputCollector (map side) and ShuffleConsumerPlugin (reduce side) provided by MAPREDUCE-2454. There are no modifications to the existing MapReduce code, which keeps the effect on the original implementation to a minimum. Hash-based MapReduce is not used by default. To enable it, set “mapreduce.job.map.output.collector.class” to the HashMapOutputCollector class and “mapreduce.job.reduce.shuffle.consumer.plugin.class” to the HashShuffle class.
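For example, the two plugins could be enabled per job on the command line; the jar and job names below are placeholders, and the class names should be the fully qualified names that the patch provides:

```shell
# Enable hash-based MapReduce for one job via the MAPREDUCE-2454 plugin points.
hadoop jar myjob.jar MyJob \
  -D mapreduce.job.map.output.collector.class=HashMapOutputCollector \
  -D mapreduce.job.reduce.shuffle.consumer.plugin.class=HashShuffle \
  input output
```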