Details

      Description

      For serializers to be reconfigurable on restore, we need to know the previous serializer configuration: for example, which types were registered, with which specific or default serializers, and, especially for Kryo, the order in which they were registered.

      This requires the serializer configuration metadata to be self-contained within the written state.

      For the implementation, we propose to include the following changes:

      • Extract a new, separate SerializersConfig class from ExecutionConfig. This class should contain only the serializer-related configuration (e.g., registeredKryoTypes, registeredPojoTypes, etc.). SerializersConfig is for internal use only and should therefore be annotated with @Internal. Users should still configure serializers through ExecutionConfig.
      • Serializers that currently require an ExecutionConfig in their constructors should, where possible, take a SerializersConfig instead.
      • Introduce SerializersConfigSerializationProxy, which is responsible for serializing the current SerializersConfig when writing state to streams. Since this proxy defines the serialized format of the serializer configuration, it should be a VersionedIOReadableWritable, because the format or the information written may change in the future.
      • Add SerializersConfigSerializationProxy to the state backends' serialization proxies (e.g. KeyedBackendSerializationProxy) so that the serializer configuration is written into state. We also need to make sure that state written by previous-version backend serialization proxies can still be read (backwards compatibility).
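
      The proxy bullet relies on the VersionedIOReadableWritable pattern: a format version is written ahead of the payload, so a reader can tell which format it is decoding before touching the data. A minimal sketch of that pattern, assuming plain java.io streams instead of Flink's DataOutputView/DataInputView; SimpleSerializersConfig and SerializersConfigProxySketch are hypothetical names, and only registeredPojoTypes is modeled:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified stand-in for the proposed SerializersConfig.
// Only registered POJO type names are modeled; the proposal covers more fields.
final class SimpleSerializersConfig {
    final List<String> registeredPojoTypes = new ArrayList<>();
}

// Hypothetical proxy mimicking the versioned read/write pattern: the version
// is written first, and the reader rejects versions it cannot decode.
final class SerializersConfigProxySketch {
    static final int CURRENT_VERSION = 1;

    private SerializersConfigProxySketch() {
    }

    static void write(SimpleSerializersConfig config, DataOutputStream out) throws IOException {
        out.writeInt(CURRENT_VERSION);
        out.writeInt(config.registeredPojoTypes.size());
        for (String typeName : config.registeredPojoTypes) {
            out.writeUTF(typeName);
        }
    }

    static SimpleSerializersConfig read(DataInputStream in) throws IOException {
        int version = in.readInt();
        // Only versions this reader knows how to decode are accepted.
        if (version < 1 || version > CURRENT_VERSION) {
            throw new IOException("Unsupported serializers config version: " + version);
        }
        SimpleSerializersConfig config = new SimpleSerializersConfig();
        int count = in.readInt();
        for (int i = 0; i < count; i++) {
            config.registeredPojoTypes.add(in.readUTF());
        }
        return config;
    }

    // Convenience helpers that wrap the checked IOException.
    static byte[] toBytes(SimpleSerializersConfig config) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            write(config, out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static SimpleSerializersConfig fromBytes(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return read(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

      Writing the version first is what lets a later release add fields under version 2 while still branching to the version 1 decoding path for old snapshots.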

      For the initial version, we propose to include the following in the written serializer configuration metadata, in this order:
      1. registeredPojoTypes
      2. Throwable.class --> org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer (the default Flink-specific registration for serializing throwables)
      3. defaultKryoSerializers
      4. defaultKryoSerializerClasses
      5. Kryo registrations for all primitive types (and their boxed versions). This preserves compatibility in case Kryo's built-in registrations for the primitive types change in the future.
      6. registeredTypesWithKryoSerializers
      7. registeredTypesWithKryoSerializerClasses
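
      The emphasis on order follows from how Kryo numbers registrations: a class registered without an explicit ID receives the next free ID, so replaying the same registrations in a different order changes the IDs embedded in previously written bytes. The toy registry below (hypothetical OrderedRegistrySketch, plain Java, not Kryo's API) records the registration order and replays it on restore; the starting ID of 10 is an arbitrary placeholder for wherever the built-in registrations end.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Toy registry that hands out IDs in registration order, similar to how Kryo
// assigns the next free ID to a class registered without an explicit ID.
final class OrderedRegistrySketch {
    // LinkedHashMap keeps insertion order, i.e. the registration order.
    private final Map<String, Integer> idsByClassName = new LinkedHashMap<>();
    private int nextId;

    OrderedRegistrySketch(int firstFreeId) {
        this.nextId = firstFreeId;
    }

    // Registering an already-known class keeps its existing ID.
    int register(String className) {
        Integer existing = idsByClassName.get(className);
        if (existing != null) {
            return existing;
        }
        int id = nextId++;
        idsByClassName.put(className, id);
        return id;
    }

    Map<String, Integer> ids() {
        return Collections.unmodifiableMap(idsByClassName);
    }

    // Writes the class names in the order they were registered.
    byte[] snapshotOrder() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(idsByClassName.size());
            for (String className : idsByClassName.keySet()) {
                out.writeUTF(className);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Replays the recorded order so every class gets the same ID as before.
    static OrderedRegistrySketch restore(byte[] snapshot, int firstFreeId) {
        OrderedRegistrySketch registry = new OrderedRegistrySketch(firstFreeId);
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(snapshot))) {
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                registry.register(in.readUTF());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return registry;
    }
}
```

      Registering the same three classes in a different order, without the snapshot, would give each class a different ID, which is exactly the mismatch the ordered metadata is meant to prevent.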


          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user tzulitai opened a pull request:

          https://github.com/apache/flink/pull/3804

          FLINK-6190 [core] Reconfigurable TypeSerializers

          Reconfigurable TypeSerializers is the first step towards allowing serializer upgrades. The second step, activating serializer upgrades for registered state, will be a separate follow-up PR based on this one.

          This PR includes both FLINK-6190 (configuration snapshots for all serializers, https://issues.apache.org/jira/browse/FLINK-6190) and FLINK-6191 (reconfiguration logic for all serializers, https://issues.apache.org/jira/browse/FLINK-6191). They have been bundled together in this PR because they share common concerns.

          Since the change touches all serializers currently in Flink, the PR is broken up into incrementally buildable commits, with independent commits for the different kinds of serializers, to ease review.

            Description

          Reconfigurable serializers consist of two parts:
          1. Creating a snapshot of a serializer's configuration that can be written to state in a definable, versioned format. A serializer's configuration snapshot is a point-in-time view of the serializer's state and parameters at the time of the snapshot. For example, the configuration of the `KryoSerializer` should contain the registration order of its classes and serializers. Other serializers, like the `PojoSerializer`, contain configuration parameters that change during runtime, e.g. the cached serializers for its non-registered POJO subclasses (why each piece of information needs to be persisted in a serializer's configuration snapshot is explained in code comments and Javadocs in the PR).

          2. Reconfiguring a new serializer with the configuration snapshot of a preceding serializer, so that it can read old data written by the preceding serializer.
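
          The two parts can be illustrated with a toy serializer that writes each value as its position in a list of constants, loosely modeled on the enum case. This is a hedged sketch, not Flink's code: ToyConstantSerializer is hypothetical, the snapshot is a plain List<String> rather than a TypeSerializerConfigSnapshot, and reconfigure returns a boolean instead of a ReconfigureResult.

```java
import java.util.ArrayList;
import java.util.List;

// Toy serializer for a fixed set of string constants: each value is
// written as its index in the constant list.
final class ToyConstantSerializer {
    private final List<String> constants;

    ToyConstantSerializer(List<String> constants) {
        this.constants = new ArrayList<>(constants);
    }

    int serialize(String value) {
        int index = constants.indexOf(value);
        if (index < 0) {
            throw new IllegalArgumentException("Unknown constant: " + value);
        }
        return index;
    }

    String deserialize(int index) {
        return constants.get(index);
    }

    // Part 1: a point-in-time view of the configuration (the constant order).
    List<String> snapshotConfiguration() {
        return new ArrayList<>(constants);
    }

    // Part 2: adopt the previous order so old indices still resolve correctly.
    // Returns false if a previously known constant no longer exists.
    boolean reconfigure(List<String> previousConstants) {
        if (!constants.containsAll(previousConstants)) {
            return false;
        }
        List<String> reordered = new ArrayList<>(previousConstants);
        for (String constant : constants) {
            if (!reordered.contains(constant)) {
                reordered.add(constant); // new constants go after the old ones
            }
        }
        constants.clear();
        constants.addAll(reordered);
        return true;
    }
}
```

          Without reconfiguration, a new serializer declaring the constants in a different order would silently decode old indices to the wrong values; after reconfiguration, old indices keep their meaning and new constants receive fresh indices at the end.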

            New user interfaces

          New methods in the `TypeSerializer` interface:

          • `TypeSerializer#snapshotConfiguration()`: extracts a configuration snapshot from the serializer
          • `TypeSerializer#reconfigure(TypeSerializerConfigSnapshot)`: reconfigures the serializer with its preceding serializer's configuration snapshot.

          New classes:

          • `TypeSerializerConfigSnapshot`: a `VersionedIOReadableWritable` base class for serializers to implement their own configuration snapshot class. Each serializer needs to encode its own information about its serialization format and its required parameters within its own config snapshot class.
          • `ForwardCompatibleSerializationFormatConfig`: a special marker config that implementations of `TypeSerializer#snapshotConfiguration()` can return to signal that new serializers for the data written by this serializer do not need to be reconfigured for compatibility. For example, custom user serializers built on serialization frameworks with built-in compatibility mechanisms (e.g. Thrift, Protobuf, etc.) can simply return this marker.
          • `ReconfigureResult`: an enum representing the result of a reconfiguration, returned from `TypeSerializer#reconfigure(TypeSerializerConfigSnapshot)`. Three kinds of results are currently defined; see the class Javadoc for details.
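
          How the new methods and the marker config fit together can be sketched as follows. Every name is a hypothetical stand-in (SerializerSketch for TypeSerializer, ConfigSnapshotSketch for TypeSerializerConfigSnapshot, ForwardCompatibleMarkerSketch for ForwardCompatibleSerializationFormatConfig), and the three ReconfigureOutcomeSketch constants are placeholders, since the comment only states that three kinds of results exist.

```java
// Placeholder names: the PR defines three kinds of ReconfigureResult,
// but these constant names are illustrative only.
enum ReconfigureOutcomeSketch {
    COMPATIBLE_AS_IS,
    COMPATIBLE_AFTER_RECONFIGURATION,
    INCOMPATIBLE
}

// Simplified stand-in for a config snapshot (versioned I/O omitted here).
abstract class ConfigSnapshotSketch {
}

// Stand-in for the forward-compatible marker: the data format itself
// handles evolution, so nothing needs to be reconfigured.
final class ForwardCompatibleMarkerSketch extends ConfigSnapshotSketch {
    static final ForwardCompatibleMarkerSketch INSTANCE = new ForwardCompatibleMarkerSketch();

    private ForwardCompatibleMarkerSketch() {
    }
}

// A snapshot carrying one real parameter: the number of fields written.
final class FieldCountSnapshotSketch extends ConfigSnapshotSketch {
    final int fieldCount;

    FieldCountSnapshotSketch(int fieldCount) {
        this.fieldCount = fieldCount;
    }
}

interface SerializerSketch {
    ConfigSnapshotSketch snapshotConfiguration();

    ReconfigureOutcomeSketch reconfigure(ConfigSnapshotSketch previous);
}

// Hypothetical user serializer delegating to a self-evolving format
// (e.g. Thrift or Protobuf): it simply returns the marker.
final class SelfEvolvingFormatSerializerSketch implements SerializerSketch {
    @Override
    public ConfigSnapshotSketch snapshotConfiguration() {
        return ForwardCompatibleMarkerSketch.INSTANCE;
    }

    @Override
    public ReconfigureOutcomeSketch reconfigure(ConfigSnapshotSketch previous) {
        return previous instanceof ForwardCompatibleMarkerSketch
                ? ReconfigureOutcomeSketch.COMPATIBLE_AS_IS
                : ReconfigureOutcomeSketch.INCOMPATIBLE;
    }
}

// Hypothetical fixed-layout serializer that can only read data
// written with the same number of fields.
final class FixedLayoutSerializerSketch implements SerializerSketch {
    private final int fieldCount;

    FixedLayoutSerializerSketch(int fieldCount) {
        this.fieldCount = fieldCount;
    }

    @Override
    public ConfigSnapshotSketch snapshotConfiguration() {
        return new FieldCountSnapshotSketch(fieldCount);
    }

    @Override
    public ReconfigureOutcomeSketch reconfigure(ConfigSnapshotSketch previous) {
        if (previous instanceof FieldCountSnapshotSketch
                && ((FieldCountSnapshotSketch) previous).fieldCount == fieldCount) {
            return ReconfigureOutcomeSketch.COMPATIBLE_AS_IS;
        }
        return ReconfigureOutcomeSketch.INCOMPATIBLE;
    }
}
```

          The marker lets a serializer opt out of carrying any format parameters, while serializers with a rigid layout encode exactly the parameters a successor needs to check.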

            Tests

          Specific tests have been added for the more complex serializers, namely:

          • `EnumSerializer` (841298a)
          • `KryoSerializer` (5b50505)
          • `PojoSerializer` (7ca5496)

          General `TypeSerializerConfigSnapshot` tests, such as de-/serialization of configuration snapshots, are added in c2bb1ef.

          Two fundamental unit tests related to config snapshots have also been added to `SerializerTestBase` (b917941).

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tzulitai/flink FLINK-6190

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3804.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3804



          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3804

          tzulitai Tzu-Li (Gordon) Tai added a comment - Merged for 1.3.0 with https://git-wip-us.apache.org/repos/asf/flink/commit/63c04a5

            People

            • Assignee:
              tzulitai Tzu-Li (Gordon) Tai
              Reporter:
              tzulitai Tzu-Li (Gordon) Tai
            • Votes:
              0
              Watchers:
              4
