Camel / CAMEL-16263

camel-google-pubsub - Consumer does not recover from 500 series error from Google

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.8.0
    • Fix Version/s: 3.11.0
    • Component/s: camel-google-pubsub
    • Labels:
      None
    • Estimated Complexity:
      Novice

      Description

      When the GooglePubsubConsumer receives a 500 series error from Google, I expect it to catch that exception, log it, and continue trying to pull messages from PubSub.

      Instead, when such an error occurs, the SubscriberWrapper thread started by the GooglePubsubConsumer exits and never restarts. The application pulls no more messages from PubSub for the rest of its lifetime.

      The error is intermittent but can be reproduced using a simple route and waiting:

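      import org.apache.camel.LoggingLevel;
      import org.apache.camel.builder.RouteBuilder;

      public class PubSubRoutes extends RouteBuilder {
          @Override
          public void configure() throws Exception {
              // Pull synchronously from the subscription and log each message body.
              from("google-pubsub://{{gcp.project.id}}:{{gcp.subscription.id}}?synchronousPull=true")
                      .log(LoggingLevel.INFO, "MessageReceived! ${body}");
          }
      }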
      

      Sample project: https://github.com/anglind/camel-pubsub-bug

      The error log:

      16:58:36.394 [com.github.anglind.App.main()] INFO  o.a.c.c.g.p.GooglePubsubConsumer - Starting Google PubSub consumer for myproject/camel-test-topic-sub
      16:58:36.450 [com.github.anglind.App.main()] INFO  o.a.c.i.engine.AbstractCamelContext - Routes startup summary (total:1 started:1)
      16:58:36.451 [com.github.anglind.App.main()] INFO  o.a.c.i.engine.AbstractCamelContext - 	Started route1 (google-pubsub://myproject:camel-test-topic-sub)
      16:58:36.452 [com.github.anglind.App.main()] INFO  o.a.c.i.engine.AbstractCamelContext - Apache Camel 3.8.0 (camel-1) started in 308ms (build:42ms init:204ms start:62ms)
      16:59:04.044 [Camel (camel-1) thread #0 - GooglePubsubConsumer[camel-test-topic-sub]] INFO  route1 - MessageReceived! This is a message I published
      18:23:12.719 [Camel (camel-1) thread #0 - GooglePubsubConsumer[camel-test-topic-sub]] ERROR o.a.c.c.g.p.GooglePubsubConsumer - Failure getting messages from PubSub
      com.google.api.gax.rpc.DeadlineExceededException: io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline exceeded after 23.965861200s. [buffered_nanos=7490862100, buffered_nanos=65424900, remote_addr=pubsub.googleapis.com/74.125.193.95:443]
      	at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:51)
      	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72)
      	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60)
      	at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
      	at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68)
      	at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1041)
      	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
      	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1215)
      	at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:983)
      	at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:771)
      	at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:563)
      	at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:533)
      	at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:464)
      	at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:428)
      	at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:461)
      	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:617)
      	at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
      	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:803)
      	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:782)
      	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
      	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
      	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:748)
      	Suppressed: com.google.api.gax.rpc.AsyncTaskException: Asynchronous task failed
      		at com.google.api.gax.rpc.ApiExceptions.callAndTranslateApiException(ApiExceptions.java:57)
      		at com.google.api.gax.rpc.UnaryCallable.call(UnaryCallable.java:112)
      		at org.apache.camel.component.google.pubsub.GooglePubsubConsumer$SubscriberWrapper.synchronousPull(GooglePubsubConsumer.java:152)
      		at org.apache.camel.component.google.pubsub.GooglePubsubConsumer$SubscriberWrapper.run(GooglePubsubConsumer.java:113)
      		at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      		at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      		... 3 common frames omitted
      Caused by: io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline exceeded after 23.965861200s. [buffered_nanos=7490862100, buffered_nanos=65424900, remote_addr=pubsub.googleapis.com/74.125.193.95:443]
      	at io.grpc.Status.asRuntimeException(Status.java:533)
      	... 17 common frames omitted
      
      

      I think the component should catch and continue pulling for all the 500 series error codes listed on this page: https://cloud.google.com/pubsub/docs/reference/error-codes
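
To illustrate the requested behaviour, here is a minimal plain-Java sketch of a pull loop that catches a transient failure, logs it, backs off, and keeps pulling. It is not the component's actual code or the fix that was applied: `ResilientPullLoop`, `TransientPullException`, `pullAll`, and the backoff value are hypothetical names chosen for this example, and the loop is bounded by `maxAttempts` only so that the sketch terminates (a real consumer would presumably loop for as long as it is allowed to run).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

public class ResilientPullLoop {

    /** Hypothetical stand-in for a transient, 500-series style failure from the server. */
    public static class TransientPullException extends RuntimeException {
        public TransientPullException(String message) {
            super(message);
        }
    }

    /**
     * Calls pull up to maxAttempts times. A TransientPullException is logged and
     * followed by a short sleep, after which the loop continues. Any other
     * exception still propagates and ends the loop.
     */
    public static List<String> pullAll(Supplier<List<String>> pull, int maxAttempts, long backoffMillis) {
        List<String> received = new ArrayList<>();
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                received.addAll(pull.get());
            } catch (TransientPullException e) {
                System.err.println("Transient failure on attempt " + attempt + ", will retry: " + e.getMessage());
                try {
                    Thread.sleep(backoffMillis);
                } catch (InterruptedException ie) {
                    // Restore the interrupt flag and stop pulling if we are asked to shut down.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        return received;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated pull that fails once, on the second call.
        Supplier<List<String>> flakyPull = () -> {
            if (++calls[0] == 2) {
                throw new TransientPullException("DEADLINE_EXCEEDED");
            }
            return Collections.singletonList("message-" + calls[0]);
        };
        System.out.println(pullAll(flakyPull, 3, 10L)); // prints [message-1, message-3]
    }
}
```

The point of the sketch is where the `try`/`catch` sits: inside the loop, so one failed pull costs a single iteration and does not end the thread.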

              People

              • Assignee: Andrea Cosentino (acosentino)
              • Reporter: Donal Anglin (anglind)
              • Votes: 1
              • Watchers: 6
