CAMEL-10229

camel-rabbitmq - Race condition when stopping context with autoack=false

    Description

      Run the following code and press Enter while one message is in the unacked state (check the RabbitMQ management console):

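      import java.io.BufferedReader;
      import java.io.InputStreamReader;
      
      import org.apache.camel.CamelContext;
      import org.apache.camel.builder.RouteBuilder;
      import org.apache.camel.impl.DefaultCamelContext; // Camel 2.x package
      
      // The class name is arbitrary; the original snippet showed only main().
      public class Reproducer {
      	public static void main(String[] args) throws Exception {
      		CamelContext context = new DefaultCamelContext();
      
      		context.addRoutes(new RouteBuilder() {
      			@Override
      			public void configure() {
      				// Manual acks, one message in flight at a time, 5 s delay per message
      				from("rabbitmq://localhost/?queue=sourceQueue&skipExchangeDeclare=true&skipQueueDeclare=true&autoAck=false&prefetchEnabled=true&prefetchCount=1")
      						.delayer(5000)
      						.setHeader("rabbitmq.ROUTING_KEY", constant("destinationQueue"))
      						.to("rabbitmq://localhost/?skipExchangeDeclare=true&skipQueueDeclare=true&autoAck=false")
      						.routeId("myRoute");
      			}
      		});
      		context.start();
      		// Block until Enter is pressed, then stop the context
      		new BufferedReader(new InputStreamReader(System.in)).readLine();
      		context.stop();
      	}
      }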
      

      You get the following exception:

      com.rabbitmq.client.impl.DefaultExceptionHandler: Consumer org.apache.camel.component.rabbitmq.RabbitConsumer@4c57777e (amq.ctag-dWpQw46flmamv0dM_Fa_Qg) method handleDelivery for channel AMQChannel(amqp://rabbit_user@127.0.0.1:5672/,1) threw an exception for channel AMQChannel(amqp://rabbit_user@127.0.0.1:5672/,1):
      com.rabbitmq.client.AlreadyClosedException: channel is already closed due to clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)
      	at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:195)
      	at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:309)
      	at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:303)
      	at com.rabbitmq.client.impl.ChannelN.basicAck(ChannelN.java:1043)
      	at org.apache.camel.component.rabbitmq.RabbitConsumer.handleDelivery(RabbitConsumer.java:108)
      	at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:144)
      	at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:99)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:745)
      

      I think this is caused by a race condition: the main thread runs channel.close() immediately after channel.basicCancel(tag) (see org.apache.camel.component.rabbitmq.RabbitConsumer), without waiting for the channel.basicAck(deliveryTag, false) call that the consumer thread makes in handleDelivery().

      Another bad side effect is that a duplicate message ends up on destinationQueue. For example, if sourceQueue initially holds 10 messages and you press Enter while the third one is being processed, you end up with 7 messages in sourceQueue and 4 messages in destinationQueue.

      The correct behaviour should be the following:
      1) Stop the consumer: channel.basicCancel(tag)
      2) Wait for any delivery that is still being processed
      3) Let the consumer ack that message
      4) Close the channel
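
The four steps above can be sketched as follows. This is only an illustration under stated assumptions, not the camel-rabbitmq code: AckChannel is a hypothetical stand-in interface for the three channel calls mentioned in this report, and GracefulStopSketch, TrackingConsumer and demo() are made-up names. The idea is to count in-flight deliveries and have stop() wait for that count to reach zero before closing the channel.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

final class GracefulStopSketch {

    /** Hypothetical stand-in for the channel operations named in the steps above. */
    interface AckChannel {
        void basicCancel(String consumerTag);
        void basicAck(long deliveryTag, boolean multiple);
        void close();
    }

    /** Consumer that counts in-flight deliveries so stop() can wait for them. */
    static final class TrackingConsumer {
        private final AckChannel channel;
        private final String consumerTag;
        private final Object lock = new Object();
        private int inFlight;
        private boolean stopping;

        TrackingConsumer(AckChannel channel, String consumerTag) {
            this.channel = channel;
            this.consumerTag = consumerTag;
        }

        /** Called on the dispatch thread; 'work' stands for routing the message. */
        void handleDelivery(long deliveryTag, Runnable work) {
            synchronized (lock) {
                if (stopping) {
                    return; // neither processed nor acked, so the broker can requeue it
                }
                inFlight++;
            }
            try {
                work.run();
                channel.basicAck(deliveryTag, false); // step 3: channel is still open
            } finally {
                synchronized (lock) {
                    inFlight--;
                    lock.notifyAll();
                }
            }
        }

        void stop() throws InterruptedException {
            channel.basicCancel(consumerTag); // step 1: no new deliveries
            synchronized (lock) {
                stopping = true;
                while (inFlight > 0) {
                    lock.wait(); // step 2: wait for the running delivery
                }
            }
            channel.close(); // step 4: only after the pending ack went out
        }
    }

    /** Runs one delivery that is still in flight when stop() is called. */
    static List<String> demo() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch cancelled = new CountDownLatch(1);
        AckChannel channel = new AckChannel() {
            public void basicCancel(String tag) { events.add("cancel"); cancelled.countDown(); }
            public void basicAck(long tag, boolean multiple) { events.add("ack " + tag); }
            public void close() { events.add("close"); }
        };
        TrackingConsumer consumer = new TrackingConsumer(channel, "ctag-1");
        Thread worker = new Thread(() -> consumer.handleDelivery(3L, () -> {
            started.countDown();
            try {
                cancelled.await(); // keep the message in flight until the cancel is issued
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        try {
            worker.start();
            started.await(); // the delivery is now being processed
            consumer.stop(); // returns only after the ack has been sent
            worker.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [cancel, ack 3, close]
    }
}
```

With this ordering the ack always reaches the channel before close(), so the AlreadyClosedException from the stack trace should not occur and the in-flight message should not be redelivered. A real fix would probably also want a timeout on the wait so that a stuck route cannot block shutdown forever.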

          People

            davsclaus Claus Ibsen
            pcan Piero Cangianiello