Details

    • Type: New Feature
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 0.23.2
    • Fix Version/s: 0.23.2
    • Component/s: mrv2
    • Labels:
      None

      Description

      Inspired by Tenzing, section 5.1 MapReduce Enhancements:

      Sort Avoidance. Certain operators such as hash join
      and hash aggregation require shuffling, but not sorting. The
      MapReduce API was enhanced to automatically turn off
      sorting for these operations. When sorting is turned off, the
      mapper feeds data to the reducer which directly passes the
      data to the Reduce() function bypassing the intermediate
      sorting step. This makes many SQL operators significantly
      more efficient.

      Many applications need aggregation only, not sorting. Using sorting to achieve aggregation is costly and inefficient. Without sorting, an application can use a hash table or hash map to do aggregation efficiently. The application should bear in mind, however, that reduce-side memory is limited; it is responsible for managing that memory itself and guarding against running out of it. The map-side combiner is not supported; as a workaround, you can also do hash aggregation on the map side.
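
The hash-aggregation idea above can be sketched in plain Java (a minimal standalone illustration, not code from the patch; the class and method names are made up):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of reduce-side hash aggregation when sorting is avoided:
// keys arrive in no particular order, so a HashMap keeps a running
// sum per key instead of relying on sorted grouping. A real job
// must also bound the map's memory footprint itself.
public class HashAggregation {
    public static Map<String, Long> aggregate(String[] keys, long[] values) {
        Map<String, Long> sums = new HashMap<>();
        for (int i = 0; i < keys.length; i++) {
            sums.merge(keys[i], values[i], Long::sum); // running total per key
        }
        return sums;
    }

    public static void main(String[] args) {
        String[] keys = {"a", "b", "a", "c", "b", "a"};
        long[] values = {1, 2, 3, 4, 5, 6};
        System.out.println(aggregate(keys, values));
    }
}
```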

      The following are the main points of the sort avoidance implementation:

      1. Add a configuration parameter, mapreduce.sort.avoidance (boolean), to turn the sort-avoidance workflow on or off. The two workflows coexist.
      2. Key/value pairs emitted by the map function are sorted by partition only, using a more efficient sorting algorithm: counting sort.
      3. Map-side merge uses a byte-level merge that simply concatenates bytes from the generated spills, reading bytes in and writing bytes out, without the key/value serialization/deserialization and comparison overhead the current version incurs.
      4. Reduce can start as soon as any map output is available, in contrast to the sort workflow, which must wait until all map outputs are fetched and merged.
      5. Map output in memory can be consumed directly by reduce. When reduce can't keep up with the speed of incoming map outputs, the in-memory merge thread kicks in, merging in-memory map outputs onto disk.
      6. On-disk files are read sequentially to feed reduce, in contrast to the current implementation, which reads multiple files concurrently, resulting in many disk seeks. Map output in memory takes precedence over on-disk files when feeding the reduce function.
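
Point 2 above, partition-only ordering via counting sort, can be sketched as follows (hypothetical standalone code, not the actual MapOutputBuffer internals):

```java
// Counting sort over partition ids: O(n + P) and stable, so records
// within a partition keep their emission order. This replaces the full
// key comparison sort when only partition grouping is needed.
public class PartitionCountingSort {
    // partitionOf[i] holds the partition id of record i; returns the
    // record indices grouped by partition, in stable order.
    public static int[] sortByPartition(int[] partitionOf, int numPartitions) {
        int[] counts = new int[numPartitions + 1];
        for (int p : partitionOf) {
            counts[p + 1]++;                       // histogram of partition sizes
        }
        for (int i = 0; i < numPartitions; i++) {
            counts[i + 1] += counts[i];            // prefix sums -> start offsets
        }
        int[] order = new int[partitionOf.length];
        for (int i = 0; i < partitionOf.length; i++) {
            order[counts[partitionOf[i]]++] = i;   // place record index in its slot
        }
        return order;
    }
}
```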

      I have already implemented this feature based on Hadoop CDH3U3 and done some performance evaluation; you can refer to https://github.com/hanborq/hadoop for details. Now I'm willing to port it to YARN. Comments are welcome.

      1. MAPREDUCE-4039-branch-0.23.2.patch
        41 kB
        anty.rao
      2. IndexedCountingSortable.java
        4 kB
        anty.rao
      3. MAPREDUCE-4039-branch-0.23.2.patch
        39 kB
        anty.rao
      4. MAPREDUCE-4039-branch-0.23.2.patch
        39 kB
        anty.rao

        Issue Links

          Activity

          Schubert Zhang added a comment -

          Nice summary, Anty; looking forward to your patch on 0.23.

          For a more detailed description and benchmarks of this feature, you can refer to http://www.slideshare.net/hanborq/hanborq-optimizations-on-hadoop-mapreduce-20120216a or http://www.slideshare.net/schubertzhang/hanborq-optimizations-on-hadoop-map-reduce-20120221a

          Arun C Murthy added a comment -

          +1, looks like a good addition.

          We should be careful to ensure we document this adequately.

          anty.rao added a comment -

          I am a little confused about the implementation of Reader of IFile.
          In previous Hadoop versions, the IFile reader would read a bunch of key/value pairs from disk at once, then serve them directly from memory. I think this strategy is common and good. However, in YARN, for each requested key/value pair the reader hits the disk (though read-ahead helps somewhat). Am I missing something? Can someone shed light on this?

          Schubert Zhang added a comment -

          Patch is available by Anty, someone to have a review?

          Arun C Murthy added a comment -

          Can someone shed light on this?

          Anty - IFile.Reader in hadoop-0.23 uses IFileInputStream, which does the 'full' read into an in-memory buffer similar to what was happening in IFile.Reader in hadoop-0.20.xxx, and should give similar characteristics. Makes sense?

          Arun C Murthy added a comment -

          Anty - I spent a bit of time thinking of this.

          Some early thoughts... it seems to me that we are better off doing a bit of surgery on the MR runtime before we do this.

          We could consider making the MapOutputBuffer pluggable for a start, so we can split the 'full sort' and the 'hash sort' implementations. Similarly, we could implement a pluggable Shuffle that does not block until the outputs of all maps are available.

          This way we can cleanly layer the necessary features.

          Thoughts?

          anty.rao added a comment -

          Arun, thanks for your reply!

          Anty - IFile.Reader in hadoop-0.23 uses IFileInputStream which does the 'full' read into an in-memory buffer similar to what was happening in IFile.Reader in hadoop-0.20.xxx and should give similar characteristics. Makes sense?

          IFileInputStream only does data checksumming, not data buffering. Data buffering is enforced at the IFile.Reader level in hadoop-0.20.xxx:
          1. In hadoop-0.20.xxx, a bunch of data is first read into a byte array

              byte[] buffer = null;

          whose initial size is controlled by the parameter io.file.buffer.size. Key/value pairs are then read from this in-memory buffer.
          2. In hadoop-0.23, this buffer variable still exists but isn't actually used, so each key/value pair is read directly from the underlying IFileInputStream.
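
To illustrate the buffering difference being discussed, here is a generic sketch using plain java.io (not Hadoop's actual IFile.Reader code; the record format and class name are made up):

```java
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Wrapping the raw stream in a BufferedInputStream means each
// nextRecord() call is served from an in-memory block (analogous to
// the io.file.buffer.size buffer in the 0.20.xxx reader), instead of
// hitting the underlying stream once per record.
public class BufferedRecordReader {
    private final InputStream in;

    public BufferedRecordReader(InputStream raw, int bufferSize) {
        this.in = new BufferedInputStream(raw, bufferSize); // block-buffered reads
    }

    // Reads one length-prefixed record: [1-byte length][payload].
    public byte[] nextRecord() {
        try {
            int len = in.read();           // served from the in-memory buffer
            if (len < 0) return null;      // end of stream
            byte[] rec = new byte[len];
            int off = 0;
            while (off < len) {
                int n = in.read(rec, off, len - off);
                if (n < 0) throw new IOException("truncated record");
                off += n;
            }
            return rec;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```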

          anty.rao added a comment -

          We could consider making the MapOutputBuffer pluggable for a start, so we can split the 'full sort' and the 'hash sort' implementations. Similarly, we could implement a pluggable Shuffle that does not block until the outputs of all maps are available.

          This way we can cleanly layer the necessary features.

          Thoughts?

          Arun, I like the idea of making MapOutputBuffer and Shuffle pluggable. I will take a crack at it and give feedback.

          longfei added a comment -

          The patch did not include the class org.apache.hadoop.util.IndexedCountingSortable.

          Kang Xiao added a comment -

          @Schubert, could you give some typical applications that benefit from sort avoidance? It seems that with this feature, a simple aggregation app such as wordcount will use more memory while waiting for all keys to be processed.

          anty.rao added a comment -

          The missing file.

          anty.rao added a comment -

          @Kang
          Yes, you are right.
          Using merge-sort to achieve aggregation may not use as much memory as hash aggregation does with this feature. But the merge-sort process requires a lot of unnecessary work and consumes more resources, e.g. CPU, disk, network.
          It's just a tradeoff according to your use case, latency requirements, etc.

          Ahmed Radwan added a comment -

          Thanks Anty! A couple of initial comments:

          • IndexedCountingSortable.java doesn't compile (it seems the same class contents are pasted twice). I think you want to add this file at the location hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/IndexedCountingSortable.java. It'll be easier to create a new diff including this new file and upload an updated patch.
          • The patch also doesn't include any new or updated unit tests.
          anty.rao added a comment -

          @Ahmed Radwan
          Sorry for the delay.
          Submitted a new patch.

          Ahmed Radwan added a comment -

          Thanks Anty! I'll be happy to review your patch. I have noticed that the updated patch still has no unit tests; are you planning to add them? Also, will you be providing a corresponding patch for branch-1?

          anty.rao added a comment -

          Ahmed, thank you for sparing the time to review this patch.
          But when the sort plugin is finished, this patch will need to be modified.

          Ahmed Radwan added a comment -

          when the sort plugin is finished, this patch will need to be modified.

          Sure Anty, are you already working on updating the patch with pluggable MapOutputBuffer and Shuffle then?

          Ahmed Radwan added a comment -

          Hi Anty, Do you think you'll be able to update the patch and add the unit tests? With a new feature like that, unit tests are essential. Please let us know if you need any help.

          anty.rao added a comment -

          Hi Ahmed

          Sure Anty, are you already working on updating the patch with pluggable MapOutputBuffer and Shuffle then?

          No, the sort plugin feature doesn't seem complete yet, so I haven't started.

          Hi Anty, Do you think you'll be able to update the patch and add the unit tests? With a new feature like that, unit tests are essential. Please let us know if you need any help.

          I will add some unit tests.

          Mariappan Asokan added a comment -

          Hi Anty,
          I have submitted a patch for MAPREDUCE-2454 to support a pluggable sort for MR. I have some ideas on implementing a NullSortPlugin which will be a special sort plugin that avoids sorting. The NullSortPlugin can live outside the Hadoop MR code. I can share my thoughts if you are interested.
          Thanks.
          – Asokan

          anty.rao added a comment -

          Hi Asokan,
          Of course I'm interested. Glad you can share your thoughts!

          Mariappan Asokan added a comment -

          Hi Anty,
          Sorry, I did not get back to you. Please take a look at the patch for MAPREDUCE-2454. I added a test that has a very simple implementation of NullSortPlugin. You can take a look at the code there.

          – Asokan

          Mariappan Asokan added a comment -

          Hi Anty,
          Now that MAPREDUCE-4809, MAPREDUCE-4807, MAPREDUCE-4808, and MAPREDUCE-4049 are all committed, it is possible to implement sort avoidance as plugins for MapOutputCollector and ShuffleConsumerPlugin, with a special implementation of MergeManager.

          If you don't mind, I can assign this Jira to me and work on it. Please let me know.

          Thanks.

          – Asokan

          anty.rao added a comment -

          Mariappan Asokan,
          You can get on with it!
          Looking forward to seeing this feature incorporated into trunk.

          Arun C Murthy added a comment -

          it is possible to implement sort avoidance as implementation of plugins for MapOutputCollector and ShuffleConsumerPlugin with a special implementation of MergeManager

          Asokan - as I've mentioned several times before on other jiras, I believe this is the wrong way to go about adding this feature. Happy to discuss alternatives.

          Mariappan Asokan added a comment -

          Hi Arun,
          If you have alternate ideas, please feel free to post them. We can discuss them.

          Thanks.

          – Asokan


            People

            • Assignee:
              anty
              Reporter:
              anty.rao
            • Votes:
              3
              Watchers:
              46
