Uploaded image for project: 'ActiveMQ'
  1. ActiveMQ
  2. AMQ-5212

Deadlock with duplicate detection and dlq processing in kahadb

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 5.9.0, 5.9.1
    • Fix Version/s: 5.10.1, 5.11.0
    • Component/s: Broker, Message Store
    • Labels:
      None

      Description

      Contention between access to the destination map from store duplicate processing and dlq destination creation from the cursor. But could be between any destination creation/deletion.
      From the test case:

      Found one Java-level deadlock:
      =============================
      "ActiveMQ Transport: tcp://27.0.0.1:60895@60852":
        waiting for ownable synchronizer 7df695ea8, (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync),
        which is held by "ActiveMQ Transport: tcp://27.0.0.1:60894@60852"
      "ActiveMQ Transport: tcp://27.0.0.1:60894@60852":
        waiting for ownable synchronizer 7df605fd8, (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync),
        which is held by "ConcurrentQueueStoreAndDispatch"
      "ConcurrentQueueStoreAndDispatch":
        waiting for ownable synchronizer 7df695ea8, (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync),
        which is held by "ActiveMQ Transport: tcp://27.0.0.1:60894@60852"
      
      Java stack information for the threads listed above:
      ===================================================
      "ActiveMQ Transport: tcp://27.0.0.1:60895@60852":
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <7df695ea8> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
      	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
      	at java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:890)
      	at org.apache.activemq.broker.region.AbstractRegion.addDestination(AbstractRegion.java:129)
      	at org.apache.activemq.broker.region.RegionBroker.addDestination(RegionBroker.java:334)
      	at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:172)
      	at org.apache.activemq.advisory.AdvisoryBroker.addDestination(AdvisoryBroker.java:184)
      	at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:172)
      	at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:172)
      	at org.apache.activemq.broker.MutableBrokerFilter.addDestination(MutableBrokerFilter.java:177)
      	at org.apache.activemq.broker.region.RegionBroker.addProducer(RegionBroker.java:384)
      	at org.apache.activemq.broker.BrokerFilter.addProducer(BrokerFilter.java:107)
      	at org.apache.activemq.advisory.AdvisoryBroker.addProducer(AdvisoryBroker.java:172)
      	at org.apache.activemq.broker.CompositeDestinationBroker.addProducer(CompositeDestinationBroker.java:56)
      	at org.apache.activemq.broker.BrokerFilter.addProducer(BrokerFilter.java:107)
      	at org.apache.activemq.broker.MutableBrokerFilter.addProducer(MutableBrokerFilter.java:112)
      	at org.apache.activemq.broker.TransportConnection.processAddProducer(TransportConnection.java:565)
      	at org.apache.activemq.command.ProducerInfo.visit(ProducerInfo.java:108)
      	at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:294)
      	at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:148)
      	at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
      	at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
      	at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:270)
      	at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
      	at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:214)
      	at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196)
      	at java.lang.Thread.run(Thread.java:695)
      "ActiveMQ Transport: tcp://27.0.0.1:60894@60852":
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <7df605fd8> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
      	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
      	at java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:890)
      	at org.apache.activemq.store.kahadb.KahaDBStore$KahaDBMessageStore.getMessageCount(KahaDBStore.java:490)
      	at org.apache.activemq.store.ProxyMessageStore.getMessageCount(ProxyMessageStore.java:101)
      	at org.apache.activemq.broker.region.Queue.initialize(Queue.java:389)
      	at org.apache.activemq.broker.region.DestinationFactoryImpl.createDestination(DestinationFactoryImpl.java:87)
      	at org.apache.activemq.broker.region.AbstractRegion.createDestination(AbstractRegion.java:546)
      	at org.apache.activemq.broker.region.AbstractRegion.addDestination(AbstractRegion.java:135)
      	at org.apache.activemq.broker.region.RegionBroker.addDestination(RegionBroker.java:334)
      	at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:172)
      	at org.apache.activemq.advisory.AdvisoryBroker.addDestination(AdvisoryBroker.java:184)
      	at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:172)
      	at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:172)
      	at org.apache.activemq.broker.MutableBrokerFilter.addDestination(MutableBrokerFilter.java:177)
      	at org.apache.activemq.broker.region.RegionBroker.addProducer(RegionBroker.java:384)
      	at org.apache.activemq.broker.BrokerFilter.addProducer(BrokerFilter.java:107)
      	at org.apache.activemq.advisory.AdvisoryBroker.addProducer(AdvisoryBroker.java:172)
      	at org.apache.activemq.broker.CompositeDestinationBroker.addProducer(CompositeDestinationBroker.java:56)
      	at org.apache.activemq.broker.BrokerFilter.addProducer(BrokerFilter.java:107)
      	at org.apache.activemq.broker.MutableBrokerFilter.addProducer(MutableBrokerFilter.java:112)
      	at org.apache.activemq.broker.TransportConnection.processAddProducer(TransportConnection.java:565)
      	at org.apache.activemq.command.ProducerInfo.visit(ProducerInfo.java:108)
      	at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:294)
      	at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:148)
      	at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
      	at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
      	at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:270)
      	at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
      	at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:214)
      	at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196)
      	at java.lang.Thread.run(Thread.java:695)
      "ConcurrentQueueStoreAndDispatch":
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <7df695ea8> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
      	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:941)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1261)
      	at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:677)
      	at org.apache.activemq.broker.region.AbstractRegion.getDestinations(AbstractRegion.java:243)
      	at org.apache.activemq.broker.region.RegionBroker.getDestinations(RegionBroker.java:149)
      	at org.apache.activemq.store.kahadb.KahaDBStore.rollbackStatsOnDuplicate(KahaDBStore.java:286)
      	at org.apache.activemq.store.kahadb.MessageDatabase.updateIndex(MessageDatabase.java:1316)
      	at org.apache.activemq.store.kahadb.MessageDatabase$11.execute(MessageDatabase.java:1140)
      	at org.apache.activemq.store.kahadb.disk.page.Transaction.execute(Transaction.java:779)
      	at org.apache.activemq.store.kahadb.MessageDatabase.process(MessageDatabase.java:1137)
      	at org.apache.activemq.store.kahadb.MessageDatabase$10.visit(MessageDatabase.java:1074)
      	at org.apache.activemq.store.kahadb.data.KahaAddMessageCommand.visit(KahaAddMessageCommand.java:241)
      	at org.apache.activemq.store.kahadb.MessageDatabase.process(MessageDatabase.java:1071)
      	at org.apache.activemq.store.kahadb.MessageDatabase.store(MessageDatabase.java:978)
      	at org.apache.activemq.store.kahadb.MessageDatabase.store(MessageDatabase.java:958)
      	at org.apache.activemq.store.kahadb.KahaDBStore$KahaDBMessageStore.addMessage(KahaDBStore.java:426)
      	at org.apache.activemq.store.kahadb.KahaDBStore$StoreQueueTask.run(KahaDBStore.java:1281)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
      	at java.lang.Thread.run(Thread.java:695)
      
      Found 1 deadlock.

        Issue Links

          Activity

          Hide
          gtully Gary Tully added a comment -
          Show
          gtully Gary Tully added a comment - fix and test in further refinement in http://git-wip-us.apache.org/repos/asf/activemq/commit/27b3a7c3

            People

            • Assignee:
              gtully Gary Tully
              Reporter:
              gtully Gary Tully
            • Votes:
              0 Vote for this issue
              Watchers:
              1 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved:

                Development