Details

    • Type: Improvement
    • Status: Patch Available
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: task
    • Labels:
      None
    • Environment:

      x86-64 Linux/Unix

    • Release Note:
      Task level native optimization
    • Tags:
      optimization task

      Description

      I've recently been working on a native optimization for MapTask based on JNI.

      The basic idea is to add a NativeMapOutputCollector to handle the k/v pairs emitted by the mapper, so that sort, spill, and IFile serialization can all be done in native code. Preliminary tests (on a Xeon E5410, jdk6u24) showed promising results:

      1. Sort is about 3x-10x as fast as Java (only binary string comparison is supported)

      2. IFile serialization speed is about 3x that of Java, about 500MB/s; if hardware CRC32C is used, things can get much faster (1G/

      3. The merge code is not complete yet, so the tests use enough io.sort.mb to prevent mid-spills

      This leads to a total speedup of 2x~3x for the whole MapTask when IdentityMapper (a mapper that does nothing) is used

      There are limitations, of course: currently only Text and BytesWritable are supported, and I have not thought through many things yet, such as how to support map-side combine. I had some discussion with somebody familiar with Hive, and it seems that these limitations won't be much of a problem for Hive to benefit from these optimizations, at least. Advice or discussion about improving compatibility is most welcome.

      Currently NativeMapOutputCollector has a static method called canEnable(), which checks whether the key/value types, comparator type, and combiner are all compatible; MapTask can then choose to enable NativeMapOutputCollector.
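
      As a rough illustration only (the collector class, the canEnable() check, and the config key below come from this issue; the surrounding method and the MapOutputBuffer fallback are assumptions, not the patch's actual code), the selection logic inside MapTask might look like:

        // Sketch: fall back to the pure-Java collector whenever the native
        // path is disabled or the job's types are not supported.
        <K, V> MapOutputCollector<K, V> chooseCollector(JobConf job) {
          if (job.getBoolean("mapred.native.map.output.collector", false)
              && NativeMapOutputCollector.canEnable(job)) {
            return new NativeMapOutputCollector<K, V>(job);  // native sort/spill/IFile path
          }
          return new MapOutputBuffer<K, V>();                // existing Java collector
        }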

      This is only a preliminary test, and more work needs to be done. I expect better final results, and I believe similar optimizations can be applied to reduce tasks and shuffle too.

      1. fb-shuffle.patch
        76 kB
        Todd Lipcon
      2. DESIGN.html
        42 kB
        Binglin Chang
      3. MAPREDUCE-2841.v2.patch
        190 kB
        Binglin Chang
      4. dualpivotv20-0.patch
        4 kB
        Chris Douglas
      5. dualpivot-0.patch
        5 kB
        Chris Douglas
      6. MAPREDUCE-2841.v1.patch
        180 kB
        Binglin Chang

        Issue Links

          Activity

          Binglin Chang added a comment -

          I just submitted a demo patch to show the basic ideas and current progress.
          This patch contains the map task optimization using NativeMapOutputCollector.

          NativeMapOutputCollector uses JNI to pass Java k/v pairs and
          the partition value to the native side. As JNI has non-negligible
          overhead (about 1000ns per empty JNI call with 4-6 arguments),
          a k/v cache is added to pass k/v pairs to the native side in batches.
          I suspect using a DirectBuffer would be better.
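
          As a rough illustration of that batching idea (the class name, buffer layout, and flushToNative() below are hypothetical, not the patch's actual format), the Java side might fill a direct buffer and cross JNI once per batch:

            import java.nio.ByteBuffer;
            import java.nio.ByteOrder;

            class BatchingCollector {
              // One direct buffer reused across records, flushed with a single JNI call;
              // assumes any single record fits in the buffer.
              private final ByteBuffer batch =
                  ByteBuffer.allocateDirect(128 * 1024).order(ByteOrder.LITTLE_ENDIAN);

              void collect(byte[] key, byte[] value, int partition) {
                int need = 12 + key.length + value.length;   // partition + two lengths + payload
                if (batch.remaining() < need) {
                  flushToNative(batch, batch.position());    // one JNI crossing per batch
                  batch.clear();
                }
                batch.putInt(partition).putInt(key.length).putInt(value.length);
                batch.put(key).put(value);
              }

              private native void flushToNative(ByteBuffer buf, int len);
            }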

          On the native side, k/v pairs are put into partitioned buffers,
          which is different from Java's single-buffer approach. By doing so,
          the sort can be much faster, because sorting one big array is much
          slower than sorting many small arrays; small arrays also mean fewer
          cache misses, and the partition number does not need to be compared
          during the sort.

          There are two lightweight I/O buffers, ReadBuffer & AppendBuffer,
          which have better performance than the decorator-based
          Java & Hadoop I/O streams. This greatly benefits IFile
          serialization, since many methods can be inlined. Compression
          is not supported yet, but it should not be complex to add a
          block-based compressor like snappy or quicklz.

          This optimization ONLY targets x86-64 and little-endian, and
          assumes unaligned 64-bit loads and stores are cheap,
          like google-snappy does.

          About the patch:
          I put the C++ code into src/c++/libnativetask and use a Makefile
          separate from build.xml and src/native, because many things are
          not decided yet and I don't want to mess up other components.

          1. cd src/c++/libnativetask
          2. make
          3. copy libnativetask.so to $HADOOP_HOME/lib/native/Linux-amd64-64/
          4. set mapred.native.map.output.collector to true in the jobconf (see the sketch below)
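
          For example, assuming the configuration key above, a job could opt in like this (MyJob is a placeholder; MapTask keeps the pure-Java collector whenever canEnable() reports the job is unsupported):

            JobConf conf = new JobConf(MyJob.class);
            conf.setBoolean("mapred.native.map.output.collector", true);
            // key/value types, comparator, and combiner must still pass the
            // NativeMapOutputCollector.canEnable() compatibility check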

          Again, this patch is just a demo; it is far from stable & complete,
          and has many known and unknown bugs.

          Here are some test results:
          1. running single task
          Intel Core i5, jdk6u24, on a MacBook Pro
          input:
          size 250000000 bytes
          100 bytes per line
          [key 9bytes]\t[value 89bytes]\n
          KeyValueTextInputFormat
          IdentityMapper
          partition number 100
          io.sort.mb 400MB no mid-spill
          Total time and analysis:

                      JAVA   Native  Speedup
          Sort        5.27   0.8     6.5
          IFile SER   2.8    0.9     3.1
          Total       14.13  6.34    2.2

          2. Some results about partitioned sort & spill
          Input is the same as test 1

          partition   1     10    20    40    80    200   400   800   1600  2000
          sort time   2.21  1.47  1.27  1.11  0.9   0.71  0.65  0.58  0.51  0.5
          spill time  0.49  0.43  0.41  0.39  0.36  0.33  0.32  0.32  0.29  0.28

          As illustrated, the partition number has a great impact on sort & spill performance.
          This is largely because of sort complexity and CPU cache effects.
          If the partition number is P and the record count is N, we get:
          T = P * (N/P) * log(N/P) = N * log(N/P), so the partition number matters.
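
          A toy, self-contained illustration of the N*log(N/P) effect (not part of the patch; absolute numbers will vary by machine and JVM):

            import java.util.Arrays;
            import java.util.Random;

            public class BucketSortDemo {
              public static void main(String[] args) {
                int n = 4000000, p = 100;
                int[] big = new int[n];
                Random r = new Random(1);
                for (int i = 0; i < n; i++) big[i] = r.nextInt();
                // scatter the same data into P per-partition arrays
                int[][] buckets = new int[p][n / p];
                for (int i = 0; i < n; i++) buckets[i % p][i / p] = big[i];

                long t0 = System.nanoTime();
                Arrays.sort(big.clone());                 // one big sort: ~N*log2(N) compares
                long t1 = System.nanoTime();
                for (int[] b : buckets) Arrays.sort(b);   // P small sorts: ~N*log2(N/P) compares
                long t2 = System.nanoTime();
                System.out.printf("big=%dms buckets=%dms%n",
                    (t1 - t0) / 1000000, (t2 - t1) / 1000000);
              }
            }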

          3. Terasort 10G input 40map 40reduce on 9node cluster
          io.sort.mb 500MB
          Results on jobhistory:

                        Total  AverageMap  AverageShuffle  AverageReduce
          java          54s    14s         14s             10s
          native        39s    7s          15s             9s
          java-lzo      43s    15s         8s              8s
          native-lzo    ?

          speedup: 1.38
          Map output compression can reduce shuffle time significantly,
          so with better compression speed & less shuffle-time impact,
          we can get a better speedup once map output compression is finished.
          I don't have large clusters to do more standard tests.

          Current progress:

          • NativeMapOutputCollector jni [done]
          • io buffers [done]
          • crc32/crc32c [done]
          • integration of jobconfs [done]
          • ifile writer [done]
          • write ifile index file [done]
          • ifile index reader [done]
          • ifile reader [done]
          • single-pass merge [done]
          • handle big key/value
            In the current implementation, if the key/value length exceeds
            the buffer length there will be a crash; I will add big
            k/v handling soon.
          • support block compression snappy
          • multi-pass merge
          • local io counters
          • map-side combine
          • parallel spill & merge
            Java has a spill thread to do collect/spill
            concurrently. Currently NativeMapOutputCollector only
            uses one thread, but parallel sort&spill&merge is possible,
            since k/v pairs are partitioned now. This can further
            speed up sort&spill&merge by simply adding threads.
          • reduce task optimization
          • support no sort
          • support grouping
          • support pipes API, streaming
          Binglin Chang added a comment -

          This patch is for 0.20 branch

          Binglin Chang added a comment -

          I think this work can help improve hadoop in many ways:

          1. Reduce total resource consumption, mainly CPU
          2. Speed up job execution, better response time
          3. More precise memory control, an important feature in production systems
          4. More programming interfaces: C/C++, Python, etc.
          5. Opens up further optimization possibilities
          MengWang added a comment -

          Good job!

          Scott Carey added a comment -

          I think there is quite a bit of juice to squeeze from the Java implementation. For example, Java 7 uses a different sort algorithm that is often 2x as fast as Java 6 for objects (dual pivot quicksort) and a faster mergesort implementation too for arrays (TimSort). Java can (and likely will in the 0.23 branch) also benefit from hardware CRC. After that, I wonder how much faster a native implementation would actually be.

          Binglin Chang added a comment -

          Hi, Scott

          there is quite a bit of juice to squeeze from the Java implementation.

          I agree with that

          Binglin Chang added a comment -

          Has someone already done a benchmark of hadoop running on Java 7 vs Java 6, and shared some results?
          I'm afraid I don't have enough resources to do a standard benchmark; I can do some simple tests, but they may not be convincing.

          Hadoop uses its own QuickSort & HeapSort implementations and interface; if dual-pivot quicksort & TimSort are much faster, I think we should do some tests and add them to hadoop (this does not require Java 7).

          The current implementation is very naive and still has a long way to go; for example, the sort just uses std::sort.

          The (very) long-term goal for this work is to provide an independent task-level native runtime and API. Users can use the native API to develop applications, but Java applications also get part of the performance benefits. It opens up further optimization possibilities, in both the framework and application layers.

          Chris Douglas added a comment -

          Java 7 uses a different sort algorithm that is often 2x as fast as Java 6 for objects (dual pivot quicksort) and a faster mergesort implementation too for arrays (TimSort)

          As Binglin points out, that's easy to test given a rote translation. I ported the code here mechanically:

          http://permalink.gmane.org/gmane.comp.java.openjdk.core-libs.devel/2628


          The goal of the last few implementations was a predictable memory footprint. Per-partition buckets are obviously faster to sort, but record skew and internal fragmentation in the tracking structures (and overhead in Java objects) motivated packing records into a single buffer. Can you summarize how the memory management works in the current patch?

          It's exciting to see such a significant gain, here.

          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12491995/dualpivotv20-0.patch
          against trunk revision .

          +1 @author. The patch does not contain any @author tags.

          +1 tests included. The patch appears to include 3 new or modified tests.

          -1 patch. The patch command could not apply the patch.

          Console output: https://builds.apache.org/job/PreCommit-MAPREDUCE-Build/550//console

          This message is automatically generated.

          Binglin Chang added a comment -

          The goal of the last few implementations was a predictable memory footprint.

          A predictable memory footprint is very important, I strongly agree.
          Observations of our cluster's memory status show that most OOM or high-memory-consumption
          cases are caused by big keys/values in the InputReader, key/value writables, and merge. Sort & spill
          is much more stable.
          I think in many cases predictable memory control is enough, rather than precise memory control,
          since the latter is impractical. We can use some dynamic memory if it stays in a predictable range,
          for example +/-20%, +/-30%, etc.
          Just an idea: what if memory-related configurations could be random variables,
          with a mean & variance? Could this lead to better resource utilization? A fixed memory bound
          always means applications will request more memory than they really need.

          Binglin Chang added a comment -

          Can you summarize how the memory management works in the current patch?

          Key/value buffer memory management in the current patch is very simple; it has three parts:

          MemoryPool
          Holds the buffer of size io.sort.mb and tracks current buffer usage.
          Notice that this buffer only occupies virtual memory, not RSS (memory really used), if the memory is not
          actually accessed; this is better than Java, because Java initializes arrays.
          Lazy memory allocation is a beautiful feature.

          MemoryBlock
          A small chunk of memory backed by MemoryPool, used by PartitionBucket.
          The default size of a MemoryBlock = ceil(io.sort.mb / partition / 4 / MIN_BLOCK_SIZE) * MIN_BLOCK_SIZE;
          currently MIN_BLOCK_SIZE == 32K, but it should be dynamically tuned according to the partition number & io.sort.mb.
          The purpose of MemoryBlock is to reduce CPU cache misses. When sorting large, indirectly addressed KV pairs,
          I guess the sort time will be dominated by random RAM reads, so MemoryBlock is used to give each bucket
          relatively contiguous memory.

          PartitionBucket
          Stores KV pairs for one partition; it has two arrays:
          vector<MemoryBlock *> blocks
          the blocks used by this bucket
          vector<uint32_t> offsets
          KV pair start offsets in the MemoryPool
          this vector is not under memory control (within io.sort.mb) yet, a bug that needs to be fixed
          (use memory from MemoryPool, use MemoryBlock directly, or grow backward from the buffer end);
          it uses less memory (1/3) than Java's kvindices, and uses at most 1/2 of the io.sort.mb memory
          (when all k/v are empty), so it won't be much of a problem currently
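
          (For orientation only, a rough Java-flavored rendering of the three structures just described; the actual patch implements them in C++, and the field names here are illustrative.)

            import java.util.ArrayList;
            import java.util.List;

            class MemoryPool {                     // one io.sort.mb-sized backing buffer
              final byte[] buf;
              int used;
              MemoryPool(int size) { buf = new byte[size]; }
              int reserve(int len) {               // start offset of the reserved range, or -1 if full
                if (used + len > buf.length) return -1;
                int off = used; used += len; return off;
              }
            }

            class MemoryBlock {                    // small chunk handed out to one bucket
              final int start, size;               // range within the pool
              int pos;                             // next free byte within the block
              MemoryBlock(int start, int size) { this.start = start; this.size = size; }
            }

            class PartitionBucket {                // per-reducer record storage
              final List<MemoryBlock> blocks = new ArrayList<MemoryBlock>();
              final List<Integer> offsets = new ArrayList<Integer>();  // k/v start offsets in the pool
            }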

          Limitations of this approach:
          A large partition number leads to small MemoryBlocks
          Large keys/values can cause memory holes in small MemoryBlocks

          It's difficult to determine the block size, since it relates to the K/V size (like the old io.sort.record.percent):
          200MB of memory can only hold 12800 16K MemoryBlocks, so if the average K/V size is a little bigger than 8K,
          half of the memory will likely be wasted.
          This approach will not work well when both the partition number & key/value size are large, but this is a rare case,
          and it can be improved; for example, we can use the MemoryPool directly (disable MemoryBlock) if
          io.sort.mb / partition number is too small.

          The other thing related to this is that this approach only supports simple synchronized collect/spill; I think this
          will not harm performance very much.
          Asynchronous collect/spill needs tuning of io.sort.spill.percent, and we can make sort & spill really fast, so
          parallel collect & spill is not as important as before; we can also let the original mapper thread do sort & spill
          by enabling parallel sort & spill.

          Chris Douglas added a comment -

          Just an idea, what if memory related configurations can be a random variable,
          with mean & variance? Can this leads to better resource utilization? A fixed memory bound
          always means application will request more memory than they really need.
          I think in many cases predictable memory control is enough, rather than precise memory control,
          since it's impractical. We can use some dynamic memory if it is in a predicable range,
          for example +/-20%, +-30%, etc.

          The fixed memory bound definitely causes resource waste. Not only will users ask for more memory than they need (particularly since most applications are not tightly tuned), but in our clusters, users will just as often request far too little. Because tasks' memory management is uniformly specified within a job, there isn't even an opportunity for the framework to adapt to skew.

          The random memory config is an interesting idea, but failed tasks are regrettable and expensive waste. For pipelines with SLAs, "random" failures will probably motivate users to jack up their memory requirements to match the range (which, if configurable, seems to encode the same contract). The precise specification was avoiding OOMs; because the collection is across a JNI boundary, a "relaxed" predictable memory footprint could be easier to deploy, assuming a hard limit in the native code to avoid swapping.

          Thanks for the detail on the collection data structures. That makes it much easier to orient oneself in the code.

          A few quick notes on your earlier comment:

          Adding the partition to the record, again, was to make the memory more predictable. The overhead in Java tracking thousands of per-partition buckets (many going unused) was worse than the per-record overhead, particularly in large jobs. Further, user comparators are often horribly inefficient, so the partition comparison and related hit to its performance was in the noise. The cache miss is real, but hard to reason about without leaving the JVM.

          The decorator-based stream is/was? required by the serialization interface. While the current patch only supports records with a known serialized length, the contract for other types is more general. Probably too general, but users with occasional several-hundred MB records (written in chunks) exist. Supporting that in this implementation is not a critical use case, since they can just use the existing collector. Tuning this to handle memcmp types could also put the burden of user comparators on the serialization frameworks, which is probably the best strategy. Which is to say: obsoleting the existing collection framework doesn't require that this support all of its use cases, if some of those can be worked around more competently elsewhere. If its principal focus is performance, it may make sense not to support inherently slow semantics.

          Which brings up a point: what is the scope of this JIRA? A full, native task runtime is a formidable job. Even if it only supported memcmp key types, no map-side combiner, no user-defined comparators, and records smaller than its intermediate buffer, such an improvement would still cover a lot of user jobs. It might make sense to commit that subset as optional functionality first, then iterate based on feedback.

          Binglin Chang added a comment -

          It might make sense to commit that subset as optional functionality first, then iterate based on feedback.

          I agree. How should this be contributed to hadoop? Add a new subdirectory in contrib like streaming, merge it into native, or keep it in the current c++/libnativetask?
          It contains both C++ and Java code, and will likely add client tools like streaming, and a dev SDK.

          A random memory config gives the Resource Scheduler more information, so it may yield better scheduling algorithms. As for OOM, there is already a flexible layer for memory control: the page cache. In typical slave-node memory configurations and real cases, the page cache (should) take a considerable proportion of total memory (20%-50%), so for example tasks could be configured to use 60% of memory but with some variance in a 20% range, and that variance becomes relatively small when multiple tasks are combined at the node level or the whole-job level.
          One of my colleagues is working on a shuffle service, which delegates all reduce-side shuffle work to a per-node service; it has a similar aspect:
          for a single task the variance of the memory footprint is a problem, but it becomes much more stable across many tasks running on one node.

          He Yongqiang added a comment -

          we are also evaluating the approach of optimizing the existing Hadoop Java map-side sort algorithms (i.e., playing the same set of tricks used in this C++ impl: bucket sort, prefix key comparison, a better crc32, etc.).

          The main problem we are interested in is how big the memory problem is for the Java impl.

          It would also be very useful to define an open benchmark here.

          Allen Wittenauer added a comment -

          Sure are a lot of header files with full blown functions in them....

          Chris Douglas added a comment -

          I agree. How to contribute this to hadoop? Add a new subdirectory in contrib like streaming, or merge to native, or stay in current c++/libnativetask?
          It contains both c++ and java code, and will likely to add client tools like streaming, and dev SDK.

          To pair the java/c++ code, a contrib module could make sense. Client tools and dev libraries are distant goals, though.

          Contributing it to the 0.20 branch is admissible, but suboptimal. Most of the releases generated for that series are sustaining releases. While it's possible to propose a new release branch with these improvements, releasing it would be difficult. Targeting trunk would be the best approach, if you can port your code.

          we are also evaluating the approach of optimizing the existing Hadoop Java map side sort algorithms (like playing the same set of tricks used in this c++ impl: bucket sort, prefix key comparison, a better crc32 etc).

          The main problem we are interested is how big is the memory problem for the java impl.

          Memory is the problem. The bucketed sort used from 0.10 to 0.16 had more internal fragmentation and a less predictable memory footprint (particularly for jobs with lots of reducers). Subsequent implementations focused on reducing the number of spills for each task, because the cost of spilling dominated the cost of the sort. Even with a significant speedup in the sort step, avoiding a merge by managing memory more carefully usually effects faster task times. Merging from fewer files also decreases the chance of failure and reduces seeks across all drives (by spreading output over fewer disks). A precise memory footprint also helped application authors calculate the framework overhead (both memory and number of spills) from the map output size without considering the number of reducers.

          That said, jobs matching particular profiles admit far more aggressive optimization, particularly if some of the use cases are ignored. Records larger than the sort buffer, user-defined comparators (particularly on deserialized objects), the combiner, and the intermediate data format restrict the solution space and complicate implementations. There's certainly fat to be trimmed from the general implementation, but restricting the problem will admit far more streamlined solutions than identifying and branching on all the special cases.

          He Yongqiang added a comment -

          The bucketed sort used from 0.10 to 0.16 had more internal fragmentation and a less predictable memory footprint (particularly for jobs with lots of reducers).

          If the Java impl uses a design similar to the C++ one here, the only difference will be language, right? Sorry, can you explain more about how the C++ impl can do a better job here with a predictable memory footprint? In the current Java impl, all records (no matter which reducer they are going to) are stored in one central byte array. In the C++ impl, within one map task each reducer has one corresponding partition bucket which maintains its own memory buffer. From what I understand, one partition bucket is for one reducer, and all records going to that reducer from the current map task are stored there, and will be sorted and spilled from there. On the sort side this saves comparisons, since the original sort needs to compare records destined for different reducers. And the C++ impl has the trick of doing prefix comparison, which reduces the number of CPU ops (an 8-byte compare becomes one long cmp op).

          Subsequent implementations focused on reducing the number of spills for each task, because the cost of spilling dominated the cost of the sort.Even with a significant speedup in the sort step, avoiding a merge by managing memory more carefully usually effects faster task times.

          I totally agree that the spill will be the dominant factor if it happens. So the question is how much more memory the Java impl needs compared to the C++ one: 20%, 50%, or 100%? From that we can calculate the chance of avoidable spilling when using the C++ impl.
          (Note: based on our analysis of jobs running during the past month, most jobs need to shuffle less than 700MB of data per mapper.)

          Chris Douglas added a comment -

          If the java impl use the similar impl as the c++ one here, the only difference will be language. right?

          Yes, but the language difference includes other overheads (more below).

          Sorry, can you explain more about how the c++ can do a better job here for predictable memory footprint? in the current java impl, all records (no matter which reducer it is going) are stored in a central byte array. In the c++ impl, on one mapper task, each reducer will have one corresponding partition bucket which maintains its own memory buffer. From what i understand, one partition bucket is for one reducer. and all records going to that reducer from the current maptask are stored there, will be sorted and spilled from there.

          Each partition bucket maintains its own memory buffer, so the memory consumed by the collection framework includes the unused space in all the partition buffers. I'm calling that, possibly imprecisely, internal fragmentation. The RawComparator interface also requires that keys be contiguous, introducing other "waste" if the partition's collection buffer were not copied whenever it is expanded (as in 0.16; the expansion/copying overhead also harms performance and makes memory usage hard to predict because both src and dst buffers exist simultaneously), i.e. a key partially serialized at the end of a slab must be realigned in a new slab. This happens at the end of the circular buffer in the current implementation, but would happen on the boundary of every partition collector chunk.

          That internal fragmentation creates unused buffer space that "prematurely" triggers a spill to reclaim the memory. Allocating smaller slabs decreases internal fragmentation, but also adds an ~8 byte object tracking overhead and GC cycles. In contrast, large allocations (like the single collection buffer) are placed directly in permgen. The 4 byte overhead per record to track the partition is a space savings over slabs exactly matching each record size, requiring at least 8 bytes per record if naively implemented.

          The current implementation is oriented toward stuffing the most records into a precisely fixed amount of memory, and adopts a few assumptions: 1) one should spill as little as possible 2) if spilling is required, at least don't block the mapper 3) packing the most records into each spill favors MapTasks with combiners. If there are cases (we all acknowledge that there are) where spilling more often but faster can compensate for that difference, then it's worth reexamining those assumptions.

          He Yongqiang added a comment -

          Sorry, I am kind of confused; I should make myself more clear: we are trying to evaluate and compare the C++ impl in HCE (and also this jira) against doing a pure-Java re-impl. So the thing we care about most is whether there is something the C++ impl can do that a Java re-impl cannot. And if there is, we need to find out how big that difference is. From there we can better understand each approach and decide which way to go.

          Binglin Chang added a comment -

          I just attached an updated patch, in case anybody has interest. Changes:

          1. Separate key type & value type; theoretically the value can be any type.
          2. Add google-snappy compression
          3. iobuffer doesn't use templates anymore
          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12492208/MAPREDUCE-2841.v2.patch
          against trunk revision .

          +1 @author. The patch does not contain any @author tags.

          +1 tests included. The patch appears to include 5 new or modified tests.

          -1 patch. The patch command could not apply the patch.

          Console output: https://builds.apache.org/job/PreCommit-MAPREDUCE-Build/563//console

          This message is automatically generated.

          Binglin Chang added a comment -

          Hi, Yongqiang

          i may should make me more clear: we are trying to evaluate and compare the c++ impl in HCE (and also this jira) and doing a pure java re-impl.

          I think a pure-Java re-impl is possible; there are some tricks to solve the memory fragmentation issues, maybe not thoroughly, for example letting many adjacent buckets share one MemoryBlock if the partition number is too large, which is what I will do in the native implementation. And again, it's hard to support stream-like key/value serialization semantics, so the Java re-impl would have the same limitations as the native impl.
          But the low-level unaligned memcmp & memcpy is hard to implement in Java.
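
          (For what it's worth, a portable but relatively slow pure-Java version of the 8-bytes-at-a-time comparison might look like the sketch below; the native code does the same with raw unaligned loads, and a fast Java version would need something like sun.misc.Unsafe. This is illustrative, not code from the patch.)

            import java.nio.ByteBuffer;
            import java.nio.ByteOrder;

            final class PrefixCompare {
              // Lexicographic compare of two byte ranges, 8 bytes per step.
              // Big-endian longs preserve unsigned-byte ordering.
              static int compareBytes(byte[] a, int aOff, int aLen,
                                      byte[] b, int bOff, int bLen) {
                int min = Math.min(aLen, bLen), i = 0;
                for (; i + 8 <= min; i += 8) {
                  long la = ByteBuffer.wrap(a, aOff + i, 8).order(ByteOrder.BIG_ENDIAN).getLong();
                  long lb = ByteBuffer.wrap(b, bOff + i, 8).order(ByteOrder.BIG_ENDIAN).getLong();
                  if (la != lb) {                  // flip sign bit for an unsigned comparison
                    return (la + Long.MIN_VALUE) < (lb + Long.MIN_VALUE) ? -1 : 1;
                  }
                }
                for (; i < min; i++) {
                  int d = (a[aOff + i] & 0xff) - (b[bOff + i] & 0xff);
                  if (d != 0) return d;
                }
                return aLen - bLen;
              }
            }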

          Binglin Chang added a comment -

          Updated test results.

          1. Terasort 10G input 40map 40reduce on 9node cluster, 7map/7reduce slots per node
          io.sort.mb 500MB
          Results on jobhistory:

                         Total  AverageMap  AverageShuffle  AverageReduce
          java           54s    14s         14s             10s
          native         39s    7s          15s             9s
          java-snappy    36s    15s         9s              8s
          native-snappy  27s    7s          7s              8s

          speedup-without-compression: 1.38
          speedup-with-compression: 1.33

          2. I did another test with a bigger data set:
          Terasort 100G 400map 400reduce on 9node cluster, 7map/7reduce slots per node

                         Total  AverageMap  AverageShuffle  AverageReduce
          java-snappy    277s   17s         28s             10s
          native-snappy  234s   10s         22s             10s

          speedup: 1.18
          When the cluster is under a heavy workload the bottleneck shows up in the page cache and shuffle, so optimizations in sort & spill do not play as big a role.

          3. I tested the dual-pivot quicksort patch provided by Chris, using the same setup as test No. 1.
          There are no observable differences compared to the old QuickSort; the average map task time for java-snappy is the same as before (15s). Perhaps the data set is too small, or the bottleneck is dominated by other factors, like random memory access.

          Todd Lipcon added a comment -

          One interesting C++ vs Java factor here is that this hot code is running in short-lived map tasks the majority of the time, so there is less time for the JIT to kick in and actually compile the sorting code. In the past I've added -Xprof to mapred.child.java.opts, or looked at oprofile-jit output, and seen a fair amount of time spent in interpreted code.

          Chris Douglas added a comment -

          we are trying to evaluate and compare the c++ impl in HCE (and also this jira) and doing a pure java re-impl. So the thing that we mostly cared about is that is there sth that the c++ impl can do and a java re-impl can not. And if there is, we need to find out how much is that difference. And from there we can have a better understand of each approach and decide which approach to go.

          Sorry, that's what I was trying to answer. A system matching your description existed in 0.16 and tests of the current collector show it to be faster for non-degenerate cases and far more predictable. The bucketed model inherently has some internal fragmentation which can only be eliminated by using expensive buffer copies and compactions or by using per-record byte arrays, where the 8 byte object overhead exceeds the cost of tracking the partition, requiring only 4 bytes. Eliminating that overhead is impractical, but even mitigating it (e.g. allowing partitions to share slabs) requires that one implement an allocation and memory management system across Java byte arrays or ByteBuffers, themselves allocated by the JVM. I would expect that system to be easier to write and maintain than even the current impl, but not trivial if it supports all of the existing use cases and semantics. Unlike the C++ impl (and like the current one), abstractions will likely be sacrificed to avoid the overheads.
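
          To make the memory-management trade-off concrete, here is a toy sketch of slab allocation over a single ByteBuffer (all names are hypothetical, not from any attached patch). Partitions would request slabs from such an allocator and trigger a spill when it returns -1.

{code:java}
// Sketch only: a toy slab allocator over one backing ByteBuffer; error handling
// and the actual spill logic are omitted.
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

class SlabAllocator {
  private final ByteBuffer backing;      // single JVM allocation
  private final int slabSize;
  private int nextSlabOffset = 0;
  private final Deque<Integer> freeSlabs = new ArrayDeque<Integer>();

  SlabAllocator(int totalBytes, int slabSize) {
    this.backing = ByteBuffer.allocate(totalBytes);
    this.slabSize = slabSize;
  }

  /** Returns the offset of a free slab, or -1 if memory is exhausted (spill point). */
  int allocateSlab() {
    if (!freeSlabs.isEmpty()) {
      return freeSlabs.pop();
    }
    if (nextSlabOffset + slabSize > backing.capacity()) {
      return -1;
    }
    int off = nextSlabOffset;
    nextSlabOffset += slabSize;
    return off;
  }

  void freeSlab(int offset) {
    freeSlabs.push(offset);
  }

  /** A writable view of one slab, so a partition can append records into it. */
  ByteBuffer slabView(int offset) {
    ByteBuffer dup = backing.duplicate();
    dup.position(offset);
    dup.limit(offset + slabSize);
    return dup.slice();
  }
}
{code}
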

          Scott Carey added a comment -

          There is no observable difference compared to the old QuickSort

          Dual-pivot quicksort causes the exact same number of comparisons as ordinary quicksort. However, it should have fewer swaps (0.80 times as many). If the cost of comparison is high (larger records, object comparison) the effect will be minimal. If the cost of comparison is low (values, very simple objects) the performance difference can be larger, up to about 2.5x as fast for sorting an array of ints.

          http://mail.openjdk.java.net/pipermail/core-libs-dev/2010-August/004687.html (initial message, many more if you search).
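
          For anyone who wants to experiment with this, a minimal sketch of the Yaroslavskiy-style dual-pivot partitioning in Java, on plain int keys and without the JDK's insertion-sort cutoff; this is only an illustration of the algorithm under discussion, not the attached dualpivot patch.

{code:java}
// Sketch only: a compact Yaroslavskiy-style dual-pivot quicksort for int[]; the
// JDK version adds an insertion-sort cutoff and other tuning not shown here.
public class DualPivotQuicksortSketch {

  public static void sort(int[] a) {
    sort(a, 0, a.length - 1);
  }

  private static void sort(int[] a, int left, int right) {
    if (left >= right) return;
    if (a[left] > a[right]) swap(a, left, right);
    int p = a[left], q = a[right];       // two pivots, p <= q
    int less = left + 1, great = right - 1;
    for (int k = less; k <= great; k++) {
      if (a[k] < p) {                    // move small elements to the left block
        swap(a, k, less++);
      } else if (a[k] > q) {             // move large elements to the right block
        while (a[great] > q && k < great) great--;
        swap(a, k, great--);
        if (a[k] < p) swap(a, k, less++);
      }
    }
    swap(a, left, --less);               // put pivot p into its final position
    swap(a, right, ++great);             // put pivot q into its final position
    sort(a, left, less - 1);             // elements < p
    sort(a, less + 1, great - 1);        // p <= elements <= q
    sort(a, great + 1, right);           // elements > q
  }

  private static void swap(int[] a, int i, int j) {
    int t = a[i]; a[i] = a[j]; a[j] = t;
  }
}
{code}
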

          Binglin Chang added a comment -

          Dual-pivot quicksort causes the exact same number of comparisons as ordinary quicksort. However, it should have fewer swaps (0.80 times as many).

          Unfortunately, the main overhead of the current sort in Hadoop comes from comparison; the swaps just swap two integer indices. I think that's why dual-pivot quicksort doesn't show any improvement. As Todd's results in MAPREDUCE-3235 show, and as I experienced in this issue, the main overhead of the current Hadoop sort implementation is cache misses: nearly every comparison causes two random memory accesses in a huge memory area (typically hundreds of MB).
          So it's not a language difference, just an implementation difference. To get better performance, we can either add an index as in MAPREDUCE-3235, or use a partition-bucket-based sort. A rough model of the indexed-compare cost is sketched below.
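
          To make the cache-miss point concrete, here is a rough model (illustrative names, not the actual MapTask collector code) of sorting record indices whose keys live in one large serialization buffer; every compare dereferences two effectively random offsets in that buffer.

{code:java}
// Sketch only: models sorting record ids that point into one big kvbuffer.
import java.util.Arrays;
import java.util.Comparator;

class IndexedKeySort {
  static void sortByKey(Integer[] recordIds, final byte[] kvbuffer,
                        final int[] keyStart, final int[] keyLen) {
    Arrays.sort(recordIds, new Comparator<Integer>() {
      public int compare(Integer a, Integer b) {
        // Two dereferences into kvbuffer at effectively random offsets per compare:
        // this is the cache-miss pattern described above.
        return compareBytes(kvbuffer, keyStart[a], keyLen[a],
                            kvbuffer, keyStart[b], keyLen[b]);
      }
    });
  }

  static int compareBytes(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    int n = Math.min(l1, l2);
    for (int i = 0; i < n; i++) {
      int d = (b1[s1 + i] & 0xff) - (b2[s2 + i] & 0xff);
      if (d != 0) return d;
    }
    return l1 - l2;
  }
}
{code}
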

          I ported the DualPivotQuickSort Java code to C++ and tested it on my Intel i5 MacBook Pro, with the terasort 10-byte key type and the word key type from RandomTextWriter.

          TeraSort input data 50MB 500000 key/value pair
          11/12/15 12:18:16 INFO qsort time: 0.23108s
          11/12/15 12:18:16 INFO std::sort time: 0.18266s
          11/12/15 12:18:17 INFO DualPivotQuicksort time: 0.17167s

          About 6% faster. I think sorting an array of ints can get much better results, because comparing two in-place ints is much faster than comparing two indexed binary strings.

          Some updates on my work: I have almost finished the whole native MapTask, and part of the native ReduceTask.
          With a native MapTask using a C++ RecordReader, Mapper, Partitioner, and MapOutputCollector, a map task can now process 250MB (47MB compressed) of terasort input data in just 1.6s. Comparing this with the earlier test results (14s for Java, 7s for Java with NativeMapOutputCollector), it is a huge speedup, and it can be further optimized.

          Todd Lipcon added a comment -

          I chatted recently with the MR guys over at Facebook. They have another implementation which they've been working on that gives similar gains while staying all in Java. The approach is something like the following:

          • map output collector collects into small buffers, each sized something close to L3 cache
          • when any buffer is full, sort it but don't spill it
          • when enough buffers are collected to fill io.sort.mb, merge them from memory to disk

          This fixes the cache locality issues that everyone has identified, but doesn't require native code; a rough sketch of the pattern follows below. It's up on their github here: https://github.com/facebook/hadoop-20/blob/master/src/mapred/org/apache/hadoop/mapred/BlockMapOutputBuffer.java
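
          A minimal sketch of that collect / sort-small-buffers / merge pattern, assuming plain int keys for brevity (the real BlockMapOutputBuffer works on serialized, partitioned bytes); class and method names are illustrative.

{code:java}
// Sketch only: the buffer-per-L3 idea in miniature, for plain int keys.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class SmallBufferCollector {
  private final int bufferSize;          // sized close to the L3 cache
  private final long memoryLimit;        // analogous to io.sort.mb
  private int[] current;
  private int used = 0;
  private final List<int[]> sortedRuns = new ArrayList<int[]>();

  SmallBufferCollector(int bufferSize, long memoryLimit) {
    this.bufferSize = bufferSize;
    this.memoryLimit = memoryLimit;
    this.current = new int[bufferSize];
  }

  void collect(int key) {
    current[used++] = key;
    if (used == bufferSize) {
      sealCurrentRun();                  // sort while the buffer is still cache-hot
    }
    long bytesHeld = (long) sortedRuns.size() * bufferSize * 4L;
    if (bytesHeld >= memoryLimit) {
      spill();                           // merge the in-memory runs out to disk
    }
  }

  private void sealCurrentRun() {
    int[] run = Arrays.copyOf(current, used);
    Arrays.sort(run);
    sortedRuns.add(run);
    used = 0;
  }

  /** Merge all sealed runs; a real implementation would stream this into an IFile. */
  private void spill() {
    PriorityQueue<int[]> heap = new PriorityQueue<int[]>(
        Math.max(1, sortedRuns.size()),
        new Comparator<int[]>() {        // heap entries are {value, runIndex, posInRun}
          public int compare(int[] x, int[] y) {
            return x[0] < y[0] ? -1 : (x[0] == y[0] ? 0 : 1);
          }
        });
    for (int r = 0; r < sortedRuns.size(); r++) {
      if (sortedRuns.get(r).length > 0) {
        heap.add(new int[] { sortedRuns.get(r)[0], r, 0 });
      }
    }
    while (!heap.isEmpty()) {
      int[] top = heap.poll();
      // write top[0] to the spill file here
      int[] run = sortedRuns.get(top[1]);
      if (top[2] + 1 < run.length) {
        heap.add(new int[] { run[top[2] + 1], top[1], top[2] + 1 });
      }
    }
    sortedRuns.clear();
  }
}
{code}
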

          Maybe Yongqiang can comment more on this approach? Dmytro has given permission offline for us to work from the code on their github and contribute it on trunk (they may not have time to contribute it in the near term).

          I think a short-term goal we could attack in this area would be to make the map output collector implementation pluggable. Then people can experiment more freely with different collector implementations. I don't have time for it - just throwing it out there as a thought.
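
          As a rough illustration of what such a pluggable collector API might look like (the interface below is hypothetical, not an existing MapReduce API); the task would then pick an implementation by class name from the job configuration, so the current buffer, a Facebook-style collector, or a native one could be swapped in for comparison.

{code:java}
// Sketch only: a hypothetical pluggable collector interface; the method set is
// illustrative, not an existing MapReduce API.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

interface PluggableMapOutputCollector<K, V> {
  /** Called once before any records are collected. */
  void init(Configuration conf) throws IOException;

  /** Accept one map output record for the given reduce partition. */
  void collect(K key, V value, int partition) throws IOException;

  /** Sort/merge anything still buffered and write the final spill. */
  void flush() throws IOException, InterruptedException;

  /** Release buffers and close any open files. */
  void close() throws IOException;
}
{code}
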

          Dong Yang added a comment -

          Beautiful work beyond HCE! Congrats to Binglin!

          Binglin Chang added a comment -

          I wrote a design document for this work to better describe my thoughts. The document also includes some test results.
          Generally, NativeTask outperforms the original MapReduce framework by about 3x-7x
          for map tasks, 1x-1.1x for reduce tasks, and 1.5x-5x for whole jobs. If the compiler
          hypothesis is correct, the speedup could be 4.5x-12x for map tasks, and the
          speedup for whole jobs should be correspondingly larger.

          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12492208/MAPREDUCE-2841.v2.patch
          against trunk revision .

          +1 @author. The patch does not contain any @author tags.

          +1 tests included. The patch appears to include 5 new or modified tests.

          -1 patch. The patch command could not apply the patch.

          Console output: https://builds.apache.org/job/PreCommit-MAPREDUCE-Build/1749//console

          This message is automatically generated.

          He Yongqiang added a comment -

          Really cool stuff!
          In terms of CPU performance, can you also do some comparison with the new Java output collector that Todd mentioned (https://github.com/facebook/hadoop-20/blob/master/src/mapred/org/apache/hadoop/mapred/BlockMapOutputBuffer.java)? In Facebook's internal tests, we have seen a big improvement (8x-10x) in CPU spent in sort (e.g., example 1: 240M -> 30M, example 2: 9M -> 1.8M), and about 2x in total mapper CPU (example 1: 707M -> 440M, example 2: 40M -> 19M). These are CPU numbers, not latency numbers. BlockMapOutputBuffer.java uses only one thread while the original collector uses 2 threads, but latency is still improved by a lot (around 30%).

          Some analysis of the performance differences would really help in understanding the bottlenecks and the difference the language brings.

          Binglin Chang added a comment -

          @Yongqiang
          It seems BlockMapOutputBuffer only supports BytesWritable currently, so WordCount & Terasort can't run on it directly. From the test results (map average time and sort time), I would say sort takes about 40-50% of the whole map task time; since sort is CPU intensive, its share of CPU time should be higher, maybe 50%-60%.
          If my compiler assumption is right, the total speedup for the WordCount mapper should be 10x, the sort speedup should be 10x-12x, and the rest (reader, mapper, merge, spill combined) should be (10 - 0.5*12)/(1 - 0.5) = 8.
          I must say, using quicksort to sort small buffers that fit into cache and then merging them is a good idea; I should make this optimization too.
          NativeTask currently uses a single thread, but I think all partition-based collectors (including BlockMapOutputBuffer) can take advantage of the parallel sort & spill I mentioned in the design doc; this needs code changes in other parts (TaskTracker, IndexCache maybe) and changing the map output file to a directory. A small sketch of the parallel sort part of the idea follows.
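
          A small sketch of the parallel sort idea, assuming each partition bucket has already been collected into its own array (names are illustrative; parallel spill would additionally need per-partition output handling).

{code:java}
// Sketch only: with per-partition buckets, each bucket can be sorted on its own
// thread; parallel spill would additionally need per-partition output files.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ParallelBucketSorter {
  static void sortAll(List<int[]> partitionBuckets, int threads)
      throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      List<Callable<Void>> tasks = new ArrayList<Callable<Void>>();
      for (final int[] bucket : partitionBuckets) {
        tasks.add(new Callable<Void>() {
          public Void call() {
            Arrays.sort(bucket);         // buckets are independent, so this is safe
            return null;
          }
        });
      }
      pool.invokeAll(tasks);             // blocks until every bucket is sorted
    } finally {
      pool.shutdown();
    }
  }
}
{code}
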

          Todd Lipcon added a comment -

          I forward-ported Facebook's shuffle from their github 'production' branch into MR2. There are a few changes I had to make to adjust to the latest interfaces, and some counter-related stuff I commented out. I also fixed a perf issue where it spent a lot of time creating new Configuration objects. But the core of the code is the same.

          The patch also has a simple benchmark similar to what Binglin described above – a standalone piece of code to exercise the mapside sort/spill with nothing else in the way. This makes it very easy to compare different implementations.

          In my testing, the FB implementation is about 10% faster on a wall clock basis, and perhaps a bit better on CPU. I didn't spend a lot of time looking at results as of yet - but figured I'd post this here in case anyone else was interested.

          This is in no way meant for commit - just sharing code so other people can tweak. If someone has a version of the native mapside sort already available against trunk MR2, it would be great to have that available on the JIRA so we can compare all of the options.

          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12638882/fb-shuffle.patch
          against trunk revision .

          +1 @author. The patch does not contain any @author tags.

          +1 tests included. The patch appears to include 1 new or modified test files.

          -1 javac. The applied patch generated 1491 javac compiler warnings (more than the trunk's current 1483 warnings).

          +1 javadoc. There were no new javadoc warning messages.

          +1 eclipse:eclipse. The patch built with eclipse:eclipse.

          -1 findbugs. The patch appears to introduce 10 new Findbugs (version 1.3.9) warnings.

          -1 release audit. The applied patch generated 1 release audit warnings.

          -1 core tests. The patch failed these unit tests in hadoop-common-project/hadoop-common hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core:

          org.apache.hadoop.mapreduce.v2.app.TestMRAppMaster

          +1 contrib tests. The patch passed contrib unit tests.

          Test results: https://builds.apache.org/job/PreCommit-MAPREDUCE-Build/4488//testReport/
          Release audit warnings: https://builds.apache.org/job/PreCommit-MAPREDUCE-Build/4488//artifact/trunk/patchprocess/patchReleaseAuditProblems.txt
          Findbugs warnings: https://builds.apache.org/job/PreCommit-MAPREDUCE-Build/4488//artifact/trunk/patchprocess/newPatchFindbugsWarningshadoop-mapreduce-client-core.html
          Javac warnings: https://builds.apache.org/job/PreCommit-MAPREDUCE-Build/4488//artifact/trunk/patchprocess/diffJavacWarnings.txt
          Console output: https://builds.apache.org/job/PreCommit-MAPREDUCE-Build/4488//console

          This message is automatically generated.


            People

            • Assignee:
              Binglin Chang
            • Reporter:
              Binglin Chang
            • Votes:
              5
            • Watchers:
              43
