Camel / CAMEL-3142

JpaPollingConsumer - So you can more easily work with pollEnrich

    Details

    • Type: New Feature
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 2.4.0
    • Fix Version/s: Future
    • Component/s: camel-jpa
    • Labels:
      None

      Description

      See
      http://fusesource.com/forums/thread.jspa?threadID=2256&tstart=0

      The best solution is to add a JpaPollingConsumer implementation so that camel-jpa works better with pollEnrich.

      1. camel-jpa_initialPolish.patch
        39 kB
        Mathieu Lalonde
      2. camel-jpa_JpaPollingConsumer_forReview.tar
        40 kB
        Mathieu Lalonde

        Activity

        Claus Ibsen added a comment -

        This is a bit better now, as the scheduled consumer will be suspended/resumed automatically.

        But I guess if the JPA query returns X rows, we ought to set them as a List<X> in the message body.
        So we need a JpaPollingConsumer class to handle this.

        Mathieu Lalonde added a comment -

        I'll have time this (long) weekend to work on this.

        Mathieu Lalonde added a comment -

        I've attached some minor changes to fix the warnings.

        I marked as @Deprecated the JpaEndpoint constructors that lack the parameters needed to call a non-deprecated super(...) constructor. Let me know if that's OK or if you spot other problems with this.

        Claus Ibsen added a comment -

        Thanks for the polish, I have committed the patch.

        Mathieu Lalonde added a comment - - edited

        Status Update (with less confusion this time)

        At the moment I have a JpaPollingConsumer that returns a list of polled entities. It implements the full PollingConsumer interface and returns null when nothing could be polled. It supports maxMessagesPerPoll, but maybe it shouldn't, since it puts all the entities into one message as a List.
        Here is a snippet showing how I implemented receive(timeout). I added delay (consumer.delay works as well) as an option, with a default of 500 ms. The scheduled polling JPA consumer uses the same option. Let me know if you'd prefer a separate option for JpaPollingConsumer.

            public Exchange receive(long timeout) {
                final StopWatch stopWatch = new StopWatch();
                stopWatch.restart();

                // Poll once up front, then retry every 'delay' ms until the timeout elapses.
                List<?> polledEntities = doReceive();
                while (polledEntities == null && stopWatch.taken() < timeout) {
                    try {
                        Thread.sleep(delay);
                        polledEntities = doReceive();
                    } catch (InterruptedException e) {
                        log.trace("receive(long timeout) interrupted after {} ms: Are we stopping: {}", stopWatch.taken(), isStopping());
                        // Restore the interrupt flag and stop polling rather than swallowing the interrupt.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }

                return createExchange(polledEntities);
            }
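
For readers who want to try the timed-receive idea outside Camel, here is a minimal stand-alone sketch that uses only the JDK. It is an illustration, not the code in the attached patch: the class name TimedPoller, the Supplier parameter (a stand-in for doReceive()), and the parameter names are all hypothetical. It assumes a null result means "nothing polled yet", caps each sleep at the time remaining, and stops on interrupt.

```java
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical stand-in for a timed receive loop; not part of camel-jpa. */
public final class TimedPoller {

    private TimedPoller() {
    }

    /**
     * Polls the source until it returns a non-null list or the timeout elapses.
     * Returns null when nothing arrived in time or the thread was interrupted.
     */
    public static <T> List<T> receive(Supplier<List<T>> source, long timeoutMillis, long delayMillis) {
        final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;

        List<T> polled = source.get();
        while (polled == null) {
            long remainingMillis = (deadline - System.nanoTime()) / 1_000_000L;
            if (remainingMillis <= 0) {
                break; // timed out without a result
            }
            try {
                // Sleep at least 1 ms, but never past the deadline.
                Thread.sleep(Math.max(1L, Math.min(delayMillis, remainingMillis)));
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller and give up polling.
                Thread.currentThread().interrupt();
                break;
            }
            polled = source.get();
        }
        return polled;
    }
}
```

A caller would pass its own polling function, for example TimedPoller.<String>receive(() -> queryOnce(), 5000L, 500L), where queryOnce is whatever returns either a list or null.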
        
        Mathieu Lalonde added a comment -

        The attached patch adds a JpaPollingConsumer along with some unit tests.

        I wasn't sure how to best factor out reusable logic between JpaConsumer & JpaPollingConsumer so I created JpaHelper (see TODO comment) & QueryFactoryBuilder. I didn't think inheritance was right there.

        The patch only contains changes to JpaEndpoint. If you like the JpaHelper, I can patch JpaConsumer to make use of it as well.

        Thanks for your patience,
        Mathieu


          People

          • Assignee:
            Unassigned
            Reporter:
            Claus Ibsen
          • Votes:
            1
            Watchers:
            1
