Beam / BEAM-10406

BigQueryBatchFileLoads does not bundle rows correctly in streaming mode in python

Details

    • Type: Improvement
    • Status: Open
    • Priority: P3
    • Resolution: Unresolved
    • Affects Version/s: 2.22.0
    • Fix Version/s: None
    • Component/s: io-py-gcp
    • Labels: None

    Description

      We are using FILE_LOADS to write to BigQuery in streaming mode using Python. Input comes from a Pub/Sub topic at ~5000 requests/sec, and each request is around 6 KB. We perform some transforms on the input and then write to BigQuery.

       

      beam.io.WriteToBigQuery(
          table=table_name,
          schema=schema,
          dataset=dataset_name,
          project=project,
          method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
          triggering_frequency=2 * 60,
          create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
          write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
      

       

      We noticed that each temporary GCS file created by the load process had a very small number of rows (~1-5). We can reproduce this with both the direct runner and the Dataflow runner. After debugging, we believe the following is the issue:

      In WriteRecordsToFile (apache_beam/io/gcp/bigquery_file_loads.py), we create destinations within start_bundle and clear them in finish_bundle. When this runs in streaming mode, a typical bundle size within the ParDo comes out to be ~1-5 elements. Windowing is applied before the ParDo, but since there is no GroupByKey, the window does not affect the ParDo's bundling. Below is a small snippet that reproduces the issue:

       

       

      import argparse

      import apache_beam as beam
      from apache_beam.transforms import trigger


      class WriteRecordsToFile(beam.DoFn):
        """Simplified stand-in for the DoFn in bigquery_file_loads.py that
        just logs how many elements each bundle contains."""

        def start_bundle(self):
          print('start bundle')
          self.data = []

        def process(self, element):
          self.data.append(element)

        def finish_bundle(self):
          print('finish bundle', len(self.data))
          self.data = []


      def run(argv=None):
        parser = argparse.ArgumentParser()
        parser.add_argument(
            '--input_subscription',
            required=True,
            help='Input PubSub subscription of the form '
            '"projects/<project>/subscriptions/<subscription>".')

        # Pipeline options such as --streaming and the runner are passed
        # through pipeline_args.
        known_args, pipeline_args = parser.parse_known_args(argv)
        with beam.Pipeline(argv=pipeline_args) as p:
          lines = p | beam.io.ReadFromPubSub(
              subscription=known_args.input_subscription)
          (lines
           | beam.WindowInto(
               beam.window.GlobalWindows(),
               trigger=trigger.Repeatedly(
                   trigger.AfterAny(
                       trigger.AfterProcessingTime(60),
                       trigger.AfterCount(100))),
               accumulation_mode=trigger.AccumulationMode.DISCARDING)
           | beam.ParDo(WriteRecordsToFile()))


      if __name__ == '__main__':
        run()

       

      In the above example, we see that start_bundle is called very often, and the bundle boundaries do not respect the triggers.

      To fix the behavior of BigQueryBatchFileLoads, we suggest adding a grouping after the window triggers and before the ParDo(WriteRecordsToFile).
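
      A minimal sketch of that suggestion, continuing from the reproduction snippet above (so lines, beam, and trigger are as defined there). The constant key, the WriteGroupedRecordsToFile name, and the trigger values are illustrative assumptions, not the actual BigQueryBatchFileLoads change:

      class WriteGroupedRecordsToFile(beam.DoFn):
        # Hypothetical stand-in for WriteRecordsToFile: after the GroupByKey,
        # each element is one (key, records) pair emitted per trigger firing,
        # so the number of records handled together is governed by the trigger
        # rather than by the runner's (tiny) bundle size.
        def process(self, element):
          _, records = element
          print('records in this firing:', len(list(records)))


      (lines
       | beam.WindowInto(
           beam.window.GlobalWindows(),
           trigger=trigger.Repeatedly(
               trigger.AfterAny(
                   trigger.AfterProcessingTime(60),
                   trigger.AfterCount(100))),
           accumulation_mode=trigger.AccumulationMode.DISCARDING)
       # Assumption: a single constant key is enough for illustration; the
       # real fix would key by destination table.
       | beam.Map(lambda element: (None, element))
       | beam.GroupByKey()
       | beam.ParDo(WriteGroupedRecordsToFile()))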

       

      Attachments

        Activity

          People

            Assignee: Unassigned
            Reporter: Nikunj Aggarwal (nikunj-jira)
            Votes: 1
            Watchers: 5

            Dates

              Created:
              Updated:

              Time Tracking

                Original Estimate: 168h
                Remaining Estimate: 168h
                Time Spent: Not Specified