Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
3.8.0
-
None
-
Novice
Description
When the GooglePubsubConsumer receives a 500 series error from Google I expect it to catch that exception, log it and continue trying to pull messages from PubSub.
What is actually happening is when an error like that occurs, the SubscriberWrapper thread started by the GooglePubsubConsumer exits and never restarts. The application will pull no more messages from PubSub for the lifetime of the application.
The error is intermittent but can be reproduced using a simple route and waiting:
public class PubSubRoutes extends RouteBuilder { @Override public void configure() throws Exception { from("google-pubsub://{{gcp.project.id}}:{{gcp.subscription.id}}?synchronousPull=true") .log(LoggingLevel.INFO, "MessageReceived! ${body}"); } }
Sample project: https://github.com/anglind/camel-pubsub-bug
The error log:
16:58:36.394 [com.github.anglind.App.main()] INFO o.a.c.c.g.p.GooglePubsubConsumer - Starting Google PubSub consumer for myproject/camel-test-topic-sub 16:58:36.450 [com.github.anglind.App.main()] INFO o.a.c.i.engine.AbstractCamelContext - Routes startup summary (total:1 started:1) 16:58:36.451 [com.github.anglind.App.main()] INFO o.a.c.i.engine.AbstractCamelContext - Started route1 (google-pubsub://myproject:camel-test-topic-sub) 16:58:36.452 [com.github.anglind.App.main()] INFO o.a.c.i.engine.AbstractCamelContext - Apache Camel 3.8.0 (camel-1) started in 308ms (build:42ms init:204ms start:62ms) 16:59:04.044 [Camel (camel-1) thread #0 - GooglePubsubConsumer[camel-test-topic-sub]] INFO route1 - MessageReceived! This is a message I published 18:23:12.719 [Camel (camel-1) thread #0 - GooglePubsubConsumer[camel-test-topic-sub]] ERROR o.a.c.c.g.p.GooglePubsubConsumer - Failure getting messages from PubSub com.google.api.gax.rpc.DeadlineExceededException: io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline exceeded after 23.965861200s. [buffered_nanos=7490862100, buffered_nanos=65424900, remote_addr=pubsub.googleapis.com/74.125.193.95:443] at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:51) at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72) at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60) at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97) at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68) at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1041) at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30) at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1215) at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:983) at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:771) at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:563) at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:533) at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:464) at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:428) at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:461) at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:617) at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70) at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:803) at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:782) at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Suppressed: com.google.api.gax.rpc.AsyncTaskException: Asynchronous task failed at com.google.api.gax.rpc.ApiExceptions.callAndTranslateApiException(ApiExceptions.java:57) at com.google.api.gax.rpc.UnaryCallable.call(UnaryCallable.java:112) at org.apache.camel.component.google.pubsub.GooglePubsubConsumer$SubscriberWrapper.synchronousPull(GooglePubsubConsumer.java:152) at org.apache.camel.component.google.pubsub.GooglePubsubConsumer$SubscriberWrapper.run(GooglePubsubConsumer.java:113) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) ... 3 common frames omitted Caused by: io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline exceeded after 23.965861200s. [buffered_nanos=7490862100, buffered_nanos=65424900, remote_addr=pubsub.googleapis.com/74.125.193.95:443] at io.grpc.Status.asRuntimeException(Status.java:533) ... 17 common frames omitted
I think the component should catch and continue pulling for all the 500 series error codes listed on this page: https://cloud.google.com/pubsub/docs/reference/error-codes
Attachments
Issue Links
- links to