BEAM-13954: BigQueryIO Storage Write API: Stream is already closed

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P2
    • Resolution: Duplicate
    • Affects Version: 2.36.0
    • Fix Version: 2.38.0
    • Component: io-java-gcp
    • Labels: None
    • Environment: GCP Dataflow, Java 11

    Description

      We are running a streaming pipeline on the latest Beam version (2.36.0) that reads data from Pub/Sub and writes to BigQuery with the following sink code:

      BigQueryIO.<Row>write()
        .to("project.dataset.table")
        .useBeamSchema()
        .withMethod(BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);
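
      For completeness, here is a minimal sketch of how this sink might be wired into a full pipeline. The topic name, the JSON-to-Row conversion helper (toRow), and ROW_SCHEMA are illustrative assumptions; only the sink configuration above is taken from this report:

      import org.apache.beam.sdk.Pipeline;
      import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
      import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
      import org.apache.beam.sdk.options.PipelineOptions;
      import org.apache.beam.sdk.options.PipelineOptionsFactory;
      import org.apache.beam.sdk.transforms.MapElements;
      import org.apache.beam.sdk.values.Row;
      import org.apache.beam.sdk.values.TypeDescriptors;

      PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
      Pipeline p = Pipeline.create(options);

      p.apply("ReadFromPubSub",
              PubsubIO.readStrings().fromTopic("projects/my-project/topics/my-topic"))
       .apply("ToRow", MapElements.into(TypeDescriptors.rows())
              .via(json -> toRow(json)))   // hypothetical JSON-to-Row helper
       .setRowSchema(ROW_SCHEMA)           // Beam schema required by useBeamSchema()
       .apply("WriteToBigQuery",
              BigQueryIO.<Row>write()
                  .to("project.dataset.table")
                  .useBeamSchema()
                  .withMethod(BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE)
                  .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                  .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

      p.run();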

      The pipeline ran for several days before the following exception was raised:

      Error message from worker: java.lang.RuntimeException: io.grpc.StatusRuntimeException: FAILED_PRECONDITION: Stream is already closed
              org.apache.beam.sdk.io.gcp.bigquery.RetryManager.await(RetryManager.java:268)
              org.apache.beam.sdk.io.gcp.bigquery.RetryManager.run(RetryManager.java:248)
              org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn.flushAll(StorageApiWriteUnshardedRecords.java:315)
              org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn.finishBundle(StorageApiWriteUnshardedRecords.java:382)
      Caused by: io.grpc.StatusRuntimeException: FAILED_PRECONDITION: Stream is already closed
              com.google.cloud.bigquery.storage.v1.StreamWriter.appendInternal(StreamWriter.java:294)
              com.google.cloud.bigquery.storage.v1.StreamWriter.append(StreamWriter.java:272)
              org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$1.appendRows(BigQueryServicesImpl.java:1243)
              org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn$DestinationState.lambda$flush$1(StorageApiWriteUnshardedRecords.java:241)
              org.apache.beam.sdk.io.gcp.bigquery.RetryManager$Operation.run(RetryManager.java:129)
              org.apache.beam.sdk.io.gcp.bigquery.RetryManager.run(RetryManager.java:245)
              org.apache.beam.sdk.io.gcp.bigquery.RetryManager.await(RetryManager.java:274)
              org.apache.beam.sdk.io.gcp.bigquery.RetryManager.run(RetryManager.java:248)
              org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn.flushAll(StorageApiWriteUnshardedRecords.java:315)
              org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn.finishBundle(StorageApiWriteUnshardedRecords.java:382)
              org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn$DoFnInvoker.invokeFinishBundle(Unknown Source)
              org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.finishBundle(SimpleDoFnRunner.java:238)
              org.apache.beam.runners.dataflow.worker.SimpleParDoFn.finishBundle(SimpleParDoFn.java:433)
              org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:56)
              org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:94)
              org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1437)
              org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:165)
              org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1113)
              java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
              java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
              java.base/java.lang.Thread.run(Thread.java:834)
      

      After this, the pipeline enters an endless retry loop and no new data is processed. As a result, Storage Write API writes cannot be used for production workloads.
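
      As a possible stop-gap on affected releases (an assumption on my part, not something confirmed in this issue), the sink can fall back to the legacy streaming-inserts path until a release containing the fix is available:

      // Assumed workaround, not from this report: legacy streaming inserts
      // with a retry policy for transient insert errors.
      BigQueryIO.<Row>write()
          .to("project.dataset.table")
          .useBeamSchema()
          .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
          .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
          .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
          .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);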

      Also reported by another user on the mailing list: https://lists.apache.org/thread/21st7zpj3qvn5d9jrcmdroyolbf7pyro

      People

        Assignee: Unassigned
        Reporter: Dennis Waldron
        Votes: 1
        Watchers: 2
