QPID-6813

Consumer tasks may run on the Broker thread leading to deadlock

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: qpid-java-6.0
    • Fix Version/s: qpid-java-6.0
    • Component/s: Broker-J
    • Labels: None

    Description

      Since QPID-6597, a consumer's tasks may run on the Broker's task executor rather than the Virtualhost's, as was intended. This may lead to deadlock.

      The problem occurs because an AbstractConfiguredObject (ACO) discovers its task executor from its parents. If an ACO has multiple parents, the parent chosen is non-deterministic. This did not matter previously because a Consumer's parents, Queue and Session, both shared a Virtualhost ancestor, so the Virtualhost task executor was always selected. However, QPID-6597 re-homed Connection (the parent of Session) to be a child of Broker. Since then the choice of executor has been non-deterministic: a Consumer may end up with either the Broker or the Virtualhost task executor.

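      Testing with very large numbers of connections exposed a deadlock between the Broker and Virtualhost configuration threads. The executor is chosen in the following constructor, which takes the child executor of whichever parent the map iterates first: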

      protected AbstractConfiguredObject(final Map<Class<? extends ConfiguredObject>, ConfiguredObject<?>> parents,
                                         Map<String, Object> attributes)
      {
          this(parents, attributes, parents.values().iterator().next().getChildExecutor());
      }
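
The effect of taking the first parent from the map can be illustrated with a small standalone sketch. Everything in it is hypothetical: `Node`, `QueueNode`, `SessionNode`, and `executorFrom` are stand-ins invented for this example, not Qpid classes, and `executorFrom` is only one possible way to make the choice explicit, not necessarily the fix applied for this issue. A `LinkedHashMap` is used so that the iteration order, and therefore the outcome, is controlled by insertion order.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ExecutorSelectionSketch {
    // Hypothetical stand-ins for the broker model; these are not the Qpid classes.
    interface Node {
        String childExecutor();
    }

    static final class QueueNode implements Node {
        // A Queue's ancestry leads to the Virtualhost.
        public String childExecutor() { return "VirtualHost-Config"; }
    }

    static final class SessionNode implements Node {
        // After QPID-6597 a Session's ancestry leads to the Broker.
        public String childExecutor() { return "Broker-Config"; }
    }

    // Same shape as the constructor above: whichever parent iterates first wins.
    static String firstParentExecutor(Map<Class<? extends Node>, Node> parents) {
        return parents.values().iterator().next().childExecutor();
    }

    // Assumed alternative: the caller names the parent type whose executor is used.
    static String executorFrom(Map<Class<? extends Node>, Node> parents,
                               Class<? extends Node> preferred) {
        Node parent = parents.get(preferred);
        if (parent == null) {
            throw new IllegalArgumentException("No parent of type " + preferred.getSimpleName());
        }
        return parent.childExecutor();
    }

    public static void main(String[] args) {
        Map<Class<? extends Node>, Node> queueFirst = new LinkedHashMap<>();
        queueFirst.put(QueueNode.class, new QueueNode());
        queueFirst.put(SessionNode.class, new SessionNode());

        Map<Class<? extends Node>, Node> sessionFirst = new LinkedHashMap<>();
        sessionFirst.put(SessionNode.class, new SessionNode());
        sessionFirst.put(QueueNode.class, new QueueNode());

        // Same two parents, different iteration order, different executor.
        System.out.println(firstParentExecutor(queueFirst));   // VirtualHost-Config
        System.out.println(firstParentExecutor(sessionFirst)); // Broker-Config

        // Naming the parent type removes the dependence on iteration order.
        System.out.println(executorFrom(sessionFirst, QueueNode.class)); // VirtualHost-Config
    }
}
```

With a map whose iteration order is unspecified, the first two calls correspond to the two outcomes a Consumer could see.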
      

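      (The Virtualhost (VH) configuration thread, running addConsumer, blocks while opening a Consumer whose open task is submitted to the Broker executor. Meanwhile the Broker configuration thread, running a consumer close, blocks waiting for the VH executor to remove a queue. Each thread waits for work that only the other can perform, so neither proceeds: deadlock. In the thread dump below both threads are parked in AbstractConfiguredObject.doSync.)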

      "VirtualHostNode-default-Config" prio=10 tid=0x00007f48701a0000 nid=0xd714 waiting on condition [0x00007f48ac728000]
         java.lang.Thread.State: WAITING (parking)
              at sun.misc.Unsafe.park(Native Method)
              - parking to wait for  <0x00000000e7ecd4a8> (a com.google.common.util.concurrent.AbstractFuture$Sync)
              at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
              at com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:285)
              at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
              at org.apache.qpid.server.model.AbstractConfiguredObject.doSync(AbstractConfiguredObject.java:1937)
              at org.apache.qpid.server.model.AbstractConfiguredObject.open(AbstractConfiguredObject.java:452)
              at org.apache.qpid.server.queue.QueueConsumerImpl.<init>(QueueConsumerImpl.java:136)
              at org.apache.qpid.server.queue.AbstractQueue.addConsumerInternal(AbstractQueue.java:864)
              at org.apache.qpid.server.queue.AbstractQueue.access$200(AbstractQueue.java:119)
              at org.apache.qpid.server.queue.AbstractQueue$6.execute(AbstractQueue.java:695)
              at org.apache.qpid.server.queue.AbstractQueue$6.execute(AbstractQueue.java:691)
              at org.apache.qpid.server.configuration.updater.TaskExecutorImpl$TaskLoggingWrapper.execute(TaskExecutorImpl.java:270)
              at org.apache.qpid.server.configuration.updater.TaskExecutorImpl$CallableWrapper$1.run(TaskExecutorImpl.java:342)
              at java.security.AccessController.doPrivileged(Native Method)
              at javax.security.auth.Subject.doAs(Subject.java:356)
              at org.apache.qpid.server.configuration.updater.TaskExecutorImpl$CallableWrapper.call(TaskExecutorImpl.java:335)
              at java.util.concurrent.FutureTask.run(FutureTask.java:262)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
              at java.lang.Thread.run(Thread.java:745)
       
         Locked ownable synchronizers:
              - <0x00000000e5953fb0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
       
      "Broker-Config" prio=10 tid=0x00007f49c8f67000 nid=0xd70d waiting on condition [0x00007f48ac95e000]
         java.lang.Thread.State: WAITING (parking)
              at sun.misc.Unsafe.park(Native Method)
              - parking to wait for  <0x00000000ea9a2940> (a com.google.common.util.concurrent.AbstractFuture$Sync)
              at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
              at com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:285)
              at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
              at org.apache.qpid.server.model.AbstractConfiguredObject.doSync(AbstractConfiguredObject.java:1937)
              at org.apache.qpid.server.virtualhost.AbstractVirtualHost.removeQueue(AbstractVirtualHost.java:734)
              at org.apache.qpid.server.queue.AbstractQueue$7.run(AbstractQueue.java:971)
              at java.security.AccessController.doPrivileged(Native Method)
              at javax.security.auth.Subject.doAs(Subject.java:356)
              at org.apache.qpid.server.queue.AbstractQueue.unregisterConsumer(AbstractQueue.java:966)
              at org.apache.qpid.server.queue.QueueConsumerImpl.onClose(QueueConsumerImpl.java:282)
              at org.apache.qpid.server.model.AbstractConfiguredObject$5$1.run(AbstractConfiguredObject.java:666)
              at org.apache.qpid.server.model.AbstractConfiguredObject$19.onSuccess(AbstractConfiguredObject.java:2145)
              at com.google.common.util.concurrent.Futures$6.run(Futures.java:1319)
              at org.apache.qpid.server.configuration.updater.TaskExecutorImpl$ImmediateIfSameThreadExecutor.execute(TaskExecutorImpl.java:422)
              at org.apache.qpid.server.configuration.updater.TaskExecutorImpl.execute(TaskExecutorImpl.java:172)
              at com.google.common.util.concurrent.ExecutionList.executeListener(ExecutionList.java:156)
              at com.google.common.util.concurrent.ExecutionList.add(ExecutionList.java:101)
              at com.google.common.util.concurrent.AbstractFuture.addListener(AbstractFuture.java:170)
              at com.google.common.util.concurrent.Futures.addCallback(Futures.java:1322)
              at org.apache.qpid.server.model.AbstractConfiguredObject.doAfter(AbstractConfiguredObject.java:2138)
              at org.apache.qpid.server.model.AbstractConfiguredObject$ChainedSettableFuture.then(AbstractConfiguredObject.java:2200)
              at org.apache.qpid.server.model.AbstractConfiguredObject$5.execute(AbstractConfiguredObject.java:654)
              at org.apache.qpid.server.model.AbstractConfiguredObject$5.execute(AbstractConfiguredObject.java:645)
              at org.apache.qpid.server.model.AbstractConfiguredObject$2.execute(AbstractConfiguredObject.java:520)
              at org.apache.qpid.server.model.AbstractConfiguredObject$2.execute(AbstractConfiguredObject.java:513)
              at org.apache.qpid.server.configuration.updater.TaskExecutorImpl$TaskLoggingWrapper.execute(TaskExecutorImpl.java:270)
              at org.apache.qpid.server.configuration.updater.TaskExecutorImpl$CallableWrapper$1.run(TaskExecutorImpl.java:342)
              at java.security.AccessController.doPrivileged(Native Method)
              at javax.security.auth.Subject.doAs(Subject.java:356)
              at org.apache.qpid.server.configuration.updater.TaskExecutorImpl$CallableWrapper.call(TaskExecutorImpl.java:335)
              at java.util.concurrent.FutureTask.run(FutureTask.java:262)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
              at java.lang.Thread.run(Thread.java:745)
       
         Locked ownable synchronizers:
              - <0x00000000e59603c8> (a java.util.concurrent.ThreadPoolExecutor$Worker)
              - <0x00000000ea413e18> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
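
The pattern in the two stacks above, where each single configuration thread blocks on a future that only the other thread can complete, can be reproduced in miniature. The sketch below is an assumption-laden illustration, not Qpid code: the class and method names are invented, plain `java.util.concurrent` single-thread executors stand in for the Broker and Virtualhost task executors, and a timeout replaces the unbounded wait so that the program terminates instead of hanging.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CrossExecutorDeadlockSketch {

    // Runs on one executor's only thread and blocks on a task handed to the other.
    // Returns true if the nested task could not run within the timeout.
    static boolean waitOnOther(ExecutorService other, CyclicBarrier barrier)
            throws InterruptedException, BrokenBarrierException, ExecutionException {
        barrier.await(); // both single threads are now occupied
        Future<?> nested = other.submit(() -> { });
        boolean stuck;
        try {
            // The real code waits without a timeout; the timeout only lets the sketch finish.
            nested.get(500, TimeUnit.MILLISECONDS);
            stuck = false;
        } catch (TimeoutException e) {
            nested.cancel(false);
            stuck = true;
        }
        barrier.await(); // keep this thread occupied until the other has also given up
        return stuck;
    }

    // Returns true if both executors ended up waiting on each other.
    static boolean runScenario() {
        ExecutorService vhost = Executors.newSingleThreadExecutor();
        ExecutorService broker = Executors.newSingleThreadExecutor();
        CyclicBarrier barrier = new CyclicBarrier(2);
        try {
            // Stand-in for the VH thread opening a Consumer on the Broker executor.
            Future<Boolean> vhTask = vhost.submit(() -> waitOnOther(broker, barrier));
            // Stand-in for the Broker thread removing a queue on the VH executor.
            Future<Boolean> brTask = broker.submit(() -> waitOnOther(vhost, barrier));
            return vhTask.get() && brTask.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            vhost.shutdownNow();
            broker.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("Mutual wait observed: " + runScenario());
    }
}
```

If both sides submitted to the same executor, or if neither blocked synchronously on the other's result, the mutual wait would not arise; this is why it matters that a Consumer uses the Virtualhost executor consistently.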
      

          People

            rgodfrey Robert Godfrey
            kwall Keith Wall