Flink / FLINK-12376

GCS runtime exn: Request payload size exceeds the limit


Details

    • Important

    Description

      I'm trying to use the Google Cloud Storage file system, but it seems that the Flink/GCS client libraries are creating requests that are too large, somewhere deep inside the GCS Java client.

      The GCS connector (the Java client) is added to Flink's lib folder with this Dockerfile command (the `latest` jar was probably hadoop2-1.9.16 at the time of writing):

       

      ADD https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-latest-hadoop2.jar /opt/flink/lib
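Because the jar is fetched under its `latest` name, the file name alone does not show which connector build ended up in the image. A small diagnostic such as the following, run with the same classpath as the TaskManager, can at least confirm that the connector's Hadoop file system class (`com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem`) is visible and which jar it is loaded from. This is a hedged sketch using only JDK reflection; `ConnectorCheck` is a hypothetical name, and a class that is found but fails to link (for example because Hadoop classes are missing) is reported as not present.

```java
import java.net.URL;
import java.security.CodeSource;
import java.util.Optional;

/** Hypothetical diagnostic: is the GCS connector's FileSystem class on the classpath, and where from? */
public final class ConnectorCheck {

    static boolean isPresent(String className) {
        try {
            // initialize=false: load the class without running its static initializers.
            Class.forName(className, false, ConnectorCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // Missing class, or present but not linkable (e.g. a dependency is absent).
            return false;
        }
    }

    static Optional<URL> locate(String className) {
        try {
            Class<?> cls = Class.forName(className, false, ConnectorCheck.class.getClassLoader());
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            // JDK classes have no CodeSource, so the result is empty for them.
            return src == null ? Optional.empty() : Optional.ofNullable(src.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        String fsClass = "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem";
        System.out.println(fsClass + " present: " + isPresent(fsClass));
        locate(fsClass).ifPresent(url -> System.out.println("loaded from: " + url));
    }
}
```

After compiling, it could be run inside the container with something like `java -cp "/opt/flink/lib/*:." ConnectorCheck`.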

      This is the crash output. The key lines are:

      java.lang.RuntimeException: Error while confirming checkpoint

      and

       Caused by: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Request payload size exceeds the limit: 524288 bytes.
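For scale, the limit in the second line works out as below. The inequality is a generic size budget under an assumption: the request is modelled as a fixed part of h bytes (for example a resource name) plus n repeated entries whose encoded sizes, including any per-entry framing, are s_1, ..., s_n. The symbols h and s_i are illustrative and are not taken from the error message.

$$
524288\ \text{bytes} = 2^{19}\ \text{bytes} = 512\ \text{KiB},
\qquad
h + \sum_{i=1}^{n} s_i \le 524288 .
$$

Any request whose size grows with the number of entries it carries will therefore be rejected once n is large enough, unless it is split.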

      Full stacktrace:

       

      [analytics-867c867ff6-l622h taskmanager] 2019-04-30 20:23:14,532 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source -> Process -> Timestamps/Watermarks -> app_events (1/1) (9a01e96c0271025d5ba73b735847cd4c) switched from RUNNING to FAILED.
      [analytics-867c867ff6-l622h taskmanager] java.lang.RuntimeException: Error while confirming checkpoint
      [analytics-867c867ff6-l622h taskmanager]     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1211)
      [analytics-867c867ff6-l622h taskmanager]     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      [analytics-867c867ff6-l622h taskmanager]     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      [analytics-867c867ff6-l622h taskmanager]     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      [analytics-867c867ff6-l622h taskmanager]     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      [analytics-867c867ff6-l622h taskmanager]     at java.lang.Thread.run(Thread.java:748)
      [analytics-867c867ff6-l622h taskmanager] Caused by: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Request payload size exceeds the limit: 524288 bytes.
      [analytics-867c867ff6-l622h taskmanager]     at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:49)
      [analytics-867c867ff6-l622h taskmanager]     at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72)
      [analytics-867c867ff6-l622h taskmanager]     at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60)
      [analytics-867c867ff6-l622h taskmanager]     at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
      [analytics-867c867ff6-l622h taskmanager]     at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68)
      [analytics-867c867ff6-l622h taskmanager]     at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1056)
      [analytics-867c867ff6-l622h taskmanager]     at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
      [analytics-867c867ff6-l622h taskmanager]     at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1138)
      [analytics-867c867ff6-l622h taskmanager]     at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:958)
      [analytics-867c867ff6-l622h taskmanager]     at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:748)
      [analytics-867c867ff6-l622h taskmanager]     at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:507)
      [analytics-867c867ff6-l622h taskmanager]     at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:482)
      [analytics-867c867ff6-l622h taskmanager]     at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
      [analytics-867c867ff6-l622h taskmanager]     at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
      [analytics-867c867ff6-l622h taskmanager]     at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
      [analytics-867c867ff6-l622h taskmanager]     at io.grpc.internal.CensusStatsModule$StatsClientInterceptor$1$1.onClose(CensusStatsModule.java:694)
      [analytics-867c867ff6-l622h taskmanager]     at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
      [analytics-867c867ff6-l622h taskmanager]     at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
      [analytics-867c867ff6-l622h taskmanager]     at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
      [analytics-867c867ff6-l622h taskmanager]     at io.grpc.internal.CensusTracingModule$TracingClientInterceptor$1$1.onClose(CensusTracingModule.java:397)
      [analytics-867c867ff6-l622h taskmanager]     at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:459)
      [analytics-867c867ff6-l622h taskmanager]     at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:63)
      [analytics-867c867ff6-l622h taskmanager]     at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:546)
      [analytics-867c867ff6-l622h taskmanager]     at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$600(ClientCallImpl.java:467)
      [analytics-867c867ff6-l622h taskmanager]     at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:584)
      [analytics-867c867ff6-l622h taskmanager]     at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
      [analytics-867c867ff6-l622h taskmanager]     at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
      [analytics-867c867ff6-l622h taskmanager]     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      [analytics-867c867ff6-l622h taskmanager]     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      [analytics-867c867ff6-l622h taskmanager]     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
      [analytics-867c867ff6-l622h taskmanager]     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
      [analytics-867c867ff6-l622h taskmanager]     ... 3 more
      [analytics-867c867ff6-l622h taskmanager]     Suppressed: com.google.api.gax.rpc.AsyncTaskException: Asynchronous task failed
      [analytics-867c867ff6-l622h taskmanager]         at com.google.api.gax.rpc.ApiExceptions.callAndTranslateApiException(ApiExceptions.java:57)
      [analytics-867c867ff6-l622h taskmanager]         at com.google.api.gax.rpc.UnaryCallable.call(UnaryCallable.java:112)
      [analytics-867c867ff6-l622h taskmanager]         at okr.sources.PubSubSource.acknowledgeSessionIDs(PubSubSource.java:122)
      [analytics-867c867ff6-l622h taskmanager]         at org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase.acknowledgeIDs(MultipleIdsMessageAcknowledgingSourceBase.java:122)
      [analytics-867c867ff6-l622h taskmanager]         at org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.notifyCheckpointComplete(MessageAcknowledgingSourceBase.java:231)
      [analytics-867c867ff6-l622h taskmanager]         at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
      [analytics-867c867ff6-l622h taskmanager]         at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684)
      [analytics-867c867ff6-l622h taskmanager]         at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1206)
      [analytics-867c867ff6-l622h taskmanager]         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      [analytics-867c867ff6-l622h taskmanager]         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      [analytics-867c867ff6-l622h taskmanager]         ... 3 more
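The suppressed trace suggests that the oversized request may be the acknowledge call `okr.sources.PubSubSource.acknowledgeSessionIDs`, invoked from `notifyCheckpointComplete` via `MultipleIdsMessageAcknowledgingSourceBase.acknowledgeIDs`, rather than a checkpoint write. If that call sends every ID collected since the previous checkpoint in a single request, it can exceed 524288 bytes. One possible mitigation is to split the IDs into several smaller requests. The sketch below only does the splitting: `AckBatcher`, `partition`, and `PER_ID_OVERHEAD_BYTES` are hypothetical, the per-ID overhead is an assumed estimate rather than a documented value, and each batch would still have to be passed to whatever acknowledge call `PubSubSource` actually makes.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: split IDs into batches that stay under an estimated byte budget. */
public final class AckBatcher {

    /** Limit reported in the error message: 524288 bytes (512 KiB). */
    static final int SERVER_LIMIT_BYTES = 524_288;

    /** Assumed per-ID framing overhead (field tag + length prefix); an estimate, not a documented value. */
    static final int PER_ID_OVERHEAD_BYTES = 4;

    static List<List<String>> partition(List<String> ids, int maxBatchBytes) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int currentBytes = 0;
        for (String id : ids) {
            int size = id.getBytes(StandardCharsets.UTF_8).length + PER_ID_OVERHEAD_BYTES;
            if (size > maxBatchBytes) {
                // A single ID that cannot fit on its own would yield an oversized batch.
                throw new IllegalArgumentException("single ID exceeds batch budget");
            }
            // Close the current batch when adding this ID would overflow the budget.
            if (!current.isEmpty() && currentBytes + size > maxBatchBytes) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(id);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        // Leave headroom below the server limit for the fixed part of the request.
        int budget = SERVER_LIMIT_BYTES / 2;
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            ids.add("ack-id-" + i); // placeholder IDs for illustration only
        }
        List<List<String>> batches = partition(ids, budget);
        System.out.println(ids.size() + " IDs -> " + batches.size() + " batch(es)");
    }
}
```

A smaller budget means more requests per checkpoint. A shorter checkpoint interval would likely also help, since fewer IDs accumulate between two completed checkpoints.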
      

      The file system is configured as follows in `conf/flink-conf.yaml`:

       

      state.backend: rocksdb
      state.checkpoints.num-retained: 3
      state.checkpoints.dir: gs://example_bucket/flink/checkpoints
      state.savepoints.dir: gs://example_bucket/flink/savepoints
      state.backend.incremental: true
      

      ...and the checkpoints created before the crash are small in size.

       

      I'll be testing with Flink 1.8.0 as well.

      The pom.xml config:

      <!-- https://stackoverflow.com/questions/51860988/flink-checkpoints-to-google-cloud-storage -->
      <!-- https://search.maven.org/search?q=a:flink-statebackend-rocksdb_2.11 -->
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
      </dependency>
      
      <!-- https://search.maven.org/search?q=g:com.google.cloud.bigdataoss -->
      <!-- https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/pubsub/README.md -->
      <!-- Cloud Storage: -->
      <dependency>
        <groupId>com.google.cloud.bigdataoss</groupId>
        <artifactId>gcs-connector</artifactId>
        <version>hadoop2-1.9.16</version>
      </dependency>
      
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-filesystem_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
      </dependency>
      

       


          People

            Assignee: Unassigned
            Reporter: haf Henrik
            Votes: 0
            Watchers: 4
