Details

    • Type: Improvement
    • Status: Patch Available
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: task
    • Labels:
      None
    • Environment:

      x86-64 Linux/Unix

    • Release Note:
      Task level native optimization
    • Tags:
      optimization task

      Description

      I've recently been working on native optimization for MapTask based on JNI.

      The basic idea is to add a NativeMapOutputCollector to handle the k/v pairs emitted by the mapper, so that sort, spill, and IFile serialization can all be done in native code. Preliminary tests (on Xeon E5410, jdk6u24) showed promising results:

      1. Sort is about 3x-10x as fast as Java (only binary string comparison is supported)

      2. IFile serialization is about 3x as fast as Java, about 500MB/s; if hardware CRC32C is used, things can get much faster (1G/

      3. Merge code is not completed yet, so the test uses enough io.sort.mb to prevent a mid-spill

      This leads to a total speedup of 2x~3x for the whole MapTask if IdentityMapper (a mapper that does nothing) is used.

      There are limitations, of course: currently only Text and BytesWritable are supported, and I have not thought through many things yet, such as how to support map-side combine. I had some discussion with people familiar with Hive, and it seems these limitations won't be much of a problem for Hive to benefit from these optimizations, at least. Advice or discussion about improving compatibility is most welcome.

      Currently NativeMapOutputCollector has a static method called canEnable(), which checks whether the key/value types, comparator type, and combiner are all compatible; MapTask can then choose to enable NativeMapOutputCollector.

      This is only a preliminary test; more work needs to be done. I expect better final results, and I believe similar optimizations can be adopted for the reduce task and shuffle too.

      1. MAPREDUCE-2841.v2.patch
        190 kB
        Binglin Chang
      2. MAPREDUCE-2841.v1.patch
        180 kB
        Binglin Chang
      3. fb-shuffle.patch
        76 kB
        Todd Lipcon
      4. dualpivotv20-0.patch
        4 kB
        Chris Douglas
      5. dualpivot-0.patch
        5 kB
        Chris Douglas
      6. DESIGN.html
        42 kB
        Binglin Chang

        Issue Links

          Activity

          Binglin Chang added a comment -

          I just submitted a demo patch to show the basic ideas and current progress.
          This patch contains the map task optimization using NativeMapOutputCollector.

          NativeMapOutputCollector uses JNI to pass Java k/v pairs and
          the partition value to the native side. As JNI has non-negligible
          overhead (about 1000ns per empty JNI call with 4-6 arguments),
          a k/v cache is added to pass k/v pairs to the native side in batches.
          I suspect using a DirectBuffer would be better.

          On the native side, k/v pairs are put into partitioned buffers,
          which is different from Java's single-buffer approach. This makes
          the sort much faster, because sorting one big array is much slower
          than sorting many small arrays; small arrays also mean fewer cache
          misses, and the partition number does not need to be compared
          during the sort.
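          To make that concrete, here is a minimal sketch of the per-partition sort idea.
          The names, the [keyLen][key][value] buffer layout, and the use of std::sort are
          my assumptions for illustration, not the actual code in the patch:

#include <stdint.h>
#include <string.h>
#include <algorithm>
#include <vector>

// Compares two records by their start offsets in a flat collector buffer,
// using a plain binary (memcmp-style) key comparison. No partition number
// is involved, because each bucket only ever sorts its own records.
struct OffsetComparator {
  const char* base;  // start of the collector buffer
  explicit OffsetComparator(const char* b) : base(b) {}
  bool operator()(uint32_t lhs, uint32_t rhs) const {
    uint32_t llen, rlen;
    memcpy(&llen, base + lhs, sizeof(llen));  // assumed layout: [keyLen][key][value]
    memcpy(&rlen, base + rhs, sizeof(rlen));
    const char* lkey = base + lhs + sizeof(uint32_t);
    const char* rkey = base + rhs + sizeof(uint32_t);
    int c = memcmp(lkey, rkey, llen < rlen ? llen : rlen);
    if (c != 0) return c < 0;
    return llen < rlen;
  }
};

// Sorting many small per-partition offset arrays instead of one big array.
void sortBuckets(std::vector<std::vector<uint32_t> >& bucketOffsets,
                 const char* base) {
  for (size_t p = 0; p < bucketOffsets.size(); ++p) {
    std::sort(bucketOffsets[p].begin(), bucketOffsets[p].end(),
              OffsetComparator(base));
  }
}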

          There are two lightweight I/O buffers, ReadBuffer & AppendBuffer,
          which perform better than the decorator-based
          Java & Hadoop I/O streams. This greatly benefits IFile
          serialization, since many methods can be inlined. Compression
          is not supported yet, but it should not be
          complex work to add a block-based compressor like snappy or
          quicklz.
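          For illustration, a rough sketch of the append-side idea: a flat buffer with
          non-virtual, inline-able methods instead of layered stream decorators. The names
          and the interface are assumptions; the real AppendBuffer presumably also handles
          checksumming and the details of flushing, which are omitted here:

#include <stdint.h>
#include <string.h>

class AppendBuffer {
 public:
  AppendBuffer(char* buff, uint32_t capacity)
      : buff_(buff), capacity_(capacity), pos_(0) {}

  // Appends raw bytes; assumes len <= capacity_. Because this is a concrete,
  // non-virtual method, the compiler can inline it at every call site.
  inline void write(const void* data, uint32_t len) {
    if (pos_ + len > capacity_) {
      flush();  // hand the filled part to the underlying output first
    }
    memcpy(buff_ + pos_, data, len);
    pos_ += len;
  }

  void flush() {
    // write buff_[0..pos_) to the underlying file/compressor, then reset
    pos_ = 0;
  }

 private:
  char* buff_;
  uint32_t capacity_;
  uint32_t pos_;
};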

          This optimization is targeted ONLY at x86-64, little-endian,
          and assumes unaligned 64-bit loads and stores are cheap,
          like google-snappy.

          About the patch:
          I put the C++ code into src/c++/libnativetask and use a Makefile
          separate from build.xml and src/native, because many things are
          not decided yet and I don't want to mess up other components.

          1. cd src/c++/libnativetask
          2. make
          3. copy libnativetask.so to $HADOOP_HOME/lib/native/Linux-amd64-64/
          4. set mapred.native.map.output.collector to true in jobconf

          Again, this patch is just a demo; it is far from stable & complete,
          and has many known and unknown bugs.

          Here are some test results:
          1. running single task
          intel corei5 jdk6u24 on macbook pro
          input:
          size 250000000 bytes
          100 bytes per line
          [key 9bytes]\t[value 89bytes]\n
          KeyValueTextInputFormat
          IdentityMapper
          partition number 100
          io.sort.mb 400MB no mid-spill
          Total time and analysis (times in seconds):

                      Java    Native   Speedup
          Sort        5.27    0.8      6.5
          IFile SER   2.8     0.9      3.1
          Total       14.13   6.34     2.2

          2. Some results on partitioned sort & spill (times in seconds)
          Input is the same as in test 1

          partition    1     10    20    40    80    200   400   800   1600  2000
          sort time    2.21  1.47  1.27  1.11  0.9   0.71  0.65  0.58  0.51  0.5
          spill time   0.49  0.43  0.41  0.39  0.36  0.33  0.32  0.32  0.29  0.28

          As illustrated, the partition number has a great impact on sort & spill performance.
          This is largely because of sort complexity and CPU cache effects.
          If the partition number is P and the record count is N, we get:
          T = P * (N/P) * log(N/P) = N * log(N/P), so the partition number matters.
          For example, with N = 2,500,000 records (the input above), going from P = 1 to
          P = 100 cuts log2(N/P) from about 21 to about 15, roughly a 30% reduction in
          comparisons, before even counting the cache benefits of sorting smaller arrays.

          3. Terasort 10G input 40map 40reduce on 9node cluster
          io.sort.mb 500MB
          Results on jobhistory:

            Total AverageMap AverageShuffle AverageReduce
          java 54s 14s 14s 10s
          native 39s 7s 15s 9s
          java-lzo 43s 15s 8s 8s
          native-lzo ?      

          speedup: 1.38
          Map output compression can reduce shuffle time significantly,
          so with better compression speed & less impact on shuffle time,
          we can get a better speedup when map output compression is finished.
          I don't have large clusters to do more standard tests.

          Current progress:

          • NativeMapOutputCollector jni [done]
          • io buffers [done]
          • crc32/crc32c [done] (see the hardware CRC32C sketch after this list)
          • integration of jobconfs [done]
          • ifile writer [done]
          • write ifile index file [done]
          • ifile index reader [done]
          • ifile reader [done]
          • single-pass merge [done]
          • handle big key/value
            In the current implementation, if the key/value length exceeds
            the buffer length there will be a crash; I will add big
            k/v handling soon.
          • support block compression snappy
          • multi-pass merge
          • local io counters
          • map-side combine
          • parallel spill & merge
            Java has a spill thread to do collect/spill
            concurrently. Currently NativeMapOutputCollector only
            uses one thread, but parallel sort & spill & merge is possible,
            since k/v pairs are partitioned now. This can further
            speed up sort & spill & merge by simply adding threads.
          • reduce task optimization
          • support no sort
          • support grouping
          • support pipes API, streaming
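          On the crc32c item above: this presumably refers to the SSE4.2 CRC32 instruction.
          A sketch of a hardware-accelerated CRC32C routine (requires -msse4.2; the function
          name and structure are mine, not from the patch):

#include <nmmintrin.h>  // SSE4.2 intrinsics
#include <stdint.h>
#include <string.h>
#include <stddef.h>

// CRC32C (Castagnoli polynomial) over a byte range, 8 bytes per instruction.
uint32_t crc32c_hw(uint32_t crc, const void* data, size_t len) {
  const uint8_t* p = static_cast<const uint8_t*>(data);
  crc = ~crc;
  while (len >= 8) {
    uint64_t v;
    memcpy(&v, p, 8);  // unaligned load is fine on x86-64
    crc = static_cast<uint32_t>(_mm_crc32_u64(crc, v));
    p += 8;
    len -= 8;
  }
  while (len--) {
    crc = _mm_crc32_u8(crc, *p++);  // byte-at-a-time tail
  }
  return ~crc;
}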
          Binglin Chang added a comment -

          This patch is for the 0.20 branch.

          Binglin Chang added a comment -

          I think this work can help improve Hadoop in many ways:

          1. Reduce total resource consumption, mainly CPU
          2. Speed up job execution, better response time
          3. More precise memory control, an important feature in production systems
          4. More programming interfaces: C/C++, Python, etc.
          5. Opens up further optimization possibilities
          MengWang added a comment -

          Good job!

          Scott Carey added a comment -

          I think there is quite a bit of juice to squeeze from the Java implementation. For example, Java 7 uses a different sort algorithm that is often 2x as fast as Java 6 for objects (dual pivot quicksort) and a faster mergesort implementation too for arrays (TimSort). Java can (and likely will in the 0.23 branch) also benefit from hardware CRC. After that, I wonder how much faster a native implementation would actually be.

          Binglin Chang added a comment -

          Hi, Scott

          there is quite a bit of juice to squeeze from the Java implementation.

          I agree with that

          Binglin Chang added a comment -

          Has someone already done a benchmark of Hadoop running on Java 7 vs Java 6, and shared some results?
          I'm afraid I don't have enough resources to do a standard benchmark; I can do some simple tests, but they may not be convincing.

          Hadoop uses its own QuickSort & HeapSort implementation and interface; if dual-pivot quicksort & TimSort are much faster, I think we should do some tests and add them to Hadoop (this does not require Java 7).

          The current implementation is very naive and has a long way to go before it is fully optimized. For example, the sort just uses std::sort.

          The (very) long-term goal of this work is to provide an independent task-level native runtime and API. Users could use the native API to develop applications, but Java applications also get part of the performance benefits. It opens up further optimization possibilities, in both the framework and the application layer.

          Chris Douglas added a comment -

          Java 7 uses a different sort algorithm that is often 2x as fast as Java 6 for objects (dual pivot quicksort) and a faster mergesort implementation too for arrays (TimSort)

          As Binglin points out, that's easy to test given a rote translation. I ported the code here mechanically:

          http://permalink.gmane.org/gmane.comp.java.openjdk.core-libs.devel/2628


          The goal of the last few implementations was a predictable memory footprint. Per-partition buckets are obviously faster to sort, but record skew and internal fragmentation in the tracking structures (and overhead in Java objects) motivated packing records into a single buffer. Can you summarize how the memory management works in the current patch?

          It's exciting to see such a significant gain, here.

          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12491995/dualpivotv20-0.patch
          against trunk revision .

          +1 @author. The patch does not contain any @author tags.

          +1 tests included. The patch appears to include 3 new or modified tests.

          -1 patch. The patch command could not apply the patch.

          Console output: https://builds.apache.org/job/PreCommit-MAPREDUCE-Build/550//console

          This message is automatically generated.

          Binglin Chang added a comment -

          The goal of the last few implementations was a predictable memory footprint.

          A predictable memory footprint is very important; I strongly agree.
          Observations of our cluster's memory status show that most OOM or high memory consumption
          cases are caused by big keys/values in the InputReader, key/value Writables, and merge; sort & spill
          is much more stable.
          I think in many cases predictable memory control is enough, rather than precise memory control,
          since the latter is impractical. We can use some dynamic memory if it stays within a predictable range,
          for example +/-20%, +/-30%, etc.
          Just an idea: what if memory-related configurations could be a random variable
          with a mean & variance? Could this lead to better resource utilization? A fixed memory bound
          always means applications will request more memory than they really need.

          Binglin Chang added a comment -

          Can you summarize how the memory management works in the current patch?

          Key/value buffer memory management in the current patch is very simple; it has three parts:

          MemoryPool
          Holds the buffer of size io.sort.mb and tracks current buffer usage.
          Notice that this buffer will only occupy virtual memory, not RSS (memory actually used), as long as
          the memory is not actually touched; this is better than Java, because Java initializes arrays.
          Lazy memory allocation is a beautiful feature.

          MemoryBlock
          A small chunk of memory backed by the MemoryPool, used by PartitionBucket.
          The default size of a MemoryBlock is ceil(io.sort.mb / partition / 4 / MIN_BLOCK_SIZE) * MIN_BLOCK_SIZE;
          currently MIN_BLOCK_SIZE == 32K, and it should be tuned dynamically according to the partition number & io.sort.mb.
          The purpose of MemoryBlock is to reduce CPU cache misses. When sorting a large number of indirectly
          addressed KV pairs, I guess the sort time will be dominated by random RAM reads, so MemoryBlock is
          used to give each bucket relatively contiguous memory.

          PartitionBucket
          Stores KV pairs for a partition; it has two arrays:
          vector<MemoryBlock *> blocks
          blocks used by this bucket
          vector<uint32_t> offsets
          start offset of each KV pair in the MemoryPool
          This vector is not under memory control (within io.sort.mb) yet; a bug needs to be fixed
          (use memory from the MemoryPool, use a MemoryBlock directly, or grow backward from the buffer end).
          It uses less memory (1/3) than Java's kvindices, and uses at most 1/2 of the io.sort.mb memory
          (when all k/v pairs are empty), so it won't be much of a problem currently.
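          A rough structural sketch of the three parts described above (field names beyond those
          mentioned, and the method bodies, are my assumptions for illustration, not the code in the patch):

#include <stdint.h>
#include <stddef.h>
#include <vector>

// Owns the io.sort.mb buffer. new char[] does not touch the pages, so the
// buffer costs virtual memory only until records are actually written.
class MemoryPool {
 public:
  explicit MemoryPool(uint32_t capacity)
      : buff_(new char[capacity]), capacity_(capacity), used_(0) {}
  ~MemoryPool() { delete[] buff_; }

  // Hands out a chunk of the buffer; NULL when full, which would presumably
  // trigger a spill in the real collector.
  char* allocate(uint32_t size) {
    if (used_ + size > capacity_) return NULL;
    char* ret = buff_ + used_;
    used_ += size;
    return ret;
  }
  uint32_t offset(const char* p) const { return (uint32_t)(p - buff_); }

 private:
  char* buff_;
  uint32_t capacity_;
  uint32_t used_;
};

// A small slab carved out of the pool; a bucket owns several of these so its
// records stay relatively contiguous (fewer cache misses while sorting).
struct MemoryBlock {
  char* base;
  uint32_t size;
  uint32_t used;
};

// Per-partition record index: which blocks hold this partition's data, and
// where each k/v pair starts inside the pool.
struct PartitionBucket {
  std::vector<MemoryBlock*> blocks;
  std::vector<uint32_t> offsets;
};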

          Limitations of this approach:
          A large partition number leads to small MemoryBlocks.
          Large keys/values can cause memory holes in small MemoryBlocks.

          It's difficult to determine the block size, since it relates to K/V size (like the old io.sort.record.percent):
          200MB of memory can only hold 12800 16K MemoryBlocks, so if the average K/V size is a little bigger than 8K,
          half of the memory will likely be wasted.
          This approach will not work well when the partition number & key/value size are both large, but that is a rare case,
          and it can be improved; for example, we can use the MemoryPool directly (disable MemoryBlock) if
          io.sort.mb / partition number is too small.

          The other thing related to this is that the approach only supports simple synchronized collect/spill; I think this
          will not harm performance very much.
          Asynchronous collect/spill needs tuning of io.sort.spill.percent, and we can make sort & spill fast enough that
          parallel collect & spill is not as important as before; we can also let the original mapper thread do the
          sort & spill by enabling parallel sort & spill.

          Chris Douglas added a comment -

          Just an idea, what if memory related configurations can be a random variable,
          with mean & variance? Can this leads to better resource utilization? A fixed memory bound
          always means application will request more memory than they really need.
          I think in many cases predictable memory control is enough, rather than precise memory control,
          since it's impractical. We can use some dynamic memory if it is in a predicable range,
          for example +/-20%, +-30%, etc.

          The fixed memory bound definitely causes resource waste. Not only will users ask for more memory than they need (particularly since most applications are not tightly tuned), but in our clusters, users will just as often request far too little. Because tasks' memory management is uniformly specified within a job, there isn't even an opportunity for the framework to adapt to skew.

          The random memory config is an interesting idea, but failed tasks are regrettable and expensive waste. For pipelines with SLAs, "random" failures will probably motivate users to jack up their memory requirements to match the range (which, if configurable, seems to encode the same contract). The precise specification was avoiding OOMs; because the collection is across a JNI boundary, a "relaxed" predictable memory footprint could be easier to deploy, assuming a hard limit in the native code to avoid swapping.

          Thanks for the detail on the collection data structures. That makes it much easier to orient oneself in the code.

          A few quick notes on your earlier comment:

          Adding the partition to the record, again, was to make the memory more predictable. The overhead in Java tracking thousands of per-partition buckets (many going unused) was worse than the per-record overhead, particularly in large jobs. Further, user comparators are often horribly inefficient, so the partition comparison and related hit to its performance was in the noise. The cache miss is real, but hard to reason about without leaving the JVM.

          The decorator-based stream is/was? required by the serialization interface. While the current patch only supports records with a known serialized length, the contract for other types is more general. Probably too general, but users with occasional several-hundred MB records (written in chunks) exist. Supporting that in this implementation is not a critical use case, since they can just use the existing collector. Tuning this to handle memcmp types could also put the burden of user comparators on the serialization frameworks, which is probably the best strategy. Which is to say: obsoleting the existing collection framework doesn't require that this support all of its use cases, if some of those can be worked around more competently elsewhere. If its principal focus is performance, it may make sense not to support inherently slow semantics.

          Which brings up a point: what is the scope of this JIRA? A full, native task runtime is a formidable job. Even if it only supported memcmp key types, no map-side combiner, no user-defined comparators, and records smaller than its intermediate buffer, such an improvement would still cover a lot of user jobs. It might make sense to commit that subset as optional functionality first, then iterate based on feedback.

          Binglin Chang added a comment -

          It might make sense to commit that subset as optional functionality first, then iterate based on feedback.

          I agree. How should this be contributed to Hadoop? Add a new subdirectory in contrib like streaming, merge it into native, or keep it in the current c++/libnativetask?
          It contains both C++ and Java code, and will likely add client tools like streaming, and a dev SDK.

          A random memory config gives the Resource Scheduler more information, so it may yield better scheduling algorithms. As for OOM, there is already a flexible layer for memory control: the page cache. In typical slave node memory configurations and real workloads, the page cache (should) take a considerable proportion of total memory (20%-50%), so for example tasks can be configured to use 60% of memory but with some variance in a 20% range, and the variance becomes relatively small when multiple tasks are combined at the node level or the whole-job level.
          One of my colleagues is working on a shuffle service, which delegates all reduce-side shuffle work to a per-node service; it has a similar aspect:
          for a single task, the variance of the memory footprint is a problem, but it becomes much more stable across the many tasks running on a node.

          He Yongqiang added a comment -

          We are also evaluating the approach of optimizing the existing Hadoop Java map-side sort algorithms (i.e., playing the same set of tricks used in this C++ impl: bucket sort, prefix key comparison, a better crc32, etc.).

          The main question we are interested in is how big the memory problem is for the Java impl.

          It would also be very useful to define an open benchmark here.

          Allen Wittenauer added a comment -

          Sure are a lot of header files with full blown functions in them....

          Chris Douglas added a comment -

          I agree. How to contribute this to hadoop? Add a new subdirectory in contrib like streaming, or merge to native, or stay in current c++/libnativetask?
          It contains both c++ and java code, and will likely to add client tools like streaming, and dev SDK.

          To pair the java/c++ code, a contrib module could make sense. Client tools and dev libraries are distant goals, though.

          Contributing it to the 0.20 branch is admissible, but suboptimal. Most of the releases generated for that series are sustaining releases. While it's possible to propose a new release branch with these improvements, releasing it would be difficult. Targeting trunk would be the best approach, if you can port your code.

          we are also evaluating the approach of optimizing the existing Hadoop Java map side sort algorithms (like playing the same set of tricks used in this c++ impl: bucket sort, prefix key comparison, a better crc32 etc).

          The main problem we are interested is how big is the memory problem for the java impl.

          Memory is the problem. The bucketed sort used from 0.10 to 0.16 had more internal fragmentation and a less predictable memory footprint (particularly for jobs with lots of reducers). Subsequent implementations focused on reducing the number of spills for each task, because the cost of spilling dominated the cost of the sort. Even with a significant speedup in the sort step, avoiding a merge by managing memory more carefully usually effects faster task times. Merging from fewer files also decreases the chance of failure and reduces seeks across all drives (by spreading output over fewer disks). A precise memory footprint also helped application authors calculate the framework overhead (both memory and number of spills) from the map output size without considering the number of reducers.

          That said, jobs matching particular profiles admit far more aggressive optimization, particularly if some of the use cases are ignored. Records larger than the sort buffer, user-defined comparators (particularly on deserialized objects), the combiner, and the intermediate data format restrict the solution space and complicate implementations. There's certainly fat to be trimmed from the general implementation, but restricting the problem will admit far more streamlined solutions than identifying and branching on all the special cases.

          He Yongqiang added a comment -

          The bucketed sort used from 0.10 to 0.16 had more internal fragmentation and a less predictable memory footprint (particularly for jobs with lots of reducers).

          If the Java impl used a similar implementation to the C++ one here, the only difference would be the language, right? Sorry, can you explain more about how the C++ impl does a better job here on predictable memory footprint? In the current Java impl, all records (no matter which reducer they are going to) are stored in one central byte array. In the C++ impl, within one map task, each reducer has one corresponding partition bucket which maintains its own memory buffer. From what I understand, one partition bucket is for one reducer, and all records going to that reducer from the current map task are stored there, and are sorted and spilled from there. On the sort side this saves comparisons, since the original sort needs to compare records from different reducers. And the C++ impl has the trick of doing prefix comparison, which reduces the number of CPU ops (an 8-byte compare becomes one long cmp op).

          Subsequent implementations focused on reducing the number of spills for each task, because the cost of spilling dominated the cost of the sort.Even with a significant speedup in the sort step, avoiding a merge by managing memory more carefully usually effects faster task times.

          I totally agree that the spill will be the dominant factor when it happens. So the question becomes how much more memory the Java impl needs compared to the C++ one: 20%, 50%, or 100%? Then we can calculate the chance of avoiding a spill by using the C++ impl.
          (Note: based on our analysis of jobs run during the past month, most jobs need to shuffle less than 700MB of data per mapper.)

          Chris Douglas added a comment -

          If the java impl use the similar impl as the c++ one here, the only difference will be language. right?

          Yes, but the language difference includes other overheads (more below).

          Sorry, can you explain more about how the c++ can do a better job here for predictable memory footprint? in the current java impl, all records (no matter which reducer it is going) are stored in a central byte array. In the c++ impl, on one mapper task, each reducer will have one corresponding partition bucket which maintains its own memory buffer. From what i understand, one partition bucket is for one reducer. and all records going to that reducer from the current maptask are stored there, will be sorted and spilled from there.

          Each partition bucket maintains its own memory buffer, so the memory consumed by the collection framework includes the unused space in all the partition buffers. I'm calling that, possibly imprecisely, internal fragmentation. The RawComparator interface also requires that keys be contiguous, introducing other "waste" if the partition's collection buffer were not copied whenever it is expanded (as in 0.16; the expansion/copying overhead also harms performance and makes memory usage hard to predict because both src and dst buffers exist simultaneously), i.e. a key partially serialized at the end of a slab must be realigned in a new slab. This happens at the end of the circular buffer in the current implementation, but would happen on the boundary of every partition collector chunk.

          That internal fragmentation creates unused buffer space that "prematurely" triggers a spill to reclaim the memory. Allocating smaller slabs decreases internal fragmentation, but also adds an ~8 byte object tracking overhead and GC cycles. In contrast, large allocations (like the single collection buffer) are placed directly in permgen. The 4 byte overhead per record to track the partition is a space savings over slabs exactly matching each record size, requiring at least 8 bytes per record if naively implemented.

          The current implementation is oriented toward stuffing the most records into a precisely fixed amount of memory, and adopts a few assumptions: 1) one should spill as little as possible 2) if spilling is required, at least don't block the mapper 3) packing the most records into each spill favors MapTasks with combiners. If there are cases (we all acknowledge that there are) where spilling more often but faster can compensate for that difference, then it's worth reexamining those assumptions.

          He Yongqiang added a comment -

          Sorry, I am kind of confused; I should make myself clearer: we are trying to evaluate and compare the C++ impl in HCE (and also this JIRA) against doing a pure Java re-impl. So the thing we care about most is whether there is something the C++ impl can do that a Java re-impl cannot. And if there is, we need to find out how big that difference is. From there we can have a better understanding of each approach and decide which way to go.

          Binglin Chang added a comment -

          I just attached an updated patch, in case anybody is interested. Changes:

          1. Separate key type & value type; theoretically the value can be any type.
          2. Add google-snappy compression.
          3. iobuffer no longer uses templates.
          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12492208/MAPREDUCE-2841.v2.patch
          against trunk revision .

          +1 @author. The patch does not contain any @author tags.

          +1 tests included. The patch appears to include 5 new or modified tests.

          -1 patch. The patch command could not apply the patch.

          Console output: https://builds.apache.org/job/PreCommit-MAPREDUCE-Build/563//console

          This message is automatically generated.

          Binglin Chang added a comment -

          Hi, Yongqiang

          i may should make me more clear: we are trying to evaluate and compare the c++ impl in HCE (and also this jira) and doing a pure java re-impl.

          I think a pure Java re-impl is possible; there are some tricks to solve the memory fragmentation issues, maybe not thoroughly, for example letting many adjacent buckets share one MemoryBlock if the partition number is too large, which is what I will do in the native implementation. And again, it's hard to support stream-like key/value serialization semantics, so the Java re-impl would have the same limitations the native impl has.
          But the low-level unaligned memcmp & memcpy is hard to implement in Java.
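          For reference, this is the kind of unaligned trick I mean: an 8-bytes-at-a-time
          binary compare. A sketch using gcc/clang builtins, assuming little-endian x86-64
          where unaligned loads are cheap (illustrative only, not the exact code in the patch):

#include <stdint.h>
#include <string.h>

// Unaligned 64-bit load, byte-swapped to big-endian so that a single integer
// comparison matches lexicographic (memcmp) order on a little-endian machine.
static inline uint64_t load_be64(const char* p) {
  uint64_t v;
  memcpy(&v, p, 8);
  return __builtin_bswap64(v);
}

int fast_memcmp(const char* a, uint32_t alen, const char* b, uint32_t blen) {
  uint32_t len = alen < blen ? alen : blen;
  while (len >= 8) {
    uint64_t x = load_be64(a), y = load_be64(b);
    if (x != y) return x < y ? -1 : 1;  // 8 bytes decided by one compare
    a += 8; b += 8; len -= 8;
  }
  while (len--) {                       // byte-at-a-time tail
    int d = (int)(uint8_t)*a++ - (int)(uint8_t)*b++;
    if (d) return d;
  }
  return (int)alen - (int)blen;         // shorter key sorts first
}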

          Binglin Chang added a comment -

          Update some test results.

          1. Terasort 10G input 40map 40reduce on 9node cluster, 7map/7reduce slot per node
          io.sort.mb 500MB
          Results on jobhistory:

            Total AverageMap AverageShuffle AverageReduce
          java 54s 14s 14s 10s
          native 39s 7s 15s 9s
          java-snappy 36s 15s 9s 8s
          native-snappy 27s 7s 7s 8s

          speedup-without-compression: 1.38
          speedup-with-compression: 1.33

          2. I did another test with a bigger data set:
          Terasort 100G 400map 400reduce on 9node cluster, 7map/7reduce slot per node

            Total AverageMap AverageShuffle AverageReduce
          java-snappy 277s 17s 28s 10s
          native-snappy 234s 10s 22s 10s

          speedup: 1.18
          When the cluster is under a heavy workload, the bottleneck shifts to the page cache and shuffle, so optimizations in sort & spill do not play as big a role.

          3. I tested the dual-pivot quicksort patch provided by Chris, using the same setup as test No. 1.
          There are no observable differences compared to the old QuickSort; the average map task time for java-snappy is the same as before (15s). Perhaps the data set is too small, or the bottleneck is dominated by other factors, like random memory access.

          Todd Lipcon added a comment -

          One interesting C++ vs Java factor here is that this hot code is running in short-lived map tasks, the majority of the time. So there is less time for the JIT to kick in and actually compile the sorting code. In the past I've added -Xprof to mapred.child.java.opts, or looked at oprofile-jit output, and seen a fair amount of time spent in interpreted code.

          Chris Douglas added a comment -

          we are trying to evaluate and compare the c++ impl in HCE (and also this jira) and doing a pure java re-impl. So the thing that we mostly cared about is that is there sth that the c++ impl can do and a java re-impl can not. And if there is, we need to find out how much is that difference. And from there we can have a better understand of each approach and decide which approach to go.

          Sorry, that's what I was trying to answer. A system matching your description existed in 0.16 and tests of the current collector show it to be faster for non-degenerate cases and far more predictable. The bucketed model inherently has some internal fragmentation which can only be eliminated by using expensive buffer copies and compactions or by using per-record byte arrays, where the 8 byte object overhead exceeds the cost of tracking the partition, requiring only 4 bytes. Eliminating that overhead is impractical, but even mitigating it (e.g. allowing partitions to share slabs) requires that one implement an allocation and memory management system across Java byte arrays or ByteBuffers, themselves allocated by the JVM. I would expect that system to be easier to write and maintain than even the current impl, but not trivial if it supports all of the existing use cases and semantics. Unlike the C++ impl (and like the current one), abstractions will likely be sacrificed to avoid the overheads.

          Scott Carey added a comment -

          There are no observable differences compare to old QuickSort

          Dual-pivot quicksort causes the exact same number of comparisons as ordinary quicksort. However, it should have fewer swaps (0.80 times as many). If the cost of comparison is high (larger records, object comparison) the effect will be minimal. If the cost of comparison is low (values, very simple objects) the performance difference can be larger, up to about 2.5x as fast for sorting an array of ints.

          http://mail.openjdk.java.net/pipermail/core-libs-dev/2010-August/004687.html (initial message, many more if you search).

          Binglin Chang added a comment -

          Dual-pivot quicksort causes the exact same number of comparisons as ordinary quicksort. However, it should have fewer swaps (0.80 times as many).

          Unfortunately, the main overhead of the current sort in Hadoop comes from comparisons; the swaps just exchange two integer indexes, which I think is why dual-pivot quicksort doesn't show any improvement. As Todd's results in MAPREDUCE-3235 show, and as I experienced in this issue, the main overhead of the current Hadoop sort implementation is cache misses: nearly every comparison causes 2 random memory accesses in a huge memory area (typically several hundred MB).
          So it's not a language difference, just an implementation difference. To get better performance, we can:
          Add an index as in MAPREDUCE-3235, or use a partition-bucket-based sort.

          I ported the DualPivotQuicksort Java code to C++ and tested it on my Intel i5 MacBook Pro, with the terasort 10-byte key type and the word key type from RandomTextWriter.

          TeraSort input data 50MB 500000 key/value pair
          11/12/15 12:18:16 INFO qsort time: 0.23108s
          11/12/15 12:18:16 INFO std::sort time: 0.18266s
          11/12/15 12:18:17 INFO DualPivotQuicksort time: 0.17167s

          About 6% faster. I think sorting an array of ints can get much better results, because comparing two in-place ints is much faster than comparing two indexed binary strings.
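
          As an illustration of this point (not the benchmark used for the numbers above; the record count, key length, and timing harness below are assumptions), a minimal standalone C++ sketch contrasting an in-place int sort with an indexed binary-string sort could look like:

          #include <algorithm>
          #include <chrono>
          #include <cstdint>
          #include <cstdio>
          #include <cstring>
          #include <random>
          #include <vector>

          int main() {
            const std::size_t kRecords = 500000;   // roughly the 500000 k/v pairs mentioned above
            const std::size_t kKeyLen = 10;        // terasort-style 10-byte keys

            std::vector<char> keyBuffer(kRecords * kKeyLen);
            std::vector<uint32_t> offsets(kRecords);
            std::vector<int> plainInts(kRecords);

            std::mt19937 rng(42);
            for (std::size_t i = 0; i < kRecords; ++i) {
              offsets[i] = static_cast<uint32_t>(i * kKeyLen);
              for (std::size_t j = 0; j < kKeyLen; ++j)
                keyBuffer[i * kKeyLen + j] = static_cast<char>(rng());
              plainInts[i] = static_cast<int>(rng());
            }

            auto t0 = std::chrono::steady_clock::now();
            std::sort(plainInts.begin(), plainInts.end());   // in-place int compare, cache friendly
            auto t1 = std::chrono::steady_clock::now();
            std::sort(offsets.begin(), offsets.end(),
                      [&](uint32_t a, uint32_t b) {
                        // indexed binary-string compare: two random reads per comparison
                        return std::memcmp(&keyBuffer[a], &keyBuffer[b], kKeyLen) < 0;
                      });
            auto t2 = std::chrono::steady_clock::now();

            std::printf("int sort:     %.3fs\n", std::chrono::duration<double>(t1 - t0).count());
            std::printf("indexed sort: %.3fs\n", std::chrono::duration<double>(t2 - t1).count());
            return 0;
          }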

          Some updates on my work: I have almost finished the whole native MapTask, and part of the native reduce task.
          With a C++ RecordReader, Mapper, Partitioner, and MapOutputCollector, a native MapTask can now process 250MB (47MB compressed) of terasort input data in just 1.6s. Compared with the earlier test results (14s for Java, 7s for Java with NativeMapOutputCollector), this is a huge speedup, and it can be further optimized.

          Todd Lipcon added a comment -

          I chatted recently with the MR guys over at Facebook. They have another implementation here which they've been working on that gives similar gains, while staying all in java. The approach is something like the following:

          • map output collector collects into small buffers, each sized something close to L3 cache
          • when any buffer is full, sort it but don't spill it
          • when enough buffers are collected to fill io.sort.mb, merge them from memory to disk

          This fixes the cache locality issues that everyone has identified, but doesn't require native code. It's up on their github here: https://github.com/facebook/hadoop-20/blob/master/src/mapred/org/apache/hadoop/mapred/BlockMapOutputBuffer.java
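
          The actual Facebook implementation is the Java BlockMapOutputBuffer linked above; purely to illustrate the collect-into-small-runs, sort-each-run, then merge flow described in the bullets, a minimal C++ sketch (run size, key generation, and the in-memory stand-in for the spill are assumptions) might look like this:

          #include <algorithm>
          #include <cstdio>
          #include <cstdlib>
          #include <functional>
          #include <queue>
          #include <string>
          #include <utility>
          #include <vector>

          int main() {
            const std::size_t kRunSize = 4096;                 // pretend one run fits in L3 cache
            std::vector<std::vector<std::string>> runs(1);

            // Collect phase: append keys to the current run; sort and seal it when full.
            auto collect = [&](std::string key) {
              if (runs.back().size() == kRunSize) {
                std::sort(runs.back().begin(), runs.back().end());
                runs.emplace_back();
              }
              runs.back().push_back(std::move(key));
            };
            for (int i = 0; i < 20000; ++i)
              collect("key" + std::to_string(std::rand() % 100000));
            std::sort(runs.back().begin(), runs.back().end()); // seal the last run

            // "Spill" phase: k-way merge of the sorted runs (stand-in for merging to disk).
            using Head = std::pair<std::string, std::pair<std::size_t, std::size_t>>; // key, (run, index)
            std::priority_queue<Head, std::vector<Head>, std::greater<Head>> heap;
            for (std::size_t r = 0; r < runs.size(); ++r)
              if (!runs[r].empty()) heap.push({runs[r][0], {r, 0}});

            std::size_t emitted = 0;
            while (!heap.empty()) {
              Head head = heap.top();
              heap.pop();
              ++emitted;                                       // a real collector would write to the spill file here
              std::size_t run = head.second.first, idx = head.second.second + 1;
              if (idx < runs[run].size()) heap.push({runs[run][idx], {run, idx}});
            }
            std::printf("merged %zu records from %zu sorted runs\n", emitted, runs.size());
            return 0;
          }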

          Maybe Yongqiang can comment more on this approach? Dmytro has given permission offline for us to work from the code on their github and contribute it on trunk (they may not have time to contribute it in the near term).

          I think a short term goal for this area we could attack would be to make the map output collector implementation pluggable. Then people can experiment more freely with different collector implementations. I don't have time for it - just throwing it out there as a thought.

          Dong Yang added a comment -

          Beautiful work, going beyond HCE! Kudos to Binglin~

          Binglin Chang added a comment -

          I wrote a design document for this work to better describe my thoughts. The document also includes some test results.
          Generally, NativeTask outperforms the original MapReduce framework by about 3x-7x for map tasks, 1x-1.1x for reduce tasks, and 1.5x-5x for whole jobs. If the compiler hypothesis is correct, the speedup could be 4.5x-12x for map tasks, and the speedup for whole jobs should be correspondingly larger.

          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12492208/MAPREDUCE-2841.v2.patch
          against trunk revision .

          +1 @author. The patch does not contain any @author tags.

          +1 tests included. The patch appears to include 5 new or modified tests.

          -1 patch. The patch command could not apply the patch.

          Console output: https://builds.apache.org/job/PreCommit-MAPREDUCE-Build/1749//console

          This message is automatically generated.

          He Yongqiang added a comment -

          Really cool stuff!
          In terms of CPU performance, can you also do some comparison with the new Java output collector that Todd mentioned (https://github.com/facebook/hadoop-20/blob/master/src/mapred/org/apache/hadoop/mapred/BlockMapOutputBuffer.java)? In Facebook's internal tests, we have seen a big improvement (8x-10x) in CPU spent in sort (e.g., example 1: 240M -> 30M, example 2: 9M -> 1.8M), and a 2x improvement in total mapper CPU (example 1: 707M -> 440M, example 2: 40M -> 19M). These are CPU numbers, not latency numbers. BlockMapOutputBuffer.java uses only one thread while the original collector uses 2 threads, but latency is still improved by a lot (around 30%).

          Some analysis of the performance differences would really help us understand the bottlenecks and the difference that language choice brings.

          Binglin Chang added a comment -

          @Yongqiang
          It seems BlockMapOutputBuffer only supports BytesWritable currently, so WordCount & Terasort can't run directly. From the test results (average map time and sort time), I would say sort takes about 40-50% of the whole map task's time; because sort is CPU intensive, its share of CPU time should be higher, maybe 50%-60%.
          If my compiler assumption is right, the total speedup for the WordCount mapper should be 10x, the sort speedup should be 10x-12x, and the rest (reader, mapper, merge, spill combined) should be (10 - 0.5*12) / (1 - 0.5) = 8.
          I must say using quicksort to sort small buffers that fit into cache and then merging them is a good idea; I should make this optimization too.
          NativeTask currently uses a single thread, but I think all partition-based collectors (including BlockMapOutputBuffer) can take advantage of the parallel sort & spill I mentioned in the design doc; this needs code changes in other parts (TaskTracker, IndexCache maybe), and changing the map output file to a directory.
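
          As a rough illustration of that parallel sort & spill idea (not code from the patch; the partition count, key generation, and omitted spill step are placeholders), each partition bucket could be sorted on its own thread along these lines:

          #include <algorithm>
          #include <future>
          #include <string>
          #include <vector>

          int main() {
            std::vector<std::vector<std::string>> buckets(8);   // one bucket per partition
            for (unsigned i = 0; i < 80000; ++i)
              buckets[i % 8].push_back("k" + std::to_string((i * 2654435761u) % 100000));

            // Sort every partition bucket concurrently; a real collector would then
            // spill each sorted bucket to its own output segment.
            std::vector<std::future<void>> tasks;
            for (auto& bucket : buckets)
              tasks.push_back(std::async(std::launch::async, [&bucket] {
                std::sort(bucket.begin(), bucket.end());
              }));
            for (auto& t : tasks) t.get();
            return 0;
          }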

          Todd Lipcon added a comment -

          I forward-ported Facebook's shuffle from their github 'production' branch into MR2. There are a few changes I had to make to adjust to the latest interfaces, and some counter-related stuff I commented out. I also fixed a perf issue where it spent a lot of time creating new Configuration objects. But the core of the code is the same.

          The patch also has a simple benchmark similar to what Binglin described above – a standalone piece of code to exercise the mapside sort/spill with nothing else in the way. This makes it very easy to compare different implementations.

          In my testing, the FB implementation is about 10% faster on a wall clock basis, and perhaps a bit better on CPU. I didn't spend a lot of time looking at results as of yet - but figured I'd post this here in case anyone else was interested.

          This is in no way meant for commit - just sharing code so other people can tweak. If someone has a version of the native mapside sort already available against trunk MR2, it would be great to have that available on the JIRA so we can compare all of the options.

          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12638882/fb-shuffle.patch
          against trunk revision .

          +1 @author. The patch does not contain any @author tags.

          +1 tests included. The patch appears to include 1 new or modified test files.

          -1 javac. The applied patch generated 1491 javac compiler warnings (more than the trunk's current 1483 warnings).

          +1 javadoc. There were no new javadoc warning messages.

          +1 eclipse:eclipse. The patch built with eclipse:eclipse.

          -1 findbugs. The patch appears to introduce 10 new Findbugs (version 1.3.9) warnings.

          -1 release audit. The applied patch generated 1 release audit warnings.

          -1 core tests. The patch failed these unit tests in hadoop-common-project/hadoop-common hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core:

          org.apache.hadoop.mapreduce.v2.app.TestMRAppMaster

          +1 contrib tests. The patch passed contrib unit tests.

          Test results: https://builds.apache.org/job/PreCommit-MAPREDUCE-Build/4488//testReport/
          Release audit warnings: https://builds.apache.org/job/PreCommit-MAPREDUCE-Build/4488//artifact/trunk/patchprocess/patchReleaseAuditProblems.txt
          Findbugs warnings: https://builds.apache.org/job/PreCommit-MAPREDUCE-Build/4488//artifact/trunk/patchprocess/newPatchFindbugsWarningshadoop-mapreduce-client-core.html
          Javac warnings: https://builds.apache.org/job/PreCommit-MAPREDUCE-Build/4488//artifact/trunk/patchprocess/diffJavacWarnings.txt
          Console output: https://builds.apache.org/job/PreCommit-MAPREDUCE-Build/4488//console

          This message is automatically generated.

          Sean Zhong added a comment -

          Updates on this: https://github.com/intel-hadoop/nativetask

          Sean Zhong added a comment -

          The latest native task code is posted at https://github.com/intel-hadoop/nativetask/tree/native_output_collector for easy review. Currently the code is patched against Hadoop 2.2.

          Some feature highlights:
          1. Full performance test coverage: https://github.com/intel-hadoop/nativetask/tree/native_output_collector#what-is-the-benefit
          2. Supports all value types that extend Writable.
          3. Supports all key types in hadoop.io, and most key types in the Hive, Pig, Mahout, and HBase projects. For a list of supported key types, please check https://github.com/intel-hadoop/nativetask/wiki#supported-key-types
          4. Fully supports Java combiners.
          5. Supports large keys and values.
          6. A full test suite covering key/value type combinations.
          7. Supports GZIP, LZ4, and Snappy.

          Items we are still working on:
          1. Extract support for the Hive/Pig/HBase/Mahout platforms into standalone jars, and decouple that dependency from the native task source code.
          2. More documentation describing the API.

          For design, test, and doc, please check
          https://github.com/intel-hadoop/nativetask/tree/native_output_collector
          https://github.com/intel-hadoop/nativetask/wiki

          Todd Lipcon added a comment -

          Hey Sean. Thanks for posting the updated code.

          I've been following the progress via Github for a couple of months and also had some in-person discussion with the authors a while back to learn about this project. It seems like this would be a good addition to Hadoop – it offers some substantial performance improvements for many jobs, both in latency and total cluster throughput. While it would be possible to distribute this as a separate downloadable (eg on github or a separate incubator project) it seems like it would be better for the project to be part of the core.

          If the MR developer community generally agrees this belongs in the core, I'd like to start a feature branch for it in order to import the current code, sort out the build/integration issues, and take care of the remaining items that Sean mentioned above. I'll volunteer to be a committer "shepherd" for the branch and help ensure that all the code is properly reviewed and up to our usual contribution standards around licensing, testing, etc. I think a feature branch is better than trying to sort out the remaining tasks over in github.

          What do other folks think?

          Todd Lipcon added a comment -

          Also, just to clarify – the current goal is to contribute the transparent "map output collector" improvement. The wiki pages also mention a full task framework which includes an ability for users to author tasks fully in C++. That's an interesting extension to pursue in the future, but as I understand it, the current state of the github repository is just the transparent output collector improvements, which require no changes in user code so long as they are using standard writables, and only some pretty simple coding (akin to writing a RawComparator) if they are using custom types.

          Todd Lipcon added a comment -

          BTW, one quick question: I noticed in your README that you support CRC32C checksums on IFile. However, the Java code currently hard-codes CRC32. Do we need a JIRA to make this configurable (or just switch Java over to CRC32C since it's faster in Java too?) Seems like we could start that work on trunk in parallel with importing the native code.
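
          For reference, "hardware CRC32C" refers to the SSE4.2 CRC32 instruction; a minimal C++ sketch of using it (illustrative only, not the IFile checksum code; compile with -msse4.2 on x86-64) looks roughly like:

          #include <cstddef>
          #include <cstdint>
          #include <cstring>
          #include <nmmintrin.h>   // SSE4.2 CRC32 intrinsics

          // Computes CRC32C (Castagnoli) over a buffer using the hardware instruction.
          uint32_t crc32c_hw(const void* data, std::size_t len) {
            const unsigned char* p = static_cast<const unsigned char*>(data);
            uint64_t crc = 0xFFFFFFFFu;                     // standard CRC32C initial value
            while (len >= 8) {                              // 8 bytes per instruction on x86-64
              uint64_t word;
              std::memcpy(&word, p, sizeof(word));
              crc = _mm_crc32_u64(crc, word);
              p += 8;
              len -= 8;
            }
            while (len-- > 0)
              crc = _mm_crc32_u8(static_cast<uint32_t>(crc), *p++);
            return static_cast<uint32_t>(crc) ^ 0xFFFFFFFFu; // final XOR
          }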

          Todd Lipcon added a comment -

          I confirmed offline with Binglin that he's not actively working on this and happy to have it reassigned. It seems like Sean is driving this one forward now, so reassigning to him.

          Arun C Murthy added a comment -

          If the MR developer community generally agrees this belongs in the core, I'd like to start a feature branch for it in order to import the current code, sort out the build/integration issues, and take care of the remaining items that Sean mentioned above.

          Todd Lipcon Thanks for starting this discussion. I have a few thoughts I'd like to run by you.

          I think the eventual goal of this (looking at https://github.com/intel-hadoop/nativetask/blob/master/README.md) is a full-native runtime for MapReduce including sort, shuffle, merge etc.

          It does look like we would end up with a compatible but alternate implementation of the MapReduce runtime. In that sense, this is similar to other alternate runtimes for MapReduce such as Apache Tez.

          Furthermore, this is implemented in C++, which is frankly a concern given the poor job C++ has done with ABI stability. I'm glad to see that it doesn't rely on Boost, the worst offender. This is the same reason the native Hadoop client (HADOOP-10388) is being done purely in C. Also, the MR development community is predominantly Java, which is something to keep in mind. This is a big concern for me.

          In all, it seems to me we could consider having this not in Apache Hadoop, but as an incubator project to develop a native, MR compatible runtime.

          This will allow it to develop a like-minded community (C++ skills etc.) and not be bogged down by all of Hadoop's requirements such as security (how/when will this allow for secure shuffle or encrypted shuffle etc.), compatibility with several OSes (flavours of Linux, MacOSX, Windows) etc. It will also allow them to ship independently and get user feedback more quickly.

          Similarly, I am wary of importing a nearly 75K LOC codebase into a stable project and its impact on our releases and on breakage, particularly given the difference in skills in the community, i.e. Java vs. C++.

          What do you think Todd & Sean? I'm more than happy to help with incubator process if required.

          Arun C Murthy added a comment -

          I also noticed that the github has a bunch of code related to Pig, Hive etc. - I think we'd all agree that they need to be in respective projects eventually.

          Arun C Murthy added a comment -

          On a related thought about Pig/Hive etc. - I see Hadoop MapReduce fading away fast, particularly as projects using MR such as Pig, Hive, and Cascading re-vector onto other projects like Apache Tez or Apache Spark.

          For example:

          1. Hive-on-Tez (https://issues.apache.org/jira/browse/HIVE-4660) - The Hive community has already moved its major investments away from MR to Tez.
          2. Pig-on-Tez (https://issues.apache.org/jira/browse/PIG-3446) - The Pig community is very close to shipping this in Pig 0.14 and again is investing heavily in Tez.

          Given that, Sean/Todd, would it be useful to discuss contributing this to Tez instead?

          This way the work here would continue to stay relevant in the context of the majority users of MapReduce who use Pig, Hive, Cascading etc.

          Of course, I'm sure another option is Apache Spark, but given that Tez is much closer (code-base wise) to MR, it would be much easier to contribute to Tez. Happy to help if that makes sense too. Thanks.

          Gopal V added a comment -

          From my previous experience with MAPREDUCE-4755, I found adding a new Sort buffer impl to Tez to be far simpler.

          The 4755 patch lives on as the multi-core PipelinedSorter in Tez, in case you need a reference hook for alt-sorter impls there.

          Todd Lipcon added a comment -

          Hey Arun.

          I agree that building a completely parallel C++ MR runtime is a much larger project that should not be part of Hadoop. However, per my above comment, the current goal for contribution is only the MapOutputCollector. That is to say, there is no ABI concern, because the interface exposed to application developers is really quite small (unlike something like pipes or a full task SDK). I realize it's confusing because the github repo does mention these larger goals of the project, but I agree with you that they are not worth the cost-benefit within MR itself.

          I also noticed that the github has a bunch of code related to Pig, Hive etc. - I think we'd all agree that they need to be in respective projects eventually.

          Fully agree. I raised the same concern offline to Sean and he is working on making the "platform support" use a ServiceLoader instead of explicitly registering all of the other frameworks inside the core code. (his comment "Extract support for Hive/Pig/HBase/Mahout platforms to standalone jars, and decouple the dependency with native task source code" is referring to this).

          Similarly, I am wary of importing a nearly 75K LOC codebase into a stable project

          I think the 75k you're counting may include the auto-generated shell scripts. By my count, the non-test Java code is 3k lines, some of which is boilerplate around the different platform implementations (which would move into those projects). If you exclude the thirdparty dependencies which are bundled into the repo, the non-test C++ source is 10kloc.

          So, it's not a tiny import by any means, but for 2x improvement on terasort wallclock, my opinion is that the maintenance burden is worth it.

          As for importing to Tez, I don't think the community has generally agreed to EOL MapReduce. In other words, if you feel that Tez is the future, it makes sense for you not to work on any further MR optimizations, but that shouldn't preclude others from doing so. There are lots of production apps on MR today that do not want to switch to a new framework (regardless of compatibility layers), so I think this would be valuable for the community.

          Arun C Murthy added a comment - - edited

          Todd,

          I agree that building a completely parallel C++ MR runtime is a much larger project that should not be part of Hadoop.

          I'm confused. There already exists a large amount of code on the github for the full task runtime. Is that abandoned? Are you saying there is no intention to contribute that to Hadoop, ever? Why would that be? Would that be a separate project?

          With or without ABI concerns, C++ is still a major problem w.r.t. different compiler versions, the different platforms we support, etc. That is precisely why HADOOP-10388 chose to use pure C only. A similar switch would make me much more comfortable, setting aside the disparity in skills in the Hadoop community.

          Furthermore, considerably more security issues open up in C++ land, such as buffer overflows.


          I think the 75k you're counting may include the auto-generated shell scripts.

          From the github:

          $ find . -name *.java | xargs wc -l
             11988 total
          $ find . -name *.h | xargs wc -l
             27269 total
          $ find . -name *.cc | xargs wc -l
             26276 total
          

          Whether it's test or non-test, we are still importing a lot of code - code which the Hadoop community will need to maintain.


          So, it's not a tiny import by any means, but for 2x improvement on terasort wallclock, my opinion is that the maintenance burden is worth it.

          Todd, as we both know, there are many, many ways to get 2x improvement on terasort...
          ... nor is it worth a lot in the real world outside of benchmarks.

          I'm sure we both would take 2x on Pig/Hive anyday... smile


          As for importing to Tez, I don't think the community has generally agreed to EOL MapReduce

          Regardless of whether or not we pull this into MR, it would be useful to pull it into Tez too - if Sean wants to do it. Let's not discourage them.

          I'm sure we both agree, and want to see real world workloads improve and that Hive/Pig/Cascading etc. represent that.

          IAC, hopefully we can stop this meme that I'm trying to preclude you from doing anything regardless of my beliefs. IAC, we both realize MR is reasonably stable and won't get a lot of investment, and so do our employers:
          http://vision.cloudera.com/mapreduce-spark/
          http://hortonworks.com/hadoop/tez/

          Essentially, you asked for feedback from the MapReduce community; and this is my honest feedback - as someone who has actively helped maintain this codebase for more than 8 years now. So, I'd appreciate if we don't misinterpret each others' technical opinions and concerns during this discussion. Thanks in advance.

          FTR: I'll restate my concerns about C++, the roadmap for the C++ runtime, maintainability, and support for all of Hadoop (new security bugs, future security features, platforms, etc.).

          Furthermore, this jira was opened nearly 3 years ago and only has sporadic bursts of activity - not a good sign for long-term maintainability.

          I've stated my concerns, let's try get through them by focussing on those aspects.


          Finally, what is the concern you see with starting this as an incubator project and allowing folks to develop a community around it? We can certainly help on our end by making it easy for them to plug in via interfaces etc.

          Thanks.

          Sean Zhong added a comment -

          First, Arun and Todd, thank you both for your honest opinions! You are both respected!

          I believe the differences will narrow once we are looking at the same facts, so I'd like to state the facts and clarify some confusion:

          1. How many lines of code are there, exactly?

          Here is a breakdown for branch https://github.com/intel-hadoop/nativetask/tree/native_output_collector:

          java code(*.java) 122 files, 8057 lines
          nativetask 62 files, 4080 lines
          nativetask Unit Test 14 files, 1222 lines
          other platform pig/mahout/hbase/hive 25 files, 477 lines
          scenario test 21 files, 2278 lines,

          native code (*.h, *.cc) 128 files, 47048 lines
          nativetask 85 files, 11713 lines
          nativetask Unit Test 33 files, 4911 lines,
          otherPlatform pig/mahout/hbase/hive 2 files, 1083 lines,
          thirdparty gtest lib header files 3 files, 28699 lines
          thirdparty lz4/snappy/cityhash 5 files, 642 lines

          (Note: All license header lines in each source file are not counted, blanks and other comments are counted)

          If we measure LOC as a proxy for code complexity, then:
          Third-party code like the Google Test header files should not be counted; the gtest header alone has 28699 lines of code.
          Pig/Mahout/HBase/Hive code will be removed from the code repository eventually, and should not be counted.
          Scenario test code need not be included, as you can always write new scenario tests.

          So after these deductions, the effective code is:
          NativeTask Source Code(java + native C++): 15793 lines
          NativeTask Unit test Code(java + native C++): 6133 lines

          2. Is this patch an alternate implementation of the MapReduce runtime, like Tez?

          No. The whole purpose of this patch is to provide a Map Output Collector that transparently improves MapReduce performance, NOT a new MR engine.

          The code is posted at branch https://github.com/intel-hadoop/nativetask/tree/native_output_collector; it only includes code for the map output collector.

          3. Why is there Pig/Mahout/HBase/Hive code in the native task source code?

          We are working on removing the platform (Hive/Pig/HBase/Mahout) code from the native task source code, as I commented above, and providing it as standalone jars.
          We rushed to post the link without a full cleanup so that we could get some early feedback from the community.

          4. Is the full native runtime included?

          No, the full native runtime is not included in this patch, and the related code is stripped out. The repo https://github.com/intel-hadoop/nativetask/tree/native_output_collector only contains code for the transparent collector.

          5. Is there any intention to contribute the full native runtime mode to Hadoop, or will it be a separate project?

          It is not the purpose of this patch to support the full native runtime mode; the goal of this patch is to make existing MR jobs run better on modern CPUs with a native map output collector.

          The full native runtime mode is another topic; it is a long way from being ready for submission, and we don't want to consider it now.

          6. Are there interface compatibility issues?

          This patch is not about the full native runtime mode, which supports native mappers and reducers.

          This patch is only about a custom map output collector in transparent mode. We are using existing Java interfaces, and people still run their Java mappers/reducers without recompilation. Users can make a small configuration change to enable the nativetask collector. When there is a case that nativetask doesn't support, it simply falls back to the default MR implementation.

          7. Are there C++ ABI issues?

          The concern makes sense.

          Regarding ABI, if the user doesn't need a custom key comparator, they will never need to implement a native comparator against nativetask.so, so there is no ABI issue.
          If the user does want to write a native comparator, the nativetask native interface involved is very limited, only:

          typedef int (*ComparatorPtr)(const char * src, uint32_t srcLength, const char * dest,
          uint32_t destLength);
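
          For illustration, a comparator matching the ComparatorPtr typedef above could be a plain byte-wise (binary-string) compare like the sketch below; how such a function is registered with the collector is not shown, and the function name is hypothetical:

          #include <algorithm>
          #include <cstdint>
          #include <cstring>

          // Matches the ComparatorPtr signature above: byte-wise lexicographic compare,
          // with the shorter key sorting first when one key is a prefix of the other.
          extern "C" int BytesComparator(const char* src, uint32_t srcLength,
                                         const char* dest, uint32_t destLength) {
            uint32_t minLen = std::min(srcLength, destLength);
            int cmp = std::memcmp(src, dest, minLen);
            if (cmp != 0) return cmp;
            return static_cast<int>(srcLength) - static_cast<int>(destLength);
          }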

          However, the current code assumes the user includes the whole "NativeTask.h", which contains more than the typedef above.
          We will work on this to make sure that "NativeTask.h" only exposes the necessary minimum API. After that, there should be no big ABI issue.

          8. How can you ensure the quality of this code?

          The code has been actively developed for more than a year. It has been used and tested in production for a long time, and there is also a full set of unit tests and scenario tests for coverage.

          9. Can this work be done on Tez instead?

          We believe it is good for MapReduce; we know people are still using MapReduce, which justifies submitting the patch here.

          Whether Tez can benefit from this feature or not should not impact the decision made here, as they are two separate projects.

          If the value of this patch is appreciated, we are totally open to ALSO migrating this to Tez. We may want your help on that.

          10. How can we make sure the code will be well maintained in the future, considering that this JIRA was opened so long ago and has seen only sporadic activity?

          If you check the commit history at https://github.com/intel-hadoop/nativetask/commits/native_output_collector, you will find that the code has been actively and continuously developed over the years, in public. So far there are 319 commits. I personally have worked on this project since the end of 2012.

          We are posting it here now because we believe that, after all these commits, the code quality has reached a level worth contributing back formally to Hadoop. The code is stable and has been well tested in production environments. We are very serious about this and will definitely take responsibility for maintaining it. Besides, we have Todd's help on this.

          11. Why do you still work on MapReduce, given that new investment is not going into MapReduce?

          We believe that as long as there is still a big user base using MapReduce in their daily jobs, no matter how slowly new MR features evolve or where new investment goes, there is still value in contributing to it. Intel has a history of supporting decades-old projects.

          We are also open to applying the "nativetask" ideas to other projects, but that is not directly related to this JIRA.

          12. How will encrypted shuffle be supported in nativetask?

          First, there is a flag to enable/disable the native output collector feature, fully controlled by the end user.

          Second, if there are cases nativetask doesn't support, it will simply fall back to the default MapReduce implementation.

          For encrypted shuffle, we will fall back to the default MR implementation. However, we are totally open to supporting further cases like this as long as there are still performance benefits; otherwise we will just fall back.

          13. Why is the code on github at https://github.com/intel-hadoop/nativetask/tree/native_output_collector?

          This is just a place for early code review and early feedback. We will submit it as a standard patch after we clean up the Hive/Pig platforms.

          14. Why not put it as incubator project instead of putting this in MR code base?

          We think:
          a) It is good for MapReduce; it helps MapReduce run better.
          b) More MapReduce users can use this. It is generally easier for a user to upgrade Hadoop and use this feature if it is provided in the Hadoop package.
          c) The performance benefit for MapReduce is big. We believe low-level CPU optimization is still important, even knowing that there are other optimizations of the computation flow such as TEZ, Spark, Impala, etc.
          d) This map output collector is transparent, so it is not unreasonable to provide it as an alternative collector in the Hadoop package. There is precedent for this; the fair scheduler, for example, is bundled in a similar way.
          e) Risk is low. Nativetask will fall back to the default MR implementation if a case is not supported or something goes wrong.
          f) Change is small. Only 10 lines of code change are required in Hadoop, so we expect no possibility of regression. The patch for Hadoop is provided at:
          https://github.com/intel-hadoop/nativetask/blob/native_output_collector/patch/hadoop-2.patch The other nativetask code is self-contained and won't pollute MapReduce.
          At runtime, we only require deploying the nativetask jars to the Hadoop directory during installation.

          These considerations make us feel that it is better to do this in Hadoop instead of a new project.

          The above questions were interpreted from the comments in a very subjective manner; I suspect some points are missing or wrongly interpreted and need further elaboration or correction. What are they?

          I am sure there are still concerns about this patch, but before we say no and reject it, maybe we should pause and think:
          Is there still a large user base using MapReduce? Is this patch valuable for those users? Is the risk introduced by this patch controlled? Can these concerns be addressed by improving the patch?

          In the Hadoop ecosystem, there is plenty of high-level innovation such as TEZ, Impala, and Spark, but far less innovation or optimization at the micro-architecture level. This patch is an attempt in that direction.

          Thanks

          Hide
          Todd Lipcon added a comment -

          Hey Arun. I think Sean did a good job answering many of your questions, but here are a few more responses to specifics from your earlier comment.

          I'm confused. There already exists a large amount of code on the github for the full task runtime. Is that abandoned? Are you saying there is no intention to contribute that to Hadoop, ever? Why would that be? Would that be a separate project?

          As Sean pointed out, there is a branch on github which is just the native collector.

          I won't speak for Sean, but my own opinion is the same as yours with regard to the runtime. If we want to make a full native MR framework, it's a larger project that should probably be in the incubator and build on APIs exposed by YARN and MR. A strict accelerator of the existing MR, though, doesn't seem to make as much sense as a separate project.

          C++ still is a major problem w.r.t different compiler versions

          So long as you avoid C++11, C++ can be very portable. AFAIK there is no usage of C++11 features in this contribution, and I would agree with you that we should avoid them and stick to a proper subset of C++. Personally, I am currently working on another project which uses C++ with the Google style guidelines and we have no problem building on a wide variety of operating systems (despite having orders of magnitude more code and complexity).

          Furthermore, there are considerably more security issues which open up in C++ land such as buffer overflow etc.

          I'm not sure how this is a concern, since the new code only runs in the context of tasks, not daemons. C certainly has the same issues (and in my experience, buffer overflows and memory leaks are more common in C vs C++ due to lack of safe containers and smart pointers, but that's a separate discussion). It might be a stability concern, but that's easy to address with extensive testing, which Sean and team have already been doing for the past year or more.

          I'm sure we both would take 2x on Pig/Hive anyday... smile

          Well, it's not quite 2x, but the performance benchmarks referenced on the wiki show "hive aggregation" having a 50% improvement. So, while I agree that terasort is not representative of many workloads, Sean and his team have done a good job showing that this optimization benefits a large class of diverse workloads, with no change required to the upper-level framework.

          Furthermore, this jira was opened nearly 3 years ago and only has sporadic bursts of activity - not a good sign for long-term maintainability.

          I'm not sure that the past is indicative of the future here. Many times we open JIRAs and don't have time to fully push them to fruition until the future – eg YARN sat around on JIRA with sporadic activity for many years until your team at Yahoo really got started on it. Even then, if I recall correctly, a lot of the development happened in a separate repository before there was an initial code drop to a branch at Apache. The same is true of this project (though of course on much smaller scale) – the project idea was a few years back, and then it was developed in Intel's repository until it is now being proposed to be integrated.

          Finally, what is the concern you see with starting this as an incubator project and allowing folks to develop a community around it? We can certainly help on our end by making it easy for them to plug in via interfaces etc.

          The main concern is that it would be difficult for users to install/plug in. Speaking with my Apache hat on, I think this benefits all MR users and it would be great to say "Upgrade to 2.5, and your jobs will go 50% faster in many cases!" With my vendor hat on, it might actually be beneficial for this to live elsewhere – we could tout it as a unique feature of our distro. But I'm trying to do the right thing here for the community at large, and also to encourage a new group of developers to make contributions to our project.

          <discussion of line counts, etc>

          My metrics were using the 'sloccount' program which counts non-comment non-empty lines. Sean already gave a good breakdown of the code. But, I think it's unimportant to squabble over details - my main point there was just that the contribution is meaty but not massive. It's also relatively simple code (eg entirely single-threaded) which is confined to the task (no concerns of daemon stability) and entirely optional (users can switch on a per-job basis whether to use this collector). I'd assume that in our first release we would leave the feature off by default and only make it on-by-default after we observe that many users have enabled it with good results. In the very worst case, since it is a fully transparent optimization, we could even remove it in a future version in a compatible manner if it turns out to be unused or irreparably unstable.

          Hide
          Todd Lipcon added a comment -

          At the risk of sounding like a broken record, the key points towards encouraging this contribution are:

          1. it is fully transparent, and thus revertible without impacting users in case it is later abandoned or otherwise problematic to maintain
          2. the code has a multi-year history of contribution from several different developers, which eliminates risk of abandonment
          3. the code is already in use in several production clusters, which indicates that end users find the improvement useful and stable in real-world applications
          4. benchmarks show a substantial performance improvement across a variety of workloads
          5. the new feature is 100% optional, with no changes to existing code paths, which eliminates any risk from users or vendors who prefer not to enable it

          Since I think Arun's questions have mostly been addressed, I'd like to continue making forward progress on this issue. Arun – could you clarify whether you are vetoing (-1) or just expressing some healthy skepticism (-0)?

          Hide
          Chris Douglas added a comment -

          If Sean Zhong is close to a patch, that would make the scope concrete. It sounds like there are more than zero changes to the framework (i.e., the MAPREDUCE-2454 API is insufficient), but fewer than a full replacement of the Task code with C++. Would it be difficult to produce and post a patch to ground the discussion?

          Hide
          Todd Lipcon added a comment -

          The patch required for the output collector is just this one: https://github.com/intel-hadoop/nativetask/blob/native_output_collector/patch/hadoop-2.patch

          In fact, this just provides the "automatic fallback" functionality. That functionality is probably useful for all pluggable output collectors – happy to break it out to be distinct from the JIRA.

          The only other diff in that patch is a trivial addition to the Text writable implementation to allow setting a Text more easily from different serialization formats. I don't think it makes sense to break it out to a separate JIRA, but happy to do so if that makes things easier.

          Hide
          Todd Lipcon added a comment -

          Hey Sean. Would you mind creating a patch file which can be applied to MR trunk which would add the native collector code into the Hadoop tree? Seems like we should probably create a new maven project inside hadoop-mapreduce-project/hadoop-mapreduce-client/ (eg something like hadoop-mapreduce-native-collector)? When a patch file is ready, I'll create a development branch and we can work from there to address the remaining issues mentioned above around pluggability.

          I also noticed that the current code uses autoconf, whereas Hadoop has generally standardized on cmake for native build. Let's add that to our list of things to do on the development branch.


            People

            • Assignee:
              Sean Zhong
              Reporter:
              Binglin Chang
            • Votes:
              4
              Watchers:
              53
