  Flink / FLINK-7268

Zookeeper Checkpoint Store interacting with Incremental State Handles can lead to loss of handles

    Details

      Description

      Release testing for Flink 1.3.2 has shown that this combination of features leads to the following error when a very low restart delay is used:

      java.lang.IllegalStateException: Could not initialize keyed state backend.
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
      	at java.lang.Thread.run(Thread.java:748)
      Caused by: java.io.FileNotFoundException: Item not found: aljoscha/state-machine-checkpoints-2/f26e2b4c6891f2a9e0c5e4ba014733c3/chk-3/b246db8c-4f25-483a-b1fc-234f4319004d
      	at com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions.getFileNotFoundException(GoogleCloudStorageExceptions.java:42)
      	at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.open(GoogleCloudStorageImpl.java:551)
      	at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.open(GoogleCloudStorageFileSystem.java:322)
      	at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.<init>(GoogleHadoopFSInputStream.java:121)
      	at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.open(GoogleHadoopFileSystemBase.java:1076)
      	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:767)
      	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:404)
      	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:48)
      	at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
      	at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:69)
      	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.readStateData(RocksDBKeyedStateBackend.java:1281)
      	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.readAllStateData(RocksDBKeyedStateBackend.java:1468)
      	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreInstance(RocksDBKeyedStateBackend.java:1324)
      	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restore(RocksDBKeyedStateBackend.java:1503)
      	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:970)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
      	... 6 more
      

      When this occurs, the job is stuck in a restart loop. According to Stefan Richter, the problem seems to be that the removal of pending checkpoints from ZooKeeper happens asynchronously, and those removal requests can still go through after the job has already restarted.


          Activity

          aljoscha Aljoscha Krettek added a comment -

          Attaching full logs from a failed run.

          githubbot ASF GitHub Bot added a comment -

          GitHub user StefanRRichter opened a pull request:

          https://github.com/apache/flink/pull/4410

          FLINK-7268 [checkpoints] Scope SharedStateRegistry objects per (re)start

            What is the purpose of the change

          This PR fixes FLINK-7268. The problem was that `ZookeeperCompletedCheckpointStore` deletes checkpoints asynchronously. When such a deletion runs in parallel with a restart, the asynchronous delete can perform shared-state de-registration.

          Before this PR, the old `SharedStateRegistry` was kept after a restart and its counts were updated from the completed checkpoint store. In the described race, a checkpoint with a pending delete does not contribute to the new count, but it can still decrement the count once its shared state is unregistered in the asynchronous deletion thread. This can accidentally drop counts below 1 and lead to data loss.

          The core idea behind the PR is to scope the `SharedStateRegistry` per (re-)start, so that old pending deletes cannot influence the current count.
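
          A minimal Java sketch of the race and of the per-restart scoping, under heavy simplifying assumptions: `RestartRaceSketch` and `RefCountRegistry` are hypothetical stand-ins, not Flink's `SharedStateRegistry`, and a "deletion" is only recorded in a set. Two checkpoints (chk-2, chk-3) share one file; chk-2 is subsumed and its discard is queued; the job restarts before the queued discard runs. Keeping one registry object and rebuilding its counts lets the stale discard drive the count to zero, whereas a fresh registry per restart leaves the stale decrement in the old object.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, heavily simplified model; NOT Flink's SharedStateRegistry API.
final class RestartRaceSketch {

    static final String SHARED_FILE = "shared-sst-1"; // hypothetical file name

    // Reference counts per shared file; physical deletes go into a shared set.
    static final class RefCountRegistry {
        private final Map<String, Integer> counts = new HashMap<>();
        private final Set<String> deletedFiles;

        RefCountRegistry(Set<String> deletedFiles) {
            this.deletedFiles = deletedFiles;
        }

        void register(String file) {
            counts.merge(file, 1, Integer::sum);
        }

        // Decrement the count; delete the file once nothing references it.
        void unregister(String file) {
            int remaining = counts.getOrDefault(file, 0) - 1;
            if (remaining <= 0) {
                counts.remove(file);
                deletedFiles.add(file);
            } else {
                counts.put(file, remaining);
            }
        }

        void clear() {
            counts.clear();
        }
    }

    // Returns true if the file still needed by chk-3 ends up deleted.
    static boolean sharedFileLost(boolean registryPerRestart) {
        Set<String> deletedFiles = new HashSet<>();
        RefCountRegistry registry = new RefCountRegistry(deletedFiles);

        // chk-2 and chk-3 both reference the same shared file: count = 2.
        registry.register(SHARED_FILE); // chk-2
        registry.register(SHARED_FILE); // chk-3

        // chk-2 is subsumed and removed from the store; its discard is queued
        // and holds on to the registry that is current at this moment.
        RefCountRegistry registryOfPendingDelete = registry;
        Runnable pendingDelete = () -> registryOfPendingDelete.unregister(SHARED_FILE);

        // Restart before the queued discard has run.
        if (registryPerRestart) {
            registry = new RefCountRegistry(deletedFiles); // new registry per (re)start
        } else {
            registry.clear(); // same object kept, counts rebuilt from the store
        }
        registry.register(SHARED_FILE); // only chk-3 is still in the store: count = 1

        // The stale discard finally runs.
        pendingDelete.run();

        return deletedFiles.contains(SHARED_FILE);
    }

    public static void main(String[] args) {
        System.out.println("registry kept across restart -> file lost: " + sharedFileLost(false));
        System.out.println("registry scoped per restart -> file lost: " + sharedFileLost(true));
    }
}
```

          Running `main` prints `true` for the kept registry and `false` for the per-restart registry. The sketch does not model what eventually happens to the counts left behind in the old registry.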

          `SharedStateRegistry` is now created via a factory that is passed into the `CheckpointCoordinator` to simplify testing.
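
          One way such factory injection could look, as a sketch only: `RegistryFactorySketch`, `SharedRegistry`, `SharedRegistryFactory` and `CoordinatorSketch` are hypothetical names, and the real Flink types and signatures may differ. The coordinator asks the factory for a new registry on every (re)start, and a test can pass a recording factory to inspect which registries were handed out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

// Hypothetical names throughout; the real Flink types and signatures may differ.
final class RegistryFactorySketch {

    // Stand-in for a shared-state registry that deletes files on the given executor.
    static final class SharedRegistry {
        private final Executor deleteExecutor;

        SharedRegistry(Executor deleteExecutor) {
            this.deleteExecutor = deleteExecutor;
        }

        Executor deleteExecutor() {
            return deleteExecutor;
        }
    }

    // Factory injected into the coordinator, so tests can control creation.
    interface SharedRegistryFactory {
        SharedRegistry create(Executor deleteExecutor);
    }

    static final class CoordinatorSketch {
        private final SharedRegistryFactory registryFactory;
        private final Executor ioExecutor;
        private SharedRegistry currentRegistry;

        CoordinatorSketch(SharedRegistryFactory registryFactory, Executor ioExecutor) {
            this.registryFactory = registryFactory;
            this.ioExecutor = ioExecutor;
            this.currentRegistry = registryFactory.create(ioExecutor);
        }

        // Every restore starts from a brand-new registry; discards still queued
        // against the previous registry cannot change the new counts.
        void restoreLatestCheckpoint() {
            currentRegistry = registryFactory.create(ioExecutor);
            // (re-registration of the restored checkpoints' shared state omitted)
        }

        SharedRegistry currentRegistry() {
            return currentRegistry;
        }
    }

    // Records every registry handed out by a test factory across restarts.
    static List<SharedRegistry> registriesAfterRestarts(int restarts) {
        List<SharedRegistry> created = new ArrayList<>();
        SharedRegistryFactory recordingFactory = executor -> {
            SharedRegistry registry = new SharedRegistry(executor);
            created.add(registry);
            return registry;
        };
        CoordinatorSketch coordinator = new CoordinatorSketch(recordingFactory, Runnable::run);
        for (int i = 0; i < restarts; i++) {
            coordinator.restoreLatestCheckpoint();
        }
        return created;
    }

    public static void main(String[] args) {
        List<SharedRegistry> created = registriesAfterRestarts(2);
        System.out.println("registries created: " + created.size()); // initial start + 2 restarts = 3
    }
}
```

          With two restarts, the recording factory hands out three distinct registries: one for the initial start and one per restart.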

          The PR also introduces additional tests and improves the debug/trace logging of incremental checkpointing.

            Verifying this change

          This change added tests and can be verified as follows:

          Run a job with keyed state, using incremental checkpoints and HA mode. Kill TaskManagers (TMs) to trigger recovery. After a couple of attempts, the problematic condition should be triggered, leading to an infinite recovery loop due to `FileNotFoundException`.
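
          The manual verification relies on timing: the failure only appears when a queued discard happens to run after the restore. A hypothetical simulation of repeated kill/recover cycles (all names invented for illustration, not Flink code; fixed random seed for repeatability) shows why several attempts may be needed with the old behavior, and that a per-restart registry loses the file in none of the modeled orderings.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

// Hypothetical simulation of repeated kill/recover cycles; not Flink code.
final class RecoveryLoopSketch {

    // Minimal reference counter; unregister() returns true when the file would be deleted.
    static final class Counts {
        final Map<String, Integer> refs = new HashMap<>();

        void register(String file) {
            refs.merge(file, 1, Integer::sum);
        }

        boolean unregister(String file) {
            int left = refs.getOrDefault(file, 0) - 1;
            if (left <= 0) {
                refs.remove(file);
                return true;
            }
            refs.put(file, left);
            return false;
        }
    }

    // One recovery attempt. chk-2 and chk-3 share file "f"; chk-2's queued discard
    // runs either before the restore or after it (the race).
    static boolean attemptLosesFile(boolean registryPerRestart, boolean discardRunsAfterRestore) {
        Counts old = new Counts();
        old.register("f"); // chk-2
        old.register("f"); // chk-3
        boolean lost = false;
        if (!discardRunsAfterRestore) {
            lost |= old.unregister("f");
        }
        Counts current;
        if (registryPerRestart) {
            current = new Counts(); // fresh registry for this attempt
        } else {
            current = old;          // registry object kept...
            current.refs.clear();   // ...and counts rebuilt from the store
        }
        current.register("f");      // only chk-3 is still in the store
        if (discardRunsAfterRestore) {
            lost |= old.unregister("f");
        }
        return lost;
    }

    // Repeats the attempt with randomly chosen orderings (fixed seed for repeatability).
    static int failedAttempts(boolean registryPerRestart, int attempts, long seed) {
        Random random = new Random(seed);
        int failed = 0;
        for (int i = 0; i < attempts; i++) {
            if (attemptLosesFile(registryPerRestart, random.nextBoolean())) {
                failed++;
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        System.out.println("kept registry, failed attempts: " + failedAttempts(false, 20, 7L));
        System.out.println("per-restart registry, failed attempts: " + failedAttempts(true, 20, 7L));
    }
}
```

          The kept-registry variant fails only in the attempts where the discard lands after the restore; the per-restart variant always reports zero failures.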

          Additional tests:

          `HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase`
          `CheckpointCoordinatorTest::testSharedStateRegistrationOnRestore`
          `IncrementalKeyedStateHandleTest::testSharedStateReRegistration`

            Does this pull request potentially affect one of the following parts:
          • Dependencies (does it add or upgrade a dependency): (no)
          • The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
          • The serializers: (no)
          • The runtime per-record code paths (performance sensitive): (no)
          • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (YES)
            Documentation
          • Does this pull request introduce a new feature? (no)

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/StefanRRichter/flink ImprovedIncreemntalCPDebug

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/4410.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #4410


          commit 2ed4f6b28c2fda674f1319f2a3678b2a231988ac
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2017-06-26T16:07:59Z

          FLINK-7213 Introduce state management by OperatorID in TaskManager

          commit a50eda8602d2034753b42413d23842a888e73611
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2017-07-11T15:10:03Z

          FLINK-7213 Introduce TaskStateSnapshot to unify TaskStateHandles and SubtaskState

          commit bce928fcfec73ff7584840ae7eb6b31fb727604f
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2017-07-25T10:14:03Z

          review comments zentol

          commit 363f0ee18e06affe95c30095ed229ca8dfd47801
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2017-07-26T11:31:30Z

          review comments zentol part 2

          commit 98e657ea02c17d972391dd2360c287a00f27231e
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2017-07-26T11:31:47Z

          review comments zentol part 2

          commit 4460d3412f3ffc2e5496b65949751a31dae8a01a
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2017-07-25T10:04:16Z

          FLINK-7268 [checkpoints] Scope SharedStateRegistry objects per (re)start


          githubbot ASF GitHub Bot added a comment -

          GitHub user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/4410

          CC @aljoscha

          githubbot ASF GitHub Bot added a comment -

          GitHub user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/4410

          I also reviewed `CheckpointCoordinatorTest.testSharedStateRegistrationOnRestore()`. Looks good 👍 The rest is the same as in #4410, so let's wait until #4353 is merged, and then we can merge this one as well.

          githubbot ASF GitHub Bot added a comment -

          GitHub user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/4410

          srichter Stefan Richter added a comment -

          Merged in 91a4b27617 (1.4) and 09caa9ffdc (1.3)


            People

            • Assignee:
              srichter Stefan Richter
              Reporter:
              aljoscha Aljoscha Krettek
            • Votes: 0
              Watchers: 3
