Qpid
  1. Qpid
  2. QPID-1642

JMS ReceiveNowait does not return a message even if the queue is not empty

    Details

    • Type: Bug Bug
    • Status: Closed
    • Priority: Major Major
    • Resolution: Fixed
    • Affects Version/s: M4
    • Fix Version/s: JIRA Cleanup
    • Component/s: Java Client
    • Labels:
      None

      Description

      Description:
      The first invocation of receiveNoWait does not return a message even if the queue is full.
      How to replicate:
      1) create a queue
      2) send a message
      3) try to consume this message using receiveNoWait
      ==> we get null even if there is a message seating in the queue
      Solution:
      When a JMS consumer first invokes receiveNowait then BasicMessageConsumer_0_10 request for more credits and then poll the _synchronousQueue. This does not leave enough time for the message to be enqueued.
      One solution would be to query the queue size before allowing the credits: Something like that would work:
      public Object getMessageFromQueue(long l) throws InterruptedException
      {
      long size = 0;
      if(l < 0)

      { size = _0_10session.requestQueueDepth(getDestination()) ; }

      if (isStrated() && ! getSession().prefetch() && _synchronousQueue.isEmpty())

      { _0_10session.getQpidSession().messageFlow(getConsumerTagString(), MessageCreditUnit.MESSAGE, 1); }

      if (! getSession().prefetch())

      { _syncReceive.set(true); }

      Object o;// = super.getMessageFromQueue(l);
      if (l > 0)

      { o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS); }

      else if (l < 0)
      {
      if(size <= 0 )

      { o = _synchronousQueue.poll(); }

      else

      { o = _synchronousQueue.take(); }

      }
      else

      { o = _synchronousQueue.take(); }

      if (! getSession().prefetch())

      { _syncReceive.set(false); }

      return o;
      }

        Activity

        Hide
        Arnaud Simon added a comment -

        Can you validate the solution?

        Show
        Arnaud Simon added a comment - Can you validate the solution?
        Hide
        Arnaud Simon added a comment -

        Actually we cannot do a take as the message may have been consumed by another consumer
        Maybe the solution would just be to poll with a small timeout that would let a chance for the message to be delivered.

        Show
        Arnaud Simon added a comment - Actually we cannot do a take as the message may have been consumed by another consumer Maybe the solution would just be to poll with a small timeout that would let a chance for the message to be delivered.
        Hide
        Gordon Sim added a comment -

        Question: is the issue that the send in step (2) is asynchronous (i.e. the return from the send method does not imply the message is enqueued)?

        Show
        Gordon Sim added a comment - Question: is the issue that the send in step (2) is asynchronous (i.e. the return from the send method does not imply the message is enqueued)?
        Hide
        Arnaud Simon added a comment -

        no, the problem is that we need to know whether messages have been sent to the client after messageFlow returns.
        I believe that a way to achieve that is to do something like

        _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
        MessageCreditUnit.MESSAGE, 1);
        _0_10session.getQpidSession().messageFlush();
        _0_10session.getQpidSession().sync();

        According to messageFlush definition:
        "Forces the sender to exhaust his credit supply. The sender's credit will always be zero when this command completes.
        The command completes when immediately available message data has been transferred, or when the credit supply
        is exhausted."
        Once the sync ope returns potentially available messages will have been transfered and ready to consume form our local queue synchronousQueue

        Show
        Arnaud Simon added a comment - no, the problem is that we need to know whether messages have been sent to the client after messageFlow returns. I believe that a way to achieve that is to do something like _0_10session.getQpidSession().messageFlow(getConsumerTagString(), MessageCreditUnit.MESSAGE, 1); _0_10session.getQpidSession().messageFlush(); _0_10session.getQpidSession().sync(); According to messageFlush definition: "Forces the sender to exhaust his credit supply. The sender's credit will always be zero when this command completes. The command completes when immediately available message data has been transferred, or when the credit supply is exhausted." Once the sync ope returns potentially available messages will have been transfered and ready to consume form our local queue synchronousQueue
        Hide
        Martin Ritchie added a comment -

        Hi Rafi, is there any more work required?

        Show
        Martin Ritchie added a comment - Hi Rafi, is there any more work required?
        Hide
        Robbie Gemmell added a comment -

        Review-OK'ing issue as part of JIRA cleanup. Issue could be resolved, but may not be: see QPID-3469 for further details.

        Show
        Robbie Gemmell added a comment - Review-OK'ing issue as part of JIRA cleanup. Issue could be resolved, but may not be: see QPID-3469 for further details.

          People

          • Assignee:
            Unassigned
            Reporter:
            Arnaud Simon
          • Votes:
            0 Vote for this issue
            Watchers:
            0 Start watching this issue

            Dates

            • Created:
              Updated:
              Resolved:

              Development