FLINK-6600: Add key serializer's config snapshot to KeyedBackendSerializationProxy

    Details

      Description

      Currently, checkpoints of a keyed state backend include only the key serializer itself, not its config snapshot. The namespace and state serializers, in contrast, already have their config snapshots included in each registered state's meta info.

      We should also include the key serializer's config snapshot in KeyedBackendSerializationProxy, so that migration / transformation of the key can be supported in the future.
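      To illustrate what "include the config snapshot in the proxy" means for the checkpoint format, here is a minimal sketch of a versioned meta data proxy. It is hypothetical and not Flink's actual class or binary layout: the config snapshot is modelled as string key/value pairs, and `KeyedMetaProxySketch`, `VERSION_WITHOUT_SNAPSHOT` and `VERSION_WITH_SNAPSHOT` are illustrative names. The point it shows is that a version bump lets newer code still read older checkpoints that carry no snapshot.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical, simplified stand-in for a keyed backend's meta data proxy.
 * Version 1 wrote only the key serializer; version 2 additionally writes the
 * key serializer's config snapshot (modelled here as string key/value pairs).
 */
class KeyedMetaProxySketch {

    static final int VERSION_WITHOUT_SNAPSHOT = 1;
    static final int VERSION_WITH_SNAPSHOT = 2;

    String keySerializerClassName;
    Map<String, String> keySerializerConfigSnapshot; // null when read from version-1 data
    int readVersion;

    void write(DataOutputStream out) throws IOException {
        out.writeInt(VERSION_WITH_SNAPSHOT);
        out.writeUTF(keySerializerClassName);
        out.writeInt(keySerializerConfigSnapshot.size());
        for (Map.Entry<String, String> e : keySerializerConfigSnapshot.entrySet()) {
            out.writeUTF(e.getKey());
            out.writeUTF(e.getValue());
        }
    }

    void read(DataInputStream in) throws IOException {
        readVersion = in.readInt();
        keySerializerClassName = in.readUTF();
        if (readVersion >= VERSION_WITH_SNAPSHOT) {
            int entries = in.readInt();
            keySerializerConfigSnapshot = new LinkedHashMap<>();
            for (int i = 0; i < entries; i++) {
                keySerializerConfigSnapshot.put(in.readUTF(), in.readUTF());
            }
        } else {
            // older checkpoints carry no snapshot, so compatibility cannot be checked
            keySerializerConfigSnapshot = null;
        }
    }

    public static void main(String[] args) throws IOException {
        KeyedMetaProxySketch proxy = new KeyedMetaProxySketch();
        proxy.keySerializerClassName = "com.example.MyKeySerializer";
        proxy.keySerializerConfigSnapshot = new LinkedHashMap<>();
        proxy.keySerializerConfigSnapshot.put("format", "v1");

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        proxy.write(new DataOutputStream(bytes));

        KeyedMetaProxySketch restored = new KeyedMetaProxySketch();
        restored.read(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        // prints: com.example.MyKeySerializer {format=v1}
        System.out.println(restored.keySerializerClassName + " " + restored.keySerializerConfigSnapshot);
    }
}
```

      Writing the version first is the design choice that matters here: the reader decides from it whether a snapshot follows, instead of guessing from the remaining bytes.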


          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user tzulitai opened a pull request:

          https://github.com/apache/flink/pull/3925

          FLINK-6600 Add key serializer config snapshot to keyed backend checkpoints

          This PR adds the config snapshot of the key serializer of keyed
          backends to their checkpoints. This makes it possible to upgrade key
          serializers and, in the future, to perform state migration when the
          old and new key serializers are incompatible.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tzulitai/flink FLINK-6600

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3925.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3925


          commit afe1acb02a95cc04f6efd929e66c4b919a4b7757
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2017-05-16T17:15:57Z

          FLINK-6600 Add key serializer config snapshot to keyed backend checkpoints

          This commit adds the config snapshot of the key serializer of keyed
          backends to their checkpoints. This makes it possible to upgrade key
          serializers and, in the future, to perform state migration when the
          old and new key serializers are incompatible.
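          The restore-time decision this enables (compatible as is, compatible after reconfiguration, or requires migration) can be sketched as below. This is a hypothetical model, not Flink's TypeSerializer or StateMigrationUtil API: `VersionedKeySerializer`, `requiresMigration` and `RestoreCheckSketch` are illustrative names, and the serializer's config snapshot is reduced to a single "formatVersion" entry. The error message is copied from the diffs further down.

```java
import java.util.Collections;
import java.util.Map;

/**
 * Hypothetical key serializer whose binary format is described by one config value.
 * It can be reconfigured to keep using an older format that it still supports.
 */
class VersionedKeySerializer {
    private int formatVersion;
    private final int[] alsoSupportedVersions;

    VersionedKeySerializer(int formatVersion, int... alsoSupportedVersions) {
        this.formatVersion = formatVersion;
        this.alsoSupportedVersions = alsoSupportedVersions;
    }

    int getFormatVersion() {
        return formatVersion;
    }

    /** What would be written into the checkpoint next to the serializer. */
    Map<String, String> snapshotConfiguration() {
        return Collections.singletonMap("formatVersion", Integer.toString(formatVersion));
    }

    /**
     * Checks the previous config snapshot and reconfigures this serializer if that
     * makes it compatible. Returns true only if the old keys cannot be read at all.
     */
    boolean requiresMigration(Map<String, String> previousSnapshot) {
        int previous = Integer.parseInt(previousSnapshot.get("formatVersion"));
        if (previous == formatVersion) {
            return false; // compatible as is
        }
        for (int v : alsoSupportedVersions) {
            if (v == previous) {
                formatVersion = previous; // compatible after reconfiguration
                return false;
            }
        }
        return true; // incompatible: state migration would be needed
    }
}

class RestoreCheckSketch {
    /** Loosely mirrors the check added in the PR: fail fast if migration would be needed. */
    static void checkKeySerializer(VersionedKeySerializer newKeySerializer, Map<String, String> previousSnapshot) {
        if (newKeySerializer.requiresMigration(previousSnapshot)) {
            throw new RuntimeException("The new key serializer is not compatible to read previous keys. "
                    + "Aborting now since state migration is currently not available");
        }
    }

    public static void main(String[] args) {
        Map<String, String> oldSnapshot = new VersionedKeySerializer(1).snapshotConfiguration();

        VersionedKeySerializer upgraded = new VersionedKeySerializer(2, 1);
        checkKeySerializer(upgraded, oldSnapshot);
        System.out.println("restored with format " + upgraded.getFormatVersion()); // prints "restored with format 1"

        try {
            checkKeySerializer(new VersionedKeySerializer(3), oldSnapshot);
        } catch (RuntimeException e) {
            System.out.println("restore failed: " + e.getMessage());
        }
    }
}
```

          Without the config snapshot in the checkpoint, the new serializer would have nothing to compare against, so none of these three outcomes could be distinguished.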


          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3925#discussion_r116977075

          — Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java —
          @@ -1116,13 +1116,27 @@ private void restoreKeyGroupsInStateHandle()

           * @throws ClassNotFoundException
           * @throws RocksDBException
           */
          + @SuppressWarnings("unchecked")
          private void restoreKVStateMetaData() throws IOException, ClassNotFoundException, RocksDBException {

          KeyedBackendSerializationProxy serializationProxy =
          new KeyedBackendSerializationProxy(rocksDBKeyedStateBackend.userCodeClassLoader);

          serializationProxy.read(currentStateHandleInView);

          + // check for key serializer compatibility; this also reconfigures the
          + // key serializer to be compatible, if it is required and is possible
          + if (StateMigrationUtil.resolveCompatibilityResult(
          + serializationProxy.getKeySerializer(),
          + TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
          + serializationProxy.getKeySerializerConfigSnapshot(),
          + (TypeSerializer) rocksDBKeyedStateBackend.keySerializer)
          + .isRequiresMigration()) {
          +
          + throw new RuntimeException("The new key serializer is not compatible to read previous keys. " +
          — End diff –

          In the future, we need to be extra careful in this case. If the keys' hash codes change through a state transformation, their assignment to the operator instances is broken. Maybe we should already leave a comment about this here, so that we remember it in the future.
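          To make this concern concrete: Flink routes keyed state as key hash code -> key group (bounded by the max parallelism) -> operator instance, with each instance owning a contiguous range of key groups. The sketch below roughly follows that scheme but is a simplified model, not Flink's KeyGroupRangeAssignment: the hash mixer is a stand-in (the murmur3 32-bit finalizer), and "hash + 1" merely simulates a transformation that changes a key's hashCode().

```java
/**
 * Simplified model of keyed-state routing: key hash -> key group -> operator index.
 * The mixing function is a stand-in, not necessarily the one Flink uses.
 */
class KeyGroupAssignmentSketch {

    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    /** Key group of a key, derived only from its hash code and the max parallelism. */
    static int keyGroupFor(int keyHashCode, int maxParallelism) {
        return Math.floorMod(mix(keyHashCode), maxParallelism);
    }

    /** Contiguous ranges of key groups are owned by each parallel operator instance. */
    static int operatorIndexFor(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;
        int moved = 0;
        for (int hash = 0; hash < 1000; hash++) {
            int before = operatorIndexFor(keyGroupFor(hash, maxParallelism), parallelism, maxParallelism);
            // simulate a key transformation that changed this key's hashCode()
            int after = operatorIndexFor(keyGroupFor(hash + 1, maxParallelism), parallelism, maxParallelism);
            if (before != after) {
                moved++;
            }
        }
        System.out.println(moved + " of 1000 keys would be routed to a different operator instance");
    }
}
```

          Any key that "moves" has its restored state sitting in a key group owned by one instance while lookups for it arrive at another, which is why a hash-changing key transformation would also have to re-partition the state.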

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3925#discussion_r116977261

          — Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java —
          @@ -1228,6 +1243,19 @@ private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<?> stateBack
          DataInputView in = new DataInputViewStreamWrapper(inputStream);
          serializationProxy.read(in);

          + // check for key serializer compatibility; this also reconfigures the
          + // key serializer to be compatible, if it is required and is possible
          + if (StateMigrationUtil.resolveCompatibilityResult(
          + serializationProxy.getKeySerializer(),
          + TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
          + serializationProxy.getKeySerializerConfigSnapshot(),
          + (TypeSerializer) stateBackend.keySerializer)
          + .isRequiresMigration()) {
          +
          + throw new RuntimeException("The new key serializer is not compatible to read previous keys. " +
          + "Aborting now since state migration is currently not available");
          — End diff –

          Same here.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3925#discussion_r116977867

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java —
          @@ -385,6 +386,19 @@ private void restorePartitionedState(Collection<KeyedStateHandle> state) throws

          serializationProxy.read(inView);

          + // check for key serializer compatibility; this also reconfigures the
          + // key serializer to be compatible, if it is required and is possible
          + if (StateMigrationUtil.resolveCompatibilityResult(
          + serializationProxy.getKeySerializer(),
          + TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
          + serializationProxy.getKeySerializerConfigSnapshot(),
          + (TypeSerializer) keySerializer)
          + .isRequiresMigration()) {
          +
          + throw new RuntimeException("The new key serializer is not compatible to read previous keys. " +
          — End diff –

          Same here.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3925

          Thanks for adding this so quickly @tzulitai! The changes look good to me, +1.

          @tillrohrmann maybe you can already include this for your testing.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3925

          Thanks for the review @StefanRRichter!
          I'll add the reminder comment and then merge this.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3925

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Fixed for 1.3 via 8b5ba676fb3b52f69b0a523d57264462ac73c23f
          Fixed for master via d8a467b01ab63127dbf563b6aa8c68fe5d9c85d4

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3925#discussion_r117266306

          — Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java —
          @@ -1116,13 +1116,27 @@ private void restoreKeyGroupsInStateHandle()

           * @throws ClassNotFoundException
           * @throws RocksDBException
           */
          + @SuppressWarnings("unchecked")
          private void restoreKVStateMetaData() throws IOException, ClassNotFoundException, RocksDBException {

          KeyedBackendSerializationProxy serializationProxy =
          new KeyedBackendSerializationProxy(rocksDBKeyedStateBackend.userCodeClassLoader);

          serializationProxy.read(currentStateHandleInView);

          + // check for key serializer compatibility; this also reconfigures the
          + // key serializer to be compatible, if it is required and is possible
          + if (StateMigrationUtil.resolveCompatibilityResult(
          + serializationProxy.getKeySerializer(),
          + TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
          + serializationProxy.getKeySerializerConfigSnapshot(),
          + (TypeSerializer) rocksDBKeyedStateBackend.keySerializer)
          + .isRequiresMigration()) {
          — End diff –

          Maybe as an idea for the future: if I'm not mistaken, the KeySerializerConfigSnapshot is the same across all `KeyedStateHandles`. Thus, having something like a broadcast state for state backend meta data would be cool; then we wouldn't have to store this redundant information.
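          The suggestion amounts to storing a value that is identical across handles only once. A minimal sketch of such deduplication, assuming the snapshot is available as a byte array; `SharedMetaDataStore` is hypothetical and not a Flink class, and an actual broadcast mechanism would additionally have to ship the shared blob to every instance on restore.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical meta data store: identical blobs (e.g. the key serializer config
 * snapshot written by every keyed state handle) are kept only once, and each
 * handle stores just a small integer reference.
 */
class SharedMetaDataStore {
    private final Map<ByteBuffer, Integer> idsByContent = new HashMap<>();
    private final List<byte[]> blobs = new ArrayList<>();

    /** Returns a reference id; byte-for-byte identical snapshots share one id. */
    int register(byte[] snapshot) {
        // defensive copy, so later changes to the caller's array cannot alter the map key
        ByteBuffer key = ByteBuffer.wrap(snapshot.clone());
        Integer existing = idsByContent.get(key);
        if (existing != null) {
            return existing;
        }
        int id = blobs.size();
        blobs.add(key.array());
        idsByContent.put(key, id);
        return id;
    }

    byte[] lookup(int id) {
        return blobs.get(id).clone();
    }

    int storedCount() {
        return blobs.size();
    }

    public static void main(String[] args) {
        SharedMetaDataStore store = new SharedMetaDataStore();
        byte[] keySerializerSnapshot = {1, 2, 3};
        int[] refsPerHandle = new int[3];
        for (int handle = 0; handle < refsPerHandle.length; handle++) {
            refsPerHandle[handle] = store.register(keySerializerSnapshot);
        }
        System.out.println(store.storedCount()); // prints 1
    }
}
```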


            People

            • Assignee:
              tzulitai Tzu-Li (Gordon) Tai
              Reporter:
              tzulitai Tzu-Li (Gordon) Tai
            • Votes:
              0
              Watchers:
              3
