  1. Camel
  2. CAMEL-11124

camel-reactive-streams - Allow to silently ignore discarded messages

Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.19.0
    • Fix Version/s: 2.19.0
    • Component/s: camel-reactive-streams
    • Labels: None
    • Unknown

    Description

      If you want to discard messages under backpressure and configure a strategy such as

      .to("reactive-streams:inbox?backpressureStrategy=LATEST");
      

      Then Camel will throw an exception whenever a message is discarded:

      java.lang.IllegalStateException: Exchange Exchange[ID-davsclaus-air-52789-1491737146614-0-450] discarded by backpressure strategy LATEST
      	at org.apache.camel.component.reactive.streams.engine.CamelSubscription.publish(CamelSubscription.java:235)
      	at org.apache.camel.component.reactive.streams.engine.CamelPublisher.publish(CamelPublisher.java:98)
      	at org.apache.camel.component.reactive.streams.engine.DefaultCamelReactiveStreamsService.sendCamelExchange(DefaultCamelReactiveStreamsService.java:123)
      	at org.apache.camel.component.reactive.streams.ReactiveStreamsProducer.process(ReactiveStreamsProducer.java:44)
      	at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:145)
      	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
      	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
      	at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
      	at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
      	at org.apache.camel.processor.DelayProcessorSupport.processDelay(DelayProcessorSupport.java:100)
      	at org.apache.camel.processor.DelayProcessorSupport.process(DelayProcessorSupport.java:168)
      	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
      	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
      	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
      	at org.apache.camel.component.seda.SedaConsumer.sendToConsumers(SedaConsumer.java:298)
      	at org.apache.camel.component.seda.SedaConsumer.doRun(SedaConsumer.java:207)
      	at org.apache.camel.component.seda.SedaConsumer.run(SedaConsumer.java:154)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:745)
      

      This is likely not what you want, since you told it to keep only the latest message.

      We should allow users to handle discarded messages in some other way if they need to.

      We should perhaps also keep runtime statistics on the number of discarded messages, so that they can be used for monitoring.
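
      To make the idea concrete, here is a minimal, stdlib-only sketch of a "keep latest" buffer that discards silently, counts what it drops, and hands each dropped item to an optional callback. It is an illustration only: `LatestBuffer`, `publish`, `poll`, `discardedCount` and the `onDiscard` callback are hypothetical names, not part of camel-reactive-streams, and this is not how `CamelSubscription` is implemented.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/**
 * Hypothetical illustration of a LATEST-style buffer (not Camel code).
 * It holds at most one item; publishing over an unconsumed item drops the
 * older one without throwing, counts it, and notifies a callback.
 */
final class LatestBuffer<T> {
    // The single slot holding the most recent unconsumed item (null when empty).
    private final AtomicReference<T> slot = new AtomicReference<>();
    // Running total of discarded items, suitable for exposing as a statistic.
    private final AtomicLong discarded = new AtomicLong();
    // User-supplied hook for dealing with discarded items "in some other way".
    private final Consumer<T> onDiscard;

    LatestBuffer(Consumer<T> onDiscard) {
        this.onDiscard = Objects.requireNonNull(onDiscard, "onDiscard");
    }

    /** Stores the item; a previously stored, unconsumed item is discarded. */
    void publish(T item) {
        Objects.requireNonNull(item, "item");
        T previous = slot.getAndSet(item);
        if (previous != null) {
            discarded.incrementAndGet();
            onDiscard.accept(previous);
        }
    }

    /** Returns and clears the latest item, or null if nothing is pending. */
    T poll() {
        return slot.getAndSet(null);
    }

    /** Number of items discarded so far. */
    long discardedCount() {
        return discarded.get();
    }
}
```

      In this sketch the caller decides what "dealing with" a discarded message means by choosing the callback (a no-op to ignore it, a logger, a dead-letter queue), while the counter stays available for monitoring either way.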

      We should also introduce a specific CamelReactiveException so that you can configure Camel's error handler to react to it.

          People

            Assignee: Claus Ibsen (davsclaus)
            Reporter: Claus Ibsen (davsclaus)