Details
Description
When a message's TimeToLive expires while a listener is in a redelivery loop, the redelivery never stops. The following unit tests demonstrate this: the first test uses Receive and works as expected; the second uses a listener and fails.
I added these tests to AMQRedeliveryPolicyTests:
/// <summary>
/// Rolls back a transacted session several times while consuming with
/// synchronous Receive calls, and expects that no further redelivery is
/// received once the message's 800 ms time-to-live has elapsed.
/// </summary>
[Test]
public void TestNornalRedeliveryPolicyOnRollbackUntilTimeToLive()
{
    using(Connection connection = (Connection) CreateConnection())
    {
        // Fixed 500 ms redelivery delay, no exponential back-off.
        // NOTE(review): MaximumRedeliveries = -1 presumably means "no limit" - confirm.
        IRedeliveryPolicy redelivery = connection.RedeliveryPolicy;
        redelivery.MaximumRedeliveries = -1;
        redelivery.InitialRedeliveryDelay = 500;
        redelivery.UseExponentialBackOff = false;

        connection.Start();
        ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
        IDestination destination = session.CreateTemporaryQueue();
        IMessageProducer producer = session.CreateProducer(destination);
        IMessageConsumer consumer = session.CreateConsumer(destination);

        // Publish a single message that is only valid for 800 ms.
        ITextMessage outgoing = session.CreateTextMessage("1st");
        outgoing.NMSTimeToLive = TimeSpan.FromMilliseconds(800.0);
        producer.Send(outgoing);
        session.Commit();

        TimeSpan initialWait = TimeSpan.FromMilliseconds(1000);
        TimeSpan shortWait = TimeSpan.FromMilliseconds(100);
        TimeSpan longWait = TimeSpan.FromMilliseconds(700);

        // Initial delivery.
        ITextMessage received = (ITextMessage)consumer.Receive(initialWait);
        Assert.IsNotNull(received);
        Assert.AreEqual("1st", received.Text);
        session.Rollback();

        // The first rollback is expected to redeliver without any delay.
        received = (ITextMessage)consumer.Receive(shortWait);
        Assert.IsNotNull(received);
        session.Rollback();

        // From the second rollback on the redelivery delay applies: nothing
        // is expected within 100 ms, but the message should show up within
        // the following 700 ms.
        received = (ITextMessage)consumer.Receive(shortWait);
        Assert.IsNull(received);
        received = (ITextMessage)consumer.Receive(longWait);
        Assert.IsNotNull(received);
        Assert.AreEqual("1st", received.Text);
        session.Rollback();

        // Without exponential back-off the next redelivery would be due after
        // another 500 ms; the test expects none to arrive within 700 ms.
        received = (ITextMessage)consumer.Receive(longWait);
        Assert.IsNull(received);
    }
}

/// <summary>
/// Same scenario as the Receive-based test, but the message is consumed by an
/// asynchronous listener that rolls the session back on every delivery.
/// Exactly three deliveries are expected before the message expires.
/// </summary>
[Test]
public void TestNornalRedeliveryPolicyOnRollbackUntilTimeToLiveCallback()
{
    using(Connection connection = (Connection) CreateConnection())
    {
        // Fixed 500 ms redelivery delay, no exponential back-off.
        // NOTE(review): MaximumRedeliveries = -1 presumably means "no limit" - confirm.
        IRedeliveryPolicy redelivery = connection.RedeliveryPolicy;
        redelivery.MaximumRedeliveries = -1;
        redelivery.InitialRedeliveryDelay = 500;
        redelivery.UseExponentialBackOff = false;

        connection.Start();
        ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
        IDestination destination = session.CreateTemporaryQueue();
        IMessageProducer producer = session.CreateProducer(destination);
        IMessageConsumer consumer = session.CreateConsumer(destination);

        // Every delivery is counted and rolled back by the callback.
        CallbackClass callback = new CallbackClass(session);
        consumer.Listener += callback.consumer_Listener;

        // Publish a single message that is only valid for 800 ms.
        ITextMessage outgoing = session.CreateTextMessage("1st");
        outgoing.NMSTimeToLive = TimeSpan.FromMilliseconds(800.0);
        producer.Send(outgoing);
        session.Commit();

        // Expected sequence: normal delivery, immediate retry, retry after
        // 500 ms, then expiry - i.e. three deliveries in total.
        Thread.Sleep(2000);
        Assert.AreEqual(3, callback.numReceived);
    }
}

/// <summary>
/// Message listener used by the callback test: counts each delivery, checks
/// the payload and rolls back the session it was constructed with.
/// </summary>
class CallbackClass
{
    // Session that is rolled back on every delivery.
    private ISession session;

    // Number of times consumer_Listener has been invoked.
    public int numReceived = 0;

    public CallbackClass(ISession session)
    {
        this.session = session;
    }

    /// <summary>
    /// Counts the delivery, asserts that it is the "1st" text message and
    /// rolls the session back.
    /// </summary>
    public void consumer_Listener(IMessage message)
    {
        numReceived++;

        ITextMessage text = message as ITextMessage;
        Assert.IsNotNull(text);
        Assert.AreEqual("1st", text.Text);

        session.Rollback();
    }
}