Flink / FLINK-6674 Update release 1.3 docs / FLINK-6478

Add documentation on how to upgrade serializers for managed state

    Details

    • Type: Sub-task
    • Status: Closed
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Documentation
    • Labels: None

      Description

      There needs to be documentation that explains how to use the new serializer upgrade APIs in TypeSerializer and how those methods interact with checkpoints. This documentation should probably be placed under "Application development --> Streaming --> Working with State".

      Ideally, it should also come with a minimal example for users of serialization frameworks that already have built-in backwards compatibility (such as Thrift).


          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user tzulitai opened a pull request:

          https://github.com/apache/flink/pull/4006

          FLINK-6478 [doc] Document how to upgrade state serializers

          This PR adds documentation on how to use custom serializers for managed state, as well as how to handle upgrades to those serializers.

          We could probably also consider restructuring the "Working with State" page into a navigation page with nested subpages, each covering a different topic on working with state (e.g. "Programming API" and "State Serialization"), since putting all this new detail into a single page might be overwhelming for new users.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tzulitai/flink FLINK-6478

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/4006.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #4006


          commit 46d002c7a81f5f5b2a5648bffbe18e18fda144c5
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2017-05-29T08:53:11Z

          FLINK-6478 [doc] Document how to upgrade state serializers


          githubbot ASF GitHub Bot added a comment -

          Github user alpinegizmo commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4006#discussion_r118905107

          — Diff: docs/dev/stream/state.md —
          @@ -429,3 +429,120 @@ public static class CounterSource

          Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `org.apache.flink.runtime.state.CheckpointListener` interface.

          +## Custom serialization for managed state
          +
          +This section is targeted as a guideline for users who require using custom serialization for their state, covering how
          +to provide a custom serializer and how to handle upgrades to the serializer for compatibility. If you're simply using
          +Flink's own serializers, this section is irrelevant and can be skipped.
          +
          +### Using custom serializers
          +
          +As demonstrated in the above examples, when registering a managed operator or keyed state, a `StateDescriptor` is required
          +to specify the state's name, as well as information about the type of the state. The type information is used by Flink's
          +[type serialization framework](../types_serialization.html) to create appropriate serializers for the state.
          +
          +It is also possible to completely bypass this and let Flink use your own custom serializer to serialize managed states,
          +simply by directly instantiating the `StateDescriptor` with your own `TypeSerializer` implementation:
          +
          +{% highlight java %}
          +ListStateDescriptor<Tuple2<String, Integer>> descriptor =
          +    new ListStateDescriptor<>(
          +        "state-name",
          +        new TypeSerializer<> {...});
          +
          +checkpointedState = getRuntimeContext().getListState(descriptor);
          +{% endhighlight %}
          +
          +### Handling serializer upgrades and compatibility
          +
          +Flink allows changing the serializers used to read and write managed state, so that users are not locked in to any
          +specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer
          +that comes with the `StateDescriptor` used to access the state in the restored job) will be checked for compatibility,
          +and is replaced as the new serializer for the state.
          +
          +A compatible serializer would mean that the serializer is capable of reading previous serialized bytes of the state,
          +and the written binary format of the state also remains identical. The means to check the new serializer's compatibility
          +is provided through the following two methods of the `TypeSerializer` interface:
          +
          +{% highlight java %}
          +public abstract TypeSerializerConfigSnapshot snapshotConfiguration();
          +public abstract CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot);
          +{% endhighlight %}
          +
          +Briefly speaking, every time a checkpoint is performed, the `snapshotConfiguration` method is called to create a
          +point-in-time view of the state serializer's configuration. The returned configuration snapshot is stored along with the
          +checkpoint as the state's metadata. When the checkpoint is used to restore a job, that serializer configuration snapshot
          +will be used to confront the new serializer of the same state via the counterpart method, `ensureCompatibility`. This
          +method serves as a check for whether or not the new serializer is compatible, as well as a hook to possibly reconfigure
          +the new serializer in the case that it is incompatible.
          +
          +Note that Flink's own serializers are implemented such that they are at least compatible with themselves, i.e. when the
          +same serializer is used for the state in the restored job, the serializer's will reconfigure themselves to be compatible
          +with their previous configuration.
          +
          +The following subsections illustrate guidelines to implement these two methods when using custom serializers.
          +
          +#### Implementing the `snapshotConfiguration` method
          +
          +The serializer's configuration snapshot should capture enough information such that on restore, the information
          +carried over to the new serializer for the state is sufficient for it to determine whether or not it is compatible.
          +This could typically contain information about the serializer's parameters or binary format of the serialized data;
          +generally, anything that allows the new serializer to decide whether or not it can be used to read previous serialized
          +bytes, and that it writes in the same binary format.
          +
          +How the serializer's configuration snapshot is written to and read from checkpoints is fully customizable. The below
          +is the base class for all serializer configuration snapshot implementations, the `TypeSerializerConfigSnapshot`.
          +
          +{% highlight java %}
          +public abstract TypeSerializerConfigSnapshot extends VersionedIOReadableWritable {
          +    public abstract int getVersion();
          +    public void read(DataInputView in) {...}
          +    public void write(DataOutputView out) {...}
          +}
          +{% endhighlight %}
          +
          +The `read` and `write` methods define how the configuration is read from and written to the checkpoint. The base
          +implementations contain logic to read and write the version of the configuration snapshot, so it should be extended and
          +not completely overridden.
          +
          +The version of the configuration snapshot is determined through the `getVersion` method. Versioning for the serializer
          +configuration snapshot is the means to maintain compatible configurations, as information included in the configuration
          +may change over time. When reading from the checkpoint, you can use the `getReadVersion` method to determine the version
          +of the written configuration and adapt the read logic to the specific version.
          +
          +<span class="label label-danger">Attention</span> Do not mistaken the version of the serializer's configuration snapshot
          +to be related with upgrading the serializer. The exact same serializer can have different implementations of its
          +configuration snapshot, for example when more information is added to the configuration to allow more comprehensive
          +compatibility checks in the future.
          +
          +One limitation of implementing a `TyoeSerializerConfigSnapshot` is that an empty constructor must be present. The empty
          +constructor is required when reading the configuration snapshot from checkpoints.
          +
          +#### Implementing the `ensureCompatibility` method
          +
          +The `ensureCompatibility` method should contain logic that performs checks against the information about the previous
          +serializer carried over via the provided `TypeSerializerConfigSnapshot`, basically doing one of the following:
          +
          + * Check whether the serializer is compatible, while possibly reconfiguring itself (if required) with previous
          + configuration so that it may be compatible. Afterwards, acknowledge with Flink that the serializer is compatible.
          — End diff –

          I suggest dropping "with previous configuration"
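For illustration, the behaviour described in the quoted bullet (check compatibility, reconfigure if needed, then acknowledge) can be sketched without Flink. `FormatConfigSnapshot`, `Compatibility` and `MultiFormatSerializer` are hypothetical stand-ins for `TypeSerializerConfigSnapshot`, `CompatibilityResult` and a custom `TypeSerializer`; the notion of numbered "binary formats" is an assumption made only for this example, not part of the Flink API.

```java
import java.util.Set;

public class EnsureCompatibilitySketch {

    // Hypothetical stand-in for TypeSerializerConfigSnapshot: records the binary format in use.
    static final class FormatConfigSnapshot {
        final int binaryFormat;

        FormatConfigSnapshot(int binaryFormat) {
            this.binaryFormat = binaryFormat;
        }
    }

    // Hypothetical stand-in for CompatibilityResult.
    enum Compatibility { COMPATIBLE, REQUIRES_MIGRATION }

    // Hypothetical serializer that can read and write several binary formats.
    static final class MultiFormatSerializer {
        private final Set<Integer> supportedFormats;
        private int writeFormat;

        MultiFormatSerializer(Set<Integer> supportedFormats, int defaultWriteFormat) {
            this.supportedFormats = supportedFormats;
            this.writeFormat = defaultWriteFormat;
        }

        // Counterpart of snapshotConfiguration(): called when a checkpoint is taken.
        FormatConfigSnapshot snapshotConfiguration() {
            return new FormatConfigSnapshot(writeFormat);
        }

        // Counterpart of ensureCompatibility(): called on restore with the stored snapshot.
        Compatibility ensureCompatibility(FormatConfigSnapshot previous) {
            if (supportedFormats.contains(previous.binaryFormat)) {
                // Reconfigure so that newly written bytes keep the previous binary format.
                writeFormat = previous.binaryFormat;
                return Compatibility.COMPATIBLE;
            }
            // The previous bytes cannot be read, so the state would have to be migrated.
            return Compatibility.REQUIRES_MIGRATION;
        }

        int writeFormat() {
            return writeFormat;
        }
    }

    public static void main(String[] args) {
        // Job before the upgrade: only knows format 1.
        FormatConfigSnapshot stored = new MultiFormatSerializer(Set.of(1), 1).snapshotConfiguration();
        // Job after the upgrade: reads formats 1 and 2, would write format 2 by default.
        MultiFormatSerializer upgraded = new MultiFormatSerializer(Set.of(1, 2), 2);
        // prints "COMPATIBLE, writes format 1"
        System.out.println(upgraded.ensureCompatibility(stored) + ", writes format " + upgraded.writeFormat());
    }
}
```

Reconfiguring to the previously written format, instead of always writing the newest one, is what keeps the binary format identical across the restore; a serializer that cannot read the old bytes can only report that migration is required.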

          githubbot ASF GitHub Bot added a comment -

          Github user alpinegizmo commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4006#discussion_r118900486

          — Diff: docs/dev/stream/state.md —
          @@ -429,3 +429,120 @@ public static class CounterSource

          Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `org.apache.flink.runtime.state.CheckpointListener` interface.

          +## Custom serialization for managed state
          +
          +This section is targeted as a guideline for users who require using custom serialization for their state, covering how
          +to provide a custom serializer and how to handle upgrades to the serializer for compatibility. If you're simply using
          +Flink's own serializers, this section is irrelevant and can be skipped.
          +
          +### Using custom serializers
          +
          +As demonstrated in the above examples, when registering a managed operator or keyed state, a `StateDescriptor` is required
          +to specify the state's name, as well as information about the type of the state. The type information is used by Flink's
          +[type serialization framework](../types_serialization.html) to create appropriate serializers for the state.
          +
          +It is also possible to completely bypass this and let Flink use your own custom serializer to serialize managed states,
          +simply by directly instantiating the `StateDescriptor` with your own `TypeSerializer` implementation:
          +
          +{% highlight java %}
          +ListStateDescriptor<Tuple2<String, Integer>> descriptor =
          +    new ListStateDescriptor<>(
          +        "state-name",
          +        new TypeSerializer<> {...});
          +
          +checkpointedState = getRuntimeContext().getListState(descriptor);
          +{% endhighlight %}
          +
          +### Handling serializer upgrades and compatibility
          +
          +Flink allows changing the serializers used to read and write managed state, so that users are not locked in to any
          +specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer
          +that comes with the `StateDescriptor` used to access the state in the restored job) will be checked for compatibility,
          +and is replaced as the new serializer for the state.
          +
          +A compatible serializer would mean that the serializer is capable of reading previous serialized bytes of the state,
          +and the written binary format of the state also remains identical. The means to check the new serializer's compatibility
          +is provided through the following two methods of the `TypeSerializer` interface:
          +
          +{% highlight java %}
          +public abstract TypeSerializerConfigSnapshot snapshotConfiguration();
          +public abstract CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot);
          +{% endhighlight %}
          +
          +Briefly speaking, every time a checkpoint is performed, the `snapshotConfiguration` method is called to create a
          +point-in-time view of the state serializer's configuration. The returned configuration snapshot is stored along with the
          +checkpoint as the state's metadata. When the checkpoint is used to restore a job, that serializer configuration snapshot
          +will be used to confront the new serializer of the same state via the counterpart method, `ensureCompatibility`. This
          +method serves as a check for whether or not the new serializer is compatible, as well as a hook to possibly reconfigure
          +the new serializer in the case that it is incompatible.
          +
          +Note that Flink's own serializers are implemented such that they are at least compatible with themselves, i.e. when the
          +same serializer is used for the state in the restored job, the serializer's will reconfigure themselves to be compatible
          +with their previous configuration.
          +
          +The following subsections illustrate guidelines to implement these two methods when using custom serializers.
          +
          +#### Implementing the `snapshotConfiguration` method
          +
          +The serializer's configuration snapshot should capture enough information such that on restore, the information
          +carried over to the new serializer for the state is sufficient for it to determine whether or not it is compatible.
          +This could typically contain information about the serializer's parameters or binary format of the serialized data;
          +generally, anything that allows the new serializer to decide whether or not it can be used to read previous serialized
          +bytes, and that it writes in the same binary format.
          +
          +How the serializer's configuration snapshot is written to and read from checkpoints is fully customizable. The below
          +is the base class for all serializer configuration snapshot implementations, the `TypeSerializerConfigSnapshot`.
          +
          +{% highlight java %}
          +public abstract TypeSerializerConfigSnapshot extends VersionedIOReadableWritable {
          +    public abstract int getVersion();
          +    public void read(DataInputView in) {...}
          +    public void write(DataOutputView out) {...}
          +}
          +{% endhighlight %}
          +
          +The `read` and `write` methods define how the configuration is read from and written to the checkpoint. The base
          +implementations contain logic to read and write the version of the configuration snapshot, so it should be extended and
          +not completely overridden.
          +
          +The version of the configuration snapshot is determined through the `getVersion` method. Versioning for the serializer
          +configuration snapshot is the means to maintain compatible configurations, as information included in the configuration
          +may change over time. When reading from the checkpoint, you can use the `getReadVersion` method to determine the version
          +of the written configuration and adapt the read logic to the specific version.
          +
          +<span class="label label-danger">Attention</span> Do not mistaken the version of the serializer's configuration snapshot
          +to be related with upgrading the serializer. The exact same serializer can have different implementations of its
          +configuration snapshot, for example when more information is added to the configuration to allow more comprehensive
          +compatibility checks in the future.
          +
          +One limitation of implementing a `TyoeSerializerConfigSnapshot` is that an empty constructor must be present. The empty
          — End diff –

          TyoeSerializerConfigSnapshot => TypeSerializerConfigSnapshot
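The snapshot rules in the quoted hunk (the base `read`/`write` handle the version and should be extended rather than replaced, `getReadVersion` lets the read logic adapt to older snapshots, and an empty constructor is needed on restore) can be sketched without Flink as follows. `VersionedConfigSnapshot` and `MyConfigSnapshot` are hypothetical classes that only mimic the described pattern, using `java.io` streams in place of `DataOutputView`/`DataInputView`; the real Flink base class may differ in details, for example in how unsupported versions are rejected.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class VersionedSnapshotSketch {

    // Hypothetical base class: read/write handle the version header, subclasses extend them.
    abstract static class VersionedConfigSnapshot {
        private int readVersion;

        abstract int getVersion();

        void write(DataOutputStream out) throws IOException {
            out.writeInt(getVersion()); // version header comes first
        }

        void read(DataInputStream in) throws IOException {
            readVersion = in.readInt();
            if (readVersion < 1 || readVersion > getVersion()) {
                throw new IOException("Unsupported snapshot version " + readVersion);
            }
        }

        // Version that was actually read; may be older than getVersion().
        int getReadVersion() { return readVersion; }
    }

    // Hypothetical snapshot: version 1 stored only a charset, version 2 added a max length.
    static final class MyConfigSnapshot extends VersionedConfigSnapshot {
        String charset = "UTF-8";
        int maxLength = -1; // -1 means "unknown", e.g. when restoring a version 1 snapshot

        // Empty constructor: the snapshot is instantiated first, then populated by read().
        MyConfigSnapshot() {}

        MyConfigSnapshot(String charset, int maxLength) {
            this.charset = charset;
            this.maxLength = maxLength;
        }

        @Override
        int getVersion() { return 2; }

        @Override
        void write(DataOutputStream out) throws IOException {
            super.write(out); // extend, do not replace, the base logic
            out.writeUTF(charset);
            out.writeInt(maxLength);
        }

        @Override
        void read(DataInputStream in) throws IOException {
            super.read(in); // consumes the version header
            charset = in.readUTF();
            if (getReadVersion() >= 2) {
                maxLength = in.readInt(); // field only exists from version 2 on
            }
        }
    }

    interface Writer { void writeTo(DataOutputStream out) throws IOException; }

    static byte[] bytesOf(Writer writer) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            writer.writeTo(out);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static byte[] toBytes(MyConfigSnapshot snapshot) { return bytesOf(snapshot::write); }

    // Simulates bytes written by an older (version 1) implementation of the snapshot.
    static byte[] legacyV1Bytes(String charset) {
        return bytesOf(out -> { out.writeInt(1); out.writeUTF(charset); });
    }

    static MyConfigSnapshot readSnapshot(byte[] bytes) {
        try {
            MyConfigSnapshot snapshot = new MyConfigSnapshot(); // as on restore
            snapshot.read(new DataInputStream(new ByteArrayInputStream(bytes)));
            return snapshot;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        MyConfigSnapshot current = readSnapshot(toBytes(new MyConfigSnapshot("UTF-16", 64)));
        MyConfigSnapshot legacy = readSnapshot(legacyV1Bytes("ISO-8859-1"));
        System.out.println(current.getReadVersion() + " " + current.charset + " " + current.maxLength); // prints "2 UTF-16 64"
        System.out.println(legacy.getReadVersion() + " " + legacy.charset + " " + legacy.maxLength);    // prints "1 ISO-8859-1 -1"
    }
}
```

Bumping the snapshot version here only changes what the snapshot records; the serializer's binary format for the state itself is untouched, which is the distinction the "Attention" note in the hunk draws.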

          githubbot ASF GitHub Bot added a comment -

          Github user alpinegizmo commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4006#discussion_r118903664

          — Diff: docs/dev/stream/state.md —
          @@ -429,3 +429,120 @@ public static class CounterSource

          Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `org.apache.flink.runtime.state.CheckpointListener` interface.

          +## Custom serialization for managed state
          +
          +This section is targeted as a guideline for users who require using custom serialization for their state, covering how
          +to provide a custom serializer and how to handle upgrades to the serializer for compatibility. If you're simply using
          +Flink's own serializers, this section is irrelevant and can be skipped.
          +
          +### Using custom serializers
          +
          +As demonstrated in the above examples, when registering a managed operator or keyed state, a `StateDescriptor` is required
          +to specify the state's name, as well as information about the type of the state. The type information is used by Flink's
          +[type serialization framework](../types_serialization.html) to create appropriate serializers for the state.
          +
          +It is also possible to completely bypass this and let Flink use your own custom serializer to serialize managed states,
          +simply by directly instantiating the `StateDescriptor` with your own `TypeSerializer` implementation:
          +
          +{% highlight java %}
          +ListStateDescriptor<Tuple2<String, Integer>> descriptor =
          + new ListStateDescriptor<>(
          + "state-name",
          + new TypeSerializer<> {...});
          +
          +checkpointedState = getRuntimeContext().getListState(descriptor);
          +{% endhighlight %}
          +
          +### Handling serializer upgrades and compatibility
          +
          +Flink allows changing the serializers used to read and write managed state, so that users are not locked in to any
          +specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer
          +that comes with the `StateDescriptor` used to access the state in the restored job) will be checked for compatibility,
          +and is replaced as the new serializer for the state.
          +
          +A compatible serializer would mean that the serializer is capable of reading previous serialized bytes of the state,
          +and the written binary format of the state also remains identical. The means to check the new serializer's compatibility
          +is provided through the following two methods of the `TypeSerializer` interface:
          +
          +{% highlight java %}
          +public abstract TypeSerializerConfigSnapshot snapshotConfiguration();
          +public abstract CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot);
          +{% endhighlight %}
          +
          +Briefly speaking, every time a checkpoint is performed, the `snapshotConfiguration` method is called to create a
          +point-in-time view of the state serializer's configuration. The returned configuration snapshot is stored along with the
          +checkpoint as the state's metadata. When the checkpoint is used to restore a job, that serializer configuration snapshot
          +will be used to confront the new serializer of the same state via the counterpart method, `ensureCompatibility`. This
          +method serves as a check for whether or not the new serializer is compatible, as well as a hook to possibly reconfigure
          +the new serializer in the case that it is incompatible.
          +
          +Note that Flink's own serializers are implemented such that they are at least compatible with themselves, i.e. when the
          +same serializer is used for the state in the restored job, the serializer's will reconfigure themselves to be compatible
          +with their previous configuration.
          +
          +The following subsections illustrate guidelines to implement these two methods when using custom serializers.
          +
          +#### Implementing the `snapshotConfiguration` method
          +
          +The serializer's configuration snapshot should capture enough information such that on restore, the information
          +carried over to the new serializer for the state is sufficient for it to determine whether or not it is compatible.
          +This could typically contain information about the serializer's parameters or binary format of the serialized data;
          +generally, anything that allows the new serializer to decide whether or not it can be used to read previous serialized
          +bytes, and that it writes in the same binary format.
          +
          +How the serializer's configuration snapshot is written to and read from checkpoints is fully customizable. The below
          +is the base class for all serializer configuration snapshot implementations, the `TypeSerializerConfigSnapshot`.
          +
          +{% highlight java %}
          +public abstract TypeSerializerConfigSnapshot extends VersionedIOReadableWritable {
          + public abstract int getVersion();
          + public void read(DataInputView in) {...}
          + public void write(DataOutputView out) {...}
          +}
          +{% endhighlight %}
          +
          +The `read` and `write` methods define how the configuration is read from and written to the checkpoint. The base
          +implementations contain logic to read and write the version of the configuration snapshot, so it should be extended and
          +not completely overridden.
          +
          +The version of the configuration snapshot is determined through the `getVersion` method. Versioning for the serializer
          +configuration snapshot is the means to maintain compatible configurations, as information included in the configuration
          +may change over time. When reading from the checkpoint, you can use the `getReadVersion` method to determine the version
          +of the written configuration and adapt the read logic to the specific version.
          +
          +<span class="label label-danger">Attention</span> Do not mistaken the version of the serializer's configuration snapshot
          +to be related with upgrading the serializer. The exact same serializer can have different implementations of its
          — End diff –

          I think you meant to say "mistake" rather than "mistaken", but how about a simpler construction:

          The version of the serializer's configuration snapshot is *not* related to upgrading the serializer.
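
To illustrate the reviewer's point, the following stdlib-only Java sketch models the versioning pattern the diff describes: the base `write`/`read` handle the version number, `getCompatibleVersions` defaults to the current version only, and a subclass uses `getReadVersion` to read an older layout. Here the snapshot version changes because the snapshot's own format gained a field, not because the serializer changed. `VersionedSnapshotSketch` and `CharsetAwareSnapshot` are hypothetical stand-ins, not the actual `VersionedIOReadableWritable` or `TypeSerializerConfigSnapshot` code.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical base class: handles the version, subclasses extend write()/read().
abstract class VersionedSnapshotSketch {
    private int readVersion = -1;

    abstract int getVersion();

    // Default: only the current version can be read back.
    int[] getCompatibleVersions() {
        return new int[] {getVersion()};
    }

    // Version found in the data during the last read().
    int getReadVersion() {
        return readVersion;
    }

    void write(DataOutputStream out) throws IOException {
        out.writeInt(getVersion());
    }

    void read(DataInputStream in) throws IOException {
        readVersion = in.readInt();
        for (int compatible : getCompatibleVersions()) {
            if (compatible == readVersion) {
                return;
            }
        }
        throw new IOException("Incompatible snapshot version: " + readVersion);
    }
}

// Hypothetical snapshot whose format gained a charset field in version 2.
class CharsetAwareSnapshot extends VersionedSnapshotSketch {
    int width;
    String charset = "UTF-8"; // assumed value for snapshots written as version 1

    @Override
    int getVersion() { return 2; }

    @Override
    int[] getCompatibleVersions() { return new int[] {1, 2}; }

    @Override
    void write(DataOutputStream out) throws IOException {
        super.write(out); // extend, don't replace: the base writes the version first
        out.writeInt(width);
        out.writeUTF(charset);
    }

    @Override
    void read(DataInputStream in) throws IOException {
        super.read(in); // reads and checks the version
        width = in.readInt();
        if (getReadVersion() >= 2) {
            charset = in.readUTF(); // only present from version 2 on
        }
    }
}
```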

          githubbot ASF GitHub Bot added a comment -

          Github user alpinegizmo commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4006#discussion_r118901207

          — Diff: docs/dev/stream/state.md —
          @@ -429,3 +429,120 @@ public static class CounterSource

          Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `org.apache.flink.runtime.state.CheckpointListener` interface.

          +## Custom serialization for managed state
          +
          +This section is targeted as a guideline for users who require using custom serialization for their state, covering how
          — End diff –

          This section is targeted as a guideline for users who require custom serialization for their state

          "using" doesn't really work in this context. You could say "the use of" instead, but it doesn't add anything.
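
For the section under review, the core of any custom state serializer is a write/read pair whose read order mirrors the write order. The sketch below shows only that part, using `java.io` streams and a hypothetical `NameCount` class in place of Flink's `TypeSerializer`, `DataOutputView` and `Tuple2<String, Integer>`. A real `TypeSerializer` has further methods (such as `copy` and `duplicate`, plus the two compatibility methods) that are omitted here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical value type standing in for Tuple2<String, Integer>.
final class NameCount {
    final String name;
    final int count;

    NameCount(String name, int count) {
        this.name = name;
        this.count = count;
    }
}

// Hypothetical serializer core: fields are read back in exactly the order they were written.
final class NameCountSerializer {
    void serialize(NameCount value, DataOutputStream out) throws IOException {
        out.writeUTF(value.name);
        out.writeInt(value.count);
    }

    NameCount deserialize(DataInputStream in) throws IOException {
        String name = in.readUTF();
        int count = in.readInt();
        return new NameCount(name, count);
    }

    // Serializes to an in-memory buffer and reads the value back.
    NameCount roundTrip(NameCount value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            serialize(value, new DataOutputStream(bytes));
            return deserialize(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```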

          githubbot ASF GitHub Bot added a comment -

          Github user alpinegizmo commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4006#discussion_r118904745

          — Diff: docs/dev/stream/state.md —
          @@ -429,3 +429,120 @@ public static class CounterSource

          Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `org.apache.flink.runtime.state.CheckpointListener` interface.

          +## Custom serialization for managed state
          +
          +This section is targeted as a guideline for users who require using custom serialization for their state, covering how
          +to provide a custom serializer and how to handle upgrades to the serializer for compatibility. If you're simply using
          +Flink's own serializers, this section is irrelevant and can be skipped.
          +
          +### Using custom serializers
          +
          +As demonstrated in the above examples, when registering a managed operator or keyed state, a `StateDescriptor` is required
          +to specify the state's name, as well as information about the type of the state. The type information is used by Flink's
          +[type serialization framework](../types_serialization.html) to create appropriate serializers for the state.
          +
          +It is also possible to completely bypass this and let Flink use your own custom serializer to serialize managed states,
          +simply by directly instantiating the `StateDescriptor` with your own `TypeSerializer` implementation:
          +
          +{% highlight java %}
          +ListStateDescriptor<Tuple2<String, Integer>> descriptor =
          + new ListStateDescriptor<>(
          + "state-name",
          + new TypeSerializer<> {...});
          +
          +checkpointedState = getRuntimeContext().getListState(descriptor);
          +{% endhighlight %}
          +
          +### Handling serializer upgrades and compatibility
          +
          +Flink allows changing the serializers used to read and write managed state, so that users are not locked in to any
          +specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer
          +that comes with the `StateDescriptor` used to access the state in the restored job) will be checked for compatibility,
          +and is replaced as the new serializer for the state.
          +
          +A compatible serializer would mean that the serializer is capable of reading previous serialized bytes of the state,
          +and the written binary format of the state also remains identical. The means to check the new serializer's compatibility
          +is provided through the following two methods of the `TypeSerializer` interface:
          +
          +{% highlight java %}
          +public abstract TypeSerializerConfigSnapshot snapshotConfiguration();
          +public abstract CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot);
          +{% endhighlight %}
          +
          +Briefly speaking, every time a checkpoint is performed, the `snapshotConfiguration` method is called to create a
          +point-in-time view of the state serializer's configuration. The returned configuration snapshot is stored along with the
          +checkpoint as the state's metadata. When the checkpoint is used to restore a job, that serializer configuration snapshot
          +will be used to confront the new serializer of the same state via the counterpart method, `ensureCompatibility`. This
          — End diff –

          I don't understand the use of "confront" in this context. Perhaps you mean something like "determine the compatibility of" or "verify the compatibility of" ?
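
The reviewer's suggested wording ("verify the compatibility of") matches what the diff describes: on restore, the stored config snapshot is handed to the new serializer, which either confirms it can keep reading and writing the old format (possibly after reconfiguring itself) or reports that it cannot. A stdlib-only Java sketch of that decision follows. `FixedWidthSerializer`, `WidthConfig` and the `Outcome` enum are hypothetical; `Outcome` only loosely plays the role of Flink's `CompatibilityResult`.

```java
// Hypothetical config snapshot: the parameters that define the binary format.
final class WidthConfig {
    final int width;
    final String charset;

    WidthConfig(int width, String charset) {
        this.width = width;
        this.charset = charset;
    }
}

// Hypothetical stand-in for the result of the compatibility check.
enum Outcome { COMPATIBLE, REQUIRES_MIGRATION }

// Hypothetical serializer for fixed-width strings; only the two compatibility methods are shown.
final class FixedWidthSerializer {
    private int width;
    private final String charset;

    FixedWidthSerializer(int width, String charset) {
        this.width = width;
        this.charset = charset;
    }

    int width() { return width; }

    // Called on each checkpoint: capture what determines the written format.
    WidthConfig snapshotConfiguration() {
        return new WidthConfig(width, charset);
    }

    // Called on restore with the snapshot stored alongside the checkpoint.
    Outcome ensureCompatibility(WidthConfig previous) {
        if (!charset.equals(previous.charset)) {
            // Old bytes use a different encoding; this serializer cannot read them as-is.
            return Outcome.REQUIRES_MIGRATION;
        }
        // Same encoding: adopt the previous width so old and new bytes share one format.
        width = previous.width;
        return Outcome.COMPATIBLE;
    }
}
```

The reconfiguration step is the "hook" the diff mentions: the new serializer starts with a different width but adopts the old one, so it stays compatible instead of forcing a migration.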

          githubbot ASF GitHub Bot added a comment -

          Github user alpinegizmo commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4006#discussion_r118906172

          — Diff: docs/dev/stream/state.md —
          @@ -429,3 +429,140 @@ public static class CounterSource

          Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `org.apache.flink.runtime.state.CheckpointListener` interface.

          +## Custom serialization for managed state
          +
          +This section is targeted as a guideline for users who require using custom serialization for their state, covering how
          +to provide a custom serializer and how to handle upgrades to the serializer for compatibility. If you're simply using
          +Flink's own serializers, this section is irrelevant and can be skipped.
          +
          +### Using custom serializers
          +
          +As demonstrated in the above examples, when registering a managed operator or keyed state, a `StateDescriptor` is required
          +to specify the state's name, as well as information about the type of the state. The type information is used by Flink's
          +[type serialization framework](../types_serialization.html) to create appropriate serializers for the state.
          +
          +It is also possible to completely bypass this and let Flink use your own custom serializer to serialize managed states,
          +simply by directly instantiating the `StateDescriptor` with your own `TypeSerializer` implementation:
          +
          +{% highlight java %}
          +ListStateDescriptor<Tuple2<String, Integer>> descriptor =
          + new ListStateDescriptor<>(
          + "state-name",
          + new TypeSerializer<> {...});
          +
          +checkpointedState = getRuntimeContext().getListState(descriptor);
          +{% endhighlight %}
          +
          +### Handling serializer upgrades and compatibility
          +
          +Flink allows changing the serializers used to read and write managed state, so that users are not locked in to any
          +specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer
          +that comes with the `StateDescriptor` used to access the state in the restored job) will be checked for compatibility,
          +and is replaced as the new serializer for the state.
          +
          +A compatible serializer would mean that the serializer is capable of reading previous serialized bytes of the state,
          +and the written binary format of the state also remains identical. The means to check the new serializer's compatibility
          +is provided through the following two methods of the `TypeSerializer` interface:
          +
          +{% highlight java %}
          +public abstract TypeSerializerConfigSnapshot snapshotConfiguration();
          +public abstract CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot);
          +{% endhighlight %}
          +
          +Briefly speaking, every time a checkpoint is performed, the `snapshotConfiguration` method is called to create a
          +point-in-time view of the state serializer's configuration. The returned configuration snapshot is stored along with the
          +checkpoint as the state's metadata. When the checkpoint is used to restore a job, that serializer configuration snapshot
          +will be used to confront the new serializer of the same state via the counterpart method, `ensureCompatibility`. This
          +method serves as a check for whether or not the new serializer is compatible, as well as a hook to possibly reconfigure
          +the new serializer in the case that it is incompatible.
          +
          +Note that Flink's own serializers are implemented such that they are at least compatible with themselves, i.e. when the
          +same serializer is used for the state in the restored job, the serializer's will reconfigure themselves to be compatible
          +with their previous configuration.
          +
          +The following subsections illustrate guidelines to implement these two methods when using custom serializers.
          +
          +#### Implementing the `snapshotConfiguration` method
          +
          +The serializer's configuration snapshot should capture enough information such that on restore, the information
          +carried over to the new serializer for the state is sufficient for it to determine whether or not it is compatible.
          +This could typically contain information about the serializer's parameters or binary format of the serialized data;
          +generally, anything that allows the new serializer to decide whether or not it can be used to read previous serialized
          +bytes, and that it writes in the same binary format.
          +
          +How the serializer's configuration snapshot is written to and read from checkpoints is fully customizable. The below
          +is the base class for all serializer configuration snapshot implementations, the `TypeSerializerConfigSnapshot`.
          +
          +{% highlight java %}
          +public abstract TypeSerializerConfigSnapshot extends VersionedIOReadableWritable {
          + public abstract int getVersion();
          + public void read(DataInputView in) {...}
          + public void write(DataOutputView out) {...}
          +}
          +{% endhighlight %}
          +
          +The `read` and `write` methods define how the configuration is read from and written to the checkpoint. The base
          +implementations contain logic to read and write the version of the configuration snapshot, so it should be extended and
          +not completely overridden.
          +
          +The version of the configuration snapshot is determined through the `getVersion` method. Versioning for the serializer
          +configuration snapshot is the means to maintain compatible configurations, as information included in the configuration
          +may change over time. By default, configuration snapshots are only compatible with the current version (as returned by
          +`getVersion`). To indicate that the configuration is compatible with other versions, override the `getCompatibleVersions`
          +method to return more version values. When reading from the checkpoint, you can use the `getReadVersion` method to
          +determine the version of the written configuration and adapt the read logic to the specific version.
          +
          +<span class="label label-danger">Attention</span> Do not mistaken the version of the serializer's configuration snapshot
          +to be related with upgrading the serializer. The exact same serializer can have different implementations of its
          — End diff –

          I imagine you meant to say something like "Do not confuse the version of the serializer's configuration snapshot
          as being related to upgrading the serializer." But I suggest something simpler like "The version of the serializer's configuration snapshot is *not* related to serializer upgrades."

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4006#discussion_r118906667

          — Diff: docs/dev/stream/state.md —
          @@ -429,3 +429,120 @@ public static class CounterSource

          Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `org.apache.flink.runtime.state.CheckpointListener` interface.

          +## Custom serialization for managed state
          +
          +This section is targeted as a guideline for users who require using custom serialization for their state, covering how
          +to provide a custom serializer and how to handle upgrades to the serializer for compatibility. If you're simply using
          +Flink's own serializers, this section is irrelevant and can be skipped.
          +
          +### Using custom serializers
          +
          +As demonstrated in the above examples, when registering a managed operator or keyed state, a `StateDescriptor` is required
          +to specify the state's name, as well as information about the type of the state. The type information is used by Flink's
          +[type serialization framework](../types_serialization.html) to create appropriate serializers for the state.
          +
          +It is also possible to completely bypass this and let Flink use your own custom serializer to serialize managed states,
          +simply by directly instantiating the `StateDescriptor` with your own `TypeSerializer` implementation:
          +
          +{% highlight java %}
          +ListStateDescriptor<Tuple2<String, Integer>> descriptor =
          +    new ListStateDescriptor<>(
          +        "state-name",
          +        new TypeSerializer<> {...});
          +
          +checkpointedState = getRuntimeContext().getListState(descriptor);
          +{% endhighlight %}
          +
          +### Handling serializer upgrades and compatibility
          +
          +Flink allows changing the serializers used to read and write managed state, so that users are not locked in to any
          +specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer
          +that comes with the `StateDescriptor` used to access the state in the restored job) will be checked for compatibility,
          +and is replaced as the new serializer for the state.
          +
          +A compatible serializer would mean that the serializer is capable of reading previous serialized bytes of the state,
          +and the written binary format of the state also remains identical. The means to check the new serializer's compatibility
          +is provided through the following two methods of the `TypeSerializer` interface:
          +
          +{% highlight java %}
          +public abstract TypeSerializerConfigSnapshot snapshotConfiguration();
          +public abstract CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot);
          +{% endhighlight %}
          +
          +Briefly speaking, every time a checkpoint is performed, the `snapshotConfiguration` method is called to create a
          +point-in-time view of the state serializer's configuration. The returned configuration snapshot is stored along with the
          +checkpoint as the state's metadata. When the checkpoint is used to restore a job, that serializer configuration snapshot
          +will be used to confront the new serializer of the same state via the counterpart method, `ensureCompatibility`. This
          — End diff –

          It was just to explain that the checkpointed serializer config snapshot will be provided to the new serializer to verify its compatibility. I'll use "provide" instead.
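          The handshake described in the diff (snapshot the serializer's configuration at checkpoint time, then provide that snapshot to the new serializer on restore) can be illustrated with a minimal, Flink-free Java sketch. It rests on stated assumptions: `ToySerializer`, `SerializerConfig`, `Compatibility` and `roundTrip` are hypothetical names, not Flink classes, and the serializer's only configuration is an integer format version. In Flink itself the hooks are `TypeSerializer#snapshotConfiguration` and `TypeSerializer#ensureCompatibility`, and a real serializer may reconfigure itself on restore rather than only reporting a result.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical, Flink-free model of the checkpoint/restore compatibility handshake. */
public class CompatibilityHandshakeSketch {

    /** Hypothetical stand-in for the result of a compatibility check. */
    enum Compatibility { COMPATIBLE, REQUIRES_MIGRATION }

    /** Hypothetical configuration snapshot: records the binary format version in use. */
    static final class SerializerConfig {
        final int formatVersion;

        SerializerConfig(int formatVersion) {
            this.formatVersion = formatVersion;
        }

        void write(DataOutputStream out) throws IOException {
            out.writeInt(formatVersion);
        }

        static SerializerConfig read(DataInputStream in) throws IOException {
            return new SerializerConfig(in.readInt());
        }
    }

    /** Hypothetical serializer whose only configuration is its format version. */
    static final class ToySerializer {
        final int formatVersion;

        ToySerializer(int formatVersion) {
            this.formatVersion = formatVersion;
        }

        /** Plays the role of snapshotConfiguration(): invoked at each checkpoint. */
        SerializerConfig snapshotConfiguration() {
            return new SerializerConfig(formatVersion);
        }

        /** Plays the role of ensureCompatibility(): invoked on restore with the stored snapshot. */
        Compatibility ensureCompatibility(SerializerConfig previous) {
            return previous.formatVersion == formatVersion
                    ? Compatibility.COMPATIBLE
                    : Compatibility.REQUIRES_MIGRATION;
        }
    }

    /** Writes a snapshot to bytes (as if into checkpoint metadata) and reads it back. */
    static SerializerConfig roundTrip(SerializerConfig config) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            config.write(out);
            out.flush();
            return SerializerConfig.read(
                    new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Checkpoint: the serializer currently in use snapshots its configuration.
        SerializerConfig stored = roundTrip(new ToySerializer(1).snapshotConfiguration());

        // Restore: the stored snapshot is provided to the new serializer for the same state.
        System.out.println(new ToySerializer(1).ensureCompatibility(stored)); // COMPATIBLE
        System.out.println(new ToySerializer(2).ensureCompatibility(stored)); // REQUIRES_MIGRATION
    }
}
```

          Comparing a single version number is a deliberately simple choice for the sketch; per the diff, what the snapshot must carry is whatever lets the new serializer decide that it can read the previously serialized bytes and that it writes the same binary format.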

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/4006

          Thanks a lot for the helpful suggestions @alpinegizmo! I've addressed them.

          Also, an additional subsection "Managing `TypeSerializer` and `TypeSerializerConfigSnapshot` classes in user code" was added after your review. Really sorry for the race in updating that. Could you also have a look at that subsection?

          githubbot ASF GitHub Bot added a comment -

          Github user alpinegizmo commented on the issue:

          https://github.com/apache/flink/pull/4006

          @tzulitai I've reviewed the new subsection. I think it's fine as is.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/4006

          Thanks @alpinegizmo. I'll merge this as is, then.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/4006

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Fixed for release-1.3 via 8abf9392ea2cce4d40987b50ccb681c76857dedf.
          Fixed for master via d7946aefc145b66c19fd236560189af12be5e277.


            People

            • Assignee:
              tzulitai Tzu-Li (Gordon) Tai
              Reporter:
              tzulitai Tzu-Li (Gordon) Tai
            • Votes:
              0
              Watchers:
              3
