Beam / BEAM-3806

DirectRunner hangs if multiple timers set in the same bundle

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P2
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.5.0
    • Component/s: runner-direct
    • Labels: None

    Description

      See the repro below:

      package com.simbly.beam.cassandra;
      
      import org.apache.beam.sdk.coders.KvCoder;
      import org.apache.beam.sdk.coders.StringUtf8Coder;
      import org.apache.beam.sdk.state.TimeDomain;
      import org.apache.beam.sdk.state.Timer;
      import org.apache.beam.sdk.state.TimerSpec;
      import org.apache.beam.sdk.state.TimerSpecs;
      import org.apache.beam.sdk.testing.PAssert;
      import org.apache.beam.sdk.testing.TestPipeline;
      import org.apache.beam.sdk.testing.TestStream;
      import org.apache.beam.sdk.transforms.DoFn;
      import org.apache.beam.sdk.transforms.ParDo;
      import org.apache.beam.sdk.values.KV;
      import org.apache.beam.sdk.values.PCollection;
      import org.joda.time.Duration;
      import org.junit.Rule;
      import org.junit.Test;
      
      public class DirectRunnerTest {
      
        @Rule
        public TestPipeline pipeline = TestPipeline.create();
      
        @Test
        public void badTimerBehavior() {
          TestStream<KV<String, String>> stream = TestStream
              .create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
              .addElements(KV.of("key1", "v1"))
              .advanceWatermarkToInfinity();
      
          PCollection<String> result = pipeline
              .apply(stream)
              .apply(ParDo.of(new TestDoFn()));
          PAssert.that(result).containsInAnyOrder("It works");
      
          pipeline.run().waitUntilFinish();
        }
      
        private static class TestDoFn extends DoFn<KV<String, String>, String> {
          @TimerId("timer")
          private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
      
          @ProcessElement
          public void process(ProcessContext c,
              @TimerId("timer") Timer timer) {
            timer.offset(Duration.standardMinutes(10)).setRelative();
            timer.offset(Duration.standardMinutes(30)).setRelative();
          }
      
          @OnTimer("timer")
          public void onTimer(OnTimerContext c, @TimerId("timer") Timer timer) {
            c.output("It works");
          }
        }
      }
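The repro expects exactly one "It works" because, in Beam's timer model, a timer is identified by its key, window, and timer ID, and setting it again overwrites the earlier setting. The sketch below is a hypothetical model of that rule; `TimerTable`, `TimerKey`, `setOffset`, and `firingTime` are illustrative names, not Beam APIs, and the window is omitted for brevity. It shows the state a correct runner should end the bundle with: one pending timer at the element timestamp plus 30 minutes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of Beam's "last set wins" timer rule; not a Beam API.
final class TimerTable {
  // A timer is identified by its key and @TimerId (the window is omitted here).
  record TimerKey(String key, String timerId) {}

  // At most one pending firing time per timer identity.
  private final Map<TimerKey, Long> pending = new HashMap<>();

  // Models timer.offset(offset).setRelative() for an event-time timer:
  // fire at elementTimestamp + offset, overwriting any earlier setting.
  void setOffset(String key, String timerId, long elementTimestampMillis, long offsetMillis) {
    pending.put(new TimerKey(key, timerId), elementTimestampMillis + offsetMillis);
  }

  int pendingCount() {
    return pending.size();
  }

  Long firingTime(String key, String timerId) {
    return pending.get(new TimerKey(key, timerId));
  }
}
```

Replaying `process` from the repro (offsets of 10 and then 30 minutes from the same element timestamp) leaves a single pending timer firing 30 minutes after the element, so `onTimer` should run once.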
      

      From inspection, the hang seems to be caused by the logic in `WatermarkManager`, which does the following if multiple timers are set for a key in the same bundle:

      1. Adds the first timer to `pendingTimers`, `keyTimers`, and `existingTimersForKey`.
      2. Removes the first timer from `keyTimers`.
      3. Adds the second timer to `keyTimers` and `existingTimersForKey`.

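      This leaves the three collections inconsistent: `pendingTimers` contains only the first timer, `keyTimers` only the second, and `existingTimersForKey` both. The problem is compounded because one of these collections is used for firing timers (and thus for releasing watermark holds) while another is used for setting holds, so a hold can be placed that no firing timer ever releases, which would explain the hang.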
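The three steps can be replayed in a deliberately simplified, hypothetical model. The field names are borrowed from the description above, but the types, the `TimerData` record, and the `buggy`, `set`, and `consistent` methods are invented for illustration and do not reflect the real `WatermarkManager` code. The `set` method shows one way to keep the collections in agreement (drop any earlier setting of the same timer ID everywhere, then add the new one everywhere); it is not necessarily the fix that was applied.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of per-key timer bookkeeping. Field names are
// borrowed from the issue description; types and logic are illustrative only.
final class TimerBookkeeping {
  record TimerData(String timerId, long timestamp) {}

  final Set<TimerData> pendingTimers = new HashSet<>();
  final Set<TimerData> keyTimers = new HashSet<>();
  final Set<TimerData> existingTimersForKey = new HashSet<>();

  // Replays the three steps listed in the issue for two settings of one timer ID.
  static TimerBookkeeping buggy(TimerData first, TimerData second) {
    TimerBookkeeping b = new TimerBookkeeping();
    // Step 1: the first timer goes into all three collections.
    b.pendingTimers.add(first);
    b.keyTimers.add(first);
    b.existingTimersForKey.add(first);
    // Step 2: the first timer is removed from keyTimers only.
    b.keyTimers.remove(first);
    // Step 3: the second timer goes into keyTimers and existingTimersForKey only.
    b.keyTimers.add(second);
    b.existingTimersForKey.add(second);
    return b;
  }

  // Consistent alternative: replacing a timer updates every collection together.
  void set(TimerData timer) {
    pendingTimers.removeIf(t -> t.timerId().equals(timer.timerId()));
    keyTimers.removeIf(t -> t.timerId().equals(timer.timerId()));
    existingTimersForKey.removeIf(t -> t.timerId().equals(timer.timerId()));
    pendingTimers.add(timer);
    keyTimers.add(timer);
    existingTimersForKey.add(timer);
  }

  // True when all three collections hold exactly the same timers.
  boolean consistent() {
    return pendingTimers.equals(keyTimers) && keyTimers.equals(existingTimersForKey);
  }
}
```

In the buggy replay, whichever collection drives firing and whichever drives holds disagree about which timer exists; with `set`, all three hold only the second timer, so any hold it implies is matched by a timer that will actually fire.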


            People

              Assignee: Thomas Groh (tgroh)
              Reporter: Ben Chambers (bchambers)
              Votes: 0
              Watchers: 1


                Time Tracking

                  Original Estimate: Not Specified
                  Remaining Estimate: 0h
                  Time Spent: 40m