Apache Tez / TEZ-1494

DAG hangs waiting for ShuffleManager.getNextInput()

Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.5.1
    • Component/s: None
    • Hadoop Flags: Reviewed

    Description

      Attaching the DAG and the stack trace of the hung process. An illustrative sketch of the blocking pattern follows the trace below.

      Thread 30071: (state = BLOCKED)

      • sun.misc.Unsafe.park(boolean, long) @bci=0 (Interpreted frame)
      • java.util.concurrent.locks.LockSupport.park(java.lang.Object) @bci=14, line=186 (Interpreted frame)
      • java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await() @bci=42, line=2043 (Interpreted frame)
      • java.util.concurrent.LinkedBlockingQueue.take() @bci=29, line=442 (Interpreted frame)
      • org.apache.tez.runtime.library.shuffle.common.impl.ShuffleManager.getNextInput() @bci=67, line=610 (Interpreted frame)
      • org.apache.tez.runtime.library.common.readers.UnorderedKVReader.moveToNextInput() @bci=26, line=176 (Interpreted frame)
      • org.apache.tez.runtime.library.common.readers.UnorderedKVReader.next() @bci=30, line=117 (Interpreted frame)
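
      For context, the trace above shows getNextInput() parked inside LinkedBlockingQueue.take(). The sketch below is plain Java with hypothetical names (completedInputs, numExpectedInputs); it is not the actual ShuffleManager code, only an illustration of how a reader hangs forever once the expected-input count no longer matches what the producer will ever deliver:

        import java.util.concurrent.BlockingQueue;
        import java.util.concurrent.LinkedBlockingQueue;

        // Illustrative sketch only; field and method names are hypothetical.
        class InputReaderSketch {
          private final BlockingQueue<Object> completedInputs = new LinkedBlockingQueue<>();
          private final int numExpectedInputs;   // fixed when the consumer task started
          private int numDeliveredInputs = 0;

          InputReaderSketch(int numExpectedInputs) {
            this.numExpectedInputs = numExpectedInputs;
          }

          Object getNextInput() throws InterruptedException {
            if (numDeliveredInputs >= numExpectedInputs) {
              return null; // all expected inputs already handed out
            }
            // If the upstream vertex later reduced its parallelism, fewer inputs than
            // numExpectedInputs are ever enqueued and this take() blocks forever;
            // that is the state of thread 30071 in the attached stack trace.
            Object next = completedInputs.take();
            numDeliveredInputs++;
            return next;
          }
        }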

      Attachments

        1. TEZ-1494.1.patch
          7 kB
          Rajesh Balamohan
        2. TEZ-1494.2.patch
          11 kB
          Rajesh Balamohan
        3. TEZ-1494.3.patch
          26 kB
          Rajesh Balamohan
        4. TEZ-1494.4.patch
          29 kB
          Rajesh Balamohan
        5. TEZ-1494.5.patch
          29 kB
          Rajesh Balamohan
        6. TEZ-1494-DAG.dot
          6 kB
          Rajesh Balamohan

        Activity

          hitesh Hitesh Shah added a comment -

          Setting the fix version to the lowest version the patch was committed to.


          rajesh.balamohan Rajesh Balamohan added a comment -

          Agreed. Thanks bikassaha. Committed to branch-0.5.

          commit 176b12cde8421582e05afbf1cb5e1b46f3cf0d38
          Author: Rajesh Balamohan <rbalamohan@apache.org>
          Date: Thu Sep 11 03:11:48 2014 +0530

          TEZ-1494. DAG hangs waiting for ShuffleManager.getNextInput() (Rajesh Balamohan)

          bikassaha Bikas Saha added a comment -

          lgtm. Let's get this in for 0.5.1 and continue to investigate these corner cases. I don't think we are done with these issues yet. The shuffle case may still have issues, as well as the corner case you mentioned in TEZ-1522. We can start working on integrating the notification mechanism with the VMs and use that for 0.6.0.


          rajesh.balamohan Rajesh Balamohan added a comment -

          bikassaha Got it; fixed it in the latest patch.

          sseth Siddharth Seth added a comment -

          Since the fix is within plugins, we'll likely need to inform Hive and Pig to make similar changes - if they're required. Hive, for example, uses a VertexManagerPlugin to process RootInputs.

          bikassaha Bikas Saha added a comment -

          Unless I am following the flow incorrectly, onSourceTaskCompleted() calls scheduleTasks(), which calls canScheduleTasks() and then schedules all tasks. Since onSourceTaskCompleted() can be called multiple times, scheduleTasks() will be called multiple times, and thus we can end up scheduling the tasks multiple times. canScheduleTasks()/scheduleTasks() don't seem to have any checks against having already scheduled everything. I am looking at the .4 patch. Am I missing something?

          • "Boolean taskIsFinished[]" was added earlier to track duplicate completions. But, in this case it is irrelevant. So retaining just numFinishedTasks in the latest patch. Removed numTasks as well in latest patch.
          • Multiple schedulings are prevented via canScheduleTasks() which is invoked within scheduleTasks()
          • Removed ImmediateStartVertexManager specific checks from testcases. Added CustomEdge in the testcase.
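
          A minimal sketch of the guard described above, assuming a VertexManagerPlugin-style callback; the class name and the exact scheduling condition are hypothetical and only illustrate invoking canScheduleTasks() from scheduleTasks(), not the actual patch:

            // Hypothetical sketch: schedule tasks exactly once even though
            // onSourceTaskCompleted() can be invoked multiple times.
            class OnceOnlySchedulerSketch {
              private final int numSourceTasks;
              private int numFinishedSourceTasks = 0;
              private boolean tasksScheduled = false;  // single flag, no per-task array

              OnceOnlySchedulerSketch(int numSourceTasks) {
                this.numSourceTasks = numSourceTasks;
              }

              synchronized void onSourceTaskCompleted() {
                numFinishedSourceTasks++;
                scheduleTasks();
              }

              private void scheduleTasks() {
                if (!canScheduleTasks()) {
                  return;  // either nothing has finished yet, or we already scheduled
                }
                tasksScheduled = true;
                // ... hand the vertex's tasks to the framework for scheduling ...
              }

              private boolean canScheduleTasks() {
                // Only the first completion matters; repeat notifications are ignored.
                return !tasksScheduled && numFinishedSourceTasks > 0;
              }
            }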
          rajesh.balamohan Rajesh Balamohan added a comment - "Boolean taskIsFinished[]" was added earlier to track duplicate completions. But, in this case it is irrelevant. So retaining just numFinishedTasks in the latest patch. Removed numTasks as well in latest patch. Multiple schedulings are prevented via canScheduleTasks() which is invoked within scheduleTasks() Removed ImmediateStartVertexManager specific checks from testcases. Added CustomEdge in the testcase.
          bikassaha Bikas Saha added a comment -

          From what I see, both of these can be replaced by a single boolean, right? We are only interested in 1 completion.

          +    int numFinishedTasks;
          +    Boolean taskIsFinished[];
          +

          Not sure how we are preventing multiple schedulings of the tasks because scheduleTasks() is now being called on every onSourceTaskCompleted().

          This code should probably be removed since we are trying to test the behavior and not the exact internal impl. The impl could change but the behavior should not. Right? This would also allow us to make this method private.

          +    assertTrue(((ImmediateStartVertexManager)m5.getVertexManager().getPlugin()).canScheduleTasks() == false);

          Looks like the test is only covering the ImmediateStartVertexManager case. Adding a custom edge between M7 and a new vertex (with the new vertex having a RootInputVertexManager) would cover the remaining cases. If that gets hard to write then we should at least add M7 to a new vertex with a custom edge (no RootInputVertexManager).

          bikassaha Bikas Saha added a comment -

          We may put this patch in for 0.5.1 as a bug fix. I plan to work on TEZ-1547 and change all existing VMs where needed. That would go into 0.6.0 as it's a major change.
          I will have comments for this jira shortly.


          rajesh.balamohan Rajesh Balamohan added a comment -

          Thanks Bikas. I believe TEZ-1447 is specific to InputInitializers, which cannot be used directly in this case; TEZ-1547 would be relevant here. Performance would definitely be better if we rely on the vertex-started notification event (as Tez would be able to start scheduling instead of waiting for every task to be completed from each source vertex). Should we wait for TEZ-1547 to be completed, or should we proceed with the current patch and refactor to the event-based approach when TEZ-1547 is done? Thoughts?

          bikassaha Bikas Saha added a comment -

          Right TEZ-1447.

          sseth Siddharth Seth added a comment -

          rajesh.balamohan - the corner case is what I was referring to. If we're not addressing it in this jira, we should create a separate one to track the issue. I'm not sure if Hive/Pig set the source fractions when they configure a ShuffleEdge.


          rajesh.balamohan Rajesh Balamohan added a comment -

          bikassaha, sorry, are you referring to TEZ-1447 or some other ticket?

          rajesh.balamohan Rajesh Balamohan added a comment - - edited

          sseth
          Short answer: yes, but it's a remote chance.

          • If the min/max fraction is set to 0.0 at the global level (i.e., via tez.shuffle-vertex-manager.min-src-fraction=0.0 in tez-site.xml), ShuffleVertexManager wouldn't change the parallelism as per the current logic, so this wouldn't be a problem.
          • With the default min/max (0.25-0.75) or any non-zero range, there won't be a problem, as ShuffleVertexManager would wait for some source tasks to finish.
          • Corner case (see the configuration sketch below):
            - Set one vertex's min/max to (0.25/0.75) and the downstream vertex's to (0.0/0.0).
            - Since the downstream vertex is set to 0.0, its tasks would start immediately without knowing about changes in the upstream vertex.
            - If the upstream vertex changes parallelism, the downstream vertex wouldn't change its parallelism.
            - However, this requires individual min/max settings for each vertex.
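
          As an illustration of the global vs. per-vertex settings above, here is a minimal sketch using Hadoop's Configuration; the min-src-fraction key is the one quoted above, and the max-src-fraction key name is assumed to mirror it:

            import org.apache.hadoop.conf.Configuration;

            public class SlowStartFractionSketch {
              public static void main(String[] args) {
                // Global defaults, equivalent to entries in tez-site.xml (0.25/0.75 range).
                Configuration conf = new Configuration();
                conf.setFloat("tez.shuffle-vertex-manager.min-src-fraction", 0.25f);
                conf.setFloat("tez.shuffle-vertex-manager.max-src-fraction", 0.75f);

                // Corner case from the list above: a per-vertex override of 0.0/0.0 makes
                // the downstream vertex schedule immediately, so it never observes a later
                // parallelism change in its upstream vertex.
                Configuration downstreamConf = new Configuration(conf);
                downstreamConf.setFloat("tez.shuffle-vertex-manager.min-src-fraction", 0.0f);
                downstreamConf.setFloat("tez.shuffle-vertex-manager.max-src-fraction", 0.0f);

                System.out.println(
                    downstreamConf.getFloat("tez.shuffle-vertex-manager.min-src-fraction", -1f));
              }
            }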
          sseth Siddharth Seth added a comment -

          rajesh.balamohan - can we run into the same case on Shuffle edges as well, if the min and max fraction are set to 0? A simple M - R - R, with a shuffle connection between all vertices.

          bikassaha Bikas Saha added a comment -

          What do you think of the comment above about using the pubsub mechanism? That may result in faster scheduling than waiting for source task completion. I am holding off review for this until I hear your views.

          rajesh.balamohan Rajesh Balamohan added a comment - - edited

          Adding test cases to the patch & addressing review comments.

          bikassaha Bikas Saha added a comment - - edited

          Alternatively we could wait for TEZ-1494. Then we can simply register for the vertex-started-running notification and schedule all vertices once that notification has been received. That would be much simpler than having to monitor for essentially the same thing, and faster since we don't have to wait for tasks to complete before we schedule tasks. However, for that to work the vertex-started-running notification needs to come when the vertex actually starts running (schedules tasks) instead of when the vertex state machine enters the running state. Or maybe add a new notification saying the vertex started scheduling.

          rajesh.balamohan Rajesh Balamohan added a comment -

          Review board link: https://reviews.apache.org/r/25287/diff/#

          rajesh.balamohan Rajesh Balamohan added a comment -

          If the approach listed in the latest patch is fine, do we really need ImmediateStartVertexManager?


          rajesh.balamohan Rajesh Balamohan added a comment -

          bikassaha - Can you please review?

          bikassaha Bikas Saha added a comment -

          problem wherein downstream vertex connected via broadcast edge is not updated when the parallelism is changed.

          This is tracked by TEZ-1059.

          sseth Siddharth Seth added a comment -

          We end up initializing a vertex when all of the following are met: 1) the initializer is complete, 2) edges are set up, 3) parallelism is not -1. All three conditions would be valid for Reducer3, so it would end up allowing Map5 (the dependent vertex) to start.
          We currently have no way of knowing whether a Vertex will change parallelism - and whether we should block for such an operation. Alternatively, we'll have to end up updating the downstream tasks with the new parallelism information - which may be a better way to deal with this, since parallelism could potentially change multiple times at a later point.
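
          A minimal sketch of the three-part gate described above; the class and field names are hypothetical and not the actual Vertex implementation:

            // Hypothetical sketch of the initialization gate described in this comment.
            class VertexInitGateSketch {
              boolean initializerComplete;   // 1) initializer is complete
              boolean edgesConfigured;       // 2) edges are set up
              int parallelism = -1;          // 3) parallelism is not -1

              boolean readyToInitialize() {
                // A vertex like Reducer3 can pass all three checks and let a dependent
                // vertex (Map5) start, even though its parallelism may still change later.
                return initializerComplete && edgesConfigured && parallelism != -1;
              }
            }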


          rajesh.balamohan Rajesh Balamohan added a comment -

          Added a small testcase in https://github.com/rajeshbalamohan/tez-1494 which can be run from a local VM to reproduce the issue. With -Dtez.shuffle-vertex-manager.enable.auto-parallel=false, the DAG would succeed. Initially I thought it was due to slow-start kicking in too early, but it appears to be a problem wherein a downstream vertex connected via a broadcast edge is not updated when the parallelism is changed.
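
          For reference, a minimal sketch of the workaround flag mentioned above, set programmatically on a Hadoop Configuration (the key name is taken verbatim from this comment; the -D form passes the same property on the command line):

            import org.apache.hadoop.conf.Configuration;

            public class DisableAutoParallelSketch {
              public static void main(String[] args) {
                // Equivalent of -Dtez.shuffle-vertex-manager.enable.auto-parallel=false:
                // Reducer 3 keeps its original parallelism, so the broadcast consumer
                // (Map 5) no longer waits on inputs that will never arrive.
                Configuration conf = new Configuration();
                conf.setBoolean("tez.shuffle-vertex-manager.enable.auto-parallel", false);
                System.out.println(conf.get("tez.shuffle-vertex-manager.enable.auto-parallel"));
              }
            }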


          rajesh.balamohan Rajesh Balamohan added a comment -

          Issue happens when auto parallelism is enabled:

          • Reducer 3 starts with 2 tasks
          • Map 5 (has 1 task and depends on Reducer 3) starts before Reducer 3
          • Reducer 3 alters parallelism from 2 to 1
          • Map 5 keeps waiting for inputs from 2 tasks of Reducer 3.
          hitesh Hitesh Shah added a comment -

          Should this be a blocker for 0.5.0?

          sseth Siddharth Seth added a comment -

          rajesh.balamohan - have you investigated this any further? Were all the DataMovementEvents received? Was task retry in play? etc.


          rajesh.balamohan Rajesh Balamohan added a comment -

          The original bug was reported against 0.6.0-SNAPSHOT. I just retried with 0.5.0-rc1 and it's present in that version as well.

          hitesh Hitesh Shah added a comment -

          rajesh.balamohan Is this an issue present in the 0.5.0 RC?


          People

            Assignee: Rajesh Balamohan
            Reporter: Rajesh Balamohan
            Votes: 0
            Watchers: 5
