Details
Type: Bug
Status: Resolved
Priority: P2
Resolution: Duplicate
Affects Version: 2.36.0
None
Environment: GCP Dataflow, Java 11
Description
I am running a streaming pipeline on Beam 2.36.0 (the latest version at the time of this report) that reads from PubSub and writes to BigQuery with the following sink code:
BigQueryIO.<Row>write()
    .to("project.dataset.table")
    .useBeamSchema()
    .withMethod(BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE)
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);
The pipeline ran for several days before the following exception was raised:
Error message from worker: java.lang.RuntimeException: io.grpc.StatusRuntimeException: FAILED_PRECONDITION: Stream is already closed
    org.apache.beam.sdk.io.gcp.bigquery.RetryManager.await(RetryManager.java:268)
    org.apache.beam.sdk.io.gcp.bigquery.RetryManager.run(RetryManager.java:248)
    org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn.flushAll(StorageApiWriteUnshardedRecords.java:315)
    org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn.finishBundle(StorageApiWriteUnshardedRecords.java:382)
Caused by: io.grpc.StatusRuntimeException: FAILED_PRECONDITION: Stream is already closed
    com.google.cloud.bigquery.storage.v1.StreamWriter.appendInternal(StreamWriter.java:294)
    com.google.cloud.bigquery.storage.v1.StreamWriter.append(StreamWriter.java:272)
    org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$1.appendRows(BigQueryServicesImpl.java:1243)
    org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn$DestinationState.lambda$flush$1(StorageApiWriteUnshardedRecords.java:241)
    org.apache.beam.sdk.io.gcp.bigquery.RetryManager$Operation.run(RetryManager.java:129)
    org.apache.beam.sdk.io.gcp.bigquery.RetryManager.run(RetryManager.java:245)
    org.apache.beam.sdk.io.gcp.bigquery.RetryManager.await(RetryManager.java:274)
    org.apache.beam.sdk.io.gcp.bigquery.RetryManager.run(RetryManager.java:248)
    org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn.flushAll(StorageApiWriteUnshardedRecords.java:315)
    org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn.finishBundle(StorageApiWriteUnshardedRecords.java:382)
    org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn$DoFnInvoker.invokeFinishBundle(Unknown Source)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.finishBundle(SimpleDoFnRunner.java:238)
    org.apache.beam.runners.dataflow.worker.SimpleParDoFn.finishBundle(SimpleParDoFn.java:433)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:56)
    org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:94)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1437)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:165)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1113)
    java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    java.base/java.lang.Thread.run(Thread.java:834)
After this exception, the pipeline enters an endless retry loop and stops processing new data. As a result, Storage API streaming writes cannot be used for production workloads in this version.
Also reported by another user on the mailing list: https://lists.apache.org/thread/21st7zpj3qvn5d9jrcmdroyolbf7pyro
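
To illustrate the symptom, here is a minimal, self-contained sketch of why retrying an append against a stream that is already closed can never succeed, whereas replacing the stream before the next attempt can. This is not Beam's RetryManager or the BigQuery StreamWriter: FakeStream, retrySameStream and retryWithFreshStream are hypothetical names invented for this example, and whether the real issue is resolved this way is an assumption, not something established in this report.

```java
import java.util.function.Supplier;

public class ClosedStreamRetrySketch {

    /** Hypothetical stand-in for a write stream that the server may have closed. */
    static class FakeStream {
        private final boolean closed;

        FakeStream(boolean closed) {
            this.closed = closed;
        }

        void append(String row) {
            if (closed) {
                // Mirrors only the message text of the error in this report.
                throw new IllegalStateException(
                    "FAILED_PRECONDITION: Stream is already closed");
            }
        }
    }

    /**
     * Retries the append on the same stream object.
     * Returns the attempt number that succeeded, or -1 if every attempt failed.
     */
    static int retrySameStream(FakeStream stream, String row, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                stream.append(row);
                return attempt;
            } catch (IllegalStateException e) {
                // The stream stays closed, so the next attempt fails the same way.
            }
        }
        return -1;
    }

    /**
     * Same loop, but a failed append replaces the stream before the next attempt.
     * Returns the attempt number that succeeded, or -1 if every attempt failed.
     */
    static int retryWithFreshStream(
            FakeStream stream, Supplier<FakeStream> factory, String row, int maxAttempts) {
        FakeStream current = stream;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                current.append(row);
                return attempt;
            } catch (IllegalStateException e) {
                current = factory.get(); // assumption: a new stream can be opened
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        FakeStream closed = new FakeStream(true);
        System.out.println(retrySameStream(closed, "row", 5));
        System.out.println(
            retryWithFreshStream(closed, () -> new FakeStream(false), "row", 5));
    }
}
```

With an unbounded retry budget, the first loop would spin forever, which matches the behaviour described above; the sketch caps attempts only so that it terminates.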