Details
Type: Improvement
Status: Open
Priority: P3
Resolution: Unresolved
Affects Version/s: 2.22.0
Fix Version/s: None
Component/s: None
Description
We are using FILE_LOADS to write to BigQuery in streaming mode using Python. Input comes from a Pub/Sub topic at ~5000 requests/sec, with each request around 6 KB. We apply some transforms to the input and then write to BigQuery:
beam.io.WriteToBigQuery(
    table=table_name,
    schema=schema,
    dataset=dataset_name,
    project=project,
    method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
    triggering_frequency=2 * 60,
    create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
We noticed that each temporary GCS file created by the load process contained very few rows (~1-5). We can reproduce this with both the direct runner and the Dataflow runner. After debugging, we believe the following is the issue:
In WriteRecordsToFile (apache_beam/io/gcp/bigquery_file_loads.py), destinations are created in start_bundle and cleared in finish_bundle. In streaming mode, a typical bundle size within the ParDo turns out to be only ~1-5 elements. Windowing is applied before the ParDo, but since there is no GroupByKey, the window and its triggers have no effect on bundle sizes inside the ParDo. Below is a small program that reproduces the issue:
import argparse

import apache_beam as beam
from apache_beam.transforms import trigger


class WriteRecordsToFile(beam.DoFn):
  def start_bundle(self):
    print('start bundle')
    self.data = []

  def process(self, element):
    self.data.append(element)

  def finish_bundle(self):
    print('finish bundle', len(self.data))
    self.data = []


def run(argv=None):
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--input_subscription',
      required=True,
      help='Input PubSub subscription of the form '
      '"projects/<project>/subscriptions/<subscription>".')
  known_args, pipeline_args = parser.parse_known_args(argv)
  with beam.Pipeline(argv=pipeline_args) as p:
    lines = p | beam.io.ReadFromPubSub(
        subscription=known_args.input_subscription)
    (lines
     | beam.WindowInto(
         beam.window.GlobalWindows(),
         trigger=trigger.Repeatedly(
             trigger.AfterAny(
                 trigger.AfterProcessingTime(60),
                 trigger.AfterCount(100))),
         accumulation_mode=trigger.AccumulationMode.DISCARDING)
     | beam.ParDo(WriteRecordsToFile()))
In the above example, start_bundle is called very frequently, and bundle boundaries do not respect the triggers.
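For illustration, output along these lines is representative of the ~1-5 element bundles described above (exact counts vary from run to run):

start bundle
finish bundle 3
start bundle
finish bundle 1
start bundle
finish bundle 5
...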
To fix the behavior of BigQueryBatchFileLoads, we suggest adding a grouping step after the window triggers and before ParDo(WriteRecordsToFile), so that each trigger firing delivers a full batch of records to the file writer.
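A minimal sketch of the suggested change, assuming a random-shard key; the key choice, the num_shards parameter, and WriteGroupedRecordsToFile are illustrative, not the actual bigquery_file_loads implementation:

import random

import apache_beam as beam
from apache_beam.transforms import trigger


class WriteGroupedRecordsToFile(beam.DoFn):
  # Illustrative stand-in for WriteRecordsToFile: it receives an entire
  # triggered group at once, so the batch size no longer depends on the
  # runner's bundle sizes.
  def process(self, element):
    shard, records = element
    records = list(records)
    print('batch size', len(records))
    # ... write `records` to a single temporary GCS file here ...


def window_group_and_write(lines, num_shards=10):
  # Keying by a random shard keeps the grouping parallelizable across
  # workers while still producing full batches per trigger firing.
  return (
      lines
      | beam.WindowInto(
          beam.window.GlobalWindows(),
          trigger=trigger.Repeatedly(
              trigger.AfterAny(
                  trigger.AfterProcessingTime(60),
                  trigger.AfterCount(100))),
          accumulation_mode=trigger.AccumulationMode.DISCARDING)
      | beam.Map(lambda element: (random.randint(0, num_shards - 1), element))
      # The GroupByKey materializes each trigger firing, so the downstream
      # ParDo sees whole batches instead of ~1-5 element bundles.
      | beam.GroupByKey()
      | beam.ParDo(WriteGroupedRecordsToFile()))

With the GroupByKey in place, each trigger firing yields one group per shard, so each temporary file would hold a full batch rather than a handful of rows.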