Hadoop Map/Reduce / MAPREDUCE-6108

ShuffleError OOM while reserving memory by MergeManagerImpl

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Cannot Reproduce
    • Affects Version/s: 2.5.1
    • Fix Version/s: None
    • Component/s: None
    • Labels:
      None
    • Target Version/s:

      Activity

      leftnoteasy Wangda Tan added a comment -

No responses; closing as cannot reproduce. Please reopen if anybody sees this issue again.

      Thanks.

      leftnoteasy Wangda Tan added a comment -

Karthik Kambatla, Vinod Kumar Vavilapalli, is this still an issue in the existing code base? Can we close it as not reproducible if it cannot be reproduced?

      Thanks,

      gu chi gu-chi added a comment -

Xuan Gong, any thoughts other than changing the default configuration?

      gu chi gu-chi added a comment -

Vinod Kumar Vavilapalli, from the above history I cannot see what the actual solution is. Can you please explain what will be done in 2.8.0 for this issue?

      vinodkv Vinod Kumar Vavilapalli added a comment -

      Moving bugs out of previously closed releases into the next minor release 2.8.0.

      xgong Xuan Gong added a comment -

Karthik Kambatla, thanks for the investigation. It makes sense to me; I have met the same issue here.

But we got "java.lang.OutOfMemoryError: Java heap space" here. Does that mean we are running out of JVM heap memory rather than exceeding "memoryLimit"?
Even if we keep the "if (usedMemory > memoryLimit)" condition and use all the default configuration (memoryLimit = 0.7 * maxMemory), then in the worst case usedMemory = memoryLimit = 0.7 * maxMemory and requestedSize = 0.25 * 0.7 * maxMemory, so the total is 0.875 * maxMemory. I guess that is why we use "(usedMemory > memoryLimit)" instead of "(usedMemory + requestedSize > memoryLimit)" here.
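      A quick sketch of that worst-case arithmetic (illustrative only; the variable names below are mine, not the actual MergeManagerImpl fields, and 0.7 / 0.25 are the defaults of mapreduce.reduce.shuffle.input.buffer.percent and mapreduce.reduce.shuffle.memory.limit.percent):

        // Worst-case in-memory shuffle usage with the default configuration.
        long maxMemory = Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE);

        long memoryLimit = (long) (maxMemory * 0.7);               // 0.7   * maxMemory
        long maxSingleShuffleLimit = (long) (memoryLimit * 0.25);  // 0.175 * maxMemory

        // A fetcher may still start one reservation while usedMemory == memoryLimit,
        // so the in-memory shuffle alone can peak at:
        long worstCase = memoryLimit + maxSingleShuffleLimit;      // 0.875 * maxMemory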

I guess we might not have a big issue in the logic itself. In the same JVM, not only does the shuffle need memory, but other things also occupy part of the heap (such as the fetch threads?). I guess this OOM happens when we try to use too much memory for the shuffle (such as when the worst case happens).

I think for now this is more a matter of configuration tuning (see the sketch after this list):
* mapreduce.reduce.shuffle.input.buffer.percent
* mapreduce.reduce.shuffle.memory.limit.percent
* mapreduce.reduce.shuffle.parallelcopies
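
      For example, a hypothetical per-job tuning sketch (the values are placeholders, not recommendations; the same properties can of course be set in mapred-site.xml instead):

        // Requires org.apache.hadoop.conf.Configuration and org.apache.hadoop.mapreduce.Job.
        Configuration conf = new Configuration();
        conf.setFloat("mapreduce.reduce.shuffle.input.buffer.percent", 0.5f);  // default 0.70
        conf.setFloat("mapreduce.reduce.shuffle.memory.limit.percent", 0.20f); // default 0.25
        conf.setInt("mapreduce.reduce.shuffle.parallelcopies", 5);             // default 5
        Job job = Job.getInstance(conf, "shuffle-tuning-sketch");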

      Thoughts ?

      jadenpark Jaden Park added a comment -

I am still not clear on why we resort to (usedMemory > memoryLimit) instead of (usedMemory + requestedSize > memoryLimit). How exactly do all fetchers stall, and is there nothing we could do to prevent it?

      kkambatl Karthik Kambatla (Inactive) added a comment -

      As reported by Dongwook Kwon:

      Error: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in fetcher#14
              at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:134)
              at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:377)
              at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
              at java.security.AccessController.doPrivileged(Native Method)
              at javax.security.auth.Subject.doAs(Subject.java:415)
              at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
              at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
      Caused by: java.lang.OutOfMemoryError: Java heap space
              at org.apache.hadoop.io.BoundedByteArrayOutputStream.<init>(BoundedByteArrayOutputStream.java:56)
              at org.apache.hadoop.io.BoundedByteArrayOutputStream.<init>(BoundedByteArrayOutputStream.java:46)
              at org.apache.hadoop.mapreduce.task.reduce.InMemoryMapOutput.<init>(InMemoryMapOutput.java:63)
              at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.unconditionalReserve(MergeManagerImpl.java:297)
              at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.reserve(MergeManagerImpl.java:287)
              at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyMapOutput(Fetcher.java:411)
              at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyFromHost(Fetcher.java:341)
              at org.apache.hadoop.mapreduce.task.reduce.Fetcher.run(Fetcher.java:165)
      
      

Lowering the mapreduce.reduce.shuffle.input.buffer.percent value mitigates the issue. However, depending on the data and the memory the system has, the issue comes back.

From my test, when it happens the issue is very consistent: the memory footprint and the point at which the OOM happens were the same, regardless of the value of mapreduce.reduce.shuffle.input.buffer.percent (my test used the default 0.7).

      Here is what I found.

According to MergeManagerImpl, which was implemented in https://issues.apache.org/jira/browse/MAPREDUCE-4808, it appears the reserve method deliberately allows just one thread (fetcher) to go over "memoryLimit", by checking the condition (usedMemory > memoryLimit) instead of (usedMemory + requestedSize > memoryLimit), to prevent the issue of all fetchers stalling, as the code comment indicates. This seems to work well most of the time. However, when that one fetcher tries to reserve usedMemory + requestedSize beyond memoryLimit (which is derived from Runtime.getRuntime().maxMemory()), I think there is an OOM issue.

        @Override
        public synchronized MapOutput<K,V> reserve(TaskAttemptID mapId,
                                                   long requestedSize,
                                                   int fetcher
                                                   ) throws IOException {
          if (!canShuffleToMemory(requestedSize)) {
            LOG.info(mapId + ": Shuffling to disk since " + requestedSize +
                " is greater than maxSingleShuffleLimit (" +
                maxSingleShuffleLimit + ")");
            return new OnDiskMapOutput<K,V>(mapId, reduceId, this, requestedSize,
                jobConf, mapOutputFile, fetcher, true);
          }

          // Stall shuffle if we are above the memory limit
          // It is possible that all threads could just be stalling and not make
          // progress at all. This could happen when:
          //
          // requested size is causing the used memory to go above limit &&
          // requested size < singleShuffleLimit &&
          // current used size < mergeThreshold (merge will not get triggered)
          //
          // To avoid this from happening, we allow exactly one thread to go past
          // the memory limit. We check (usedMemory > memoryLimit) and not
          // (usedMemory + requestedSize > memoryLimit). When this thread is done
          // fetching, this will automatically trigger a merge thereby unlocking
          // all the stalled threads
          if (usedMemory > memoryLimit) {
            LOG.debug(mapId + ": Stalling shuffle since usedMemory (" + usedMemory
                + ") is greater than memoryLimit (" + memoryLimit + ")." +
                " CommitMemory is (" + commitMemory + ")");
            return null;
          }

          // Allow the in-memory shuffle to progress
          LOG.debug(mapId + ": Proceeding with shuffle since usedMemory ("
              + usedMemory + ") is lesser than memoryLimit (" + memoryLimit + ")."
              + "CommitMemory is (" + commitMemory + ")");
          return unconditionalReserve(mapId, requestedSize, true);
        }
      
      

      https://github.com/apache/hadoop-common/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java#L256

When that one fetcher is allowed to reserve beyond the limit (usedMemory + requestedSize > memoryLimit), then depending on how much memory the reducer has, BoundedByteArrayOutputStream hits the OOM at

        public BoundedByteArrayOutputStream(int capacity, int limit) {
          this(new byte[capacity], 0, limit);
        }
      

      https://github.com/apache/hadoop-common/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/BoundedByteArrayOutputStream.java#L56

memoryLimit is derived from Runtime.getRuntime().maxMemory(), scaled by maxInMemCopyUse (mapreduce.reduce.shuffle.input.buffer.percent); MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES seems to be there only for unit tests.

          this.memoryLimit = 
            (long)(jobConf.getLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES,
                Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE))
              * maxInMemCopyUse);
      

This explains why lowering the mapreduce.reduce.shuffle.input.buffer.percent value mitigates this issue, and why the same setting sometimes works and sometimes doesn't.
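
      As a rough sanity check of that (a sketch, assuming the default mapreduce.reduce.shuffle.memory.limit.percent of 0.25 and counting only the shuffle buffers themselves):

        // Peak in-memory shuffle usage as a fraction of the reducer heap, following
        // the reserve() logic quoted above: memoryLimit plus at most one extra
        // reservation of up to maxSingleShuffleLimit.
        static double peakShuffleFraction(double inputBufferPercent,
                                          double memoryLimitPercent) {
          return inputBufferPercent * (1.0 + memoryLimitPercent);
        }
        // peakShuffleFraction(0.7, 0.25) = 0.875 -> ~12.5% of the heap left for everything else
        // peakShuffleFraction(0.5, 0.25) = 0.625 -> ~37.5% left

      Whether the remaining headroom is enough depends on what else the reducer is holding at that moment, which matches the observation that the same setting sometimes works and sometimes doesn't.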

But I wasn't sure this is correct, or what the expected behavior should be for the stalling-fetchers issue if the OOM issue is fixed, as the code comment points out.


        People

        • Assignee:
          Unassigned
        • Reporter:
          dongwook Dongwook Kwon
        • Votes:
          0
        • Watchers:
          18
