Details

    • Type: New Feature
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

      Description

      Most applications of map-reduce care about grouping, not sorting. Sorting is a (relatively expensive) way to achieve grouping. To achieve just grouping, one can:

      • replace the sort on the Mappers with a hashtable - and maintain lists of key-values against each hash bucket.
      • sort the key-value tuples inside each hash bucket before spilling or sending to the Reducer. Any time this is done, the Combiner can be invoked.
      • serialize the hashtable by hash-bucket id, so merges (of either spills or Map outputs) work much as they do today (at least there's no change in the overall compute complexity of the merge).

      Of course this hashtable has nothing to do with partitioning. It's just a replacement for the map-side sort.
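      The bucketed collection described above can be sketched in a few lines. This is a toy illustration of the idea, not Hadoop code - class and constant names are made up. Sorting B buckets of roughly n/B keys each costs O(n log(n/B)) comparisons instead of O(n log n) for one global sort, while equal keys still end up adjacent (all that grouping needs).

```java
import java.util.*;

// Toy sketch: group map-output keys into hash buckets, then sort only
// within each bucket before spilling or invoking the combiner.
public class HashGroupSketch {
    static final int NUM_BUCKETS = 4;  // real implementations would use many more

    public static List<List<String>> collect(List<String> keys) {
        List<List<String>> buckets = new ArrayList<>();
        for (int i = 0; i < NUM_BUCKETS; i++) buckets.add(new ArrayList<>());
        for (String k : keys) {
            // the bucket id is independent of the reducer partition
            int b = (k.hashCode() & Integer.MAX_VALUE) % NUM_BUCKETS;
            buckets.get(b).add(k);
        }
        // sort each bucket on its own; a combiner could run per bucket here
        for (List<String> bucket : buckets) Collections.sort(bucket);
        return buckets;
    }

    public static void main(String[] args) {
        // equal keys always hash to the same bucket, so after the
        // per-bucket sorts they sit next to each other
        System.out.println(collect(Arrays.asList("b", "a", "b", "c", "a")));
    }
}
```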

      This is (pretty much) straight from the MARS project paper: http://www.cse.ust.hk/catalac/papers/mars_pact08.pdf. They report a 45% speedup in inverted-index calculation using hashing instead of sorting (the reference implementation is NOT against Hadoop, though).

        Issue Links

          Activity

          Owen O'Malley added a comment -

          Three comments spring to mind:
          1. A completely new data path is very expensive.
          2. A much easier way to achieve their result is to include the hash in the key as the first 4 bytes of the serialization, then have a comparator that uses the first 4 bytes as memcmp before it uses the dispatched raw compare method.
          3. Having a library of memcmp'able keys would be even better. Basically, you'd need to implement org.apache.hadoop.io.memcmp.(Int|Long|String|Array) classes that implement Writable and a marker interface.
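          Owen's suggestion in point 2 can be sketched as follows. This is illustrative code, not a Hadoop API - the class and method names are made up. Note the byte-wise ordering of the hash prefix is arbitrary (signed ints don't memcmp in numeric order), but for grouping all that matters is that the ordering is total and that equal keys compare equal.

```java
import java.nio.ByteBuffer;

// Sketch: serialize each key with its 4-byte hash as a prefix, so a raw
// comparator can usually decide with a cheap 4-byte memcmp and only falls
// back to the real key comparison when the hash prefixes are equal.
public class HashPrefixedKey {
    public static byte[] serialize(String key) {
        byte[] body = key.getBytes();
        return ByteBuffer.allocate(4 + body.length)
                .putInt(key.hashCode())   // 4-byte hash prefix
                .put(body)
                .array();
    }

    public static int compareRaw(byte[] a, byte[] b) {
        // memcmp on the 4-byte hash prefix
        for (int i = 0; i < 4; i++) {
            int d = (a[i] & 0xff) - (b[i] & 0xff);
            if (d != 0) return d;
        }
        // hashes equal: dispatch to the real key comparison
        return new String(a, 4, a.length - 4)
                .compareTo(new String(b, 4, b.length - 4));
    }
}
```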

          Joydeep Sen Sarma added a comment -

          I realize this is non-trivial.

          > 2. A much easier way to achieve their result is to include the hash in the key as the first 4 bytes of the serialization, then have a comparator that uses the first 4 bytes as memcmp before it uses the dispatched raw compare method.

          Not sure about this. It doesn't change the fact that all keys are being compared against each other. Just because we compare fewer bytes doesn't mean that the number of comparisons goes down - it's still n log n; only the cost of each comparison is lower.

          If we did a radix sort (first) on the leading 4 (hashed) bytes - then yeah, it changes things. But at that point we are taking (pretty much) the same approach as the paper and talking about a different data path.

          Joydeep Sen Sarma added a comment -

          Following up on the previous comment - reading the paper again closely, it's no longer clear to me whether the paper actually implements what I filed or whether it's closer to how Owen interpreted it. But I think it's true that sorting only within hash buckets is fundamentally lower complexity than sorting everything together.

          I am also wondering how different this is from the current data path. Today we already bucket map outputs into one bucket per reducer and sort therein. This is simply saying that we have lots of buckets per reducer (not one) - we are still sorting inside each of those, but because we are sorting much smaller sets, the complexity is lower.

          So on the mapper side we can (sort of) just pretend that the number of reducers is much, much higher - except that when it's time to send the data off to the reducer (or spill), we send a bunch of buckets. But this is not transparent to the merge code - it now has to merge taking the bucket id into consideration. If we prefixed each map-output key with the bucket id, then it would be transparent to the merge code. So some changes are definitely required - but maybe tractable?
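          The bucket-id-prefix idea can be sketched as follows. This is toy code, not Hadoop's merge implementation; the helper names are hypothetical. If every map-output key carries a fixed-width bucket-id prefix, already-sorted spills merge with entirely ordinary merge logic - ordering by (bucket, key) falls out of plain string comparison, and equal keys stay adjacent.

```java
import java.util.*;

public class BucketPrefixMergeSketch {
    static final int NUM_BUCKETS = 16;

    // hypothetical helper: prepend a fixed-width bucket id to the key so
    // lexicographic order on the prefixed form is (bucketId, key) order
    public static String prefixed(String key) {
        int bucket = (key.hashCode() & Integer.MAX_VALUE) % NUM_BUCKETS;
        return String.format("%02d|%s", bucket, key);
    }

    // a plain 2-way merge of two sorted spills; nothing here is bucket-aware
    public static List<String> merge(List<String> a, List<String> b) {
        List<String> out = new ArrayList<>();
        int i = 0, j = 0;
        while (i < a.size() && j < b.size()) {
            out.add(a.get(i).compareTo(b.get(j)) <= 0 ? a.get(i++) : b.get(j++));
        }
        while (i < a.size()) out.add(a.get(i++));
        while (j < b.size()) out.add(b.get(j++));
        return out;
    }
}
```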

          dhruba borthakur added a comment -

          So, then, are you saying that this is equivalent to just increasing the number of reducers, and making each reducer receive the records from one bucket?

          Joydeep Sen Sarma added a comment -

          No - the number of reducers is determined by other criteria (people here are expert at determining how to come up with that). We obviously can't have millions of reducers, but we can have millions of hash buckets that are sent in (large) groups to a much, much smaller number of reducers.

          I explained it in terms of a larger number of reducers to explore similarities (if any) to the current data path.

          Luke Lu added a comment -

          IMHO, it'd be easier to implement such grouping using a special group record reader (InputFormat) and a mapper that uses a hash map to group things and walks the map to emit grouped key-value pairs (where values can be counts or lists) at the end of the map method, rather than rearchitecting the current data path.

          Joydeep Sen Sarma added a comment -

          @Luke - I like this line of thinking. I am familiar with Hive - it already does map-side partial aggregates for (most) group-bys and emits them (what you mentioned). However, for (most) joins and cluster-by it depends on the sorting provided by the map-reduce framework. We could alter these query plans to execute this algorithm. The downside is pretty obvious: the benefits would only be available to Hive users (even as the cost of implementation remains fairly high). It would not be reusable by Pig, for example, nor by streaming users, etc.

          Extrapolating your idea a bit - we could try to implement this as a library: something that intercepts the output of the mapper (and combiner) and the input to the reduce function, and that could be inserted into the data path via config settings. (There's already a configurable output collector interface on the map output side; not sure about reducer input.) We could turn off map-side sorting in the map-reduce framework itself. The main concern I would have is that it seems too hard for these output collectors to implement spills (/merges) to (/from) disk. Perhaps if that functionality can be extracted out of the map-reduce core and provided as a library that these output collectors can use, then it's all feasible.

          Arun C Murthy added a comment -

          This is a great candidate for MR2.

          A new pipeline would be the most efficient, though:

          The output collector would hash rather than sort and spill in order of keys, thus keeping the combiner optional.

          The twist is that you wouldn't do a 2nd or 3rd or n-th level merge in the map. Just ship the segments out and get the reduce to think there are more segments than #maps (an additional index at the top). Most of the time each map output fits in the memory of the reduce, so you wouldn't seek any more than today. The 2+ level merges don't change in the reduce.

          Thoughts?

          He Yongqiang added a comment -

          Something that may help generate more discussion:

          We are trying to experiment with a new output collector. Here are some of our thoughts:
          1) Group key/value pairs into a memory block, which is basically a start-end pointer range into the big kvbuffer. One memory block must belong to exactly one reducer.
          2) Use quicksort to sort the data within each memory block.
          3) Use binary-insertion sort when spilling. Since memory blocks are already grouped by reducer in this phase, this will not sort memory blocks across reducers.

          For group-by and join, if sorting is not enforced, only grouping is needed - no global sort.
          On the mapper side:
          Keep a hashtable of memory blocks, and use the hash to decide which memory block a record goes to. This only helps reduce the number of sorts across memory blocks; it will not eliminate them, because of memory constraints (in which case we need to borrow memory from another memory block) and collisions.

          On the reduce side:
          All mappers apply the same rule, so we can add some metadata to each mapper's output to help the reduce side decide whether or not it needs to compare.

          He Yongqiang added a comment -

          With 1), 2), and 3), in some test cases we are seeing a 20%-40% CPU saving on the mapper side, and it helps a lot in reducing the CPU used for the in-memory sort. But we are definitely doing more tests on this.

          Todd Lipcon added a comment -

          Would be curious to see what the CPU impact of introducing a faster raw comparator would be. See HBASE-4012 for one optimization that would be easy to try out.

          He Yongqiang added a comment -

          Yeah, actually we tried that with the native comparator code from Baidu (see the patches for HCE), and the difference is not very big; sometimes it is worse (maybe because of some JNI cost, but we haven't looked into it).

          Todd Lipcon added a comment -

          The JNI cost makes sense, but the linked HBase JIRA doesn't use JNI. It uses sun.misc.Unsafe calls, which are actually JVM intrinsics (i.e. they get compiled directly into assembly, rather than going through the whole calling-convention + safepoint shenanigans that JNI does).
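          The core trick behind that comparator is comparing 8 bytes per step instead of 1. A minimal sketch of the word-at-a-time idea, shown here with ByteBuffer for portability rather than the Unsafe intrinsics the HBase JIRA uses (the class name is made up):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Word-at-a-time lexicographic comparison of byte arrays: compare longs
// while possible, then finish the tail byte by byte.
public class WordCompare {
    public static int compare(byte[] a, byte[] b) {
        int minLen = Math.min(a.length, b.length);
        // big-endian long reads preserve unsigned byte-wise (memcmp) order
        ByteBuffer ba = ByteBuffer.wrap(a).order(ByteOrder.BIG_ENDIAN);
        ByteBuffer bb = ByteBuffer.wrap(b).order(ByteOrder.BIG_ENDIAN);
        int i = 0;
        for (; i + 8 <= minLen; i += 8) {
            long la = ba.getLong(i), lb = bb.getLong(i);
            if (la != lb) return Long.compareUnsigned(la, lb) < 0 ? -1 : 1;
        }
        for (; i < minLen; i++) {                 // tail: byte at a time
            int d = (a[i] & 0xff) - (b[i] & 0xff);
            if (d != 0) return d;
        }
        return a.length - b.length;               // shorter array sorts first
    }
}
```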

          Binglin Chang added a comment -

          > we are trying to experiment on a new output collector. Here are some of our thoughts:

          Nice work - I'm very interested in exactly how it is done.
          Actually I'm considering further optimizations: not just grouping, but doing "foldl"-style aggregation operations directly in a hashtable-like data structure at the map output collector stage and on the reducer side.
          It seems Hyracks already does that, and Google mentioned this in the paper "Tenzing: A SQL Implementation On The MapReduce Framework".

          Todd Lipcon added a comment -

          Just added MAPREDUCE-3235 and HADOOP-7761 as related JIRAs. The combination of those got a 40% CPU speedup on terasort in my tests.

          He Yongqiang added a comment -

          @Binglin, cool. Can you generate a patch based on the Facebook Hadoop GitHub repository when you are done?

          He Yongqiang added a comment -

          @Binglin, we will first try to deploy the code internally, and then will try to push the code to the fb Hadoop GitHub (or send it to you offline when it is almost done), and maybe you can do more improvements on it.

          Binglin Chang added a comment -

          @YongQiang
          Thanks!
          Originally I planned to support grouping in nativetask, but after optimizing the sort I found the sort phase is no longer a bottleneck, especially in large jobs with many reduce tasks. In fact the current sort implementation is not optimized at all (it just uses std::sort), so replacing sort with grouping may not have a big effect once the sort is optimized. Above all, I prefer hash aggregation; the bad thing about it is that it must change the combiner/reducer API. This won't be a problem for the under-development nativetask, but it is much more complicated in Java - I will create an issue for discussion.
          I'd prefer it sent to me offline - can't wait to see it.

          Binglin Chang added a comment -

          I created MAPREDUCE-3247 for the discussion about hashtable based join/aggregation.

          alex gemini added a comment -

          Spark and Shark also claimed a hash-based reduce faster than Hadoop's sort in a Hadoop Summit 2012 presentation.

          alex gemini added a comment -

          I guess the main point is that we need a per-chunk comparison instead of a per-record comparison, whether it is based on hashing (as this JIRA suggests) or on minor ranges (like Google Tenzing's block shuffle).

          Jerry Chen added a comment -

          +1. I think this feature is valuable and I will take time to work on it. The hash-based algorithm can be used both for group-by and for join; neither requires a global sort.

          Jerry Chen added a comment -

          MAPREDUCE-2454 deals with a new feature that supports a pluggable sort algorithm. I think this feature would be better based on that piece of work.


            People

            • Assignee:
              Unassigned
            • Reporter:
              Joydeep Sen Sarma
            • Votes:
              1
            • Watchers:
              43

              Dates

              • Created:
                Updated:

                Development