CAMEL-5113

Parallel and fault tolerant message processing for SQS endpoints.

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.15.0
    • Component/s: camel-aws
    • Labels: None
    • Estimated Complexity: Unknown

      Description

      I'm using Camel to implement parallel processing of jobs in an SQS queue, and ran into a few issues with the current implementation of the SQS component:

      1. SqsConsumer uses a blocking/synchronous processor, which prevents parallel processing of multiple messages from a single endpoint.
      2. Having a maxMessagesPerPoll other than one doesn't seem to make sense, because any messages not being actively processed should be left back in the queue for other consumers to have a chance with.
      3. Rollback processing doesn't go back to SQS and set the visibility timeout to zero, which prevents immediate retries.

      I propose the following solutions to these problems:

      1. Use an asynchronous processor in SqsConsumer by way of getAsyncProcessor(). (Messages in SQS aren't guaranteed to be FIFO anyway, so there should be no issue with order of processing.)
      2. Replace maxMessagesPerPoll with maxInFlightMessages. Put a semaphore in SqsConsumer to control the maximum number of in flight messages, and when polling SQS always set the number of available permits as the maximum number of messages to retrieve.
      3. In the onFailure callback for an exchange set the visibility timeout in SQS to zero via ChangeMessageVisibility.
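      For illustration, the semaphore idea in proposal 2 could be sketched in plain Java like this (a minimal sketch; the class and method names are hypothetical, not part of camel-aws):

```java
import java.util.concurrent.Semaphore;

// Minimal sketch of the "maxInFlightMessages" idea from proposal 2.
// A semaphore caps the number of messages being processed at once, and
// each poll may only request as many messages as there are free permits.
class InFlightLimiter {
    // SQS allows at most 10 messages per ReceiveMessage request.
    private static final int SQS_MAX_BATCH = 10;

    private final Semaphore permits;

    InFlightLimiter(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    /** How many messages the next poll may ask SQS for (0 means skip the poll). */
    int nextBatchSize() {
        return Math.min(permits.availablePermits(), SQS_MAX_BATCH);
    }

    /** Call when a message is handed off to the (async) processor. */
    boolean tryBegin() {
        return permits.tryAcquire();
    }

    /** Call from the async done-callback, on both success and failure. */
    void end() {
        permits.release();
    }
}
```

      On rollback (proposal 3), the done-callback would additionally call ChangeMessageVisibility with a timeout of zero before releasing the permit.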

      How does this sound? I'm working on a patch. This is my first work on Camel, so if you see any problems with my approach let me know!

      Thanks,

      • Dan

        Activity

        Daniel Carleton added a comment -

        Related thread on mailing list: http://camel.465427.n5.nabble.com/Better-Way-to-Achieve-Parallel-Processing-of-SQS-Messages-td5578135.html
        Claus Ibsen added a comment -

        Ad 1)
        +1
        This is fine

        Ad 2)
        -1
        No, leave it as is; this is how it works with batch consumers in Camel

        Ad 3)
        +1
        Yeah if some rollback is possible, then that is fine

        Claus Ibsen added a comment -

        Ad 2)
        The default of maxMessagesPerPoll should likely be changed from 10 to 1 as you say. But do not introduce more complexity with in flight semaphores and whatnot. People can use the throttler inflight policy for this kind of behavior.

        Claus Ibsen added a comment -

        You can read about the throttle route policy here
        http://camel.apache.org/routepolicy

        And btw I also wonder a bit why some of the AWS component was created as a BatchConsumer. Maybe it was a copy/paste from another component. Or it was on purpose.
        http://camel.apache.org/batch-consumer.html

        Daniel Carleton added a comment -

        Thanks for the feedback, Claus.

        Ok, sounds like I should apply #1 and #3, change the default for maxMessagesPerPoll to 1, and use ThrottlingInflightRoutePolicy to control concurrency.

        SQS does support grabbing multiple messages per request, so I suppose a BatchConsumer makes some sense. If there's only a single Camel context consuming from the SQS queue, then it doesn't make a difference. However if you have multiple processes polling the SQS queue, then each one should only reserve as many messages as it can process at a time. Otherwise messages will get queued and processing latency will increase.
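        For reference, the combination Daniel describes (maxMessagesPerPoll=1 plus ThrottlingInflightRoutePolicy) might be wired up roughly like this in Spring XML; this is only a sketch, and the bean ids, queue name, and processor endpoint are hypothetical:

```xml
<!-- Sketch only: bean ids, queue name, and processor endpoint are made up. -->
<bean id="inflightPolicy" class="org.apache.camel.impl.ThrottlingInflightRoutePolicy">
  <!-- suspend the consumer when more than 10 exchanges are in flight -->
  <property name="maxInflightExchanges" value="10"/>
</bean>

<route routePolicyRef="inflightPolicy">
  <from uri="aws-sqs://myQueue?amazonSQSClient=#sqsClient&amp;maxMessagesPerPoll=1"/>
  <to uri="bean:jobProcessor"/>
</route>
```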

        Claus Ibsen added a comment -

        Daniel, yeah that will be good. Looking forward to the patch(es).

        Daniel Carleton added a comment -

        I ended up not needing concurrency for my project, so setting maxMessagesPerPoll=1 was sufficient for my purposes. Others here are aware of the issue, though, so hopefully a patch will result at some point!

        Christian Posta added a comment -

        Daniel, any update on the patch you may have worked on? I realize it was a while ago, but if you post what you had I can take over for you if you'd like.

        Oleg Kozlov added a comment -

        Is there any work planned for integrating this patch or implementing an alternative way of processing more than one message from a queue in parallel using the aws-sqs component?

        We have a hard requirement in our use case for this functionality. Limiting the consumer to processing one message at a time significantly limits this component's usability.

        Thank You!

        Clark Dudek added a comment - - edited

        Christian Posta, looks like the patch is posted here: https://gist.github.com/dacc/2126164 I haven't tested this at all, and it looks like it only affects SqsConsumer.java by making it an AsyncProcessor.

        Tony Tiger added a comment -

        This ticket appears to duplicate CAMEL-6286 as the changes in this patch seem to match in functionality.

        Christian Posta added a comment -

        Patch here:
        https://github.com/christian-posta/camel/commit/43ad71e254107c8d9783018826936fe1050fce90

        Would be great if someone on the core Camel team could review it (Claus Ibsen); would be even better if someone could try it out. What this does is allow you to set a "concurrentConsumers" property that lets you run multiple threads of the aws-sqs consumer.

        We have deployed this patch at a top e-retailer and processed many millions of transactions during Cyber Monday, and it performed with ZERO flaws while draining a very deep queue very fast... I know, vague, but I cannot post the numbers.

        Please give me feedback before I commit it.
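        Assuming the option name from the patch description, enabling parallel consumption would then be a matter of setting it on the endpoint URI; the queue name and client bean id below are made up:

```
aws-sqs://orders?amazonSQSClient=#sqsClient&concurrentConsumers=5
```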

        Claus Ibsen added a comment -

        Christian Posta looks fine. I guess even a concurrent scheduled thread pool works fine, since you tested this in real life.

        Though I would have thought a boss and worker pool approach would be able to react faster. With scheduling, the workers are constantly processing messages; when they are done they reschedule, e.g. sleep for 1/2 sec, before being ready for the next task. Whereas with a boss/worker pool approach the boss is always working, putting new messages in the backlog queue for the worker threads. The drawback is the thread context switch between boss and worker.

        But let's apply this patch; it's a simple solution, and it has been battle tested and works fast, as you say.

        Christian Posta added a comment -

        Patch applied with https://git-wip-us.apache.org/repos/asf?p=camel.git;a=commit;h=b80021a1551213e155c4ec8b1464831e9a6ab1d3

          People

          • Assignee: Christian Posta
          • Reporter: Daniel Carleton
          • Votes: 2
          • Watchers: 5
