  1. Qpid
  2. QPID-5912

"Unable to find message with id" exception will result if message is delivered straight through and transport exception occurs during send to consumer

Details

    • Type: Bug
    • Status: Closed
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.29
    • Component/s: Broker-J
    • Labels: None

    Description

      If a message takes the straight-through delivery path through the Broker (that is, the IO thread that accepts the message from the publisher is the same thread that delivers it to the consumer), and the send to the consumer fails with an exception (for instance, a sender timeout exception), the connection to the consumer is rightly closed. However, when the consumer reconnects, the Broker sends the message to the consumer successfully but then suffers an "Unable to find message with id" store exception when the consumer tries to ack the message or commit.

      The issue is that mishandling of the sender timeout exception means that the Broker fails to perform the post-delivery tasks for the publish, so the message is never committed to disk.
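
The failure mode described above can be illustrated with a small toy model. This is only a sketch under stated assumptions: `Store`, `ConsumerLink`, `deliverBuggy` and `deliverGuarded` are hypothetical names, not Qpid classes, and the `try`/`finally` variant is one generic way to make sure follow-up work still runs. It is not claimed to be the change that resolved this issue.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

/** Toy model of the failure mode; none of these types are Qpid classes. */
public class StraightThroughSketch {

    /** Hypothetical stand-in for the message store. */
    public static final class Store {
        private final Set<Long> enqueued = new HashSet<>();

        void commitEnqueue(long id) {
            enqueued.add(id);
        }

        void dequeue(long id) {
            if (!enqueued.remove(id)) {
                throw new IllegalStateException("Unable to find message with id " + id);
            }
        }
    }

    /** Hypothetical consumer link; send() may fail with a transport error. */
    public interface ConsumerLink {
        void send(long id);
    }

    /** Buggy shape: an exception from send() skips the store commit. */
    static void deliverBuggy(Store store, Queue<Long> queue, ConsumerLink link, long id) {
        queue.add(id);            // message is held in memory on the queue
        link.send(id);            // straight-through send, may throw
        store.commitEnqueue(id);  // post-delivery task, never reached on failure
    }

    /** Guarded shape: the post-delivery task runs even if send() throws. */
    static void deliverGuarded(Store store, Queue<Long> queue, ConsumerLink link, long id) {
        queue.add(id);
        try {
            link.send(id);
        } finally {
            store.commitEnqueue(id);
        }
    }

    /** Runs publish, failed send, reconnect and ack; returns true if the ack succeeds. */
    public static boolean simulate(boolean guarded) {
        Store store = new Store();
        Queue<Long> queue = new ArrayDeque<>();
        long id = 3L;
        ConsumerLink timingOut = msg -> {
            throw new RuntimeException("write timed out");
        };
        try {
            if (guarded) {
                deliverGuarded(store, queue, timingOut, id);
            } else {
                deliverBuggy(store, queue, timingOut, id);
            }
        } catch (RuntimeException e) {
            // The consumer's connection is closed; the message stays on the queue.
        }
        // The consumer reconnects: redelivery from memory succeeds either way.
        long redelivered = queue.remove();
        // The consumer acks, which asks the store to dequeue the message.
        try {
            store.dequeue(redelivered);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("buggy path, ack succeeds:   " + simulate(false));
        System.out.println("guarded path, ack succeeds: " + simulate(true));
    }
}
```

In the buggy shape the message is redelivered from memory but the store has no record of it, which mirrors the "Unable to find message with id" symptom on ack.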

      Reproduction

      1. Prepare a JMS publisher application that sends one message. Make the message large enough to overwhelm the TCP buffers (I use 100MB).
      2. Prepare a JMS consumer application to receive the message.
      3. Start the JMS consumer application. Once it has begun to await the message (MessageConsumer.receive()), either stop the process or put a breakpoint in IoReceiver#run (line 160).
      4. Start the JMS publisher. It will send the message to the Broker
      5. Broker will choose the straight through path
      6. Broker's send to consumer will timeout (stack 1 below)
      7. JMS consumer will be disconnected
      8. Restart the JMS consumer
      9. Expected behaviour: message is delivered to the consumer successfully. Actual behaviour: Broker reports an exception when the client tries to ack the message (stack 2 below)
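
Steps 1, 3 and 6 rely on a writer stalling once the TCP buffers between it and a non-reading peer are full. The following sketch shows that effect with plain Java NIO on the loopback interface. It does not use the Broker or JMS, and `StalledReaderSketch` and `bytesAcceptedBeforeStall` are hypothetical names chosen for illustration. How many bytes are accepted before the stall depends on the operating system's buffer settings.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

/** Illustration only: a writer facing a peer that never reads. */
public class StalledReaderSketch {

    /**
     * Writes up to limitBytes to a loopback peer that never reads and returns
     * how many bytes the socket accepted before a non-blocking write made no progress.
     */
    public static long bytesAcceptedBeforeStall(long limitBytes) {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // Port 0 asks the OS for any free port on the loopback interface.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel writer = SocketChannel.open(server.getLocalAddress());
                 SocketChannel stalledReader = server.accept()) {
                // stalledReader is accepted but never read, like a suspended consumer.
                writer.configureBlocking(false);
                ByteBuffer chunk = ByteBuffer.allocate(64 * 1024);
                long total = 0;
                while (total < limitBytes) {
                    chunk.clear();
                    int written = writer.write(chunk);
                    if (written == 0) {
                        break; // buffers are full; a blocking write would wait here
                    }
                    total += written;
                }
                return total;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long limit = 100L * 1024 * 1024; // same order as the 100MB message above
        System.out.println("bytes accepted before stall: " + bytesAcceptedBeforeStall(limit));
    }
}
```

A sender that applies a write timeout at this point would fail in a way comparable to stack 1, which is why a large message combined with a suspended consumer is enough to trigger the problem.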

      stack 1 - Broker reports timeout on send to consumer

      2014-07-22 11:32:56,296 ERROR [IoReceiver - /127.0.0.1:57756] (io.IoSender) - write timed out for socket /127.0.0.1:57748: head -2146434251, tail -2146958539
      2014-07-22 11:32:56,297 INFO  [IoReceiver - /127.0.0.1:57748] (subscription.close) - [Broker] [sub:7(vh(/default)/qu(myqueue)] SUB-1002 : Close
      2014-07-22 11:32:56,297 ERROR [IoReceiver - /127.0.0.1:57756] (v0_10.ServerSessionDelegate) - Exception processing command
      org.apache.qpid.transport.SenderException: write timed out for socket /127.0.0.1:57748: head -2146434251, tail -2146958539
      	at org.apache.qpid.transport.SenderException.rethrow(SenderException.java:49)
      	at org.apache.qpid.transport.Session.invoke(Session.java:784)
      	at org.apache.qpid.server.protocol.v0_10.ServerSession.sendMessage(ServerSession.java:263)
      	at org.apache.qpid.server.protocol.v0_10.ConsumerTarget_0_10.send(ConsumerTarget_0_10.java:304)
      	at org.apache.qpid.server.queue.QueueConsumerImpl.send(QueueConsumerImpl.java:476)
      	at org.apache.qpid.server.queue.AbstractQueue.deliverMessage(AbstractQueue.java:1117)
      	at org.apache.qpid.server.queue.AbstractQueue.deliverToConsumer(AbstractQueue.java:1041)
      	at org.apache.qpid.server.queue.AbstractQueue.access$200(AbstractQueue.java:91)
      	at org.apache.qpid.server.queue.AbstractQueue$4.run(AbstractQueue.java:985)
      	at java.security.AccessController.doPrivileged(Native Method)
      	at javax.security.auth.Subject.doAs(Subject.java:356)
      	at org.apache.qpid.server.queue.AbstractQueue.doEnqueue(AbstractQueue.java:941)
      	at org.apache.qpid.server.queue.AbstractQueue.enqueue(AbstractQueue.java:888)
      	at org.apache.qpid.server.exchange.AbstractExchange$2.postCommit(AbstractExchange.java:535)
      	at org.apache.qpid.server.protocol.v0_10.ServerSession$AsyncCommand.complete(ServerSession.java:971)
      	at org.apache.qpid.server.protocol.v0_10.ServerSession.completeAsyncCommands(ServerSession.java:916)
      	at org.apache.qpid.server.protocol.v0_10.ServerSessionDelegate.command(ServerSessionDelegate.java:103)
      	at org.apache.qpid.server.protocol.v0_10.ServerSessionDelegate.command(ServerSessionDelegate.java:77)
      	at org.apache.qpid.transport.Method.delegate(Method.java:159)
      	at org.apache.qpid.transport.Session.received(Session.java:596)
      	at org.apache.qpid.transport.Connection.dispatch(Connection.java:439)
      	at org.apache.qpid.transport.ConnectionDelegate.handle(ConnectionDelegate.java:64)
      	at org.apache.qpid.transport.ConnectionDelegate.handle(ConnectionDelegate.java:40)
      	at org.apache.qpid.transport.MethodDelegate.messageTransfer(MethodDelegate.java:113)
      

      stack 2 - Broker fails to ack message after client reconnects

      2014-07-22 11:36:29,044 ERROR [IoReceiver - /127.0.0.1:57806] (util.ServerScopedRuntimeException) - Unable to find message with id 3 on queue myqueue with id 63798825-4070-47df-8187-b58328a3d942
      2014-07-22 11:36:29,044 ERROR [IoReceiver - /127.0.0.1:57806] (v0_10.ServerSessionDelegate) - Exception processing command
      org.apache.qpid.server.store.StoreException: Unable to find message with id 3 on queue myqueue with id 63798825-4070-47df-8187-b58328a3d942
      	at org.apache.qpid.server.store.AbstractJDBCMessageStore.dequeueMessage(AbstractJDBCMessageStore.java:649)
      	at org.apache.qpid.server.store.AbstractJDBCMessageStore.access$300(AbstractJDBCMessageStore.java:49)
      	at org.apache.qpid.server.store.AbstractJDBCMessageStore$JDBCTransaction.dequeueMessage(AbstractJDBCMessageStore.java:1166)
      	at org.apache.qpid.server.txn.AsyncAutoCommitTransaction.dequeue(AsyncAutoCommitTransaction.java:104)
      	at org.apache.qpid.server.protocol.v0_10.ServerSession.acknowledge(ServerSession.java:457)
      	at org.apache.qpid.server.protocol.v0_10.ExplicitAcceptDispositionChangeListener.onAccept(ExplicitAcceptDispositionChangeListener.java:46)
      	at org.apache.qpid.server.protocol.v0_10.ServerSession$3.performAction(ServerSession.java:283)
      	at org.apache.qpid.server.protocol.v0_10.ServerSession.dispositionChange(ServerSession.java:372)
      	at org.apache.qpid.server.protocol.v0_10.ServerSession.accept(ServerSession.java:279)
      	at org.apache.qpid.server.protocol.v0_10.ServerSessionDelegate.messageAccept(ServerSessionDelegate.java:128)
      	at org.apache.qpid.server.protocol.v0_10.ServerSessionDelegate.messageAccept(ServerSessionDelegate.java:77)
      	at org.apache.qpid.transport.MessageAccept.dispatch(MessageAccept.java:87)
      	at org.apache.qpid.transport.SessionDelegate.command(SessionDelegate.java:55)
      	at org.apache.qpid.server.protocol.v0_10.ServerSessionDelegate.command(ServerSessionDelegate.java:94)
      	at org.apache.qpid.server.protocol.v0_10.ServerSessionDelegate.command(ServerSessionDelegate.java:77)
      

      Attachments

        1. qpid_after_fix.log
          24 kB
          Keith Wall
        2. qpid_before_fix.log
          30 kB
          Keith Wall

          People

            rgodfrey Robert Godfrey
            kwall Keith Wall