Camel / CAMEL-12906

Strange behavior with aggregator

Details

    • Type: Bug
    • Status: Closed
    • Priority: Minor
    • Resolution: Won't Fix
    • Affects Version/s: 2.22.1
    • Fix Version/s: None
    • Component/s: camel-activemq
    • Labels: None
    • Estimated Complexity: Unknown

    Description

      /!\ This is about the Camel aggregator and error handling during the shutdown phase.

      My case is about graceful shutdown and error handling when an aggregator is involved. First case:

      In this example I get a RejectedExecutionException during shutdown. The exception is not caught by onException or errorHandler as I expected, so the message is simply lost.

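      import org.apache.camel.builder.RouteBuilder;
      import org.apache.camel.processor.aggregate.GroupedBodyAggregationStrategy;
      // Assumption: RandomStringUtils is the commons-lang3 class and @Component is Spring's;
      // the report does not show its imports.
      import org.apache.commons.lang3.RandomStringUtils;
      import org.springframework.stereotype.Component;

      @Component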
      public class AmqRoute extends RouteBuilder {
      
          @Override
          public void configure() throws Exception {
              errors();
      
              from("timer:foo?period=1000")
                      .log("1")
                      .transform().body(() -> "DATA " + RandomStringUtils.randomAlphanumeric(10))
                      .convertBodyTo(String.class)
                      .to(amq());
      
              from(amq()).to("direct:foo");
      
              from("direct:foo")
                      .delay(3000)
                      .aggregate(constant(true), new GroupedBodyAggregationStrategy())
                      .completionSize(10)
                      .forceCompletionOnStop()
                      .log("${body}");
          }
      
          public void errors() {
              onException(Exception.class)
                      .useOriginalMessage()
                      .to("activemq:recovery")
                      .handled(true)
                      .onRedelivery(exchange -> System.err.println("push to amq"));
      
              errorHandler(deadLetterChannel("log:dead?level=ERROR"));
          }
      
          private static String amq() {
              String amq = "activemq:data";
      
              amq += "?transacted=true";
      
              return amq;
          }
      
      }
      

      Stacktrace:

      java.util.concurrent.RejectedExecutionException: null
      	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:435) ~[camel-core-2.22.1.jar:2.22.1]
      	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201) ~[camel-core-2.22.1.jar:2.22.1]
      	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201) ~[camel-core-2.22.1.jar:2.22.1]
      	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97) ~[camel-core-2.22.1.jar:2.22.1]
      	at org.apache.camel.processor.aggregate.AggregateProcessor$1.run(AggregateProcessor.java:791) ~[camel-core-2.22.1.jar:2.22.1]
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_131]
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_131]
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_131]
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_131]
      	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
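
      For readers unfamiliar with the exception type: RejectedExecutionException is a JDK class (java.util.concurrent) that conventionally signals "this component no longer accepts work". The sketch below is only an analogy using a plain JDK executor. It is not Camel's internal code, and the class name ShutdownRejectionDemo is made up for illustration. It shows the same exception being raised when work is submitted after shutdown has begun.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical demo class: illustrates the JDK meaning of RejectedExecutionException only.
final class ShutdownRejectionDemo {

    /** Returns true if a task submitted after shutdown() is rejected. */
    static boolean isRejectedAfterShutdown() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.shutdown(); // no new tasks are accepted from this point on
        try {
            executor.submit(() -> { }); // attempt to hand over new work
            return false;
        } catch (RejectedExecutionException e) {
            // the default rejection policy throws once the executor is shut down
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected after shutdown: " + isRejectedAfterShutdown());
    }
}
```

      In the stack trace above the exception originates in RedeliveryErrorHandler.process rather than in a JDK executor, so the analogy only covers the meaning of the exception, not where Camel decides to raise it.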
      

      In this second case (using doTry before aggregate) no exception is thrown at all. That is the interesting point: I expected at least a caught RejectedExecutionException, and in the end no message was lost in this case!

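      import org.apache.camel.builder.RouteBuilder;
      import org.apache.camel.processor.aggregate.GroupedBodyAggregationStrategy;
      // Assumption: RandomStringUtils is the commons-lang3 class and @Component is Spring's;
      // the report does not show its imports.
      import org.apache.commons.lang3.RandomStringUtils;
      import org.springframework.stereotype.Component;

      @Component
      public class AmqRoute extends RouteBuilder {
      
          @Override
          public void configure() throws Exception {
              // errors(); // error handling is disabled in this variant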
      
              from("timer:foo?period=1000")
                      .log("1")
                      .transform().body(() -> "DATA " + RandomStringUtils.randomAlphanumeric(10))
                      .convertBodyTo(String.class)
                      .to(amq());
      
              from(amq()).to("direct:foo");
      
              from("direct:foo")
                      .delay(3000)
                      .doTry() // Strange /!\
                      .aggregate(constant(true), new GroupedBodyAggregationStrategy())
                      .completionSize(10)
                      .forceCompletionOnStop()
                      .log("${body}");
          }
      
          public void errors() {
              onException(Exception.class)
                      .useOriginalMessage()
                      .to("activemq:recovery")
                      .handled(true)
                      .onRedelivery(exchange -> System.err.println("push to amq"));
      
              errorHandler(deadLetterChannel("log:dead?level=ERROR"));
          }
      
          private static String amq() {
              String amq = "activemq:data";
      
              amq += "?transacted=true";
      
              return amq;
          }
      
      }
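
      As a reference for what both routes expect from the aggregator, here is a plain-Java sketch of the intended semantics of completionSize(10) combined with forceCompletionOnStop(): bodies are grouped until ten have arrived, and on stop any partial group is completed instead of being dropped. BatchingBuffer and its methods are hypothetical names for illustration. This is not Camel's AggregateProcessor and it ignores threading, redelivery, and transactions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of "complete at N, flush the remainder on stop".
final class BatchingBuffer {
    private final int completionSize;
    private List<String> current = new ArrayList<>();
    private final List<List<String>> completed = new ArrayList<>();

    BatchingBuffer(int completionSize) {
        this.completionSize = completionSize;
    }

    /** Adds one body; completes the group once completionSize bodies are collected. */
    void add(String body) {
        current.add(body);
        if (current.size() == completionSize) {
            complete();
        }
    }

    /** Mirrors the intent of forceCompletionOnStop(): flush a partial group. */
    void stop() {
        if (!current.isEmpty()) {
            complete();
        }
    }

    private void complete() {
        completed.add(current);
        current = new ArrayList<>();
    }

    List<List<String>> completed() {
        return completed;
    }

    public static void main(String[] args) {
        BatchingBuffer buffer = new BatchingBuffer(10);
        for (int i = 1; i <= 13; i++) {
            buffer.add("DATA " + i);
        }
        buffer.stop();
        // prints "2 groups, last has 3 bodies"
        System.out.println(buffer.completed().size() + " groups, last has "
                + buffer.completed().get(1).size() + " bodies");
    }
}
```

      With 13 bodies the first ten form a complete group and the remaining three are emitted only because stop() flushes them. In the report, that final flush is the step that meets the RejectedExecutionException in the first route.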
      

          People

            Assignee: Unassigned
            Reporter: michael elbaz (michael992)
            Votes: 0
            Watchers: 3
