Uploaded image for project: 'Camel'
  1. Camel
  2. CAMEL-17592

concurrentConsumers URI parameter not working with aws2-sqs endpoint

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 3.13.0
    • 3.11.6, 3.14.2, 3.16.0
    • camel-aws
    • None
    • <dependency>
      <groupId>org.apache.camel.springboot</groupId>
      <artifactId>camel-aws2-sqs-starter</artifactId>
      <version>3.13.0</version>
      </dependency>

    • Unknown

    Description

      The concurrentConsumers URI parameter is not taken into account by the endpoint.
      We can reproduce the issue with the following prototype:

      from("aws2-sqs://queuexxx?concurrentConsumers=5&amazonSQSClient=#sqsClient&
      waitTimeSeconds=20")
      .process(exchange ->

      {      System.out.println("Message received..."); }

      )
      .process(exchange -> {
           try

      {         Thread.sleep(5000);      }

      catch (InterruptedException e) {
              e.printStackTrace();
           }});

      With the above queue, if we send 3 messages, we have to wait
      5 seconds to see the second message ("Message received...") and 5 more
      seconds to see the third one. The expected behavior is to see the 3 messages
      consumed without delay since 3 threads will consume them in parallel.

      Turning on the Camel logs it seems that the next polling is done only after
      the Delete for the previous message is sent (which is with a delay of 5s).
      The messages are read 1 by 1 (=1 message per polling) so the above behavior is not seen because the message are consumed together by one polling and continue in the same thread.

      Note that, as commented by Narsi Reddy Nallamilli in the user mailing list, we obtain the expected behavior by adding a delay in the route as follow: 

      from("aws2-sqs://queuexxx?concurrentConsumers=5&amazonSQSClient=#sqsClient&
      waitTimeSeconds=20")
      .delay(1L)
      .process(exchange ->

      {     System.out.println("Message received..."); }

      )
      .process(exchange -> {
          try

      {        Thread.sleep(5000);     }

      catch (InterruptedException e) {
             e.printStackTrace();
          }});

      Attachments

        Activity

          People

            davsclaus Claus Ibsen
            arnaudlbcn Arnaud Level
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: