CAMEL-16263

camel-google-pubsub - Consumer does not recover from 500 series error from Google


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.8.0
    • Fix Version/s: 3.11.0
    • Component/s: camel-google-pubsub
    • Labels: None
    • Estimated Complexity: Novice

    Description

      When the GooglePubsubConsumer receives a 500-series error from Google, I expect it to catch the exception, log it, and continue trying to pull messages from PubSub.

      What actually happens is that when such an error occurs, the SubscriberWrapper thread started by the GooglePubsubConsumer exits and never restarts. The application then pulls no more messages from PubSub for the rest of its lifetime.

      The error is intermittent but can be reproduced using a simple route and waiting:

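      import org.apache.camel.LoggingLevel;
      import org.apache.camel.builder.RouteBuilder;

      public class PubSubRoutes extends RouteBuilder {
          @Override
          public void configure() throws Exception {
              // Synchronous pull; the project and subscription IDs are resolved from properties.
              from("google-pubsub://{{gcp.project.id}}:{{gcp.subscription.id}}?synchronousPull=true")
                      .log(LoggingLevel.INFO, "MessageReceived! ${body}");
          }
      }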
      

      Sample project: https://github.com/anglind/camel-pubsub-bug

      The error log:

      16:58:36.394 [com.github.anglind.App.main()] INFO  o.a.c.c.g.p.GooglePubsubConsumer - Starting Google PubSub consumer for myproject/camel-test-topic-sub
      16:58:36.450 [com.github.anglind.App.main()] INFO  o.a.c.i.engine.AbstractCamelContext - Routes startup summary (total:1 started:1)
      16:58:36.451 [com.github.anglind.App.main()] INFO  o.a.c.i.engine.AbstractCamelContext - 	Started route1 (google-pubsub://myproject:camel-test-topic-sub)
      16:58:36.452 [com.github.anglind.App.main()] INFO  o.a.c.i.engine.AbstractCamelContext - Apache Camel 3.8.0 (camel-1) started in 308ms (build:42ms init:204ms start:62ms)
      16:59:04.044 [Camel (camel-1) thread #0 - GooglePubsubConsumer[camel-test-topic-sub]] INFO  route1 - MessageReceived! This is a message I published
      18:23:12.719 [Camel (camel-1) thread #0 - GooglePubsubConsumer[camel-test-topic-sub]] ERROR o.a.c.c.g.p.GooglePubsubConsumer - Failure getting messages from PubSub
      com.google.api.gax.rpc.DeadlineExceededException: io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline exceeded after 23.965861200s. [buffered_nanos=7490862100, buffered_nanos=65424900, remote_addr=pubsub.googleapis.com/74.125.193.95:443]
      	at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:51)
      	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72)
      	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60)
      	at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
      	at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68)
      	at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1041)
      	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
      	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1215)
      	at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:983)
      	at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:771)
      	at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:563)
      	at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:533)
      	at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:464)
      	at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:428)
      	at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:461)
      	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:617)
      	at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
      	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:803)
      	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:782)
      	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
      	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
      	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:748)
      	Suppressed: com.google.api.gax.rpc.AsyncTaskException: Asynchronous task failed
      		at com.google.api.gax.rpc.ApiExceptions.callAndTranslateApiException(ApiExceptions.java:57)
      		at com.google.api.gax.rpc.UnaryCallable.call(UnaryCallable.java:112)
      		at org.apache.camel.component.google.pubsub.GooglePubsubConsumer$SubscriberWrapper.synchronousPull(GooglePubsubConsumer.java:152)
      		at org.apache.camel.component.google.pubsub.GooglePubsubConsumer$SubscriberWrapper.run(GooglePubsubConsumer.java:113)
      		at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      		at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      		... 3 common frames omitted
      Caused by: io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline exceeded after 23.965861200s. [buffered_nanos=7490862100, buffered_nanos=65424900, remote_addr=pubsub.googleapis.com/74.125.193.95:443]
      	at io.grpc.Status.asRuntimeException(Status.java:533)
      	... 17 common frames omitted
      
      

      I think the component should catch and continue pulling for all the 500 series error codes listed on this page: https://cloud.google.com/pubsub/docs/reference/error-codes
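
      To illustrate the behaviour I am asking for, here is a minimal sketch of a pull loop that logs a failed pull and keeps going instead of exiting. It is not the component's code and not necessarily how a fix would look: ResilientPullLoop, its run method, and the pull/handler/keepRunning/backoffMillis parameters are all hypothetical names, and the pull is modelled as a plain Supplier rather than the real Google client call.

```java
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical helper for illustration only; not part of camel-google-pubsub. */
final class ResilientPullLoop {

    private ResilientPullLoop() {
    }

    /**
     * Pulls batches until keepRunning returns false. A RuntimeException thrown
     * by a single pull is logged and followed by a short pause, after which the
     * loop tries again instead of terminating the thread.
     *
     * @return the number of failed pulls seen while the loop was running
     */
    static <T> int run(Supplier<List<T>> pull,
                       Consumer<T> handler,
                       BooleanSupplier keepRunning,
                       long backoffMillis) {
        int failures = 0;
        while (keepRunning.getAsBoolean()) {
            List<T> batch;
            try {
                batch = pull.get();
            } catch (RuntimeException e) {
                // Assumption: the failure is transient (e.g. a 500-series error).
                failures++;
                System.err.println("Failure getting messages, will retry: " + e);
                try {
                    Thread.sleep(backoffMillis);
                } catch (InterruptedException ie) {
                    // Preserve the interrupt flag and stop, so shutdown still works.
                    Thread.currentThread().interrupt();
                    return failures;
                }
                continue;
            }
            // Only the pull is guarded; handler errors propagate to the caller.
            for (T message : batch) {
                handler.accept(message);
            }
        }
        return failures;
    }
}
```

      A real fix would probably want to retry only the error codes that are documented as transient and to use an increasing backoff; the fixed pause here is just to keep the sketch short.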

          People

            Assignee: Andrea Cosentino (acosentino)
            Reporter: Donal Anglin (anglind)
            Votes: 1
            Watchers: 6
