Description
If a message takes the straight-through delivery path through the Broker (that is, the IO thread that accepts the message from the publisher is the same thread that delivers the message to the consumer), and the send to the consumer fails with an exception (for instance, a sender timeout exception), the connection to the consumer is rightly closed. However, when the consumer reconnects, the Broker sends the message to the consumer successfully, but then suffers an "Unable to find message with id" store exception when the consumer tries to ack the message or commit.
The issue is that mishandling of the sender timeout exception means that the Broker fails to perform its post-delivery publishing tasks, so the message is never committed to disk.
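The failure mode described above can be sketched in isolation. This is a minimal illustration only, not the Broker's code: Store, Consumer, deliverUnguarded and deliverGuarded are hypothetical names, and the guarded variant is just one possible way to make sure the post-delivery work still runs, not necessarily the fix applied to the Broker.

```java
import java.util.HashSet;
import java.util.Set;

class StraightThroughSketch {

    /** Hypothetical stand-in for the message store: ids of messages committed to disk. */
    static final class Store {
        private final Set<Long> enqueued = new HashSet<>();

        void enqueue(long id) {
            enqueued.add(id);
        }

        void dequeue(long id) {
            if (!enqueued.remove(id)) {
                throw new IllegalStateException("Unable to find message with id " + id);
            }
        }
    }

    /** Hypothetical stand-in for a consumer; send may throw, as on a sender timeout. */
    interface Consumer {
        void send(long id);
    }

    /** Failure mode: an exception from send propagates and the store work is skipped. */
    static void deliverUnguarded(Store store, Consumer consumer, long id) {
        consumer.send(id);  // same thread as the publisher; may throw
        store.enqueue(id);  // post-delivery task, never reached if send throws
    }

    /** One possible guard (an assumption): the store work runs even if send fails. */
    static void deliverGuarded(Store store, Consumer consumer, long id) {
        try {
            consumer.send(id);
        } catch (RuntimeException e) {
            // A real broker would close the consumer's connection here.
        } finally {
            store.enqueue(id);
        }
    }

    /** Returns true if the ack that follows a failed send and a reconnect succeeds. */
    static boolean ackSucceedsAfterFailedSend(boolean guarded) {
        Store store = new Store();
        Consumer timingOut = id -> {
            throw new RuntimeException("write timed out");
        };
        try {
            if (guarded) {
                deliverGuarded(store, timingOut, 3L);
            } else {
                deliverUnguarded(store, timingOut, 3L);
            }
        } catch (RuntimeException expected) {
            // The consumer is disconnected at this point.
        }
        // The consumer reconnects, receives the message again, and acks it.
        try {
            store.dequeue(3L);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("unguarded ack ok: " + ackSucceedsAfterFailedSend(false));
        System.out.println("guarded ack ok: " + ackSucceedsAfterFailedSend(true));
    }
}
```

In the unguarded variant the dequeue on ack fails in the same way as stack 2 below, because the enqueue record was never written.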
Reproduction
- Prepare a JMS publisher application that sends one message. Make the message sufficiently large to overwhelm the TCP buffers (I use 100MB).
- Prepare a JMS consumer application to receive the message.
- Start the JMS consumer application. Once it has begun to await the message (MessageConsumer.receive()), either stop the process or put a breakpoint in IoReceiver#run (line 160)
- Start the JMS publisher; it will send the message to the Broker
- Broker will choose the straight through path
- Broker's send to consumer will timeout (stack 1 below)
- JMS consumer will be disconnected
- Restart the JMS consumer
- Expected behaviour: the message is delivered to the consumer successfully. Actual behaviour: the Broker reports an exception when the client tries to ack the message (stack 2 below)
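The reason the stalled consumer plus a large message leads to a write timeout can be shown without JMS. The sketch below is an illustration under assumptions, not the Broker's IoSender: StalledConsumerSketch and bytesAcceptedBeforeStall are hypothetical names, and it uses a plain non-blocking loopback socket. The peer never reads, so the kernel's TCP buffers fill and the sender's write stops making progress well before 100MB has been accepted.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class StalledConsumerSketch {

    /**
     * Writes to a loopback peer that never reads and returns how many bytes
     * the socket accepted before a write made no progress (or limit was reached).
     */
    static long bytesAcceptedBeforeStall(long limit) {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // Port 0 asks the OS for any free port on the loopback interface.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel sender = SocketChannel.open(server.getLocalAddress());
                 SocketChannel stalledPeer = server.accept()) {
                // stalledPeer plays the suspended consumer: it is never read from.
                sender.configureBlocking(false);
                ByteBuffer chunk = ByteBuffer.allocate(64 * 1024);
                long total = 0;
                while (total < limit) {
                    chunk.clear();
                    int written = sender.write(chunk);
                    if (written == 0) {
                        break; // buffers are full; a blocking sender would now wait and time out
                    }
                    total += written;
                }
                return total;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long accepted = bytesAcceptedBeforeStall(100L * 1024 * 1024);
        System.out.println("bytes accepted before stall: " + accepted);
    }
}
```

The exact number of bytes accepted depends on the operating system's socket buffer settings, so no particular figure should be expected.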
stack 1 - Broker reports timeout on send to consumer
2014-07-22 11:32:56,296 ERROR [IoReceiver - /127.0.0.1:57756] (io.IoSender) - write timed out for socket /127.0.0.1:57748: head -2146434251, tail -2146958539
2014-07-22 11:32:56,297 INFO [IoReceiver - /127.0.0.1:57748] (subscription.close) - [Broker] [sub:7(vh(/default)/qu(myqueue)] SUB-1002 : Close
2014-07-22 11:32:56,297 ERROR [IoReceiver - /127.0.0.1:57756] (v0_10.ServerSessionDelegate) - Exception processing command
org.apache.qpid.transport.SenderException: write timed out for socket /127.0.0.1:57748: head -2146434251, tail -2146958539
    at org.apache.qpid.transport.SenderException.rethrow(SenderException.java:49)
    at org.apache.qpid.transport.Session.invoke(Session.java:784)
    at org.apache.qpid.server.protocol.v0_10.ServerSession.sendMessage(ServerSession.java:263)
    at org.apache.qpid.server.protocol.v0_10.ConsumerTarget_0_10.send(ConsumerTarget_0_10.java:304)
    at org.apache.qpid.server.queue.QueueConsumerImpl.send(QueueConsumerImpl.java:476)
    at org.apache.qpid.server.queue.AbstractQueue.deliverMessage(AbstractQueue.java:1117)
    at org.apache.qpid.server.queue.AbstractQueue.deliverToConsumer(AbstractQueue.java:1041)
    at org.apache.qpid.server.queue.AbstractQueue.access$200(AbstractQueue.java:91)
    at org.apache.qpid.server.queue.AbstractQueue$4.run(AbstractQueue.java:985)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:356)
    at org.apache.qpid.server.queue.AbstractQueue.doEnqueue(AbstractQueue.java:941)
    at org.apache.qpid.server.queue.AbstractQueue.enqueue(AbstractQueue.java:888)
    at org.apache.qpid.server.exchange.AbstractExchange$2.postCommit(AbstractExchange.java:535)
    at org.apache.qpid.server.protocol.v0_10.ServerSession$AsyncCommand.complete(ServerSession.java:971)
    at org.apache.qpid.server.protocol.v0_10.ServerSession.completeAsyncCommands(ServerSession.java:916)
    at org.apache.qpid.server.protocol.v0_10.ServerSessionDelegate.command(ServerSessionDelegate.java:103)
    at org.apache.qpid.server.protocol.v0_10.ServerSessionDelegate.command(ServerSessionDelegate.java:77)
    at org.apache.qpid.transport.Method.delegate(Method.java:159)
    at org.apache.qpid.transport.Session.received(Session.java:596)
    at org.apache.qpid.transport.Connection.dispatch(Connection.java:439)
    at org.apache.qpid.transport.ConnectionDelegate.handle(ConnectionDelegate.java:64)
    at org.apache.qpid.transport.ConnectionDelegate.handle(ConnectionDelegate.java:40)
    at org.apache.qpid.transport.MethodDelegate.messageTransfer(MethodDelegate.java:113)
stack 2 - Broker fails to ack message after client reconnects
2014-07-22 11:36:29,044 ERROR [IoReceiver - /127.0.0.1:57806] (util.ServerScopedRuntimeException) - Unable to find message with id 3 on queue myqueue with id 63798825-4070-47df-8187-b58328a3d942
2014-07-22 11:36:29,044 ERROR [IoReceiver - /127.0.0.1:57806] (v0_10.ServerSessionDelegate) - Exception processing command
org.apache.qpid.server.store.StoreException: Unable to find message with id 3 on queue myqueue with id 63798825-4070-47df-8187-b58328a3d942
    at org.apache.qpid.server.store.AbstractJDBCMessageStore.dequeueMessage(AbstractJDBCMessageStore.java:649)
    at org.apache.qpid.server.store.AbstractJDBCMessageStore.access$300(AbstractJDBCMessageStore.java:49)
    at org.apache.qpid.server.store.AbstractJDBCMessageStore$JDBCTransaction.dequeueMessage(AbstractJDBCMessageStore.java:1166)
    at org.apache.qpid.server.txn.AsyncAutoCommitTransaction.dequeue(AsyncAutoCommitTransaction.java:104)
    at org.apache.qpid.server.protocol.v0_10.ServerSession.acknowledge(ServerSession.java:457)
    at org.apache.qpid.server.protocol.v0_10.ExplicitAcceptDispositionChangeListener.onAccept(ExplicitAcceptDispositionChangeListener.java:46)
    at org.apache.qpid.server.protocol.v0_10.ServerSession$3.performAction(ServerSession.java:283)
    at org.apache.qpid.server.protocol.v0_10.ServerSession.dispositionChange(ServerSession.java:372)
    at org.apache.qpid.server.protocol.v0_10.ServerSession.accept(ServerSession.java:279)
    at org.apache.qpid.server.protocol.v0_10.ServerSessionDelegate.messageAccept(ServerSessionDelegate.java:128)
    at org.apache.qpid.server.protocol.v0_10.ServerSessionDelegate.messageAccept(ServerSessionDelegate.java:77)
    at org.apache.qpid.transport.MessageAccept.dispatch(MessageAccept.java:87)
    at org.apache.qpid.transport.SessionDelegate.command(SessionDelegate.java:55)
    at org.apache.qpid.server.protocol.v0_10.ServerSessionDelegate.command(ServerSessionDelegate.java:94)
    at org.apache.qpid.server.protocol.v0_10.ServerSessionDelegate.command(ServerSessionDelegate.java:77)