CAMEL-19575

camel-rabbitmq - RabbitMQConsumer keeps on consuming even when route shutdown is triggered.

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 3.21.0
    • Fix Version/s: 3.14.10, 3.20.7, 3.21.1, 3.22.0
    • None
    • Unknown

    Description

      Minimal repro steps:

      • Create a RabbitMQ Docker container:
        • docker run -it -d --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.12-management
      • Create a queue named 'demo' in RabbitMQ.
      • Add 2 messages to the queue.
      • Run the following code:
          import org.apache.camel.CamelContext;
          import org.apache.camel.Exchange;
          import org.apache.camel.Processor;
          import org.apache.camel.builder.RouteBuilder;
          import org.apache.camel.impl.DefaultCamelContext;
          
          public class RabitMqMain
          {
             public static void main(String[] args) throws Exception
             {
                CamelContext camelContext = new DefaultCamelContext();
                camelContext.addRoutes(new RouteBuilder()
                {
                   @Override
                   public void configure() throws Exception
                   {
                      String endpoint = "rabbitmq:?" +
                            "prefetchCount=1&" +
                            "autoAck=false&" +
                            "automaticRecoveryEnabled=true&" +
                            "skipExchangeDeclare=true&" +
                            "synchronous=true&" +
                            "skipQueueDeclare=true&" +
                            "skipQueueBind=true&" +
                            "portNumber=5672&" +
                            "prefetchSize=0&" +
                            "vhost=%2F&" +
                            "hostname=localhost&" +
                            "password=guest&" +
                            "requestedHeartbeat=60&" +
                            "topologyRecoveryEnabled=false&" +
                            "prefetchEnabled=true&" +
                            "connectionTimeout=60000&" +
                            "networkRecoveryInterval=10000&" +
                            "concurrentConsumers=3&" +
                            "queue=demo&" +
                            "username=guest";
                      from(endpoint).routeId("rabit")
                            .process(new Processor()
                            {
                               @Override
                               public void process(Exchange exchange) throws Exception
                               {
                                  String msg = exchange.toString();
                                  System.out.println("started processing message - "+ msg);
                                  Thread.sleep(25 * 1000); // 25sec
                                  System.out.println("finished processing message - "+ msg);
                               }
                            });
                   }
                });
                camelContext.start();
                Thread.sleep(10 * 1000);
                System.out.println("STOPPING");
                camelContext.stop();
                System.out.println("STOPPED");
          
             }
          }
          
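      • While the code is running, add one more message to the queue after the "STOPPING" log appears. Do this within 15 seconds, while the first two messages are still being processed (each takes 25 seconds, and the stop is triggered after 10).
      • The message added in the previous step is also picked up and processed, even though shutdown has already been triggered.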
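
      The queue setup steps above (create 'demo', add messages) can also be scripted. The following is only a sketch, not part of the original report: it assumes the management plugin from the rabbitmq:3.12-management container is reachable on localhost:15672 with the guest/guest credentials, uses the default vhost, and declares the queue as durable (an arbitrary choice, since the route skips queue declaration). The class name SeedDemoQueue and the message payloads are hypothetical.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class SeedDemoQueue
{
   // Assumptions: management API on localhost:15672, default vhost "/" (URL-encoded as %2F).
   static final String API = "http://localhost:15672/api";
   static final String VHOST = "%2F";
   static final String QUEUE = "demo";

   // Builds the HTTP Basic Authorization header value.
   public static String basicAuth(String user, String password)
   {
      String raw = user + ":" + password;
      return "Basic " + Base64.getEncoder().encodeToString(raw.getBytes(StandardCharsets.UTF_8));
   }

   // JSON body for publishing via the default exchange, routed by queue name.
   // The payload is inserted as-is, so it must not contain characters that need JSON escaping.
   public static String publishBody(String payload)
   {
      return "{\"properties\":{},\"routing_key\":\"" + QUEUE + "\","
            + "\"payload\":\"" + payload + "\",\"payload_encoding\":\"string\"}";
   }

   // Sends one JSON request to the management API and returns the HTTP status code.
   static int send(HttpClient client, String method, String path, String json) throws Exception
   {
      HttpRequest request = HttpRequest.newBuilder(URI.create(API + path))
            .header("Authorization", basicAuth("guest", "guest"))
            .header("Content-Type", "application/json")
            .method(method, HttpRequest.BodyPublishers.ofString(json))
            .build();
      return client.send(request, HttpResponse.BodyHandlers.ofString()).statusCode();
   }

   public static void main(String[] args) throws Exception
   {
      HttpClient client = HttpClient.newHttpClient();

      // Declare the queue (a no-op if it already exists with the same settings).
      int declared = send(client, "PUT", "/queues/" + VHOST + "/" + QUEUE, "{\"durable\":true}");
      System.out.println("declare queue: HTTP " + declared);

      // Publish N messages; N defaults to 2, matching the repro steps.
      int count = args.length > 0 ? Integer.parseInt(args[0]) : 2;
      for (int i = 1; i <= count; i++)
      {
         int published = send(client, "POST",
               "/exchanges/" + VHOST + "/amq.default/publish", publishBody("msg-" + i));
         System.out.println("publish msg-" + i + ": HTTP " + published);
      }
   }
}
```

      With Java 11 or later this can be run as `java SeedDemoQueue.java` before starting the route, and again as `java SeedDemoQueue.java 1` right after the "STOPPING" log to add the extra message.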

            People

              Assignee: Unassigned
              Reporter: Nikunj Kumar Gupta (nkg447)