Beam / BEAM-6813

Issues with state + timers in Java Direct Runner (state cell is null)


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.11.0
    • Fix Version/s: 2.14.0
    • Component/s: runner-direct
    • Labels: None

      Description

      I was experimenting with a stateful DoFn with timers, and ran into a weird bug where a state cell I was writing to would come back as null when I read it inside a timer callback.

      I've attached the code below [1] (please excuse the Scala).

      After digging into this a bit, I found that the state's value was present in the `underlying` table in CopyOnAccessInMemoryStateTable [2], but not set in the `stateTable` itself on the instance [3]. Based on my very rudimentary understanding of how this works in the direct runner, it seems like commit() is not being called on the state table before the timer fires?
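
To make the symptom above concrete, here is a minimal toy model of a copy-on-access state table. It is a sketch only and not Beam's actual implementation: the class `CopyOnAccessSketch`, its nested `StateTable`, and the methods `read`, `readLocalOnly`, and `commit` are all hypothetical names chosen for illustration. It assumes that each bundle gets a fresh table layered over the cells committed by earlier bundles, and that a cell is copied up from `underlying` only when it is read through the copy-on-access path.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of a copy-on-access state table; NOT Beam's actual implementation. */
public class CopyOnAccessSketch {

    /** Hypothetical stand-in for a per-bundle table layered over committed state. */
    static final class StateTable {
        // Cells committed by earlier bundles.
        private final Map<String, Object> underlying;
        // Cells written or copied up during the current bundle.
        private final Map<String, Object> stateTable = new HashMap<>();

        StateTable(Map<String, Object> underlying) {
            this.underlying = underlying;
        }

        void write(String id, Object value) {
            stateTable.put(id, value);
        }

        /** Copy-on-access read: pull the cell up from `underlying` on first touch. */
        Object read(String id) {
            if (!stateTable.containsKey(id) && underlying.containsKey(id)) {
                stateTable.put(id, underlying.get(id));
            }
            return stateTable.get(id);
        }

        /** Read that skips the copy step and sees only this bundle's own cells. */
        Object readLocalOnly(String id) {
            return stateTable.get(id);
        }

        /** Publish this bundle's cells so the next bundle can layer on top of them. */
        Map<String, Object> commit() {
            Map<String, Object> merged = new HashMap<>(underlying);
            merged.putAll(stateTable);
            return merged;
        }
    }

    public static void main(String[] args) {
        // Bundle 1: an element is processed and the key cell is written.
        StateTable elementBundle = new StateTable(new HashMap<>());
        elementBundle.write("key", "user-1");
        Map<String, Object> committed = elementBundle.commit();

        // Bundle 2: the timer fires against a fresh table over the committed cells.
        StateTable timerBundle = new StateTable(committed);
        System.out.println(timerBundle.readLocalOnly("key")); // null
        System.out.println(timerBundle.read("key"));          // user-1
    }
}
```

In this model, a value that sits in `underlying` but is missing from `stateTable` is exactly what a reader sees if the copy step (or the preceding commit) is skipped. Whether that is what happens in the direct runner is the open question of this report.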
       
      [1]

      // Imports the snippet relies on (added for completeness; not part of the original report).
      // KeyId, AggregationId and FlushTimerId are string constants defined elsewhere (not shown).
      import java.lang.{Long => JLong}
      import org.apache.beam.sdk.coders.Coder
      import org.apache.beam.sdk.state.{CombiningState, StateSpecs, TimeDomain, Timer, TimerSpecs, ValueState}
      import org.apache.beam.sdk.transforms.Combine.CombineFn
      import org.apache.beam.sdk.transforms.{DoFn, Sum}
      import org.apache.beam.sdk.transforms.DoFn.{Element, OnTimer, OutputReceiver, ProcessElement, StateId, TimerId}
      import org.apache.beam.sdk.transforms.windowing.BoundedWindow
      import org.apache.beam.sdk.values.KV
      import org.joda.time.Duration

      private final class AggregatorDoFn[K, V, Acc, Out](
        combiner: CombineFn[V, Acc, Out],
        keyCoder: Coder[K],
        accumulatorCoder: Coder[Acc]
      ) extends DoFn[KV[K, V], KV[K, Out]] {
      
        @StateId(KeyId)
        private final val keySpec = StateSpecs.value(keyCoder)
      
        @StateId(AggregationId)
        private final val stateSpec = StateSpecs.combining(accumulatorCoder, combiner)
      
        @StateId("numElements")
        private final val numElementsSpec = StateSpecs.combining(Sum.ofLongs())
      
        @TimerId(FlushTimerId)
        private final val flushTimerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME)
      
        @ProcessElement
        def processElement(
          @StateId(KeyId) key: ValueState[K],
          @StateId(AggregationId) state: CombiningState[V, Acc, Out],
          @StateId("numElements") numElements: CombiningState[JLong, _, JLong],
          @TimerId(FlushTimerId) flushTimer: Timer,
          @Element element: KV[K, V],
          window: BoundedWindow
        ): Unit = {
          key.write(element.getKey)
          state.add(element.getValue)
          numElements.add(1L)
      
          if (numElements.read() == 1) {
            flushTimer
              .offset(Duration.standardSeconds(10))
              .setRelative()
          }
        }
      
        @OnTimer(FlushTimerId)
        def onFlushTimer(
          @StateId(KeyId) key: ValueState[K],
          @StateId(AggregationId) state: CombiningState[V, _, Out],
          @StateId("numElements") numElements: CombiningState[JLong, _, JLong],
          output: OutputReceiver[KV[K, Out]]
        ): Unit = {
          if (numElements.read() > 0) {
            val k = key.read()
            output.output(
              KV.of(k, state.read())
            )
          }
          numElements.clear()
        }
      }

      [2]
      https://imgur.com/a/xvPR5nd

      [3]
      https://imgur.com/a/jznMdaQ
       

              People

              • Assignee: Jan Lukavský (janl)
              • Reporter: Steve Niemitz (SteveNiemitz)
