Uploaded image for project: 'Camel'
  1. Camel
  2. CAMEL-10272

Unexpected behaviour in aggregator if recipient list is processed in parallel

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.16.3, 2.17.3
    • Fix Version/s: 2.19.0
    • Component/s: camel-core
    • Labels:
      None
    • Environment:

      MacOS 10.11.6, JRE 1.7.0_79

    • Estimated Complexity:
      Advanced

      Description

      Problem

      The oldExchange is null more than once in the aggregator if a recipient list is processed in parallel.

      Camel route

      In my Camel route, a recipient list is worked of in parallel:

       from("direct:start")
          .to("direct:pre")
          .recipientList().method(new MyRecipientListBuilder())
              .stopOnException()
              .aggregationStrategy(new MyAggregationStrategy())
              .parallelProcessing()
          .end()
          .bean(new MyPostProcessor());
      

      Snippet of MyAggregationStrategy:

      @Override
      @SuppressWarnings("unchecked")
      public Exchange aggregate(final Exchange oldExchange, final Exchange newExchange) {
          if (oldExchange == null) {
              // this is the case more than once which is not expected!
          }
          // ...
      

      oldExchange is null more than once which is not expected and which contradicts the contract with Camel.

      Analysis

      Unfortunately, I am not able to provide a (simple) unit test for comprehending the problem. Furthermore our (complex) unit tests are not deterministic due to the root cause of the problem.

      During the processing, Camel invokes ParallelAggregateTask.doAggregateInternal(). If aggregation is not done in parallel (as it is the case in our route), this is supposed to be done synchronously:

      protected void doAggregateInternal(AggregationStrategy strategy, AtomicExchange result, Exchange exchange) {
          if (strategy != null) {
              // prepare the exchanges for aggregation
              Exchange oldExchange = result.get();
              ExchangeHelper.prepareAggregation(oldExchange, exchange);
              result.set(strategy.aggregate(oldExchange, exchange));
          }
      } 
      

      However, is it possible that we face a race condition in doAggregateInternal even if this method is supposed to be invoked synchronously?

        Attachments

          Activity

            People

            • Assignee:
              davsclaus Claus Ibsen
              Reporter:
              Peter Keller Peter Keller
            • Votes:
              0 Vote for this issue
              Watchers:
              6 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved: