Beam / BEAM-12489

Python GCSIO upload does not retry


    Details

    • Type: Improvement
    • Status: Open
    • Priority: P2
    • Resolution: Unresolved
    • Affects Version/s: 2.28.0, 2.29.0, 2.30.0
    • Fix Version/s: None
    • Component/s: io-py-gcp
    • Labels:
      None

      Description

      I have a streaming pipeline running in Dataflow that loads data into Google Cloud Storage. I get sporadic errors like "Error in _start_upload while inserting file ...". The underlying issue seems to be that no retry logic is applied in the _start_upload method.

      There is even a TODO noting the need for this:

      # TODO(silviuc): Refactor so that retry logic can be applied.
      # There is retry logic in the underlying transfer library but we should make
      # it more explicit so we can control the retry parameters.
      @retry.no_retries  # Using no_retries marks this as an integration point.
      def _start_upload(self):
        ...

      All the other methods in the same module use this exponential backoff decorator, as the Google Cloud Storage docs suggest:

      @retry.with_exponential_backoff(
          retry_filter=retry.retry_on_server_errors_and_timeout_filter)
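
A minimal, standard-library sketch of what a decorator like with_exponential_backoff combined with a server-error/timeout filter does. It is not Beam's apache_beam.utils.retry code: HttpError, retry_on_server_errors_and_timeout, and the parameter names and defaults below are hypothetical. The filter here treats only 5xx responses and 408 Request Timeout as retryable (Beam's real filter may handle other exception types differently), and the "full jitter" delay is a choice of this sketch. One caveat it does not address: _start_upload runs in a separate upload thread and appears to read the upload data from a stream that is consumed as it is read, so re-invoking it is only safe if that stream can be replayed from the start. That is worth checking before adding the decorator.

```python
import functools
import random
import time


class HttpError(Exception):
    """Hypothetical stand-in for an HTTP error that carries a status code."""

    def __init__(self, status_code):
        super().__init__(f"HTTP {status_code}")
        self.status_code = status_code


def retry_on_server_errors_and_timeout(exc):
    """Hypothetical filter: retry on 5xx server errors and 408 Request Timeout."""
    if isinstance(exc, HttpError):
        return exc.status_code >= 500 or exc.status_code == 408
    return False  # this sketch never retries other exception types


def with_exponential_backoff(num_retries=5, initial_delay_secs=1.0, factor=2.0,
                             max_delay_secs=60.0,
                             retry_filter=retry_on_server_errors_and_timeout,
                             sleep=time.sleep):
    """Hypothetical decorator: retry the wrapped call with jittered exponential backoff."""
    def decorator(fun):
        @functools.wraps(fun)
        def wrapper(*args, **kwargs):
            delay = initial_delay_secs
            for attempt in range(num_retries + 1):
                try:
                    return fun(*args, **kwargs)
                except Exception as exc:
                    # Give up on non-retryable errors or once retries are exhausted.
                    if attempt == num_retries or not retry_filter(exc):
                        raise
                    # Full jitter: wait a random fraction of the current delay,
                    # then grow the delay geometrically up to max_delay_secs.
                    sleep(random.uniform(0, delay))
                    delay = min(delay * factor, max_delay_secs)
        return wrapper
    return decorator


# Usage: a call that fails twice with a transient 503, then succeeds.
calls = []


@with_exponential_backoff(num_retries=3, sleep=lambda secs: None)  # skip real sleeping
def flaky_upload():
    calls.append(1)
    if len(calls) < 3:
        raise HttpError(503)
    return "uploaded"


print(flaky_upload(), len(calls))  # uploaded 3
```

Injecting sleep keeps the example fast and testable. A non-retryable status such as 403 is re-raised on the first attempt without any waiting.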

      Is there any potential problem with simply adding the same backoff decorator to the _start_upload method?

       

      It's difficult to say how often these errors occur, since it's a backend issue that is not properly handled in the code. In my case, for a pipeline that constantly handles large volumes of events, there were 5 occurrences in the last 15 days. However, even if the rate is low, my main concern is this: when these errors are thrown on a resumable upload and there is no retry strategy, am I just losing that data?

      If I'm wrong, I'd love to learn why: what is actually happening, and what am I missing? If I'm right, there is potential data loss, so shouldn't the priority of this issue be raised?

       

      In my case I'm using Dataflow with Apache Beam 2.28, but from checking the code in other versions, the problem would be the same.

       

      This is the piece of code where the error happens:

       

       
      from apache_beam.io.fileio import WriteToFiles
      ...
      | "Write to GCS" >> WriteToFiles(
          path=output_path, shards=1, max_writers_per_bundle=0,
          destination=lambda record: record['topic_kafka'],
          sink=JsonSink(),
          file_naming=destination_partitioning_naming(extension="json", topics=topics)))


      ***EDIT:
      I got an answer to a Stack Overflow question I asked:
      https://stackoverflow.com/questions/67972758/apache-beam-python-gscio-upload-method-has-retry-no-retries-implemented-causes/67975695#67975695

      The answer references this doc:
      https://cloud.google.com/dataflow/docs/resources/faq#how-are-java-exceptions-handled-in-cloud-dataflow

      It makes sense that, since Dataflow retries failed work items, the code itself doesn't need its own retry logic. Still, is there any problem with applying @retry.with_exponential_backoff(...) to the _start_upload method? At the very least, I think it would be cleaner.
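
The Dataflow FAQ linked above describes retries at the level of work items (bundles) rather than individual calls. As described there, a failing bundle is retried a limited number of times in batch mode and indefinitely in streaming mode. The sketch below is a runner-agnostic illustration of that idea, not Dataflow's implementation: run_bundle_with_retries, TransientError, and write_element are hypothetical names. Outputs of a failed attempt are discarded and the whole bundle is re-run. This is why an error that propagates out of the write fails the bundle rather than dropping the data, provided the error is actually raised to the runner.

```python
import itertools


class TransientError(Exception):
    """Hypothetical stand-in for a transient failure such as an HTTP 503."""


def run_bundle_with_retries(process_element, bundle, max_attempts=None):
    """Hypothetical runner loop: re-run a whole bundle until it succeeds.

    max_attempts=None retries indefinitely (streaming-style); an integer caps
    the number of attempts (batch-style).
    """
    attempts = itertools.count(1) if max_attempts is None else range(1, max_attempts + 1)
    last_error = None
    for attempt in attempts:
        staged = []  # outputs produced by this attempt only
        try:
            for element in bundle:
                staged.extend(process_element(element, attempt))
        except TransientError as exc:
            last_error = exc
            continue  # discard this attempt's outputs and re-run the whole bundle
        return staged, attempt  # commit only the successful attempt's outputs
    raise RuntimeError(f"bundle failed after {max_attempts} attempts") from last_error


def write_element(element, attempt):
    # Hypothetical write that fails on element "b" during the first attempt only.
    if attempt == 1 and element == "b":
        raise TransientError("503 from upload")
    return [element.upper()]


committed, attempts_used = run_bundle_with_retries(write_element, ["a", "b", "c"], max_attempts=4)
print(committed, attempts_used)  # ['A', 'B', 'C'] 2
```

Note the layering: a call-level decorator retries one request in place, while bundle-level retry re-executes all the work in the bundle, which is coarser but does not need the failed call to be replayable.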

       

      Here [1] is the full stack trace of the error as it appears on the main Dataflow page.

      [1]
      2021-06-10 16:58:55.104 CEST
      Error message from worker: generic::unknown: Traceback (most recent call last):
        File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
        File "apache_beam/runners/common.py", line 768, in apache_beam.runners.common.PerWindowInvoker.invoke_process
        File "apache_beam/runners/common.py", line 891, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
        File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
        File "/usr/local/lib/python3.8/site-packages/apache_beam/io/fileio.py", line 620, in process
          writer.close()
        File "/usr/local/lib/python3.8/site-packages/apache_beam/io/filesystemio.py", line 220, in close
          self._uploader.finish()
        File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/gcsio.py", line 676, in finish
          raise self._upload_thread.last_error  # pylint: disable=raising-bad-type
        File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/gcsio.py", line 651, in _start_upload
          self._client.objects.Insert(self._insert_request, upload=self._upload)
        File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py", line 1154, in Insert
          return self._RunMethod(
        File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 731, in _RunMethod
          return self.ProcessHttpResponse(method_config, http_response, request)
        File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse
          self.__ProcessHttpResponse(method_config, http_response, request))
        File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 603, in __ProcessHttpResponse
          raise exceptions.HttpError.FromResponse(
      apitools.base.py.exceptions.HttpError: HttpError accessing <https://www.googleapis.com/resumable/upload/storage/v1/b/<bucket-name>/o?alt=json&name=tmp%2F.tempaf83360e-673f-4f9a-b15a-5be45081c335%2F3919075269125806430_9535790f-f57d-430f-9631-f121966e5ca4&uploadType=resumable&upload_id=<uploadid>>: response: <{'content-type': 'text/plain; charset=utf-8', 'x-guploader-uploadid': '<id>', 'content-length': '0', 'date': 'Thu, 10 Jun 2021 14:58:51 GMT', 'server': 'UploadServer', 'status': '503'}>, content <>

      During handling of the above exception, another exception occurred:

      Traceback (most recent call last):
        File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
          response = task()
        File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda>
          lambda: self.create_worker().do_instruction(request), request)
        File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 606, in do_instruction
          return getattr(self, request_type)(
        File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle
          bundle_processor.process_bundle(instruction_id))
        File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 999, in process_bundle
          input_op_by_transform_id[element.transform_id].process_encoded(
        File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 228, in process_encoded
          self.output(decoded_value)
        File "apache_beam/runners/worker/operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
        File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
        File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
        File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
        File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
        File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
        File "apache_beam/runners/common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
        File "/usr/local/lib/python3.8/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
          raise exc.with_traceback(traceback)
        File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
        File "apache_beam/runners/common.py", line 768, in apache_beam.runners.common.PerWindowInvoker.invoke_process
        File "apache_beam/runners/common.py", line 891, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
        File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
        File "/usr/local/lib/python3.8/site-packages/apache_beam/io/fileio.py", line 620, in process
          writer.close()
        File "/usr/local/lib/python3.8/site-packages/apache_beam/io/filesystemio.py", line 220, in close
          self._uploader.finish()
        File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/gcsio.py", line 676, in finish
          raise self._upload_thread.last_error  # pylint: disable=raising-bad-type
        File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/gcsio.py", line 651, in _start_upload
          self._client.objects.Insert(self._insert_request, upload=self._upload)
        File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py", line 1154, in Insert
          return self._RunMethod(
        File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 731, in _RunMethod
          return self.ProcessHttpResponse(method_config, http_response, request)
        File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse
          self.__ProcessHttpResponse(method_config, http_response, request))
        File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 603, in __ProcessHttpResponse
          raise exceptions.HttpError.FromResponse(
      RuntimeError: apitools.base.py.exceptions.HttpError: HttpError accessing <https://www.googleapis.com/resumable/upload/storage/v1/b/<bucket-name>/o?alt=json&name=tmp%2F.tempaf83360e-673f-4f9a-b15a-5be45081c335%2F3919075269125806430_9535790f-f57d-430f-9631-f121966e5ca4&uploadType=resumable&upload_id=<id>>: response: <{'content-type': 'text/plain; charset=utf-8', 'x-guploader-uploadid': '<id>', 'content-length': '0', 'date': 'Thu, 10 Jun 2021 14:58:51 GMT', 'server': 'UploadServer', 'status': '503'}>, content <> [while running 'Write to GCS/ParDo(_WriteShardedRecordsFn)-ptransform-50705']
      passed through:
      ==> dist_proc/dax/workflow/worker/fnapi_service.cc:631

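The first traceback suggests the upload error is not swallowed. _start_upload fails in the upload thread, the error is kept as last_error, and finish() re-raises it from writer.close() inside the WriteToFiles DoFn, which fails the bundle. The simplified, standard-library sketch below shows that deferred-error pattern. BackgroundUploader and failing_upload are hypothetical and do not reproduce Beam's GcsUploader.

```python
import threading


class BackgroundUploader:
    """Hypothetical uploader: runs the upload in a worker thread and re-raises
    any error from that thread when finish() is called."""

    def __init__(self, upload_fn):
        self._upload_fn = upload_fn
        self._last_error = None
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        try:
            self._upload_fn()
        except Exception as exc:  # record the failure instead of losing it
            self._last_error = exc

    def finish(self):
        self._thread.join()
        if self._last_error is not None:
            # Re-raising the stored exception keeps its original traceback, so the
            # failing call's frames show up after finish() in the printed trace.
            raise self._last_error


def failing_upload():
    # Hypothetical upload that fails like the 503 in the trace.
    raise OSError("HTTP 503 from resumable upload")


uploader = BackgroundUploader(failing_upload)
caught = None
try:
    uploader.finish()
except OSError as exc:
    caught = str(exc)
print(caught)  # HTTP 503 from resumable upload
```

Because the caller sees the exception, the surrounding DoFn fails loudly and the runner's bundle retry can take over, rather than the failed upload passing silently.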

            People

            • Assignee: Unassigned
            • Reporter: Victor Luis Gea Garcia (VictorGea)
            • Votes: 0
            • Watchers: 1
