Details

      Description

      Concurrent checkpoints could access `materializedSstFiles` in the `RocksDBStateBackend` at the same time. This should be avoided.

        Issue Links

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user shixiaogang opened a pull request:

          https://github.com/apache/flink/pull/3859

          FLINK-6504 FLINK-6467 [checkpoints] Add needed synchronization for RocksDBIncrementalSnapshotOperation

          This pull request adds missing synchronization for the access to the following variables:
          1. `materializedSstFiles` in `RocksDBKeyedStateBackend`: The variable may be accessed simultaneously by the processing thread (read) and the materialization threads (write). Now we use `asynchronousSnapshotLock` to prevent concurrent access.
          2. `newSstFiles`, `oldSstFiles` and `metaStateHandle` in `RocksDBIncrementalSnapshotOperation`: These variables may be accessed by both the cancel thread and the materialization thread. Though the materialization thread is supposed to be stopped when `releaseResources()` is executed, we add synchronization here to prevent potential conflicts.
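          The first change above can be sketched as follows. This is a minimal, hypothetical model of the locking pattern the PR describes (a map read by the processing thread and written by materialization threads, guarded by a dedicated lock object); the class and method names are illustrative, not the actual Flink API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: every access to the shared map goes through the
// same lock object, so the processing thread's reads and the
// materialization threads' writes can never interleave.
public class SnapshotState {
    private final Object asynchronousSnapshotLock = new Object();
    private final Map<Long, Set<String>> materializedSstFiles = new HashMap<>();

    // Called by a materialization thread when a checkpoint finishes.
    public void put(long checkpointId, Set<String> sstFiles) {
        synchronized (asynchronousSnapshotLock) {
            materializedSstFiles.put(checkpointId, sstFiles);
        }
    }

    // Called by the processing thread when starting a new incremental snapshot.
    public Set<String> filesOf(long checkpointId) {
        synchronized (asynchronousSnapshotLock) {
            return materializedSstFiles.get(checkpointId);
        }
    }
}
```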

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/shixiaogang/flink flink-6504

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3859.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3859


          commit 1c1a08e0f7db5ec8663a46c495468583983c7291
          Author: xiaogang.sxg <xiaogang.sxg@alibaba-inc.com>
          Date: 2017-05-10T01:55:34Z

          Add needed synchronization for RocksDBIncrementalSnapshotOperation


          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3859#discussion_r115714053

          --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
          @@ -911,9 +915,11 @@ void releaseResources(boolean canceled) {
              if (canceled) {
                  List<StateObject> statesToDiscard = new ArrayList<>();
          --- End diff ---

          We could also preallocate the size of `statesToDiscard` as the sum of all the things we are planning to insert in the next step.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3859#discussion_r115719033

          --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
          @@ -911,9 +915,11 @@ void releaseResources(boolean canceled) {
              if (canceled) {
                  List<StateObject> statesToDiscard = new ArrayList<>();

          -       statesToDiscard.add(metaStateHandle);
          -       statesToDiscard.addAll(miscFiles.values());
          -       statesToDiscard.addAll(newSstFiles.values());
          +       synchronized (this) {
          --- End diff ---

          Having a second look at this, the suggestion from the JIRA is not going far enough.

          We actually must ensure that all checkpointing efforts (e.g. writing to HDFS) have already come to an end, so that no new state handles can be created anymore at this point. Otherwise, even if we synchronize, the parallel thread could still create a new state handle after getting back the lock.

          However, once this is ensured, the need for synchronization also goes away. So I suggest that we rather make sure that cleanup is only performed after all checkpointing has ended, and that no more checkpointing through this object can run after the cleanup.
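          The lifecycle suggested above could look roughly like the following. This is a hedged, illustrative sketch only (the class, fields, and methods are invented for this example and do not correspond to Flink's actual implementation): a closed flag refuses new checkpoint work, and cleanup waits for in-flight work to drain before discarding state handles.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: cleanup runs only after all checkpointing has ended,
// and no new checkpointing can start after cleanup has begun, so the state
// handles themselves need no synchronization.
public class SnapshotLifecycle {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final CountDownLatch checkpointDone = new CountDownLatch(1);

    // The processing thread asks this before starting new checkpoint work.
    public boolean tryStartCheckpoint() {
        return !closed.get();
    }

    // The materialization thread signals that its in-flight work is over.
    public void finishCheckpoint() {
        checkpointDone.countDown();
    }

    // Cleanup: first shut the door, then wait for in-flight work to drain.
    public void releaseResources() {
        closed.set(true);
        try {
            checkpointDone.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // At this point no checkpoint can touch the state handles anymore,
        // so they can be discarded without further locking.
    }
}
```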

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3859#discussion_r115721158

          --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
          @@ -880,7 +882,9 @@ KeyedStateHandle materializeSnapshot() throws Exception {
              sstFiles.putAll(newSstFiles);
              sstFiles.putAll(oldSstFiles);

          -   stateBackend.materializedSstFiles.put(checkpointId, sstFiles);
          +   synchronized (stateBackend.asyncSnapshotLock) {
          --- End diff ---

          I wonder if this is enough. As soon as we assume that there can be parallel incremental snapshots (let's say s1 and s2) what could happen is the following race:

          • s1 completes and acknowledges to checkpoint coordinator, but the notifyCheckpointComplete did not yet arrive.
          • s2 runs and checks in `materializedSstFiles`, where it can not find anything from s1, yet.
          • s2 will perceive some files as new, which s1 has already registered in the shared state.
          • notifyCheckpointComplete from s1 arrives.
          • s2 acknowledges. Now, in the `SharedStateRegistry`, the handles from s2 will try to register some sst files as new, which have been registered by s1 before (without s1 noticing). On the checkpoint coordinator, the `RocksDBIncrementalStateHandle` will fail in the precondition check `Preconditions.checkState(referenceCount == 1);` as soon as it tries to register its "new" sst files with the `SharedStateRegistry`.
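          The failure at the end of this race can be modeled with a toy reference-counting registry. This is a simplified, hypothetical sketch (the class and method names are invented; Flink's real `SharedStateRegistry` is considerably more involved): registering a file that an earlier snapshot already registered bumps its count past 1, which violates the "new file" precondition described above.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of the reference counting that fails in
// the race above: a file registered "as new" must not have been seen before.
public class SharedStateCounts {
    private final Map<String, Integer> refCounts = new HashMap<>();

    public void registerAsNew(String sstFile) {
        // Increment the reference count for this file.
        int count = refCounts.merge(sstFile, 1, Integer::sum);
        // Mirrors Preconditions.checkState(referenceCount == 1): a file
        // claimed as new must not already be registered by another snapshot.
        if (count != 1) {
            throw new IllegalStateException(
                "sst file already registered by a previous checkpoint: " + sstFile);
        }
    }
}
```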
          githubbot ASF GitHub Bot added a comment -

          Github user shixiaogang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3859#discussion_r116148624

          — Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java —
          @@ -911,9 +915,11 @@ void releaseResources(boolean canceled) {
          if (canceled) {
          List<StateObject> statesToDiscard = new ArrayList<>();

          • statesToDiscard.add(metaStateHandle);
          • statesToDiscard.addAll(miscFiles.values());
          • statesToDiscard.addAll(newSstFiles.values());
            + synchronized (this) {
              • End diff –

          Yes, I agree. The key point here is to make sure the materialization thread has stopped. Synchronization does little to help here, so I prefer to remove the synchronization here. What do you think?

          githubbot ASF GitHub Bot added a comment -

          Github user shixiaogang commented on the issue:

          https://github.com/apache/flink/pull/3859

          I noticed that FLINK-6504 is also fixed in https://github.com/apache/flink/pull/3870 , so I will close this PR and we can address all the problems of incremental checkpointing there.

          githubbot ASF GitHub Bot added a comment -

          Github user shixiaogang closed the pull request at:

          https://github.com/apache/flink/pull/3859

          srichter Stefan Richter added a comment -

          fixed in 958773b71c


            People

            • Assignee: xiaogang.shi Xiaogang Shi
            • Reporter: srichter Stefan Richter
            • Votes: 0
            • Watchers: 2