Beam / BEAM-3863

AfterProcessingTime trigger doesn't fire reliably

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P2
    • Resolution: Fixed
    • Affects Version/s: 2.1.0, 2.2.0, 2.3.0
    • Fix Version/s: 2.13.0
    • Component/s: runner-flink
    • Labels: None

    Description

      Issue

      Beam's AfterProcessingTime trigger doesn't always fire reliably after the configured delay.

      In the job below, the trigger should fire once the watermark passes the end of the window, then every 5 seconds for late data, and finally at the end of the allowed lateness.

      Expected behaviour

      The late-firing AfterProcessingTime trigger should fire 5 seconds after the first late record arrives in the pane.
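To make the expected timing concrete, the following is a minimal plain-Java sketch (no Beam dependency) of one reading of pastFirstElementInPane().plusDelayOf(5 seconds): the firing is due 5 seconds of processing time after the first element of the pane, and later elements in the same pane do not move it. The class and method names are hypothetical; this models the expectation only and is not the runner's implementation.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical model of the expected late-firing time; not Beam or Flink code.
class LateFiringModel {
    // Mirrors plusDelayOf(Duration.standardSeconds(5)) from the job.
    static final Duration DELAY = Duration.ofSeconds(5);

    // Expected firing time: earliest processing-time arrival in the pane plus the delay.
    // An empty pane has no expected firing.
    static Optional<Instant> expectedFiring(List<Instant> arrivalsInPane) {
        return arrivalsInPane.stream().min(Instant::compareTo).map(t -> t.plus(DELAY));
    }

    public static void main(String[] args) {
        Instant firstLate = Instant.ofEpochSecond(20);
        // A second late record 2 seconds later should not postpone the firing.
        List<Instant> pane = Arrays.asList(firstLate, firstLate.plusSeconds(2));
        System.out.println("Late pane expected at: " + expectedFiring(pane).get());
    }
}
```

Comparing the timestamp computed this way with the timestamp of the logged LATE pane shows whether a firing was on schedule, delayed, or missing.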

      Actual behaviour

      In my testing, the late trigger fires for some keys but not for others, and which keys are affected appears to be random. The DummySource generates 15 distinct keys AA, BB, ..., PP. For each key it sends 5 on-time records and one late record. If a late firing is missed, the pane does not fire until the end of the allowed lateness period.
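One way to find the affected keys is to scan the logged panes and list every key that never produced a LATE pane before the final one. The sketch below is plain Java and assumes the log lines have already been parsed into a small record; the PaneLog type and its fields are hypothetical, and only the timing names ON_TIME and LATE come from Beam's pane timing.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for analysing the job's log output; not part of the reproduction repo.
class MissedLateFirings {
    // One logged pane: key, pane timing ("ON_TIME" or "LATE"), and whether it was the last pane.
    static final class PaneLog {
        final String key;
        final String timing;
        final boolean last;

        PaneLog(String key, String timing, boolean last) {
            this.key = key;
            this.timing = timing;
            this.last = last;
        }
    }

    // Returns the keys that have no LATE pane other than the final (allowed-lateness) one.
    static Set<String> keysMissingLateFiring(List<PaneLog> logs) {
        Set<String> missing = new TreeSet<>();
        Set<String> firedLate = new TreeSet<>();
        for (PaneLog pane : logs) {
            missing.add(pane.key);
            if ("LATE".equals(pane.timing) && !pane.last) {
                firedLate.add(pane.key);
            }
        }
        missing.removeAll(firedLate);
        return missing;
    }

    public static void main(String[] args) {
        List<PaneLog> logs = Arrays.asList(
                new PaneLog("AA", "ON_TIME", false),
                new PaneLog("AA", "LATE", false),   // late firing arrived after the delay
                new PaneLog("BB", "ON_TIME", false),
                new PaneLog("BB", "LATE", true));   // only fired at the end of allowed lateness
        System.out.println(keysMissingLateFiring(logs)); // prints [BB]
    }
}
```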

      Job code

      // Run on the Flink runner with a parallelism of 8.
      String[] runnerArgs = {"--runner=FlinkRunner", "--parallelism=8"};

      FlinkPipelineOptions options = PipelineOptionsFactory.fromArgs(runnerArgs).as(FlinkPipelineOptions.class);
      Pipeline pipeline = Pipeline.create(options);

      // 10-second fixed windows. Fire on time when the watermark passes the end of the window,
      // then 5 seconds (processing time) after the first late element in each pane, and emit a
      // final pane at the end of the 2-minute allowed lateness if it is non-empty.
      PCollection<String> windowed = pipeline.apply(Read.from(new DummySource()))
              .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
                      .triggering(AfterWatermark.pastEndOfWindow()
                              .withLateFirings(
                                      AfterProcessingTime
                                              .pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5))))
                      .accumulatingFiredPanes()
                      .withAllowedLateness(Duration.standardMinutes(2), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
              );

      // Count records per key and log each fired pane. LOG is a logger field of the enclosing class.
      windowed.apply(Count.perElement())
              .apply(ParDo.of(new DoFn<KV<String, Long>, Long>() {
                  @ProcessElement
                  public void process(ProcessContext context, BoundedWindow window) {
                      LOG.info("Count: {}. For window {}, Pane {}", context.element(), window, context.pane());
                  }
              }));

      pipeline.run().waitUntilFinish();
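For readers less familiar with the windowing settings in this job, here is a small plain-Java sketch of how a record's event time and the current watermark determine whether the record is on time, late, or beyond the allowed lateness. It uses the job's 10-second window and 2-minute allowed lateness as constants. The class, enum, and method names are hypothetical, and the model is a simplification for illustration, not Beam's actual code path.

```java
// Hypothetical model of fixed-window assignment and lateness; not Beam code.
class LatenessModel {
    static final long WINDOW_MS = 10_000L;            // FixedWindows.of(Duration.standardSeconds(10))
    static final long ALLOWED_LATENESS_MS = 120_000L; // withAllowedLateness(Duration.standardMinutes(2))

    enum Status { ON_TIME, LATE, EXPIRED }

    // Start (inclusive) of the fixed window containing the given event time.
    static long windowStart(long eventTimeMs) {
        return eventTimeMs - Math.floorMod(eventTimeMs, WINDOW_MS);
    }

    // End (exclusive) of the fixed window containing the given event time.
    static long windowEnd(long eventTimeMs) {
        return windowStart(eventTimeMs) + WINDOW_MS;
    }

    // Classifies a record by comparing the watermark with the record's window.
    static Status classify(long eventTimeMs, long watermarkMs) {
        long end = windowEnd(eventTimeMs);
        if (watermarkMs < end) {
            return Status.ON_TIME;   // window not yet closed by the watermark
        }
        if (watermarkMs < end + ALLOWED_LATENESS_MS) {
            return Status.LATE;      // handled by the late-firing trigger
        }
        return Status.EXPIRED;       // past allowed lateness
    }

    public static void main(String[] args) {
        // A record with event time 3 s belongs to window [0 s, 10 s).
        System.out.println(classify(3_000L, 9_000L));
        System.out.println(classify(3_000L, 12_000L));
        System.out.println(classify(3_000L, 130_000L));
    }
}
```

Only records in the LATE range are expected to produce the 5-second processing-time firings discussed in this issue.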

       

      How can you replicate the issue?

      I've created a GitHub repo, https://github.com/pbartoszek/BEAM-3863_late_trigger, with the code shown above. Please see the README file for details on how to replicate the issue.

      What is causing the issue?

      I explained the cause in the PR.

       

            People

              Assignee: Maximilian Michels (mxm)
              Reporter: Pawel Bartoszek (pawelbartoszek)
              Votes: 1
              Watchers: 5

                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 2h 40m