Qpid
  1. Qpid
  2. QPID-4731

[Java Broker] unregistering a topic subscription with selector can silently prevent temporary queue deletion

    Details

    • Type: Bug Bug
    • Status: Resolved
    • Priority: Major Major
    • Resolution: Fixed
    • Affects Version/s: 0.6, 0.8, 0.10, 0.12, 0.14, 0.16, 0.18, 0.20
    • Fix Version/s: 0.22
    • Component/s: Java Broker
    • Labels:
      None

      Description

      When JMS selectors are used with topics, the attempt to deregister the topic subscriber's temporary queue can silently fail. This leaves the queue bound to the exchange, thereby allowing messages to continue being sent to it.

      This is shown in the following log excerpts from a v0.5 Broker:

      Initial set-up of topic with selector:

      2013-04-02 04:11:51,947 INFO  [pool-1-thread-22] rawloggers.Log4jMessageLogger (Log4jMessageLogger.java:69) - MESSAGE [con:199(xxxxx@/yyy.yyy.yyy.yyy:45309/ZZZ)/ch:1] [vh(/ZZZ)/qu(tmp_yyy.yyy.yyy.yyy45309_2)] QUE-1001 : Create : Owner: xxx AutoDelete Transient
      2013-04-02 04:11:51,949 INFO  [pool-1-thread-22] rawloggers.Log4jMessageLogger (Log4jMessageLogger.java:69) - MESSAGE [con:199(xxxxx@/yyy.yyy.yyy.yyy:45309/ZZZ)/ch:1] [vh(/ZZZ)/ex(direct/<<default>>)/qu(tmp_yyy.yyy.yyy.yyy45309_2)/rk(tmp_yyy.yyy.yyy.yyy45309_2)] BND-1001 : Create
      2013-04-02 04:11:51,957 INFO  [pool-1-thread-23] rawloggers.Log4jMessageLogger (Log4jMessageLogger.java:69) - MESSAGE [con:199(xxxxx@/yyy.yyy.yyy.yyy:45309/ZZZ)/ch:1] [vh(/ZZZ)/ex(topic/amq.topic)/qu(tmp_yyy.yyy.yyy.yyy45309_2)/rk(XXX_Topic)] BND-1001 : Create : Arguments : {x-filter-jms-selector=[LONG_STRING: XXX_GROUP='xxxxx']}
      2013-04-02 04:11:51,977 INFO  [pool-1-thread-15] rawloggers.Log4jMessageLogger (Log4jMessageLogger.java:69) - MESSAGE [con:199(xxxxx@/yyy.yyy.yyy.yyy:45309/ZZZ)/ch:1] [sub:4,117(vh(/ZZZ)/qu(tmp_yyy.yyy.yyy.yyy45309_2)] SUB-1001 : Create : Arguments : JMSSelector(XXX_GROUP='xxxxx')
      2013-04-02 14:14:02,663 INFO  [pool-1-thread-30] queue.AMQQueueMBean (AMQQueueMBean.java:336) - QUEUE_DEPTH_ALERT On Queue tmp_yyy.yyy.yyy.yyy45309_2 - 4137Kb : Maximum queue depth threshold (4136Kb) breached.
      

      Subsequent topic consumer close:

      2013-04-02 23:59:21,538 INFO  [pool-1-thread-23] rawloggers.Log4jMessageLogger (Log4jMessageLogger.java:69) - MESSAGE [con:199(xxxxx@/yyy.yyy.yyy.yyy:45309/ZZZ)] [sub:4,117(vh(/ZZZ)/qu(tmp_yyy.yyy.yyy.yyy45309_2)] SUB-1002 : Close
      2013-04-02 23:59:21,538 INFO  [pool-1-thread-23] rawloggers.Log4jMessageLogger (Log4jMessageLogger.java:69) - MESSAGE [con:199(xxxxx@/yyy.yyy.yyy.yyy:45309/ZZZ)] [vh(/ZZZ)/ex(topic/amq.topic)/qu(tmp_yyy.yyy.yyy.yyy45309_2)/rk(XXX_Topic)] BND-1002 : Deleted
      2013-04-02 23:59:21,538 INFO  [pool-1-thread-23] rawloggers.Log4jMessageLogger (Log4jMessageLogger.java:69) - MESSAGE [con:199(xxxxx@/yyy.yyy.yyy.yyy:45309/ZZZ)] [vh(/ZZZ)/ex(direct/<<default>>)/qu(tmp_yyy.yyy.yyy.yyy45309_2)/rk(tmp_yyy.yyy.yyy.yyy45309_2)] BND-1002 : Deleted
      2013-04-02 23:59:21,538 INFO  [pool-1-thread-23] rawloggers.Log4jMessageLogger (Log4jMessageLogger.java:69) - MESSAGE [con:199(xxxxx@/yyy.yyy.yyy.yyy:45309/ZZZ)] [vh(/ZZZ)/qu(tmp_yyy.yyy.yyy.yyy45309_2)] QUE-1002 : Deleted
      

      Queue depth alerts continue even though the queue should have been deleted. Note that the depth is actually increasing, indicating that message are still being enqueued.

      2013-04-03 05:22:34,463 INFO  [pool-1-thread-14] queue.AMQQueueMBean (AMQQueueMBean.java:336) - QUEUE_DEPTH_ALERT On Queue tmp_yyy.yyy.yyy.yyy45309_2 - 4136Kb : Maximum queue depth threshold (4136Kb) breached.
      2013-04-03 05:23:08,343 INFO  [pool-1-thread-11] queue.AMQQueueMBean (AMQQueueMBean.java:336) - QUEUE_DEPTH_ALERT On Queue tmp_yyy.yyy.yyy.yyy45309_2 - 8570Kb : Maximum queue depth threshold (4136Kb) breached.
      

        Activity

        Hide
        Philip Harvey added a comment -

        Heap dump analysis indicates that this is a defect relating to the caching of JMSSelectorFilter's by TopicExchange.

        The following code path is executed when closing a topic subscriber that has a JMS selector:

        removeFilteredQueue(AMQQueue, MessageFilter<RuntimeException>) : void - ..qpid.server.exchange.TopicExchange.TopicExchangeResult
                deregisterQueue(AMQShortString, AMQQueue, FieldTable) : void - ..qpid.server.exchange.TopicExchange
        

        The message filter passed to removeFilteredQueue is fetched from WeakHashMap TopicExchange._selectorCache and used for a look-up in TopicExchange.TopicExchangeResult._filteredQueues. I observed in a heap dump that the filters for the a particular selector string in the former and latter maps are different objects (presumably because of a previous weak reference eviction).

        This would have caused the look-up to fail in removeFilteredQueue because JMSSelectorFilter does not implement equals() and hashCode()

        Show
        Philip Harvey added a comment - Heap dump analysis indicates that this is a defect relating to the caching of JMSSelectorFilter's by TopicExchange. The following code path is executed when closing a topic subscriber that has a JMS selector: removeFilteredQueue(AMQQueue, MessageFilter<RuntimeException>) : void - ..qpid.server.exchange.TopicExchange.TopicExchangeResult deregisterQueue(AMQShortString, AMQQueue, FieldTable) : void - ..qpid.server.exchange.TopicExchange The message filter passed to removeFilteredQueue is fetched from WeakHashMap TopicExchange._selectorCache and used for a look-up in TopicExchange.TopicExchangeResult._filteredQueues. I observed in a heap dump that the filters for the a particular selector string in the former and latter maps are different objects (presumably because of a previous weak reference eviction). This would have caused the look-up to fail in removeFilteredQueue because JMSSelectorFilter does not implement equals() and hashCode()
        Hide
        Keith Wall added a comment -

        Patch applied at revs 1468815 and 1468816.

        Show
        Keith Wall added a comment - Patch applied at revs 1468815 and 1468816.
        Hide
        Keith Wall added a comment - - edited

        There were circumstances where this defect could cause a stack trace. For completeness I will describe this here.

        In a scenario where the Broker has no other queues defined, if, following the failed attempt to deregister the topic subscriber's temporary queue further messages are enqueued, the following stack trace results and the client's connection is abruptly closed.

        This occurs because ReferenceCountingExecutorService has shutdown the ThreadPoolExecutor (as no acquisitions remain). The erroneous enqueue tries to run a deliverAsync using the shutdown executor. This is rightfully rejected with the REE.

        There is no further work required to address this aspect. The changes made for this Jira and QPID-3997 (the belt and braces change to guard against enqueues to deleted queues) are sufficient.

        IoReceiver - /127.0.0.1:54939 2013-04-17 10:43:18,803 ERROR [qpid.server.protocol.AMQProtocolEngine] Unexpected exception when processing datablock
        java.util.concurrent.RejectedExecutionException
                at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1768)
                at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:767)
                at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:658)
                at org.apache.qpid.server.queue.QueueRunner.execute(QueueRunner.java:118)
                at org.apache.qpid.server.queue.SimpleAMQQueue.deliverAsync(SimpleAMQQueue.java:1530)
                at org.apache.qpid.server.queue.SimpleAMQQueue.enqueue(SimpleAMQQueue.java:718)
                at org.apache.qpid.server.AMQChannel$MessageDeliveryAction.postCommit(AMQChannel.java:1190)
                at org.apache.qpid.server.AMQChannel$AsyncCommand.complete(AMQChannel.java:1583)
                at org.apache.qpid.server.AMQChannel.sync(AMQChannel.java:1553)
                at org.apache.qpid.server.AMQChannel.receivedComplete(AMQChannel.java:221)
                at org.apache.qpid.server.protocol.AMQProtocolEngine.receiveComplete(AMQProtocolEngine.java:267)
                at org.apache.qpid.server.protocol.AMQProtocolEngine.received(AMQProtocolEngine.java:250)
                at org.apache.qpid.server.protocol.AMQProtocolEngine.received(AMQProtocolEngine.java:81)
                at org.apache.qpid.server.protocol.MultiVersionProtocolEngine.received(MultiVersionProtocolEngine.java:118)
                at org.apache.qpid.server.protocol.MultiVersionProtocolEngine.received(MultiVersionProtocolEngine.java:37)
                at org.apache.qpid.transport.network.io.IoReceiver.run(IoReceiver.java:161)
                at java.lang.Thread.run(Thread.java:662)
        
        
        Show
        Keith Wall added a comment - - edited There were circumstances where this defect could cause a stack trace. For completeness I will describe this here. In a scenario where the Broker has no other queues defined, if, following the failed attempt to deregister the topic subscriber's temporary queue further messages are enqueued, the following stack trace results and the client's connection is abruptly closed. This occurs because ReferenceCountingExecutorService has shutdown the ThreadPoolExecutor (as no acquisitions remain). The erroneous enqueue tries to run a deliverAsync using the shutdown executor. This is rightfully rejected with the REE. There is no further work required to address this aspect. The changes made for this Jira and QPID-3997 (the belt and braces change to guard against enqueues to deleted queues) are sufficient. IoReceiver - /127.0.0.1:54939 2013-04-17 10:43:18,803 ERROR [qpid.server.protocol.AMQProtocolEngine] Unexpected exception when processing datablock java.util.concurrent.RejectedExecutionException at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1768) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:767) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:658) at org.apache.qpid.server.queue.QueueRunner.execute(QueueRunner.java:118) at org.apache.qpid.server.queue.SimpleAMQQueue.deliverAsync(SimpleAMQQueue.java:1530) at org.apache.qpid.server.queue.SimpleAMQQueue.enqueue(SimpleAMQQueue.java:718) at org.apache.qpid.server.AMQChannel$MessageDeliveryAction.postCommit(AMQChannel.java:1190) at org.apache.qpid.server.AMQChannel$AsyncCommand.complete(AMQChannel.java:1583) at org.apache.qpid.server.AMQChannel.sync(AMQChannel.java:1553) at org.apache.qpid.server.AMQChannel.receivedComplete(AMQChannel.java:221) at org.apache.qpid.server.protocol.AMQProtocolEngine.receiveComplete(AMQProtocolEngine.java:267) at org.apache.qpid.server.protocol.AMQProtocolEngine.received(AMQProtocolEngine.java:250) at org.apache.qpid.server.protocol.AMQProtocolEngine.received(AMQProtocolEngine.java:81) at org.apache.qpid.server.protocol.MultiVersionProtocolEngine.received(MultiVersionProtocolEngine.java:118) at org.apache.qpid.server.protocol.MultiVersionProtocolEngine.received(MultiVersionProtocolEngine.java:37) at org.apache.qpid.transport.network.io.IoReceiver.run(IoReceiver.java:161) at java.lang.Thread.run(Thread.java:662)
        Hide
        Robbie Gemmell added a comment -

        r1468815 merged to 0.22 branch via: http://svn.apache.org/r1469909
        r1468816 merged to 0.22 branch via: http://svn.apache.org/r1469910

        Show
        Robbie Gemmell added a comment - r1468815 merged to 0.22 branch via: http://svn.apache.org/r1469909 r1468816 merged to 0.22 branch via: http://svn.apache.org/r1469910

          People

          • Assignee:
            Philip Harvey
            Reporter:
            Philip Harvey
          • Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

            • Created:
              Updated:
              Resolved:

              Development