FLINK-4940: Add support for broadcast state



    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Component/s: API / DataStream


      As mentioned in https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API, we need broadcast state to support job patterns where one or several inputs are broadcast to all operator instances and where we keep state that is mutated only in response to elements from those broadcast inputs. This restriction ensures that the broadcast state is identical on all parallel operator instances at checkpoint time (except when using at-least-once mode). We therefore only have to checkpoint the state of one arbitrary instance, for example instance 0.
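The checkpointing argument above can be illustrated with a small simulation. This is only a sketch under stated assumptions: `BroadcastInstanceSketch` and `BroadcastSnapshotSketch` are hypothetical names, not Flink classes, the state is a plain in-memory map, and the at-least-once case (where instances may diverge) is ignored.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for one parallel operator instance; not a Flink class. */
class BroadcastInstanceSketch {
    // Broadcast state: only ever mutated by broadcast elements.
    private final Map<String, String> broadcastState = new HashMap<>();

    /** Every instance receives every broadcast element and applies it identically. */
    void processBroadcastElement(String key, String value) {
        broadcastState.put(key, value);
    }

    /** Non-broadcast input may read the broadcast state but never changes it. */
    String processElement(String key) {
        return broadcastState.getOrDefault(key, "<none>");
    }

    /** Returns a copy of the current broadcast state. */
    Map<String, String> snapshot() {
        return new HashMap<>(broadcastState);
    }

    /** Replaces the current broadcast state with the given snapshot. */
    void restore(Map<String, String> snapshot) {
        broadcastState.clear();
        broadcastState.putAll(snapshot);
    }
}

class BroadcastSnapshotSketch {
    /** Creates the instances and feeds the same broadcast elements to each of them. */
    static List<BroadcastInstanceSketch> runInstances(int parallelism) {
        List<BroadcastInstanceSketch> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new BroadcastInstanceSketch());
        }
        for (BroadcastInstanceSketch instance : instances) {
            instance.processBroadcastElement("rule-1", "allow");
            instance.processBroadcastElement("rule-2", "deny");
        }
        return instances;
    }

    public static void main(String[] args) {
        List<BroadcastInstanceSketch> instances = runInstances(3);

        // Checkpoint only one arbitrary instance, here instance 0.
        Map<String, String> checkpoint = instances.get(0).snapshot();

        // On restore, every (possibly new) instance gets a copy of that one snapshot.
        BroadcastInstanceSketch restored = new BroadcastInstanceSketch();
        restored.restore(checkpoint);
        System.out.println(restored.processElement("rule-2"));
    }
}
```

Because every instance applies the same broadcast updates and nothing else may write the state, any single snapshot is enough to restore all instances, including at a different parallelism.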

      The different types of side inputs need different types of state. Luckily, the side input types align with the state types we currently have for keyed state:

      • ValueState
      • ListState
      • MapState

      We can therefore reuse the keyed state backends for our purposes, but we need to put a more restrictive API in front of them: mutation of broadcast state must only be allowed while actually processing broadcast input. Without this check, users can modify broadcast state by mistake, which would lead to incorrect results that are very hard to notice, much less debug.

      Because of the way the Flink state API works (users can obtain a State in open() and then work with it by calling methods on it), we have to add special wrapping state classes that only allow modification of state while a broadcast element is being processed.
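A minimal sketch of such a wrapping state class might look as follows. All names here are hypothetical and chosen for illustration: `SimpleValueState` stands in for Flink's `ValueState`, `InMemoryValueState` for state obtained from a keyed state backend, and the guard is assumed to be a simple flag that the operator toggles around broadcast processing. The actual implementation may differ.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical minimal state interface; stands in for Flink's ValueState in this sketch. */
interface SimpleValueState<T> {
    T value();
    void update(T value);
}

/** Plain in-memory state, standing in for state obtained from a keyed state backend. */
class InMemoryValueState<T> implements SimpleValueState<T> {
    private T current;

    public T value() {
        return current;
    }

    public void update(T value) {
        current = value;
    }
}

/** Wrapper that always permits reads but permits writes only during broadcast processing. */
class GuardedValueState<T> implements SimpleValueState<T> {
    private final SimpleValueState<T> delegate;
    private final BooleanSupplier processingBroadcast;

    GuardedValueState(SimpleValueState<T> delegate, BooleanSupplier processingBroadcast) {
        this.delegate = delegate;
        this.processingBroadcast = processingBroadcast;
    }

    public T value() {
        return delegate.value();
    }

    public void update(T value) {
        if (!processingBroadcast.getAsBoolean()) {
            throw new UnsupportedOperationException(
                    "Broadcast state may only be modified while processing a broadcast element.");
        }
        delegate.update(value);
    }
}

/** Hypothetical operator skeleton that opens the guard only around broadcast processing. */
class GuardedOperatorSketch {
    private boolean inBroadcast = false;

    // The state handle can be obtained once (as in open()) and kept around safely.
    final SimpleValueState<String> state =
            new GuardedValueState<String>(new InMemoryValueState<String>(), () -> inBroadcast);

    void processBroadcastElement(String element) {
        inBroadcast = true;
        try {
            state.update(element);
        } finally {
            inBroadcast = false;
        }
    }

    String processElement(String element) {
        // Reading broadcast state is allowed for non-broadcast input.
        return element + ":" + state.value();
    }

    public static void main(String[] args) {
        GuardedOperatorSketch operator = new GuardedOperatorSketch();
        operator.processBroadcastElement("rule-A");
        System.out.println(operator.processElement("event"));
        try {
            operator.state.update("oops"); // write outside of broadcast processing
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The guard lives in the wrapper rather than in the user function, so a state handle kept from open() cannot be used to write at the wrong time.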

      For the API, I propose to add a new interface `InternalStateAccessor`:

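      import org.apache.flink.api.common.state.State;
      import org.apache.flink.api.common.state.StateDescriptor;
      import org.apache.flink.api.common.typeutils.TypeSerializer;

      /** Interface for accessing persistent state. */
      public interface InternalStateAccessor {
        // Returns the state for the given namespace, as described by the descriptor.
        <N, S extends State> S state(
            N namespace,
            TypeSerializer<N> namespaceSerializer,
            StateDescriptor<S, ?> stateDescriptor);
      }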

      This is the same as `KeyedStateBackend.getPartitionedState()` but allows us to abstract away the special nature of broadcast state. It is meant as an internal interface and is not to be exposed to user functions; only operators should deal with it.

      AbstractStreamOperator would get a new method `getBroadcastStateAccessor()` that returns an implementation of this interface. The implementation would have a KeyedStateBackend but wrap the state in special wrappers that only allow modification when processing broadcast elements (as mentioned above).

      On the lower implementation levels, we have to add a new entry for our state to `OperatorSnapshotResult`. For example:

      private RunnableFuture<KeyGroupsStateHandle> broadcastStateManagedFuture;

      The CheckpointCoordinator and the savepoint/checkpoint serialisation logic will also have to be adapted to support this new kind of state. Given the ongoing work on incremental snapshotting and other new features for `KeyedStateBackend`, this should be coordinated with StephanEwen and/or stefanrichter83@gmail.com and/or xiaogang.shi. We also have to be very careful to maintain compatibility with savepoints from older versions.





            Assignee: Kostas Kloudas (kkl0u)
            Reporter: Aljoscha Krettek (aljoscha)