Uploaded image for project: 'ActiveMQ .Net'
  1. ActiveMQ .Net
  2. AMQNET-474

DTC Consumer is forcibly closed if a transaction is in progress and connection to the broker is interrupted

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 1.6.2
    • 1.6.4, 1.7.0
    • ActiveMQ
    • None

    Description

      A DTC consumer is forcibly closed if a transaction is in progress and the connection to the broker is interrupted. This behavior differs from that of non-DTC consumers. It happens even when a failover connection is specified, which is not the correct behavior: you would expect the failover feature to re-establish the connection on behalf of the client.

      using System;
      using System.ServiceProcess;
      using System.Transactions;
      using Apache.NMS;
      using Apache.NMS.ActiveMQ;
      using Apache.NMS.Policies;
      using Apache.NMS.Util;
      using Common.Logging;
      using Common.Logging.Simple;
      using NUnit.Framework;
      
      namespace IntegrationTests.ApacheNms.Tests.Jira.DistributedTransaction
      {
          [TestFixture]
          public class BrokerRestartAndFailover
          {
              /// <summary>
              /// Reproduction for AMQNET-474. Sends one message, receives it inside a
              /// <see cref="TransactionScope"/>, restarts the master broker service while that
              /// scope is still open, and then checks that exactly one message is received
              /// again and that the queue ends up empty.
              /// </summary>
              [Test, Explicit("After a broker restart the consumer is forcibly closed. This is not desirable as this behaviour is different to non dtc consumers.")]
              public void Should_rediliver_message_after_broker_restart()
              {
                  SendMessageToQueue("1");

                  // The session and consumer are disposed even when an assertion or a
                  // service call throws, so they do not outlive the test.
                  using (var session = _connection.CreateSession(AcknowledgementMode.Transactional))
                  using (var consumer = session.CreateConsumer(SessionUtil.GetDestination(session, InQueue)))
                  {
                      // Receive inside a transaction and bounce the broker before completing it.
                      // The using block guarantees the scope is disposed if anything below throws.
                      using (var interruptedScope = new TransactionScope(TransactionScopeOption.RequiresNew))
                      {
                          consumer.Receive(TimeSpan.FromSeconds(1));

                          StopService(ActiveMqMaster);
                          StartService(ActiveMqMaster);
                          interruptedScope.Complete();
                      }

                      //try a few times to drain the queue
                      var redeliveredCount = 0;
                      for (var attempt = 0; attempt < 2; attempt++)
                      {
                          using (var drainScope = new TransactionScope(TransactionScopeOption.RequiresNew))
                          {
                              try
                              {
                                  var message = consumer.Receive(TimeSpan.FromSeconds(1));
                                  drainScope.Complete();
                                  if (message != null)
                                  {
                                      redeliveredCount++;
                                  }
                              }
                              catch (Exception ex)
                              {
                                  // A failed attempt is logged and the next attempt still runs.
                                  LogManager.GetCurrentClassLogger().Error(ex);
                              }
                          }
                      }

                      Assert.That(CountMessagesInQueue(InQueue), Is.EqualTo(0));
                      Assert.That(redeliveredCount, Is.EqualTo(1));
                  }
              }
      
              /// <summary>
              /// Counts how many messages can be received from the named queue over a
              /// dedicated connection created with the transactional acknowledgement mode.
              /// </summary>
              /// <param name="queue">Name of the queue to read from.</param>
              /// <returns>The number of messages received before a receive call returned null.</returns>
              public int CountMessagesInQueue(string queue)
              {
                  var countingFactory = new ConnectionFactory(ConnectionString);
                  countingFactory.AcknowledgementMode = AcknowledgementMode.Transactional;

                  var received = 0;
                  using (var countingConnection = countingFactory.CreateConnection())
                  using (var countingSession = countingConnection.CreateSession())
                  using (var reader = countingSession.CreateConsumer(SessionUtil.GetQueue(countingSession, queue)))
                  {
                      countingConnection.Start();

                      // Keep receiving with a one-second limit until a call hands back null.
                      while (reader.Receive(TimeSpan.FromSeconds(1)) != null)
                      {
                          received++;
                      }
                  }

                  return received;
              }
      
              /// <summary>
              /// Deletes the named destination through a short-lived session on the shared connection.
              /// </summary>
              /// <param name="queue">Name of the destination to delete.</param>
              private void DeleteQueue(string queue)
              {
                  var cleanupSession = _connection.CreateSession();
                  try
                  {
                      SessionUtil.DeleteDestination(cleanupSession, queue);
                  }
                  finally
                  {
                      // Same clean-up a using statement performs, written out explicitly.
                      if (cleanupSession != null)
                      {
                          cleanupSession.Dispose();
                      }
                  }
              }
      
              /// <summary>
              /// Sends a single text message to the input queue inside its own
              /// <see cref="TransactionScope"/>, then logs that the queue has been primed.
              /// </summary>
              /// <param name="message">Text body of the message to send.</param>
              private void SendMessageToQueue(string message)
              {
                  using (var sendSession = _connection.CreateSession())
                  {
                      var target = SessionUtil.GetDestination(sendSession, InQueue);

                      // The producer is created before the scope is opened; only the send runs inside it.
                      using (var sender = sendSession.CreateProducer(target))
                      using (var sendScope = new TransactionScope(TransactionScopeOption.RequiresNew))
                      {
                          var textMessage = sender.CreateTextMessage(message);
                          sender.Send(textMessage);
                          sendScope.Complete();
                      }
                  }

                  Log.Debug("Primed Input Queue");
              }
      
              /// <summary>
              /// Starts the given Windows service unless it is already running, then blocks
              /// until it reports <see cref="ServiceControllerStatus.Running"/>.
              /// </summary>
              /// <param name="service">Controller for the service to start.</param>
              private void StartService(ServiceController service)
              {
                  // ServiceController caches its property values; refresh so the check below
                  // sees the service's current state rather than a stale snapshot.
                  service.Refresh();

                  if (service.Status != ServiceControllerStatus.Running)
                  {
                      service.Start();
                  }

                  service.WaitForStatus(ServiceControllerStatus.Running);
              }
      
              /// <summary>
              /// Stops the given Windows service unless it is already stopped, then blocks
              /// until it reports <see cref="ServiceControllerStatus.Stopped"/>.
              /// </summary>
              /// <param name="service">Controller for the service to stop.</param>
              private void StopService(ServiceController service)
              {
                  // ServiceController caches its property values; refresh so the check below
                  // sees the service's current state rather than a stale snapshot.
                  service.Refresh();

                  if (service.Status != ServiceControllerStatus.Stopped)
                  {
                      service.Stop();
                  }

                  service.WaitForStatus(ServiceControllerStatus.Stopped);
              }
      
              /// <summary>
              /// Per-test setup: routes logging to the console, makes sure the master broker
              /// service is running and the slave is stopped, opens and starts a connection
              /// from a <c>NetTxConnectionFactory</c>, and deletes both test queues.
              /// </summary>
              [SetUp]
              public void TestSetup()
              {
                  // In .NET format strings "mm" is minutes and "MM" is the month, so the
                  // timestamp must use "HH:mm:ss" to show hours, minutes and seconds.
                  LogManager.Adapter = new ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true, true, true, "HH:mm:ss");
                  StartService(ActiveMqMaster);
                  StopService(ActiveMqSlave);

                  _connectionFactory = new NetTxConnectionFactory(ConnectionString)
                  {
                      AcknowledgementMode = AcknowledgementMode.Transactional,
                      RedeliveryPolicy = new RedeliveryPolicy { InitialRedeliveryDelay = 10, MaximumRedeliveries = 3, BackOffMultiplier = 0, UseExponentialBackOff = false },
                      DispatchAsync = true,
                      AsyncSend = false,
                      PrefetchPolicy = new PrefetchPolicy { All = 10 },
                  };

                  _connection = _connectionFactory.CreateConnection();

                  // Log connection state changes and errors so a broker restart is visible in the test output.
                  _connection.ConnectionInterruptedListener += () => LogManager.GetCurrentClassLogger().Debug("Connection interrupted");
                  _connection.ConnectionResumedListener += () => LogManager.GetCurrentClassLogger().Debug("Connection resumed");
                  _connection.ExceptionListener += ex => LogManager.GetCurrentClassLogger().ErrorFormat("Connection exception: '{0}'", ex.ToString());
                  _connection.Start();

                  // Start every test from empty queues.
                  DeleteQueue(InQueue);
                  DeleteQueue(OutQueue);
              }
      
              /// <summary>
              /// Per-test teardown: brings the master broker service back up, disposes the
              /// connection opened during setup, and makes sure the slave service is stopped.
              /// </summary>
              [TearDown]
              public void TestTeardown()
              {
                  // Restore the master first so the connection is closed against a running broker.
                  StartService(ActiveMqMaster);

                  try
                  {
                      // Setup opens a new connection for every test; release it here so
                      // connections do not accumulate across tests.
                      if (_connection != null)
                      {
                          _connection.Dispose();
                          _connection = null;
                      }
                  }
                  finally
                  {
                      // The slave is stopped even if disposing the connection throws.
                      StopService(ActiveMqSlave);
                  }
              }
      
              // Failover transport URI wrapping a single broker endpoint on localhost:61616.
              private const string ConnectionString = @"failover:(tcp://localhost:61616)";
              // Controllers for the Windows services named "ActiveMQ" and "ActiveMQSlave".
              protected ServiceController ActiveMqMaster = new ServiceController(@"ActiveMQ");
              protected ServiceController ActiveMqSlave = new ServiceController(@"ActiveMQSlave");
              // Shared connection, assigned in TestSetup.
              private IConnection _connection;
              // Names of the queues deleted in TestSetup; the test sends to and receives from InQueue.
              private const string InQueue = "in-q";
              private const string OutQueue = "out-q";
              // Logger named after this fixture's simple type name.
              private static readonly ILog Log = LogManager.GetLogger(typeof(BrokerRestartAndFailover).Name);
              // Factory assigned in TestSetup and used to create _connection.
              private NetTxConnectionFactory _connectionFactory;
          }
      }
      
      

      Attachments

        Activity

          People

            tabish Timothy A. Bish
            imran99 Imran
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: