Details
Improvement
Status: Resolved
Major
Resolution: Fixed
2.19.0
None
Unknown
Description
If you want to discard messages and use a backpressure strategy such as LATEST:
.to("reactive-streams:inbox?backpressureStrategy=LATEST");
then Camel throws an exception whenever an exchange is discarded:
java.lang.IllegalStateException: Exchange Exchange[ID-davsclaus-air-52789-1491737146614-0-450] discarded by backpressure strategy LATEST
    at org.apache.camel.component.reactive.streams.engine.CamelSubscription.publish(CamelSubscription.java:235)
    at org.apache.camel.component.reactive.streams.engine.CamelPublisher.publish(CamelPublisher.java:98)
    at org.apache.camel.component.reactive.streams.engine.DefaultCamelReactiveStreamsService.sendCamelExchange(DefaultCamelReactiveStreamsService.java:123)
    at org.apache.camel.component.reactive.streams.ReactiveStreamsProducer.process(ReactiveStreamsProducer.java:44)
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:145)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
    at org.apache.camel.processor.DelayProcessorSupport.processDelay(DelayProcessorSupport.java:100)
    at org.apache.camel.processor.DelayProcessorSupport.process(DelayProcessorSupport.java:168)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
    at org.apache.camel.component.seda.SedaConsumer.sendToConsumers(SedaConsumer.java:298)
    at org.apache.camel.component.seda.SedaConsumer.doRun(SedaConsumer.java:207)
    at org.apache.camel.component.seda.SedaConsumer.run(SedaConsumer.java:154)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
That is likely not what you want, since you told it to just keep the latest message.
We should allow users to deal with the discarded messages in some other way if they need to.
We should maybe also keep runtime statistics of the number of discarded messages, so this can be used for monitoring.
We should also introduce a specific CamelReactiveException, so you can configure Camel's error handler to react to it.
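To make the three proposals above concrete, here is a rough sketch in plain Java. It is a standalone model and not the camel-reactive-streams code: LatestBuffer, DiscardedException, the onDiscard callback and discardedCount() are hypothetical names used only for illustration. It assumes a single-slot buffer is a fair approximation of LATEST, where a newly published item replaces the one that has not been consumed yet.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

/** Hypothetical stand-in for the proposed, more specific exception type. */
class DiscardedException extends RuntimeException {
    DiscardedException(String message) {
        super(message);
    }
}

/** Single-slot buffer modelling LATEST: a new item replaces the unconsumed one. */
class LatestBuffer<T> {
    private final BiConsumer<T, DiscardedException> onDiscard;
    private final AtomicLong discarded = new AtomicLong();
    private T slot; // guarded by "this"

    LatestBuffer(BiConsumer<T, DiscardedException> onDiscard) {
        this.onDiscard = Objects.requireNonNull(onDiscard);
    }

    /** Stores the item; if an older item was still waiting, it is discarded. */
    void publish(T item) {
        Objects.requireNonNull(item);
        T old;
        synchronized (this) {
            old = slot;
            slot = item;
        }
        if (old != null) {
            // Count the discard for monitoring and hand it to the callback,
            // instead of throwing from publish().
            discarded.incrementAndGet();
            onDiscard.accept(old, new DiscardedException(
                    "discarded by backpressure strategy LATEST: " + old));
        }
    }

    /** Returns the waiting item and clears the slot, or null if it is empty. */
    synchronized T poll() {
        T item = slot;
        slot = null;
        return item;
    }

    /** Number of items discarded so far. */
    long discardedCount() {
        return discarded.get();
    }
}

class LatestBufferDemo {
    public static void main(String[] args) {
        List<String> dropped = new ArrayList<>();
        LatestBuffer<String> buffer =
                new LatestBuffer<>((item, cause) -> dropped.add(item));

        buffer.publish("a");
        buffer.publish("b"); // discards "a"
        buffer.publish("c"); // discards "b"

        System.out.println(buffer.poll());           // c
        System.out.println(buffer.discardedCount()); // 2
        System.out.println(dropped);                 // [a, b]
    }
}
```

In this sketch the publisher never sees an exception. A caller that does want to react gets the discarded item together with a dedicated exception type, which is the kind of thing an error handler could match on, and the counter is what a monitoring statistic could expose.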