QPID-3978

Broker should remove messages with expired TTL even if they are prefetched by the client

    Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.10, 0.12, 0.14
    • Fix Version/s: 0.30
    • Component/s: Java Broker
    • Labels:
      None

      Description

      Broker should remove messages with expired TTL even if they are prefetched by the client.

      If a client application prefetches messages and then hangs, messages whose TTL has expired remain on the broker until the client is closed or killed.

      Prefetched messages with an expired TTL cause a lot of confusion for end users. It would be better for the broker to remove them once the TTL expires, without waiting for the client to acknowledge or release them.
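
      The requested behaviour can be illustrated with a small, self-contained sketch. This is a toy model only: the class `TtlQueueSketch`, its `Entry` type and the `removeAcquired` flag are hypothetical names invented for illustration and are not the Java Broker's real classes. It shows an expiry sweep that either skips acquired (prefetched) entries, as described in this report, or removes them as well, as requested.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Toy model of a queue with TTL expiry; NOT the Qpid broker's real implementation. */
final class TtlQueueSketch {

    /** A queue entry with an expiry time and an "acquired by a consumer" flag. */
    static final class Entry {
        final long id;
        final long expiresAtMillis; // 0 means the entry never expires
        boolean acquired;           // true once a client has prefetched it

        Entry(long id, long expiresAtMillis) {
            this.id = id;
            this.expiresAtMillis = expiresAtMillis;
        }

        boolean expired(long nowMillis) {
            return expiresAtMillis != 0 && nowMillis >= expiresAtMillis;
        }
    }

    private final List<Entry> entries = new ArrayList<>();

    void enqueue(Entry entry) {
        entries.add(entry);
    }

    int depth() {
        return entries.size();
    }

    /**
     * Removes expired entries and returns how many were removed.
     * With removeAcquired == false, acquired entries are left in place
     * (the behaviour reported here); with true, they are removed as well.
     */
    int sweep(long nowMillis, boolean removeAcquired) {
        int removed = 0;
        for (Iterator<Entry> it = entries.iterator(); it.hasNext(); ) {
            Entry entry = it.next();
            if (entry.expired(nowMillis) && (removeAcquired || !entry.acquired)) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        TtlQueueSketch queue = new TtlQueueSketch();
        Entry prefetched = new Entry(1, 100);
        prefetched.acquired = true; // a hung client is holding this message
        queue.enqueue(prefetched);

        System.out.println("removed (skip acquired): " + queue.sweep(200, false));
        System.out.println("removed (include acquired): " + queue.sweep(200, true));
    }
}
```

      In the sketch the only difference between the two behaviours is the acquired check inside `sweep`; a real broker also has to cope with the client later acknowledging or releasing a message that has already gone.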

        Activity

        Transition                 Time In Source Status   Execution Times   Last Executer   Last Execution Date
        Open -> In Progress        830d 2h 9m              1                 Rob Godfrey     08/Aug/14 14:16
        In Progress -> Reviewable  5s                      1                 Rob Godfrey     08/Aug/14 14:16
        Reviewable -> Resolved     10d 3h 17m              1                 Keith Wall      18/Aug/14 17:34
        Resolved -> Closed         38d 23h 9m              1                 Justin Ross     26/Sep/14 16:43
        Justin Ross made changes -
        Status Resolved [ 5 ] Closed [ 6 ]
        Justin Ross added a comment - Released in Qpid 0.30, http://qpid.apache.org/releases/qpid-0.30/index.html
        Rob Godfrey made changes -
        Fix Version/s 0.30 [ 12327563 ]
        Fix Version/s 0.31 [ 12327564 ]
        Keith Wall made changes -
        Status Reviewable [ 10006 ] Resolved [ 5 ]
        Resolution Fixed [ 1 ]
        Keith Wall added a comment -

        Changes look reasonable to me. As we have had many requests for this feature it would be useful/low risk to include in 0.30.

        ASF subversion and git services added a comment -

        Commit 1618660 from Keith Wall in branch 'qpid/trunk'
        [ https://svn.apache.org/r1618660 ]

        QPID-3978 : [Java Broker] Downgrade severity of log message 'MessageRelease received for message which has not been acquired' to debug

        This message is now normal and will happen as a result of ttl or management action.

        ASF subversion and git services added a comment -

        Commit 1618082 from Rob Godfrey in branch 'qpid/trunk'
        [ https://svn.apache.org/r1618082 ]

        QPID-3978 : [Java Broker] Add unit test for UnacknowledgedMessageMap

        Rob Godfrey added a comment -

        Doh - that was a dumb bug. I've committed a fix if you want to re-test. I'll add a unit test in the morning to cover that case.

        ASF subversion and git services added a comment -

        Commit 1618079 from Rob Godfrey in branch 'qpid/trunk'
        [ https://svn.apache.org/r1618079 ]

        QPID-3978 : [Java Broker] Fix bug in unacknowledged map where non locked (deleted) messages were not being excluded from the messages to dequeue
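
        The kind of check described in this commit message can be sketched as follows. This is an illustrative assumption, not the broker's actual `UnacknowledgedMessageMap` code: the class `UnackedMapSketch`, its `Entry` type and the `lockForAck` method are hypothetical names. The point is that an entry which can no longer be locked, because it was already deleted (for example by TTL expiry), is excluded from the list of messages to dequeue when the client acknowledges it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of an unacknowledged-message map; NOT the real Qpid class. */
final class UnackedMapSketch {

    /** A delivered-but-unacknowledged queue entry. */
    static final class Entry {
        final long messageId;
        private boolean deleted;

        Entry(long messageId) {
            this.messageId = messageId;
        }

        /** Simulates the broker removing the entry, e.g. because its TTL expired. */
        void delete() {
            deleted = true;
        }

        /** Returns false if the entry is already gone, so it must not be dequeued again. */
        boolean lockForAck() {
            return !deleted;
        }
    }

    private final Map<Long, Entry> byDeliveryTag = new LinkedHashMap<>();

    void add(long deliveryTag, Entry entry) {
        byDeliveryTag.put(deliveryTag, entry);
    }

    /**
     * Forgets the delivery tag and returns the entries that still have to be
     * dequeued. Entries that could not be locked (already deleted) are excluded.
     */
    List<Entry> acknowledge(long deliveryTag) {
        List<Entry> toDequeue = new ArrayList<>();
        Entry entry = byDeliveryTag.remove(deliveryTag);
        if (entry != null && entry.lockForAck()) {
            toDequeue.add(entry);
        }
        return toDequeue;
    }

    public static void main(String[] args) {
        UnackedMapSketch map = new UnackedMapSketch();
        Entry expired = new Entry(5);
        map.add(1L, expired);
        expired.delete(); // broker expires the message while the client holds it

        // A late acknowledgement yields nothing to dequeue instead of a failed store lookup.
        System.out.println("entries to dequeue: " + map.acknowledge(1L).size());
    }
}
```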

        Rob Godfrey added a comment -

        Hmmm - OK, I'll look into this - this was with AUTO-ACK?

        Keith Wall added a comment -

        Rob, I gave this feature a manual test and ran into a problem. I'm testing with a test client that creates a consumer, calls start/receiveNoWait to start the message flow, publishes a message with a short TTL, and then receives the message. I put a breakpoint on the last receive and let the Broker expire the message. When I let the client proceed, I see the following stack trace and the connection is closed.

        2014-08-14 23:27:46,122 ERROR [IoReceiver - /127.0.0.1:61837] (util.ServerScopedRuntimeException) - Unable to find message with id 5 on queue myqueue with id 30580495-7043-49ef-9b86-71bd85cead6b
        2014-08-14 23:27:46,123 ERROR [IoReceiver - /127.0.0.1:61837] (v0_8.AMQProtocolEngine) - Unexpected exception while processing frame.  Closing connection.
        org.apache.qpid.server.store.StoreException: Unable to find message with id 5 on queue myqueue with id 30580495-7043-49ef-9b86-71bd85cead6b
        	at org.apache.qpid.server.store.AbstractJDBCMessageStore.dequeueMessage(AbstractJDBCMessageStore.java:647)
        	at org.apache.qpid.server.store.AbstractJDBCMessageStore.access$600(AbstractJDBCMessageStore.java:49)
        	at org.apache.qpid.server.store.AbstractJDBCMessageStore$JDBCTransaction.dequeueMessage(AbstractJDBCMessageStore.java:1225)
        	at org.apache.qpid.server.txn.AsyncAutoCommitTransaction.dequeue(AsyncAutoCommitTransaction.java:177)
        	at org.apache.qpid.server.protocol.v0_8.AMQChannel.acknowledgeMessage(AMQChannel.java:985)
        	at org.apache.qpid.server.protocol.v0_8.handler.BasicAckMethodHandler.methodReceived(BasicAckMethodHandler.java:65)
        	at org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl.dispatchBasicAck(ServerMethodDispatcherImpl.java:133)
        	at org.apache.qpid.framing.amqp_0_9.BasicAckBodyImpl.execute(BasicAckBodyImpl.java:113)
        	at org.apache.qpid.server.protocol.v0_8.state.AMQStateManager$1.run(AMQStateManager.java:122)
        	at org.apache.qpid.server.protocol.v0_8.state.AMQStateManager$1.run(AMQStateManager.java:118)
        	at java.security.AccessController.doPrivileged(Native Method)
        	at javax.security.auth.Subject.doAs(Subject.java:415)
        	at org.apache.qpid.server.protocol.v0_8.state.AMQStateManager.methodReceived(AMQStateManager.java:117)
        	at org.apache.qpid.server.protocol.v0_8.AMQProtocolEngine.methodFrameReceived(AMQProtocolEngine.java:617)
        	at org.apache.qpid.framing.AMQMethodBodyImpl.handle(AMQMethodBodyImpl.java:99)
        	at org.apache.qpid.server.protocol.v0_8.AMQProtocolEngine.frameReceived(AMQProtocolEngine.java:470)
        	at org.apache.qpid.server.protocol.v0_8.AMQProtocolEngine.dataBlockReceived(AMQProtocolEngine.java:405)
        	at org.apache.qpid.server.protocol.v0_8.AMQProtocolEngine.access$1100(AMQProtocolEngine.java:89)
        	at org.apache.qpid.server.protocol.v0_8.AMQProtocolEngine$2.run(AMQProtocolEngine.java:309)
        	at org.apache.qpid.server.protocol.v0_8.AMQProtocolEngine$2.run(AMQProtocolEngine.java:281)
        	at java.security.AccessController.doPrivileged(Native Method)
        	at javax.security.auth.Subject.doAs(Subject.java:356)
        	at org.apache.qpid.server.protocol.v0_8.AMQProtocolEngine.received(AMQProtocolEngine.java:280)
        	at org.apache.qpid.server.protocol.v0_8.AMQProtocolEngine.received(AMQProtocolEngine.java:89)
        	at org.apache.qpid.server.protocol.MultiVersionProtocolEngine.received(MultiVersionProtocolEngine.java:133)
        	at org.apache.qpid.server.protocol.MultiVersionProtocolEngine.received(MultiVersionProtocolEngine.java:49)
        	at org.apache.qpid.transport.network.io.IoReceiver.run(IoReceiver.java:161)
        	at java.lang.Thread.run(Thread.java:745)
        
        
        ASF subversion and git services added a comment -

        Commit 1616742 from Rob Godfrey in branch 'qpid/trunk'
        [ https://svn.apache.org/r1616742 ]

        QPID-3978 : [Java Broker] Allow for acquired messages to be removed from a queue due to TTL or management actions

        Rob Godfrey made changes -
        Status In Progress [ 3 ] Reviewable [ 10006 ]
        Rob Godfrey made changes -
        Status Open [ 1 ] In Progress [ 3 ]
        Rob Godfrey made changes -
        Fix Version/s 0.31 [ 12327564 ]
        Rob Godfrey made changes -
        Field Original Value New Value
        Assignee Rob Godfrey [ rgodfrey ]
        Alex Rudyy created issue -

          People

          • Assignee: Rob Godfrey
          • Reporter: Alex Rudyy
          • Votes: 0
          • Watchers: 4
