Camel / CAMEL-3211

Enrich and PollEnrich - Option to let it poll multiple messages

    Details

    • Type: Improvement
    • Status: In Progress
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 2.4.0
    • Fix Version/s: 3.0.0
    • Component/s: camel-core
    • Labels:
      None

      Description

      Consider whether we can enhance enrich and/or pollEnrich so that they can poll multiple messages.

      For example, to drain a JMS queue or an FTP site. Currently it polls only one message and then continues.
      With an option telling it to continue, we could have it drain the queue until there are no more messages.

      Alternatively, use an expression to tell it when to stop, or add an option to the custom aggregator so the end user can control whether it should continue and poll more.
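A minimal sketch of the last idea, where a custom aggregator decides whether polling should continue. Everything here is hypothetical: `ContinuingAggregator` and `pollWhile` are illustrative names, not Camel APIs, and a plain `Supplier` that returns `null` when empty stands in for the endpoint.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch: ContinuingAggregator and pollWhile are illustrative names, not Camel APIs.
class AggregatorControlledPoll {

    /** An aggregator that also decides whether polling should continue. */
    interface ContinuingAggregator<T> {
        T aggregate(T accumulated, T next);     // accumulated is null on the first call
        boolean shouldContinue(T accumulated);  // consulted after every aggregation
    }

    /** Polls until the source returns null (empty) or the aggregator asks to stop. */
    static <T> T pollWhile(Supplier<T> source, ContinuingAggregator<T> aggregator) {
        T result = null;
        T next;
        while ((next = source.get()) != null) {
            result = aggregator.aggregate(result, next);
            if (!aggregator.shouldContinue(result)) {
                break; // the aggregator, not the endpoint, ended the drain
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<>(List.of("a", "b", "c", "d"));
        // Concatenate bodies, but stop once three characters have been collected.
        ContinuingAggregator<String> concatUpToThree = new ContinuingAggregator<String>() {
            public String aggregate(String acc, String next) {
                return acc == null ? next : acc + next;
            }
            public boolean shouldContinue(String acc) {
                return acc.length() < 3;
            }
        };
        String result = pollWhile(queue::poll, concatUpToThree);
        System.out.println(result + " remaining=" + queue.size()); // prints "abc remaining=1"
    }
}
```

The remaining message ("d") stays on the queue, which is the point of letting the aggregator stop the poll early.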

        Activity

        Ben O'Day added a comment -

        r1358452: I added some basic support for polling multiple Exchanges from an endpoint via a flag on pollEnrich(). This mode simply polls until the endpoint is empty (using either noWait or an explicit timeout) and creates a List of Exchanges to use as the body...

        That said, there are several different approaches to doing this. For now, I just picked a basic use case to get the ball rolling. If anyone has any other thoughts about other features they'd like to see implemented, let me know.
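The mode described above (poll until empty, then use the collected List as the body) can be sketched in plain Java. `TimedSource`, `pollAll` and `fromDeque` are hypothetical stand-ins, not the code from r1358452. The assumed convention is that a timeout of 0 means noWait and a positive value is a per-poll wait; a blocking poll is rejected because it could never detect "empty".

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of "poll until empty"; not the actual pollEnrich implementation.
class PollUntilEmpty {

    /** Stand-in for a polling consumer; returns null when nothing arrives in time. */
    interface TimedSource<T> {
        T receiveNoWait();
        T receive(long timeoutMillis);
    }

    /** Drains the source into a list: timeout 0 means noWait, a positive timeout waits per poll. */
    static <T> List<T> pollAll(TimedSource<T> source, long timeoutMillis) {
        if (timeoutMillis < 0) {
            // A blocking poll never reports "empty", so it could never end the drain.
            throw new IllegalArgumentException("drain requires noWait (0) or a positive timeout");
        }
        List<T> batch = new ArrayList<>();
        while (true) {
            T next = timeoutMillis == 0 ? source.receiveNoWait() : source.receive(timeoutMillis);
            if (next == null) {
                return batch; // endpoint is empty: stop draining
            }
            batch.add(next);
        }
    }

    /** In-memory source for the demo; the timeout is ignored because the data is already local. */
    static <T> TimedSource<T> fromDeque(Deque<T> deque) {
        return new TimedSource<T>() {
            public T receiveNoWait() { return deque.poll(); }
            public T receive(long timeoutMillis) { return deque.poll(); }
        };
    }

    public static void main(String[] args) {
        Deque<String> errors = new ArrayDeque<>(List.of("err-1", "err-2", "err-3"));
        List<String> body = pollAll(fromDeque(errors), 0);
        System.out.println(body); // prints [err-1, err-2, err-3]
    }
}
```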

        Claus Ibsen added a comment -

        This is more difficult than at first thought.

        I think a solution would require the components/consumers to support this more natively, via an API change, so they can return exchanges in a batch.
        Currently the PollingConsumer API fetches one exchange at a time.

        I would prefer to work on this in Camel 3.0, where API changes are easier to introduce. We should also consider giving these consumers an option to limit how many messages to consume; e.g. imagine a file/FTP system with 1,000,000 files. For that we can use the maxMessagesPerPoll option, but this option is only available for polling batch consumers. So for a JMS endpoint, which does not have this option, there is currently no way to limit the poll to, say, a maximum of 100 messages. Also, having some way of stopping via a predicate/expression would be nice, so people can break out sooner, for example when a "special" message was consumed.
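A sketch of the two limits discussed here: a maximum message count and a stop predicate for breaking out early. The `drain` method and its `maxMessages` parameter are hypothetical (the name only echoes maxMessagesPerPoll), and treating 0 or less as "no limit" is a choice of this sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical sketch: drain() is not a Camel API.
class BoundedDrain {

    /**
     * Polls until the source is empty, maxMessages items have been taken (0 or less = no limit),
     * or stopAfter matches. The matching item is kept in the batch.
     */
    static <T> List<T> drain(Supplier<T> source, int maxMessages, Predicate<T> stopAfter) {
        List<T> batch = new ArrayList<>();
        while (maxMessages <= 0 || batch.size() < maxMessages) {
            T next = source.get();
            if (next == null) {
                break; // endpoint is empty
            }
            batch.add(next);
            if (stopAfter.test(next)) {
                break; // e.g. a "special" message was consumed
            }
        }
        return batch;
    }

    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<>(List.of("m1", "m2", "SPECIAL", "m4"));
        System.out.println(drain(queue::poll, 100, "SPECIAL"::equals)); // prints [m1, m2, SPECIAL]
        System.out.println(drain(queue::poll, 100, "SPECIAL"::equals)); // prints [m4]
    }
}
```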

        There is also the aspect of transactions (TX), for example with a JMS endpoint. It would be nice to be able to poll a batch and process it within a single TX.
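One way to picture "poll a batch and act in a TX": polled items stay in flight until the whole batch has been processed, and a failure rolls all of them back. `TxQueue` is a hypothetical in-memory stand-in for a transacted JMS session, not a JMS or Camel API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of all-or-nothing batch consumption.
class TransactedBatch {

    /** Polled items stay in flight until commit(); rollback() returns them to the queue. */
    static final class TxQueue<T> {
        private final Deque<T> available = new ArrayDeque<>();
        private final List<T> inFlight = new ArrayList<>();

        TxQueue(Collection<T> initial) {
            available.addAll(initial);
        }

        T poll() {
            T next = available.poll();
            if (next != null) {
                inFlight.add(next);
            }
            return next;
        }

        void commit() {
            inFlight.clear(); // acknowledged: the items are gone for good
        }

        void rollback() {
            // Put in-flight items back at the head, preserving their original order.
            for (int i = inFlight.size() - 1; i >= 0; i--) {
                available.addFirst(inFlight.get(i));
            }
            inFlight.clear();
        }

        int size() {
            return available.size();
        }
    }

    /** Drains the queue and processes the batch all-or-nothing; returns true if committed. */
    static <T> boolean drainInTx(TxQueue<T> queue, Consumer<List<T>> processor) {
        List<T> batch = new ArrayList<>();
        T next;
        while ((next = queue.poll()) != null) {
            batch.add(next);
        }
        try {
            processor.accept(batch);
            queue.commit();
            return true;
        } catch (RuntimeException e) {
            queue.rollback(); // nothing is lost: the whole batch can be consumed again later
            return false;
        }
    }

    public static void main(String[] args) {
        TxQueue<String> errors = new TxQueue<>(List.of("e1", "e2", "e3"));
        boolean first = drainInTx(errors, batch -> {
            throw new IllegalStateException("retry target down");
        });
        System.out.println(first + " " + errors.size()); // prints "false 3"
        boolean second = drainInTx(errors, batch -> System.out.println("retrying " + batch));
        System.out.println(second + " " + errors.size()); // prints "true 0"
    }
}
```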

        Another aspect is that the polled exchanges may have on-completion work associated with them (e.g. to delete/move a consumed file). We need to ensure that these on-completions get propagated to the merged exchange, to be executed when its unit of work (UoW) is done.
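The propagation requirement can be sketched as moving each polled exchange's pending tasks onto the merged exchange, so they run only when the merged unit of work finishes. `SimpleExchange`, `handoverCompletionsTo` and `merge` are hypothetical simplifications, not Camel's `Exchange` API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of on-completion handover during a merge.
class CompletionHandover {

    /** Much-simplified exchange that carries on-completion tasks. */
    static final class SimpleExchange {
        final Object body;
        private final List<Runnable> onCompletions = new ArrayList<>();

        SimpleExchange(Object body) {
            this.body = body;
        }

        void addOnCompletion(Runnable task) {
            onCompletions.add(task);
        }

        /** Moves pending tasks to target so they run when target's unit of work is done. */
        void handoverCompletionsTo(SimpleExchange target) {
            target.onCompletions.addAll(onCompletions);
            onCompletions.clear();
        }

        /** Simulates the end of the unit of work: runs, then clears, all pending tasks. */
        void done() {
            for (Runnable task : onCompletions) {
                task.run();
            }
            onCompletions.clear();
        }
    }

    /** Merges polled exchanges into one whose body is the list of their bodies. */
    static SimpleExchange merge(List<SimpleExchange> polled) {
        List<Object> bodies = new ArrayList<>();
        for (SimpleExchange e : polled) {
            bodies.add(e.body);
        }
        SimpleExchange merged = new SimpleExchange(bodies);
        for (SimpleExchange e : polled) {
            e.handoverCompletionsTo(merged);
        }
        return merged;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        List<SimpleExchange> polled = new ArrayList<>();
        for (String file : List.of("a.txt", "b.txt")) {
            SimpleExchange e = new SimpleExchange(file);
            e.addOnCompletion(() -> log.add("move " + file)); // e.g. move the consumed file
            polled.add(e);
        }
        SimpleExchange merged = merge(polled);
        System.out.println(merged.body + " " + log); // prints "[a.txt, b.txt] []"
        merged.done();
        System.out.println(log); // prints "[move a.txt, move b.txt]"
    }
}
```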

        And take note: every time the DSL is changed in camel-core, you may have to adjust the code in camel-scala to keep the DSL in sync.

        Claus Ibsen added a comment -

        Oh, and it would also be better if the poll could work in a streaming mode with the aggregator, so we don't need to hold X exchanges in memory but can aggregate on the fly.
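A sketch of aggregating on the fly: each polled item is folded into a running result immediately, so memory use does not grow with the number of messages polled. `pollAndAggregate` is a hypothetical name, and a `BiFunction` stands in for an aggregation strategy.

```java
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Hypothetical sketch of streaming aggregation; not a Camel API.
class StreamingAggregation {

    /** Folds each polled item into the running result right away; polled items are not retained. */
    static <T, R> R pollAndAggregate(Supplier<T> source, R initial, BiFunction<R, T, R> aggregator) {
        R result = initial;
        T next;
        while ((next = source.get()) != null) {
            result = aggregator.apply(result, next);
        }
        return result;
    }

    public static void main(String[] args) {
        int[] polled = {0};
        // Simulated endpoint that yields 1..1,000,000 and then reports empty (null).
        Supplier<Integer> source = () -> polled[0] < 1_000_000 ? ++polled[0] : null;
        Long total = pollAndAggregate(source, 0L, (acc, n) -> acc + n);
        System.out.println(total); // prints 500000500000
    }
}
```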

        Claus Ibsen added a comment -

        We should also avoid adding too many pollEnrich methods to the DSL. Instead we should start relying on the fluent builder for that. We now have 7 methods, and some combinations are not possible, e.g. poll multiple with an aggregation strategy.

        Since we now have 5+ options, we cannot have that many pollEnrich methods in the DSL; it makes the list of methods too big when people press Ctrl+Space in their route builder.
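A sketch of the fluent-builder alternative: each option gets one chainable setter, so any combination (such as poll multiple with an aggregation strategy) is possible without a dedicated overload. `PollEnrichOptions` and all of its methods are hypothetical, not the Camel DSL.

```java
import java.util.function.BinaryOperator;

// Hypothetical sketch: these builder methods are illustrative, not Camel's RouteBuilder DSL.
class PollEnrichOptionsDemo {

    /** One fluent setter per option instead of one overload per combination. */
    static final class PollEnrichOptions {
        private final String uri;
        private long timeoutMillis = -1;  // -1 = not set in this sketch
        private boolean pollMultiple;
        private int maxMessages;          // 0 = no limit in this sketch
        private BinaryOperator<String> aggregationStrategy = (oldBody, newBody) -> newBody;

        private PollEnrichOptions(String uri) {
            this.uri = uri;
        }

        static PollEnrichOptions pollEnrich(String uri) {
            return new PollEnrichOptions(uri);
        }

        PollEnrichOptions timeout(long millis) { this.timeoutMillis = millis; return this; }
        PollEnrichOptions pollMultiple() { this.pollMultiple = true; return this; }
        PollEnrichOptions maxMessages(int max) { this.maxMessages = max; return this; }

        PollEnrichOptions aggregationStrategy(BinaryOperator<String> strategy) {
            this.aggregationStrategy = strategy;
            return this;
        }

        String aggregate(String oldBody, String newBody) {
            return aggregationStrategy.apply(oldBody, newBody);
        }

        @Override
        public String toString() {
            return "pollEnrich[" + uri + ", timeout=" + timeoutMillis + ", pollMultiple=" + pollMultiple
                    + ", maxMessages=" + maxMessages + "]";
        }
    }

    public static void main(String[] args) {
        // A combination (poll multiple + aggregation strategy) that needs no dedicated overload.
        PollEnrichOptions options = PollEnrichOptions.pollEnrich("jms:queue:errors")
                .timeout(1000)
                .pollMultiple()
                .maxMessages(100)
                .aggregationStrategy((oldBody, newBody) -> oldBody + "," + newBody);
        System.out.println(options);
        // prints pollEnrich[jms:queue:errors, timeout=1000, pollMultiple=true, maxMessages=100]
        System.out.println(options.aggregate("m1", "m2")); // prints m1,m2
    }
}
```

Adding an option then means adding one setter, and code completion shows a single entry point instead of a growing list of overloads.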

        Ben O'Day added a comment -

        Thanks for the feedback, Claus... I agree that a comprehensive solution is preferable. I only took a stab at a restricted approach because I often see a need to drain low-volume JMS error queues to periodically retry messages, and it seems a logical option for the pollEnrich pattern. This is the use case I targeted, to avoid complicating it with other options (max messages, aggregator, etc.). That said, it may be too specific/limiting as it stands.

        Either way, I'd like to see some basic form of this in a 2.x release if possible. Another thought is to avoid a DSL/API change altogether and just provide a helper class that implements this batch polling consumer pattern based on constructor args (timeout, source, target, aggregator, etc.). This might bridge the gap a bit...
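A rough sketch of such a helper, configured entirely through constructor arguments (timeout, source, target, aggregator) so that no DSL change is needed. `BatchPollingHelper` is a hypothetical name; the source is modeled as a `LongFunction` that receives with a timeout and returns `null` when nothing arrives.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.LongFunction;

// Hypothetical helper class; not part of Camel.
class BatchPollingHelper<T> {
    private final long timeoutMillis;
    private final LongFunction<T> source;   // receive(timeout): returns null when nothing arrives
    private final Consumer<T> target;       // where the aggregated result is sent
    private final BinaryOperator<T> aggregator;

    BatchPollingHelper(long timeoutMillis, LongFunction<T> source, Consumer<T> target,
                       BinaryOperator<T> aggregator) {
        this.timeoutMillis = timeoutMillis;
        this.source = source;
        this.target = target;
        this.aggregator = aggregator;
    }

    /** Polls until empty, aggregating as it goes; sends one result if anything was polled. */
    int run() {
        T aggregated = null;
        int count = 0;
        T next;
        while ((next = source.apply(timeoutMillis)) != null) {
            aggregated = aggregated == null ? next : aggregator.apply(aggregated, next);
            count++;
        }
        if (aggregated != null) {
            target.accept(aggregated);
        }
        return count;
    }

    public static void main(String[] args) {
        Deque<String> errorQueue = new ArrayDeque<>(List.of("m1", "m2", "m3"));
        List<String> retried = new ArrayList<>();
        BatchPollingHelper<String> helper = new BatchPollingHelper<>(
                500, timeout -> errorQueue.poll(), retried::add, (a, b) -> a + "+" + b);
        int polled = helper.run();
        System.out.println(polled + " " + retried); // prints 3 [m1+m2+m3]
    }
}
```

Acknowledgement and transaction handling are deliberately left to the source here; a helper like this only packages the polling loop.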

        Claus Ibsen added a comment -

        Well, I fear that when you offer functionality that seems so easy, e.g. pollMultiple = true (just setting a boolean to true), people may fall into a trap and think it just works that easily as well.

        Even if we document that this option has many caveats and should be used with care, we see many people on the mailing list who don't read the javadoc / EIP docs / whatever. This is not something we can do much about; it's just how the world works today.

        As for your use case of draining a JMS error queue, the current code is actually worse, as the consumed messages are acked on the JMS broker one by one, so you can lose messages if Camel crashes or the like.

        I think we should consider a new EIP for this use case, as the current pollEnrich is designed to consume a single message, i.e. to be the polling counterpart to enrich, which actually produces to an endpoint.

        Until we have a comprehensive implementation that can support a new EIP / DSL, I suggest starting with a helper class (as you say) and then working our way there.

        The current code is too weak and opens up a can of worms for end users.

        Ben O'Day added a comment (edited) -

        Claus, fair enough...I'll roll back these changes and instead enhance the documentation a bit in this area and possibly add a helper PollingConsumer class.


          People

          • Assignee:
            Unassigned
            Reporter:
            Claus Ibsen
          • Votes:
            4
            Watchers:
            3
