Details
-
Bug
-
Status: Open
-
P3
-
Resolution: Unresolved
-
None
-
None
-
None
-
Python 3.8, Apache Beam SDK 2.28, Google Dataflow
Description
I am building an application in Apache Beam and Python that runs in Google DataFlow. I am using the ReadFromSpanner method in apache_beam.io.gcp.experimental.spannerio. This works for most of my Spanner tables but the really large ones that are >16m rows tend to fail due to the following error:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/dataflow_worker/batchworker.py", line 649, in do_work
work_executor.execute()
File "/usr/local/lib/python3.8/site-packages/dataflow_worker/executor.py", line 179, in execute
op.start()
File "dataflow_worker/shuffle_operations.py", line 63, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 64, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 79, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 80, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 84, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "dataflow_worker/shuffle_operations.py", line 261, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
File "dataflow_worker/shuffle_operations.py", line 268, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "/usr/local/lib/python3.8/site-packages/future/utils/_init_.py", line 446, in raise_with_traceback
raise exc.with_traceback(traceback)
File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/experimental/spannerio.py", line 550, in process
for row in read_action(element['partitions']):
File "/usr/local/lib/python3.8/site-packages/google/cloud/spanner_v1/streamed.py", line 143, in _iter_
self._consume_next()
File "/usr/local/lib/python3.8/site-packages/google/cloud/spanner_v1/streamed.py", line 116, in _consume_next
response = six.next(self._response_iterator)
File "/usr/local/lib/python3.8/site-packages/google/cloud/spanner_v1/snapshot.py", line 45, in _restart_on_unavailable
for item in iterator:
File "/usr/local/lib/python3.8/site-packages/google/api_core/grpc_helpers.py", line 116, in next
six.raise_from(exceptions.from_grpc_error(exc), exc)
File "<string>", line 3, in raise_from
google.api_core.exceptions.DeadlineExceeded: 504 Deadline Exceeded [while running 'Read from Spanner/Read From Partitions']
From my understanding this error comes from the ReadFromSpanner operation as it's workers have timed out.
To solve this I have tried the following:
- Changed the num_workers and disk_size_gb and added the --experiments=shuffle_mode=service flag as suggested in Google's Common error guidance
- Changed the Machine Type from n1-standard-1 to n1-standard-2 from here
My latest code is attached below. I am including Transformation for simple data wrangling in the rows.
"""Set pipeline arguments."""
options = PipelineOptions(
region=RUNNER_REGION,
project=RUNNER_PROJECT_ID,
runner=RUNNER,
temp_location=TEMP_LOCATION,
job_name=JOB_NAME,
service_account_email=SA_EMAIL,
setup_file=SETUP_FILE_PATH,
disk_size_gb=500,
num_workers=10,
machine_type="n1-standard-2",
save_main_session=True)
"""Build and run the pipeline."""
with beam.Pipeline(options=options) as p:
(p
"Read from Spanner" >> ReadFromSpanner(SPANNER_PROJECT_ID, SPANNER_INSTANCE_ID, SPANNER_DB, sql=QUERY) |
"Transform elements into dictionary" >> beam.ParDo(Transformation) |
"Write new records to BQ" >> WriteToBigQuery( BIGQUERY_TABLE, schema=SCHEMA, write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND, create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED) ) A potential solution is to edit the timeout control; I have seen this being available in Java but not in Python. How can I edit timeout control in Python or is there any other solution to this issue? |
Attachments
Issue Links
- is duplicated by
-
BEAM-11953 504 Deadline Exceeded code for very large datasets in Python
- Resolved
-
BEAM-11954 504 Deadline Exceeded code for very large datasets in Python
- Resolved
-
BEAM-11955 504 Deadline Exceeded code for very large datasets in Python
- Resolved
- links to