Flink / FLINK-5991

Expose Broadcast Operator State through public APIs

    Details

      Description

      The broadcast operator state functionality was added in FLINK-5265, but it has not yet been exposed through any public API.

      Currently, two streaming connector features planned for 1.3 depend on broadcast state: rescalable Kinesis / Kafka consumers with shard / partition discovery (FLINK-4821 & FLINK-4022). We should consider exposing broadcast state in the 1.3 release as well.
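      One plausible reason these connectors need broadcast state: after rescaling, a restored subtask may own partitions whose offsets were snapshotted by a different subtask. The plain-Java toy model below (no Flink dependency) illustrates this. It assumes that each subtask snapshots a map of partition to offset, that restore hands every subtask the union of all snapshots, and that partitions are reassigned with a hypothetical rule `partition % parallelism == subtaskIndex`. The class and method names are illustrative only; the actual Kafka / Kinesis consumers may assign partitions differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model only (not Flink code): why a partition-discovering consumer
// benefits from restoring the union of all subtasks' offsets.
class UnionRestoreDemo {

    // Union restore: every new subtask receives every snapshotted (partition -> offset) entry.
    static List<Map<Integer, Long>> restoreUnion(List<Map<Integer, Long>> snapshots, int newParallelism) {
        Map<Integer, Long> union = new TreeMap<>();
        for (Map<Integer, Long> snapshot : snapshots) {
            union.putAll(snapshot);
        }
        List<Map<Integer, Long>> restored = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            restored.add(new TreeMap<>(union)); // independent full copy per subtask
        }
        return restored;
    }

    // Hypothetical assignment rule: subtask i keeps partitions p with p % parallelism == i.
    static Map<Integer, Long> keepAssigned(Map<Integer, Long> all, int subtaskIndex, int parallelism) {
        Map<Integer, Long> mine = new TreeMap<>();
        for (Map.Entry<Integer, Long> e : all.entrySet()) {
            if (e.getKey() % parallelism == subtaskIndex) {
                mine.put(e.getKey(), e.getValue());
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        // Before rescaling: subtask 0 owned partitions 0 and 2, subtask 1 owned 1 and 3.
        List<Map<Integer, Long>> snapshots = List.of(
                Map.of(0, 100L, 2, 120L),
                Map.of(1, 90L, 3, 95L));
        int newParallelism = 3;
        List<Map<Integer, Long>> restored = restoreUnion(snapshots, newParallelism);
        for (int i = 0; i < newParallelism; i++) {
            System.out.println("subtask " + i + " resumes " + keepAssigned(restored.get(i), i, newParallelism));
        }
    }
}
```

      With the sample data, the new subtask 0 resumes partition 3, whose offset was snapshotted by the old subtask 1; the union restore makes that offset available to whichever subtask now owns the partition.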

      This JIRA also serves as a place to discuss how we want to expose it.

      To initiate the discussion, I propose:

      1. For the more powerful CheckpointedFunction, add the following to the OperatorStateStore interface:

      <S> ListState<S> getBroadcastOperatorState(ListStateDescriptor<S> stateDescriptor);
      
      <T extends Serializable> ListState<T> getBroadcastSerializableListState(String stateName);
      

      2. For a simpler ListCheckpointed variant, we probably should have a separate BroadcastListCheckpointed interface.

      Extending ListCheckpointed to let the user define the list state type as either PARTITIONABLE or BROADCAST might also be possible, if we can rely on a contract that the value doesn't change. Alternatively, we could expose a defining method (e.g. getListStateType()) that is called only once in the operator. This would break user code, but it can be considered because the interface is marked as PublicEvolving.
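      The "defining method" alternative could look roughly like the sketch below. Everything in it is hypothetical: `ListStateType`, `TypedListCheckpointed`, `OperatorWrapper`, `CountingFunction` and `TypedListCheckpointedDemo` are illustrative names, not Flink classes. What it shows is the contract: the operator asks for the type once, caches it, and never asks again, so a function cannot switch between PARTITIONABLE and BROADCAST across checkpoints. The new method is deliberately abstract, which is exactly what would break existing implementations.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: none of these types exist in Flink.
enum ListStateType { PARTITIONABLE, BROADCAST }

// A ListCheckpointed-like interface extended with a "defining method".
// Making it abstract (no default) is what would break existing implementations.
interface TypedListCheckpointed<T extends Serializable> {
    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

    void restoreState(List<T> state) throws Exception;

    ListStateType getListStateType();
}

// The operator queries the type exactly once and caches it, so the function
// cannot change its answer between checkpoints.
class OperatorWrapper<T extends Serializable> {
    private final TypedListCheckpointed<T> function;
    private final ListStateType stateType;

    OperatorWrapper(TypedListCheckpointed<T> function) {
        this.function = function;
        this.stateType = function.getListStateType(); // the only call
    }

    ListStateType stateType() {
        return stateType;
    }

    List<T> checkpoint(long checkpointId) {
        try {
            return function.snapshotState(checkpointId, System.currentTimeMillis());
        } catch (Exception e) {
            throw new RuntimeException("snapshot failed", e);
        }
    }
}

// Example user function that records how often its type is queried.
class CountingFunction implements TypedListCheckpointed<Long> {
    int typeQueries = 0;
    long count = 0;

    @Override
    public List<Long> snapshotState(long checkpointId, long timestamp) {
        List<Long> state = new ArrayList<>();
        state.add(count);
        return state;
    }

    @Override
    public void restoreState(List<Long> state) {
        count = state.isEmpty() ? 0L : state.get(0);
    }

    @Override
    public ListStateType getListStateType() {
        typeQueries++;
        return ListStateType.BROADCAST;
    }
}

class TypedListCheckpointedDemo {
    public static void main(String[] args) {
        CountingFunction function = new CountingFunction();
        OperatorWrapper<Long> operator = new OperatorWrapper<>(function);
        operator.checkpoint(1L);
        operator.checkpoint(2L);
        System.out.println(operator.stateType() + ", queried " + function.typeQueries + " time(s)");
    }
}
```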


          Activity

          aljoscha Aljoscha Krettek added a comment -

          I prefer to only expose it on the operator state store and leave the ListCheckpointed interface simple as it is now.

          srichter Stefan Richter added a comment -

          I agree with Aljoscha Krettek's suggestion.

          githubbot ASF GitHub Bot added a comment -

          GitHub user tzulitai opened a pull request:

          https://github.com/apache/flink/pull/3508

          FLINK-5991 [state-backend, streaming] Expose Broadcast Operator State

          This PR exposes broadcast operator state through the `CheckpointedFunction` interface, by adding broadcast state access methods to `OperatorStateStore`.

          Since the functionality was already internally available and had test workarounds using casting, this PR simply removes those casts and properly tests the functionality through the `OperatorStateStore` interface.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tzulitai/flink FLINK-5991

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3508.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3508


          commit 84e02d31f3e77af679728f544629becf2a857be9
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2017-03-10T09:22:22Z

          FLINK-5991 [state-backend, streaming] Expose Broadcast Operator State

          This commit exposes broadcast operator state through the
          `CheckpointedFunction` interface, by adding broadcast state access
          methods to `OperatorStateStore`.


          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3508

          R: @aljoscha @StefanRRichter

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3508

          The PR re-introduces methods that had been removed from the public interface before the release. The reason for removing them was that we had too little time to make a final decision on the API. As this matches my suggestion, +1 from me. However, @aljoscha should also give his thumbs up, because I think this is the point where we should catch up on our API discussion.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3508

          Looks good in general. Some thoughts on refinement:

          • I think we should not really encourage the use of Java serialization, so I would suggest removing the serializable shortcut for broadcast state. We have it for the default operator state only as a bridge to mimic the previous behavior (easier migration).
          • Since broadcast state already defines its redistribution pattern (broadcast), the shape of the data structure is orthogonal. For example, we could also have broadcast map state, broadcast value state, etc. Hence, I would suggest including "list" somewhere in the method names.
          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3508

          I would like us to take some time and think about the name. We are going to introduce something called "broadcast state" fairly soon, as part of the effort to make streaming joins possible. That broadcast state will provide an interface very similar to the current keyed state (we'll probably reuse the descriptors and state interfaces), but it will be checkpointed on only one operator, because we only allow modifications based on broadcast input.

          I propose to rename the state we're talking about here to `UnionState` (or something similar), because what it does is take the snapshotted state from all operators and, when restoring, send the union of that to all operators.
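          A toy model of the union semantics described above, in plain Java with no Flink dependency. The class and method names are illustrative, and this is not how Flink implements redistribution internally: every subtask's snapshotted list is concatenated, and each restored subtask receives a full copy.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of union redistribution (not Flink's internal implementation).
class UnionRedistribution {

    // Every restored subtask gets the concatenation of all snapshotted lists.
    static <T> List<List<T>> redistribute(List<List<T>> snapshots, int newParallelism) {
        List<T> union = new ArrayList<>();
        for (List<T> snapshot : snapshots) {
            union.addAll(snapshot); // entries are concatenated; duplicates are not removed
        }
        List<List<T>> restored = new ArrayList<>(newParallelism);
        for (int i = 0; i < newParallelism; i++) {
            restored.add(new ArrayList<>(union)); // each subtask gets its own copy
        }
        return restored;
    }

    public static void main(String[] args) {
        List<List<String>> snapshots = List.of(List.of("a", "b"), List.of("c"));
        // prints [[a, b, c], [a, b, c], [a, b, c]]
        System.out.println(redistribute(snapshots, 3));
    }
}
```

          Each restored subtask holds the complete state, so a function using this scheme has to decide for itself which entries it is responsible for.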

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3508

          +1 on not exposing the Java Serialisation shortcut, btw. I was very unhappy that we even have it for the normal operator state. 😃

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3508

          I wonder if there could also exist a case for broadcasting operator state (non-keyed), where only one operator instance is selected as sender and all others receive on restore. Furthermore, the union aspect may (or may not) happen at restore time, but not at the time that a user requests this state. For what this currently does, I think `ReplicatingState` describes it pretty well. Broadcast would be a good description from the operator's perspective: it broadcasts the generated data to all peers on restore.
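          For contrast, the single-sender variant raised here can be modelled the same way. This is a hypothetical sketch, not a Flink API: the snapshot of one designated subtask (chosen by a `senderIndex` parameter introduced only for illustration) is copied to every new subtask, and all other snapshots are discarded.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-sender broadcast (not a Flink API): the snapshot of one
// designated subtask is copied to every new subtask; all others are discarded.
class SingleSenderBroadcast {

    static <T> List<List<T>> redistribute(List<List<T>> snapshots, int senderIndex, int newParallelism) {
        if (senderIndex < 0 || senderIndex >= snapshots.size()) {
            throw new IllegalArgumentException("no snapshot for sender " + senderIndex);
        }
        List<T> sent = snapshots.get(senderIndex);
        List<List<T>> restored = new ArrayList<>(newParallelism);
        for (int i = 0; i < newParallelism; i++) {
            restored.add(new ArrayList<>(sent)); // each subtask gets its own copy
        }
        return restored;
    }

    public static void main(String[] args) {
        List<List<String>> snapshots = List.of(List.of("x"), List.of("y"));
        // prints [[x], [x], [x]]
        System.out.println(redistribute(snapshots, 0, 3));
    }
}
```

          Unlike the union model, this preserves all information only if the discarded snapshots held nothing the sender did not, for example when every subtask kept an identical copy.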

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3508

          @StefanRRichter I think I was faster ... 😉

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3508

          It's nice following the discussion here.

          I think I'm also leaning a bit more towards naming it `UnionListState`.
          From the user's perspective, it seems clearer if the name of the state type describes how the state will be restored.

          This would imply that it would be ideal to rename the repartitionable list state methods, though.

          Another follow-up question regarding the Java serialization shortcuts:
          should we also deprecate the `getSerializableListState` method to prepare it for removal?

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3508

          I think @StephanEwen wanted to keep a simple, intuitive way for users to register their state that does not require them to think about serializers etc. While I understand this point, I am also leaning towards @aljoscha's feeling that using this should at least be discouraged.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3508

          We can deprecate, because it was added to provide a smooth path from Flink 1.1.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3508

          @StefanRRichter and @tzulitai what's the state of this? Can we go forward with calling it union state?

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3508

          Fine with me.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3508

          @aljoscha @StefanRRichter I'll update the PR today, thanks for the reminder.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3508

          @aljoscha @StefanRRichter Rebased on master to resolve conflicts.

          Also, the follow-up commits address the following:

          1. Remove / deprecate the Java serialization shortcuts. We have some connectors that use the deprecated shortcut method, but since changing those will require more involved migration work, I propose to fix them in a separate JIRA / PR.

          2. Following Stephan's suggestion, refine the naming of the operator state methods: `getOperatorState` is renamed to `getListState`, and the new union state is accessed via `getUnionListState`.

          A further question regarding deprecating the Java serialization shortcuts: do we then also have reason to deprecate the `ListCheckpointed` interface, since it is itself a shortcut as well?

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3508

          This looks very good. 👍

          And yes, I would be in favour of deprecating `ListCheckpointed` but let's see what @StefanRRichter and @StephanEwen think.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3508

          I would also be in favour of deprecating `ListCheckpointed`, but I understand the argument that we should provide an easy way of state access for the 90% cases without fiddling around with serializers.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3508

          Aren't the 90% cases already covered by `StateDescriptor(stateName, typeClass)`?

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3508

          Well, this way is easy, but still slightly more complicated than the original `Checkpointed`. Personally, I would also consider it an easy-enough replacement. That is why my vote was also for deprecation.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3508

          I've opened a separate JIRA to deprecate `ListCheckpointed`. Let's keep this PR self-contained and limited to refining the `OperatorStateStore` interface.

          I think this PR still lacks an update to https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/state.html#using-managed-keyed-state before it's good to go. Adding this.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3508

          Updated managed operator state docs.
          @aljoscha @StefanRRichter could you have a final look at it? If there are no other problems, I'll proceed to merge this. Thanks

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3508#discussion_r109547924

          — Diff: docs/dev/stream/state.md —
          @@ -233,45 +229,44 @@ val counts: DataStream[(String, Int)] = stream

            1. Using Managed Operator State

          -A stateful function can implement either the more general `CheckpointedFunction`
          +To use managed operator state, a stateful function can implement either the more general `CheckpointedFunction`
          interface, or the `ListCheckpointed<T extends Serializable>` interface.

          -In both cases, the non-keyed state is expected to be a `List` of serializable objects, independent from each other,
          -thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which
          -non-keyed state can be repartitioned. As an example, if with parallelism 1 the checkpointed state of the `BufferingSink`
          -contains elements `(test1, 2)` and `(test2, 2)`, when increasing the parallelism to 2, `(test1, 2)` may end up in task 0,
          -while `(test2, 2)` will go to task 1.
          -

                    1. ListCheckpointed
                      +#### CheckpointedFunction

          -The `ListCheckpointed` interface requires the implementation of two methods:
          -
          -

          {% highlight java %}
          -List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
          -
          -void restoreState(List<T> state) throws Exception;
          -{% endhighlight %}
          -
          -On `snapshotState()` the operator should return a list of objects to checkpoint and
          -`restoreState` has to handle such a list upon recovery. If the state is not re-partitionable, you can always
          -return a `Collections.singletonList(MY_STATE)` in the `snapshotState()`.
          -
          -##### CheckpointedFunction
          -
          -The `CheckpointedFunction` interface also requires the implementation of two methods:
          +The `CheckpointedFunction` interface provides access to non-keyed state with different
          +redistribution schemes. It requires the implementation of two methods:

          {% highlight java %}

          void snapshotState(FunctionSnapshotContext context) throws Exception;

          void initializeState(FunctionInitializationContext context) throws Exception;

          {% endhighlight %}

          -Whenever a checkpoint has to be performed `snapshotState()` is called. The counterpart, `initializeState()`, is called every time the user-defined function is initialized, be that when the function is first initialized
          -or be that when actually recovering from an earlier checkpoint. Given this, `initializeState()` is not
          +Whenever a checkpoint has to be performed, `snapshotState()` is called. The counterpart, `initializeState()`,
          +is called every time the user-defined function is initialized, be that when the function is first initialized
          +or be that when the function is actually recovering from an earlier checkpoint. Given this, `initializeState()` is not
          only the place where different types of state are initialized, but also where state recovery logic is included.

          -This is an example of a function that uses `CheckpointedFunction`, a stateful `SinkFunction` that
          -uses state to buffer elements before sending them to the outside world:
          +Currently, list-style managed operator state is supported. The state
          +is expected to be a `List` of serializable objects, independent from each other,
          +thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which
          +non-keyed state can be redistributed. Depending on the state accessing method,
          +the following redistribution schemes are defined:
          +
          + - *Even-split redistribution:* Each operator returns a List of state elements. The whole state is logically a concatenation of
          — End diff –

          "Even-split" --> Not really sure what would be the best wording here ...
          Any ideas?
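          To make the scheme under discussion concrete, here is a toy model of "even-split" redistribution in plain Java, assuming (as the diff text says) that the logical state is the concatenation of all subtasks' lists. The contiguous split below, in which chunk sizes differ by at most one, is an illustrative choice; Flink's actual repartitioner may assign elements differently. The example reuses the `BufferingSink` elements from the removed doc paragraph.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of even-split redistribution (illustrative, not Flink's repartitioner).
class EvenSplitRedistribution {

    static <T> List<List<T>> redistribute(List<List<T>> snapshots, int newParallelism) {
        if (newParallelism < 1) {
            throw new IllegalArgumentException("parallelism must be >= 1");
        }
        List<T> all = new ArrayList<>();
        for (List<T> snapshot : snapshots) {
            all.addAll(snapshot); // logical state = concatenation of all subtasks' lists
        }
        int base = all.size() / newParallelism;
        int extra = all.size() % newParallelism; // the first `extra` subtasks get one more element
        List<List<T>> restored = new ArrayList<>(newParallelism);
        int start = 0;
        for (int i = 0; i < newParallelism; i++) {
            int size = base + (i < extra ? 1 : 0);
            restored.add(new ArrayList<>(all.subList(start, start + size)));
            start += size;
        }
        return restored;
    }

    public static void main(String[] args) {
        // State of a BufferingSink snapshotted with parallelism 1, restored with parallelism 2.
        List<List<String>> snapshots = List.of(List.of("(test1, 2)", "(test2, 2)"));
        // prints [[(test1, 2)], [(test2, 2)]]
        System.out.println(redistribute(snapshots, 2));
    }
}
```

          Unlike the union scheme, every element ends up in exactly one restored subtask, so the total state size does not grow with the new parallelism.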

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3508#discussion_r109610444

          — Diff: docs/dev/stream/state.md —
          @@ -233,45 +229,44 @@ val counts: DataStream[(String, Int)] = stream

            1. Using Managed Operator State

          -A stateful function can implement either the more general `CheckpointedFunction`
          +To use managed operator state, a stateful function can implement either the more general `CheckpointedFunction`
          interface, or the `ListCheckpointed<T extends Serializable>` interface.

          -In both cases, the non-keyed state is expected to be a `List` of serializable objects, independent from each other,
          -thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which
          -non-keyed state can be repartitioned. As an example, if with parallelism 1 the checkpointed state of the `BufferingSink`
          -contains elements `(test1, 2)` and `(test2, 2)`, when increasing the parallelism to 2, `(test1, 2)` may end up in task 0,
          -while `(test2, 2)` will go to task 1.
          -

                    1. ListCheckpointed
                      +#### CheckpointedFunction

          -The `ListCheckpointed` interface requires the implementation of two methods:
          -
          -

          {% highlight java %}
          -List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
          -
          -void restoreState(List<T> state) throws Exception;
          -{% endhighlight %}
          -
          -On `snapshotState()` the operator should return a list of objects to checkpoint and
          -`restoreState` has to handle such a list upon recovery. If the state is not re-partitionable, you can always
          -return a `Collections.singletonList(MY_STATE)` in the `snapshotState()`.
          -
          -##### CheckpointedFunction
          -
          -The `CheckpointedFunction` interface also requires the implementation of two methods:
          +The `CheckpointedFunction` interface provides access to non-keyed state with different
          +redistribution schemes. It requires the implementation of two methods:

          {% highlight java %}

          void snapshotState(FunctionSnapshotContext context) throws Exception;

          void initializeState(FunctionInitializationContext context) throws Exception;

          {% endhighlight %}

          -Whenever a checkpoint has to be performed `snapshotState()` is called. The counterpart, `initializeState()`, is called every time the user-defined function is initialized, be that when the function is first initialized
          -or be that when actually recovering from an earlier checkpoint. Given this, `initializeState()` is not
          +Whenever a checkpoint has to be performed, `snapshotState()` is called. The counterpart, `initializeState()`,
          +is called every time the user-defined function is initialized, be that when the function is first initialized
          +or be that when the function is actually recovering from an earlier checkpoint. Given this, `initializeState()` is not
          only the place where different types of state are initialized, but also where state recovery logic is included.

          -This is an example of a function that uses `CheckpointedFunction`, a stateful `SinkFunction` that
          -uses state to buffer elements before sending them to the outside world:
          +Currently, list-style managed operator state is supported. The state
          +is expected to be a `List` of serializable objects, independent from each other,
          +thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which
          +non-keyed state can be redistributed. Depending on the state accessing method,
          +the following redistribution schemes are defined:
          +
          + - *Even-split redistribution:* Each operator returns a List of state elements. The whole state is logically a concatenation of
          — End diff –

          Could use "Round-Robin redistribution". Maybe...
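          To make the naming question concrete: the diff describes the whole state as logically the concatenation of every subtask's list, which is then divided among the new subtasks. The block below is a plain-Java simulation under that assumption. The class and method names are hypothetical, and this is not Flink's actual repartitioner, which may order or assign elements differently. It cuts the concatenated list into contiguous chunks whose sizes differ by at most one, and it replays the old docs example of rescaling from parallelism 1 to 2.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simulates "even-split" redistribution of list-style operator state.
 * Illustration only; not Flink's actual repartitioning code.
 */
public final class EvenSplitRedistribution {

    /**
     * Concatenates the list states of all old subtasks, then cuts the result
     * into newParallelism contiguous chunks whose sizes differ by at most one.
     */
    public static <T> List<List<T>> redistribute(List<List<T>> oldSubtaskStates, int newParallelism) {
        if (newParallelism <= 0) {
            throw new IllegalArgumentException("newParallelism must be positive");
        }
        // The whole state is logically a concatenation of all subtask lists.
        List<T> all = new ArrayList<>();
        for (List<T> s : oldSubtaskStates) {
            all.addAll(s);
        }
        int base = all.size() / newParallelism;
        int remainder = all.size() % newParallelism;

        List<List<T>> result = new ArrayList<>(newParallelism);
        int offset = 0;
        for (int i = 0; i < newParallelism; i++) {
            // The first `remainder` subtasks receive one extra element.
            int size = base + (i < remainder ? 1 : 0);
            result.add(new ArrayList<>(all.subList(offset, offset + size)));
            offset += size;
        }
        return result;
    }

    public static void main(String[] args) {
        // Parallelism 1 -> 2, using the elements from the old docs example.
        List<List<String>> before = List.of(List.of("(test1, 2)", "(test2, 2)"));
        System.out.println(redistribute(before, 2)); // [[(test1, 2)], [(test2, 2)]]
    }
}
```

          This model hands out contiguous chunks. A strict round-robin assignment (element i to subtask i mod n) would also produce near-equal sizes but different groupings, so the name arguably depends on which of the two the implementation actually does.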

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3508

          Nice update to the docs! And yeah, a separate issue for deprecation is very good. 👍

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3508

          Very good work, and thanks for improving the documentation. I like the update. From what I have seen in the past, some users have misunderstood the list nature of the operator state and simply dumped lots of small elements into the list: elements that should not actually be the unit of repartitioning and that sometimes even logically belonged together. I wonder whether the different semantics of list state in operator state and in keyed state can be confusing and error-prone for users, and what we could do about this. A method called `getListState` might be a step in the wrong direction.

          Besides this, +1 from me.
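          A small illustration of the granularity concern above, assuming the same even-split model of redistribution. This is a simplified plain-Java sketch, not Flink code, and the transaction records are made up. If records that belong together are stored as individual list elements, rescaling may place them on different subtasks; wrapping each logical group into a single list element keeps it intact.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Shows why each list element of operator state should be the intended unit
 * of redistribution. Uses a simplified even-split model (an assumption, not
 * Flink's actual code).
 */
public final class StateGranularity {

    /** Splits a flat list into `parallelism` contiguous, near-equal chunks. */
    static <T> List<List<T>> evenSplit(List<T> all, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        List<List<T>> out = new ArrayList<>();
        int base = all.size() / parallelism, rem = all.size() % parallelism, off = 0;
        for (int i = 0; i < parallelism; i++) {
            int n = base + (i < rem ? 1 : 0);
            out.add(new ArrayList<>(all.subList(off, off + n)));
            off += n;
        }
        return out;
    }

    public static void main(String[] args) {
        // Fine-grained: every record is its own list element, so tx1 can be torn apart.
        List<String> fine = List.of("tx1-a", "tx1-b", "tx1-c", "tx2-a");
        System.out.println(evenSplit(fine, 2)); // [[tx1-a, tx1-b], [tx1-c, tx2-a]]

        // Coarse-grained: each transaction is ONE list element, so it moves as a whole.
        List<List<String>> coarse = List.of(List.of("tx1-a", "tx1-b", "tx1-c"), List.of("tx2-a"));
        System.out.println(evenSplit(coarse, 2)); // [[[tx1-a, tx1-b, tx1-c]], [[tx2-a]]]
    }
}
```

          With four fine-grained elements and parallelism 2, `tx1` ends up split across both subtasks, while the coarse-grained variant moves `tx1` as one unit.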

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3508

          I'm not so sure, but I think Stefan has a point about the potential confusion, especially if, in the future, both the keyed state and the operator state are registered through the initialization context. It can definitely be confusing if both states have `getListState` (or even `getMapState`, ...) methods that actually have completely different semantics.

          To address @StefanRRichter's concerns, perhaps we could
          1) Try to emphasise the semantic differences in the docs / Javadocs.
          2) Consider renaming `getListState` to `getXXXListState`? "XXX" needs to imply the round-robin redistribution scheme.
          I was also wondering about another possibility, which follows a completely different approach: `getRepartitionableListState(descriptor, REDISTRIBUTE_MODE)`.
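          One way to picture the `getRepartitionableListState(descriptor, REDISTRIBUTE_MODE)` idea is a single entry point where the mode selects between even-split redistribution and broadcast redistribution, in which every new subtask receives the complete state (the behavior this issue asks to expose). The following plain-Java sketch models only the redistribution step. The `Mode` enum, its constants, and the class are hypothetical and are not Flink APIs; the offset strings stand in for Kafka-style partition offsets and are made-up values.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of mode-parameterized redistribution of list-style
 * operator state. Names are illustrative only; they are not Flink APIs.
 */
public final class RedistributionModes {

    /** Hypothetical redistribution modes. */
    public enum Mode { EVEN_SPLIT, BROADCAST }

    public static <T> List<List<T>> redistribute(List<List<T>> oldStates, int newParallelism, Mode mode) {
        if (newParallelism <= 0) {
            throw new IllegalArgumentException("newParallelism must be positive");
        }
        // Logically, the whole state is the concatenation of all old subtask lists.
        List<T> all = new ArrayList<>();
        for (List<T> s : oldStates) {
            all.addAll(s);
        }
        List<List<T>> result = new ArrayList<>(newParallelism);
        if (mode == Mode.BROADCAST) {
            // Every new subtask receives a full copy of the state.
            for (int i = 0; i < newParallelism; i++) {
                result.add(new ArrayList<>(all));
            }
            return result;
        }
        // EVEN_SPLIT: contiguous chunks whose sizes differ by at most one.
        int base = all.size() / newParallelism;
        int rem = all.size() % newParallelism;
        int off = 0;
        for (int i = 0; i < newParallelism; i++) {
            int n = base + (i < rem ? 1 : 0);
            result.add(new ArrayList<>(all.subList(off, off + n)));
            off += n;
        }
        return result;
    }

    public static void main(String[] args) {
        // Made-up partition offsets held by two old subtasks.
        List<List<String>> old = List.of(List.of("p0=42"), List.of("p1=7", "p2=13"));
        System.out.println(redistribute(old, 3, Mode.EVEN_SPLIT)); // [[p0=42], [p1=7], [p2=13]]
        System.out.println(redistribute(old, 2, Mode.BROADCAST));  // [[p0=42, p1=7, p2=13], [p0=42, p1=7, p2=13]]
    }
}
```

          The appeal of an explicit mode is that the redistribution semantics become visible at the call site where the state is registered, rather than being implied by which state store `getListState` happens to be called on.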

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3508

          I would stick to `getListState()` and rely on users knowing which kind of state store they are calling the method on.

          In general, users should not use this interface too much, I think it's mostly there for sources, such as Kafka.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3508

          In general, I agree with @aljoscha; however, I have already heard of at least one case where this confusion led to problems. Given how short a time we have been providing this feature, this could come up more often in the future, and it is not so nice to change after the fact.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3508

          I'd say let's keep the naming as is, since in general all of us agree on that already.

          I'll try to do some more work on the docs and Javadocs to make sure that at least the documentation clearly explains and emphasizes the semantic differences.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3508

          Merging after Travis turns green

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3508

          tzulitai Tzu-Li (Gordon) Tai added a comment - Resolved for 1.3.0 with http://git-wip-us.apache.org/repos/asf/flink/commit/2ef4900

            People

            • Assignee:
              tzulitai Tzu-Li (Gordon) Tai
              Reporter:
              tzulitai Tzu-Li (Gordon) Tai
            • Votes:
              0
              Watchers:
              4
