Details
-
Bug
-
Status: Closed
-
Minor
-
Resolution: Fixed
-
2.7.0
-
None
Description
Using ThrottlingInflightRoutePolicy can deadlock a route in some situations. The unit test pasted in below shows one such situation.
What happens is that the bottom route (the one consuming from seda:hey) processes its first exchange and is then suspended. Because it is suspended, it will not take the next exchange from the seda queue, and so it never reaches the check that would re-enable the route.
Perhaps this could be fixed by moving the check that re-enables the route into the onExchangeBegin method, provided that method is still called while the route is suspended?
import org.apache.camel.Exchange; import org.apache.camel.Produce; import org.apache.camel.ProducerTemplate; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.impl.DefaultInflightRepository; import org.apache.camel.impl.ThrottlingInflightRoutePolicy; import org.apache.camel.impl.ThrottlingInflightRoutePolicy.ThrottlingScope; import org.apache.camel.test.CamelTestSupport; public class ThrottleTest extends CamelTestSupport { @Produce(uri = "direct:input") protected ProducerTemplate input; protected MockEndpoint resultEndpoint; @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { public void configure() { resultEndpoint = new MockEndpoint("mock:result"); resultEndpoint.setCamelContext(getContext()); getContext().setInflightRepository(new DefaultInflightRepository() { @Override public void add(Exchange exchange) { super.add(exchange); System.out.println(" add " + this.size()); } @Override public void remove(Exchange exchange) { super.remove(exchange); System.out.println(" remove " + this.size()); } }); ThrottlingInflightRoutePolicy throttler = new ThrottlingInflightRoutePolicy(); throttler.setMaxInflightExchanges(1); throttler.setScope(ThrottlingScope.Context); from("direct:input") .inOnly("seda:hey", "seda:hey", "seda:hey", "seda:hey", "seda:hey") .delay(1000) .inOnly("log:inputDone"); from("seda:hey") .routePolicy(throttler) .inOut("log:outputDone") .to(resultEndpoint); } }; } public void testThatAllExchangesAreReceived() throws Exception { input.sendBody("hello"); resultEndpoint.expectedMessageCount(5); resultEndpoint.assertIsSatisfied(); } }