CAMEL-4650

NPE when using SEDA route and attaching an extra consumer

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.8.2
    • Fix Version/s: 2.8.3, 2.9.0
    • Component/s: camel-core
    • Labels:
      None
    • Environment:

      Java 6, Camel 2.8.2, Centos 4.

    • Estimated Complexity:
      Unknown

      Description

      I'm trying to construct a system for moving some seda queues over to
      ActiveMQ during system shutdown. What I did was create a Route that
      connects to some of my seda queues and then drains the queue to activemq.

      Basically, I have two routes. The drainer:

      from("seda:" + sedaId + "?size=1000")
          .routeId(routeName + sedaIs)
          .noAutoStartup()
          .to(activeMQFailuresQueue);

      And the main route:

      from("seda:" + sedaId + "?size=1000")
          .routeId(routeName + sedaIs)
          .to(SomeProcessor);

      Sometimes the main route stalls for various reasons and I need to
      restart the JVM process it is running in, so I start the drainer route.
      But when I tried this in production, I got:

      java.lang.NullPointerException
          at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:78)
          at org.apache.camel.component.seda.SedaConsumer.sendToConsumers(SedaConsumer.java:210)
          at org.apache.camel.component.seda.SedaConsumer.doRun(SedaConsumer.java:155)
          at org.apache.camel.component.seda.SedaConsumer.run(SedaConsumer.java:129)
          at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
          at java.lang.Thread.run(Thread.java:619)

      Bvahdat added a comment:
      just a very tiny pointer:

      Looking at the source it seems that the
      SedaEndpoint.getConsumerMulticastProcessor() method returns 'null' causing
      the NPE, as the condition:

      multicastStarted == false || consumerMulticastProcessor == null

      is true.

        Issue Links

          Activity

          Willem Jiang made changes -
          Fix Version/s 2.8.3 [ 12318649 ]
          Fix Version/s 2.8.4 [ 12319072 ]
          Willem Jiang made changes -
          Fix Version/s 2.8.4 [ 12319072 ]
          Fix Version/s 2.8.3 [ 12318649 ]
          Claus Ibsen made changes -
          Status Reopened [ 4 ] Resolved [ 5 ]
          Fix Version/s 2.8.3 [ 12318649 ]
          Fix Version/s 2.9.0 [ 12316374 ]
          Resolution Fixed [ 1 ]
          Claus Ibsen added a comment -

          The NPE is now fixed and an error message is reported.

          I created a new ticket for checking whether multiple consumers are supported when a route is started manually.

          Claus Ibsen <davsclaus@apache.org> committed b6c929c80b6d1f7d598dc3edda2e6e57017f65e5 (2 files)
          Claus Ibsen made changes -
          Link This issue relates to CAMEL-4680 [ CAMEL-4680 ]
          Claus Ibsen committed 1202148 (2 files)
          Claus Ibsen <davsclaus@apache.org> committed c80666c52c4cbc395fa11a31166927b1eb75d356 (2 files)
          Babak Vahdat added a comment -

          I think the problem lies in the fact that camel-core doesn't realize that another route is already consuming from the same URI (in this case a Seda consumer without multipleConsumers enabled) when one route has auto-startup disabled, the other is already up and consuming, and the disabled route is started afterwards through the provided API.

          However, if both routes use auto-startup (which is the default), this case is caught correctly at startup of the Camel context:

          org.apache.camel.FailedToStartRouteException: Failed to start route route2 because of Multiple consumers for the same endpoint is not allowed: Endpoint[seda://foo?concurrentConsumers=5]
          	at org.apache.camel.impl.DefaultCamelContext.doStartOrResumeRouteConsumers(DefaultCamelContext.java:1916)
          	at org.apache.camel.impl.DefaultCamelContext.doStartRouteConsumers(DefaultCamelContext.java:1892)
          	at org.apache.camel.impl.DefaultCamelContext.safelyStartRouteServices(DefaultCamelContext.java:1820)
          	at org.apache.camel.impl.DefaultCamelContext.doStartOrResumeRoutes(DefaultCamelContext.java:1604)
          	at org.apache.camel.impl.DefaultCamelContext.doStartCamel(DefaultCamelContext.java:1494)
          	at org.apache.camel.impl.DefaultCamelContext.doStart(DefaultCamelContext.java:1381)
          	at org.apache.camel.support.ServiceSupport.start(ServiceSupport.java:60)
          	at org.apache.camel.impl.DefaultCamelContext.start(DefaultCamelContext.java:1359)
          	at org.apache.camel.ContextTestSupport.startCamelContext(ContextTestSupport.java:171)
          	at org.apache.camel.ContextTestSupport.setUp(ContextTestSupport.java:114)
          	at junit.framework.TestCase.runBare(TestCase.java:132)
          	at org.apache.camel.TestSupport.runBare(TestSupport.java:59)
          	at junit.framework.TestResult$1.protect(TestResult.java:110)
          	at junit.framework.TestResult.runProtected(TestResult.java:128)
          	at junit.framework.TestResult.run(TestResult.java:113)
          	at junit.framework.TestCase.run(TestCase.java:124)
          	at junit.framework.TestSuite.runTest(TestSuite.java:232)
          	at junit.framework.TestSuite.run(TestSuite.java:227)
          	at org.junit.internal.runners.JUnit38ClassRunner.run(JUnit38ClassRunner.java:83)
          	at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
          	at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
          	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
          	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
          	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
          	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
          
          Claus Ibsen made changes -
          Resolution Not A Problem [ 8 ]
          Status Closed [ 6 ] Reopened [ 4 ]
          Assignee Claus Ibsen [ davsclaus ]
          Claus Ibsen added a comment -

          Ah sorry I can still reproduce the NPE. Will try to add some configuration check.

          Claus Ibsen added a comment -

          The unit test above can be fixed if you start the route correctly using the startRoute method from CamelContext.

          Babak Vahdat added a comment -

          Absolutely agree on that.

          Claus Ibsen added a comment -

          Regardless of whether it is an API misuse, Camel should shield itself better and report an IllegalArgumentException or IllegalStateException with a better description of what is wrong.

          We should generally avoid any NPE being thrown inside Camel code.
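
The fail-fast idea in this comment can be sketched roughly as follows. This is plain Java with stand-in types, not Camel's own classes, and it is not the change that was committed for this issue. The names GuardedDispatchSketch and Processor, and the wording of the error message, are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class GuardedDispatchSketch {

    /** Hypothetical stand-in for a message processor; not a Camel type. */
    interface Processor {
        void process(String body);
    }

    private final Processor regular;
    private final Processor multicast; // may be null, as in the reported case

    GuardedDispatchSketch(Processor regular, Processor multicast) {
        this.regular = regular;
        this.multicast = multicast;
    }

    /** Dispatches a message, failing fast with a descriptive error instead of an NPE. */
    void sendToConsumers(String body, int consumerCount) {
        if (consumerCount > 1) {
            if (multicast == null) {
                throw new IllegalStateException("Found " + consumerCount
                        + " consumers on the endpoint but no multicast processor is available;"
                        + " was a second consumer started on an endpoint that allows only one?");
            }
            multicast.process(body);
        } else {
            regular.process(body);
        }
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        GuardedDispatchSketch sketch = new GuardedDispatchSketch(received::add, null);

        sketch.sendToConsumers("Hello World", 1); // single consumer: regular path
        try {
            sketch.sendToConsumers("Hello again", 2); // two consumers, no multicast processor
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
        System.out.println("Delivered: " + received);
    }
}
```

The point of the guard is only that the caller sees what is misconfigured; whether IllegalStateException or IllegalArgumentException fits better depends on where the check is placed.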

          Tarjei Huse made changes -
          Field Original Value New Value
          Status Open [ 1 ] Closed [ 6 ]
          Resolution Not A Problem [ 8 ]
          Tarjei Huse added a comment -

          Closed as API misuse.

          Tarjei Huse added a comment -

          Babak, thank you for working on this issue.

          As it stands, it is an api misuse, and I will close the bug.

          T

          Babak Vahdat added a comment -

          I could reproduce your issue on trunk with a two-line change to [1], as follows:

          public class SedaConcurrentConsumersTest extends ContextTestSupport {
          
              public void testSendToSeda() throws Exception {
                  MockEndpoint mock = getMockEndpoint("mock:result");
                  mock.expectedBodiesReceived("Hello World");
          
                  template.sendBody("seda:foo?concurrentConsumers=5", "Hello World");
          
                  assertMockEndpointsSatisfied();
                  
                  context.getRoutes().get(0).getConsumer().start();
                  template.sendBody("seda:foo?concurrentConsumers=5", "Claus absence today is a showstopper!");
              }
          
              @Override
              protected RouteBuilder createRouteBuilder() throws Exception {
                  return new RouteBuilder() {
                      @Override
                      public void configure() throws Exception {
                          from("seda:foo?concurrentConsumers=5").noAutoStartup().to("mock:result");
                          
                          from("seda:foo?concurrentConsumers=5").to("mock:result");
                      }
                  };
              }
          }
          

          Which blows up with a NPE exactly on the same place as yours:

          2011-11-10 17:58:42,108 [main           ] INFO  DefaultCamelContext            - Apache Camel  (CamelContext: camel-1) is starting
          2011-11-10 17:58:42,155 [main           ] INFO  AnnotationTypeConverterLoader  - Found 3 packages with 15 @Converter classes to load
          2011-11-10 17:58:42,217 [main           ] INFO  LazyLoadingTypeConverter       - Loaded 163 core type converters (total 163 type converters)
          2011-11-10 17:58:42,327 [main           ] INFO  DefaultCamelContext            - Cannot start route route1 as its configured with autoStartup=false
          2011-11-10 17:58:42,327 [main           ] INFO  DefaultCamelContext            - Route: route2 started and consuming from: Endpoint[seda://foo?concurrentConsumers=5]
          2011-11-10 17:58:42,327 [main           ] INFO  DefaultCamelContext            - Total 2 routes, of which 1 is started.
          2011-11-10 17:58:42,327 [main           ] INFO  DefaultCamelContext            - Apache Camel  (CamelContext: camel-1) started in 0.282 seconds
          2011-11-10 17:58:42,358 [main           ] INFO  MockEndpoint                   - Asserting: Endpoint[mock://result] is satisfied
          2011-11-10 17:58:42,358 [main           ] INFO  DefaultCamelContext            - Apache Camel  (CamelContext:camel-1) is shutting down
          2011-11-10 17:58:42,358 [main           ] INFO  DefaultShutdownStrategy        - Starting to graceful shutdown 1 routes (timeout 10 seconds)
          2011-11-10 17:58:42,374 [#1 - seda://foo] ERROR SedaConsumer                   - Error processing exchange. Exchange[Message: Second Hello World]. Caused by: [java.lang.NullPointerException - null]
          java.lang.NullPointerException
          	at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[file:/D:/Data/eclipse-workspace/camel/camel-core/target/classes/:]
          	at org.apache.camel.component.seda.SedaConsumer.sendToConsumers(SedaConsumer.java:210)[file:/D:/Data/eclipse-workspace/camel/camel-core/target/classes/:]
          	at org.apache.camel.component.seda.SedaConsumer.doRun(SedaConsumer.java:155)[file:/D:/Data/eclipse-workspace/camel/camel-core/target/classes/:]
          	at org.apache.camel.component.seda.SedaConsumer.run(SedaConsumer.java:129)[file:/D:/Data/eclipse-workspace/camel/camel-core/target/classes/:]
          	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)[:1.6.0_29]
          	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)[:1.6.0_29]
          	at java.lang.Thread.run(Thread.java:662)[:1.6.0_29]
          2011-11-10 17:58:43,374 [ - ShutdownTask] INFO  DefaultShutdownStrategy        - Route: route2 shutdown complete, was consuming from: Endpoint[seda://foo?concurrentConsumers=5]
          2011-11-10 17:58:43,374 [main           ] INFO  DefaultShutdownStrategy        - Graceful shutdown of 1 routes completed in 1 seconds
          2011-11-10 17:58:43,374 [main           ] INFO  DefaultInflightRepository      - Shutting down with no inflight exchanges.
          2011-11-10 17:58:43,374 [main           ] INFO  DefaultCamelContext            - Uptime: 1.329 seconds
          2011-11-10 17:58:43,374 [main           ] INFO  DefaultCamelContext            - Apache Camel  (CamelContext: camel-1) is shutdown in 1.016 seconds
          

          To me this is clear evidence that the main route is still running when you start the drainer route, which causes the NPE as a side effect.

          Again, IMHO you are misusing Camel's Java DSL. See my previous comments on this ticket for a possible fix.

          I leave the closing of this ticket to you, as to my understanding it's definitely not a bug but just a side effect of the API misuse.

          Good luck
          Babak

          [1] https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaConcurrentConsumersTest.java

          Babak Vahdat added a comment - - edited

          Yes, try stopping the main route before starting the drainer and see whether you can still reproduce the NPE. Before starting the drainer route, make sure that the main route has really stopped.

          For example, with a JMX client such as JConsole you can check this through the getState() operation of the main route (assuming you have not disabled Camel's JMX support). See:

          http://camel.apache.org/camel-jmx.html#CamelJMX-UsingJMXtomanageApacheCamel
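
The ordering suggested here (stop the main consumer, confirm it has stopped, only then drain) can be modelled with a plain java.util.concurrent queue. This is a sketch of the idea only, not Camel code: StopThenDrainSketch, MainConsumer and stopThenDrain are hypothetical names, and the in-memory BlockingQueue merely stands in for the seda queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class StopThenDrainSketch {

    /** Stand-in for the main route's consumer: polls the queue until asked to stop. */
    static final class MainConsumer implements Runnable {
        private final BlockingQueue<String> queue;
        private final AtomicInteger processed = new AtomicInteger();
        private volatile boolean running = true;

        MainConsumer(BlockingQueue<String> queue) {
            this.queue = queue;
        }

        void stop() {
            running = false;
        }

        int processedCount() {
            return processed.get();
        }

        @Override
        public void run() {
            while (running) {
                try {
                    String body = queue.poll(50, TimeUnit.MILLISECONDS);
                    if (body != null) {
                        processed.incrementAndGet(); // real processing would happen here
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /** Stops the main consumer, waits until it has really finished, then drains what is left. */
    static List<String> stopThenDrain(MainConsumer main, Thread mainThread, BlockingQueue<String> queue) {
        main.stop();
        try {
            mainThread.join(); // the "is it really stopped?" check
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the main consumer to stop", e);
        }
        List<String> drained = new ArrayList<>();
        queue.drainTo(drained); // only one consumer touches the queue at this point
        return drained;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1000);
        for (int i = 0; i < 5; i++) {
            queue.add("message-" + i);
        }
        MainConsumer main = new MainConsumer(queue);
        Thread mainThread = new Thread(main, "main-consumer");
        mainThread.start();

        List<String> drained = stopThenDrain(main, mainThread, queue);
        System.out.println("Processed by main consumer: " + main.processedCount());
        System.out.println("Handed to drainer: " + drained);
    }
}
```

How many messages each side ends up with depends on timing; the property the sketch illustrates is that every message is handled by exactly one of the two.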

          Tarjei Huse added a comment -

          Hi Babak,

          1. Yes both routes are in the same context.

          2. Yes the point is to consume from the same seda endpoint. Think of this endpoint as a buffer between processes that take differing amounts of time and should not happen sequentially.

          3. I have an error handler for shorter errors, like this:

          .onException(java.net.ConnectException.class, SolrServerException.class, SolrException.class)
          .log(LoggingLevel.ERROR,routeName+ ":Connection down")
          .maximumRedeliveries(4)
          .backOffMultiplier(1)
          .redeliveryDelay(redeliveryDelay)
          .to("log:SolrRouteError?showAll=true&showStackTrace=true&showHeaders=true&showException=true")
          .to(getSolrFailuresQueue())
          .handled(true)
          .end()

          The problem is that on some errors the time it takes for a request can be quite long so the seda queue builds up.

          Maybe I should shut down the consuming route before starting the drainer - would that help?

          Babak Vahdat added a comment - - edited

          @Tarjei, could you please give me some hints:

          Are both of your routes (the main one and the drainer) in the same Camel context? If so, I wonder how this should work at all, as you would consume twice from exactly the same seda URI in the same Camel context as soon as you start the drainer route (for example through JMX). IMHO Camel's behaviour in this case would be undefined/unpredictable when a new message arrives at the seda endpoint.

          The Java DSL doesn't prevent you from doing that, and the code will of course compile, but at runtime the two from() clauses would consume concurrently from the same seda endpoint Java object. Again, the URI used by both routes seems to me to be exactly the same:

          from("seda:" + sedaId + "?size=1000")
          

          I propose using try/catch or an onException clause to send the failed (unprocessed) messages to some other endpoint (for example "direct:failed"). Once started, the drainer route would consume from that endpoint instead of consuming from the seda endpoint concurrently with the main route.

          What do you think?
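
A rough plain-Java model of this proposal, assuming the failure destination is just a second in-memory queue: the main consumer catches processing errors and parks the message on that queue, so a drainer never has to share the input queue. FailureQueueSketch and processAll are hypothetical names; this is not Camel DSL.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

class FailureQueueSketch {

    /**
     * Processes everything currently on the input queue. Messages whose processing
     * throws are moved to the failure queue instead of being lost.
     * Returns the number of messages processed successfully.
     */
    static int processAll(BlockingQueue<String> input,
                          BlockingQueue<String> failed,
                          Consumer<String> processor) {
        int succeeded = 0;
        String body;
        while ((body = input.poll()) != null) {
            try {
                processor.accept(body);
                succeeded++;
            } catch (RuntimeException e) {
                // add() throws if the failure queue is bounded and full; unbounded here
                failed.add(body);
            }
        }
        return succeeded;
    }

    public static void main(String[] args) {
        BlockingQueue<String> input = new LinkedBlockingQueue<>();
        BlockingQueue<String> failed = new LinkedBlockingQueue<>();
        input.add("ok-1");
        input.add("bad");
        input.add("ok-2");

        int succeeded = processAll(input, failed, body -> {
            if (body.startsWith("bad")) {
                throw new IllegalStateException("Connection down");
            }
        });

        System.out.println("Succeeded: " + succeeded);
        System.out.println("Waiting for the drainer: " + failed);
    }
}
```

A drainer that reads only from the failure queue can be started at any time, since it is the sole consumer of that queue.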

          Babak Vahdat added a comment -

          I made a typo in the user forum about the condition that holds and causes the NPE. The correct condition is:

          multicastStarted == true || consumerMulticastProcessor == null
          

          Looking at SedaConsumer.sendToConsumers() it's:

              protected void sendToConsumers(Exchange exchange) throws Exception {
                  int size = endpoint.getConsumers().size();
          
                  // if there are multiple consumers then multicast to them
                  if (size > 1) {
          
                      if (LOG.isDebugEnabled()) {
                          LOG.debug("Multicasting to {} consumers for Exchange: {}", endpoint.getConsumers().size(), exchange);
                      }
                     
                      // use a multicast processor to process it
                      MulticastProcessor mp = endpoint.getConsumerMulticastProcessor();
          
                      // and use the asynchronous routing engine to support it
                      AsyncProcessorHelper.process(mp, exchange, new AsyncCallback() {
                          public void done(boolean doneSync) {
                              // noop
                          }
                      });
                  } else {
                      // use the regular processor and use the asynchronous routing engine to support it
                      AsyncProcessorHelper.process(processor, exchange, new AsyncCallback() {
                          public void done(boolean doneSync) {
                              // noop
                          }
                      });
                  }
              }
          

          Which as you see does call:

          endpoint.getConsumerMulticastProcessor();
          

          But it doesn't take into account that the returned reference (of type MulticastProcessor) could be null. In your case it was null, as apparently the condition I mentioned in the user forum holds.

          I would love to provide a patch for this, but to me the SedaEndpoint is one of Camel's non-trivial endpoints (concurrency is never trivial).

          If some camel riders could give me a hint, I would assign the ticket to myself and would provide a patch (including a unit-test) proving the fix.

          Tarjei Huse created issue -

            People

            • Assignee:
              Claus Ibsen
              Reporter:
              Tarjei Huse
            • Votes:
              0
              Watchers:
              0
