Details
- Type: Bug
- Status: Open
- Priority: P3
- Resolution: Unresolved
- Affects Version: 2.31.0
- Fix Version: None
- Environment: Ubuntu 20.04 (running inside Docker, python:3.8-slim)
Description
`WriteToBigQuery` fails when using the `FILE_LOADS` method in the `BundleBasedDirectRunner`.
The issue appears to be in `wait_for_bq_job`, which expects `job_reference` to be an actual JobReference instance rather than a string. However, the `WaitForBQJobs` DoFn appears to be [passing a string](https://github.com/apache/beam/blob/5a029fd97d663e19a9bcd6bff61648bccbd7f95b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py#L730) as that argument. I believe this happens during the copy step, and I'm not calling this code directly (so unfortunately I can't just pass the correct reference type myself).
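To make the mismatch concrete, here is a small sketch (not from my pipeline; the project and job IDs are placeholders, and I'm assuming the JobReference message exposed via `apache_beam.io.gcp.internal.clients.bigquery`):

    from apache_beam.io.gcp.internal.clients import bigquery

    # What wait_for_bq_job expects: a JobReference message with these attributes.
    ref = bigquery.JobReference(
        projectId="my-project", jobId="beam_load_job_123", location="US")
    ref.projectId  # works

    # What WaitForBQJobs appears to hand it in this code path: a plain string.
    ref = "beam_load_job_123"
    ref.projectId  # AttributeError: 'str' object has no attribute 'projectId'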
Here is a traceback:
    ERROR:root:Traceback (most recent call last):
      File "/app/main.py", line 209, in process_message
        construct_and_run_pipeline(request)
      File "/app/main.py", line 190, in construct_and_run_pipeline
        return result.wait_until_finish()
      File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/direct_runner.py", line 588, in wait_until_finish
        self._executor.await_completion()
      File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/executor.py", line 433, in await_completion
        self._executor.await_completion()
      File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/executor.py", line 482, in await_completion
        raise t(v).with_traceback(tb)
      File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/executor.py", line 371, in call
        self.attempt_call(
      File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/executor.py", line 414, in attempt_call
        evaluator.process_element(value)
      File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/direct/transform_evaluator.py", line 880, in process_element
        self.runner.process(element)
      File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 1225, in process
        self._reraise_augmented(exn)
      File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 1306, in _reraise_augmented
        raise new_exn.with_traceback(tb)
      File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 1223, in process
        return self.do_fn_invoker.invoke_process(windowed_value)
      File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 752, in invoke_process
        self._invoke_process_per_window(
      File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/common.py", line 877, in _invoke_process_per_window
        self.process_method(*args_for_process),
      File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_file_loads.py", line 730, in process
        self.bq_wrapper.wait_for_bq_job(ref, sleep_duration_sec=10, max_retries=0)
      File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 562, in wait_for_bq_job
        job_reference.projectId, job_reference.jobId, job_reference.location)
    AttributeError: 'str' object has no attribute 'projectId' [while running 'write tweets to bigquery/Write/BigQueryBatchFileLoads/WaitForTempTableLoadJobs']
Here is the `WriteToBigQuery` step that is failing (note that the callable passed for `table` returns a TableReference instance):
    WriteToBigQuery(
        table=lambda row: bigquery_tools.parse_table_reference(row["table_name"]),
        create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        ignore_insert_ids=True,
        method="FILE_LOADS",  # using STREAMING_INSERTS 'fixes' the issue
        batch_size=int(os.getenv("BIGQUERY_BATCH_SIZE", "10")),
        schema=schema,
    )
Note that this issue does not occur when using the standard `DirectRunner`, nor does it occur when using the `STREAMING_INSERTS` method.
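For reference, a stripped-down pipeline along these lines should exercise the same code path (a sketch, not my actual pipeline; the project, bucket, dataset, and table names are placeholders, and I'm assuming the runner can be selected by name):

    import apache_beam as beam
    from apache_beam.io.gcp import bigquery_tools
    from apache_beam.options.pipeline_options import PipelineOptions

    # Placeholders: the target table must already exist (CREATE_NEVER), and
    # temp_location must point at a real GCS bucket for staging the load files.
    options = PipelineOptions([
        "--runner=BundleBasedDirectRunner",
        "--project=my-project",
        "--temp_location=gs://my-bucket/tmp",
    ])

    with beam.Pipeline(options=options) as p:
        (p
         | beam.Create([{"table_name": "my-project:my_dataset.my_table",
                         "text": "hello"}])
         | beam.io.WriteToBigQuery(
             table=lambda row: bigquery_tools.parse_table_reference(row["table_name"]),
             create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
             method="FILE_LOADS",
             schema="table_name:STRING,text:STRING"))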
Thanks! (And apologies if I left out any important information. This is the first issue I've opened here.)