Details
Type: Bug
Status: Open
Priority: P3
Resolution: Unresolved
Version: 2.2.0
Description
assert_that does not work for AfterWatermark timers.
Easy way to reproduce: modify test_gbk_execution [1] in this form:
def test_this(self):
  test_stream = (TestStream()
                 .add_elements(['a', 'b', 'c'])
                 .advance_watermark_to(20))

  def fnc(x):
    print 'fired_elem:', x
    return x

  options = PipelineOptions()
  options.view_as(StandardOptions).streaming = True
  p = TestPipeline(options=options)
  records = (p
             | test_stream
             | beam.WindowInto(
                 FixedWindows(15),
                 trigger=trigger.AfterWatermark(early=trigger.AfterCount(2)),
                 accumulation_mode=trigger.AccumulationMode.ACCUMULATING)
             | beam.Map(lambda x: ('k', x))
             | beam.GroupByKey())
  assert_that(records, equal_to([
      ('k', ['a', 'b', 'c'])]))
  p.run()
This test passes, but if the .advance_watermark_to(20) call is removed, it fails. However, both cases fire the same elements:
fired_elem: ('k', ['a', 'b', 'c'])
fired_elem: ('k', ['a', 'b', 'c'])
In the passing case, these correspond to the sorted_actual values seen inside assert_that. In the failing case, sorted_actual is instead:
sorted_actual: [('k', ['a', 'b', 'c']), ('k', ['a', 'b', 'c'])]
sorted_actual: []
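To illustrate why the two sorted_actual values above fail the assertion, here is a minimal pure-Python sketch of a sort-and-compare matcher. It is a simplified model, not Beam's actual implementation: equal_to_sketch and matches are hypothetical helpers, and the passing case assumes the matcher sees one fired element per evaluation, as the description suggests.

```python
def equal_to_sketch(expected):
    """Hypothetical stand-in for a matcher that compares sorted contents."""
    expected = list(expected)

    def _equal(actual):
        sorted_expected = sorted(expected)
        sorted_actual = sorted(actual)
        if sorted_expected != sorted_actual:
            raise AssertionError(
                'Failed assert: %r != %r' % (sorted_expected, sorted_actual))

    return _equal


def matches(matcher, actual):
    """Return True if the matcher accepts `actual`, False if it raises."""
    try:
        matcher(actual)
        return True
    except AssertionError:
        return False


fired = ('k', ['a', 'b', 'c'])
matcher = equal_to_sketch([fired])

# Assumed passing case: each evaluation sees exactly one fired element.
passing = [matches(matcher, [fired]), matches(matcher, [fired])]

# Reported failing case: one evaluation sees both firings, the other none.
failing = [matches(matcher, [fired, fired]), matches(matcher, [])]
```

Under this model neither of the failing-case inputs can equal the single expected element, even though the same elements fired in both cases.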