Uploaded image for project: 'Camel'
  1. Camel
  2. CAMEL-10229

camel-rabbitmq - Race condition when stopping context with autoack=false

    XMLWordPrintableJSON

Details

    • Unknown

    Description

      Run the following code and hit enter while one message is in unacked state (see RabbitMQ console):

      public static void main(String[] args) throws Exception {
      	CamelContext context = new DefaultCamelContext();
      
      	context.addRoutes(new RouteBuilder() {
      		@Override
      		public void configure() {
      			from("rabbitmq://localhost/?queue=sourceQueue&skipExchangeDeclare=true&skipQueueDeclare=true&autoAck=false&prefetchEnabled=true&prefetchCount=1")
      					.delayer(5000)
      					.setHeader("rabbitmq.ROUTING_KEY", constant("destinationQueue"))
      					.to("rabbitmq://localhost/?skipExchangeDeclare=true&skipQueueDeclare=true&autoAck=false")
      					.routeId("myRoute");
      		}
      	});
      	context.start();
      	new BufferedReader(new InputStreamReader(System.in)).readLine();
      	context.stop();
      }
      

      you get the following exception:

      com.rabbitmq.client.impl.DefaultExceptionHandler: Consumer org.apache.camel.component.rabbitmq.RabbitConsumer@4c57777e (amq.ctag-dWpQw46flmamv0dM_Fa_Qg) method handleDelivery for channel AMQChannel(amqp://rabbit_user@127.0.0.1:5672/,1) threw an exception for channel AMQChannel(amqp://rabbit_user@127.0.0.1:5672/,1):
      com.rabbitmq.client.AlreadyClosedException: channel is already closed due to clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)
      	at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:195)
      	at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:309)
      	at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:303)
      	at com.rabbitmq.client.impl.ChannelN.basicAck(ChannelN.java:1043)
      	at org.apache.camel.component.rabbitmq.RabbitConsumer.handleDelivery(RabbitConsumer.java:108)
      	at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:144)
      	at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:99)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:745)
      

      I think that this is caused by a race condition between the main thread that runs channel.close() immediately after channel.basicCancel(tag) (see org.apache.camel.component.rabbitmq.RabbitConsumer) without waiting the channel.basicAck(deliveryTag, false) in handleDelivery().

      Another bad side effect is that you'll find a duplicate of a message on the destinationQueue. For example if you have 10 initial messages in sourceQueue and you hit enter while it's processing the third one, you'll get 7 messages in sourceQueue and 4 messages in destinationQueue.

      The correct behaviour should be the following:
      1) Stop consumer: channel.basicCancel(tag)
      2) Wait if there is a running consumer
      3) The consumer acks the previous message
      4) Close the channel

      Attachments

        Issue Links

          Activity

            People

              davsclaus Claus Ibsen
              pcan Piero Cangianiello
              Votes:
              1 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: