Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Done
-
1.6.2
-
None
-
None
Description
Rollback when using DTC will result in a duplicate message being received.
FailingTest
using System;
using System.Configuration;
using System.ServiceProcess;
using System.Transactions;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Apache.NMS.Policies;
using Apache.NMS.Util;
using Common.Logging;
using Common.Logging.Simple;
using NUnit.Framework;

namespace IntegrationTests.ApacheNms.Jira
{
    /// <summary>
    /// Integration test reproducing duplicate delivery after a rolled-back
    /// <see cref="TransactionScope"/> when consuming through a
    /// <see cref="NetTxConnectionFactory"/> connection.
    /// </summary>
    /// <remarks>
    /// Needs a reachable broker plus the Windows services named in the application
    /// configuration; the test is marked <c>Explicit</c> for that reason.
    /// </remarks>
    [TestFixture]
    public class Dtc
    {
        /// <summary>
        /// Primes the input queue with two messages, rolls back the first receive, then
        /// forwards message two and the replayed message one inside committed transactions.
        /// A final pass drains anything unexpected. Expects the input queue to end up empty
        /// and the output queue to hold exactly two messages.
        /// </summary>
        [Test, Explicit("Bug in 1.6.2")]
        public void First_message_should_be_redilivered_and_republished_on_rollback_and_second_message_processed_as_normal()
        {
            SendMessageToQueue("1");
            SendMessageToQueue("2");

            // The using blocks guarantee that sessions, consumer and producer are released
            // even when a step throws; both sessions are closed before the queues are counted,
            // exactly as in the original sequence.
            using (var session = _connection.CreateSession())
            using (var sessionTwo = _connection.CreateSession())
            using (var consumer = session.CreateConsumer(SessionUtil.GetDestination(session, InQueue)))
            // NOTE(review): the destination is resolved through `session` although the producer
            // belongs to `sessionTwo`; kept as in the original reproduction.
            using (var producer = sessionTwo.CreateProducer(SessionUtil.GetDestination(session, OutQueue)))
            {
                _log.Debug("Process message one and rollback");
                ForwardNextMessage(consumer, producer, false, true);

                _log.Debug("Processing message two and commit");
                ForwardNextMessage(consumer, producer, true, true);

                _log.Debug("Processing message one replay and commit");
                ForwardNextMessage(consumer, producer, true, true);

                _log.Debug("Process any repeats, there should be none");
                ForwardNextMessage(consumer, producer, true, false);
            }

            Assert.That(CountMessagesInQueue(InQueue), Is.EqualTo(0));
            Assert.That(CountMessagesInQueue(OutQueue), Is.EqualTo(2));
        }

        /// <summary>
        /// Receives one message inside a new <see cref="TransactionScope"/> and forwards it
        /// to <paramref name="producer"/>.
        /// </summary>
        /// <param name="consumer">Consumer to receive from.</param>
        /// <param name="producer">Producer the received message is sent with.</param>
        /// <param name="commit">
        /// When <c>true</c> the scope is completed; otherwise it is disposed without
        /// completing, which rolls the transaction back.
        /// </param>
        /// <param name="messageExpected">
        /// When <c>true</c> a receive timeout fails the test with an assertion instead of
        /// passing <c>null</c> on to <c>Send</c>.
        /// </param>
        private static void ForwardNextMessage(IMessageConsumer consumer, IMessageProducer producer, bool commit, bool messageExpected)
        {
            using (var transaction = new TransactionScope(TransactionScopeOption.RequiresNew))
            {
                var message = consumer.Receive(ReceiveTimeout);

                if (messageExpected)
                {
                    Assert.That(message, Is.Not.Null, "No message was received within the timeout");
                }

                if (message != null)
                {
                    producer.Send(message);
                }

                if (commit)
                {
                    transaction.Complete();
                }
            }
        }

        /// <summary>
        /// Logs the status of the transaction carried by <paramref name="e"/>.
        /// </summary>
        /// <param name="s">Event sender (unused).</param>
        /// <param name="e">Event arguments holding the transaction.</param>
        public static void TransactionCallback(object s, TransactionEventArgs e)
        {
            LogManager.GetCurrentClassLogger().DebugFormat("Tranasaction {0}", e.Transaction.TransactionInformation.Status);
        }

        /// <summary>
        /// Counts how many messages can be received from <paramref name="queue"/>, stopping
        /// at the first receive that times out after two seconds.
        /// </summary>
        /// <param name="queue">Name of the queue to inspect.</param>
        /// <returns>The number of messages received.</returns>
        private int CountMessagesInQueue(string queue)
        {
            var count = 0;

            using (var session = _connection.CreateSession())
            using (var consumerIn = session.CreateConsumer(SessionUtil.GetDestination(session, queue)))
            // The scope is never completed, so the receives below are rolled back on dispose
            // (presumably leaving the counted messages on the queue — confirm against broker).
            using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew))
            {
                while (consumerIn.Receive(TimeSpan.FromSeconds(2)) != null)
                {
                    count++;
                }
            }

