Qpid
  1. Qpid
  2. QPID-3895

exception thrown when closing connections with sessions blocked by producer side flow control

    Details

    • Type: Bug Bug
    • Status: Resolved
    • Priority: Major Major
    • Resolution: Fixed
    • Affects Version/s: 0.6, 0.7, 0.8, 0.9, 0.10, 0.11, 0.12, 0.13, 0.14, 0.15, 0.16
    • Fix Version/s: 0.17
    • Component/s: Java Broker
    • Labels:
      None

      Description

      A reference to AMQChannel object is not removed from SimpleAMQQueue#_blockedChannels map on closing connections with producer side flow control when flow is blocked.

      This can result in memory leaks. Also, the subsequent invocations of operations like SimpleAMQQueue#delete can results in failures similar to the one below

      2012-03-07 19:22:37,132 ERROR [IoReceiver - /169.82.100.202:51439] (LocalTransaction.java:265) - Failed to commit transaction
      org.apache.qpid.transport.SenderClosedException: sender is closed
              at org.apache.qpid.transport.network.io.IoSender.send(IoSender.java:114)
              at org.apache.qpid.transport.network.io.IoSender.send(IoSender.java:40)
              at org.apache.qpid.server.protocol.AMQProtocolEngine.writeFrame(AMQProtocolEngine.java:558)
              at org.apache.qpid.server.AMQChannel.flow(AMQChannel.java:1403)
              at org.apache.qpid.server.AMQChannel.unblock(AMQChannel.java:1384)
              at org.apache.qpid.server.queue.SimpleAMQQueue.checkCapacity(SimpleAMQQueue.java:1696)
              at org.apache.qpid.server.queue.SimpleAMQQueue.dequeue(SimpleAMQQueue.java:897)
              at org.apache.qpid.server.queue.QueueEntryImpl.dequeue(QueueEntryImpl.java:376)
              at org.apache.qpid.server.queue.QueueEntryImpl.discard(QueueEntryImpl.java:407)
              at org.apache.qpid.server.queue.SimpleAMQQueue$13.postCommit(SimpleAMQQueue.java:1608)
              at org.apache.qpid.server.txn.LocalTransaction.commit(LocalTransaction.java:260)
              at org.apache.qpid.server.txn.LocalTransaction.commit(LocalTransaction.java:241)
              at org.apache.qpid.server.queue.SimpleAMQQueue.delete(SimpleAMQQueue.java:1619)
              at org.apache.qpid.server.handler.QueueDeleteHandler.methodReceived(QueueDeleteHandler.java:114)
              at org.apache.qpid.server.handler.ServerMethodDispatcherImpl.dispatchQueueDelete(ServerMethodDispatcherImpl.java:518)
              at org.apache.qpid.framing.amqp_0_9.QueueDeleteBodyImpl.execute(QueueDeleteBodyImpl.java:153)
              at org.apache.qpid.server.state.AMQStateManager.methodReceived(AMQStateManager.java:109)
              at org.apache.qpid.server.protocol.AMQProtocolEngine.methodFrameReceived(AMQProtocolEngine.java:454)
              at org.apache.qpid.framing.AMQMethodBodyImpl.handle(AMQMethodBodyImpl.java:97)
              at org.apache.qpid.server.protocol.AMQProtocolEngine.frameReceived(AMQProtocolEngine.java:343)
              at org.apache.qpid.server.protocol.AMQProtocolEngine.dataBlockReceived(AMQProtocolEngine.java:292)
              at org.apache.qpid.server.protocol.AMQProtocolEngine.received(AMQProtocolEngine.java:255)
              at org.apache.qpid.server.protocol.AMQProtocolEngine.received(AMQProtocolEngine.java:86)
              at org.apache.qpid.server.protocol.MultiVersionProtocolEngine.received(MultiVersionProtocolEngine.java:118)
              at org.apache.qpid.server.protocol.MultiVersionProtocolEngine.received(MultiVersionProtocolEngine.java:37)
              at org.apache.qpid.transport.network.io.IoReceiver.run(IoReceiver.java:152)
              at java.lang.Thread.run(Thread.java:662)
      2012-03-07 19:22:37,135 ERROR [IoReceiver - /169.82.100.202:51439] (AMQProtocolEngine.java:520) - Unexpected exception while processing frame.  Closing connection.
      java.lang.RuntimeException: Failed to commit transaction
              at org.apache.qpid.server.txn.LocalTransaction.commit(LocalTransaction.java:271)
              at org.apache.qpid.server.txn.LocalTransaction.commit(LocalTransaction.java:241)
              at org.apache.qpid.server.queue.SimpleAMQQueue.delete(SimpleAMQQueue.java:1619)
              at org.apache.qpid.server.handler.QueueDeleteHandler.methodReceived(QueueDeleteHandler.java:114)
              at org.apache.qpid.server.handler.ServerMethodDispatcherImpl.dispatchQueueDelete(ServerMethodDispatcherImpl.java:518)
              at org.apache.qpid.framing.amqp_0_9.QueueDeleteBodyImpl.execute(QueueDeleteBodyImpl.java:153)
              at org.apache.qpid.server.state.AMQStateManager.methodReceived(AMQStateManager.java:109)
              at org.apache.qpid.server.protocol.AMQProtocolEngine.methodFrameReceived(AMQProtocolEngine.java:454)
              at org.apache.qpid.framing.AMQMethodBodyImpl.handle(AMQMethodBodyImpl.java:97)
              at org.apache.qpid.server.protocol.AMQProtocolEngine.frameReceived(AMQProtocolEngine.java:343)
              at org.apache.qpid.server.protocol.AMQProtocolEngine.dataBlockReceived(AMQProtocolEngine.java:292)
              at org.apache.qpid.server.protocol.AMQProtocolEngine.received(AMQProtocolEngine.java:255)
              at org.apache.qpid.server.protocol.AMQProtocolEngine.received(AMQProtocolEngine.java:86)
              at org.apache.qpid.server.protocol.MultiVersionProtocolEngine.received(MultiVersionProtocolEngine.java:118)
              at org.apache.qpid.server.protocol.MultiVersionProtocolEngine.received(MultiVersionProtocolEngine.java:37)
              at org.apache.qpid.transport.network.io.IoReceiver.run(IoReceiver.java:152)
              at java.lang.Thread.run(Thread.java:662)
      Caused by: org.apache.qpid.transport.SenderClosedException: sender is closed
      

        Activity

        Hide
        Alex Rudyy added a comment -

        Attached patch with a fix

        Show
        Alex Rudyy added a comment - Attached patch with a fix
        Hide
        Alex Rudyy added a comment -

        Robbie,

        Do you mind to review and commit the patch attached?

        Show
        Alex Rudyy added a comment - Robbie, Do you mind to review and commit the patch attached?
        Hide
        Keith Wall added a comment -

        Hi Alex,

        A couple of comments from me:

        1) The two new tests are missing their licences.

        2) getID vs getId

        I realise this was existing, and you are merely exposing in the interface, but is it possible to rename one to have a better name before we see the next ID vs Id defect?

        3) I wonder whether we really need the additional complication of AMQQueue#unregisterBlocked().

        Would it not sufficient to rely on your new condition "&& !isClosing()" in AMQChannel#unblock() to mean that SimpleAMQQueue's calls to checkCapacity()s will run without failure and remove the closed channel from the list? I realise this is not as timely as your 'active' method and will mean the channel remains in the list until the queue depth falls below the flowResumeCapacity before clean-up can occur. (AMQQueue's interface is already too complex)

        What do you think?

        Show
        Keith Wall added a comment - Hi Alex, A couple of comments from me: 1) The two new tests are missing their licences. 2) getID vs getId I realise this was existing, and you are merely exposing in the interface, but is it possible to rename one to have a better name before we see the next ID vs Id defect? 3) I wonder whether we really need the additional complication of AMQQueue#unregisterBlocked(). Would it not sufficient to rely on your new condition "&& !isClosing()" in AMQChannel#unblock() to mean that SimpleAMQQueue's calls to checkCapacity()s will run without failure and remove the closed channel from the list? I realise this is not as timely as your 'active' method and will mean the channel remains in the list until the queue depth falls below the flowResumeCapacity before clean-up can occur. (AMQQueue's interface is already too complex) What do you think?
        Hide
        Alex Rudyy added a comment -

        Hi Keith,
        I attached a new patch fixing the issues you had pointed before.

        Please find my comments in line

        >A couple of comments from me:
        >1) The two new tests are missing their licences.

        License text is added

        >2) getID vs getId

        >I realise this was existing, and you are merely exposing in the interface, but is it possible to rename one to have a better name before we see the next ID vs Id defect?

        I removed getID method to avoid confusions

        >3) I wonder whether we really need the additional complication of AMQQueue#unregisterBlocked().

        >Would it not sufficient to rely on your new condition "&& !isClosing()" in AMQChannel#unblock() to mean that SimpleAMQQueue's calls to checkCapacity()s will run without failure and remove the closed channel from the list? I realise this is not as timely as your 'active' method and will mean the channel remains in the list until the queue depth falls below the flowResumeCapacity before clean-up can occur. (AMQQueue's interface is already too complex)

        I removed AMQQueue#unregisterBlocked declaration its implementations completely.

        Also, initially I planed to change compareTo methods in AMQChannel and ServerSession to compare object addresses in memory but decided to leave current implementation which delegates compareTo to UUID instances.

        What do you think about it?

        Show
        Alex Rudyy added a comment - Hi Keith, I attached a new patch fixing the issues you had pointed before. Please find my comments in line >A couple of comments from me: >1) The two new tests are missing their licences. License text is added >2) getID vs getId >I realise this was existing, and you are merely exposing in the interface, but is it possible to rename one to have a better name before we see the next ID vs Id defect? I removed getID method to avoid confusions >3) I wonder whether we really need the additional complication of AMQQueue#unregisterBlocked(). >Would it not sufficient to rely on your new condition "&& !isClosing()" in AMQChannel#unblock() to mean that SimpleAMQQueue's calls to checkCapacity()s will run without failure and remove the closed channel from the list? I realise this is not as timely as your 'active' method and will mean the channel remains in the list until the queue depth falls below the flowResumeCapacity before clean-up can occur. (AMQQueue's interface is already too complex) I removed AMQQueue#unregisterBlocked declaration its implementations completely. Also, initially I planed to change compareTo methods in AMQChannel and ServerSession to compare object addresses in memory but decided to leave current implementation which delegates compareTo to UUID instances. What do you think about it?
        Hide
        Alex Rudyy added a comment -

        Keith,
        Could you please review and commit an updated patch?

        Show
        Alex Rudyy added a comment - Keith, Could you please review and commit an updated patch?
        Hide
        Keith Wall added a comment -

        Patch applied.

        Show
        Keith Wall added a comment - Patch applied.

          People

          • Assignee:
            Keith Wall
            Reporter:
            Alex Rudyy
          • Votes:
            0 Vote for this issue
            Watchers:
            0 Start watching this issue

            Dates

            • Created:
              Updated:
              Resolved:

              Development