ActiveMQ: AMQ-2868

NegativeQueueTest and JDBC variant - intermittent failure - missing message when cache exhausted

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 5.4.0
    • Fix Version/s: 5.6.0
    • Component/s: None
    • Labels:
      None
    • Regression:
      Regression

      Description

      The test occasionally fails while consuming all messages because it misses one message.
      The problem: concurrent transaction completion can leave messages in the cursor out of order with respect to the store. Once the cache memory limit is reached, subsequent messages are not cached, so when the cursor exhausts its cached messages the next message has to be recovered from the store, and the point at which reading from the store starts is important. If the cursor is out of order at the moment the cache fills, recovery can skip a message in the store.
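The skip can be reproduced with a small, hypothetical model (class and method names are illustrative, not ActiveMQ code). It assumes the store orders messages by a sequence id assigned at store commit, the cursor caches messages in after-completion order until a fixed count (standing in for the memory limit) is reached, and recovery then resumes from the store strictly after the last cached id.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical model, not ActiveMQ code. The store orders messages by the
// sequence id assigned at store commit; the cursor caches messages in the
// order their after-completion callbacks happen to run.
class CursorSkipDemo {

    /**
     * Returns the sequence ids a consumer would receive. storeSeqs are the
     * committed messages; cursorAddOrder is the order the cursor saw them.
     */
    static List<Long> consume(List<Long> storeSeqs, List<Long> cursorAddOrder, int cacheLimit) {
        TreeMap<Long, String> store = new TreeMap<>();
        for (long seq : storeSeqs) {
            store.put(seq, "msg-" + seq);
        }
        List<Long> cache = new ArrayList<>();
        long resumeAfter = -1; // last cached id: where store recovery resumes
        for (long seq : cursorAddOrder) {
            if (cache.size() < cacheLimit) {
                cache.add(seq);
                resumeAfter = seq;
            } // once the cache is full, later messages live only in the store
        }
        List<Long> delivered = new ArrayList<>(cache);
        // Cache exhausted: recover from the store, strictly after the resume point.
        delivered.addAll(store.tailMap(resumeAfter, false).keySet());
        return delivered;
    }
}
```

With store order 1, 2, 3, 4, a cursor that saw 1, 3, 2, 4 and a cache limit of 2, the model delivers 1, 3, 4: message 2 is lost because recovery resumes after 3. With an in-order cursor, all four messages arrive.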
      Previously, the entire store was replayed as if it had never been cached, and the cursor suppressed the already-cached messages as duplicates. However, duplicate suppression has a size limit and a producer spread limit, so some messages can escape duplicate detection. Also, when consumer transactions roll back, duplicate delivery is required, so out-of-order messages may arrive on a subsequent dispatch.
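Why a bounded duplicate check is not enough can be sketched with a window of recently seen ids. This is an assumption-laden model: a single LinkedHashMap window stands in for the cursor's duplicate suppression, the names are hypothetical, and the per-producer spread limit is not modeled.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical bounded duplicate audit: it remembers only the most recent
// `window` message ids, so older ids fall out and are no longer detected.
class BoundedAudit {
    private final Map<Long, Boolean> seen;

    BoundedAudit(int window) {
        this.seen = new LinkedHashMap<Long, Boolean>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Long, Boolean> eldest) {
                return size() > window; // evict the oldest id beyond the window
            }
        };
    }

    /** Returns true if the id is still inside the window; records it either way. */
    boolean isDuplicate(long id) {
        return seen.put(id, Boolean.TRUE) != null;
    }

    /** Replays ids after a dispatch and returns the duplicates the audit missed. */
    static List<Long> leakedDuplicates(List<Long> dispatched, List<Long> replay, int window) {
        BoundedAudit audit = new BoundedAudit(window);
        for (long id : dispatched) {
            audit.isDuplicate(id);
        }
        List<Long> leaked = new ArrayList<>();
        for (long id : replay) {
            if (!audit.isDuplicate(id) && dispatched.contains(id)) {
                leaked.add(id); // already dispatched, yet not flagged
            }
        }
        return leaked;
    }
}
```

When the window covers the whole replay, every replayed id is caught. With five dispatched messages and a window of 3, all five replayed ids slip through, because each miss evicts another id from the audit.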
      With setBatch, replaying messages from the correct point in the store is essential for ordering guarantees with failover and memory limits, so supporting setBatch also requires synchronizing the store and cursor with respect to concurrent transactions.

      The store commit and the after completions that update the cursor need to be serialized per destination to make this fully deterministic.
      To recap, the failure needs: memory limits such that a cache fills; concurrent send-transaction completion, so that store and cursor updates can overlap with each other and with cache invalidation; and setBatch trying to reduce the replay of messages.
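A minimal sketch of the serialized approach, assuming one lock guards both the store commit and the cursor update for a send. All names are hypothetical; the real broker store and cursor are far more involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical model, not ActiveMQ code: the store commit and the cursor
// update for a send run under one lock, so concurrent transactions cannot
// interleave them and the cursor always matches the store order.
class SerializedCommit {
    private final ReentrantLock commitLock = new ReentrantLock();
    private final List<Long> store = new ArrayList<>();
    private final List<Long> cursor = new ArrayList<>();
    private long nextSeq = 1;

    void commitSend() {
        commitLock.lock();
        try {
            long seq = nextSeq++;
            store.add(seq);  // store commit fixes the order
            cursor.add(seq); // after completion applies the same order
        } finally {
            commitLock.unlock();
        }
    }

    List<Long> storeOrder() {
        commitLock.lock();
        try {
            return new ArrayList<>(store);
        } finally {
            commitLock.unlock();
        }
    }

    List<Long> cursorOrder() {
        commitLock.lock();
        try {
            return new ArrayList<>(cursor);
        } finally {
            commitLock.unlock();
        }
    }

    /** Runs commitSend() from several threads and waits for all of them. */
    static SerializedCommit runConcurrently(int threads, int commitsPerThread) {
        SerializedCommit sc = new SerializedCommit();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread w = new Thread(() -> {
                for (int i = 0; i < commitsPerThread; i++) {
                    sc.commitSend();
                }
            });
            workers.add(w);
            w.start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return sc;
    }
}
```

The ordering guarantee comes from never releasing the lock between the two updates; the price is that every commit waits on the same lock.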

      Outstanding question:

      • Do we make the use of setBatch, and the transaction sync between store and cursor, configurable? If setBatch is off, don't sync; we are then at the mercy of consumer transactions and duplicate suppression in the event of failover. An alternative is to make the sync conditional on the use of the cache for a destination. The sync is very reliable but slow, though slow is a very relative determination!
        The sync may also need to be disabled for all destinations, as a transaction can span many destinations.


          Activity

          Gary Tully added a comment -

          Added the sync that serialises transaction updates to the store and cursor, so that the cursor is always in order with respect to the store and setBatch can revert to exactly where it needs to, with no duplicates replayed to the cursor.
          r985155
          This is the bulletproof approach.
          A variant could acquire per-destination locks rather than the transaction store lock that is currently used, which would allow more per-destination concurrency.
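The per-destination variant might look like the following hypothetical helper. Because a transaction can span several destinations, it has to hold the lock of each destination it touches. Acquiring them in sorted name order is an assumption added here to avoid deadlock between two transactions over the same destinations; it is not something stated in the issue.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical per-destination locking, not ActiveMQ code.
class DestinationLocks {
    private final Map<String, ReentrantLock> locks = new ConcurrentHashMap<>();

    /** One lock per destination name, created on first use. */
    ReentrantLock lockFor(String destination) {
        return locks.computeIfAbsent(destination, d -> new ReentrantLock());
    }

    /** Runs work while holding the lock of every destination the transaction touches. */
    void withLocks(Collection<String> destinations, Runnable work) {
        List<ReentrantLock> held = new ArrayList<>();
        try {
            // Sorted, de-duplicated order (an assumption of this sketch) so two
            // transactions over the same destinations cannot deadlock.
            for (String d : new TreeSet<>(destinations)) {
                ReentrantLock lock = lockFor(d);
                lock.lock();
                held.add(lock);
            }
            work.run();
        } finally {
            for (int i = held.size() - 1; i >= 0; i--) {
                held.get(i).unlock(); // release in reverse acquisition order
            }
        }
    }
}
```

Commits on unrelated destinations no longer contend, at the cost of managing one lock per destination.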

          Gary Tully added a comment -

          Sticking with the shared lock for the time being.

          Gary Tully added a comment -

          This sync is only needed for sends, but it also impacts acks, so it needs a revisit; it is too heavy-handed.
          It breaks concurrent consumption on a destination.

          Gary Tully added a comment -

          I think setBatch needs to take the sync hit, as it is the rare event. It may need to wait until the store and cursor are in sync.
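A sketch of that idea under stated assumptions: commits only count themselves in and out (a hypothetical storeCommitted() after the store write, cursorUpdated() after the after completion), and a hypothetical setBatch blocks, with a timeout, until the count drops to zero before recording its resume point. Sends never wait on each other; only the rare setBatch pays.

```java
// Hypothetical model, not ActiveMQ code: commits are not serialized, but
// setBatch, the rare path, waits until every store commit has also been
// applied to the cursor before recording its resume point.
class SyncOnSetBatch {
    private int inFlight = 0;    // committed to the store, not yet on the cursor
    private long batchMark = -1; // resume point recorded by setBatch

    synchronized void storeCommitted() {
        inFlight++;
    }

    synchronized void cursorUpdated() {
        inFlight--;
        if (inFlight == 0) {
            notifyAll(); // wake any setBatch waiting for store and cursor to agree
        }
    }

    /** Returns false if store and cursor did not come into sync within the timeout. */
    synchronized boolean setBatch(long lastCachedSeq, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (inFlight > 0) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return false;
            }
            try {
                wait(remaining); // releases the monitor while waiting
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        batchMark = lastCachedSeq;
        return true;
    }

    synchronized long batchMark() {
        return batchMark;
    }
}
```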

          Gary Tully added a comment -

          Removed the sync and replaced the cursor updates with a stack, so that they occur in the same order as the index updates but without the need for the index lock or serial commit execution. Concurrent transactions can now wait on a batch write to the journal for their commit record.

          http://svn.apache.org/viewvc?rev=1163613&view=rev
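The ordered-completion idea can be sketched as follows. This is a hypothetical model, not the committed code: since the cursor updates must run in index-update order, a first-in, first-out deque plays the role of the "stack"; the index update and the enqueue share one short lock; the journal write is omitted; and each committing thread then pops and runs exactly one pending cursor update, possibly another transaction's, before waiting on its own FutureTask.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

// Hypothetical model, not ActiveMQ code.
class OrderedCompletions {
    private final Object indexLock = new Object();      // stands in for the index update
    private final Object completionLock = new Object(); // runs cursor updates one at a time
    private final Deque<FutureTask<Void>> pending = new ArrayDeque<>();
    private final List<Long> store = Collections.synchronizedList(new ArrayList<>());
    private final List<Long> cursor = Collections.synchronizedList(new ArrayList<>());
    private long nextSeq = 1;

    void commit() {
        FutureTask<Void> mine;
        synchronized (indexLock) {
            long seq = nextSeq++;
            store.add(seq);                                   // index update fixes the order
            mine = new FutureTask<>(() -> cursor.add(seq), null);
            pending.addLast(mine);                            // queued in that same order
        }
        // The journal write for the commit record would happen here, unlocked.
        synchronized (completionLock) {
            FutureTask<Void> next;
            synchronized (indexLock) {
                next = pending.pollFirst();
            }
            if (next != null) {
                next.run(); // possibly another transaction's cursor update
            }
        }
        try {
            mine.get(); // block until our own cursor update has run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    List<Long> storeOrder() { return new ArrayList<>(store); }

    List<Long> cursorOrder() { return new ArrayList<>(cursor); }

    /** Runs commit() from several threads and waits for all of them. */
    static OrderedCompletions runConcurrently(int threads, int commitsPerThread) {
        OrderedCompletions oc = new OrderedCompletions();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread w = new Thread(() -> {
                for (int i = 0; i < commitsPerThread; i++) {
                    oc.commit();
                }
            });
            workers.add(w);
            w.start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return oc;
    }
}
```

Because every commit enqueues before it dequeues, each pop finds an entry, so every transaction's update eventually runs; popping and running under completionLock keeps the cursor order identical to the store order.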

          Gary Tully added a comment -

          A failed ack transaction (one that results in UnMatched ack exceptions, for example) that interleaves with non-empty transactions can cause the non-empty transaction to block. Thread stack:

          "ActiveMQ Transport: tcp:///xx:34724" daemon prio=10 tid=0x00002aaab8187000 nid=0xfd9 waiting on condition [0x000000005a13d000]
             java.lang.Thread.State: WAITING (parking)
          	at sun.misc.Unsafe.park(Native Method)
          	- parking to wait for  <0x0000000794062330> (a java.util.concurrent.FutureTask$Sync)
          	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
          	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
          	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
          	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
          	at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:218)
          	at java.util.concurrent.FutureTask.get(FutureTask.java:83)
          	at org.apache.activemq.transaction.Transaction.waitPostCommitDone(Transaction.java:146)
          	at org.apache.activemq.transaction.LocalTransaction.commit(LocalTransaction.java:73)
          	at org.apache.activemq.broker.TransactionBroker.commitTransaction(TransactionBroker.java:252)
          	at org.apache.activemq.broker.MutableBrokerFilter.commitTransaction(MutableBrokerFilter.java:103)
          	at org.apache.activemq.broker.TransportConnection.processCommitTransactionOnePhase(TransportConnection.java:432)
          	at org.apache.activemq.command.TransactionInfo.visit(TransactionInfo.java:100)
          	at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:318)
          	at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:181)
          	at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:69)
          	at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:69)
          	at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)

          The problem is that the empty transaction does not push its completion, yet it pops the valid transaction's completion and runs it. The valid transaction then pops nothing and tries again to run the after completion, while the empty transaction hangs awaiting its own completion.
          The fix is to run the after completion directly once we determine that there is no transaction work to be done.

          This is a tough one to unit test, because two commit threads need to cross over in the broker.
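Since the crossover is hard to reproduce with real threads, the following hypothetical model replays it step by step on one thread. A transaction with work pushes its completion, then every committer pops one. In the buggy variant the empty transaction pops as well; in the fixed variant it runs its own after completion instead, as the comment describes. Names are illustrative, not ActiveMQ code.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical single-threaded model of the commit crossover.
class EmptyTxCrossover {

    static final class Tx {
        final boolean hasWork;
        boolean completed; // set once its after completion has run

        Tx(boolean hasWork) {
            this.hasWork = hasWork;
        }

        void afterCompletion() {
            completed = true;
        }
    }

    private final Deque<Tx> pending = new ArrayDeque<>();

    /** First step of a commit: only a transaction with work pushes its completion. */
    void push(Tx tx) {
        if (tx.hasWork) {
            pending.addLast(tx);
        }
    }

    /** Second step, buggy: every committer pops and runs one completion. */
    void popBuggy() {
        Tx next = pending.pollFirst();
        if (next != null) {
            next.afterCompletion();
        }
    }

    /** Second step, fixed: with no transaction work, run our own completion. */
    void popFixed(Tx self) {
        if (!self.hasWork) {
            self.afterCompletion();
            return;
        }
        Tx next = pending.pollFirst();
        if (next != null) {
            next.afterCompletion();
        }
    }
}
```

Interleaving push(valid), push(empty), then a pop by the empty transaction and a pop by the valid one leaves the empty transaction uncompleted with popBuggy, which corresponds to the hang in the thread stack; with popFixed both complete.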

          Gary Tully added a comment -

          Fix in http://svn.apache.org/viewvc?rev=1207693&view=rev
          Dejan Bosanac added a comment -

          Removed the synchronisation in MemoryTransactionStore.commit() in r1345023. It is no longer needed, as the JDBC message store does not have a problem with the order of writes and message sequence ids.


            People

            • Assignee:
              Gary Tully
              Reporter:
              Gary Tully
            • Votes:
              0
              Watchers:
              1
