Flink / FLINK-5114

PartitionState update with finished execution fails

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.2.0, 1.1.4
    • Component/s: Network
    • Labels:
      None

      Description

      If a partition state request is triggered for a producer that finishes before the request arrives, the execution is unregistered and the producer cannot be found. In this case the PartitionState returns null and the job fails.

      We need to check the producer location via the intermediate result partition in this case.

      See here: https://api.travis-ci.org/jobs/177668505/log.txt?deansi=true

          Activity

          till.rohrmann Till Rohrmann added a comment -

          The failing TimestampITCase seems to be related.

          https://api.travis-ci.org/jobs/177675631/log.txt?deansi=true

          githubbot ASF GitHub Bot added a comment -

          GitHub user uce opened a pull request:

          https://github.com/apache/flink/pull/2912

          FLINK-5114 [network] Handle partition producer state check for unregistered executions

          If a partition state request is triggered for a producer that terminates before the request arrives, the execution is unregistered and the producer cannot be found. In this case the partition state returns `null` and the job fails although this is perfectly legal.

          For these cases, we look up the respective intermediate result partition and find the producing execution manually instead of looking it up via the registered executions.

          I've removed some unused message parameters that have become obsolete with other recent refactorings.

          This adds a hash map to `IntermediateResult` for lookups by partition ID. I didn't dare to change the partition connect logic in other places, which is tightly coupled to the partitions being held as an array. As an alternative, we could do a linear scan over the partitions, as this happens rarely. The memory overhead for the hash map should be acceptable, as it's created per produced result and holds only one entry per partition.
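The trade-off between the hash map and the linear scan can be sketched as follows. This is a minimal illustration, not Flink's actual `IntermediateResult`: the class `ResultSketch`, its fields, and the use of `String` partition IDs are assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a result that keeps its partitions in an array
// (as the existing connect logic expects) plus a map for lookups by ID.
class ResultSketch {
    private final String[] partitionIds;
    private final Map<String, Integer> indexById = new HashMap<>();

    ResultSketch(String... partitionIds) {
        this.partitionIds = partitionIds.clone();
        // One map entry per partition, created once per result.
        for (int i = 0; i < this.partitionIds.length; i++) {
            indexById.put(this.partitionIds[i], i);
        }
    }

    // Lookup via the hash map; returns -1 for an unknown partition ID.
    int lookupWithMap(String id) {
        Integer index = indexById.get(id);
        return index == null ? -1 : index;
    }

    // Alternative without extra memory: linear scan over the array.
    int lookupWithScan(String id) {
        for (int i = 0; i < partitionIds.length; i++) {
            if (partitionIds[i].equals(id)) {
                return i;
            }
        }
        return -1;
    }
}
```

Both lookups return the same index. The map costs one extra entry per partition, while the scan costs time proportional to the number of partitions on each (rare) request.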

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/uce/flink 5114-partition_state

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2912.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2912


          commit 6308ff0aba49f026c23c67af4a2f3943b16f2b31
          Author: Ufuk Celebi <uce@apache.org>
          Date: 2016-11-22T15:15:04Z

          FLINK-5114 [network] Handle partition producer state check for unregistered executions


          githubbot ASF GitHub Bot added a comment -

          GitHub user uce opened a pull request:

          https://github.com/apache/flink/pull/2913

          [backport] FLINK-5114 [network] Handle partition producer state check for unregistered executions

          This is a backport of #2912.

          The code changed slightly between 1.1 and 1.2. I decided to also backport the new callback method via `TaskActions`.

          This would be important to get into 1.1.4.

          /cc @tillrohrmann

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/uce/flink 5114-partition_state-1.1

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2913.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2913


          commit 628c6e63c424ef11f7d650f8e88ea50af515fb84
          Author: Ufuk Celebi <uce@apache.org>
          Date: 2016-11-30T14:09:44Z

          FLINK-5114 [network] Handle partition producer state check for unregistered executions


          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/2913

          Having a quick look at this: I think this breaks with a fundamental design in the ExecutionGraph:
          The `findExecutionAttemptWithId(...)` method searches the prior execution attempts.

          Why is that necessary? Can we not just assume that if the attempt is not equal to the current execution attempt, then the status is some form of "disposed"?

          If the produced result is finished, the execution will still not be in the "prior execution attempts". That can only happen once the task restarts, in which case you should not try and fetch the partition any more.

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on the issue:

          https://github.com/apache/flink/pull/2913

          > Why is that necessary? Can we not just assume that if the attempt is not equal to the current execution attempt, then the status is some form of "disposed"?

          It's not necessary. It's perfectly fine to do it as you describe. Not having the `currentExecution` set to the producer execution means that the producer was restarted (hence cancelled or failed). This only made the handling in `Task` easier, but it should not dictate this change in the `ExecutionVertex`. I'll change that to only check the `currentExecution` and handle it accordingly.

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on the issue:

          https://github.com/apache/flink/pull/2912

          Rebased on master and implemented feedback from #2913.

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on the issue:

          https://github.com/apache/flink/pull/2913

          I removed the `findExecutionAttemptWithId` and only check the latest attempt. If that does not match the expected producer attempt, I answer with a `PartitionProducerDisposedException` to which the requesting `Task` reacts with a `cancelExecution`. I would really like to merge this and kick off a new RC for 1.1.4 soon.
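The behaviour described here could look roughly like the sketch below. It is an illustration under assumptions, not the code from the pull request: `VertexSketch`, `ConsumerSketch`, `ProducerState`, and `ProducerDisposedException` are hypothetical stand-ins for `ExecutionVertex`, `Task`, the execution state, and `PartitionProducerDisposedException`, and attempt IDs are plain strings.

```java
// Hypothetical stand-in for the producer's execution state.
enum ProducerState { RUNNING, FINISHED, CANCELED, FAILED }

// Hypothetical stand-in for PartitionProducerDisposedException.
class ProducerDisposedException extends Exception {
    ProducerDisposedException(String attemptId) {
        super("Producer attempt " + attemptId + " has been disposed.");
    }
}

// Hypothetical stand-in for a vertex that only tracks its current attempt.
class VertexSketch {
    private String currentAttemptId;
    private ProducerState currentState;

    VertexSketch(String attemptId, ProducerState state) {
        this.currentAttemptId = attemptId;
        this.currentState = state;
    }

    // A restart replaces the current attempt with a new one.
    void restart(String newAttemptId) {
        this.currentAttemptId = newAttemptId;
        this.currentState = ProducerState.RUNNING;
    }

    // Only the latest attempt is checked; prior attempts are never searched.
    ProducerState requestProducerState(String expectedAttemptId)
            throws ProducerDisposedException {
        if (currentAttemptId.equals(expectedAttemptId)) {
            return currentState; // also answers for a FINISHED producer
        }
        throw new ProducerDisposedException(expectedAttemptId);
    }
}

// Hypothetical stand-in for the requesting task.
class ConsumerSketch {
    boolean cancelled;
    ProducerState lastSeen;

    void checkProducer(VertexSketch producer, String expectedAttemptId) {
        try {
            lastSeen = producer.requestProducerState(expectedAttemptId);
        } catch (ProducerDisposedException e) {
            cancelled = true; // react to a disposed producer by cancelling
        }
    }
}
```

In this sketch a finished producer still answers with its state, because a finished attempt remains the current attempt until the vertex is restarted. Only a restart makes the expected attempt ID stale and leads to cancellation of the consumer.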

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on the issue:

          https://github.com/apache/flink/pull/2913

          Closing in favour of #2975.

          githubbot ASF GitHub Bot added a comment -

          Github user uce closed the pull request at:

          https://github.com/apache/flink/pull/2913

          githubbot ASF GitHub Bot added a comment -

          GitHub user uce opened a pull request:

          https://github.com/apache/flink/pull/2975

          [backport] FLINK-5114 [network] Handle partition producer state check for unregistered executions

          Reverted some changes made in #2913 after a discussion with @StephanEwen and decided to close the other one in favour of this PR for cleaner diffs.

          The main difference from the previous variants in #2913 and #2912 (for `master`) is that here I stick to JobManager-side changes only. The clumsy way in which the TaskManagers ask the JobManager for the producer state via a `tell` that is manually routed back to the `Task` is kept in order to keep the changes minimally invasive, which makes them easier to review given that this goes into a bugfix release.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/uce/flink 5114-partition_state-1.1-reworked

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2975.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2975



          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2975#discussion_r91706223

          --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
          @@ -503,15 +504,37 @@ class TaskManager(
                 )
               }

          -    case PartitionState(taskExecutionId, taskResultId, partitionId, state) =>
          -      Option(runningTasks.get(taskExecutionId)) match {
          +    // Updates the partition producer state
          +    case PartitionProducerState(receiverExecutionId, result) =>
          +      Option(runningTasks.get(receiverExecutionId)) match {
                   case Some(task) =>
          -          task.onPartitionStateUpdate(taskResultId, partitionId, state)
          +          try {
          +            result match {
          +              case Left((intermediateDataSetId, resultPartitionId, producerState)) =>
          +                // Forward the state update to the task
          +                task.onPartitionStateUpdate(
          +                  intermediateDataSetId,
          +                  resultPartitionId.getPartitionId,
          +                  producerState)
          +
          +              case Right(failure) =>
          +                // Cancel or fail the execution
          +                if (failure.isInstanceOf[PartitionProducerDisposedException]) {
          +                  log.debug("Partition producer disposed. Cancelling execution.", failure)
          --- End diff --

          I think this log statement should be on `info` level. Otherwise, going through the logs in standard settings (only info level logging) leaves you wondering why the task is cancelled all of a sudden.
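The effect of the log level can be illustrated with a small sketch. It uses `java.util.logging` purely for demonstration, which is an assumption: the TaskManager logs through a different logging facade, and `Level.FINE` stands in for `debug` here. `LogLevelSketch` is a hypothetical helper.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper: reports whether a statement at a given level would be
// emitted by a logger configured at another level.
class LogLevelSketch {
    static boolean visible(Level configured, Level statement) {
        Logger logger = Logger.getLogger("sketch." + configured.getName());
        logger.setLevel(configured);
        return logger.isLoggable(statement);
    }
}
```

With the logger configured at `Level.INFO`, `visible(Level.INFO, Level.FINE)` is false and `visible(Level.INFO, Level.INFO)` is true, so a cancellation message logged below info level would not show up in logs at standard settings.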

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on the issue:

          https://github.com/apache/flink/pull/2975

          Thanks for your review Stephan! Going to address your comment and merge this for 1.1.

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on the issue:

          https://github.com/apache/flink/pull/2912

          Forwarding the feedback from #2975 since the main changes are similar and merging this.

          githubbot ASF GitHub Bot added a comment -

          Github user uce closed the pull request at:

          https://github.com/apache/flink/pull/2975

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on the issue:

          https://github.com/apache/flink/pull/2912

          Build passed. Going to merge with the next batch.

          uce Ufuk Celebi added a comment -

          Fixed in a078666 (master), 2b612f2 (release-1.1)

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/2912


            People

            • Assignee:
              uce Ufuk Celebi
              Reporter:
              uce Ufuk Celebi