Details

Type: Bug
Status: Open
Priority: P1
Resolution: Unresolved
Description
Not only is there more than one result per window, but the results for each window are duplicated as well.
Here is some code I wrote to reproduce the issue; run it with and without .with_fanout.
If running with the Dataflow runner, add an appropriate gs://path/ in WriteToText.
import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.utils.timestamp import Timestamp


class ListFn(beam.CombineFn):
    """CombineFn that collects all inputs into a single list."""

    def create_accumulator(self):
        return []

    def add_input(self, mutable_accumulator, element):
        return mutable_accumulator + [element]

    def merge_accumulators(self, accumulators):
        res = []
        for accu in accumulators:
            res = res + accu
        return res

    def extract_output(self, accumulator):
        return accumulator


p = beam.Pipeline()
(
    p
    | beam.Create([
        window.TimestampedValue(1, Timestamp(seconds=1596216396)),
        window.TimestampedValue(2, Timestamp(seconds=1596216397)),
        window.TimestampedValue(3, Timestamp(seconds=1596216398)),
        window.TimestampedValue(4, Timestamp(seconds=1596216399)),
        window.TimestampedValue(5, Timestamp(seconds=1596216400)),
        window.TimestampedValue(6, Timestamp(seconds=1596216402)),
        window.TimestampedValue(7, Timestamp(seconds=1596216403)),
        window.TimestampedValue(8, Timestamp(seconds=1596216405)),
    ])
    | beam.WindowInto(window.SlidingWindows(10, 5))
    | beam.CombineGlobally(ListFn()).without_defaults().with_fanout(5)
    | beam.Map(repr)
    | beam.io.WriteToText("py-test-result", file_name_suffix='.json', num_shards=1)
)
p.run()
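For reference, the correct (non-duplicated) output can be worked out by hand: each element with timestamp ts belongs to every SlidingWindows(10, 5) window [start, start + 10) where start is a multiple of 5 and start <= ts < start + 10. A stdlib-only sketch (no Beam dependency; the helper names here are my own, not Beam API) of which elements should land in which window:

```python
# Stdlib-only sketch of the expected SlidingWindows(10, 5) assignment
# for the timestamps used in the repro above, so the correct combine
# output (one list per window, no duplicates) can be checked by eye.
timestamps = {
    1: 1596216396, 2: 1596216397, 3: 1596216398, 4: 1596216399,
    5: 1596216400, 6: 1596216402, 7: 1596216403, 8: 1596216405,
}

SIZE, PERIOD = 10, 5

def windows_for(ts):
    # Every window [start, start + SIZE) whose start is a multiple
    # of PERIOD and satisfies start <= ts < start + SIZE.
    last_start = ts - ts % PERIOD
    return [(s, s + SIZE)
            for s in range(last_start - SIZE + PERIOD, last_start + 1, PERIOD)]

expected = {}
for elem, ts in timestamps.items():
    for w in windows_for(ts):
        expected.setdefault(w, []).append(elem)

for w in sorted(expected):
    print(w, expected[w])
```

Running this shows four windows, each expected to produce exactly one combined list: [1, 2, 3, 4], [1, 2, 3, 4, 5, 6, 7], [5, 6, 7, 8], and [8]. With .with_fanout(5), the pipeline above instead emits extra and duplicated results for these windows.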