Details
- Bug
- Status: Resolved
- Major
- Resolution: Fixed
- 3.19.0
- None
- Unknown
Description
The message extension feature of the Sqs2Consumer can cause rejected execution exceptions such as:
2022-11-30 16:43:51.958 logLevel=WARN 10 --- [xxx] logger=o.a.c.component.aws2.sqs.Sqs2Consumer : Failed polling endpoint: aws2-sqs://arn:aws:sqs:eu-west-1:xxxxxxx:some_queue?delay=3000&extendMessageVisibility=true&greedy=true&visibilityTimeout=60&waitTimeSeconds=10. Will try again at next poll. Caused by: [java.util.concurrent.RejectedExecutionException - Task rejected due queue size limit reached]
java.util.concurrent.RejectedExecutionException: Task rejected due queue size limit reached
    at org.apache.camel.util.concurrent.SizedScheduledExecutorService.scheduleAtFixedRate(SizedScheduledExecutorService.java:92) ~[camel-util-3.18.2.jar:3.18.2]
    at org.apache.camel.component.aws2.sqs.Sqs2Consumer.processBatch(Sqs2Consumer.java:183) ~[camel-aws2-sqs-3.18.2.jar:3.18.2]
    at org.apache.camel.component.aws2.sqs.Sqs2Consumer.poll(Sqs2Consumer.java:121) ~[camel-aws2-sqs-3.18.2.jar:3.18.2]
    at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:202) ~[camel-support-3.18.2.jar:3.18.2]
    at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:116) ~[camel-support-3.18.2.jar:3.18.2]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[na:na]
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
The consumer is configured with the default ThreadPoolProfile, and thus has a maxQueueSize of 1000.
The message extender is running in its own scheduled executor which is instantiated within Sqs2Consumer:
this.scheduledExecutor = getEndpoint().getCamelContext().getExecutorServiceManager()
        .newSingleThreadScheduledExecutor(this, "SqsTimeoutExtender");
This executor therefore also uses the default thread pool profile, and thus also has a maxQueueSize of 1000.
A slowdown in processing the extension tasks can cause this inner queue to fill up, at which point the exceptions above are thrown (quickly flooding the logs).
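The failure mode can be reproduced with the JDK alone. The following is a minimal sketch, not Camel's actual implementation: `BoundedScheduler` is a hypothetical stand-in for a size-limited scheduled executor, and the limit of 3 replaces the real default of 1000 to keep the demonstration short. Tasks are scheduled with a long delay so that they stay queued, as pending extension tasks would.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedSchedulerDemo {

    /** Hypothetical stand-in for a size-limited scheduler; not Camel's class. */
    static final class BoundedScheduler {
        private final ScheduledThreadPoolExecutor delegate = new ScheduledThreadPoolExecutor(1);
        private final int maxQueueSize; // a value <= 0 is treated as "unbounded" in this sketch

        BoundedScheduler(int maxQueueSize) {
            this.maxQueueSize = maxQueueSize;
        }

        ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) {
            // Reject new work once the number of queued tasks reaches the limit.
            if (maxQueueSize > 0 && delegate.getQueue().size() >= maxQueueSize) {
                throw new RejectedExecutionException("Task rejected due queue size limit reached");
            }
            return delegate.scheduleAtFixedRate(task, initialDelay, period, unit);
        }

        void shutdownNow() {
            delegate.shutdownNow();
        }
    }

    /** Schedules the given number of periodic tasks and returns how many were rejected. */
    static int countRejections(int maxQueueSize, int tasks) {
        BoundedScheduler scheduler = new BoundedScheduler(maxQueueSize);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    // One-hour delay: the task stays in the queue for the duration of the demo.
                    scheduler.scheduleAtFixedRate(() -> { }, 1, 1, TimeUnit.HOURS);
                } catch (RejectedExecutionException e) {
                    rejected++;
                }
            }
        } finally {
            scheduler.shutdownNow();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("limit 3, 5 tasks: rejected = " + countRejections(3, 5));
        System.out.println("limit -1, 5 tasks: rejected = " + countRejections(-1, 5));
    }
}
```

With a limit of 3, the fourth and fifth tasks are rejected; with -1, none are.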
Possible solutions that I can think of would be to set the maxQueueSize of the SqsTimeoutExtender to twice that of the consumer thread pool, or to set the maxQueueSize to unbounded (-1).
The latter might be acceptable because tasks are cancelled upon completion, so the queue cannot grow without bound.
I can contribute a PR, but would need some guidance as to which solution would be preferable.
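Whether cancellation actually keeps the queue small depends on the executor removing cancelled tasks from its queue. The sketch below uses a plain JDK `ScheduledThreadPoolExecutor` to illustrate this; the class and method names are hypothetical, and the report does not say whether the executor Camel creates is configured with `setRemoveOnCancelPolicy(true)`, so that is an assumption to verify.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CancelOnCompletionDemo {

    /** Schedules periodic tasks, cancels all of them, and returns the remaining queue size. */
    static int queueSizeAfterCancel(int tasks, boolean removeOnCancel) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        // When true, cancel() removes the task from the work queue immediately.
        executor.setRemoveOnCancelPolicy(removeOnCancel);
        try {
            List<ScheduledFuture<?>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                // One-hour delay: tasks stay queued until they are cancelled.
                futures.add(executor.scheduleAtFixedRate(() -> { }, 1, 1, TimeUnit.HOURS));
            }
            for (ScheduledFuture<?> future : futures) {
                future.cancel(false); // stands in for "message processing completed"
            }
            return executor.getQueue().size();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("removeOnCancel=true:  " + queueSizeAfterCancel(5, true));
        System.out.println("removeOnCancel=false: " + queueSizeAfterCancel(5, false));
    }
}
```

With the policy enabled, the queue is empty after cancellation. With it disabled, cancelled tasks remain queued until their delay elapses, so the queue size then tracks more than just the in-flight messages.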