Description
DTC Consumer is forcibly closed if a transaction is in progress and the connection to the broker is interrupted. This behavior is different to non DTC consumers. This happens with a fail over connection specified which is not the correct behavior as you would expect the fail over feature to reestablish the connection on behalf of the client.
using System; using System.ServiceProcess; using System.Transactions; using Apache.NMS; using Apache.NMS.ActiveMQ; using Apache.NMS.Policies; using Apache.NMS.Util; using Common.Logging; using Common.Logging.Simple; using NUnit.Framework; namespace IntegrationTests.ApacheNms.Tests.Jira.DistributedTransaction { [TestFixture] public class BrokerRestartAndFailover { [Test, Explicit("After a broker restart the consumer is forcibly closed. This is not desirable as this behaviour is different to non dtc consumers.")] public void Should_rediliver_message_after_broker_restart() { SendMessageToQueue("1"); var session = _connection.CreateSession(AcknowledgementMode.Transactional); var consumer = session.CreateConsumer(SessionUtil.GetDestination(session, InQueue)); var transaction = new TransactionScope(TransactionScopeOption.RequiresNew); consumer.Receive(TimeSpan.FromSeconds(1)); StopService(ActiveMqMaster); StartService(ActiveMqMaster); transaction.Complete(); transaction.Dispose(); //try a few times to drain the queue var messageRedilivered = 0; for (var i = 0; i < 2; i++) { transaction = new TransactionScope(TransactionScopeOption.RequiresNew); try { var message = consumer.Receive(TimeSpan.FromSeconds(1)); transaction.Complete(); if (message != null) messageRedilivered++; } catch (Exception ex) { LogManager.GetCurrentClassLogger().Error(ex); } finally { transaction.Dispose(); } } Assert.That(CountMessagesInQueue(InQueue), Is.EqualTo(0)); Assert.That(messageRedilivered, Is.EqualTo(1)); } public int CountMessagesInQueue(string queue) { var factory = new ConnectionFactory(ConnectionString) { AcknowledgementMode = AcknowledgementMode.Transactional }; var count = 0; using (var connection = factory.CreateConnection()) using (var session = connection.CreateSession()) using (var consumer = session.CreateConsumer(SessionUtil.GetQueue(session, queue))) { connection.Start(); while (true) { var message = consumer.Receive(TimeSpan.FromSeconds(1)); if (message == null) break; count++; } } return count; } private void DeleteQueue(string queue) { using (var session = _connection.CreateSession()) { SessionUtil.DeleteDestination(session, queue); } } private void SendMessageToQueue(string message) { using (var session = _connection.CreateSession()) using (var producer = session.CreateProducer(SessionUtil.GetDestination(session, InQueue))) using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew)) { producer.Send(producer.CreateTextMessage(message)); scope.Complete(); } Log.Debug("Primed Input Queue"); } private void StartService(ServiceController service) { if(service.Status != ServiceControllerStatus.Running) service.Start(); service.WaitForStatus(ServiceControllerStatus.Running); } private void StopService(ServiceController service) { if (service.Status != ServiceControllerStatus.Stopped) service.Stop(); service.WaitForStatus(ServiceControllerStatus.Stopped); } [SetUp] public void TestSetup() { LogManager.Adapter = new ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true, true, true, "HH:MM:ss"); StartService(ActiveMqMaster); StopService(ActiveMqSlave); _connectionFactory = new NetTxConnectionFactory(ConnectionString) { AcknowledgementMode = AcknowledgementMode.Transactional, RedeliveryPolicy = new RedeliveryPolicy { InitialRedeliveryDelay = 10, MaximumRedeliveries = 3, BackOffMultiplier = 0, UseExponentialBackOff = false }, DispatchAsync = true, AsyncSend = false, PrefetchPolicy = new PrefetchPolicy { All = 10 }, }; _connection = _connectionFactory.CreateConnection(); _connection.ConnectionInterruptedListener += () => LogManager.GetCurrentClassLogger().Debug("Connection interrupted"); _connection.ConnectionResumedListener += () => LogManager.GetCurrentClassLogger().Debug("Connection resumed"); _connection.ExceptionListener += ex => LogManager.GetCurrentClassLogger().ErrorFormat("Connection exception: '{0}'", ex.ToString()); _connection.Start(); DeleteQueue(InQueue); DeleteQueue(OutQueue); } [TearDown] public void TestTeardown() { StartService(ActiveMqMaster); StopService(ActiveMqSlave); } private const string ConnectionString = @"failover:(tcp://localhost:61616)"; protected ServiceController ActiveMqMaster = new ServiceController(@"ActiveMQ"); protected ServiceController ActiveMqSlave = new ServiceController(@"ActiveMQSlave"); private IConnection _connection; private const string InQueue = "in-q"; private const string OutQueue = "out-q"; private static readonly ILog Log = LogManager.GetLogger(typeof(BrokerRestartAndFailover).Name); private NetTxConnectionFactory _connectionFactory; } }