Details
Bug
Status: Open
P3
Resolution: Unresolved
2.24.0
None
None
Important
Description
In this case, all messages from a Pub/Sub topic need to be accumulated into one text file per window; however, WriteToFiles produces many files instead of one.
import json

import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.transforms import trigger
from apache_beam.transforms.trigger import AccumulationMode
from apache_beam.transforms.window import FixedWindows

# `p`, `known_args` and `parse_json` are defined elsewhere in the pipeline.
input = (
    p
    | 'ReadData' >> beam.io.ReadFromPubSub(
        topic=known_args.input_topic).with_output_types(bytes)
    | 'Decode' >> beam.Map(lambda x: x.decode('utf-8'))
    | 'Parse' >> beam.Map(parse_json)
    | 'Data w' >> beam.WindowInto(
        FixedWindows(60),
        trigger=trigger.AfterWatermark(),
        accumulation_mode=AccumulationMode.DISCARDING)
    | 'Group elements into windows' >> beam.Reshuffle()
)

event_data = (
    input
    | 'Filter events' >> beam.Filter(lambda x: x['t'] == 'event')
    | 'Encode' >> beam.Map(lambda x: json.dumps(x))
    | 'Write to files' >> fileio.WriteToFiles(
        path='gs://some/gcs/bucket/',
        file_naming=fileio.default_file_naming(prefix='events', suffix='.txt'),
        shards=1)
)
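To make the expected result concrete, here is a minimal sketch in plain Python (not Beam) of the grouping I would expect for 60-second fixed windows: every encoded line whose timestamp falls into the same window ends up in a single text blob, i.e. one file per window. The helper names `window_start` and `group_into_windows`, and the sample timestamps, are hypothetical and only illustrate the expectation; they say nothing about how WriteToFiles works internally.

```python
from collections import defaultdict

WINDOW_SIZE = 60  # seconds, same size as FixedWindows(60) in the pipeline


def window_start(timestamp, size=WINDOW_SIZE):
    """Return the start (in seconds) of the fixed window containing `timestamp`."""
    return timestamp - (timestamp % size)


def group_into_windows(events, size=WINDOW_SIZE):
    """Map each window start to one newline-joined text blob.

    `events` is an iterable of (timestamp_in_seconds, encoded_line) pairs.
    Each resulting blob stands for the single file expected for that window.
    """
    lines_per_window = defaultdict(list)
    for timestamp, line in events:
        lines_per_window[window_start(timestamp, size)].append(line)
    return {start: "\n".join(lines) for start, lines in lines_per_window.items()}


# Hypothetical sample: four messages spread over three 60-second windows.
sample_events = [(5, "a"), (59, "b"), (60, "c"), (125, "d")]
expected_files = group_into_windows(sample_events)
```

With this sample, three blobs are produced (windows starting at 0, 60 and 120 seconds), which is the "one file per window" behavior expected from the pipeline.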