ActiveMQ Classic / AMQ-5854

Duplicate messages when failover occurs during the prepare phase of a two-phase commit.


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 5.9.1, 5.10.2, 5.11.1
    • Fix Version/s: 5.13.0
    • Component/s: Broker, JMS client
    • Labels: None

    Description

      Use case:
      With a Spring DMLC (DefaultMessageListenerContainer), read a JMS message from a queue, produce a JMS message to an output queue, and write data to a database, all in one JTA/XA transaction (Atomikos).

      Problem description:

      Due to high CPU usage, the inactivity monitor closed the connections between the clients and the broker while 16 messages were being processed.

      2015-06-01 04:39:01,130 | WARN  | Transport Connection to: tcp://*** failed: org.apache.activemq.transport.InactivityIOException: Channel was inactive for too (>30000) long: tcp://*** | org.apache.activemq.broker.TransportConnection.Transport | ActiveMQ InactivityMonitor Worker
      

      15 of these messages were rolled back and redelivered to another consumer.

      In the client log we got 15 warnings:

      ActiveMQMessageConsumer   |WARN |jmsContainer-173|rolling back transaction (XID:***) post failover recovery. 1 previously delivered message(s) not replayed to consumer: ***
      

      However, one message was not rolled back (its transaction committed) and was also redelivered to another consumer. It was therefore processed twice by two different consumers (two database inserts and two output JMS messages) and was not deduplicated.

      In the ActiveMQ broker log we got this message:

      WARN  | Async error occurred:  | org.apache.activemq.broker.TransportConnection.Service | ActiveMQ Transport: tcp:///***
                             javax.jms.JMSException: Unmatched acknowledge: MessageAck {commandId = 6665, responseRequired = false, ackType = 2, consumerId = ID:***, firstMessageId = ID:***-50800-1433109620591-1:2:31356:1:1, lastMessageId = ID:***-50800-1433109620591-1:2:31356:1:1, destination = queue://***, transactionId = XID:[1096044365,globalId=47524f55505f3030303038736572766963657472616974656d656e7431363536373030343133,branchId=47524f55505f3030303038736572766963657472616974656d656e743137343737], messageCount = 1, poisonCause = null}; Could not find Message-ID ID:***-50800-1433109620591-1:2:31356:1:1 in dispatched-list (start of ack)
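One way to read this broker warning, sketched under the assumption that the broker keeps a per-consumer list of dispatched but not yet acknowledged message ids and rejects acks that do not match an entry in it. `DispatchedListModel` and its methods are hypothetical illustrations, not ActiveMQ classes; the scenario in `main` (a consumer re-registered after failover with an empty list) is likewise an assumed simplification.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical model, not ActiveMQ code: per-consumer set of dispatched, unacked ids.
final class DispatchedListModel {
    private final Set<String> dispatched = new LinkedHashSet<>();

    void dispatch(String messageId) {
        dispatched.add(messageId);
    }

    // True if the ack matched a dispatched message; false models an
    // "unmatched acknowledge", which leaves the dispatched list unchanged.
    boolean acknowledge(String messageId) {
        return dispatched.remove(messageId);
    }

    int pending() {
        return dispatched.size();
    }

    public static void main(String[] args) {
        // In this model, the consumer starts over with an empty list after failover
        // and only knows about messages dispatched on the new connection.
        DispatchedListModel afterFailover = new DispatchedListModel();
        afterFailover.dispatch("ID:other-msg");
        boolean matched = afterFailover.acknowledge("ID:old-msg");
        System.out.println("matched=" + matched + " pending=" + afterFailover.pending());
        // prints: matched=false pending=1
    }
}
```

In this model an ack for a message received on the old connection finds nothing to remove, so the ack has no effect on the broker side even though the client-side transaction goes on to commit.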
      

      For this duplicated message, the failover occurred during the prepare phase of the commit:

      [{2015/06/01 04:39:50,322 |FailoverTransport         |WARN |jmsContainer-152|Transport (tcp://***) failed, reason:  , attempting to automatically reconnect}]
      org.apache.activemq.transport.InactivityIOException: Cannot send, channel has already failed: ***
                      at org.apache.activemq.transport.AbstractInactivityMonitor.doOnewaySend(AbstractInactivityMonitor.java:297)
                      at org.apache.activemq.transport.AbstractInactivityMonitor.oneway(AbstractInactivityMonitor.java:286)
                      at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:85)
                      at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:104)
                      at org.apache.activemq.transport.failover.FailoverTransport.oneway(FailoverTransport.java:658)
                      at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)
                      at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
                      at org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1321)
                      at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1315)
                      at org.apache.activemq.ActiveMQSession.asyncSendPacket(ActiveMQSession.java:1933)
                      at org.apache.activemq.ActiveMQSession.sendAck(ActiveMQSession.java:2099)
                      at org.apache.activemq.ActiveMQSession.sendAck(ActiveMQSession.java:2094)
                      at org.apache.activemq.ActiveMQMessageConsumer.acknowledge(ActiveMQMessageConsumer.java:1083)
                      at org.apache.activemq.ActiveMQMessageConsumer$5.beforeEnd(ActiveMQMessageConsumer.java:1041)
                      at org.apache.activemq.TransactionContext.beforeEnd(TransactionContext.java:202)
                      at org.apache.activemq.TransactionContext.end(TransactionContext.java:409)
                      at com.atomikos.datasource.xa.XAResourceTransaction.suspend(XAResourceTransaction.java:457)
                      at com.atomikos.datasource.xa.XAResourceTransaction.prepare(XAResourceTransaction.java:608)
                      at com.atomikos.icatch.imp.PrepareMessage.send(PrepareMessage.java:61)
                      at com.atomikos.icatch.imp.PropagationMessage.submit(PropagationMessage.java:111)
                      at com.atomikos.icatch.imp.Propagator$PropagatorThread.run(Propagator.java:87)
                      at com.atomikos.icatch.imp.Propagator.submitPropagationMessage(Propagator.java:66)
                      at com.atomikos.icatch.imp.ActiveStateHandler.prepare(ActiveStateHandler.java:173)
                      at com.atomikos.icatch.imp.CoordinatorImp.prepare(CoordinatorImp.java:832)
                      at com.atomikos.icatch.imp.CoordinatorImp.terminate(CoordinatorImp.java:1159)
                      at com.atomikos.icatch.imp.CompositeTerminatorImp.commit(CompositeTerminatorImp.java:92)
                      at com.atomikos.icatch.jta.TransactionImp.commit(TransactionImp.java:236)
                      at com.atomikos.icatch.jta.TransactionManagerImp.commit(TransactionManagerImp.java:498)
                      at com.atomikos.icatch.jta.UserTransactionImp.commit(UserTransactionImp.java:129)
                      at org.springframework.transaction.jta.JtaTransactionManager.doCommit(JtaTransactionManager.java:1011)
                      at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:755)
                      at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:724)
                      at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:257)
                      at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1101)
                      at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:995)
                      at java.lang.Thread.run(Thread.java:761)
      

      Our analysis:

      We think the duplicate is caused by the failover occurring during the prepare phase of the commit, so we modified the source code to reproduce the case.

      Our configuration changes to provoke failovers:
      broker: transport.useKeepAlive=false
      client: wireFormat.maxInactivityDuration=5000
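For reference, a small sketch that only builds the two URI strings where these options would usually live: the broker option on its transport connector URI, and the client option on the nested `tcp://` URI inside the `failover:(...)` URL (the stack trace shows `FailoverTransport` in use). The host name, port, and the `FailoverTestUris` helper are placeholders assumed for illustration.

```java
// Hypothetical helper: builds the URIs used to provoke inactivity-driven failovers.
final class FailoverTestUris {

    // Broker side: stop sending KeepAliveInfo on idle connections.
    static String brokerConnectorUri(int port) {
        return "tcp://0.0.0.0:" + port + "?transport.useKeepAlive=false";
    }

    // Client side: the option goes on the nested tcp:// URI so that it applies to
    // each transport the failover wrapper opens.
    static String clientFailoverUrl(String host, int port, long maxInactivityMillis) {
        return "failover:(tcp://" + host + ":" + port
                + "?wireFormat.maxInactivityDuration=" + maxInactivityMillis + ")";
    }

    public static void main(String[] args) {
        System.out.println(brokerConnectorUri(61616));
        // prints: tcp://0.0.0.0:61616?transport.useKeepAlive=false
        System.out.println(clientFailoverUrl("broker-host", 61616, 5000));
        // prints: failover:(tcp://broker-host:61616?wireFormat.maxInactivityDuration=5000)
    }
}
```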

      We added a Thread.sleep to the source of org.apache.activemq.ActiveMQMessageConsumer to force the failover to happen exactly where we think it causes the problem:

      org.apache.activemq.ActiveMQMessageConsumer#acknowledge()
                      
      
      public void acknowledge() throws JMSException {
          clearDeliveredList();
          waitForRedeliveries();
          synchronized(deliveredMessages) {

              // BEGIN MODIFIED CODE
              LOG.warn("start sleeping 20 seconds to test failover");
              try {
                  Thread.sleep(1000 * 20);
              } catch (InterruptedException e) {
                  LOG.error("Exception :", e);
              }
              LOG.warn("end sleeping 20 seconds to test failover");
              // END MODIFIED CODE

              // Acknowledge all messages so far.
              MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
              if (ack == null)
                  return; // no msgs

              if (session.getTransacted()) {
                  rollbackOnFailedRecoveryRedelivery();
                  session.doStartTransaction();
                  ack.setTransactionId(session.getTransactionContext().getTransactionId());
              }

              pendingAck = null;
              session.sendAck(ack);

              // Adjust the counters
              deliveredCounter = Math.max(0, deliveredCounter - deliveredMessages.size());
              additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());

              if (!session.getTransacted()) {
                  deliveredMessages.clear();
              }
          }
      }
      

      With these configuration and code changes, the problem is easily reproduced.

      We also tried with transactedIndividualAck=true, adding a Thread.sleep in the following method:

      org.apache.activemq.ActiveMQMessageConsumer#registerSync()
                      
      private void registerSync() throws JMSException {
          session.doStartTransaction();
          if (!synchronizationRegistered) {
              synchronizationRegistered = true;
              session.getTransactionContext().addSynchronization(new Synchronization() {
                  @Override
                  public void beforeEnd() throws Exception {
                      if (transactedIndividualAck) {
                          clearDeliveredList();
                          waitForRedeliveries();
                          synchronized(deliveredMessages) {

                              // BEGIN MODIFIED CODE
                              LOG.warn("start sleeping 20 seconds to test failover");
                              try {
                                  Thread.sleep(1000 * 20);
                              } catch (InterruptedException e) {
                                  LOG.error("Exception :", e);
                              }
                              LOG.warn("end sleeping 20 seconds to test failover");
                              // END MODIFIED CODE

                              rollbackOnFailedRecoveryRedelivery();
                          }
                      } else {
                          acknowledge();
                      }
                      synchronizationRegistered = false;
                  }

                  @Override
                  public void afterCommit() throws Exception {
                      commit();
                      synchronizationRegistered = false;
                  }

                  @Override
                  public void afterRollback() throws Exception {
                      rollback();
                      synchronizationRegistered = false;
                  }
              });
          }
      }
      

      With these modifications, we still get duplicate messages.

      We think the problem is that the synchronized(deliveredMessages) block prevents another ActiveMQConnection thread from calling clearDeliveredList(), which clears the messages in progress.
      By adding logs, we observed a thread waiting for the deliveredMessages lock in the clearDeliveredList() method.
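The lock contention described above can be illustrated with a minimal, self-contained model. It is a hypothetical simplification, not ActiveMQ code: `DeliveredListRace` stands in for the consumer, `acknowledge()` holds the `deliveredMessages` monitor while building the ack (with the injected delay), and `clearDeliveredList()` plays the failover thread. Because both synchronize on the same list, the clear can only run after the ack has already captured the message delivered on the old connection.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

// Hypothetical, simplified model of the race; not ActiveMQ code.
final class DeliveredListRace {
    private final List<String> deliveredMessages = new ArrayList<>(); // guarded by itself
    final List<String> events = new CopyOnWriteArrayList<>();         // order of critical sections
    final List<String> ackedIds = new ArrayList<>();                  // ids captured in the "ack"

    DeliveredListRace(String... ids) {
        for (String id : ids) {
            deliveredMessages.add(id);
        }
    }

    // Models acknowledge()/beforeEnd(): holds the lock while slowly building the ack.
    void acknowledge(CountDownLatch lockHeld, long delayMillis) throws InterruptedException {
        synchronized (deliveredMessages) {
            lockHeld.countDown();      // let the "failover" thread try to clear
            Thread.sleep(delayMillis); // the injected delay from the report
            ackedIds.addAll(deliveredMessages);
            events.add("ack-built");
        }
    }

    // Models clearDeliveredList() on the transport-interrupted path.
    void clearDeliveredList() {
        synchronized (deliveredMessages) {
            deliveredMessages.clear();
            events.add("cleared");
        }
    }

    int remaining() {
        synchronized (deliveredMessages) {
            return deliveredMessages.size();
        }
    }

    static DeliveredListRace run() {
        DeliveredListRace race = new DeliveredListRace("ID:msg-1");
        CountDownLatch lockHeld = new CountDownLatch(1);
        Thread consumer = new Thread(() -> {
            try {
                race.acknowledge(lockHeld, 200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            lockHeld.await();          // the consumer thread now owns the monitor
            race.clearDeliveredList(); // blocks until acknowledge() releases it
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return race;
    }

    public static void main(String[] args) {
        DeliveredListRace race = run();
        System.out.println(race.events + " acked=" + race.ackedIds);
        // prints: [ack-built, cleared] acked=[ID:msg-1]
    }
}
```

The ordering is deterministic here: the clear always lands after the ack is built, which is the window the report's Thread.sleep widens.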

      Question:

      We tried the fixes described in https://issues.apache.org/jira/browse/AMQ-5068 and https://issues.apache.org/jira/browse/AMQ-3519, but they did not solve our problem.
      Is there a workaround or a configuration parameter that can help prevent this problem?

      We are working on a fix on our side. One option may be to force a rollback of the transaction in ConnectionStateTracker.restoreTransactions() if a failover occurs during the prepare phase of the commit.
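The proposed option amounts to a per-transaction decision made on reconnect. A minimal sketch of that decision, assuming the client can tell which phase a transaction was in when the transport failed; `FailoverRollbackPolicy`, `TxPhase`, and `forceRollbackOnRestore` are hypothetical names and do not correspond to ConnectionStateTracker internals.

```java
// Hypothetical sketch of the proposed option; names are illustrative only.
final class FailoverRollbackPolicy {

    enum TxPhase { ACTIVE, ENDING, PREPARING, PREPARED, COMMITTED }

    // In this model, a failover while end()/prepare() is in flight means acks sent
    // from beforeEnd() may have been lost with the old connection, so the
    // transaction is rolled back instead of being replayed on the new one.
    static boolean forceRollbackOnRestore(TxPhase phaseAtFailover) {
        switch (phaseAtFailover) {
            case ENDING:
            case PREPARING:
                return true;
            default:
                // Other phases are left to the existing recovery logic.
                return false;
        }
    }

    public static void main(String[] args) {
        for (TxPhase phase : TxPhase.values()) {
            System.out.println(phase + " -> forceRollback=" + forceRollbackOnRestore(phase));
        }
    }
}
```

Rolling back at this point trades a redelivery for a duplicate: the message is processed again, but the first attempt's database insert and output message are discarded with the rest of the XA transaction.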

      Attachments

        1. ActiveMQMessageConsumer-5.10.2-ModifyWithThreadSleep.java (67 kB, hakim.acharifi)
        2. ActiveMQMessageConsumer-5.11.1-ModifyWithThreadSleep.java (67 kB, hakim.acharifi)
        3. amq5854.tar.gz (3.06 MB, hakim.acharifi)


          People

            gtully Gary Tully
            michael.s Michael
            Votes: 1
            Watchers: 7
