Minimal Repro steps -
- Create a RabbitMQ Docker container:
docker run -it -d --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.12-management
- Create a queue named 'demo' in RabbitMQ.
- Add 2 messages into the queue.
- Run the following code -
import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.impl.DefaultCamelContext; public class RabitMqMain { public static void main(String[] args) throws Exception { CamelContext camelContext = new DefaultCamelContext(); camelContext.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { String endpoint = "rabbitmq:?" + "prefetchCount=1&" + "autoAck=false&" + "automaticRecoveryEnabled=true&" + "skipExchangeDeclare=true&" + "synchronous=true&" + "skipQueueDeclare=true&" + "skipQueueBind=true&" + "portNumber=5672&" + "prefetchSize=0&" + "vhost=%2F&" + "hostname=localhost&" + "password=guest&" + "requestedHeartbeat=60&" + "topologyRecoveryEnabled=false&" + "prefetchEnabled=true&" + "connectionTimeout=60000&" + "networkRecoveryInterval=10000&" + "concurrentConsumers=3&" + "queue=demo&" + "username=guest"; from(endpoint).routeId("rabit") .process(new Processor() { @Override public void process(Exchange exchange) throws Exception { String msg = exchange.toString(); System.out.println("started processing message - "+ msg); Thread.sleep(25 * 1000); // 25sec System.out.println("finished processing message - "+ msg); } }); } }); camelContext.start(); Thread.sleep(10 * 1000); System.out.println("STOPPING"); camelContext.stop(); System.out.println("STOPPED"); } }
- After starting the program, wait until the "STOPPING" log line appears, then add one more message to the queue (do this within 15 seconds).
- You will now see that the message added in the previous step is also picked up and starts being processed.
Issue Links
- links to