Details
Type: Bug
Status: Open
Priority: P1
Resolution: Unresolved
Affects Version/s: 2.31.0
Description
We are seeing Dataflow pipelines get stuck indefinitely. The common pattern is that one bundle fails with a Broken Pipe error, and the next bundle then gets stuck at the `StartBundle` stage (as reported by Dataflow).
Specifically, a bundle fails with an exception like the following (the re-raised exception log is truncated because it is long):
Error processing instruction process_bundle-7079259598045896145-12555. Original traceback is
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1223, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 752, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 875, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1359, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/usr/local/lib/python3.6/site-packages/resolution/utilities/beam.py", line 192, in process
    writer.write(element)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1454, in write
    return self._file_handle.write(self._coder.encode(row) + b'\n')
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystemio.py", line 200, in write
    self._uploader.put(b)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/gcsio.py", line 661, in put
    self._conn.send_bytes(data.tobytes())
  File "/usr/local/lib/python3.6/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/usr/local/lib/python3.6/multiprocessing/connection.py", line 398, in _send_bytes
    self._send(buf)
  File "/usr/local/lib/python3.6/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
As mentioned above, the next bundle is then stuck at the `StartBundle` stage (as reported by Dataflow). The progress report thread logs messages like this:
Operation ongoing for over 10087.60 seconds in state start-msecs in step Assign to Location for POI joins-ptransform-49654 without returning. Current Traceback:
  File "/usr/local/lib/python3.6/threading.py", line 884, in _bootstrap
    self._bootstrap_inner()
  File "/usr/local/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.6/site-packages/apache_beam/utils/thread_pool_executor.py", line 53, in run
    self._work_item.run()
  File "/usr/local/lib/python3.6/site-packages/apache_beam/utils/thread_pool_executor.py", line 37, in run
    self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 357, in task
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 284, in _execute
    response = task()
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 357, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 602, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 639, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 983, in process_bundle
    expected_inputs):
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/data_plane.py", line 459, in input_elements
    element = received.get(timeout=1)
  File "/usr/local/lib/python3.6/queue.py", line 173, in get
    self.not_empty.wait(remaining)
  File "/usr/local/lib/python3.6/threading.py", line 299, in wait
    gotit = waiter.acquire(True, timeout)
Some details about the Broken Pipe error
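As the logs show, the exception is raised by the `self._conn.send_bytes(data.tobytes())` call in `gcsio.py` (`put`, line 661). Because it is a BrokenPipeError rather than a generic OSError (which is what sending on a locally closed connection raises), the connection must have been closed from the other end of the pipe, i.e. by the uploader thread.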
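To illustrate that reasoning, here is a minimal sketch (not Beam code) of the two failure modes on a `multiprocessing.Pipe`. The `failing_uploader` function is a hypothetical stand-in for an uploader thread that hits an error and closes its end of the pipe on the way out. The behaviour shown assumes a POSIX platform such as the Linux workers in the traceback above.

```python
import multiprocessing
import threading


def failing_uploader(child_conn):
    """Hypothetical stand-in for an uploader thread whose upload call fails."""
    try:
        raise NotImplementedError("simulated failure inside the upload call")
    except Exception:
        pass  # real code would log the traceback here
    finally:
        child_conn.close()  # the reading end of the pipe goes away


def send_after_reader_closed():
    """Send from the writer side after the other end has been closed."""
    parent_conn, child_conn = multiprocessing.Pipe()
    thread = threading.Thread(target=failing_uploader, args=(child_conn,))
    thread.start()
    thread.join()
    try:
        parent_conn.send_bytes(b"row\n")
    except BrokenPipeError as exc:
        return type(exc).__name__
    finally:
        parent_conn.close()
    return "no error"


def send_on_locally_closed():
    """Send on a connection that this side has already closed itself."""
    parent_conn, child_conn = multiprocessing.Pipe()
    parent_conn.close()
    try:
        parent_conn.send_bytes(b"row\n")
    except OSError as exc:
        return type(exc).__name__, str(exc)
    finally:
        child_conn.close()
    return "no error", ""


if __name__ == "__main__":
    print(send_after_reader_closed())
    print(send_on_locally_closed())
```

Note that BrokenPipeError is itself a subclass of OSError, so the distinction is between the specific subclass (peer closed) and the plain OSError raised for a locally closed handle.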
Since the pipe was closed from the other end, something must have gone wrong inside the try/except block of the uploader thread. Searching the logs does reveal an error in the thread that is spawned to upload whatever data is sent through the pipe.
The error is as follows:
Error in _start_upload while inserting file gs://hc-resolution-temp/bq_load_staging/055471a8-bef6-4afb-a850-3a5f9edc43f6/huq-core.enriched_impression_1.impressions/1901c43b-ecd0-49f7-a03a-6d6aa418d36d:
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/gcsio.py", line 649, in _start_upload
    self._client.objects.Insert(self._insert_request, upload=self._upload)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py", line 1154, in Insert
    upload=upload, upload_config=upload_config)
  File "/usr/local/lib/python3.6/site-packages/apitools/base/py/base_api.py", line 715, in _RunMethod
    http_request, client=self.client)
  File "/usr/local/lib/python3.6/site-packages/apitools/base/py/transfer.py", line 908, in InitializeUpload
    return self.StreamInChunks()
  File "/usr/local/lib/python3.6/site-packages/apitools/base/py/transfer.py", line 1020, in StreamInChunks
    additional_headers=additional_headers)
  File "/usr/local/lib/python3.6/site-packages/apitools/base/py/transfer.py", line 971, in __StreamMedia
    self.RefreshResumableUploadState()
  File "/usr/local/lib/python3.6/site-packages/apitools/base/py/transfer.py", line 873, in RefreshResumableUploadState
    self.stream.seek(self.progress)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystemio.py", line 302, in seek
    (offset, whence, self.position, self.last_block_position))
NotImplementedError: offset: 169869312, whence: 0, position: 176160768, last: 167772160
Specifically, this error is raised when the stream is asked to seek to an offset (169869312) that lies strictly between the last block position (167772160) and the current position (176160768).
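The numbers in that NotImplementedError can be worked through as below. This is only a sketch: `seek_is_supported` is a hypothetical helper that models the assumption that the stream accepts an absolute seek only to the current position or to the start of the last block; it is not copied from `filesystemio.py`.

```python
MIB = 1024 * 1024

# Values taken from the NotImplementedError message in the log.
offset = 169869312
position = 176160768
last_block_position = 167772160


def seek_is_supported(offset, position, last_block_position):
    """Assumed rule: only the current position or the last block start is seekable."""
    return offset == position or offset == last_block_position


last_block_size = position - last_block_position   # 8388608 bytes = 8 MiB
into_last_block = offset - last_block_position      # 2097152 bytes = 2 MiB
behind_position = position - offset                 # 6291456 bytes = 6 MiB

if __name__ == "__main__":
    print(last_block_size // MIB, into_last_block // MIB, behind_position // MIB)
    print(seek_is_supported(offset, position, last_block_position))
```

Under that assumption, the requested offset falls 2 MiB into an 8 MiB last block, so it matches neither accepted target and the seek is rejected.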
My thoughts:
- Why is the pipeline stalled at the `StartBundle` stage after such a Broken Pipe Error? Something to do with the uploader thread?
- Could seeking an offset between the last block position and current offset be implemented without significant repercussions?
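On the second question, one possible approach is to keep the most recent block in memory so that any offset inside it can be served again. The class below is a hypothetical, self-contained sketch of that idea, not a patch for Beam's stream class: `RewindableBlockStream` and its block iterator are invented for illustration, and the real stream receives its data over a pipe rather than from a list.

```python
class RewindableBlockStream:
    """Hypothetical read-only stream that retains the last block for rewinds."""

    def __init__(self, blocks):
        self._blocks = iter(blocks)      # source of byte blocks (assumed)
        self._last_block = b""           # most recently fetched block
        self.last_block_position = 0     # absolute offset where _last_block starts
        self.position = 0                # absolute offset of the next byte to read

    def read(self, size):
        parts = []
        while size > 0:
            in_block = self.position - self.last_block_position
            if in_block == len(self._last_block):
                # Current block is used up: fetch the next one, if any.
                nxt = next(self._blocks, None)
                if nxt is None:
                    break
                self.last_block_position += len(self._last_block)
                self._last_block = nxt
                in_block = 0
            chunk = self._last_block[in_block:in_block + size]
            parts.append(chunk)
            self.position += len(chunk)
            size -= len(chunk)
        return b"".join(parts)

    def seek(self, offset):
        # Any absolute offset inside the retained block can be served again.
        end = self.last_block_position + len(self._last_block)
        if self.last_block_position <= offset <= end:
            self.position = offset
            return
        raise NotImplementedError(
            "offset: %s, position: %s, last: %s"
            % (offset, self.position, self.last_block_position))


if __name__ == "__main__":
    stream = RewindableBlockStream([b"abcdefgh", b"ijklmnop"])
    stream.read(8)
    stream.read(6)
    stream.seek(10)          # inside the retained second block
    print(stream.read(4))
```

The main cost in this sketch is holding one full block in memory per open stream. Offsets before the retained block still cannot be served, so whether this would be enough depends on how far back the upload library may ask to rewind.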
Same issue but with picture attached here