Uploaded image for project: 'Beam'
  1. Beam
  2. BEAM-10660

[Go SDK] Implement State and Timer support


    • New Feature
    • Status: Open
    • P3
    • Resolution: Unresolved
    • None
    • None
    • sdk-go
    • None


      There's presently no mechanism to specify timers in the Go SDK, or use them at all. The work would be designing the user facing code, and mechanisms, and plumbing through timers properly. For ecample they can't be conflicting with other user facing constructs like Emitter functions and iterator functions.

      However there's an abundance of work to handle before starting to deal with state and timers though.

      While timers should work in batch, they're commonly more appropriate for streaming which the SDK doesn't support very well at the moment. DoFns need to be able to Self Checkpoint in order to behave as a streaming source (early checkpointing allows a bundle to self terminate, so it can be rescheduled later or as a minor way to split to multiple workers.). We should also implement Triggers and Advanced/Custom window fns first as those are simpler ways to get some of the advanced functions that timers allow for. We also need to be able to set and propagate the watermark correctly through the SDK (and validate that we do).

      See the programming guide for a fuller description of State and Timers

      • Design an idiomatic Go approach to Timers and State processing for DoFns
        • Go doesn’t support annotation like constructs, with the exception of struct field tags.
        • Design likely requires new framework side marker types.
        • Design likely requires using field tags.
        • Needs to allow customization for state types. (easier post generics in Go, but an design that doesn’t require that would be viable sooner)
      • State concerns:
        • Should support deferred batch reads of multiple states
        • Needs to be expandable to handle ValueState, Combining State, and BagState
      • Timer concerns: 
        • Needs to handle Event and Processing Time timers.
        • Dynamic Timer tags (likely the one and only way to handle Go SDK timers)
        • Needs to introduce an “OnTimer” method, and associated validation.

      Similar locations need changing relative to the Map Side Inputs https://issues.apache.org/jira/browse/BEAM-3293 

      On the execution layer, the new forms would need to be added like for exec/sideinput.go
      The inputs layer, for the actual abstraction using reflection:

      But for specifically handling State (which leverages the state API in a more sophisticated way than Side Inputs do) and Timers. The State API manager implementation is handled in the harness https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/harness/statemgr.go 

      The funcx package would need to be updated to detect the new parameter forms

      as well has the DoFn graph validation code

      They would need to be correctly translated into the pipeline protos:
      and finally back to the newly created handlers in the exec package.

      The SideInputCache would need to be changed to be a full [UserState cache]https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/harness/harness.go#L101] as the state_caching protocol URN doesn't make a distinction between side inputs and user state, and we should not break behavior.

      It's likely other changes are necessary to handle specifics for state and timers.

      If implemented pre-generics, the code generator frontend, and backend would need to be updated to detect and generate code for efficient no-reflection overhead map access functions if necessary 


      Unit must be added throughout and Integration tests should be added to verify the functionality against portable beam runners.

      And of course, the user GoDoc should be updated for the support.


        Issue Links



              Unassigned Unassigned
              lostluck Robert Burke




                  Issue deployment