Uploaded image for project: 'Qpid'
  1. Qpid
  2. QPID-7634

[Java Broker,amqp 1.0] Broker does not respond to Flow command with drain=true if queue is empty and prefetch is 0

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Closed
    • Major
    • Resolution: Fixed
    • qpid-java-6.1, qpid-java-6.1.1, qpid-java-broker-7.0.0
    • qpid-java-broker-7.0.0
    • Broker-J
    • None

    Description

      When AMQP 1.0 client sends the Flow command with drain=true and queue is empty the Broker does not respond if prefetch is 0.

      The following snippet reproduces the issue

      Connection connection = factory.createConnection(username, password);
      connection.start();
      
      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
      MessageConsumer messageConsumer = session.createConsumer(queue);
      
      TextMessage receivedMessage = (TextMessage) messageConsumer.receiveNoWait(); // <-- times out
      

      The broker logs

      2017-01-24 11:05:56,471 DEBUG [IO-/127.0.0.1:51108] (o.a.q.s.p.frame) - RECV[/127.0.0.1:51108|1] : Attach{name=qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue,handle=1,role=receiver,sndSettleMode=unsettled,rcvSettleMode=first,source=Source{address=queue,durable=none,expiryPolicy=link-detach,timeout=0,dynamic=false,defaultOutcome=Modified{deliveryFailed=true},outcomes=[amqp:accepted:list, amqp:rejected:list, amqp:released:list, amqp:modified:list],capabilities=[queue, global]},target=Target{}}
      2017-01-24 11:05:56,474 DEBUG [VirtualHostNode-default-Config] (o.a.q.s.c.u.TaskExecutorImpl) - Performing Task['add consumer' on 'queue' with arguments 'target=ConsumerTarget_1_0[linkSession=[con:7(admin@/127.0.0.1:51108/default)/ch:1] ], consumerName=qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue, optionSet=[ACQUIRES, SEES_REQUEUES]']
      2017-01-24 11:05:56,475 DEBUG [VirtualHostNode-default-Config] (o.a.q.s.m.AbstractConfiguredObject) - authorise returned DEFER
      2017-01-24 11:05:56,475 DEBUG [VirtualHostNode-default-Config] (o.a.q.s.m.AbstractConfiguredObject) - authorise returned DEFER, returing default: ALLOWED
      2017-01-24 11:05:56,475 DEBUG [VirtualHostNode-default-Config] (o.a.q.s.c.u.TaskExecutorImpl) - Performing Task['open' on 'Consumer[id=164b1b27-176a-42a3-a5dc-613d97bc58b2, name=7|1|qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue, type=Consumer]']
      2017-01-24 11:05:56,475 DEBUG [VirtualHostNode-default-Config] (o.a.q.s.c.u.TaskExecutorImpl) - Task['open' on 'Consumer[id=164b1b27-176a-42a3-a5dc-613d97bc58b2, name=7|1|qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue, type=Consumer]'] performed successfully with result: null
      2017-01-24 11:05:56,476 INFO  [VirtualHostNode-default-Config] (q.m.s.create) - [con:7(admin@/127.0.0.1:51108/default)/ch:1] [sub:7(vh(/default)/qu(queue)] SUB-1001 : Create
      2017-01-24 11:05:56,476 DEBUG [VirtualHostNode-default-Config] (o.a.q.s.c.u.TaskExecutorImpl) - Task['add consumer' on 'queue' with arguments 'target=ConsumerTarget_1_0[linkSession=[con:7(admin@/127.0.0.1:51108/default)/ch:1] ], consumerName=qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue, optionSet=[ACQUIRES, SEES_REQUEUES]'] performed successfully with result: Consumer[id=164b1b27-176a-42a3-a5dc-613d97bc58b2, name=7|1|qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue, type=Consumer]
      2017-01-24 11:05:56,477 DEBUG [IO-/127.0.0.1:51108] (o.a.q.s.p.frame) - SEND[/127.0.0.1:51108|1] : Attach{name=qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue,handle=1,role=sender,sndSettleMode=unsettled,rcvSettleMode=first,source=Source{address=queue,durable=none,expiryPolicy=link-detach,timeout=0,dynamic=false,defaultOutcome=Modified{deliveryFailed=true},outcomes=[amqp:accepted:list, amqp:rejected:list, amqp:released:list, amqp:modified:list],capabilities=[queue]},target=Target{},initialDeliveryCount=0,offeredCapabilities=[SHARED-SUBS],properties={}}
      2017-01-24 11:05:56,477 DEBUG [IO-/127.0.0.1:51108] (o.a.q.s.t.NonBlockingConnection) - Written 251 bytes
      2017-01-24 11:05:56,477 DEBUG [IO-/127.0.0.1:51108] (o.a.q.s.t.NonBlockingConnection) - Read 0 byte(s)
      2017-01-24 11:05:56,487 DEBUG [IO-/127.0.0.1:51108] (o.a.q.s.t.NonBlockingConnection) - Read 34 byte(s)
      2017-01-24 11:05:56,487 DEBUG [IO-/127.0.0.1:51108] (o.a.q.s.p.frame) - RECV[/127.0.0.1:51108|1] : Flow{nextIncomingId=0,incomingWindow=2047,nextOutgoingId=1,outgoingWindow=2147483647,handle=1,deliveryCount=0,linkCredit=1,drain=true}
      2017-01-24 11:05:56,487 DEBUG [IO-/127.0.0.1:51108] (o.a.q.s.t.NonBlockingConnection) - Read 0 byte(s)
      2017-01-24 11:05:58,162 DEBUG [IO-/127.0.0.1:51108] (o.a.q.s.t.NonBlockingConnection) - Read 0 byte(s)
      ...
      

      Relevant part of the spec:

      http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#doc-flow-control

      If the sender's drain flag is set and there are no available messages, the sender MUST advance its delivery-count until link-credit is zero, and send its updated flow state to the receiver.

      Attachments

        Activity

          People

            Unassigned Unassigned
            orudyy Alex Rudyy
            Votes:
            0 Vote for this issue
            Watchers:
            5 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: