Details

      Description

      Incremental checkpoints are currently not externalizible. The missing piece is familiarizing the SavepointSerializer(s) with the new state handles classes that we added for incremental checkpointing. Currently, some of those (e.g. org.apache.flink.contrib.streaming.state.RocksDBIncrementalKeyedStateHandle) currently live in the contrib.rocksdb package and need to be migrated. We can also push them to a different abstraction level, i.e. IncrementalKeyedStateHandle with some private data, referenced existing shared data (from previous checkpoints), and (presumably) newly created shared data (first created by the current checkpoint).

        Issue Links

          Activity

          Hide
          srichter Stefan Richter added a comment -

          fixed in 098e46f2d2

          Show
          srichter Stefan Richter added a comment - fixed in 098e46f2d2
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3870#discussion_r116175297

          — Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java —
          @@ -180,69 +176,66 @@ public long getStateSize() {

          @Override
          public void registerSharedStates(SharedStateRegistry stateRegistry) {
          +
          Preconditions.checkState(!registered, "The state handle has already registered its shared states.");

          • for (Map.Entry<String, StreamStateHandle> newSstFileEntry : newSstFiles.entrySet()) {
          • SstFileStateHandle stateHandle = new SstFileStateHandle(newSstFileEntry.getKey(), newSstFileEntry.getValue());
            + for (Map.Entry<String, StreamStateHandle> newSstFileEntry : unregisteredSstFiles.entrySet()) {
              • End diff –

          For FLINK-6545 we need to familiarize the savepoint serializer with the new incremental handles, but currently they are in the RocksDB package, invisible for savepoint classes.

          I am currently already working on making incremental snapshots a concept on a higher abstraction level, that is less tightly coupled to RocksDB. I think that we can then have incremental snapshots also for other backends, e.g. the memory based. The abstraction is simply, that all incremental snapshots can be divided into backend meta data, private data, newly created shared data and referenced shared data. Also the placeholder handle will become publicly available. Would this address comments? I could make those preparations already part of this PR.

          Show
          githubbot ASF GitHub Bot added a comment - Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3870#discussion_r116175297 — Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java — @@ -180,69 +176,66 @@ public long getStateSize() { @Override public void registerSharedStates(SharedStateRegistry stateRegistry) { + Preconditions.checkState(!registered, "The state handle has already registered its shared states."); for (Map.Entry<String, StreamStateHandle> newSstFileEntry : newSstFiles.entrySet()) { SstFileStateHandle stateHandle = new SstFileStateHandle(newSstFileEntry.getKey(), newSstFileEntry.getValue()); + for (Map.Entry<String, StreamStateHandle> newSstFileEntry : unregisteredSstFiles.entrySet()) { End diff – For FLINK-6545 we need to familiarize the savepoint serializer with the new incremental handles, but currently they are in the RocksDB package, invisible for savepoint classes. I am currently already working on making incremental snapshots a concept on a higher abstraction level, that is less tightly coupled to RocksDB. I think that we can then have incremental snapshots also for other backends, e.g. the memory based. The abstraction is simply, that all incremental snapshots can be divided into backend meta data, private data, newly created shared data and referenced shared data. Also the placeholder handle will become publicly available. Would this address comments? I could make those preparations already part of this PR.
          Hide
          xiaogang.shi Xiaogang Shi added a comment -

          I prefer not to use `SavepointSerializer` to serialize/deserialize external checkpoints. Since the implementation of incremental checkpointing may vary a lot in different state backends, it will be very tedious and error-prone for `SavepointSerializer` to support each kind of incremental state handle.

          Given that checkpoints are not supposed to be back-compatible, maybe we can simply use java serializer to do the serialization of external checkpoints. What do you think?

          Show
          xiaogang.shi Xiaogang Shi added a comment - I prefer not to use `SavepointSerializer` to serialize/deserialize external checkpoints. Since the implementation of incremental checkpointing may vary a lot in different state backends, it will be very tedious and error-prone for `SavepointSerializer` to support each kind of incremental state handle. Given that checkpoints are not supposed to be back-compatible, maybe we can simply use java serializer to do the serialization of external checkpoints. What do you think?
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3870

          @gyfora Yes, this is still a limitation, because the incremental checkpoints are currently not (yet) externalizable. The missing piece is, exactly as you said, familiarizing the `SavepointSerializer` with the new state handle classes from the incremental checkpoints.

          I will create a jira to track this: https://issues.apache.org/jira/browse/FLINK-6545

          Show
          githubbot ASF GitHub Bot added a comment - Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/3870 @gyfora Yes, this is still a limitation, because the incremental checkpoints are currently not (yet) externalizable. The missing piece is, exactly as you said, familiarizing the `SavepointSerializer` with the new state handle classes from the incremental checkpoints. I will create a jira to track this: https://issues.apache.org/jira/browse/FLINK-6545

            People

            • Assignee:
              Unassigned
              Reporter:
              srichter Stefan Richter
            • Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved:

                Development