FLINK-6742

Improve error message when savepoint migration fails due to task removal

    Details

      Description

      Caused by: java.lang.NullPointerException
      at org.apache.flink.runtime.checkpoint.savepoint.SavepointV2.convertToOperatorStateSavepointV2(SavepointV2.java:171)
      at org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:75)
      at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1090)

          Activity

          Zentol Chesnay Schepler added a comment -

          We can improve the error message, but this is supposed to fail. When restoring from a 1.2 savepoint in 1.3 the topology must not change as outlined in https://ci.apache.org/projects/flink/flink-docs-release-1.3/ops/upgrading.html#application-topology. Admittedly, we should make that more prominent.

          Once you have taken a new 1.3 savepoint, you can modify the topology at will again.

          The allowNonRestoredState flag is ignored on purpose to prevent users from violating the above requirement by accident.

          gyfora Gyula Fora added a comment -

          What's the reason for not allowing the removal or addition of operators here?

          Zentol Chesnay Schepler added a comment -

          Well, alright, technically you can add operators, so long as they don't modify chains.

          The internal structure of savepoints was changed in 1.3. A 1.2 savepoint contains the state of tasks, each stored as a list of the states of the operators it contains. A 1.3 savepoint contains only the states of operators; the notion of tasks was removed. To map an old savepoint to a new one, we have to map the state of each task to its individual operators. For non-chained operators this is easy, but for chains it can only be done reliably if the chains don't change, i.e. no operator is removed or added.

          The problem is that we don't know what happened to the missing task. It may very well be that the task was removed on purpose and the state should be lost. But it could also be that a user read that you can modify chains in 1.3 and did so before migrating the savepoint; this, however, only works after migration.

          This isn't a technical hurdle, but a safety precaution.
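
To illustrate the mapping problem described above, here is a minimal sketch in plain Java. It is not code from Flink: the class `TaskStateMigrationSketch`, the method `splitTaskState`, and the use of strings for states and operator IDs are all assumptions made for illustration. It models a pre-1.3 task state as a list with one entry per chained operator and assigns each entry to the operator at the same position in the current chain.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative model only; none of these names exist in Flink. */
public class TaskStateMigrationSketch {

    /**
     * Splits the state of one pre-1.3 task, stored as one entry per chained
     * operator, across the operator IDs of the current chain.
     */
    public static Map<String, String> splitTaskState(
            List<String> chainedOperatorStates, List<String> currentOperatorIds) {
        // Position in the chain is the only link between an old state entry
        // and a new operator, so a chain of different length cannot be mapped.
        if (chainedOperatorStates.size() != currentOperatorIds.size()) {
            throw new IllegalStateException(
                "Chain length changed: savepoint holds " + chainedOperatorStates.size()
                    + " operator states, job chain has " + currentOperatorIds.size()
                    + " operators.");
        }
        Map<String, String> stateByOperator = new LinkedHashMap<>();
        for (int i = 0; i < currentOperatorIds.size(); i++) {
            stateByOperator.put(currentOperatorIds.get(i), chainedOperatorStates.get(i));
        }
        return stateByOperator;
    }

    public static void main(String[] args) {
        // Unchanged chain: each state lands on the operator at the same position.
        System.out.println(splitTaskState(
            Arrays.asList("sourceState", "mapState"),
            Arrays.asList("source-uid", "map-uid")));

        // Chain shortened before migration: the mapping is ambiguous, so fail loudly.
        try {
            splitTaskState(
                Arrays.asList("sourceState", "mapState"),
                Arrays.asList("source-uid"));
        } catch (IllegalStateException e) {
            System.out.println("Migration rejected: " + e.getMessage());
        }
    }
}
```

Note that a length check like this cannot detect a chain in which one operator was swapped for another of a different kind, which is one reason the restriction is phrased as "the chains must not change" rather than as a purely mechanical rule.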

          gyfora Gyula Fora added a comment -

          In my case I would like to remove 2 operators while migrating because the state for those is not compatible (basically just changing the uids for those). In this case it actually becomes a technical hurdle.

          Zentol Chesnay Schepler added a comment -

          Why is the state incompatible? Is this tied to upgrading Flink or a change in the user code?

          At the moment I can only think of the following workarounds:

          1. Remove the operators from the topology and load the savepoint in 1.2 while allowing non-restored state. Take a new savepoint, add your operators, load in 1.3.
          2. Add 2 no-op operators to the topology and assign them the UIDs of the operators you want to drop. Load the 1.2 savepoint, create a 1.3 savepoint, drop the operators, reload.

          gyfora Gyula Fora added a comment -

          They are incompatible due to some custom state backend code; that's really my own problem.

          I like option 2, but for now I just went with adding an extra null check to the conversion step to avoid the NullPointerException.

          gyfora Gyula Fora added a comment -

          Thank you for the help Chesnay! Should we close this JIRA?

          Zentol Chesnay Schepler added a comment -

          I would keep it, at the very least for improving the error message.

          githubbot ASF GitHub Bot added a comment -

          GitHub user zentol opened a pull request:

          https://github.com/apache/flink/pull/4083

          FLINK-6742 Improve savepoint migration failure error message

          This PR improves the error messages if the savepoint migration fails because a stateful task was removed or the parallelism of a stateful operator was changed.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/zentol/flink 6742

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/4083.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #4083


          commit 6f701d17e7eb62a21f5c9466ba9acf8696ec9ab8
          Author: zentol <chesnay@apache.org>
          Date: 2017-06-07T10:03:21Z

          FLINK-6742 Improve savepoint migration failure error message

          commit 38b07a7c4654a84b4370ed948be3ab76c28afad5
          Author: zentol <chesnay@apache.org>
          Date: 2017-06-07T10:03:57Z

          [hotfix] Improve readability in SPV2#convertToOperatorStateSavepointV2


          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the issue:

          https://github.com/apache/flink/pull/4083

          Change looks good to merge!

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/4083

          githubbot ASF GitHub Bot added a comment -

          Github user gyfora commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4083#discussion_r123895607

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java ---
          @@ -168,10 +168,27 @@ public static Savepoint convertToOperatorStateSavepointV2(
                   expandedToLegacyIds = true;
               }

          +    if (jobVertex == null) {
          +        throw new IllegalStateException(
          +            "Could not find task for state with ID " + taskState.getJobVertexID() + ". " +
          +            "When migrating a savepoint from a version < 1.3 please make sure that the topology was not " +
          +            "changed through removal of a stateful operator or modification of a chain containing a stateful " +
          +            "operator.");
          +    }
          +
               List<OperatorID> operatorIDs = jobVertex.getOperatorIDs();

               for (int subtaskIndex = 0; subtaskIndex < jobVertex.getParallelism(); subtaskIndex++) {
          -        SubtaskState subtaskState = taskState.getState(subtaskIndex);
          +        SubtaskState subtaskState;
          +        try {
          +            subtaskState = taskState.getState(subtaskIndex);
          --- End diff ---

          Sorry for commenting late on this, but I have had some major migration issues in the last few days.
          I think we should explicitly compare the parallelism instead of relying on the error:
          if (taskState.getStates().size() != jobVertex.getParallelism()) --> error

          Otherwise this will not fail on lower parallelism.
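
The difference between relying on the lookup error and comparing the parallelism up front can be sketched as follows. This is a simplified model, not the Flink implementation: `ParallelismCheckSketch`, `migrateLazily`, and `migrateEagerly` are hypothetical names, and a plain `List<String>` stands in for the stored subtask states of one task.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative model only; none of these names exist in Flink. */
public class ParallelismCheckSketch {

    /**
     * Lazy variant: iterates up to the job's parallelism and fails only if a
     * lookup runs past the stored subtask states (parallelism was increased).
     */
    public static int migrateLazily(List<String> storedSubtaskStates, int jobParallelism) {
        int migrated = 0;
        for (int subtaskIndex = 0; subtaskIndex < jobParallelism; subtaskIndex++) {
            // Throws IndexOutOfBoundsException when subtaskIndex >= stored size.
            storedSubtaskStates.get(subtaskIndex);
            migrated++;
        }
        return migrated;
    }

    /** Eager variant: rejects any parallelism change before touching the state. */
    public static int migrateEagerly(List<String> storedSubtaskStates, int jobParallelism) {
        if (storedSubtaskStates.size() != jobParallelism) {
            throw new IllegalStateException(
                "Parallelism changed: savepoint has " + storedSubtaskStates.size()
                    + " subtask states, job has parallelism " + jobParallelism + ".");
        }
        return migrateLazily(storedSubtaskStates, jobParallelism);
    }

    public static void main(String[] args) {
        List<String> stored = Arrays.asList("s0", "s1", "s2", "s3");

        // Lower parallelism: the lazy variant succeeds and silently skips s2 and s3.
        System.out.println("lazily migrated: " + migrateLazily(stored, 2));

        // The eager variant reports the mismatch instead.
        try {
            migrateEagerly(stored, 2);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In this model the lazy variant only notices an increase in parallelism; a decrease passes without error and drops the state of the trailing subtasks, which is the case the explicit comparison is meant to catch.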

          Zentol Chesnay Schepler added a comment -

          1.3: 2bbfe0292c13d875b531a6c168ea78bfc7f21f0b
          1.4: 72b0ae069f8404a2f8a952e1a20004b9d340c445

          gyfora Gyula Fora added a comment -

          Ah sorry Chesnay I missed it on the 1.3 branch :/

          Zentol Chesnay Schepler added a comment -

          I merged it to 1.3 after your comment, so it was OK.

          I'll think about your suggestion regarding the parallelism tomorrow.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4083#discussion_r123972388

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java ---
          @@ -168,10 +168,27 @@ public static Savepoint convertToOperatorStateSavepointV2(
                   expandedToLegacyIds = true;
               }

          +    if (jobVertex == null) {
          +        throw new IllegalStateException(
          +            "Could not find task for state with ID " + taskState.getJobVertexID() + ". " +
          +            "When migrating a savepoint from a version < 1.3 please make sure that the topology was not " +
          +            "changed through removal of a stateful operator or modification of a chain containing a stateful " +
          +            "operator.");
          +    }
          +
               List<OperatorID> operatorIDs = jobVertex.getOperatorIDs();

               for (int subtaskIndex = 0; subtaskIndex < jobVertex.getParallelism(); subtaskIndex++) {
          -        SubtaskState subtaskState = taskState.getState(subtaskIndex);
          +        SubtaskState subtaskState;
          +        try {
          +            subtaskState = taskState.getState(subtaskIndex);
          --- End diff ---

          Yes, that's true; I'll create a follow-up PR.

          githubbot ASF GitHub Bot added a comment -

          GitHub user zentol opened a pull request:

          https://github.com/apache/flink/pull/4185

          FLINK-6742 Add eager checks for parallelism/chain-length change

          This is a follow-up to #4083 that adds checks to the savepoint migration for any change in parallelism or chain length.

          Should be merged for 1.3 and 1.4.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/zentol/flink 6742_2

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/4185.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #4185


          commit 7f95bc69e4aced1ca1d89c1cbc2067150f5f583b
          Author: zentol <chesnay@apache.org>
          Date: 2017-06-26T11:38:54Z

          FLINK-6742 Add eager checks for parallelism/chain-length change


          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/4185

          Thank you for taking a look, merging this.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/4185

          Zentol Chesnay Schepler added a comment -

          Additional checks for parallelism and chain-length changes added in

          1.3: c65317dc9619f2a5459c39278b2109137e94d79f
          1.4: cd45825958297a0da157f850c3c63e93dbadfb7a


            People

            • Assignee:
              Zentol Chesnay Schepler
              Reporter:
              gyfora Gyula Fora