Beam / BEAM-10660

[Go SDK] Implement State and Timer support

Details

    • Type: New Feature
    • Status: Open
    • Priority: P3
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: sdk-go
    • Labels: None

    Description

      There's presently no mechanism to specify timers in the Go SDK, or to use them at all. The work involves designing the user-facing code and mechanisms, and properly plumbing timers through the SDK. For example, timers must not conflict with other user-facing constructs such as emitter functions and iterator functions.

      However, there's a substantial amount of prerequisite work to handle before starting on state and timers.

      While timers should work in batch, they're more commonly suited to streaming, which the SDK doesn't support very well at the moment. DoFns need to be able to self-checkpoint in order to behave as a streaming source (early checkpointing allows a bundle to terminate itself, either so it can be rescheduled later or as a minor way to split work across multiple workers). We should also implement triggers and advanced/custom WindowFns first, as those are simpler ways to get some of the advanced functionality that timers allow for. We also need to be able to set and propagate the watermark correctly through the SDK (and validate that we do).

      See the programming guide for a fuller description of state and timers:
      https://beam.apache.org/documentation/programming-guide/#state-and-timers 

      • Design an idiomatic Go approach to Timers and State processing for DoFns
        • Go doesn’t support annotation-like constructs, with the exception of struct field tags.
        • Design likely requires new framework side marker types.
        • Design likely requires using field tags.
        • Needs to allow customization of state types (easier once Go has generics, but a design that doesn’t require generics would be viable sooner).
      • State concerns:
        • Should support deferred batch reads of multiple states
        • Needs to be expandable to handle ValueState, Combining State, and BagState
      • Timer concerns: 
        • Needs to handle Event and Processing Time timers.
        • Dynamic Timer tags (likely the one and only way to handle Go SDK timers)
        • Needs to introduce an “OnTimer” method, and associated validation.

      Changes are needed in locations similar to those changed for Map Side Inputs: https://issues.apache.org/jira/browse/BEAM-3293

      In the execution layer, the new forms would need to be added, analogous to exec/sideinput.go:
      https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/sideinput.go
      as well as in the inputs layer, which holds the actual reflection-based abstraction:
      https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/input.go

      State (which uses the State API in a more sophisticated way than side inputs do) and timers will also need their own specific handling. The State API manager implementation lives in the harness:
      https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/harness/statemgr.go
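      Deferred batch reads of multiple states could look roughly like the following sketch: reads are registered first and only issued together when the batch is flushed, here concurrently. fetchFunc, readBatch, and deferredRead are hypothetical; the fetch function stands in for a State API request sent through the harness's state manager, and issuing requests concurrently is just one possible batching strategy.

```go
package main

import (
	"fmt"
	"sync"
)

// fetchFunc stands in for a State API read request to the runner. It is a
// hypothetical placeholder, not the harness's actual interface.
type fetchFunc func(stateKey string) ([]byte, error)

// deferredRead is a handle for a read that has been requested but not yet
// issued. Its value is only valid after the owning batch has been flushed.
type deferredRead struct {
	key   string
	value []byte
	err   error
}

// readBatch collects state reads so they can be issued together, rather
// than making one blocking round trip per state as it is accessed.
type readBatch struct {
	fetch   fetchFunc
	pending []*deferredRead
}

// Read registers a read and returns a handle without contacting the runner.
func (b *readBatch) Read(key string) *deferredRead {
	r := &deferredRead{key: key}
	b.pending = append(b.pending, r)
	return r
}

// Flush issues all pending reads concurrently and waits for them to finish.
func (b *readBatch) Flush() {
	var wg sync.WaitGroup
	for _, r := range b.pending {
		wg.Add(1)
		go func(r *deferredRead) {
			defer wg.Done()
			r.value, r.err = b.fetch(r.key)
		}(r)
	}
	wg.Wait()
	b.pending = nil
}

func main() {
	store := map[string][]byte{"count": []byte("3"), "buffer": []byte("a,b")}
	b := &readBatch{fetch: func(k string) ([]byte, error) {
		v, ok := store[k] // read-only access, safe from multiple goroutines
		if !ok {
			return nil, fmt.Errorf("no state for %q", k)
		}
		return v, nil
	}}
	count, buf := b.Read("count"), b.Read("buffer")
	b.Flush()
	fmt.Println(string(count.value), string(buf.value)) // 3 a,b
}
```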

      The funcx package would need to be updated to detect the new parameter forms
      https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/funcx/fn.go
      https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/funcx/sideinput.go

      as well as the DoFn graph validation code:
      https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/graph/fn.go#L566
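      As an illustration of the OnTimer validation mentioned in the design notes, a sketch that requires a DoFn to declare timers if and only if it has an OnTimer method. ProcessingTimeTimer and this exact rule are assumptions for illustration; a real check would also validate OnTimer's parameters.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// ProcessingTimeTimer is a hypothetical timer marker type declared as a DoFn field.
type ProcessingTimeTimer struct{}

var timerType = reflect.TypeOf(ProcessingTimeTimer{})

// validateTimers checks that a DoFn declares timer fields if and only if it
// has an OnTimer method. fn must be a pointer to the DoFn struct so that
// pointer-receiver methods are part of the inspected method set.
func validateTimers(fn any) error {
	ptr := reflect.TypeOf(fn)
	if ptr.Kind() != reflect.Pointer || ptr.Elem().Kind() != reflect.Struct {
		return errors.New("expected a pointer to a DoFn struct")
	}
	st := ptr.Elem()
	hasTimers := false
	for i := 0; i < st.NumField(); i++ {
		if st.Field(i).Type == timerType {
			hasTimers = true
			break
		}
	}
	_, hasOnTimer := ptr.MethodByName("OnTimer")
	switch {
	case hasTimers && !hasOnTimer:
		return errors.New("timers declared but no OnTimer method found")
	case !hasTimers && hasOnTimer:
		return errors.New("OnTimer method found but no timers declared")
	}
	return nil
}

// bufferFn declares a timer and handles it.
type bufferFn struct{ Flush ProcessingTimeTimer }

func (f *bufferFn) ProcessElement(x int) {}
func (f *bufferFn) OnTimer()             {}

// missingFn declares a timer but has no OnTimer method.
type missingFn struct{ Flush ProcessingTimeTimer }

func (f *missingFn) ProcessElement(x int) {}

func main() {
	fmt.Println(validateTimers(&bufferFn{}))  // <nil>
	fmt.Println(validateTimers(&missingFn{})) // timers declared but no OnTimer method found
}
```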

      The new forms would need to be correctly translated into the pipeline protos:
      https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L315
      and finally back to the newly created handlers in the exec package.
      https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/translate.go#L402

      The SideInputCache would need to be changed into a full UserState cache (https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/harness/harness.go#L101), as the state_caching protocol URN doesn't distinguish between side inputs and user state, and existing caching behavior must not break.
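      A rough sketch of a single cache shared by side inputs and user state, keyed by a runner-issued cache token plus an encoded state key. stateCache and its methods are hypothetical, and tokens and keys are plain strings here rather than Fn API protos. The important difference from a read-only side input cache is that user state writes must update the cache as well.

```go
package main

import (
	"fmt"
	"sync"
)

// cacheKey identifies a cached entry by cache token and encoded state key.
type cacheKey struct {
	token    string
	stateKey string
}

// stateCache is a hypothetical cache shared by side inputs and user state.
type stateCache struct {
	mu      sync.Mutex
	entries map[cacheKey][]byte
}

func newStateCache() *stateCache {
	return &stateCache{entries: make(map[cacheKey][]byte)}
}

// Get returns the cached value for a token and key, if present.
func (c *stateCache) Get(token, stateKey string) ([]byte, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	v, ok := c.entries[cacheKey{token, stateKey}]
	return v, ok
}

// Put stores a value. Side inputs call it after a read; user state must also
// call it on every local write so later reads observe the write.
func (c *stateCache) Put(token, stateKey string, v []byte) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.entries[cacheKey{token, stateKey}] = v
}

// Evict drops every entry for a token the runner no longer supplies.
func (c *stateCache) Evict(token string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for k := range c.entries {
		if k.token == token {
			delete(c.entries, k) // deleting during range is safe in Go
		}
	}
}

func main() {
	c := newStateCache()
	c.Put("side-tok", "side/view1", []byte("lookup"))
	c.Put("user-tok", "user/count", []byte("1"))
	c.Put("user-tok", "user/count", []byte("2")) // write-through of a local write
	v, _ := c.Get("user-tok", "user/count")
	fmt.Println(string(v)) // 2
	c.Evict("user-tok")
	_, ok := c.Get("user-tok", "user/count")
	fmt.Println(ok) // false
}
```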

      It's likely other changes are necessary to handle specifics for state and timers.

      If implemented before generics are available in Go, the code generator frontend and backend would need to be updated to detect, and where necessary generate, code for efficient map access functions with no reflection overhead:

      https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/util/shimx/generate.go
      https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/util/starcgenx/starcgenx.go

      Unit tests must be added throughout, and integration tests should be added to verify the functionality against portable Beam runners:
      https://github.com/apache/beam/tree/master/sdks/go/test/integration/primitives

      Finally, the user-facing GoDoc should be updated to document the new support.


People

    • Assignee: Unassigned
    • Reporter: Robert Burke (lostluck)
    • Votes: 1
    • Watchers: 5
