  1. Beam
  2. BEAM-7122

Accumulating triggers seem not to work on DirectRunner

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P2
    • Resolution: Fixed
    • None
    • Not applicable
    • Component/s: sdk-py-core
    • None

    Description

      The test below fails when run on the DirectRunner.

      Test case:

        def test_multiple_accumulating_firings(self):
          # The PCollection will contain the strings '1' through '10'.
          elements = list(range(1, 11))
      
          ts = TestStream().advance_watermark_to(0)
          for i in elements:
            ts.add_elements([str(i)])
            if i % 5 == 0:
              ts.advance_watermark_to(i)
              ts.advance_processing_time(5)
      
          with TestPipeline() as p:
            _ = (p
                 | ts
                 | beam.WindowInto(
                     FixedWindows(10),
                     accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
                     trigger=AfterWatermark(
                         early=AfterAll(
                             AfterCount(1), AfterProcessingTime(5))
                     ))
                 | beam.ParDo(self.record_dofn()))
      
      
          # The trigger should fire twice: once after 5 seconds and once after 10.
          # Because the mode is ACCUMULATING, the second firing should re-emit
          # the elements of the first firing along with the new ones.
          first_firing = [str(i) for i in elements if i <= 5]
          second_firing = [str(i) for i in elements]
      
          # Assert that we have two firings.
          self.assertEqual(2, len(TriggerPipelineTest.all_firings))
          self.assertListEqual(first_firing + second_firing,
                               TriggerPipelineTest.all_records)
      

      Failure (the recorded output is missing the re-emitted elements '1' to '5' in the second firing):

      ======================================================================
      FAIL: test_multiple_accumulating_firings (apache_beam.transforms.trigger_test.TriggerPipelineTest)
      ----------------------------------------------------------------------
      Traceback (most recent call last):
        File "apache_beam/transforms/trigger_test.py", line 488, in test_multiple_accumulating_firings
          TriggerPipelineTest.all_records)
      AssertionError: Lists differ: ['1', '2', '3', '4', '5', '1',... != ['1', '2', '3', '4', '5', '6',...
      
      First differing element 5:
      '1'
      '6'
      
      First list contains 5 additional elements.
      First extra element 10:
      '6'
      
      - ['1', '2', '3', '4', '5', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10']
      ?                           -------------------------
      
      + ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10']
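
      The diff above can be read with a small pure-Python model of the two accumulation modes. This is only an illustrative sketch, not Beam code: the helper `simulate_panes` and the split of the input into two batches of five are assumptions made for the example. Under that model, the recorded output matches what a discarding trigger would emit, which is one plausible reading of the failure; the report itself does not state the root cause.

```python
def simulate_panes(batches, accumulating):
    """Return the panes emitted if the trigger fires once after each batch.

    Hypothetical helper for illustration only; it does not use Apache Beam.
    `batches` is a list of lists: the elements arriving before each firing.
    """
    panes = []
    buffered = []
    for batch in batches:
        buffered.extend(batch)
        # Each firing emits everything currently buffered for the window.
        panes.append(list(buffered))
        if not accumulating:
            # Discarding mode drops the buffered elements after a firing.
            buffered.clear()
    return panes


def flatten(panes):
    """Concatenate panes in firing order, like the test's all_records list."""
    return [element for pane in panes for element in pane]


elements = [str(i) for i in range(1, 11)]
# Assumption: the first firing sees '1'..'5', the second sees '6'..'10'.
batches = [elements[:5], elements[5:]]

accumulating_records = flatten(simulate_panes(batches, accumulating=True))
discarding_records = flatten(simulate_panes(batches, accumulating=False))

# The '-' line of the diff (what the test expects) has 15 entries.
expected = elements[:5] + elements
# The '+' line of the diff (what the pipeline recorded) has 10 entries.
observed = elements

print(accumulating_records == expected)
print(discarding_records == observed)
```

      In this model the accumulating run reproduces the 15-element list the test expects, while the discarding run reproduces the 10-element list the pipeline actually recorded.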
      

            People

              Assignee: pabloem Pablo Estrada
              Reporter: pabloem Pablo Estrada
              Votes: 0
              Watchers: 1

                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 0.5h