Uploaded image for project: 'ActiveMQ .Net'
  1. ActiveMQ .Net
  2. AMQNET-472

Synchronous DTC Consumer will experience duplicates on transaction rollback

Attach filesAttach ScreenshotVotersWatch issueWatchersCreate sub-taskLinkCloneUpdate Comment AuthorReplace String in CommentUpdate Comment VisibilityDelete Comments
    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Done
    • 1.6.2
    • None
    • ActiveMQ
    • None

    Description

      Rollback when using DTC will result in a duplicate message being received.

      FailingTest
       
      using System;
      using System.Configuration;
      using System.ServiceProcess;
      using System.Transactions;
      using Apache.NMS;
      using Apache.NMS.ActiveMQ;
      using Apache.NMS.Policies;
      using Apache.NMS.Util;
      using Common.Logging;
      using Common.Logging.Simple;
      using NUnit.Framework;
      
      namespace IntegrationTests.ApacheNms.Jira
      {
          [TestFixture]
          public class Dtc
          {
              [Test, Explicit("Bug in 1.6.2")]
              public void First_message_should_be_redilivered_and_republished_on_rollback_and_second_message_processed_as_normal()
              {
                  SendMessageToQueue("1");
                  SendMessageToQueue("2");
                  var session = _connection.CreateSession();
                  var sessionTwo = _connection.CreateSession();
                  var consumer = session.CreateConsumer(SessionUtil.GetDestination(session, InQueue));
                  var producer = sessionTwo.CreateProducer(SessionUtil.GetDestination(session, OutQueue));
      
                  _log.Debug("Process message one and rollback");
                  var transaction = new TransactionScope(TransactionScopeOption.RequiresNew);
                  var message = consumer.Receive(TimeSpan.FromSeconds(30));
                  producer.Send(message);
                  transaction.Dispose();
      
                  _log.Debug("Processing message two and commit");
                  transaction = new TransactionScope(TransactionScopeOption.RequiresNew);
                  message = consumer.Receive(TimeSpan.FromSeconds(30));
                  producer.Send(message);
                  transaction.Complete();
                  transaction.Dispose();
      
                  _log.Debug("Processing message one replay and commit");
                  transaction = new TransactionScope(TransactionScopeOption.RequiresNew);
                  message = consumer.Receive(TimeSpan.FromSeconds(30));
                  producer.Send(message);
                  transaction.Complete();
                  transaction.Dispose();
      
                  _log.Debug("Process any repeats, there should be none");
                  transaction = new TransactionScope(TransactionScopeOption.RequiresNew);
                  message = consumer.Receive(TimeSpan.FromSeconds(30));
                  if (message != null)
                      producer.Send(message);
                  transaction.Complete();
                  transaction.Dispose();
      
                  session.Dispose();
                  sessionTwo.Dispose();
      
                  Assert.That(CountMessagesInQueue(InQueue), Is.EqualTo(0));
                  Assert.That(CountMessagesInQueue(OutQueue), Is.EqualTo(2));
              }
      
              public static void TransactionCallback(object s, TransactionEventArgs e)
              {
                  LogManager.GetCurrentClassLogger().DebugFormat("Tranasaction  {0}", e.Transaction.TransactionInformation.Status);
              }
      
              private int CountMessagesInQueue(string queue)
              {
                  var count = 0;
                  using (var session = _connection.CreateSession())
                  using (var consumerIn = session.CreateConsumer(SessionUtil.GetDestination(session, queue)))
                  using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew))
                  {
                      while (true)
                      {
                          var msg = consumerIn.Receive(TimeSpan.FromSeconds(2));
                          if (msg == null)
                              break;
                          count++;
                      }
                  }
      
                  return count;
              }
      
              private void StartService(ServiceController service)
              {
                  if (service.Status != ServiceControllerStatus.Running)
                      service.Start();
                  service.WaitForStatus(ServiceControllerStatus.Running);
                  _log.Debug("Started Broker");
              }
      
              private void StopService(ServiceController service)
              {
                  if (service.Status != ServiceControllerStatus.Stopped)
                      service.Stop();
                  service.WaitForStatus(ServiceControllerStatus.Stopped);
                  _log.Debug("Stopped Broker Broker");
              }
      
              private void SendMessageToQueue(string message)
              {
                  using (var session = _connection.CreateSession())
                  using (var producer = session.CreateProducer(SessionUtil.GetDestination(session, InQueue)))
                  using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew))
                  {
                      producer.Send(producer.CreateTextMessage(message));
                      scope.Complete();
                  }
                  _log.Debug("Primed Input Queue");
              }
      
              private void DeleteQueue(string queue)
              {
                  using (var session = _connection.CreateSession())
                  {
                      SessionUtil.DeleteDestination(session, queue);
                  }
              }
      
              [SetUp]
              public void TestSetup()
              {
                  LogManager.Adapter = new ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true, true, true, "HH:MM:ss");
                  _log = LogManager.GetLogger(typeof(Dtc).Name);
                  StartService(ActiveMqMaster);
                  StopService(ActiveMqSlave);
                  _factory = new NetTxConnectionFactory(ActiveMqConnectionString)
                  {
                      AcknowledgementMode = AcknowledgementMode.Transactional,
                      RedeliveryPolicy = new RedeliveryPolicy { InitialRedeliveryDelay = 0, MaximumRedeliveries = 3, BackOffMultiplier = 0, UseExponentialBackOff = false },
                      DispatchAsync = true,
                      AsyncSend = false,
                      PrefetchPolicy = new PrefetchPolicy { All = 5 },
                  };
                  _connection = _factory.CreateConnection();
                  _log.Debug("Starting connection");
                  _connection.Start();
                  _log.Debug("Connection established");
      
                  DeleteQueue(InQueue);
                  DeleteQueue(OutQueue);
                  //Tracer.Trace = new CommonLoggingTraceAdapter();
              }
      
              [TearDown]
              public void TestTearDown()
              {
                  _connection.Dispose();
              }
      
              protected ServiceController ActiveMqMaster = new ServiceController(ActiveMasterServiceName, ActiveMqMachineName);
              protected ServiceController ActiveMqSlave = new ServiceController(ActiveMqSlaveServiceName, ActiveMqMachineName);
              private static readonly string ActiveMqMachineName = ConfigurationManager.AppSettings["ActiveMqServiceMachineName"];
              private static readonly string ActiveMqConnectionString = ConfigurationManager.ConnectionStrings["ActiveMqServer"].ConnectionString;
              private static readonly string ActiveMasterServiceName = ConfigurationManager.AppSettings["ActiveMqMasterName"];
              private static readonly string ActiveMqSlaveServiceName = ConfigurationManager.AppSettings["ActiveMqSlaveName"];
              private IConnection _connection;
              private const string InQueue = "integration-test-q-in";
              private const string OutQueue = "integration-test-q-out";
              private ILog _log;
              private NetTxConnectionFactory _factory;
          }
      }
      

      Attachments

        Issue Links

        Activity

          This comment will be Viewable by All Users Viewable by All Users
          Cancel

          People

            jgomes Jim Gomes
            imran99 Imran
            Votes:
            1 Vote for this issue
            Watchers:
            4 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved:

              Issue deployment