Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.23.0
    • Fix Version/s: None
    • Component/s: performance, task
    • Labels: None

      Description

      When running oprofile on a terasort workload, I noticed that a large amount of CPU usage was going to MapTask$MapOutputBuffer.compare. Upon disassembling this and looking at cycle counters, most of the cycles were going to memory loads dereferencing into the array of key-value data – implying expensive cache misses. This can be avoided as follows:

      • rather than simply swapping indexes into the kv array, swap the entire meta entries in the meta array. Swapping 16 bytes is only negligibly slower than swapping 4 bytes. This requires adding the value-length into the meta array, since we used to rely on the previous-in-the-array meta entry to determine this. So we replace INDEX with VALUELEN and avoid one layer of indirection.
      • introduce an interface which allows key types to provide a 4-byte comparison proxy. For string keys, this can simply be the first 4 bytes of the string. The idea is that, if stringCompare(key1.proxy(), key2.proxy()) != 0, then compare(key1, key2) should have the same result. If the proxies are equal, the normal comparison method is used. We then include the 4-byte proxy as part of the metadata entry, so that for many cases the indirection into the data buffer can be avoided (a short sketch follows this list).
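
      The following is a minimal, hedged sketch of that proxy comparison (class and method names are illustrative, not the committed API): the 4-byte proxies stored with the metadata are compared first, and the full raw comparison against the data buffer runs only when the proxies are equal.

          import org.apache.hadoop.io.RawComparator;

          // Sketch only: proxies are assumed to be non-negative, order-preserving ints
          // built from the leading key bytes, so that whenever they differ their int
          // ordering agrees in sign with the full key comparison.
          final class ProxyCompareSketch {
            static int compareWithProxy(int proxy1, int proxy2,
                                        RawComparator<?> comparator, byte[] kvbuffer,
                                        int kstart1, int klen1, int kstart2, int klen2) {
              if (proxy1 != proxy2) {
                // decided without dereferencing into the key/value data buffer
                return proxy1 < proxy2 ? -1 : 1;
              }
              // equal proxies: fall back to the normal raw key comparison
              return comparator.compare(kvbuffer, kstart1, klen1, kvbuffer, kstart2, klen2);
            }
          }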

      On a terasort benchmark, these optimizations plus an optimization to WritableComparator.compareBytes dropped the aggregate mapside CPU millis by 40%, and the compare() routine mostly dropped off the oprofile results.

      1. hashed-sort-MAPREDUCE-3235.patch
        10 kB
        Gopal V
      2. map_sort_perf.diff
        8 kB
        Hal Mo
      3. mr-3235-poc.txt
        12 kB
        Todd Lipcon

        Issue Links

          Activity

          Sandy Ryza added a comment -

          Creating subtasks for the two items so that we can work on them independently

          Todd Lipcon made changes -
          Link This issue is related to MAPREDUCE-4755 [ MAPREDUCE-4755 ]
          Gopal V made changes -
          Attachment hashed-sort-MAPREDUCE-3235.patch [ 12552509 ]
          Gopal V made changes -
          Attachment hashed-sort-MAPREDUCE-3235.patch [ 12552654 ]
          Gopal V added a comment -

          Update BinaryComparable.getPrefix() to always generate positive integers.
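
          For reference, one way such a prefix could be shaped (a sketch only, assuming the goal is a non-negative, order-preserving int that can be compared with plain int arithmetic; not necessarily what the attached patch does):

              static int getPrefix(byte[] bytes, int off, int len) {
                int prefix = 0;
                for (int i = 0; i < 4; i++) {
                  prefix <<= 8;
                  if (i < len) {
                    prefix |= (bytes[off + i] & 0xff);  // pack big-endian so byte order is preserved
                  }
                }
                // Shifting out the lowest bit keeps the ordering monotonic while
                // guaranteeing a non-negative result, at the cost of one bit of resolution.
                return prefix >>> 1;
              }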

          Gopal V added a comment -

          Yes, approximately 90M.

          The patch I've put up does not break backward compatibility yet, because none of the regular WritableComparators have been modified.

          This patch could be useful for a large common subset of cases (i.e. where the ordering is the natural byte order) but might be detrimental to a few corner cases. It is not a breaking change as long as I put it in a custom Comparator instead of building it in - the breakage only happens if Text.Comparator implemented the feature.

          BinaryComparable does export getPrefix(), but MapTask does not check for it; it only serves as a shortcut to the data in the custom Comparator class.

          I'm running the following as my primary test

          https://gist.github.com/636b0fb5f770b24b4512

          The 90M version failed to register any CPU numbers (running LocalJobRunner) - the prefix-sorted version did show less GC time spent overall.

          I ran with 5x the amount of data to see whether it showed any differences in the numbers:

                    trunk       +hashcomparator  %change
          time      204992      193958           -5.3%
          compares  1382152895  410352373        -70%

          But just comparing the output from the time command

          trunk = 207.70user 9.29system 3:26.76elapsed 104%CPU
          patched = 193.89user 9.37system 3:15.68elapsed 103%CPU

          The performance gain has more to do with avoiding the interface dispatch involved in calling the comparator. As best I can tell, interface dispatch in the JVM is a bit more expensive than a direct class call.

          Copying the kvmeta entries 16 bytes at a time is actually slower during the sort, but my data set is perhaps skewed towards long sorted subsequences (i.e. 500 of them already ordered), causing more swaps per compare than a more real-life workload would.
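
          For context, the 16-byte copy above refers to swapping whole metadata entries, as described in the first bullet of the issue description. A minimal sketch, assuming a flat int[] view of kvmeta with four int slots per record (the real MapOutputBuffer uses an IntBuffer overlay, so this is illustrative only):

              private static final int NMETA = 4;  // ints per metadata record (16 bytes)

              // Swap the two whole metadata records starting at int offsets i and j.
              static void swapMeta(int[] kvmeta, int i, int j) {
                for (int k = 0; k < NMETA; k++) {
                  int tmp = kvmeta[i + k];
                  kvmeta[i + k] = kvmeta[j + k];
                  kvmeta[j + k] = tmp;
                }
              }

          Moving more data per swap is the price paid for removing a level of indirection during compares.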

          Todd Lipcon added a comment -

          Hi Gopal. I agree the patch isn't suitable for commit right now as it could break stuff – I was just using it to try to understand the cache effects.

          How big was the dataset that you're sorting? Somewhere around 90MB it sounds like? Were you able to compare cpu seconds elapsed? Even if the wall clock doesn't improve much, saving cpu-seconds means that you can run more tasks in parallel, etc.

          Gopal V added a comment -

          For a sort set of 100 x 99171 unique strings (/usr/share/dict/words on Ubuntu):

                    trunk       +hashcomparator
          compares  275143706   74887595

          Comparisons per item went from 27.74 to 7.55, but this did not translate into any massive savings in execution time, as most of it went into the spill and the subsequent merge.

          Gopal V made changes -
          Attachment hashed-sort-MAPREDUCE-3235.patch [ 12552509 ]
          Gopal V added a comment -

          Update patch to trunk.

          Move the byte[] prefix into an int and place it in the HashComparator interface instead of in the Comparables.

          Gopal V added a comment -

          My descending sorter fails to sort words in reverse with this patch on.

          A custom comparator breaks the prefix sort order - the prefix generation has to be moved to the comparator instead of being inherent to the comparable.

          Even then, it is a breaking change for everyone who has done "extends Text.Comparator" and calls -1*(super.compare).

          But as long as we're breaking stuff, I will work on making this patch work with my reverse sorter.

          Hal Mo added a comment -

          I guess you're working with Krystal Mok? Cool stuff, I hope to see it make it into OpenJDK as well!

          Yes.
          Krystal has just left, and will join the Oracle JVM core team soon.
          He has done some work on binary array compare, but it is not finished; it seems there is more code to "copy" than expected.

          You mean if there are more rack-local (as opposed to data-local) maps, right? If everything is data-local (e.g. terasort on an empty cluster) then I would expect the CPU difference to be more noticeable.

          Yes, I think so.
          We are trying to test it on hive.

          Todd Lipcon added a comment -

          BTW, I know you are interested in a JVM intrinsic for binary array compare

          I guess you're working with Krystal Mok? Cool stuff, I hope to see it make it into OpenJDK as well!

          Almost the same; it depends on whether there are rack-local maps. The more rack-local maps, the slower.

          You mean if there are more rack-local (as opposed to data-local) maps, right? If everything is data-local (e.g. terasort on an empty cluster) then I would expect the CPU difference to be more noticeable.

          Hal Mo added a comment -

          @Todd

          How did the wall clock differ?

          Almost the same; it depends on whether there are rack-local maps. The more rack-local maps, the slower.

          BTW, I know you are interested in a JVM intrinsic for binary array compare. Our team might implement it if we see enough demand. We work on the JVM to support Hadoop. For example, our GCIH shares side data between maps (sharing Java objects directly between JVMs), saving 2000G of physical memory across 200 nodes and at the same time improving GC performance and job performance for one of our critical daily jobs.

          Todd Lipcon added a comment -

          Nice test. So, the 13% improvement is total CPU for the job – whereas I measured CPU improvement on the map phase only and saw ~40%. How did the wall clock differ?

          Hal Mo added a comment -

          > Todd Lipcon added a comment - 15/Jun/12 21:15
          >> Unit test passed, and got 10% improvement of cpu usage per node.
          > What workload? Terasort or something else?

          On a 9-slave cluster, testing Terasort with only one spill (teragen 126*512M).

          parameters:
          -Dmapred.child.java.opts=-Xmx1g
          -Dio.sort.mb=647
          -Ddfs.block.size=536870912
          -Dio.sort.record.percent=0.167
          -Dmapred.map.tasks=126
          -Dmapred.reduce.tasks=126
          -Dio.sort.factor=100
          -Dmapred.compress.map.output=true
          -Dmapred.map.output.compression.codec=org.apache.hadoop.io.compress.LzoCodec
          -Dio.sort.spill.percent=0.95
          -Dmapred.map.tasks.speculative.execution=false
          -Dmapred.reduce.tasks.speculative.execution=false

          result:
          (Here is how I got the CPU usage: run

          cat /proc/stat | grep "cpu " | awk '{print $2+$3+$4+$7+$8+$9}'

          before and after the job on each node, ignoring $5 (idle) and $6 (iowait),
          and subtract the two numbers.)
          cpu use(original):
          run node1 node2 node3 node4 node5 node6 node7 node8 node9 cluster
          #1 95934 99531 96278 97429 87239 98085 96376 97491 96159 864522
          #2 93360 95457 94389 94158 83400 95927 98785 96480 97012 848968
          #3 97022 94855 101510 94534 83064 95922 96947 97071 96707 857632
          #4 94124 97135 95332 95126 86036 94352 97149 101701 95313 856268
          #5 92429 94422 94107 90331 83487 91857 94130 94866 92747 828376
          avg 851153

          cpu use(with patch):
          run node1 node2 node3 node4 node5 node6 node7 node8 node9 cluster
          #1 84241 86630 84010 84752 76644 85126 84457 85035 84504 755399
          #2 86727 82622 84231 84313 72410 87602 83546 87104 85624 754179
          #3 84626 84467 81459 85049 71584 83746 83843 88444 80776 743994
          #4 82072 83229 81670 84070 71931 86059 88080 84624 87980 749715
          #5 80029 84439 87655 87411 73583 85899 84129 85077 86522 754744
          avg 751606

          improvement: 851153 / 751606 * 100% - 100% = 13.2%

          Todd Lipcon added a comment -

          Unit test passed, and got 10% improvement of cpu usage per node.

          What workload? Terasort or something else?

          Hal Mo made changes -
          Attachment map_sort_perf.diff [ 12532169 ]
          Hal Mo added a comment -

          I tried to apply Todd's patch and got a unit test error on a user-defined key comparator.
          I modified it like this in our Hadoop.
          The unit tests passed, and I got a 10% improvement in CPU usage per node.

          Hal Mo added a comment -

          > Todd Lipcon added a comment - 21/Oct/11 15:24
          > I didn't have a chance to run several benchmarks with/without. But I did the compareBytes optimization first, and didn't see a huge speedup, and then did the sort optimization and saw the 40%. So, the compareBytes is probably only a few percent.

          I tried a dummy sort (terasort with sorted input) and found only a 10% CPU time improvement per machine.

          Chris Douglas added a comment -

          One optimization there would be to pack the PARTITION field more tightly - in most MR jobs, we have <256 reducers, so partition could be a single byte. Since we know the number of reducers up front, we could easily trade-off space between the partition ID and the comparison proxy.

          Right now, the overlay is all ints. The JVM could/should be smart enough to handle that. If we start mixing types in metadata records, we'll end up writing placement new and possibly preventing some optimizations. If (instead) you're talking about shifting the partition and writing the prefix into the remaining bytes, or even more generally, defining (partition,key) as the input to an order-preserving hash function... yikes.

          Though viewed askance, that's a decent API, since the default just returns the partition and follows the existing path.

          Currently I added a marker interface with a single method: getPrefix(byte[] dst, int off, int length). If the key type implements this interface, getPrefix() is called by collect() to copy the comparison proxy into the kvmeta buffer. I was thinking last night that it would be better to delegate to the Serializer implementation there, though.

          Agreed; attaching it to the keytype prevents a user from doing obvious things like reversing the order without subclassing the type. An optional component (a role comparable to the combiner, I guess) that may be used by the framework to generate a hash that effects the same ordering could be useful. Would it make sense for this to come from the comparator, rather than the keytype? Mapping the keytype to a fixed-length record, even forcing an integer, will probably cover most cases.

          I imagine most of the "stock writables" we have in Hadoop could easily implement this

          If this came from the RawComparator attached to most Writables, one would have to work out how to manage subclasses, or incompatibly make all the Comparators for those types final (which is probably correct, anyway).

          Todd Lipcon added a comment -

          I didn't have a chance to run several benchmarks with/without. But I did the compareBytes optimization first, and didn't see a huge speedup, and then did the sort optimization and saw the 40%. So, the compareBytes is probably only a few percent.

          Binglin Chang added a comment -

          Very cool, again.

          On a terasort benchmark, these optimizations plus an optimization to WritableComparator.compareBytes dropped the aggregate mapside CPU millis by 40%

          Would you please give the test results of running the WritableComparator.compareBytes optimization or the key-index optimization alone? This would be useful. As I experienced in MAPREDUCE-2841, cache misses matter most.

          Todd Lipcon made changes -
          Component/s performance [ 12316500 ]
          Todd Lipcon made changes -
          Link This issue is related to MAPREDUCE-1639 [ MAPREDUCE-1639 ]
          Todd Lipcon added a comment -

          Todd, why do the proxies have to be strings

          I meant "string" in the C sense of the word – a byte[] like you said.

          Milind Bhandarkar added a comment -

          Todd, why do the proxies have to be strings? (I think you meant a 4-byte byte[], right? In that case, I do not have to construct a new string.) I think the accounting overhead is typically not a major factor in real applications (which have large values). I think this should not be a config knob at all; adding another interface is a good idea.

          I take my "-1" back. I am +1 on the proxy idea now.

          Todd Lipcon added a comment -

          oops, accidentally uploaded the wrong rev. you'll need this typo fix to compile:

          -  int kVS = kvmeta.get(kvi + VALSTART);
          +  int iVS = kvmeta.get(kvi + VALSTART);
          Todd Lipcon made changes -
          Field Original Value New Value
          Attachment mr-3235-poc.txt [ 12499917 ]
          Todd Lipcon added a comment -

          Here's my proof-of-concept patch. I don't have time to really focus on this (getting 23 out the door is higher priority) but if some other people have time to push it through or run some benchmarks on non-terasort workloads, that would be terrific!

          Todd Lipcon added a comment -

          -1 for the second. The key distribution of terasort, which results in compare != 0 most of the time, is an anomaly. In my experience, where skews etc are almost always a fact of life, more compares return 0 than non-zero.

          Definitely worth considering. Like Chris said, this comparison is practically free - we don't have to delegate to any "proxy objects" as you put it. The proxies have to be 4-byte strings. Since we already do a compare on the PARTITION part of the metadata, making that comparison an 8-byte compare instead of a 4-byte compare doesn't really cost anything. So, the only real cost is the extra accounting overhead in the buffer - 20 bytes per record instead of 16. One optimization there would be to pack the PARTITION field more tightly - in most MR jobs, we have <256 reducers, so partition could be a single byte. Since we know the number of reducers up front, we could easily trade-off space between the partition ID and the comparison proxy.

          What's the API look like?

          Currently I added a marker interface with a single method: getPrefix(byte[] dst, int off, int length). If the key type implements this interface, getPrefix() is called by collect() to copy the comparison proxy into the kvmeta buffer. I was thinking last night that it would be better to delegate to the Serializer implementation there, though. I just did the above for expediency last night while hacking this together. Alternatively, it might make sense to add another interface like how RawComparator is done.

          Both of these would be pretty esoteric config knobs

          I imagine most of the "stock writables" we have in Hadoop could easily implement this - eg Text, BytesWritable, LongWritable, etc. Frameworks like Pig/Hive could get it in as well. Application programmers implementing their own key types already tend to implement RawComparators, so allowing them to implement another simple method for a good CPU boost doesn't seem too bad.
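
          To make the shape of that marker interface concrete, a rough sketch (names are illustrative, not the exact API in the proof-of-concept patch):

              // Key types that can supply a fixed-size comparison proxy implement this;
              // collect() would call it to copy the proxy bytes into the kvmeta entry.
              public interface ComparisonProxyProvider {
                /** Write up to length proxy bytes for this key into dst, starting at off. */
                void getPrefix(byte[] dst, int off, int length);
              }

          A Text key, for example, could simply copy its first bytes (zero-padded if shorter) into dst, matching the "first 4 bytes of the string" idea from the description.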

          Chris Douglas added a comment -

          Very cool.

          The key distribution of terasort, which results in compare != 0 most of the time, is an anomaly. In my experience, where skews etc are almost always a fact of life, more compares return 0 than non-zero.

          If the keys are small and fixed-length, co-locating them with the metadata would be pretty sweet (and without requiring KEYLEN, one could just use the slot). Not sure about the proxy... though it will often come in on the same cache line, so its cost is only an optional, extra 4 byte overhead per record. What's the API look like? Both of these would be pretty esoteric config knobs, unless it came from the serializers.

          Milind Bhandarkar added a comment -

          +1 for the first improvement.

          -1 for the second. The key distribution of terasort, which results in compare != 0 most of the time, is an anomaly. In my experience, where skews etc are almost always a fact of life, more compares return 0 than non-zero. These will have the added overhead of providing new proxy objects, comparing the proxy objects, and then proceeding to compare the actual keys.

          Todd Lipcon created issue -

            People

            • Assignee:
              Todd Lipcon
              Reporter:
              Todd Lipcon
             • Votes:
               0
             • Watchers:
               38

              Dates

              • Created:
                Updated:
