FLINK-6964

Fix recovery for incremental checkpoints in StandaloneCompletedCheckpointStore

    Details

      Description

      StandaloneCompletedCheckpointStore does not register shared states on resume. However, for externalized checkpoints, it registers the checkpoint from which it resumed. This checkpoint gets added to the completed checkpoint store as part of resume.

          Activity

          srichter Stefan Richter added a comment -

          Yeah, originally I had just a string there and switched to UUID to make it more type-safe. This is trivial to fix, so if it helps people who try to migrate via externalized checkpoints, why not. I would extend the fix to catch Exception in general, because more parsing problems than the one leading to IllegalArgumentException can happen along the way. Thanks for reporting this.
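The suggestion above can be sketched as follows. This is only an illustration under stated assumptions, not the code that was merged: `BackendIdParsing` and `parseBackendId` are hypothetical names, and the random-UUID fallback is the idea floated elsewhere in this thread, whose suitability was still an open question. Only `java.util.UUID` calls from the JDK are used.

```java
import java.util.UUID;

/** Hypothetical helper for illustration; not part of Flink. */
final class BackendIdParsing {

    private BackendIdParsing() {}

    /**
     * Parses a serialized backend identifier. Identifiers written before the
     * switch to UUIDs (for example "WindowOperator_26_0") are not valid UUID
     * strings, so any parsing failure falls back to a fresh random UUID.
     * The fallback choice is an assumption made for this sketch.
     */
    static UUID parseBackendId(String serialized) {
        try {
            return UUID.fromString(serialized);
        } catch (Exception e) {
            // Catch Exception rather than only IllegalArgumentException:
            // a null input, for instance, raises NullPointerException.
            return UUID.randomUUID();
        }
    }

    public static void main(String[] args) {
        // A well-formed UUID string is parsed as-is.
        System.out.println(parseBackendId("123e4567-e89b-12d3-a456-426614174000"));
        // A legacy identifier no longer aborts the restore.
        System.out.println(parseBackendId("WindowOperator_26_0"));
    }
}
```

With a random fallback, the restored handle no longer shares an identifier with the backend that wrote it, so whether this is acceptable depends on how the identifier is used for matching shared state.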

          gyfora Gyula Fora added a comment -

          Something like: https://github.com/gyfora/flink/commit/61dcde87fa3efd9c717d2eb4123fa27f09b2250c

          gyfora Gyula Fora added a comment -

          I wonder if creating a new random backend UID for these incremental handles would just work.

          gyfora Gyula Fora added a comment -

          Stefan Richter I am trying to restore from a checkpoint taken before this fix and I get this error:

          Caused by: java.lang.IllegalArgumentException: Invalid UUID string: WindowOperator_26_0
          at java.util.UUID.fromString(UUID.java:194)
          at org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.deserializeKeyedStateHandle(SavepointV2Serializer.java:395)
          at org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.deserializeSubtaskState(SavepointV2Serializer.java:290)
          at org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.deserialize(SavepointV2Serializer.java:173)
          at org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer.deserialize(SavepointV2Serializer.java:67)
          at org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.loadSavepointWithHandle(SavepointStore.java:277)
          at org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:69)
          at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1132)

          Have you seen this before?

          aljoscha Aljoscha Krettek added a comment -

          Implemented on release-1.3 in
          4b003eafdeccf0179511f3cd4a6f3f5bc63cf921

          aljoscha Aljoscha Krettek added a comment -

          Please only close once the fix is merged both on release-1.3 and master, so that we can accurately track the open blocker issues for Flink 1.3.2.

          srichter Stefan Richter added a comment -

          Merged in 8cff17fcc9.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter closed the pull request at:

          https://github.com/apache/flink/pull/4192

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4192#discussion_r127453343

          --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java ---
          @@ -187,7 +188,7 @@ public void testSharedStateDeRegistration() throws Exception {

               private static IncrementalKeyedStateHandle create(Random rnd) {
                   return new IncrementalKeyedStateHandle(
          -            "test",
          +            UUID.nameUUIDFromBytes("test".getBytes()),
          --- End diff --

          Alright I already fixed it.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4192#discussion_r127453095

          --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java ---
          @@ -187,7 +188,7 @@ public void testSharedStateDeRegistration() throws Exception {

               private static IncrementalKeyedStateHandle create(Random rnd) {
                   return new IncrementalKeyedStateHandle(
          -            "test",
          +            UUID.nameUUIDFromBytes("test".getBytes()),
          --- End diff --

          Let's set the right example for everyone who "learns by reading" other code.
          Also, we have seen cases where tests were unstable when executed on an Italian laptop because of different default locales and charsets.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/4192

          Thanks @aljoscha and @StephanEwen for the thorough review. I will merge this once Travis is green.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4192#discussion_r127431267

          --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java ---
          @@ -187,7 +188,7 @@ public void testSharedStateDeRegistration() throws Exception {

               private static IncrementalKeyedStateHandle create(Random rnd) {
                   return new IncrementalKeyedStateHandle(
          -            "test",
          +            UUID.nameUUIDFromBytes("test".getBytes()),
          --- End diff --

          I know that this is true for real code, but for the test, it shouldn't matter?

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4192#discussion_r127407228

          --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java ---
          @@ -187,7 +188,7 @@ public void testSharedStateDeRegistration() throws Exception {

               private static IncrementalKeyedStateHandle create(Random rnd) {
                   return new IncrementalKeyedStateHandle(
          -            "test",
          +            UUID.nameUUIDFromBytes("test".getBytes()),
          --- End diff --

          We need to always pass an explicit charset when converting between strings/bytes:
          ```java
          "test".getBytes(StandardCharsets.UTF_8);
          ```
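To make the reviewer's point concrete, here is a small sketch of why the charset matters. `ExplicitCharsetExample` and `nameUuid` are hypothetical names used only for this illustration. UTF-16 stands in for "some other platform default" so that the difference shows up even for the plain-ASCII string `"test"`.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.UUID;

/** Hypothetical example class; not part of the pull request. */
final class ExplicitCharsetExample {

    private ExplicitCharsetExample() {}

    /** Derives a name-based UUID from a string with a fixed, explicit charset. */
    static UUID nameUuid(String name) {
        return UUID.nameUUIDFromBytes(name.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // The same string encodes to different bytes under different charsets ...
        byte[] utf8 = "test".getBytes(StandardCharsets.UTF_8);
        byte[] utf16 = "test".getBytes(StandardCharsets.UTF_16);
        System.out.println(Arrays.equals(utf8, utf16)); // false

        // ... so only an explicit charset makes the derived UUID machine-independent.
        System.out.println(nameUuid("test").equals(nameUuid("test"))); // true
        System.out.println(nameUuid("test").equals(UUID.nameUUIDFromBytes(utf16))); // false
    }
}
```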

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4192#discussion_r127406715

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java ---
          @@ -391,7 +392,7 @@ private static KeyedStateHandle deserializeKeyedStateHandle(DataInputStream dis)
               Map<StateHandleID, StreamStateHandle> privateStates = deserializeStreamStateHandleMap(dis);

               return new IncrementalKeyedStateHandle(
          -        operatorId,
          +        UUID.fromString(operatorId),
          --- End diff --

          +1

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4192#discussion_r127407687

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java ---
          @@ -57,9 +59,9 @@
               private static final long serialVersionUID = -8328808513197388231L;

               /**
          -     * The operator instance identifier for this handle
          +     * UUID to identify the backend which created this state handle. This is used to
          --- End diff --

          The sentence here is incomplete

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/4192

          This looks super nice now! I tried it and disabled each of the two fixes (registering shared state after restore and backend UUIDs) and reverting either made the new integration test fail.

          Let's give @StephanEwen some time to look over this as well but from my side this LGTM now.

          When merging, the changes to `SharedStateRegistry` and `ZooKeeperCompletedCheckpointStoreTest` should be factored out into a separate commit. They are good changes but unrelated to the Jira issue and bug that this fixes.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/4192

          @aljoscha @StephanEwen I have added an IT case for this problem. It tests a sequence of externalized checkpoint recoveries, using full and incremental checkpoints on the standalone and ZooKeeper completed checkpoint stores.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4192#discussion_r126899001

          --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java ---
          @@ -160,9 +162,12 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
               stateStorage,
               Executors.directExecutor());

          -    SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
          +    SharedStateRegistry sharedStateRegistry = spy(new SharedStateRegistry());
               zooKeeperCompletedCheckpointStore.recover(sharedStateRegistry);

          +    verify(retrievableStateHandle1.retrieveState(), times(1)).registerSharedStatesAfterRestored(sharedStateRegistry);
          --- End diff --

          Ok, then we have to do that before we can merge this with confidence.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4192#discussion_r126725696

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java ---
          @@ -391,7 +392,7 @@ private static KeyedStateHandle deserializeKeyedStateHandle(DataInputStream dis)
               Map<StateHandleID, StreamStateHandle> privateStates = deserializeStreamStateHandleMap(dis);

               return new IncrementalKeyedStateHandle(
          -        operatorId,
          +        UUID.fromString(operatorId),
          --- End diff --

          That is correct.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4192#discussion_r126725439

          --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java ---
          @@ -160,9 +162,12 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
               stateStorage,
               Executors.directExecutor());

          -    SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
          +    SharedStateRegistry sharedStateRegistry = spy(new SharedStateRegistry());
               zooKeeperCompletedCheckpointStore.recover(sharedStateRegistry);

          +    verify(retrievableStateHandle1.retrieveState(), times(1)).registerSharedStatesAfterRestored(sharedStateRegistry);
          --- End diff --

          The behaviour that is tested here is not strongly connected to the bug; I just noticed it might also be worth checking to avoid future problems. StandaloneCompletedCheckpointStore is not expected to do this, so there is no need for this particular test.

          The question about a general test is valid. However, I noticed that externalized checkpoints in general are not tested at all, so this is not just a small addition but a whole new test (including some required test methods to trigger and restore from externalized checkpoints in the JM) that would almost justify a new planning task and PR.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4192#discussion_r126721791

          --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java ---
          @@ -160,9 +162,12 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
               stateStorage,
               Executors.directExecutor());

          -    SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
          +    SharedStateRegistry sharedStateRegistry = spy(new SharedStateRegistry());
               zooKeeperCompletedCheckpointStore.recover(sharedStateRegistry);

          +    verify(retrievableStateHandle1.retrieveState(), times(1)).registerSharedStatesAfterRestored(sharedStateRegistry);
          --- End diff --

          Was the bug in the zookeeper checkpoint store or the standalone checkpoint store? Should there also be a test for the standalone checkpoint store to verify that it works now?

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4192#discussion_r126720581

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java ---
          @@ -391,7 +392,7 @@ private static KeyedStateHandle deserializeKeyedStateHandle(DataInputStream dis)
               Map<StateHandleID, StreamStateHandle> privateStates = deserializeStreamStateHandleMap(dis);

               return new IncrementalKeyedStateHandle(
          -        operatorId,
          +        UUID.fromString(operatorId),
          --- End diff --

          This is not an operator ID anymore, right? Probably the local variable name should change.

          githubbot ASF GitHub Bot added a comment -

          GitHub user StefanRRichter opened a pull request:

          https://github.com/apache/flink/pull/4192

          FLINK-6964 [checkpoint] Fix externalized incremental checkpoints fo…

          This PR fixes a problem that came up with incremental checkpoints and the `StandaloneCompletedCheckpointStore`. After restoring from an externalized incremental checkpoint, subsequent incremental checkpoints failed to pick up the existing incremental states in the registry that had been restored from the checkpoint. As a second problem, the operator identifier used in the backend could change across restores.

          This PR:

          • registers restored state from externalized checkpoints/savepoints with the `SharedStateRegistry`
          • introduces artificial backend identifiers to isolate files from different backends with the same file name in the `SharedStateRegistry`. In case of a (non-rescaled) restore, the identifier is picked up again from the previous state.
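
The two changes listed above can be pictured with a deliberately simplified, reference-counting registry. This is a sketch under assumptions and does not mirror Flink's actual `SharedStateRegistry` API: `SimpleSharedStateRegistry`, `key`, `register`, `unregister`, and the file name `000042.sst` are all hypothetical. It shows that keying entries by backend identifier plus file name keeps equally named files from different backends apart, and that re-registering restored files lets a later incremental checkpoint find them.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical, simplified registry for illustration; Flink's SharedStateRegistry differs. */
final class SimpleSharedStateRegistry {

    private final Map<String, Integer> referenceCounts = new HashMap<>();

    /** Builds a key that is unique per backend, even for identical file names. */
    static String key(UUID backendId, String fileName) {
        return backendId + "-" + fileName;
    }

    /** Registers one reference to a shared file and returns the new count. */
    int register(UUID backendId, String fileName) {
        return referenceCounts.merge(key(backendId, fileName), 1, Integer::sum);
    }

    /** Drops one reference; a result of 0 means the file is no longer needed. */
    int unregister(UUID backendId, String fileName) {
        String k = key(backendId, fileName);
        Integer count = referenceCounts.get(k);
        if (count == null) {
            throw new IllegalStateException("Not registered: " + k);
        }
        if (count == 1) {
            referenceCounts.remove(k);
            return 0;
        }
        referenceCounts.put(k, count - 1);
        return count - 1;
    }

    public static void main(String[] args) {
        SimpleSharedStateRegistry registry = new SimpleSharedStateRegistry();
        UUID backendA = new UUID(0L, 1L);
        UUID backendB = new UUID(0L, 2L);

        // On restore: register the shared files of the restored checkpoint.
        System.out.println(registry.register(backendA, "000042.sst")); // 1
        // Another backend with an identically named file gets its own entry.
        System.out.println(registry.register(backendB, "000042.sst")); // 1
        // The next incremental checkpoint of backend A re-uses the restored file.
        System.out.println(registry.register(backendA, "000042.sst")); // 2
    }
}
```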

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/StefanRRichter/flink fixRecoverStandaloneCompeltedCheckpointStore

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/4192.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #4192



          cresny@gmail.com Cliff Resnick added a comment -

          Stefan Richter Looks good from this end, all tests passed. Thanks!

          cresny@gmail.com Cliff Resnick added a comment -

          Stefan Richter So far, looks good! I need to leave early today but I'll hit it a few more times this evening just to be sure.

          srichter Stefan Richter added a comment -

          Cliff Resnick I think I have figured out the second problem. Updated my branch.

          srichter Stefan Richter added a comment -

          Those placeholder messages look OK and expected; that happens when the placeholder is exchanged with the original state handle. The exception on unregistration looks promising. Will dig into this on Monday. Thanks again for the logs!

          cresny@gmail.com Cliff Resnick added a comment -

          Looks like it's still trying to register a Placeholder?

          cresny@gmail.com Cliff Resnick added a comment -

          I ran with your newer precondition. It actually succeeded once, but failed the next two runs, hung on org.apache.flink.runtime.state.SharedStateRegistry - Attempt to register for key WindowOperator...

          I tried with just a single slot, but that also hung. The log above represents the hang condition.

          All the above logged here https://gist.github.com/cresny/0e109f843730b64d3a330f8fb06bb8a6

          The good news is there was an exception around state registry

          cresny@gmail.com Cliff Resnick added a comment -

          OK, will try that. Meanwhile, here is a run (and hang) from the last push.

          https://gist.github.com/cresny/8d0d24b1bd72031a515bd9a3822da189

          srichter Stefan Richter added a comment -

          Alright, it wasn't the precondition. I have pushed the version that runs as expected in my local testing. Log statements are included.

          cresny@gmail.com Cliff Resnick added a comment -

          OK, I'll wait for your push.

          srichter Stefan Richter added a comment -

          I think I already found the problem. A stupid mistake: the precondition that I introduced in the fix was inverted.
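
          As an illustration of how an inverted precondition behaves, here is a minimal sketch. The guard itself is hypothetical (the comment does not say what the real precondition checks), and checkState is a local helper, not a library call.

```java
/** Illustration only: the guard and its names are hypothetical, not taken from the actual fix. */
final class PreconditionSketch {

    /** Minimal checkState helper: throws when the condition is false. */
    static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    /** Intended guard: accept real handles, reject placeholders. */
    static void registerCorrect(boolean isPlaceholder) {
        checkState(!isPlaceholder, "placeholder must not be registered");
    }

    /** Inverted guard: rejects every real handle and lets placeholders through. */
    static void registerInverted(boolean isPlaceholder) {
        checkState(isPlaceholder, "placeholder must not be registered");
    }
}
```

          With the inverted form, every valid call fails while the one case the guard was meant to catch passes silently, which is why such a slip tends to show up on the first real run.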

          srichter Stefan Richter added a comment -

          In particular, JobManager logs are valuable.

          srichter Stefan Richter added a comment -

          I suggest logging the Flink runtime at INFO level, thanks!

          cresny@gmail.com Cliff Resnick added a comment -

          Ha! I just started running. OK, will merge and rebuild.
          The logging scopes above are for a separate network appender, so I tend to keep it narrow. Should I broaden it to all of flink.runtime?

          srichter Stefan Richter added a comment -

          I have added a commit to my branch that introduces more logging. The log scope looks good. Do you log all remaining packages at INFO?

          cresny@gmail.com Cliff Resnick added a comment -

          I'll merge and rerun. This is what I have for log scope. Should I add anything?

          org.apache.flink.contrib.streaming.state=TRACE
          org.apache.flink.runtime.checkpoint=TRACE
          org.apache.flink.runtime.state=TRACE
          
          cresny@gmail.com Cliff Resnick added a comment -

          Stefan Richter By hanging I mean that the checkpoint, though fully acknowledged, never completes. Looking at the UI I see 100% and a spinning arrow until the checkpoint time expires, apparently without an exception being thrown. I did not merge my added logs into your branch because, from what you described, the issue was with the StandaloneCompletedCheckpointStore, which I never added logging to anyway. However, if the code as-is in your branch has sufficient logging, I can reproduce the issue and create a gist.

          srichter Stefan Richter added a comment -

          Besides potential exceptions, the same logging information as you provided last time (e.g. interactions with the shared registry) could be helpful.

          srichter Stefan Richter added a comment - - edited

          Cliff Resnick By hanging, do you mean that you cannot find any exceptions in the log that indicate a cause?

          cresny@gmail.com Cliff Resnick added a comment -

          Stefan Richter I tried your fix. After resuming from a checkpoint, the first subsequent checkpoint got to 100% and hung, then expired several minutes later. The second one did the same. If logging would help, perhaps you can add some TRACE level logs and let me know the scopes, and I'll create a gist.


            People

            • Assignee:
              srichter Stefan Richter
              Reporter:
              srichter Stefan Richter
            • Votes:
              0
              Watchers:
              7
