Hadoop Map/Reduce
MAPREDUCE-6923

Optimize MapReduce Shuffle I/O for small partitions

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.9.0, 3.0.0-beta1
    • Component/s: None
    • Labels:
      None
    • Environment:

      Observed in Hadoop 2.7.3 and above (judging from the source code of later versions), on Ubuntu 16.04.

      Description

      When a job configuration results in small partitions being read by each reducer from each mapper (e.g. about 65 kilobytes in my setup: a TeraSort of 256 gigabytes using 2048 mappers and 2048 reducers), and setting

      <property>
        <name>mapreduce.shuffle.transferTo.allowed</name>
        <value>false</value>
      </property>
      

      then the default setting of

      <property>
        <name>mapreduce.shuffle.transfer.buffer.size</name>
        <value>131072</value>
      </property>
      

      results in almost 100% overhead in reads during shuffle in YARN, because for each 65K needed, 128K are read.
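
      A quick back-of-the-envelope check of these numbers (this sketch only restates the arithmetic behind the figures quoted above; it does not model how the extra reads actually happen):

      public class ShuffleOverheadEstimate {
          public static void main(String[] args) {
              long totalMapOutput = 256L << 30;  // ~256 GiB of map output in this setup
              long mappers = 2048, reducers = 2048;
              long bufferSize = 131072;          // default mapreduce.shuffle.transfer.buffer.size

              // Average partition each reducer fetches from each mapper.
              long partitionBytes = totalMapOutput / (mappers * reducers);

              // If every ~65K partition causes a full 128K buffer to be read from disk,
              // roughly bufferSize bytes are read per partition instead of partitionBytes.
              double overhead =
                  (double) Math.max(partitionBytes, bufferSize) / partitionBytes - 1.0;

              System.out.printf("partition = %d bytes, read overhead = %.0f%%%n",
                  partitionBytes, overhead * 100);
              // prints: partition = 65536 bytes, read overhead = 100%
          }
      }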

      I propose a fix in FadvisedFileRegion.java as follows:

      ByteBuffer byteBuffer = ByteBuffer.allocate(Math.min(this.shuffleBufferSize,
          trans > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) trans));
      

      e.g. here. This sets the shuffle buffer size to the minimum of the shuffle buffer size specified in the configuration (128K by default) and the actual partition size (65K on average in my setup). In my benchmarks this reduced the read overhead in YARN from about 100% (255 additional gigabytes as described above) down to about 18% (an additional 45 gigabytes). The runtime of the job remained the same in my setup.
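
      For context, below is a simplified, self-contained sketch of the buffer-based copy loop that FadvisedFileRegion uses when transferTo is disallowed. It is paraphrased from memory rather than taken from the actual Hadoop source (names and structure are approximate); only the ByteBuffer.allocate(...) call reflects the change proposed above:

      import java.io.IOException;
      import java.nio.ByteBuffer;
      import java.nio.channels.FileChannel;
      import java.nio.channels.WritableByteChannel;

      class ShuffleCopySketch {
          static long copyPartition(FileChannel fileChannel, WritableByteChannel target,
                                    long offset, long count, int shuffleBufferSize)
                  throws IOException {
              long trans = count;      // bytes of this partition still to transfer
              long position = offset;  // current read position in the map output file
              // Proposed change: never allocate more than the partition actually needs.
              ByteBuffer byteBuffer = ByteBuffer.allocate(Math.min(shuffleBufferSize,
                  trans > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) trans));

              int readSize;
              while (trans > 0 && (readSize = fileChannel.read(byteBuffer, position)) > 0) {
                  if (readSize < trans) {
                      // Partial read: account for it and keep looping.
                      trans -= readSize;
                      position += readSize;
                      byteBuffer.flip();
                  } else {
                      // Everything still needed has been read (the buffer may hold a few
                      // extra bytes when it is larger than trans): cap at trans and stop.
                      byteBuffer.limit((int) trans);
                      byteBuffer.position(0);
                      position += trans;
                      trans = 0;
                  }
                  while (byteBuffer.hasRemaining()) {
                      target.write(byteBuffer);
                  }
                  byteBuffer.clear();
              }
              return count - trans;    // bytes actually transferred
          }
      }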

      1. MAPREDUCE-6923.00.patch
        2 kB
        Robert Schmidtke
      2. MAPREDUCE-6923.01.patch
        3 kB
        Robert Schmidtke

        Issue Links

          Activity

          raviprak Ravi Prakash added a comment -

          I'd say that for readSize == trans, we're in the else block,

          Thanks for pointing that out Robert! Yupp. I agree....

          I'll be linking to the results once they're properly published.

          Looking forward to it

          rosch Robert Schmidtke added a comment -

          Thanks for coming back to my comments.

           When I said Yarn I indeed meant the NodeManager, sorry for the confusion. You're right about the shuffle service; however, it was something that I only discovered recently, having built my configuration a long time ago, not exactly knowing what I was doing. I set these keys as you described.
          I'm seeing jar files being loaded in the MapTask and ReduceTask JVMs alright, but there does not seem to be disk I/O overhead.

          In any case, I greatly appreciate all of your effort, and now that things are working as expected for me, I can focus on analyzing the numbers and making some sense of them. I'll be linking to the results once they're properly published.

          Cheers
          Robert

          rosch Robert Schmidtke added a comment - edited

          Hi Ravi,

          When shuffleBufferSize <= trans, then behavior is exactly the same as old code.

          Yes.

           if readSize == trans (i.e. the fileChannel.read() returned as many bytes as I wanted to transfer), trans is decremented correctly, position is increased correctly and the byteBuffer is flipped as usual. byteBuffer's contents are written to target as usual, byteBuffer is cleared and then hopefully GCed, never to be seen again.

           I'd say that for readSize == trans, we're in the else block, and thus byteBuffer's limit() is set to trans (which is the size it already has, because we're in the case where trans < shuffleBufferSize). It's correctly positioned to 0 as we're done reading, and trans is correctly set to 0. Afterwards, the loop breaks (it can only be one iteration here because otherwise trans would have been larger than shuffleBufferSize), byteBuffer is written to target and then cleared.

           if readSize < trans (almost the same thing as above happens, but in a while loop). The only change this patch makes is that the byteBuffer may be smaller than before this patch, but it doesn't matter because it's big enough for the number of bytes we need to transfer.

          Now we have the situation you described for the previous case, and I agree with your reasoning here.

          raviprak Ravi Prakash added a comment -

          Oh and sorry about neglecting your questions earlier. Apologies also if this is too deep in the details. Maybe a better understanding could help.

           The Hadoop project has tried to make clear distinctions between YARN (the resource-management layer) and frameworks that can run on top of YARN (e.g. MapReduce, Tez, Slider etc.). Even so, some dependencies have stuck around.

          I see that some 1.5 GiB is spent on reading the mapreduce jar files (in Yarn), and another 1.2 GiB is spent reading jar files in /usr/lib/jvm.

           I'm not entirely sure what you mean when you say Yarn here. I'm guessing you mean the NodeManager. Technically the NodeManager shouldn't really even be loading the MapReduce jars (because separate projects blah blah). However, there's a MapReduce Auxiliary Shuffle Service (if you look at your yarn-site.xml, yarn.nodemanager.aux-services probably has org.apache.hadoop.mapred.ShuffleHandler), which I'm sure pulls in all sorts of MapReduce code into the NodeManager JVM. This happens only when you start the cluster (the auxiliary ShuffleService is a long-running service in the NodeManager).
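
           For reference, the auxiliary shuffle service mentioned above is wired up in yarn-site.xml with the standard configuration keys, roughly as follows (typical values; adapt to your cluster):

           <property>
             <name>yarn.nodemanager.aux-services</name>
             <value>mapreduce_shuffle</value>
           </property>
           <property>
             <name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
             <value>org.apache.hadoop.mapred.ShuffleHandler</value>
           </property>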

          I have instrumented reading zip and jar files separately, and over the course of all map tasks (TeraGen + TeraSort), my instrumentation gives a total of 638 GiB / (2048 + 2048) = 159.5 MiB per mapper, and 337 GiB / 2048 = 168.5 MiB per reducer. However I wouldn't rely too much on these numbers, because if I added them to the regular I/O induced by reading/writing the input/output, shuffle and spill, then my numbers wouldn't agree any longer with the XFS counters.

          Hmm.. without knowing exactly what your instrumentation does, I will choose to share your skepticism of these numbers

          Do you mean that Yarn should exhibit this I/O, or would I see this in the map and reduce JVMs (as explained above)?

           Again, I'm guessing by "Yarn" over here you mean the NodeManager. To launch any YARN container (MapTask or ReduceTask or TezChild etc.) the NodeManager does a lot of things. One of those things is to localize the resources. For this, usually a separate process called a Localizer is run. This process may download things from HDFS to the local machine under certain circumstances (usually, though, if the job jars are already in the DistributedCache, it may be skipped).
          However I was referring to the MapTask and ReduceTask JVMs loading the jar files.

          hudson Hudson added a comment -

          SUCCESS: Integrated in Jenkins build Hadoop-trunk-Commit #12157 (See https://builds.apache.org/job/Hadoop-trunk-Commit/12157/)
          MAPREDUCE-6923. Optimize MapReduce Shuffle I/O for small partitions. (raviprak: rev ac7d0604bc73c0925eff240ad9837e14719d57b7)

          • (edit) hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java
          raviprak Ravi Prakash added a comment -

          Committed to branch-2 and trunk. Thanks a lot for your contribution Robert!

          Good luck with your research. I hope to hear back from you when you publish. And look forward to more valuable contributions from you.

          raviprak Ravi Prakash added a comment -

          Hi Robert!

           Here's my reasoning about this patch. Sorry about being this verbose. I just have umm.... let's say history with the shuffle code:

           1. When shuffleBufferSize <= trans, then behavior is exactly the same as old code.
           2. When trans < shuffleBufferSize then
             • if readSize == trans (i.e. the fileChannel.read() returned as many bytes as I wanted to transfer), trans is decremented correctly, position is increased correctly and the byteBuffer is flipped as usual. byteBuffer's contents are written to target as usual, byteBuffer is cleared and then hopefully GCed, never to be seen again.
             • if readSize < trans (almost the same thing as above happens, but in a while loop). The only change this patch makes is that the byteBuffer may be smaller than before this patch, but it doesn't matter because it's big enough for the number of bytes we need to transfer.
             • if readSize > trans: this shouldn't happen any more since byteBuffer's size is trans. However this is still not dead code, because we need it for the first case (when shuffleBufferSize <= trans).

          As much as I would have liked another review to calm myself, I am fairly confident this is fine. Please let me know if the reasoning above is incorrect in any manner.

          Committing shortly

          hadoopqa Hadoop QA added a comment -
          -1 overall



          Vote Subsystem Runtime Comment
          0 reexec 0m 18s Docker mode activated.
                Prechecks
          +1 @author 0m 0s The patch does not contain any @author tags.
          -1 test4tests 0m 0s The patch doesn't appear to include any new or modified tests. Please justify why no new tests are needed for this patch. Also please list what manual steps were performed to verify this patch.
                trunk Compile Tests
          +1 mvninstall 15m 33s trunk passed
          +1 compile 0m 21s trunk passed
          +1 checkstyle 0m 12s trunk passed
          +1 mvnsite 0m 20s trunk passed
          +1 findbugs 0m 28s trunk passed
          +1 javadoc 0m 15s trunk passed
                Patch Compile Tests
          +1 mvninstall 0m 17s the patch passed
          +1 compile 0m 18s the patch passed
          +1 javac 0m 18s the patch passed
          +1 checkstyle 0m 11s the patch passed
          +1 mvnsite 0m 18s the patch passed
          +1 whitespace 0m 0s The patch has no whitespace issues.
          +1 findbugs 0m 31s the patch passed
          +1 javadoc 0m 12s the patch passed
                Other Tests
          +1 unit 0m 20s hadoop-mapreduce-client-shuffle in the patch passed.
          +1 asflicense 0m 14s The patch does not generate ASF License warnings.
          20m 33s



          Subsystem Report/Notes
          Docker Image:yetus/hadoop:14b5c93
          JIRA Issue MAPREDUCE-6923
          JIRA Patch URL https://issues.apache.org/jira/secure/attachment/12880367/MAPREDUCE-6923.01.patch
          Optional Tests asflicense compile javac javadoc mvninstall mvnsite unit findbugs checkstyle
          uname Linux de665417d286 3.13.0-117-generic #164-Ubuntu SMP Fri Apr 7 11:05:26 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
          Build tool maven
          Personality /testptch/hadoop/patchprocess/precommit/personality/provided.sh
          git revision trunk / 7fc324a
          Default Java 1.8.0_131
          findbugs v3.1.0-RC1
          Test Results https://builds.apache.org/job/PreCommit-MAPREDUCE-Build/7043/testReport/
          modules C: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle U: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle
          Console output https://builds.apache.org/job/PreCommit-MAPREDUCE-Build/7043/console
          Powered by Apache Yetus 0.5.0 http://yetus.apache.org

          This message was automatically generated.

          rosch Robert Schmidtke added a comment - edited

          Fixed indentation for checkstyle in new patch MAPREDUCE-6923.01.patch, which supersedes the old one MAPREDUCE-6923.00.patch.

          rosch Robert Schmidtke added a comment -

          I am using bytecode instrumentation to log every read and write request going through the core Java I/O classes. I do this for every JVM started (Yarn, Map, Reduce, Hdfs, ...), and log statistics over the entire TeraSort run. The aggregated statistics from there agree to 97-99% (for reads and writes, respectively) with what the underlying XFS file system counters report. Hence I assume that my instrumentation is pretty accurate, giving 1169 GiB for all Yarn I/O. I see that some 1.5 GiB is spent on reading the mapreduce jar files (in Yarn), and another 1.2 GiB is spent reading jar files in /usr/lib/jvm. However, there most likely is caching involved, and I wouldn't be sure about how much I/O actually happened at this level.

          I have instrumented reading zip and jar files separately, and over the course of all map tasks (TeraGen + TeraSort), my instrumentation gives a total of 638 GiB / (2048 + 2048) = 159.5 MiB per mapper, and 337 GiB / 2048 = 168.5 MiB per reducer. However I wouldn't rely too much on these numbers, because if I added them to the regular I/O induced by reading/writing the input/output, shuffle and spill, then my numbers wouldn't agree any longer with the XFS counters.

           On some installations I've seen the JVM load close to 400Mb of jar files for Hadoop and its dependencies. Even on trunk my MapTask reads about 180Mb of jars at least.

          Do you mean that Yarn should exhibit this I/O, or would I see this in the map and reduce JVMs (as explained above)?

          hadoopqa Hadoop QA added a comment -
          -1 overall



          Vote Subsystem Runtime Comment
          0 reexec 0m 18s Docker mode activated.
                Prechecks
          +1 @author 0m 0s The patch does not contain any @author tags.
          -1 test4tests 0m 0s The patch doesn't appear to include any new or modified tests. Please justify why no new tests are needed for this patch. Also please list what manual steps were performed to verify this patch.
                trunk Compile Tests
          +1 mvninstall 14m 32s trunk passed
          +1 compile 0m 18s trunk passed
          +1 checkstyle 0m 13s trunk passed
          +1 mvnsite 0m 19s trunk passed
          +1 findbugs 0m 24s trunk passed
          +1 javadoc 0m 14s trunk passed
                Patch Compile Tests
          +1 mvninstall 0m 17s the patch passed
          +1 compile 0m 17s the patch passed
          +1 javac 0m 17s the patch passed
          -1 checkstyle 0m 9s hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle: The patch generated 1 new + 5 unchanged - 0 fixed = 6 total (was 5)
          +1 mvnsite 0m 16s the patch passed
          +1 whitespace 0m 0s The patch has no whitespace issues.
          +1 findbugs 0m 30s the patch passed
          +1 javadoc 0m 11s the patch passed
                Other Tests
          +1 unit 0m 21s hadoop-mapreduce-client-shuffle in the patch passed.
          +1 asflicense 0m 13s The patch does not generate ASF License warnings.
          19m 13s



          Subsystem Report/Notes
          Docker Image:yetus/hadoop:14b5c93
          JIRA Issue MAPREDUCE-6923
          JIRA Patch URL https://issues.apache.org/jira/secure/attachment/12880185/MAPREDUCE-6923.00.patch
          Optional Tests asflicense compile javac javadoc mvninstall mvnsite unit findbugs checkstyle
          uname Linux a8be7b9cefca 3.13.0-117-generic #164-Ubuntu SMP Fri Apr 7 11:05:26 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
          Build tool maven
          Personality /testptch/hadoop/patchprocess/precommit/personality/provided.sh
          git revision trunk / f4c6b00
          Default Java 1.8.0_131
          findbugs v3.1.0-RC1
          checkstyle https://builds.apache.org/job/PreCommit-MAPREDUCE-Build/7042/artifact/patchprocess/diff-checkstyle-hadoop-mapreduce-project_hadoop-mapreduce-client_hadoop-mapreduce-client-shuffle.txt
          Test Results https://builds.apache.org/job/PreCommit-MAPREDUCE-Build/7042/testReport/
          modules C: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle U: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle
          Console output https://builds.apache.org/job/PreCommit-MAPREDUCE-Build/7042/console
          Powered by Apache Yetus 0.5.0 http://yetus.apache.org

          This message was automatically generated.

          raviprak Ravi Prakash added a comment -

          The patch looks good to me. Barring objections, I'll commit it on Monday.

          raviprak Ravi Prakash added a comment -

           Thank you for the contribution Robert. You are right that the typecast inside Math.min() would be wrong. I was thinking outside the Math.min() would be fine, like in your testCast. In either case, I agree it's alright to keep the ternary operator there for clarity, and in case shuffleBufferSize is ever incorrectly configured to be negative.

          Thank you for that wonderful analysis. It is very valuable.

          YARN reads a total of 1169 GiB in my setup

           How are you measuring that? Does this take into account the jar files that the MapTask and ReduceTask must read to start the JVM? On some installations I've seen the JVM load close to 400Mb of jar files for Hadoop and its dependencies. Even on trunk my MapTask reads about 180Mb of jars at least.
           I'm sure "Failed Shuffles" would be executed again. I don't know where the failure would be picked up from, but it's probably from the beginning. This is possibly due to your network.

          Thanks for the extensive research.

          rosch Robert Schmidtke added a comment - edited

           FYI, I have benchmarked another version, which uses casts instead of the ternary operator, using JMH on my Mac:

          package de.schmidtke.java.benchmark;
          
          import java.util.Random;
          
          import org.openjdk.jmh.annotations.Benchmark;
          import org.openjdk.jmh.annotations.Level;
          import org.openjdk.jmh.annotations.Scope;
          import org.openjdk.jmh.annotations.Setup;
          import org.openjdk.jmh.annotations.State;
          
          public class TernaryBenchmark {
          
              @State(Scope.Thread)
              public static class TBState {
                  private final Random random = new Random(0);
                  public long trans;
          
                  @Setup(Level.Invocation)
                  public void setup() {
                      trans = random.nextLong();
                  }
              }
          
              @Benchmark
              public int testTernary(TBState tbState) {
                  long trans = tbState.trans;
                  return Math.min(131072,
                          trans > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) trans);
              }
          
              @Benchmark
              public int testCast(TBState tbState) {
                  long trans = tbState.trans;
                  return (int) Math.min((long) 131072, trans);
              }
          
          }
          

           The results show roughly 1% higher throughput for the cast version; the rest seems about the same. I'd go with the ternary operator version for better clarity:

          Benchmark                                           Mode      Cnt         Score        Error  Units
          TernaryBenchmark.testCast                          thrpt      200  25142779.388 ± 114863.918  ops/s
          TernaryBenchmark.testTernary                       thrpt      200  24829083.072 ±  64009.480  ops/s
          TernaryBenchmark.testCast                           avgt      200        ≈ 10⁻⁷                s/op
          TernaryBenchmark.testTernary                        avgt      200        ≈ 10⁻⁷                s/op
          TernaryBenchmark.testCast                         sample  7596374        ≈ 10⁻⁷                s/op
          TernaryBenchmark.testCast:testCast·p0.00          sample                 ≈ 10⁻⁹                s/op
          TernaryBenchmark.testCast:testCast·p0.50          sample                 ≈ 10⁻⁷                s/op
          TernaryBenchmark.testCast:testCast·p0.90          sample                 ≈ 10⁻⁷                s/op
          TernaryBenchmark.testCast:testCast·p0.95          sample                 ≈ 10⁻⁷                s/op
          TernaryBenchmark.testCast:testCast·p0.99          sample                 ≈ 10⁻⁷                s/op
          TernaryBenchmark.testCast:testCast·p0.999         sample                 ≈ 10⁻⁷                s/op
          TernaryBenchmark.testCast:testCast·p0.9999        sample                 ≈ 10⁻⁵                s/op
          TernaryBenchmark.testCast:testCast·p1.00          sample                  0.002                s/op
          TernaryBenchmark.testTernary                      sample  7469568        ≈ 10⁻⁷                s/op
          TernaryBenchmark.testTernary:testTernary·p0.00    sample                 ≈ 10⁻⁹                s/op
          TernaryBenchmark.testTernary:testTernary·p0.50    sample                 ≈ 10⁻⁷                s/op
          TernaryBenchmark.testTernary:testTernary·p0.90    sample                 ≈ 10⁻⁷                s/op
          TernaryBenchmark.testTernary:testTernary·p0.95    sample                 ≈ 10⁻⁷                s/op
          TernaryBenchmark.testTernary:testTernary·p0.99    sample                 ≈ 10⁻⁷                s/op
          TernaryBenchmark.testTernary:testTernary·p0.999   sample                 ≈ 10⁻⁷                s/op
          TernaryBenchmark.testTernary:testTernary·p0.9999  sample                 ≈ 10⁻⁵                s/op
          TernaryBenchmark.testTernary:testTernary·p1.00    sample                  0.002                s/op
          TernaryBenchmark.testCast                             ss       10        ≈ 10⁻⁵                s/op
          TernaryBenchmark.testTernary                          ss       10        ≈ 10⁻⁵                s/op
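
           For completeness, one common way to run such a JMH benchmark programmatically is sketched below, assuming jmh-core and its annotation processor are on the classpath; TernaryBenchmarkRunner is a hypothetical helper class, and TernaryBenchmark is the class above:

           import org.openjdk.jmh.runner.Runner;
           import org.openjdk.jmh.runner.RunnerException;
           import org.openjdk.jmh.runner.options.Options;
           import org.openjdk.jmh.runner.options.OptionsBuilder;

           public class TernaryBenchmarkRunner {
               public static void main(String[] args) throws RunnerException {
                   // Run only the TernaryBenchmark methods, with a single fork to keep it quick.
                   Options opts = new OptionsBuilder()
                           .include(TernaryBenchmark.class.getSimpleName())
                           .forks(1)
                           .build();
                   new Runner(opts).run();
               }
           }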
          
          rosch Robert Schmidtke added a comment -

          Initial patch from trunk that uses the minimum of shuffleBufferSize and trans in FadvisedFileRegion.

          rosch Robert Schmidtke added a comment - edited

          Hi Ravi,

           my guess is that since trans is a long, and ByteBuffer.allocate(...) only takes an int, a "blind" cast in the Math.min(...) operation might yield a negative value for trans > Integer.MAX_VALUE:

          package test;
          import java.io.IOException;
          import java.nio.ByteBuffer;
          public class Test {
              public static void main(String[] args) throws IOException {
                  long trans = Integer.MAX_VALUE + 1L;
                  int shuffleBufferSize = 131072;
                  ByteBuffer byteBuffer = ByteBuffer
                          .allocate(Math.min(shuffleBufferSize, (int) trans));
                  System.out.println(byteBuffer.capacity());
              }
          }
          

          gives

          Exception in thread "main" java.lang.IllegalArgumentException
          	at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
          	at test.Test.main(Test.java:9)
          

          whereas

          package test;
          import java.io.IOException;
          import java.nio.ByteBuffer;
          public class Test {
              public static void main(String[] args) throws IOException {
                  long trans = Integer.MAX_VALUE + 1L;
                  int shuffleBufferSize = 131072;
                  ByteBuffer byteBuffer = ByteBuffer.allocate(Math.min(shuffleBufferSize,
                          trans > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) trans));
                  System.out.println(byteBuffer.capacity());
              }
          }
          

          correctly outputs 131072.

           As for the other 18% issue, I am not yet quite sure. I'm currently investigating the I/O of each of Hadoop's components, using TeraSort as my workhorse. For a TeraSort of 1024 GiB, YARN reads a total of 1169 GiB in my setup, with transferTo.allowed=true. Taking into account that the MapReduce framework counters report 1065 GiB of serialized map output (and thus, 1065 GiB of shuffled bytes), the overhead is "only" 104 GiB for 1024 GiB input, or roughly 10%. So there are additional reads, even when using transferTo. Maybe it has something to do with resource distribution? Note that I have disabled speculative execution, so there are no extra executions of additional reducers, which might read the same map output multiple times. However, there are 140 "Failed Shuffles" – does that mean that they have been executed again? If so, and assuming that for 1024 GiB of input, each reducer needs to fetch 1065 / 2048 = 0.52 GiB, there is an additional overhead of 140 * 0.52 ≈ 73 GiB. What remains is an unexplained 31 GiB.

          When running TeraSort with transferTo.allowed=false and my patch as described above, sorting 256 GiB, the MapReduce framework counters report 266 GiB of serialized map output (and thus, 266 GiB of shuffled bytes). In this run, there were no "Failed Shuffles". Since my analysis reports that YARN reads 300 GiB, the overhead is actually probably more correctly measured as 34 GiB (= 13% of 256 GiB) instead of 45 GiB (= 18% of 256 GiB). These 34 GiB are close enough to the 31 GiB for 1024 GiB input (see above), so maybe this is constant overhead for 2048 mappers and 2048 reducers?

          Anyway, since I'll be investigating this behavior in the future, digging into per-file statistics, I'll be able to report exactly which file is read how often / how much of it is read. I can then tell exactly what is happening on disk. Since this is part of unpublished research, however, I'm afraid I can only report the results later.

          raviprak Ravi Prakash added a comment -

           Hi Robert! Thanks for filing the JIRA and your contribution! I'm adding you as a contributor and assigning the JIRA to you. Could you please post a patch file to this JIRA? You can name the patch file MAPREDUCE-6923.00.patch. One minor nit is that we limit lines to 80 characters. Could you please fix that in the patch file?
          Also since trans is guaranteed to be positive and shuffleBufferSize is an integer, maybe we don't really need the ternary operator condition? Up to you to keep it though.

          I'm not surprised that there isn't an improvement in job performance but the read overhead improvement is great. Do you know where the 18% overhead is going?

           The patch sounds reasonable to me. Jason Lowe, Nikola Vujic, Chris Nauroth, do you have any comments? The diff he's proposing is in the link to the word "e.g. here".


            People

            • Assignee:
              rosch Robert Schmidtke
              Reporter:
              rosch Robert Schmidtke
            • Votes:
              0
            • Watchers:
              7

              Dates

              • Created:
                Updated:
                Resolved:
