Camel / CAMEL-20214

camel-core - Timeout tasks of parallel splitter block further message processing



    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.14.10, 3.21.2
    • Fix Version/s: 3.21.4, 3.22.0, 4.0.4, 4.3.0
    • Component/s: camel-core
    • Labels: None
    • Estimated Complexity: Unknown


      After an update from Camel 2.x to Camel 3.14.7 we noticed the following issue in all newer Camel 3 versions:

      By default, a parallel splitter uses a ThreadPoolProfile with maxQueueSize = 1000.
      If the route is called 1001 times within the configured splitter timeout, one message fails with "java.util.concurrent.RejectedExecutionException: Task rejected due queue size limit reached", which is thrown by the SizedScheduledExecutorService class.

      For each call of the route, one "timeout" task is added to the DelayedWorkQueue used by the "Splitter-AggregateTask" thread. Each of these "timeout" tasks waits until its timeout is reached, even though the message processing has already completed. That means the 1000 messages have already been processed successfully, but the 1000 "timeout" tasks are still in the DelayedWorkQueue. Because the queue is full, they block further message processing until the timeout is reached.

      We found some comments in the MulticastProcessor.doStart() method that an unbounded thread pool has to be used for the "Splitter-AggregateTask" thread to avoid issues.

      /* use unbounded thread pool so we ensure the aggregate on-the-fly task always will have assigned a thread and run the tasks when the task is submitted. If not then the aggregate task may not be able to run and signal completion during processing, which would lead to what would appear as a dead-lock or a slow processing */

      We therefore assume that maxQueueSize should also be unlimited when the thread pool is created in MulticastProcessor.createAggregateExecutorService().
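
      The mechanism described above can be sketched with plain JDK classes. This is only an illustration under stated assumptions, not Camel's code: TimeoutQueueSketch is a hypothetical stand-in for a size-limited scheduled executor, the 60 second delay stands in for the splitter timeout, and treating a limit of 0 or less as "unbounded" is a convention chosen for this sketch. It shows that a scheduled timeout task stays in the delayed queue after the actual work has finished, so a bounded queue rejects the next call while an unbounded one does not.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a size-limited scheduled executor; NOT Camel's SizedScheduledExecutorService. */
public class TimeoutQueueSketch {

    private final ScheduledThreadPoolExecutor delegate = new ScheduledThreadPoolExecutor(1);
    private final int maxQueueSize; // assumption of this sketch: <= 0 means "no limit"

    public TimeoutQueueSketch(int maxQueueSize) {
        this.maxQueueSize = maxQueueSize;
    }

    /** Schedules a no-op timeout task; rejects it if the delayed queue is already full. */
    public ScheduledFuture<?> scheduleTimeout(long delay, TimeUnit unit) {
        if (maxQueueSize > 0 && delegate.getQueue().size() >= maxQueueSize) {
            throw new RejectedExecutionException("queue size limit reached");
        }
        return delegate.schedule(() -> { }, delay, unit);
    }

    public void shutdown() {
        delegate.shutdownNow();
    }

    /** Simulates a number of route calls and returns how many of them were rejected. */
    public static int rejectedCalls(int maxQueueSize, int calls) {
        TimeoutQueueSketch sketch = new TimeoutQueueSketch(maxQueueSize);
        int rejected = 0;
        try {
            for (int i = 0; i < calls; i++) {
                try {
                    sketch.scheduleTimeout(60, TimeUnit.SECONDS);
                    // The message itself would be fully processed at this point,
                    // but its timeout task remains queued until the delay expires.
                } catch (RejectedExecutionException e) {
                    rejected++;
                }
            }
        } finally {
            sketch.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("limit 1, 2 calls: rejected = " + rejectedCalls(1, 2));
        System.out.println("no limit, 2 calls: rejected = " + rejectedCalls(0, 2));
    }
}
```

      With a limit of 1 the second call is rejected, which mirrors the reproduction test below; without a limit both calls are accepted. The sketch does not model cancelling timeout tasks once processing completes.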

      A short test to reproduce the mentioned issue (maxQueueSize is set to 1 to reproduce the issue with only two calls):


      public class SplitterTest extends CamelTestSupport {

          String payload1 = "<items><item><id>1</id><name>one</name></item><item><id>2</id><name>two</name></item></items>";
          String payload2 = "<items><item><id>3</id><name>three</name></item><item><id>4</id><name>four</name></item></items>";

          public void testSplitter() throws InterruptedException, IOException {
              MockEndpoint mockEndpoint = getMockEndpoint("mock:split");
              mockEndpoint.expectedMessageCount(4);

              template.sendBody("direct:start", payload1);
              template.sendBody("direct:start", payload2);

              assertMockEndpointsSatisfied();
          }

          @Override
          protected RouteBuilder createRouteBuilder() throws Exception {
              return new RouteBuilder() {
                  public void configure() throws Exception {
                      ThreadPoolProfile myThreadPoolProfile = new ThreadPoolProfile("testProfile");
                      getContext().getExecutorServiceManager().setDefaultThreadPoolProfile(myThreadPoolProfile);

                      from("direct:start")







              Assignee: Claus Ibsen (davsclaus)
              Reporter: Andre Weickel (AWeickel)