Apache Tez / TEZ-3831

Reduce Unordered memory needed for storing empty completed events

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.9.1
    • Component/s: None
    • Labels:
      None

      Description

      The completedInputs blocking queue holds fetched inputs for the UnorderedKVReader to consume. With auto-reduce parallelism enabled and nearly all inputs empty, the reader cannot prune the empty events from the blocking queue fast enough to keep up with the fetchers. In my scenario, this caused an OutOfMemoryError (OOM).
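To make the failure mode concrete, here is a minimal producer/consumer sketch of the queueing pattern described above, assuming the pre-fix behavior enqueues every completed input, empty or not. It is not Tez code: UnfilteredQueueSketch, FetchedInput, onInputFetched and nextNonEmpty are hypothetical names. Only the completedInputs name and the LinkedBlockingDeque type come from this issue (the stack traces show ShuffleManager.getNextInput calling LinkedBlockingDeque.take). Each empty partition costs one queued object until the reader takes and discards it.

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

public class UnfilteredQueueSketch {
    // Hypothetical stand-in for a fetched shuffle input; size 0 models an empty partition.
    static final class FetchedInput {
        final int id;
        final long size;

        FetchedInput(int id, long size) {
            this.id = id;
            this.size = size;
        }

        boolean isEmpty() {
            return size == 0;
        }
    }

    // Unbounded deque: nothing limits how many completed inputs can pile up.
    private final BlockingDeque<FetchedInput> completedInputs = new LinkedBlockingDeque<>();

    // Fetcher side (assumed pre-fix behavior): every completed input is queued, empty or not.
    void onInputFetched(FetchedInput input) {
        completedInputs.add(input);
    }

    // Reader side: take() one entry at a time, discarding empty ones, until data is found.
    FetchedInput nextNonEmpty() throws InterruptedException {
        while (true) {
            FetchedInput input = completedInputs.take();
            if (!input.isEmpty()) {
                return input;
            }
        }
    }

    int backlog() {
        return completedInputs.size();
    }

    public static void main(String[] args) throws InterruptedException {
        UnfilteredQueueSketch sketch = new UnfilteredQueueSketch();
        // 1,000 completed inputs; only ids 499 and 999 carry data.
        for (int i = 0; i < 1000; i++) {
            sketch.onInputFetched(new FetchedInput(i, i % 500 == 499 ? 128 : 0));
        }
        System.out.println("queued before reading: " + sketch.backlog());
        FetchedInput first = sketch.nextNonEmpty();
        System.out.println("first input with data: " + first.id + ", still queued: " + sketch.backlog());
    }
}
```

With only two of 1,000 inputs carrying data, all 1,000 objects sit in the queue until the reader starts pruning them. Scaled up to millions of events, the same pattern can keep millions of objects on the heap.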

      Attachments

      1. Screen Shot 2017-09-13 at 4.55.11 PM.png (186 kB, Jonathan Eagles)
      2. TEZ-3831.001.patch (2 kB, Jonathan Eagles)
      3. TEZ-3831.001-addendum.patch (1 kB, Jonathan Eagles)

        Activity

        Jonathan Eagles added a comment:

        From the task log snippet below, we can see that the UnorderedKVReader was blocked in a take() on completedInputs while the ShuffleManager was fetching and adding empty completed inputs to the completedInputs queue. Although about 11 million empty partitions had been fetched, only about 8 million were in the completedInputs queue.

        2017-09-12 22:01:11,656 [INFO] [TezTaskEventRouter{attempt_1502569884713_1043511_1_05_000000_0}] |impl.ShuffleInputEventHandlerImpl|: scope-416: numDmeEventsSeen=11297691, numDmeEventsSeenWithNoData=11276605, numObsoletionEventsSeen=0
        2017-09-12 22:01:01,531 [ERROR] [Fetcher_B {scope_416} #19] |impl.ShuffleManager|: scope_416: Fetcher failed with error:
        java.lang.OutOfMemoryError: GC overhead limit exceeded
                at org.apache.tez.runtime.library.common.sort.impl.IFileInputStream.<init>(IFileInputStream.java:88)
                at org.apache.tez.runtime.library.common.sort.impl.IFile$Reader.readToMemory(IFile.java:617)
                at org.apache.tez.runtime.library.common.shuffle.ShuffleUtils.shuffleToMemory(ShuffleUtils.java:121)
                at org.apache.tez.runtime.library.common.shuffle.Fetcher.fetchInputs(Fetcher.java:950)
                at org.apache.tez.runtime.library.common.shuffle.Fetcher.doHttpFetch(Fetcher.java:599)
                at org.apache.tez.runtime.library.common.shuffle.Fetcher.doHttpFetch(Fetcher.java:486)
                at org.apache.tez.runtime.library.common.shuffle.Fetcher.callInternal(Fetcher.java:284)
                at org.apache.tez.runtime.library.common.shuffle.Fetcher.callInternal(Fetcher.java:76)
                at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
                at java.util.concurrent.FutureTask.run(FutureTask.java:266)
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
                at java.lang.Thread.run(Thread.java:745)
        2017-09-12 22:01:42,524 [INFO] [TaskHeartbeatThread] |task.TezTaskRunner2|: Attempting to abort attempt_1502569884713_1043511_1_05_000000_0 due to an invocation of umbilicalFatalError
        2017-09-12 22:01:46,941 [WARN] [TezChild] |readers.UnorderedKVReader|: Interrupted while waiting for next available input
        java.lang.InterruptedException
                at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
                at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
                at java.util.concurrent.LinkedBlockingDeque.takeFirst(LinkedBlockingDeque.java:492)
                at java.util.concurrent.LinkedBlockingDeque.take(LinkedBlockingDeque.java:680)
                at org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager.getNextInput(ShuffleManager.java:893)
                at org.apache.tez.runtime.library.common.readers.UnorderedKVReader.moveToNextInput(UnorderedKVReader.java:202)
                at org.apache.tez.runtime.library.common.readers.UnorderedKVReader.next(UnorderedKVReader.java:125)
                at org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigProcessor.collectSample(PigProcessor.java:417)
                at org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigProcessor.initializeInputs(PigProcessor.java:320)
                at org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigProcessor.run(PigProcessor.java:228)
                at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:374)
                at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:73)
                at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:61)
                at java.security.AccessController.doPrivileged(Native Method)
                at javax.security.auth.Subject.doAs(Subject.java:422)
                at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1936)
                at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:61)
                at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:37)
                at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
                at java.util.concurrent.FutureTask.run(FutureTask.java:266)
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
                at java.lang.Thread.run(Thread.java:745)
        2017-09-12 22:01:46,941 [ERROR] [TezChild] |runtime.PigProcessor|: Encountered exception while processing:
        org.apache.tez.runtime.library.api.IOInterruptedException: java.lang.InterruptedException
                at org.apache.tez.runtime.library.common.readers.UnorderedKVReader.moveToNextInput(UnorderedKVReader.java:206)
                at org.apache.tez.runtime.library.common.readers.UnorderedKVReader.next(UnorderedKVReader.java:125)
                at org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigProcessor.collectSample(PigProcessor.java:417)
                at org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigProcessor.initializeInputs(PigProcessor.java:320)
                at org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigProcessor.run(PigProcessor.java:228)
                at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:374)
                at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:73)
                at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:61)
                at java.security.AccessController.doPrivileged(Native Method)
                at javax.security.auth.Subject.doAs(Subject.java:422)
                at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1936)
                at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:61)
                at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:37)
                at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
                at java.util.concurrent.FutureTask.run(FutureTask.java:266)
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
                at java.lang.Thread.run(Thread.java:745)
        Caused by: java.lang.InterruptedException
                at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
                at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
                at java.util.concurrent.LinkedBlockingDeque.takeFirst(LinkedBlockingDeque.java:492)
                at java.util.concurrent.LinkedBlockingDeque.take(LinkedBlockingDeque.java:680)
                at org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager.getNextInput(ShuffleManager.java:893)
                at org.apache.tez.runtime.library.common.readers.UnorderedKVReader.moveToNextInput(UnorderedKVReader.java:202)
                ... 17 more
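As a quick check on the counters in the ShuffleInputEventHandlerImpl line above, the share of data movement events that carried no data works out as follows. This is plain arithmetic on the two logged values; the class and method names are illustrative only.

```java
import java.util.Locale;

public class DmeEventRatio {
    // Values copied from the ShuffleInputEventHandlerImpl log line.
    static final long NUM_DME_EVENTS_SEEN = 11_297_691L;
    static final long NUM_DME_EVENTS_SEEN_WITH_NO_DATA = 11_276_605L;

    // Events that actually pointed at data.
    static long eventsWithData(long seen, long seenWithNoData) {
        return seen - seenWithNoData;
    }

    // Percentage of events that were empty.
    static double emptyPercent(long seen, long seenWithNoData) {
        return 100.0 * seenWithNoData / seen;
    }

    public static void main(String[] args) {
        long withData = eventsWithData(NUM_DME_EVENTS_SEEN, NUM_DME_EVENTS_SEEN_WITH_NO_DATA);
        double pct = emptyPercent(NUM_DME_EVENTS_SEEN, NUM_DME_EVENTS_SEEN_WITH_NO_DATA);
        System.out.printf(Locale.ROOT, "events with data: %d%n", withData); // 21086
        System.out.printf(Locale.ROOT, "empty events: %.2f%%%n", pct);       // 99.81%
    }
}
```

Only about 21 thousand of the roughly 11.3 million events referred to actual data, consistent with the description's "nearly all empty inputs".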
        
        Jonathan Eagles added a comment:

        This approach assumes that we can avoid adding empty or null fetched inputs to the completedInputs blocking queue. The inputs-ready trigger is still fired in case all inputs are empty.

        Rohini Palaniswamy, I think this is the scenario we were looking at earlier. Eric Badger, can you have a look at this patch to see what I have missed?
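The approach in this comment can be sketched roughly as follows. This is a hedged illustration, not the actual patch: FilteredQueueSketch, onInputFetched and the inputsReady flag are hypothetical names standing in for the queue insertion and the inputs-ready trigger. Empty inputs still count toward completion, but only inputs with data are placed in completedInputs, and (as an assumption about when the trigger fires) readiness is signaled on the first completion whether or not it was empty.

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class FilteredQueueSketch {
    // Hypothetical stand-in for a fetched shuffle input; size 0 models an empty partition.
    static final class FetchedInput {
        final int id;
        final long size;

        FetchedInput(int id, long size) {
            this.id = id;
            this.size = size;
        }

        boolean isEmpty() {
            return size == 0;
        }
    }

    private final BlockingDeque<FetchedInput> completedInputs = new LinkedBlockingDeque<>();
    private final AtomicInteger completedCount = new AtomicInteger();
    // Hypothetical flag standing in for the "inputs ready" notification.
    private final AtomicBoolean inputsReady = new AtomicBoolean(false);

    void onInputFetched(FetchedInput input) {
        // Every input, empty or not, counts toward completion.
        completedCount.incrementAndGet();
        // Only inputs that carry data take up space in the queue.
        if (!input.isEmpty()) {
            completedInputs.add(input);
        }
        // Raise readiness on the first completion even if it was empty,
        // so an all-empty input set still reports itself as ready.
        inputsReady.compareAndSet(false, true);
    }

    int backlog() { return completedInputs.size(); }

    int completed() { return completedCount.get(); }

    boolean isInputsReady() { return inputsReady.get(); }

    public static void main(String[] args) {
        FilteredQueueSketch mostlyEmpty = new FilteredQueueSketch();
        for (int i = 0; i < 1000; i++) {
            mostlyEmpty.onInputFetched(new FetchedInput(i, i % 500 == 499 ? 128 : 0));
        }
        System.out.println("completed: " + mostlyEmpty.completed() + ", queued: " + mostlyEmpty.backlog());

        FilteredQueueSketch allEmpty = new FilteredQueueSketch();
        for (int i = 0; i < 3; i++) {
            allEmpty.onInputFetched(new FetchedInput(i, 0));
        }
        System.out.println("all empty -> ready: " + allEmpty.isInputsReady() + ", queued: " + allEmpty.backlog());
    }
}
```

Note what this filtering alone does not cover: a reader already blocked in take() is never woken if every remaining input is empty, because nothing more is ever enqueued.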

        Rohini Palaniswamy added a comment:

        +1. Looks good to me.

        Kuhu Shukla added a comment (edited):

        Thank you, Jonathan Eagles, for reporting this and for the patch.
        +1. LGTM.

        Kuhu Shukla added a comment:

        Thank you, Jonathan Eagles. Committed this to master.

        TezQA added a comment:

        -1 overall. Here are the results of testing the latest attachment
        http://issues.apache.org/jira/secure/attachment/12886966/TEZ-3831.001.patch
        against master revision 7e895f5.

        +1 @author. The patch does not contain any @author tags.

        -1 tests included. The patch doesn't appear to include any new or modified tests.
        Please justify why no new tests are needed for this patch.
        Also please list what manual steps were performed to verify this patch.

        +1 javac. The applied patch does not increase the total number of javac compiler warnings.

        +1 javadoc. There were no new javadoc warning messages.

        +1 findbugs. The patch does not introduce any new Findbugs (version 3.0.1) warnings.

        +1 release audit. The applied patch does not increase the total number of release audit warnings.

        +1 core tests. The patch passed unit tests in .

        Test results: https://builds.apache.org/job/PreCommit-TEZ-Build/2630//testReport/
        Console output: https://builds.apache.org/job/PreCommit-TEZ-Build/2630//console

        This message is automatically generated.

        Jonathan Eagles added a comment:

        I missed a case where the consumer thread blocked in the take() call needs to be woken up: if every input that completes after take() is called is a NullFetchedInput, nothing is added to the blocking queue and the consumer hangs. The addendum adds a poison-pill end-of-input message to the queue to prevent this hang.

        Instead of adding a new EndOfInputFetchedInput class, I reused NullFetchedInput. Let me know if it is worth adding this separate class and doing the work to support it.
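The hang and the poison-pill fix can be sketched with a sentinel object. Assumptions: END_OF_INPUT is a hypothetical sentinel playing the role this comment gives to the reused NullFetchedInput, and PoisonPillSketch, onInputFetched, getNextInput and drain are illustrative names, not the Tez API. The producer skips empty inputs but enqueues the sentinel after the last input completes, so a consumer blocked in take() always wakes up, even when every remaining input was empty.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;

public class PoisonPillSketch {
    // Hypothetical stand-in for a fetched shuffle input; size 0 models an empty partition.
    static final class FetchedInput {
        final int id;
        final long size;

        FetchedInput(int id, long size) {
            this.id = id;
            this.size = size;
        }

        boolean isEmpty() {
            return size == 0;
        }
    }

    // Hypothetical end-of-input sentinel (the role the comment gives to a reused NullFetchedInput).
    static final FetchedInput END_OF_INPUT = new FetchedInput(-1, 0);

    private final BlockingDeque<FetchedInput> completedInputs = new LinkedBlockingDeque<>();
    private final AtomicInteger remaining;

    PoisonPillSketch(int numInputs) {
        remaining = new AtomicInteger(numInputs);
        if (numInputs == 0) {
            completedInputs.add(END_OF_INPUT); // nothing to wait for
        }
    }

    // Fetcher side: skip empty inputs, but enqueue the sentinel after the last completion.
    void onInputFetched(FetchedInput input) {
        if (!input.isEmpty()) {
            completedInputs.add(input);
        }
        if (remaining.decrementAndGet() == 0) {
            completedInputs.add(END_OF_INPUT);
        }
    }

    // Reader side: blocks in take(); returns null once the sentinel is seen.
    FetchedInput getNextInput() throws InterruptedException {
        FetchedInput input = completedInputs.take();
        if (input == END_OF_INPUT) {
            completedInputs.addFirst(END_OF_INPUT); // later calls return null instead of blocking
            return null;
        }
        return input;
    }

    // Reads every input that carries data, in arrival order.
    static List<Integer> drain(PoisonPillSketch sketch) throws InterruptedException {
        List<Integer> ids = new ArrayList<>();
        for (FetchedInput in = sketch.getNextInput(); in != null; in = sketch.getNextInput()) {
            ids.add(in.id);
        }
        return ids;
    }

    public static void main(String[] args) throws InterruptedException {
        // Some empty partitions: only ids 1 and 3 carry data.
        PoisonPillSketch some = new PoisonPillSketch(4);
        some.onInputFetched(new FetchedInput(0, 0));
        some.onInputFetched(new FetchedInput(1, 64));
        some.onInputFetched(new FetchedInput(2, 0));
        some.onInputFetched(new FetchedInput(3, 32));
        System.out.println("some empty -> read " + drain(some));

        // All empty partitions, read by a thread that may already be blocked in take().
        PoisonPillSketch allEmpty = new PoisonPillSketch(3);
        List<Integer> read = new ArrayList<>();
        Thread reader = new Thread(() -> {
            try {
                read.addAll(drain(allEmpty));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        reader.start();
        for (int i = 0; i < 3; i++) {
            allEmpty.onInputFetched(new FetchedInput(i, 0));
        }
        reader.join(1000);
        System.out.println("all empty -> reader finished: " + !reader.isAlive() + ", read " + read);
    }
}
```

The sentinel is pushed back with addFirst so that later calls keep returning null instead of blocking again. A dedicated EndOfInputFetchedInput class, as floated in the comment, would make that intent explicit at the cost of one more type to support.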

        Kuhu Shukla added a comment:

        Thanks, Jonathan Eagles, for the additional patch. Using NullFetchedInput looks fine to me. The adjust call will add this input, which covers both the case where all partitions are empty and the case where only some are.
        +1. Will commit this shortly.

        Show
        kshukla Kuhu Shukla added a comment - Thanks Jonathan Eagles for the additional patch. Using NullFetchedInput looks fine to me. The adjust call will add this input and that would cover cases of all empty partitions and some empty partitions. +1. Will commit this shortly.

          People

          • Assignee: Jonathan Eagles
          • Reporter: Jonathan Eagles
          • Votes: 0
          • Watchers: 5
