Details
Type: Bug
Status: Resolved
Priority: P2
Resolution: Fixed
Description
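The test case below fails when run on the DirectRunner. With an ACCUMULATING trigger, the second firing is expected to re-emit the elements of the first firing, but the recorded output contains each element only once.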
Test case:
def test_multiple_accumulating_firings(self):
  # PCollection will contain elements from 1 to 10.
  elements = [i for i in range(1, 11)]

  ts = TestStream().advance_watermark_to(0)
  for i in elements:
    ts.add_elements([str(i)])
    if i % 5 == 0:
      ts.advance_watermark_to(i)
      ts.advance_processing_time(5)

  with TestPipeline() as p:
    _ = (p
         | ts
         | beam.WindowInto(
             FixedWindows(10),
             accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
             trigger=AfterWatermark(
                 early=AfterAll(
                     AfterCount(1),
                     AfterProcessingTime(5))
             ))
         | beam.ParDo(self.record_dofn()))

  # The trigger should fire twice. Once after 5 seconds, and once after 10.
  # The firings should accumulate the output.
  first_firing = [str(i) for i in elements if i <= 5]
  second_firing = [str(i) for i in elements]

  # Assert that we have two firings.
  self.assertEqual(2, len(TriggerPipelineTest.all_firings))
  self.assertListEqual(first_firing + second_firing,
                       TriggerPipelineTest.all_records)
Failure:
======================================================================
FAIL: test_multiple_accumulating_firings (apache_beam.transforms.trigger_test.TriggerPipelineTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "apache_beam/transforms/trigger_test.py", line 488, in test_multiple_accumulating_firings
    TriggerPipelineTest.all_records)
AssertionError: Lists differ: ['1', '2', '3', '4', '5', '1',... != ['1', '2', '3', '4', '5', '6',...

First differing element 5:
'1'
'6'

First list contains 5 additional elements.
First extra element 10:
'6'

- ['1', '2', '3', '4', '5', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10']
?                           -------------------------

+ ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10']
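To make the gap between the expected and observed lists concrete, here is a minimal pure-Python sketch of how pane contents differ between accumulating and discarding modes. It does not use Beam and is not how the DirectRunner is implemented. The function `simulate_firings` and its parameters are hypothetical names introduced only for illustration, and the firing points (after the 5th and 10th element) are an assumption taken from the comments in the test.

```python
def simulate_firings(elements, fire_after, accumulating):
    """Return the panes emitted when a trigger fires after the element
    counts listed in `fire_after` (hypothetical helper, not a Beam API)."""
    panes = []
    buffered = []
    for count, element in enumerate(elements, start=1):
        buffered.append(element)
        if count in fire_after:
            # Emit a snapshot of everything buffered so far.
            panes.append(list(buffered))
            if not accumulating:
                # Discarding mode drops already-emitted elements.
                buffered.clear()
    return panes


elements = [str(i) for i in range(1, 11)]

# Assumed firing points: after the 5th and after the 10th element.
accumulating = simulate_firings(elements, {5, 10}, accumulating=True)
discarding = simulate_firings(elements, {5, 10}, accumulating=False)

# Flatten the panes the same way the test records individual elements.
flat_accumulating = [e for pane in accumulating for e in pane]
flat_discarding = [e for pane in discarding for e in pane]

print(flat_accumulating)
print(flat_discarding)
```

Under these assumptions, `flat_accumulating` has the same 15 entries as the list the test expects, while `flat_discarding` has the same 10 entries as the list that was actually recorded. That is consistent with the second firing not including earlier elements, although the report itself does not state the root cause.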