Camel / CAMEL-11124

camel-reactive-streams - Allow to silently ignore discarded messages

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.19.0
    • Fix Version/s: 2.19.0
    • Component/s: camel-reactive-streams
    • Labels:
      None
    • Estimated Complexity:
      Unknown

      Description

      If you want to discard messages and configure a backpressure strategy such as

      .to("reactive-streams:inbox?backpressureStrategy=LATEST");
      

      then Camel throws an exception whenever an exchange is discarded:

      java.lang.IllegalStateException: Exchange Exchange[ID-davsclaus-air-52789-1491737146614-0-450] discarded by backpressure strategy LATEST
      	at org.apache.camel.component.reactive.streams.engine.CamelSubscription.publish(CamelSubscription.java:235)
      	at org.apache.camel.component.reactive.streams.engine.CamelPublisher.publish(CamelPublisher.java:98)
      	at org.apache.camel.component.reactive.streams.engine.DefaultCamelReactiveStreamsService.sendCamelExchange(DefaultCamelReactiveStreamsService.java:123)
      	at org.apache.camel.component.reactive.streams.ReactiveStreamsProducer.process(ReactiveStreamsProducer.java:44)
      	at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:145)
      	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
      	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
      	at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
      	at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
      	at org.apache.camel.processor.DelayProcessorSupport.processDelay(DelayProcessorSupport.java:100)
      	at org.apache.camel.processor.DelayProcessorSupport.process(DelayProcessorSupport.java:168)
      	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
      	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
      	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
      	at org.apache.camel.component.seda.SedaConsumer.sendToConsumers(SedaConsumer.java:298)
      	at org.apache.camel.component.seda.SedaConsumer.doRun(SedaConsumer.java:207)
      	at org.apache.camel.component.seda.SedaConsumer.run(SedaConsumer.java:154)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:745)
      

      This is likely not what you want, since you told it to keep only the latest message.

      We should allow users to deal with the discarded messages in some other way if they need to.

      We should perhaps also keep runtime statistics on the number of discarded messages, so that they can be used for monitoring.

      We should also introduce a specific CamelReactiveException so that you can configure Camel's error handler to react to it.
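
      The three ideas above (discarding silently, counting discards, and raising a dedicated exception type) can be sketched in plain Java, independent of Camel. This is only an illustration under stated assumptions: LatestBuffer, DiscardedException and the onDiscard callback are hypothetical names invented for this sketch, not the camel-reactive-streams API or its actual implementation.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

public class LatestBufferSketch {

    /** Hypothetical stand-in for the dedicated exception type proposed in this issue. */
    static class DiscardedException extends RuntimeException {
        DiscardedException(String message) {
            super(message);
        }
    }

    /**
     * Hypothetical single-slot buffer mimicking a "keep latest" policy:
     * a newly published item replaces any item that has not been polled yet.
     */
    static class LatestBuffer<T> {
        private final Consumer<T> onDiscard;                    // what to do with a dropped item
        private final AtomicLong discarded = new AtomicLong();  // statistic for monitoring
        private T pending;

        LatestBuffer(Consumer<T> onDiscard) {
            this.onDiscard = onDiscard;
        }

        synchronized void publish(T item) {
            T old = pending;
            pending = item;                  // the newest item always wins
            if (old != null) {
                discarded.incrementAndGet(); // count every discard
                onDiscard.accept(old);       // may ignore, log, or throw
            }
        }

        synchronized T poll() {
            T item = pending;
            pending = null;
            return item;
        }

        long discardedCount() {
            return discarded.get();
        }
    }

    public static void main(String[] args) {
        // Silent mode: discarded items are dropped without an exception.
        LatestBuffer<String> silent = new LatestBuffer<>(old -> { });
        silent.publish("a");
        silent.publish("b");
        silent.publish("c");
        System.out.println(silent.poll() + " discarded=" + silent.discardedCount());

        // Strict mode: a discard raises the dedicated exception so a caller can react to it.
        LatestBuffer<String> strict = new LatestBuffer<>(old -> {
            throw new DiscardedException("discarded " + old);
        });
        strict.publish("x");
        try {
            strict.publish("y");
        } catch (DiscardedException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

      Passing the discard behaviour in as a callback keeps the "ignore" and "fail" cases on the same code path, and the counter is updated in both cases, so monitoring does not depend on which mode is chosen.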

            People

            • Assignee:
              davsclaus Claus Ibsen
              Reporter:
              davsclaus Claus Ibsen