            return count;
        }

        /// <summary>
        /// Starts <paramref name="service"/> if it is not running and waits until it reports
        /// <see cref="ServiceControllerStatus.Running"/>.
        /// </summary>
        /// <param name="service">The Windows service to start.</param>
        private void StartService(ServiceController service)
        {
            if (service.Status != ServiceControllerStatus.Running)
            {
                service.Start();
            }

            service.WaitForStatus(ServiceControllerStatus.Running);
            _log.Debug("Started Broker");
        }

        /// <summary>
        /// Stops <paramref name="service"/> if it is not stopped and waits until it reports
        /// <see cref="ServiceControllerStatus.Stopped"/>.
        /// </summary>
        /// <param name="service">The Windows service to stop.</param>
        private void StopService(ServiceController service)
        {
            if (service.Status != ServiceControllerStatus.Stopped)
            {
                service.Stop();
            }

            service.WaitForStatus(ServiceControllerStatus.Stopped);
            _log.Debug("Stopped Broker Broker");
        }

        /// <summary>
        /// Sends one text message to the input queue inside its own committed transaction.
        /// </summary>
        /// <param name="message">Text body of the message.</param>
        private void SendMessageToQueue(string message)
        {
            using (var session = _connection.CreateSession())
            using (var producer = session.CreateProducer(SessionUtil.GetDestination(session, InQueue)))
            using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew))
            {
                producer.Send(producer.CreateTextMessage(message));
                scope.Complete();
            }

            _log.Debug("Primed Input Queue");
        }

        /// <summary>
        /// Deletes the destination named <paramref name="queue"/> using a short-lived session.
        /// </summary>
        /// <param name="queue">Name of the queue to delete.</param>
        private void DeleteQueue(string queue)
        {
            using (var session = _connection.CreateSession())
            {
                SessionUtil.DeleteDestination(session, queue);
            }
        }

        /// <summary>
        /// Configures console logging, starts the master service and stops the slave service,
        /// opens a transactional connection and deletes both test queues.
        /// </summary>
        [SetUp]
        public void TestSetup()
        {
            // "mm" is minutes; the previous "HH:MM:ss" printed the month in the minutes slot.
            LogManager.Adapter = new ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true, true, true, "HH:mm:ss");
            _log = LogManager.GetLogger(typeof(Dtc).Name);

            StartService(ActiveMqMaster);
            StopService(ActiveMqSlave);

            _factory = new NetTxConnectionFactory(ActiveMqConnectionString)
            {
                AcknowledgementMode = AcknowledgementMode.Transactional,
                RedeliveryPolicy = new RedeliveryPolicy
                {
                    InitialRedeliveryDelay = 0,
                    MaximumRedeliveries = 3,
                    BackOffMultiplier = 0,
                    UseExponentialBackOff = false
                },
                DispatchAsync = true,
                AsyncSend = false,
                PrefetchPolicy = new PrefetchPolicy { All = 5 },
            };

            _connection = _factory.CreateConnection();
            _log.Debug("Starting connection");
            _connection.Start();
            _log.Debug("Connection established");

            DeleteQueue(InQueue);
            DeleteQueue(OutQueue);
            //Tracer.Trace = new CommonLoggingTraceAdapter();
        }

        /// <summary>
        /// Disposes the connection opened by <see cref="TestSetup"/>, if one was created.
        /// </summary>
        [TearDown]
        public void TestTearDown()
        {
            // Guard against a setup that failed before the connection was assigned.
            if (_connection != null)
            {
                _connection.Dispose();
                _connection = null;
            }
        }

        protected ServiceController ActiveMqMaster = new ServiceController(ActiveMasterServiceName, ActiveMqMachineName);
        protected ServiceController ActiveMqSlave = new ServiceController(ActiveMqSlaveServiceName, ActiveMqMachineName);

        private static readonly string ActiveMqMachineName = ConfigurationManager.AppSettings["ActiveMqServiceMachineName"];
        private static readonly string ActiveMqConnectionString = ConfigurationManager.ConnectionStrings["ActiveMqServer"].ConnectionString;
        private static readonly string ActiveMasterServiceName = ConfigurationManager.AppSettings["ActiveMqMasterName"];
        private static readonly string ActiveMqSlaveServiceName = ConfigurationManager.AppSettings["ActiveMqSlaveName"];

        // How long each step of the main test waits for a message before giving up.
        private static readonly TimeSpan ReceiveTimeout = TimeSpan.FromSeconds(30);

        private IConnection _connection;
        private const string InQueue = "integration-test-q-in";
        private const string OutQueue = "integration-test-q-out";
        private ILog _log;
        private NetTxConnectionFactory _factory;
    }
}
Attachments
Attachments
Issue Links
- is superseded by
-
AMQNET-503 DTC Transactions do not respect rollbacks consistently leading to duplicated messages
- Resolved