  1. Camel
  2. CAMEL-18780

Sqs2Consumer message extender causes rejected execution exception when used with threads EIP

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.19.0
    • Fix Version/s: 3.20.2, 3.21.0, 4.0-M1, 4.0.0
    • Component/s: camel-aws2
    • Labels: None
    • Estimated Complexity: Unknown

    Description

      The message extension feature of the Sqs2Consumer can cause rejected execution exceptions such as:

      2022-11-30 16:43:51.958 logLevel=WARN 10 --- [xxx] logger=o.a.c.component.aws2.sqs.Sqs2Consumer    : Failed polling endpoint: aws2-sqs://arn:aws:sqs:eu-west-1:xxxxxxx:some_queue?delay=3000&extendMessageVisibility=true&greedy=true&visibilityTimeout=60&waitTimeSeconds=10. Will try again at next poll. Caused by: [java.util.concurrent.RejectedExecutionException - Task rejected due queue size limit reached]
      
      java.util.concurrent.RejectedExecutionException: Task rejected due queue size limit reached
      	at org.apache.camel.util.concurrent.SizedScheduledExecutorService.scheduleAtFixedRate(SizedScheduledExecutorService.java:92) ~[camel-util-3.18.2.jar:3.18.2]
      	at org.apache.camel.component.aws2.sqs.Sqs2Consumer.processBatch(Sqs2Consumer.java:183) ~[camel-aws2-sqs-3.18.2.jar:3.18.2]
      	at org.apache.camel.component.aws2.sqs.Sqs2Consumer.poll(Sqs2Consumer.java:121) ~[camel-aws2-sqs-3.18.2.jar:3.18.2]
      	at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:202) ~[camel-support-3.18.2.jar:3.18.2]
      	at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:116) ~[camel-support-3.18.2.jar:3.18.2]
      	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
      	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[na:na]
      	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[na:na]
      	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
      	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
      	at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
      

      The consumer is configured with the default ThreadPoolProfile, and thus has a maxQueueSize of 1000.

      The message extender is running in its own scheduled executor which is instantiated within Sqs2Consumer:

      this.scheduledExecutor = getEndpoint().getCamelContext().getExecutorServiceManager()
                          .newSingleThreadScheduledExecutor(this, "SqsTimeoutExtender");
      

      This executor therefore also uses the default thread pool profile, and so also has a maxQueueSize of 1000.

      A slowdown in processing the extension tasks can fill this inner queue, at which point every further attempt to schedule an extension task is rejected and the exception above is thrown (quickly flooding the logs).
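
      The failure mode can be reproduced with the plain JDK. The following is a minimal sketch, not Camel's actual SizedScheduledExecutorService: the class BoundedSchedulerSketch and its method names are hypothetical, and it assumes only that the wrapper compares the delegate's queue size against maxQueueSize before scheduling, treating a non-positive limit as unbounded. A limit of 3 stands in for the default of 1000.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a size-limited scheduled executor (not Camel code).
public class BoundedSchedulerSketch {
    // Single scheduler thread, like the "SqsTimeoutExtender" executor.
    private final ScheduledThreadPoolExecutor delegate = new ScheduledThreadPoolExecutor(1);
    private final int maxQueueSize;

    public BoundedSchedulerSketch(int maxQueueSize) {
        this.maxQueueSize = maxQueueSize;
    }

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long initialDelay,
                                                  long period, TimeUnit unit) {
        // Assumption: a limit <= 0 means "unbounded"; otherwise reject once the queue is full.
        if (maxQueueSize > 0 && delegate.getQueue().size() >= maxQueueSize) {
            throw new RejectedExecutionException("Task rejected due queue size limit reached");
        }
        return delegate.scheduleAtFixedRate(task, initialDelay, period, unit);
    }

    public void shutdownNow() {
        delegate.shutdownNow();
    }

    // Schedules `attempts` long-delayed tasks and returns how many were rejected.
    public static int countRejections(int maxQueueSize, int attempts) {
        BoundedSchedulerSketch scheduler = new BoundedSchedulerSketch(maxQueueSize);
        int rejected = 0;
        try {
            for (int i = 0; i < attempts; i++) {
                try {
                    // A one-hour delay keeps every accepted task sitting in the queue.
                    scheduler.scheduleAtFixedRate(() -> { }, 1, 1, TimeUnit.HOURS);
                } catch (RejectedExecutionException e) {
                    rejected++;
                }
            }
        } finally {
            scheduler.shutdownNow();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println(countRejections(3, 5));  // bounded queue: prints 2
        System.out.println(countRejections(-1, 5)); // unbounded queue: prints 0
    }
}
```

      Once the pending tasks reach the limit, every further scheduling attempt fails immediately, which matches the repeated "Failed polling endpoint" warnings in the log above.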

      Possible solutions that I can think of would be to set the maxQueueSize of the SqsTimeoutExtender to 2x that of the consumer thread pool, or to set the maxQueueSize to unbounded (-1).

      The latter might be acceptable, since tasks are cancelled upon completion and thus the queue cannot grow without bound.
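
      The argument for the unbounded option depends on cancelled tasks actually leaving the queue. The sketch below illustrates this with the plain JDK; it is not Camel code, and the class CancelOnCompletionSketch and its method are hypothetical. One assumption is worth calling out: a ScheduledThreadPoolExecutor only removes a task from its queue at cancellation time when setRemoveOnCancelPolicy(true) has been set. Whether the executor created for the SqsTimeoutExtender enables that policy is not stated in this issue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of "cancel on completion" (not Camel code).
public class CancelOnCompletionSketch {

    // Schedules one periodic extender task per in-flight message, cancels them all
    // (as would happen when the exchanges complete), and returns the queue size left behind.
    public static int queueSizeAfterCancel(int inFlight, boolean removeOnCancel) {
        ScheduledThreadPoolExecutor extender = new ScheduledThreadPoolExecutor(1);
        extender.setRemoveOnCancelPolicy(removeOnCancel);
        try {
            List<ScheduledFuture<?>> futures = new ArrayList<>();
            for (int i = 0; i < inFlight; i++) {
                // The 30 second delay and period are placeholder values for the extension interval.
                futures.add(extender.scheduleAtFixedRate(() -> { }, 30, 30, TimeUnit.SECONDS));
            }
            for (ScheduledFuture<?> future : futures) {
                future.cancel(true);
            }
            return extender.getQueue().size();
        } finally {
            extender.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(queueSizeAfterCancel(50, true));  // prints 0
        System.out.println(queueSizeAfterCancel(50, false)); // prints 50
    }
}
```

      With the policy enabled, the queue holds at most one task per in-flight message, so its size is bounded by the number of messages being processed rather than by maxQueueSize. Without it, cancelled tasks linger until their next scheduled run, and the 2x option would only delay the rejection rather than prevent it.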

      I can contribute a PR, but would need some guidance as to which solution would be preferable.

            People

              Assignee: Simon Rasmussen (smox)
              Reporter: Simon Rasmussen (smox)