  1. Beam
  2. BEAM-14223

PubsubLiteIO makes excessive calls to GCP google.cloud.pubsublite.v1.AdminService.GetTopicPartitions

Details

    • Type: Bug
    • Status: Open
    • Priority: P2
    • Resolution: Unresolved
    • Component: io-java-gcp

    Description

      Filed per the Slack thread with Kyle Weaver: https://the-asf.slack.com/archives/C9H0YNP3P/p1648752819277569?thread_ts=1648749667.880749&cid=C9H0YNP3P. Tagging Daniel Collins.

      When using PubsubLiteIO.read, the GCP admin client makes excessive calls to google.cloud.pubsublite.v1.AdminService.GetTopicPartitions. In a pipeline that ran for 1 minute and 55 seconds, this call was made over 1000 times, which exhausted the quota for this API and caused the job to fail.
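To put the reported numbers in per-minute terms (the unit the quota error uses), here is a small back-of-envelope calculation. It assumes exactly 1000 calls, which the report gives only as a lower bound, and it does not assume any particular quota value. The class and method names are illustrative.

```java
/** Back-of-envelope call rate for the numbers reported above. */
final class CallRate {
  /** Average calls per minute for a call count observed over a duration in seconds. */
  static double perMinute(long calls, long durationSeconds) {
    return calls * 60.0 / durationSeconds;
  }

  public static void main(String[] args) {
    long calls = 1000;              // lower bound from the report ("over 1000")
    long durationSeconds = 60 + 55; // 1 minute 55 seconds
    System.out.printf("at least %.0f calls/minute%n", perMinute(calls, durationSeconds));
  }
}
```

This works out to roughly 520 GetTopicPartitions calls per minute, on average, from a single pipeline.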

      I looked into the history of the module and noticed that partition settings were exposed before 2.34.0 but have since been removed: https://github.com/apache/beam/commit/8a646aaa95e79a3f33dff204a659c8a221069ffe#diff-9828d5eb9f2fc844e12c0ebc87b3ffe12d7c4db5d9284a34b979598cf8fc6313R129

      To replicate:
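      ```
      import com.google.cloud.pubsublite.SubscriptionPath;
      import org.apache.beam.sdk.Pipeline;
      import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteIO;
      import org.apache.beam.sdk.io.gcp.pubsublite.SubscriberOptions;

      Pipeline p = Pipeline.create(options); // options: the job's PipelineOptions
      SubscriberOptions opts = SubscriberOptions.newBuilder()
          .setSubscriptionPath(SubscriptionPath.parse(
              "projects/<project-id>/locations/us-west1-a/subscriptions/<my-subscription>"))
          .build();
      p.apply(PubsubLiteIO.read(opts)); // no partition settings are supplied
      p.run();
      ```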
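The repro above leaves every partition lookup to the connector. As a general illustration of how repeated read-only lookups can be kept under a per-minute quota, the sketch below caches a lookup result for a fixed time-to-live. This is not how PubsubLiteIO is implemented, nor a proposed patch: `ExpiringSupplier` and the fake lookup that always returns 4 are hypothetical names used only for this example.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Caches a supplier's value for a fixed time-to-live (TTL). */
final class ExpiringSupplier<T> implements Supplier<T> {
  private final Supplier<T> delegate;
  private final long ttlNanos;
  private final LongSupplier nanoClock; // injectable so tests can control time
  private T value;
  private long loadedAtNanos;
  private boolean loaded;

  ExpiringSupplier(Supplier<T> delegate, Duration ttl, LongSupplier nanoClock) {
    this.delegate = delegate;
    this.ttlNanos = ttl.toNanos();
    this.nanoClock = nanoClock;
  }

  @Override
  public synchronized T get() {
    long now = nanoClock.getAsLong();
    // Call the delegate only on first use or once the cached value has expired.
    if (!loaded || now - loadedAtNanos >= ttlNanos) {
      value = delegate.get();
      loadedAtNanos = now;
      loaded = true;
    }
    return value;
  }

  public static void main(String[] args) {
    AtomicInteger backingCalls = new AtomicInteger();
    // Hypothetical stand-in for a remote partition-count lookup; always answers 4.
    Supplier<Integer> lookup = () -> {
      backingCalls.incrementAndGet();
      return 4;
    };
    Supplier<Integer> cached =
        new ExpiringSupplier<>(lookup, Duration.ofMinutes(1), System::nanoTime);
    for (int i = 0; i < 1000; i++) {
      cached.get();
    }
    System.out.println("backing calls: " + backingCalls.get());
  }
}
```

With a one-minute TTL, the 1000 reads in the demo reach the backing lookup once. Guava's `Suppliers.memoizeWithExpiration` offers the same behavior off the shelf, if Guava is already on the classpath.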

      Logs:
      ```
      2022-03-29 15:31:02
      org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
          at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
          at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
          at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216)
          at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206)
          at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197)
          at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682)
          at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
          at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
          at sun.reflect.GeneratedMethodAccessor80.invoke(Unknown Source)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke(Method.java:498)
          at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
          at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
          at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
          at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
          at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
          at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
          at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
          at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
          at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
          at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
          at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
          at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
          at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
          at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
          at akka.actor.ActorCell.invoke(ActorCell.scala:561)
          at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
          at akka.dispatch.Mailbox.run(Mailbox.scala:225)
          at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
          at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
          at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
          at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
          at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      Caused by: java.util.concurrent.ExecutionException: org.apache.beam.sdk.util.UserCodeException: com.google.api.gax.rpc.ResourceExhaustedException: io.grpc.StatusRuntimeException: RESOURCE_EXHAUSTED: Quota exceeded for quota metric 'Read-only administrator operations' and limit 'Read-only administrator operations per minute per region' of service 'pubsublite.googleapis.com' for consumer 'project_number:<THE PROJECT NUMBER REDACTED>'.
          at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
          at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
          at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:168)
          at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:131)
          at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:453)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:694)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:645)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
          at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
          at java.lang.Thread.run(Thread.java:750)
      Caused by: org.apache.beam.sdk.util.UserCodeException: com.google.api.gax.rpc.ResourceExhaustedException: io.grpc.StatusRuntimeException: RESOURCE_EXHAUSTED: Quota exceeded for quota metric 'Read-only administrator operations' and limit 'Read-only administrator operations per minute per region' of service 'pubsublite.googleapis.com' for consumer 'project_number:<THE PROJECT NUMBER REDACTED>'.
          at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
          at org.apache.beam.sdk.transforms.Watch$WatchGrowthFn$DoFnInvoker.invokeProcessElement(Unknown Source)
          at org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker.invokeProcessElement(OutputAndTimeBoundedSplittableProcessElementInvoker.java:123)
          at org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn.processElement(SplittableParDoViaKeyedWorkItems.java:523)
          at org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn$DoFnInvoker.invokeProcessElement(Unknown Source)
          at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
          at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
          at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
          at org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDoFnOperator.fireTimer(SplittableDoFnOperator.java:174)
          at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimerInternal(DoFnOperator.java:1001)
          at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$FlinkTimerInternals.processPendingProcessingTimeTimers(DoFnOperator.java:1356)
          at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.close(DoFnOperator.java:587)
          at org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDoFnOperator.close(SplittableDoFnOperator.java:182)
          at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:213)
          at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
          at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:210)
          at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:185)
          at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
          at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
          at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:97)
          at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:162)
          ... 9 more
      Caused by: com.google.api.gax.rpc.ResourceExhaustedException: io.grpc.StatusRuntimeException: RESOURCE_EXHAUSTED: Quota exceeded for quota metric 'Read-only administrator operations' and limit 'Read-only administrator operations per minute per region' of service 'pubsublite.googleapis.com' for consumer 'project_number:<THE PROJECT NUMBER REDACTED>'.
          at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:57)
          at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72)
          at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60)
          at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
          at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68)
          at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1074)
          at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
          at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1213)
          at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:983)
          at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:771)
          at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:563)
          at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:533)
          at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:463)
          at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:427)
          at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:460)
          at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:553)
          at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:68)
          at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:739)
          at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:718)
          at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
          at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
          at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
          at java.util.concurrent.FutureTask.run(FutureTask.java:266)
          at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
          at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      Caused by: io.grpc.StatusRuntimeException: RESOURCE_EXHAUSTED: Quota exceeded for quota metric 'Read-only administrator operations' and limit 'Read-only administrator operations per minute per region' of service 'pubsublite.googleapis.com' for consumer 'project_number:<THE PROJECT NUMBER REDACTED>'.
          at io.grpc.Status.asRuntimeException(Status.java:535)
          at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:533)
          at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:463)
          at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:427)
          at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:460)
          at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:553)
          at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:68)
          at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:739)
          at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:718)
          at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
          at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
          at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
          at java.util.concurrent.FutureTask.run(FutureTask.java:266)
          at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
          at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
          at java.lang.Thread.run(Thread.java:750)

      ```
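The trace above nests the quota error four levels deep, under a Flink JobException, an ExecutionException, and a Beam UserCodeException. For anyone triaging similar failures, here is a minimal sketch of walking a cause chain to its root and checking for the `RESOURCE_EXHAUSTED` prefix seen in the innermost message. `QuotaErrors` is a hypothetical helper, and the exceptions in `main` are stand-ins built to mirror the shape of the trace, not the real Flink, Beam, or gRPC types.

```java
import java.util.concurrent.ExecutionException;

/** Finds the innermost cause of an exception and checks it for a quota error. */
final class QuotaErrors {
  /** Follows getCause() until it returns null and returns the last throwable seen. */
  static Throwable rootCause(Throwable t) {
    Throwable current = t;
    while (current.getCause() != null) {
      current = current.getCause();
    }
    return current;
  }

  /** True if the root cause's message starts with the RESOURCE_EXHAUSTED status name. */
  static boolean isQuotaExhausted(Throwable t) {
    String message = rootCause(t).getMessage();
    return message != null && message.startsWith("RESOURCE_EXHAUSTED");
  }

  public static void main(String[] args) {
    // Stand-in chain shaped like the trace: outer wrapper, ExecutionException, root cause.
    Throwable root = new IllegalStateException("RESOURCE_EXHAUSTED: Quota exceeded");
    Throwable nested =
        new RuntimeException("Recovery is suppressed", new ExecutionException(root));
    System.out.println(isQuotaExhausted(nested));
  }
}
```

When gRPC is on the classpath, comparing `io.grpc.Status.fromThrowable(t).getCode()` against `Status.Code.RESOURCE_EXHAUSTED` is likely a more robust check than matching on the message text.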

      Flink version: 1.13.5
      Beam version: 2.34.0

      Attachments

        1. image-2022-03-31-15-20-15-033.png
          28 kB
          Daniel Lindeman

          People

            Assignee: Unassigned
            Reporter: Daniel Lindeman (daniel.lindeman)
