Details
-
Bug
-
Status: Closed
-
Major
-
Resolution: Fixed
-
qpid-java-6.1, qpid-java-6.1.1, qpid-java-broker-7.0.0
-
None
Description
When AMQP 1.0 client sends the Flow command with drain=true and queue is empty the Broker does not respond if prefetch is 0.
The following snippet reproduces the issue
Connection connection = factory.createConnection(username, password); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer messageConsumer = session.createConsumer(queue); TextMessage receivedMessage = (TextMessage) messageConsumer.receiveNoWait(); // <-- times out
The broker logs
2017-01-24 11:05:56,471 DEBUG [IO-/127.0.0.1:51108] (o.a.q.s.p.frame) - RECV[/127.0.0.1:51108|1] : Attach{name=qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue,handle=1,role=receiver,sndSettleMode=unsettled,rcvSettleMode=first,source=Source{address=queue,durable=none,expiryPolicy=link-detach,timeout=0,dynamic=false,defaultOutcome=Modified{deliveryFailed=true},outcomes=[amqp:accepted:list, amqp:rejected:list, amqp:released:list, amqp:modified:list],capabilities=[queue, global]},target=Target{}} 2017-01-24 11:05:56,474 DEBUG [VirtualHostNode-default-Config] (o.a.q.s.c.u.TaskExecutorImpl) - Performing Task['add consumer' on 'queue' with arguments 'target=ConsumerTarget_1_0[linkSession=[con:7(admin@/127.0.0.1:51108/default)/ch:1] ], consumerName=qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue, optionSet=[ACQUIRES, SEES_REQUEUES]'] 2017-01-24 11:05:56,475 DEBUG [VirtualHostNode-default-Config] (o.a.q.s.m.AbstractConfiguredObject) - authorise returned DEFER 2017-01-24 11:05:56,475 DEBUG [VirtualHostNode-default-Config] (o.a.q.s.m.AbstractConfiguredObject) - authorise returned DEFER, returing default: ALLOWED 2017-01-24 11:05:56,475 DEBUG [VirtualHostNode-default-Config] (o.a.q.s.c.u.TaskExecutorImpl) - Performing Task['open' on 'Consumer[id=164b1b27-176a-42a3-a5dc-613d97bc58b2, name=7|1|qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue, type=Consumer]'] 2017-01-24 11:05:56,475 DEBUG [VirtualHostNode-default-Config] (o.a.q.s.c.u.TaskExecutorImpl) - Task['open' on 'Consumer[id=164b1b27-176a-42a3-a5dc-613d97bc58b2, name=7|1|qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue, type=Consumer]'] performed successfully with result: null 2017-01-24 11:05:56,476 INFO [VirtualHostNode-default-Config] (q.m.s.create) - [con:7(admin@/127.0.0.1:51108/default)/ch:1] [sub:7(vh(/default)/qu(queue)] SUB-1001 : Create 2017-01-24 11:05:56,476 DEBUG [VirtualHostNode-default-Config] (o.a.q.s.c.u.TaskExecutorImpl) - Task['add consumer' on 'queue' with arguments 'target=ConsumerTarget_1_0[linkSession=[con:7(admin@/127.0.0.1:51108/default)/ch:1] ], consumerName=qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue, optionSet=[ACQUIRES, SEES_REQUEUES]'] performed successfully with result: Consumer[id=164b1b27-176a-42a3-a5dc-613d97bc58b2, name=7|1|qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue, type=Consumer] 2017-01-24 11:05:56,477 DEBUG [IO-/127.0.0.1:51108] (o.a.q.s.p.frame) - SEND[/127.0.0.1:51108|1] : Attach{name=qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue,handle=1,role=sender,sndSettleMode=unsettled,rcvSettleMode=first,source=Source{address=queue,durable=none,expiryPolicy=link-detach,timeout=0,dynamic=false,defaultOutcome=Modified{deliveryFailed=true},outcomes=[amqp:accepted:list, amqp:rejected:list, amqp:released:list, amqp:modified:list],capabilities=[queue]},target=Target{},initialDeliveryCount=0,offeredCapabilities=[SHARED-SUBS],properties={}} 2017-01-24 11:05:56,477 DEBUG [IO-/127.0.0.1:51108] (o.a.q.s.t.NonBlockingConnection) - Written 251 bytes 2017-01-24 11:05:56,477 DEBUG [IO-/127.0.0.1:51108] (o.a.q.s.t.NonBlockingConnection) - Read 0 byte(s) 2017-01-24 11:05:56,487 DEBUG [IO-/127.0.0.1:51108] (o.a.q.s.t.NonBlockingConnection) - Read 34 byte(s) 2017-01-24 11:05:56,487 DEBUG [IO-/127.0.0.1:51108] (o.a.q.s.p.frame) - RECV[/127.0.0.1:51108|1] : Flow{nextIncomingId=0,incomingWindow=2047,nextOutgoingId=1,outgoingWindow=2147483647,handle=1,deliveryCount=0,linkCredit=1,drain=true} 2017-01-24 11:05:56,487 DEBUG [IO-/127.0.0.1:51108] (o.a.q.s.t.NonBlockingConnection) - Read 0 byte(s) 2017-01-24 11:05:58,162 DEBUG [IO-/127.0.0.1:51108] (o.a.q.s.t.NonBlockingConnection) - Read 0 byte(s) ...
Relevant part of the spec:
http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#doc-flow-control
If the sender's drain flag is set and there are no available messages, the sender MUST advance its delivery-count until link-credit is zero, and send its updated flow state to the receiver.