Details
-
Bug
-
Status: Triage Needed
-
P2
-
Resolution: Fixed
-
None
-
None
Description
The test below will always produce empty output, due to combiner lifting, which already combines all input values of one shard into one before grouping. To fix this, we shall not do Combiner lifting for the data-driven triggers.
l = [window.TimestampedValue(('a', 1), 1),
window.TimestampedValue(('b', 3), 3),
window.TimestampedValue(('a', 2), 2),
window.TimestampedValue(('a', 5), 5),]
result = (p | Map(lambda x : x) | 'window' >> beam.WindowInto(FixedWindows(6), trigger=trigger.AfterCount(2), accumulation_mode=trigger.AccumulationMode.DISCARDING) | beam.CombinePerKey(combine.Largest(1)))