Camel
  CAMEL-5113

Parallel and fault-tolerant message processing for SQS endpoints.

    Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: camel-aws
    • Labels: None
    • Estimated Complexity: Unknown

      Description

      I'm using Camel to implement parallel processing of jobs in an SQS queue, and ran into a few issues with the current implementation of the SQS component:

      1. SqsConsumer uses a blocking/synchronous processor, which prevents parallel processing of multiple messages from a single endpoint.
      2. Having a maxMessagesPerPoll other than one doesn't seem to make sense, because any messages not being actively processed should be left in the queue so other consumers have a chance at them.
      3. Rollback processing doesn't go back to SQS and set the visibility timeout to zero, which prevents immediate retries.

      I propose the following solutions to these problems:

      1. Use an asynchronous processor in SqsConsumer by way of getAsyncProcessor(). (Messages in SQS aren't guaranteed to be FIFO anyway, so there should be no issue with order of processing.)
      2. Replace maxMessagesPerPoll with maxInFlightMessages. Put a semaphore in SqsConsumer to control the maximum number of in-flight messages, and when polling SQS always pass the number of available permits as the maximum number of messages to retrieve.
      3. In the onFailure callback for an exchange, set the visibility timeout in SQS to zero via ChangeMessageVisibility (a rough sketch of this follows below).
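
      To make #3 concrete, here is a rough, untested sketch of resetting the visibility timeout when an exchange fails. It assumes the AWS SDK v1 SQS client and Camel's Synchronization callback; the class name and the way the receipt handle is passed in are illustrative only, not part of the actual proposal or patch.

          import com.amazonaws.services.sqs.AmazonSQS;
          import com.amazonaws.services.sqs.model.ChangeMessageVisibilityRequest;
          import org.apache.camel.Exchange;
          import org.apache.camel.spi.Synchronization;

          // Completion hook the consumer could register on each exchange it creates.
          public class ResetVisibilityOnFailure implements Synchronization {
              private final AmazonSQS sqs;
              private final String queueUrl;
              private final String receiptHandle;

              public ResetVisibilityOnFailure(AmazonSQS sqs, String queueUrl, String receiptHandle) {
                  this.sqs = sqs;
                  this.queueUrl = queueUrl;
                  this.receiptHandle = receiptHandle;
              }

              public void onComplete(Exchange exchange) {
                  // nothing to do; on success the consumer deletes the message from the queue
              }

              public void onFailure(Exchange exchange) {
                  // make the message visible again immediately so it can be retried right away
                  sqs.changeMessageVisibility(new ChangeMessageVisibilityRequest()
                          .withQueueUrl(queueUrl)
                          .withReceiptHandle(receiptHandle)
                          .withVisibilityTimeout(0));
              }
          }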

      How does this sound? I'm working on a patch. This is my first work on Camel, so if you see any problems with my approach let me know!

      Thanks,

      - Dan

        Activity

        Daniel Carleton added a comment -

        Related thread on the mailing list: http://camel.465427.n5.nabble.com/Better-Way-to-Achieve-Parallel-Processing-of-SQS-Messages-td5578135.html

        Claus Ibsen added a comment -

        Ad 1)
        +1
        This is fine

        Ad 2)
        -1
        No, leave it as is; this is how it works with batch consumers in Camel.

        Ad 3)
        +1
        Yeah, if some rollback is possible, then that is fine.

        Claus Ibsen added a comment -

        Ad 2)
        The default of maxMessagesPerPoll should likely be changed from 10 to 1 as you say. But do not introduce more complexity with in-flight semaphores and whatnot. People can use the throttler inflight policy for this kind of behavior.

        Claus Ibsen added a comment -

        You can read about the throttle route policy here
        http://camel.apache.org/routepolicy

        And btw I also wonder a bit why some of the AWS component was created as a BatchConsumer. Maybe it was a copy/paste from another component. Or it was on purpose.
        http://camel.apache.org/batch-consumer.html
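
        For reference, a minimal sketch of attaching the throttling policy to an SQS route, assuming Camel 2.x; the queue name, registry client id, and target endpoint here are hypothetical, not taken from the component or the patch.

            import org.apache.camel.builder.RouteBuilder;
            import org.apache.camel.impl.ThrottlingInflightRoutePolicy;

            public class SqsThrottledRoute extends RouteBuilder {
                @Override
                public void configure() {
                    // cap how many exchanges this route may have in flight at once
                    ThrottlingInflightRoutePolicy policy = new ThrottlingInflightRoutePolicy();
                    policy.setMaxInflightExchanges(10);

                    from("aws-sqs://myQueue?amazonSQSClient=#sqsClient&maxMessagesPerPoll=1")
                        .routePolicy(policy)
                        .to("bean:jobProcessor");
                }
            }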

        Daniel Carleton added a comment -

        Thanks for the feedback, Claus.

        Ok, sounds like I should apply #1 and #3, change the default for maxMessagesPerPoll to 1, and use ThrottlingInflightRoutePolicy to control concurrency.

        SQS does support grabbing multiple messages per request, so I suppose a BatchConsumer makes some sense. If there's only a single Camel context consuming from the SQS queue, then it doesn't make a difference. However, if you have multiple processes polling the SQS queue, each one should only reserve as many messages as it can process at a time; otherwise messages get queued behind busy consumers and processing latency increases.

        Claus Ibsen added a comment -

        Daniel, yeah that will be good. Looking forward to the patch(es).

        Daniel Carleton added a comment -

        I ended up not needing concurrency for my project, so setting maxMessagesPerPoll=1 was sufficient for my purposes. Others here are aware of the issue, though, so hopefully a patch will result at some point!

        Christian Posta added a comment -

        Daniel, any update on the patch you may have worked on? I realize it was a while ago, but if you post what you had I can take over for you if you'd like.

        Oleg Kozlov added a comment -

        Is there any work planned for integrating this patch, or for implementing an alternative way of processing more than one message from a queue in parallel using the aws-sqs component?

        Our use case has a hard requirement for this functionality, and limiting the consumer to processing one message at a time significantly limits the component's usability.

        Thank You!

        Clark Dudek added a comment - edited

        Christian Posta, it looks like the patch is posted here: https://gist.github.com/dacc/2126164 I haven't tested it at all, and it looks like it only affects SqsConsumer.java by making it an AsyncProcessor.
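
        For context, the general shape of that change would be dispatching each exchange through getAsyncProcessor() instead of blocking on the synchronous processor. Below is a rough, untested illustration of that pattern; it is an assumption about the approach, not the actual contents of the gist.

            import org.apache.camel.AsyncCallback;
            import org.apache.camel.Endpoint;
            import org.apache.camel.Exchange;
            import org.apache.camel.Processor;
            import org.apache.camel.impl.DefaultConsumer;

            public class AsyncDispatchingConsumer extends DefaultConsumer {

                public AsyncDispatchingConsumer(Endpoint endpoint, Processor processor) {
                    super(endpoint, processor);
                }

                // Called once per received SQS message; returns immediately so the
                // remaining messages from the same poll can be processed in parallel.
                protected void dispatch(final Exchange exchange) {
                    getAsyncProcessor().process(exchange, new AsyncCallback() {
                        public void done(boolean doneSync) {
                            if (exchange.getException() != null) {
                                getExceptionHandler().handleException(
                                        "Error processing exchange", exchange, exchange.getException());
                            }
                            // a real consumer would delete the message from SQS here on success
                        }
                    });
                }
            }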


          People

          • Assignee: Unassigned
          • Reporter: Daniel Carleton
          • Votes: 2
          • Watchers: 5
