FLINK-4940

Add support for broadcast state


Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: API / DataStream
    • Labels: None

    Description

      As mentioned in https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API we need broadcast state to support job patterns where one (or several) inputs are broadcast to all operator instances and where we keep state that is mutated only based on input from the broadcast inputs. This restriction ensures that the broadcast state is the same on all parallel operator instances when checkpointing (except when using at-least-once mode). We therefore only have to checkpoint the state of one arbitrary instance, for example instance 0.
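
      The reasoning above can be illustrated with a small, self-contained sketch. It is not Flink code: `BroadcastSnapshotSketch`, `OperatorInstance` and all method names are hypothetical, and the model assumes every instance sees the same broadcast elements in the same order before the checkpoint is taken. Restoring by handing a copy of the single snapshot to every instance is likewise an assumption of this sketch, not something the issue specifies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model of "checkpoint one instance only"; not Flink API. */
final class BroadcastSnapshotSketch {

    /** One parallel operator instance holding its own copy of the broadcast state. */
    static final class OperatorInstance {
        private final Map<String, String> broadcastState = new TreeMap<>();

        /** State is mutated only here, i.e. only in response to broadcast input. */
        void processBroadcast(String key, String value) {
            broadcastState.put(key, value);
        }

        /** Returns a defensive copy of the current state. */
        Map<String, String> snapshot() {
            return new TreeMap<>(broadcastState);
        }

        void restore(Map<String, String> snapshot) {
            broadcastState.clear();
            broadcastState.putAll(snapshot);
        }
    }

    static List<OperatorInstance> createInstances(int parallelism) {
        List<OperatorInstance> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new OperatorInstance());
        }
        return instances;
    }

    /** Broadcasting delivers the same element to every instance. */
    static void broadcast(List<OperatorInstance> instances, String key, String value) {
        for (OperatorInstance instance : instances) {
            instance.processBroadcast(key, value);
        }
    }

    /** All copies are identical, so the snapshot of instance 0 is sufficient. */
    static Map<String, String> checkpoint(List<OperatorInstance> instances) {
        return instances.get(0).snapshot();
    }

    /** Assumption of this sketch: on restore, every instance receives the same copy. */
    static List<OperatorInstance> restore(Map<String, String> snapshot, int parallelism) {
        List<OperatorInstance> instances = createInstances(parallelism);
        for (OperatorInstance instance : instances) {
            instance.restore(snapshot);
        }
        return instances;
    }

    public static void main(String[] args) {
        List<OperatorInstance> job = createInstances(3);
        broadcast(job, "threshold", "10");
        broadcast(job, "threshold", "20");

        Map<String, String> snapshot = checkpoint(job);
        System.out.println("snapshot of instance 0: " + snapshot);
        System.out.println("matches instance 2: " + snapshot.equals(job.get(2).snapshot()));
    }
}
```

      In at-least-once mode the instances may have processed different prefixes of the broadcast input at checkpoint time, which is why the description excludes that mode.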

      For the different types of side inputs we need different types of state. Luckily, the side input types align with the state types we currently have for keyed state:

      • ValueState
      • ListState
      • MapState

      We can therefore reuse keyed state backends for our purposes but need to put a more restrictive API in front of them: mutation of broadcast state must only be allowed while actually processing broadcast input. Without this check, users can (by mistake) modify broadcast state while processing other input, and the state on different parallel instances would diverge. This would lead to incorrect results that are very hard to notice, much less debug.

      With the way the Flink state API works (users can get a State in open() and work with state by calling methods on that) we have to add special wrapping state classes that only allow modification of state when processing a broadcast element.
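
      A minimal sketch of such a wrapping state class follows. It deliberately avoids Flink's own types so that it runs on its own: `SimpleValueState`, `InMemoryValueState`, `BroadcastContext` and `GuardedValueState` are hypothetical names, and the idea that the operator flips a flag around the processing of each broadcast element is an assumption about how the check could be wired up, not the implementation chosen for this issue.

```java
/** Hypothetical sketch of a write-guarded state wrapper; none of these names are Flink API. */
final class BroadcastGuardSketch {

    /** Minimal stand-in for a value-state handle (not Flink's ValueState). */
    interface SimpleValueState<T> {
        T value();

        void update(T newValue);
    }

    /** Plain in-memory state, standing in for state owned by a state backend. */
    static final class InMemoryValueState<T> implements SimpleValueState<T> {
        private T current;

        @Override
        public T value() {
            return current;
        }

        @Override
        public void update(T newValue) {
            current = newValue;
        }
    }

    /** Flag that the operator sets while it processes a broadcast element. */
    static final class BroadcastContext {
        private boolean processingBroadcast;

        void runAsBroadcast(Runnable action) {
            boolean previous = processingBroadcast;
            processingBroadcast = true;
            try {
                action.run();
            } finally {
                // Restore the earlier value so nested calls behave correctly.
                processingBroadcast = previous;
            }
        }

        boolean isProcessingBroadcast() {
            return processingBroadcast;
        }
    }

    /** Reads are always allowed; writes only while a broadcast element is being processed. */
    static final class GuardedValueState<T> implements SimpleValueState<T> {
        private final SimpleValueState<T> delegate;
        private final BroadcastContext context;

        GuardedValueState(SimpleValueState<T> delegate, BroadcastContext context) {
            this.delegate = delegate;
            this.context = context;
        }

        @Override
        public T value() {
            return delegate.value();
        }

        @Override
        public void update(T newValue) {
            if (!context.isProcessingBroadcast()) {
                throw new IllegalStateException(
                        "Broadcast state may only be modified while processing broadcast input");
            }
            delegate.update(newValue);
        }
    }

    public static void main(String[] args) {
        BroadcastContext context = new BroadcastContext();
        GuardedValueState<String> rules =
                new GuardedValueState<>(new InMemoryValueState<String>(), context);

        // Allowed: the update happens inside broadcast processing.
        context.runAsBroadcast(() -> rules.update("rule-1"));
        System.out.println("state after broadcast element: " + rules.value());

        // Rejected: the update happens while processing a non-broadcast element.
        try {
            rules.update("rule-2");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

      Because users obtain the state handle once in open() and keep calling methods on it, the guard has to live inside the handle itself; checking only at the point where the handle is obtained would not catch later writes.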

      For the API, I propose to add a new interface `InternalStateAccessor`:

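      import org.apache.flink.annotation.PublicEvolving;
      import org.apache.flink.api.common.state.State;
      import org.apache.flink.api.common.state.StateDescriptor;
      import org.apache.flink.api.common.typeutils.TypeSerializer;

      /**
       * Interface for accessing persistent state.
       */
      @PublicEvolving
      public interface InternalStateAccessor {
        // Mirrors KeyedStateBackend.getPartitionedState(): looks up state by namespace and descriptor.
        <N, S extends State> S state(
            N namespace,
            TypeSerializer<N> namespaceSerializer,
            StateDescriptor<S, ?> stateDescriptor);
      }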
      

      This is the same as `KeyedStateBackend.getPartitionedState()` but allows us to abstract away the special nature of broadcast state. It is meant as an internal interface and is not to be exposed to user functions; only operators should deal with it.

      AbstractStreamOperator would get a new method `getBroadcastStateAccessor()` that returns an implementation of this interface. The implementation would have a KeyedStateBackend but wrap the state in special wrappers that only allow modification when processing broadcast elements (as mentioned above).

      On the lower implementation levels, we have to add a new entry for our state to `OperatorSnapshotResult`. For example:

      private RunnableFuture<KeyGroupsStateHandle> broadcastStateManagedFuture;
      

      Also, the CheckpointCoordinator and the savepoint/checkpoint serialisation logic will have to be adapted to support this new kind of state. With the ongoing changes in supporting incremental snapshotting and other new features for `KeyedStateBackend`, this should be coordinated with Stephan Ewen and/or Stefan Richter and/or Xiaogang Shi. We also have to be very careful about maintaining compatibility with savepoints from older versions.

          People

            Assignee: Kostas Kloudas (kkl0u)
            Reporter: Aljoscha Krettek (aljoscha)
            Votes: 2
            Watchers: 11