Uploaded image for project: 'Camel'
  1. Camel
  2. CAMEL-4149

ThrottlingInflightRoutePolicy can deadlock

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Closed
    • Minor
    • Resolution: Fixed
    • 2.7.0
    • 2.7.3, 2.8.0
    • camel-core
    • None

    Description

      Using ThrottlingInflightRoutePolicy can deadlock a route in some situations. The unit test pasted in below shows one such situation.

      What happens is that the bottom route processes its first exchange, then suspends. Since it is suspended it will not take the next exchange from the seda queue, and so it will never check whether it should re-enable the route.

      Perhaps it would work to move the check that re-enables the route into the onExchangeBegin method, provided that method is still called while the route is suspended?

      import org.apache.camel.Exchange;
      import org.apache.camel.Produce;
      import org.apache.camel.ProducerTemplate;
      import org.apache.camel.builder.RouteBuilder;
      import org.apache.camel.component.mock.MockEndpoint;
      import org.apache.camel.impl.DefaultInflightRepository;
      import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
      import org.apache.camel.impl.ThrottlingInflightRoutePolicy.ThrottlingScope;
      import org.apache.camel.test.CamelTestSupport;
      
      /**
       * Reproducer for CAMEL-4149: a route guarded by a
       * {@link ThrottlingInflightRoutePolicy} may stay suspended and never resume.
       *
       * <p>One message sent to {@code direct:input} is fanned out five times to
       * {@code seda:hey}. The {@code seda:hey} route is throttled to a single
       * in-flight exchange (context scope) and forwards to a mock endpoint. The
       * test passes only if all five exchanges reach the mock.
       */
      public class ThrottleTest extends CamelTestSupport {

          @Produce(uri = "direct:input")
          protected ProducerTemplate input;
          protected MockEndpoint resultEndpoint;

          /**
           * Builds the two routes under test and installs an in-flight repository
           * that prints its size on every add/remove so the throttling can be
           * followed on the console.
           */
          @Override
          protected RouteBuilder createRouteBuilder() throws Exception {
              return new RouteBuilder() {
                  @Override
                  public void configure() {
                      resultEndpoint = new MockEndpoint("mock:result");
                      resultEndpoint.setCamelContext(getContext());

                      // Tracing repository: same behaviour as the default one,
                      // plus a console line showing the in-flight count.
                      getContext().setInflightRepository(new DefaultInflightRepository() {
                          @Override
                          public void add(Exchange exchange) {
                              super.add(exchange);
                              System.out.println("                        add " + this.size());
                          }

                          @Override
                          public void remove(Exchange exchange) {
                              super.remove(exchange);
                              System.out.println("                     remove " + this.size());
                          }
                      });

                      ThrottlingInflightRoutePolicy throttler = new ThrottlingInflightRoutePolicy();
                      throttler.setMaxInflightExchanges(1);
                      throttler.setScope(ThrottlingScope.Context);

                      // Producer side: queue five copies, then linger for a second.
                      from("direct:input")
                          .inOnly("seda:hey", "seda:hey", "seda:hey", "seda:hey", "seda:hey")
                          .delay(1000)
                          .inOnly("log:inputDone");

                      // Consumer side: the throttled route that is expected to drain the queue.
                      from("seda:hey")
                          .routePolicy(throttler)
                          .inOut("log:outputDone")
                          .to(resultEndpoint);
                  }
              };
          }

          /**
           * Sends one message and asserts that all five fanned-out exchanges
           * arrive at the mock endpoint.
           */
          public void testThatAllExchangesAreReceived() throws Exception {
              // Register the expectation before sending: the seda route may already
              // deliver exchanges while sendBody is still running, and an expectation
              // set afterwards may not account for exchanges that have already arrived.
              resultEndpoint.expectedMessageCount(5);

              input.sendBody("hello");

              resultEndpoint.assertIsSatisfied();
          }
      }
      

      Attachments

        Activity

          People

            davsclaus Claus Ibsen
            sm Søren Markert
            Votes:
            0 Vote for this issue
            Watchers:
            1 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: