Beam / BEAM-12843

(Broken Pipe induced) Bricked Dataflow Pipeline 

Details

    • Type: Bug
    • Status: Open
    • Priority: P1
    • Resolution: Unresolved
    • Affects Version/s: 2.31.0
    • Fix Version/s: None
    • Component/s: io-py-gcp, runner-dataflow
    • Labels: None

    Description

      We are seeing Dataflow pipelines get stuck indefinitely. The common theme is a bundle failing with a Broken Pipe error, after which the next bundle is stuck at the `StartBundle` stage (as reported by Dataflow).

      Specifically, we see an exception like the following for a bundle (the re-raised exception log is long, so it is truncated here):

       

      "Error processing instruction process_bundle-7079259598045896145-12555. Original traceback is
      Traceback (most recent call last):
        File "apache_beam/runners/common.py", line 1223, in apache_beam.runners.common.DoFnRunner.process
        File "apache_beam/runners/common.py", line 752, in apache_beam.runners.common.PerWindowInvoker.invoke_process
        File "apache_beam/runners/common.py", line 875, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
        File "apache_beam/runners/common.py", line 1359, in apache_beam.runners.common._OutputProcessor.process_outputs
        File "/usr/local/lib/python3.6/site-packages/resolution/utilities/beam.py", line 192, in process
          writer.write(element)
        File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1454, in write
          return self._file_handle.write(self._coder.encode(row) + b'\n')
        File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystemio.py", line 200, in write
          self._uploader.put(b)
        File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/gcsio.py", line 661, in put
          self._conn.send_bytes(data.tobytes())
        File "/usr/local/lib/python3.6/multiprocessing/connection.py", line 200, in send_bytes
          self._send_bytes(m[offset:offset + size])
        File "/usr/local/lib/python3.6/multiprocessing/connection.py", line 398, in _send_bytes
          self._send(buf)
        File "/usr/local/lib/python3.6/multiprocessing/connection.py", line 368, in _send
          n = write(self._handle, buf)
      BrokenPipeError: [Errno 32] Broken pipe
      

      As mentioned above, the next bundle is then stuck at the `StartBundle` stage (as reported by Dataflow), and the progress report thread logs messages like this:

       

      "Operation ongoing for over 10087.60 seconds in state start-msecs in step Assign to Location for POI joins-ptransform-49654  without returning. Current Traceback:
        File "/usr/local/lib/python3.6/threading.py", line 884, in _bootstrap
          self._bootstrap_inner()
      
        File "/usr/local/lib/python3.6/threading.py", line 916, in _bootstrap_inner
          self.run()
      
        File "/usr/local/lib/python3.6/site-packages/apache_beam/utils/thread_pool_executor.py", line 53, in run
          self._work_item.run()
      
        File "/usr/local/lib/python3.6/site-packages/apache_beam/utils/thread_pool_executor.py", line 37, in run
          self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
      
        File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 357, in task
          lambda: self.create_worker().do_instruction(request), request)
      
        File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 284, in _execute
          response = task()
      
        File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 357, in <lambda>
          lambda: self.create_worker().do_instruction(request), request)
      
        File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 602, in do_instruction
          getattr(request, request_type), request.instruction_id)
      
        File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 639, in process_bundle
          bundle_processor.process_bundle(instruction_id))
      
        File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 983, in process_bundle
          expected_inputs):
      
        File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/data_plane.py", line 459, in input_elements
          element = received.get(timeout=1)
      
        File "/usr/local/lib/python3.6/queue.py", line 173, in get
          self.not_empty.wait(remaining)
      
        File "/usr/local/lib/python3.6/threading.py", line 299, in wait
          gotit = waiter.acquire(True, timeout)
      

       

      Some details about the Broken Pipe error

      As the logs show, the exception is raised at this line (`self._conn.send_bytes(data.tobytes())` in `gcsio.py`, per the traceback above). Since the exception is a BrokenPipeError rather than a generic OSError, the connection must have been closed from the other end, i.e. here.
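
The distinction between the two exception types can be reproduced with a small standalone sketch. It assumes a POSIX system and a one-way `multiprocessing.Pipe`; `send_after_close` is a hypothetical helper, not part of Beam. Closing the reading end and then writing yields `BrokenPipeError`, while writing on a connection whose own handle was closed yields a plain `OSError`.

```python
import multiprocessing


def send_after_close(close_reader):
    """Return the name of the exception raised by send_bytes, if any."""
    reader, writer = multiprocessing.Pipe(duplex=False)
    if close_reader:
        reader.close()  # the *other* end of the pipe goes away
    else:
        writer.close()  # our own end is closed before sending
    try:
        writer.send_bytes(b"payload")
    except OSError as exc:  # BrokenPipeError is a subclass of OSError
        return type(exc).__name__
    finally:
        # Closing an already closed Connection is a no-op.
        reader.close()
        writer.close()
    return "no error"


print(send_after_close(close_reader=True))
print(send_after_close(close_reader=False))
```

On Linux the first call reports `BrokenPipeError` and the second reports `OSError`, which matches the reasoning that the peer end of the connection was closed.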

      Since the connection was closed from the other end, there must have been some error in this try/except block. Searching through the logs does reveal an error in the thread spawned to upload the data sent through the pipe.

      The error is as follows:

       

      "Error in _start_upload while inserting file gs://hc-resolution-temp/bq_load_staging/055471a8-bef6-4afb-a850-3a5f9edc43f6/huq-core.enriched_impression_1.impressions/1901c43b-ecd0-49f7-a03a-6d6aa418d36d: Traceback (most recent call last):
        File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/gcsio.py", line 649, in _start_upload
          self._client.objects.Insert(self._insert_request, upload=self._upload)
        File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py", line 1154, in Insert
          upload=upload, upload_config=upload_config)
        File "/usr/local/lib/python3.6/site-packages/apitools/base/py/base_api.py", line 715, in _RunMethod
          http_request, client=self.client)
        File "/usr/local/lib/python3.6/site-packages/apitools/base/py/transfer.py", line 908, in InitializeUpload
          return self.StreamInChunks()
        File "/usr/local/lib/python3.6/site-packages/apitools/base/py/transfer.py", line 1020, in StreamInChunks
          additional_headers=additional_headers)
        File "/usr/local/lib/python3.6/site-packages/apitools/base/py/transfer.py", line 971, in __StreamMedia
          self.RefreshResumableUploadState()
        File "/usr/local/lib/python3.6/site-packages/apitools/base/py/transfer.py", line 873, in RefreshResumableUploadState
          self.stream.seek(self.progress)
        File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystemio.py", line 302, in seek
          (offset, whence, self.position, self.last_block_position))
      NotImplementedError: offset: 169869312, whence: 0, position: 176160768, last: 167772160
      

       

      Specifically, this error is raised when seeking to an offset that lies between the last block position and the current position (here 167772160 < 169869312 < 176160768).
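
To make the numbers in the log concrete, here is a simplified model of the failing condition. It assumes the stream accepts only absolute seeks to the current position or to the start of the last block; `seek_is_supported` is a hypothetical function, and the real `seek` in `filesystemio.py` may accept other cases.

```python
import os

MIB = 1024 * 1024


def seek_is_supported(offset, whence, position, last_block_position):
    """Hypothetical model: only two absolute seek targets are accepted."""
    if whence != os.SEEK_SET:
        return False
    return offset in (position, last_block_position)


# Values copied from the NotImplementedError message in the log above.
logged = dict(
    offset=169869312,
    whence=0,
    position=176160768,
    last_block_position=167772160,
)

print(seek_is_supported(**logged))
# How far the requested offset is into the last block, and the block size.
into_block = (logged["offset"] - logged["last_block_position"]) // MIB
block_size = (logged["position"] - logged["last_block_position"]) // MIB
print(into_block, block_size)
```

With the logged values, the requested offset is 2 MiB into a block that spans 8 MiB, so it matches neither accepted target.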

       

      My thoughts:

      • Why is the pipeline stalled at the `StartBundle` stage after such a Broken Pipe Error? Something to do with the uploader thread?
      • Could seeking an offset between the last block position and current offset be implemented without significant repercussions?
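
As a starting point for the second question, here is a rough sketch of a forward-only stream that retains the last block it read and allows seeking anywhere inside it. `LastBlockStream` is a hypothetical class written for this illustration, not Beam's stream implementation. The sketch does not address memory use or how apitools resumes an upload.

```python
import io
import os


class LastBlockStream:
    """Hypothetical forward-only stream that keeps the last block read."""

    def __init__(self, source):
        self._source = source         # any object with read(size)
        self.position = 0             # offset of the next byte to return
        self.last_block_position = 0  # offset where the retained block starts
        self._block = b""             # bytes of the retained block

    def read(self, size):
        start = self.position - self.last_block_position
        if start < len(self._block):
            # After a rewind: replay bytes retained from the last block.
            data = self._block[start:start + size]
        else:
            # At the end of the retained block: fetch and retain a new one.
            data = self._source.read(size)
            self.last_block_position = self.position
            self._block = data
        self.position += len(data)
        return data

    def seek(self, offset, whence=os.SEEK_SET):
        end = self.last_block_position + len(self._block)
        if whence == os.SEEK_SET and self.last_block_position <= offset <= end:
            self.position = offset  # anywhere inside the retained block
            return
        raise NotImplementedError(
            "offset: %s, whence: %s, position: %s, last: %s"
            % (offset, whence, self.position, self.last_block_position))


stream = LastBlockStream(io.BytesIO(b"abcdefghij"))
stream.read(4)         # b"abcd"
stream.read(4)         # b"efgh", now the retained block
stream.seek(6)         # inside the retained block, so this is allowed
print(stream.read(4))  # replays the tail of the retained block
```

A seek to an offset before the retained block still raises `NotImplementedError`, so the memory cost stays bounded by one block.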

       

      Same issue but with picture attached here

          People

            Assignee: Unassigned
            Reporter: Ryan Tam (ryantam626)