Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-6482

Add nested serializers into configuration snapshots of composite serializers

    Details

      Description

      Currently, the composite serializers' configuration snapshots only wrap the config snapshots of nested serializers.

      We should also consider adding serialization of the nested serializers into the config snapshot, so that in the case where only some nested serializer cannot be loaded (class missing / implementation changed), we can also provide a path for serializer upgrades.

      This applies for all composite serializers that have nested serializers.

        Issue Links

          Activity

          Hide
          githubbot ASF GitHub Bot added a comment -

          GitHub user tzulitai opened a pull request:

          https://github.com/apache/flink/pull/3937

          FLINK-6482 [core] Add nested serializers to config snapshots of composite serializers

          This commit adds also the nested serializers themselves to the configuration snapshots of composite serializers. This opens up the opportunity to use the previous nested serializer as the convert deserializer in the case that a nested serializer in the new serializer determines that state migration is required.

          Take for example a Tuple serializer. If any one of the nested field serializers is not compatible, then the whole Tuple serializer will need to be migrated. Prior to this PR, this means that all serializers would need to be able to be either compatible or provide a convert deserializer. With this change, this relaxes this by opening up the opportunity to just use the previous serializer of fields as the convert deserializer.

          Only the composite serializers are affected by this change.

          This commit also consolidates all TypeSerializer-related serialization
          proxies into a single utility class. A few renamings are included, namely:
          1. `ClassNotFoundDummyTypeSerializer` --> `UnloadableDummyTypeSerializer`
          2. `TypeSerializerUtil` --> `TypeSerializerSerializationUtil` (because that's all its used for)
          3. `TypeSerializerSerializationProxy` --> functionality moved to a static method of `TypeSerializerSerializationUtil`
          4. `StateMigrationUtil` --> moved to flink-core and renamed to `CompatibilityUtil`.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tzulitai/flink FLINK-6482

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3937.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3937


          commit a189999f799fe943f2297293a2f74707b9c4188d
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2017-05-18T06:51:33Z

          FLINK-6482 [core] Add nested serializers to config snapshots of composite serializers

          This commit adds also the nested serializers themselves to the
          configuration snapshots of composite serializers. This opens up the
          oppurtunity to use the previous nested serializer as the convert
          deserializer in the case that a nested serializer in the new serializer
          determines that state migration is required.

          This commit also consolidate all TypeSerializer-related serialization
          proxies into a single utility class.


          Show
          githubbot ASF GitHub Bot added a comment - GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/3937 FLINK-6482 [core] Add nested serializers to config snapshots of composite serializers This commit adds also the nested serializers themselves to the configuration snapshots of composite serializers. This opens up the opportunity to use the previous nested serializer as the convert deserializer in the case that a nested serializer in the new serializer determines that state migration is required. Take for example a Tuple serializer. If any one of the nested field serializers is not compatible, then the whole Tuple serializer will need to be migrated. Prior to this PR, this means that all serializers would need to be able to be either compatible or provide a convert deserializer. With this change, this relaxes this by opening up the opportunity to just use the previous serializer of fields as the convert deserializer. Only the composite serializers are affected by this change. This commit also consolidates all TypeSerializer-related serialization proxies into a single utility class. A few renamings are included, namely: 1. `ClassNotFoundDummyTypeSerializer` --> `UnloadableDummyTypeSerializer` 2. `TypeSerializerUtil` --> `TypeSerializerSerializationUtil` (because that's all its used for) 3. `TypeSerializerSerializationProxy` --> functionality moved to a static method of `TypeSerializerSerializationUtil` 4. `StateMigrationUtil` --> moved to flink-core and renamed to `CompatibilityUtil`. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink FLINK-6482 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3937.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3937 commit a189999f799fe943f2297293a2f74707b9c4188d Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org> Date: 2017-05-18T06:51:33Z FLINK-6482 [core] Add nested serializers to config snapshots of composite serializers This commit adds also the nested serializers themselves to the configuration snapshots of composite serializers. This opens up the oppurtunity to use the previous nested serializer as the convert deserializer in the case that a nested serializer in the new serializer determines that state migration is required. This commit also consolidate all TypeSerializer-related serialization proxies into a single utility class.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the issue:

          https://github.com/apache/flink/pull/3937

          Hey,
          what's the status of this PR?
          I see the JIRA is marked as a blocker. When do you think is this PR ready to be merged?

          Show
          githubbot ASF GitHub Bot added a comment - Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/3937 Hey, what's the status of this PR? I see the JIRA is marked as a blocker. When do you think is this PR ready to be merged?
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3937

          @rmetzger it is currently under review by @StefanRRichter. The conflicts are pretty trivial and can be resolved very quickly; I'm just leaving it as is because the review started already last week.
          I'm hoping to merge it by the end of Monday (today).

          Show
          githubbot ASF GitHub Bot added a comment - Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3937 @rmetzger it is currently under review by @StefanRRichter. The conflicts are pretty trivial and can be resolved very quickly; I'm just leaving it as is because the review started already last week. I'm hoping to merge it by the end of Monday (today).
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the issue:

          https://github.com/apache/flink/pull/3937

          Thank you for your response.
          This PR is the last item for the next release candidate. Would be good to get this in in the next few hours so that I can finally start the vote for the 1.3 release.

          Show
          githubbot ASF GitHub Bot added a comment - Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/3937 Thank you for your response. This PR is the last item for the next release candidate. Would be good to get this in in the next few hours so that I can finally start the vote for the 1.3 release.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3937#discussion_r117696993

          — Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java —
          @@ -73,13 +91,11 @@ public boolean equals(Object obj) {
          }

          return (obj.getClass().equals(getClass()))

          • && Arrays.equals(
          • nestedSerializerConfigSnapshots,
          • ((CompositeTypeSerializerConfigSnapshot) obj).getNestedSerializerConfigSnapshots());
            + && nestedSerializersWithConfigs.equals(((CompositeTypeSerializerConfigSnapshot) obj).getNestedSerializersAndConfigs());
              • End diff –

          This is not doing a deep equals on the array.

          Show
          githubbot ASF GitHub Bot added a comment - Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3937#discussion_r117696993 — Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java — @@ -73,13 +91,11 @@ public boolean equals(Object obj) { } return (obj.getClass().equals(getClass())) && Arrays.equals( nestedSerializerConfigSnapshots, ((CompositeTypeSerializerConfigSnapshot) obj).getNestedSerializerConfigSnapshots()); + && nestedSerializersWithConfigs.equals(((CompositeTypeSerializerConfigSnapshot) obj).getNestedSerializersAndConfigs()); End diff – This is not doing a deep equals on the array.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3937#discussion_r117697041

          — Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java —
          @@ -73,13 +91,11 @@ public boolean equals(Object obj) {
          }

          return (obj.getClass().equals(getClass()))

          • && Arrays.equals(
          • nestedSerializerConfigSnapshots,
          • ((CompositeTypeSerializerConfigSnapshot) obj).getNestedSerializerConfigSnapshots());
            + && nestedSerializersWithConfigs.equals(((CompositeTypeSerializerConfigSnapshot) obj).getNestedSerializersAndConfigs());
            }

          @Override
          public int hashCode() {

          • return Arrays.hashCode(nestedSerializerConfigSnapshots);
            + return nestedSerializersWithConfigs.hashCode();
              • End diff –

          This is also not giving a deep hashcode of the array.

          Show
          githubbot ASF GitHub Bot added a comment - Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3937#discussion_r117697041 — Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java — @@ -73,13 +91,11 @@ public boolean equals(Object obj) { } return (obj.getClass().equals(getClass())) && Arrays.equals( nestedSerializerConfigSnapshots, ((CompositeTypeSerializerConfigSnapshot) obj).getNestedSerializerConfigSnapshots()); + && nestedSerializersWithConfigs.equals(((CompositeTypeSerializerConfigSnapshot) obj).getNestedSerializersAndConfigs()); } @Override public int hashCode() { return Arrays.hashCode(nestedSerializerConfigSnapshots); + return nestedSerializersWithConfigs.hashCode(); End diff – This is also not giving a deep hashcode of the array.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3937#discussion_r117699760

          — Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java —
          @@ -73,13 +91,11 @@ public boolean equals(Object obj) {
          }

          return (obj.getClass().equals(getClass()))

          • && Arrays.equals(
          • nestedSerializerConfigSnapshots,
          • ((CompositeTypeSerializerConfigSnapshot) obj).getNestedSerializerConfigSnapshots());
            + && nestedSerializersWithConfigs.equals(((CompositeTypeSerializerConfigSnapshot) obj).getNestedSerializersAndConfigs());
              • End diff –

          I've changed `nestedSerializersWithConfigs` to a list, it's no longer an array. The `equals` implementation should suffice?

          Show
          githubbot ASF GitHub Bot added a comment - Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3937#discussion_r117699760 — Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java — @@ -73,13 +91,11 @@ public boolean equals(Object obj) { } return (obj.getClass().equals(getClass())) && Arrays.equals( nestedSerializerConfigSnapshots, ((CompositeTypeSerializerConfigSnapshot) obj).getNestedSerializerConfigSnapshots()); + && nestedSerializersWithConfigs.equals(((CompositeTypeSerializerConfigSnapshot) obj).getNestedSerializersAndConfigs()); End diff – I've changed `nestedSerializersWithConfigs` to a list, it's no longer an array. The `equals` implementation should suffice?
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3937#discussion_r117700406

          — Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java —
          @@ -73,13 +91,11 @@ public boolean equals(Object obj) {
          }

          return (obj.getClass().equals(getClass()))

          • && Arrays.equals(
          • nestedSerializerConfigSnapshots,
          • ((CompositeTypeSerializerConfigSnapshot) obj).getNestedSerializerConfigSnapshots());
            + && nestedSerializersWithConfigs.equals(((CompositeTypeSerializerConfigSnapshot) obj).getNestedSerializersAndConfigs());
            }

          @Override
          public int hashCode() {

          • return Arrays.hashCode(nestedSerializerConfigSnapshots);
            + return nestedSerializersWithConfigs.hashCode();
              • End diff –

          Same here; `nestedSerializersWithConfigs` is a `List` not an array, the `hashCode` implementation should suffice?

          Show
          githubbot ASF GitHub Bot added a comment - Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3937#discussion_r117700406 — Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java — @@ -73,13 +91,11 @@ public boolean equals(Object obj) { } return (obj.getClass().equals(getClass())) && Arrays.equals( nestedSerializerConfigSnapshots, ((CompositeTypeSerializerConfigSnapshot) obj).getNestedSerializerConfigSnapshots()); + && nestedSerializersWithConfigs.equals(((CompositeTypeSerializerConfigSnapshot) obj).getNestedSerializersAndConfigs()); } @Override public int hashCode() { return Arrays.hashCode(nestedSerializerConfigSnapshots); + return nestedSerializersWithConfigs.hashCode(); End diff – Same here; `nestedSerializersWithConfigs` is a `List` not an array, the `hashCode` implementation should suffice?
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3937#discussion_r117710279

          — Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java —
          @@ -19,47 +19,65 @@
          package org.apache.flink.api.common.typeutils;

          import org.apache.flink.annotation.Internal;
          +import org.apache.flink.api.java.tuple.Tuple2;
          import org.apache.flink.core.memory.DataInputView;
          import org.apache.flink.core.memory.DataOutputView;
          import org.apache.flink.util.Preconditions;

          import java.io.IOException;
          -import java.util.Arrays;
          +import java.util.ArrayList;
          +import java.util.List;

          /**

          • A {@link TypeSerializerConfigSnapshot}

            for serializers that has multiple nested serializers.

          • * The configuration snapshot consists of the configuration snapshots of all nested serializers.
            + * The configuration snapshot consists of the configuration snapshots of all nested serializers, and
            + * also the nested serializers themselves.
            + *
            + * <p>Both the nested serializers and the configuration snapshots are written as configuration of
            + * composite serializers, so that on restore, the previous serializer may be used in case migration
            + * is required.
            */
            @Internal
            public abstract class CompositeTypeSerializerConfigSnapshot extends TypeSerializerConfigSnapshot {
          • private TypeSerializerConfigSnapshot[] nestedSerializerConfigSnapshots;
            + private List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> nestedSerializersWithConfigs;

          /** This empty nullary constructor is required for deserializing the configuration. */
          public CompositeTypeSerializerConfigSnapshot() {}

          • public CompositeTypeSerializerConfigSnapshot(TypeSerializerConfigSnapshot... nestedSerializerConfigSnapshots) {
          • this.nestedSerializerConfigSnapshots = Preconditions.checkNotNull(nestedSerializerConfigSnapshots);
            + public CompositeTypeSerializerConfigSnapshot(TypeSerializer<?>... nestedSerializers) {
            + Preconditions.checkNotNull(nestedSerializers);
            +
            + this.nestedSerializersWithConfigs = new ArrayList<>(nestedSerializers.length);
            + TypeSerializerConfigSnapshot configSnapshot;
              • End diff –

          I think this could go inside the loop.

          Show
          githubbot ASF GitHub Bot added a comment - Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3937#discussion_r117710279 — Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java — @@ -19,47 +19,65 @@ package org.apache.flink.api.common.typeutils; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.util.Preconditions; import java.io.IOException; -import java.util.Arrays; +import java.util.ArrayList; +import java.util.List; /** A {@link TypeSerializerConfigSnapshot} for serializers that has multiple nested serializers. * The configuration snapshot consists of the configuration snapshots of all nested serializers. + * The configuration snapshot consists of the configuration snapshots of all nested serializers, and + * also the nested serializers themselves. + * + * <p>Both the nested serializers and the configuration snapshots are written as configuration of + * composite serializers, so that on restore, the previous serializer may be used in case migration + * is required. */ @Internal public abstract class CompositeTypeSerializerConfigSnapshot extends TypeSerializerConfigSnapshot { private TypeSerializerConfigSnapshot[] nestedSerializerConfigSnapshots; + private List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> nestedSerializersWithConfigs; /** This empty nullary constructor is required for deserializing the configuration. */ public CompositeTypeSerializerConfigSnapshot() {} public CompositeTypeSerializerConfigSnapshot(TypeSerializerConfigSnapshot... nestedSerializerConfigSnapshots) { this.nestedSerializerConfigSnapshots = Preconditions.checkNotNull(nestedSerializerConfigSnapshots); + public CompositeTypeSerializerConfigSnapshot(TypeSerializer<?>... nestedSerializers) { + Preconditions.checkNotNull(nestedSerializers); + + this.nestedSerializersWithConfigs = new ArrayList<>(nestedSerializers.length); + TypeSerializerConfigSnapshot configSnapshot; End diff – I think this could go inside the loop.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3937#discussion_r117710523

          — Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java —
          @@ -19,47 +19,65 @@
          package org.apache.flink.api.common.typeutils;

          import org.apache.flink.annotation.Internal;
          +import org.apache.flink.api.java.tuple.Tuple2;
          import org.apache.flink.core.memory.DataInputView;
          import org.apache.flink.core.memory.DataOutputView;
          import org.apache.flink.util.Preconditions;

          import java.io.IOException;
          -import java.util.Arrays;
          +import java.util.ArrayList;
          +import java.util.List;

          /**

          • A {@link TypeSerializerConfigSnapshot}

            for serializers that has multiple nested serializers.

          • * The configuration snapshot consists of the configuration snapshots of all nested serializers.
            + * The configuration snapshot consists of the configuration snapshots of all nested serializers, and
            + * also the nested serializers themselves.
            + *
            + * <p>Both the nested serializers and the configuration snapshots are written as configuration of
            + * composite serializers, so that on restore, the previous serializer may be used in case migration
            + * is required.
            */
            @Internal
            public abstract class CompositeTypeSerializerConfigSnapshot extends TypeSerializerConfigSnapshot {
          • private TypeSerializerConfigSnapshot[] nestedSerializerConfigSnapshots;
            + private List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> nestedSerializersWithConfigs;

          /** This empty nullary constructor is required for deserializing the configuration. */
          public CompositeTypeSerializerConfigSnapshot() {}

          • public CompositeTypeSerializerConfigSnapshot(TypeSerializerConfigSnapshot... nestedSerializerConfigSnapshots) {
          • this.nestedSerializerConfigSnapshots = Preconditions.checkNotNull(nestedSerializerConfigSnapshots);
            + public CompositeTypeSerializerConfigSnapshot(TypeSerializer<?>... nestedSerializers)
            Unknown macro: { + Preconditions.checkNotNull(nestedSerializers); + + this.nestedSerializersWithConfigs = new ArrayList<>(nestedSerializers.length); + TypeSerializerConfigSnapshot configSnapshot; + for (TypeSerializer<?> nestedSerializer }

          @Override
          public void write(DataOutputView out) throws IOException

          { super.write(out); - TypeSerializerUtil.writeSerializerConfigSnapshots(out, nestedSerializerConfigSnapshots); + TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(out, nestedSerializersWithConfigs); }

          @Override
          public void read(DataInputView in) throws IOException

          { super.read(in); - nestedSerializerConfigSnapshots = TypeSerializerUtil.readSerializerConfigSnapshots(in, getUserCodeClassLoader()); + this.nestedSerializersWithConfigs = + TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, getUserCodeClassLoader()); }
          • public TypeSerializerConfigSnapshot[] getNestedSerializerConfigSnapshots() {
          • return nestedSerializerConfigSnapshots;
            + public List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> getNestedSerializersAndConfigs() { + return nestedSerializersWithConfigs; }
          • public TypeSerializerConfigSnapshot getSingleNestedSerializerConfigSnapshot() {
          • return nestedSerializerConfigSnapshots[0];
            + public Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> getSingleNestedSerializerAndConfig() {
            + return nestedSerializersWithConfigs.get(0);
              • End diff –

          Method name and variable name are not aligned.

          Show
          githubbot ASF GitHub Bot added a comment - Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3937#discussion_r117710523 — Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java — @@ -19,47 +19,65 @@ package org.apache.flink.api.common.typeutils; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.util.Preconditions; import java.io.IOException; -import java.util.Arrays; +import java.util.ArrayList; +import java.util.List; /** A {@link TypeSerializerConfigSnapshot} for serializers that has multiple nested serializers. * The configuration snapshot consists of the configuration snapshots of all nested serializers. + * The configuration snapshot consists of the configuration snapshots of all nested serializers, and + * also the nested serializers themselves. + * + * <p>Both the nested serializers and the configuration snapshots are written as configuration of + * composite serializers, so that on restore, the previous serializer may be used in case migration + * is required. */ @Internal public abstract class CompositeTypeSerializerConfigSnapshot extends TypeSerializerConfigSnapshot { private TypeSerializerConfigSnapshot[] nestedSerializerConfigSnapshots; + private List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> nestedSerializersWithConfigs; /** This empty nullary constructor is required for deserializing the configuration. */ public CompositeTypeSerializerConfigSnapshot() {} public CompositeTypeSerializerConfigSnapshot(TypeSerializerConfigSnapshot... nestedSerializerConfigSnapshots) { this.nestedSerializerConfigSnapshots = Preconditions.checkNotNull(nestedSerializerConfigSnapshots); + public CompositeTypeSerializerConfigSnapshot(TypeSerializer<?>... nestedSerializers) Unknown macro: { + Preconditions.checkNotNull(nestedSerializers); + + this.nestedSerializersWithConfigs = new ArrayList<>(nestedSerializers.length); + TypeSerializerConfigSnapshot configSnapshot; + for (TypeSerializer<?> nestedSerializer } @Override public void write(DataOutputView out) throws IOException { super.write(out); - TypeSerializerUtil.writeSerializerConfigSnapshots(out, nestedSerializerConfigSnapshots); + TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(out, nestedSerializersWithConfigs); } @Override public void read(DataInputView in) throws IOException { super.read(in); - nestedSerializerConfigSnapshots = TypeSerializerUtil.readSerializerConfigSnapshots(in, getUserCodeClassLoader()); + this.nestedSerializersWithConfigs = + TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, getUserCodeClassLoader()); } public TypeSerializerConfigSnapshot[] getNestedSerializerConfigSnapshots() { return nestedSerializerConfigSnapshots; + public List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> getNestedSerializersAndConfigs() { + return nestedSerializersWithConfigs; } public TypeSerializerConfigSnapshot getSingleNestedSerializerConfigSnapshot() { return nestedSerializerConfigSnapshots [0] ; + public Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> getSingleNestedSerializerAndConfig() { + return nestedSerializersWithConfigs.get(0); End diff – Method name and variable name are not aligned.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3937

          Overall, LGTM. +1

          Show
          githubbot ASF GitHub Bot added a comment - Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/3937 Overall, LGTM. +1
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3937

          Thanks for the review Stefan!
          @rmetzger currently waiting for a Travis run before merging this: https://travis-ci.org/tzulitai/flink/builds/234805397

          Show
          githubbot ASF GitHub Bot added a comment - Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3937 Thanks for the review Stefan! @rmetzger currently waiting for a Travis run before merging this: https://travis-ci.org/tzulitai/flink/builds/234805397
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3937

          Show
          githubbot ASF GitHub Bot added a comment - Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3937
          Hide
          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Fixed for master via a7bc5de9b17dde793c6da0e7cb700004af117148.
          Fixed for 1.3.0 via 09cc3f7c577741d7271f73c6e916c98f7b79a820.

          Show
          tzulitai Tzu-Li (Gordon) Tai added a comment - Fixed for master via a7bc5de9b17dde793c6da0e7cb700004af117148. Fixed for 1.3.0 via 09cc3f7c577741d7271f73c6e916c98f7b79a820.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the issue:

          https://github.com/apache/flink/pull/3937

          Thanks a lot for the quick review and merge!!

          Show
          githubbot ASF GitHub Bot added a comment - Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/3937 Thanks a lot for the quick review and merge!!

            People

            • Assignee:
              tzulitai Tzu-Li (Gordon) Tai
              Reporter:
              tzulitai Tzu-Li (Gordon) Tai
            • Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved:

                Development