Details
-
Bug
-
Status: Closed
-
Minor
-
Resolution: Won't Fix
-
2.22.1
-
None
-
None
-
Unknown
Description
/!\ Its about camel aggregator and error handling during shutdown phase
My case is about graceful shutdown and error handling in case of aggregator first one:
In this example i will get RejectedExecutionException during the shutdown and the exception is not catched by the onException or errorHandler methods as expected so i will just lost message.
@Component public class AmqRoute extends RouteBuilder { @Override public void configure() throws Exception { errors(); from("timer:foo?period=1000") .log("1") .transform().body(() -> "DATA " + RandomStringUtils.randomAlphanumeric(10)) .convertBodyTo(String.class) .to(amq()); from(amq()).to("direct:foo"); from("direct:foo") .delay(3000) .aggregate(constant(true), new GroupedBodyAggregationStrategy()) .completionSize(10) .forceCompletionOnStop() .log("${body}"); } public void errors() { onException(Exception.class) .useOriginalMessage() .to("activemq:recovery") .handled(true) .onRedelivery(exchange -> System.err.println("push to amq")); errorHandler(deadLetterChannel("log:dead?level=ERROR")); } private static String amq() { String amq = "activemq:data"; amq += "?transacted=true"; return amq; } }
Stacktrace:
java.util.concurrent.RejectedExecutionException: null at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:435) ~[camel-core-2.22.1.jar:2.22.1] at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201) ~[camel-core-2.22.1.jar:2.22.1] at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201) ~[camel-core-2.22.1.jar:2.22.1] at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97) ~[camel-core-2.22.1.jar:2.22.1] at org.apache.camel.processor.aggregate.AggregateProcessor$1.run(AggregateProcessor.java:791) ~[camel-core-2.22.1.jar:2.22.1] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_131] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_131] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_131] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_131] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
In this one (using doTry before aggregate) there is no exception throwed, that the interesting point because i expect at least catched RejectedExecutionException and finally i didn't lost message in this case !
@Component public class AmqRoute extends RouteBuilder { @Override public void configure() throws Exception { // errors(); from("timer:foo?period=1000") .log("1") .transform().body(() -> "DATA " + RandomStringUtils.randomAlphanumeric(10)) .convertBodyTo(String.class) .to(amq()); from(amq()).to("direct:foo"); from("direct:foo") .delay(3000) .doTry() // Strange /!\ .aggregate(constant(true), new GroupedBodyAggregationStrategy()) .completionSize(10) .forceCompletionOnStop() .log("${body}"); } public void errors() { onException(Exception.class) .useOriginalMessage() .to("activemq:recovery") .handled(true) .onRedelivery(exchange -> System.err.println("push to amq")); errorHandler(deadLetterChannel("log:dead?level=ERROR")); } private static String amq() { String amq = "activemq:data"; amq += "?transacted=true"; return amq; } }