Uploaded image for project: 'Beam'
  1. Beam
  2. BEAM-12659

WriteToBigQuery in BundleBasedDirectRunner fails when method is FILE_LOADS

Details

    • Bug
    • Status: Open
    • P3
    • Resolution: Unresolved
    • 2.31.0
    • None
    • io-py-gcp, runner-py-direct
    • Ubuntu 20.04 (running inside Docker, python:3.8-slim)

    Description

      `WriteToBigQuery` fails when using the `FILE_LOADS` method in the `BundleBasedDirectRunner`.

      The issue appears to be in `wait_for_bq_job`, where the function expects `job_reference` to be an actual JobReference instance and not a string. However, the `WaitForBQJobs` DoFn appears to be [passing a string](https://github.com/apache/beam/blob/5a029fd97d663e19a9bcd6bff61648bccbd7f95b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py#L730) as the argument. I believe this is during the copy step, and I'm not calling this code directly (so unfortunately I can't just pass a TableReference instance myself).

      Here is a traceback:

       

      request_worker_1      | ERROR:root:Traceback (most recent call last):
      request_worker_1      |   File "/app/main.py", line 209, in process_message
      request_worker_1      |     construct_and_run_pipeline(request)
      request_worker_1      |   File "/app/main.py", line 190, in construct_and_run_pipeline
      request_worker_1      |     return result.wait_until_finish()
      request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/direct_runner.py", line 588, in wait_until_finish
      request_worker_1      |     self._executor.await_completion()
      request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/executor.py", line 433, in await_completion
      request_worker_1      |     self._executor.await_completion()
      request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/executor.py", line 482, in await_completion
      request_worker_1      |     raise t(v).with_traceback(tb)
      request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/executor.py", line 371, in call
      request_worker_1      |     self.attempt_call(
      request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/executor.py", line 414, in attempt_call
      request_worker_1      |     evaluator.process_element(value)
      request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/transform_evaluator.py", line 880, in process_element
      request_worker_1      |     self.runner.process(element)
      request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 1225, in process
      request_worker_1      |     self._reraise_augmented(exn)
      request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 1306, in _reraise_augmented
      request_worker_1      |     raise new_exn.with_traceback(tb)
      request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 1223, in process
      request_worker_1      |     return self.do_fn_invoker.invoke_process(windowed_value)
      request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 752, in invoke_process
      request_worker_1      |     self._invoke_process_per_window(
      request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 877, in _invoke_process_per_window
      request_worker_1      |     self.process_method(*args_for_process),
      request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_file_loads.py", line 730, in process
      request_worker_1      |     self.bq_wrapper.wait_for_bq_job(ref, sleep_duration_sec=10, max_retries=0)
      request_worker_1      |   File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 562, in wait_for_bq_job
      request_worker_1      |     job_reference.projectId, job_reference.jobId, job_reference.location)
      request_worker_1      | AttributeError: 'str' object has no attribute 'projectId' [while running 'write tweets to bigquery/Write/BigQueryBatchFileLoads/WaitForTempTableLoadJobs']
      

       

      Here is the `WriteToBigQuery` step that is failing (note that the callable passed for `table` returns a TableReference instance):

      WriteToBigQuery(
           table=lambda row: bigquery_tools.parse_table_reference(row["table_name"]),
           create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
           ignore_insert_ids=True,
           method="FILE_LOADS", # using STREAMING_INSERTS 'fixes' the issue
           batch_size=int(os.getenv("BIGQUERY_BATCH_SIZE", "10")),
           schema=schema,
      )
      

       

      Note that this issue does not occur when using the standard `DirectRunner`, nor does it occur when using the `STREAMING_INSERTS` method.

      Thanks! (And apologies if I left out any important information. This is the first issue I've opened here.)

      Attachments

        Activity

          People

            Unassigned Unassigned
            milesmcc Miles McCain
            Votes:
            0 Vote for this issue
            Watchers:
            4 Start watching this issue

            Dates

              Created:
              Updated: