Beam / BEAM-11956

504 Deadline Exceeded error for very large datasets in Python

Details

    • Type: Bug
    • Status: Open
    • Priority: P3
    • Resolution: Unresolved
    • Component: io-py-gcp
    • Environment: Python 3.8, Apache Beam SDK 2.28, Google Dataflow

    Description

      I am building an application in Apache Beam and Python that runs in Google Dataflow. I am using the ReadFromSpanner method in apache_beam.io.gcp.experimental.spannerio. This works for most of my Spanner tables, but the really large ones (>16M rows) tend to fail with the following error:
       Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/dataflow_worker/batchworker.py", line 649, in do_work
      work_executor.execute()
      File "/usr/local/lib/python3.8/site-packages/dataflow_worker/executor.py", line 179, in execute
      op.start()
      File "dataflow_worker/shuffle_operations.py", line 63, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
      File "dataflow_worker/shuffle_operations.py", line 64, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
      File "dataflow_worker/shuffle_operations.py", line 79, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
      File "dataflow_worker/shuffle_operations.py", line 80, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
      File "dataflow_worker/shuffle_operations.py", line 84, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
      File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
      File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
      File "dataflow_worker/shuffle_operations.py", line 261, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
      File "dataflow_worker/shuffle_operations.py", line 268, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
      File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
      File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
      File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
      File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
      File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
      File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
      File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
      File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
      File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
      File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
      File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
      File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
      File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
      File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
      File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
      File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
      File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
      File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
      File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
      File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
      File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
      File "apache_beam/runners/common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
      File "/usr/local/lib/python3.8/site-packages/future/utils/_init_.py", line 446, in raise_with_traceback
      raise exc.with_traceback(traceback)
      File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
      File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
      File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
      File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/experimental/spannerio.py", line 550, in process
      for row in read_action(element['partitions']):
      File "/usr/local/lib/python3.8/site-packages/google/cloud/spanner_v1/streamed.py", line 143, in _iter_
      self._consume_next()
      File "/usr/local/lib/python3.8/site-packages/google/cloud/spanner_v1/streamed.py", line 116, in _consume_next
      response = six.next(self._response_iterator)
      File "/usr/local/lib/python3.8/site-packages/google/cloud/spanner_v1/snapshot.py", line 45, in _restart_on_unavailable
      for item in iterator:
      File "/usr/local/lib/python3.8/site-packages/google/api_core/grpc_helpers.py", line 116, in next
      six.raise_from(exceptions.from_grpc_error(exc), exc)
      File "<string>", line 3, in raise_from
      google.api_core.exceptions.DeadlineExceeded: 504 Deadline Exceeded [while running 'Read from Spanner/Read From Partitions']
      From my understanding, this error comes from the ReadFromSpanner operation, as its workers have timed out.

      To solve this I have tried the following:

      • Changed num_workers and disk_size_gb, and added the --experiments=shuffle_mode=service flag, as suggested in Google's common error guidance (a sketch of passing this flag programmatically follows this list)
      • Changed the machine type from n1-standard-1 to n1-standard-2, following Google's machine type documentation
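
      For reference, a minimal sketch of setting that flag programmatically rather than on the command line; experiments is a standard Beam pipeline option, while the surrounding values are placeholders:

       from apache_beam.options.pipeline_options import PipelineOptions

       # "experiments" takes a list of flag strings; this is equivalent to
       # passing --experiments=shuffle_mode=service on the command line.
       options = PipelineOptions(
           runner="DataflowRunner",
           experiments=["shuffle_mode=service"])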

      My latest code is below; it includes a Transformation step for simple data wrangling of the rows.
       """Set pipeline arguments."""
      options = PipelineOptions(
      region=RUNNER_REGION,
      project=RUNNER_PROJECT_ID,
      runner=RUNNER,
      temp_location=TEMP_LOCATION,
      job_name=JOB_NAME,
      service_account_email=SA_EMAIL,
      setup_file=SETUP_FILE_PATH,
      disk_size_gb=500,
      num_workers=10,
      machine_type="n1-standard-2",
      save_main_session=True)

      """Build and run the pipeline."""
      with beam.Pipeline(options=options) as p:
      (p

      "Read from Spanner" >> ReadFromSpanner(SPANNER_PROJECT_ID, SPANNER_INSTANCE_ID, SPANNER_DB, sql=QUERY)
      "Transform elements into dictionary" >> beam.ParDo(Transformation)
      "Write new records to BQ" >> WriteToBigQuery(
      BIGQUERY_TABLE,
      schema=SCHEMA,
      write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
      create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
      )
      A potential solution is to adjust the timeout control; I have seen this available in the Java SDK but not in Python. How can I adjust the timeout in Python, or is there another solution to this issue?
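
      One possible workaround while no timeout knob is exposed in the Python connector: split the large table into several smaller SQL reads by primary-key range and flatten them, so no single 'Read From Partitions' stream runs long enough to hit the deadline. A minimal sketch; the table name, key column, and shard boundaries are hypothetical:

       # Assumed key ranges covering the ~16M-row table; tune shard count/size.
       SHARD_BOUNDS = [(0, 4_000_000), (4_000_000, 8_000_000),
                       (8_000_000, 12_000_000), (12_000_000, 16_000_000)]

       shards = []
       for i, (lo, hi) in enumerate(SHARD_BOUNDS):
           shards.append(
               p
               | f"Read shard {i}" >> ReadFromSpanner(
                   SPANNER_PROJECT_ID, SPANNER_INSTANCE_ID, SPANNER_DB,
                   sql=f"SELECT * FROM my_table WHERE id >= {lo} AND id < {hi}"))

       # Merge the shard reads back into one PCollection and continue as before.
       merged = shards | "Flatten shards" >> beam.Flatten()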

            People

              Assignee: Unassigned
              Reporter: Sebastian Montero
              Votes: 0
              Watchers: 3


                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 1h 40m