QPID-3927: Java Broker Priority Queue dispatch not working correctly

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 0.15
    • Fix Version/s: 0.16
    • Component/s: Java Broker
    • Labels:
    • Environment:

      Ubuntu 10.04 LTS,
      Java Broker (0.16), Java Client (0.16), BDBMessageStore backend.

      Description

      The following steps reproduce the issue.

      1) I create a priority Queue (with 10 levels) - Q1
      2) I create a listener for Q1 and attach it to a consumer.
      3) I enqueue a message to the priority Queue with priority 0.
      4) The message is dequeued by the subscribed consumer. In the handler callback, I re-enqueue the dequeued message to the same queue Q1 with the default priority (4), using another session, and commit that session.
      5) Commit the consumer session.
      6) The re-enqueued message should then be delivered to the consumer when the handler returns. In practice, delivery only happens intermittently (it flaps); see the sketch below.
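
      For illustration, a minimal JMS sketch of the steps above (an assumption-laden sketch, not the attached test): the JNDI names "qpidConnectionFactory" and "Q1", and the prior declaration of Q1 as a 10-level priority queue (e.g. via the x-qpid-priorities argument), are assumptions rather than details taken from this report.

      import javax.jms.*;
      import javax.naming.Context;
      import javax.naming.InitialContext;

      public class PriorityRedeliveryRepro {
          public static void main(String[] args) throws Exception {
              Context ctx = new InitialContext(); // expects a jndi.properties on the classpath (assumed)
              ConnectionFactory factory = (ConnectionFactory) ctx.lookup("qpidConnectionFactory");
              Connection connection = factory.createConnection();
              connection.start();

              final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
              final Session producerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
              final Queue queue = (Queue) ctx.lookup("Q1"); // the pre-declared 10-level priority queue

              // Steps 1-2: a listener attached to a consumer on the priority queue Q1.
              MessageConsumer consumer = consumerSession.createConsumer(queue);
              consumer.setMessageListener(new MessageListener() {
                  public void onMessage(Message message) {
                      try {
                          // Step 4: re-enqueue the received message on another session at the
                          // default priority (4), and commit that session.
                          MessageProducer producer = producerSession.createProducer(queue);
                          producer.send(message, DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, 0);
                          producerSession.commit();
                          // Step 5: commit the consumer session.
                          consumerSession.commit();
                      } catch (JMSException e) {
                          e.printStackTrace();
                      }
                  }
              });

              // Step 3: enqueue the initial message with priority 0.
              Session senderSession = connection.createSession(true, Session.SESSION_TRANSACTED);
              MessageProducer sender = senderSession.createProducer(queue);
              sender.send(senderSession.createTextMessage("msg1"), DeliveryMode.PERSISTENT, 0, 0);
              senderSession.commit();

              // Step 6: the re-enqueued message should now come back to the listener; with the
              // defect described here, that redelivery only happens intermittently.
              Thread.sleep(5000);
              connection.close();
          }
      }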

      I decided to dig into the Java Broker code to get to the bottom of the problem, and this is what I see happening:

      I realized that the problem is in the logic of the method
      public SimpleQueueEntryImpl next(SimpleQueueEntryImpl node); in PriorityQueueList.java

      This method walks through the priority list using the node passed in as the base node, and walks the list to the end. This causes a problem under race conditions where new messages arrive at a higher priority while a lower-priority message is still being worked on.
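
      As a rough illustration of that traversal (a simplified sketch with hypothetical names, not the broker's actual implementation), next(node) below only continues from the node's own priority level towards lower levels and never revisits a higher-priority sub-list:

      import java.util.ArrayDeque;
      import java.util.Deque;

      public class PriorityTraversalSketch {
          static class Entry {
              final int priority;
              final long id; // assumed to increase with enqueue order within a level
              Entry(int priority, long id) { this.priority = priority; this.id = id; }
          }

          private final Deque<Entry>[] subLists; // one sub-list per priority level

          @SuppressWarnings("unchecked")
          PriorityTraversalSketch(int levels) {
              subLists = new Deque[levels];
              for (int i = 0; i < levels; i++) {
                  subLists[i] = new ArrayDeque<Entry>();
              }
          }

          // Walks from 'node' to the end of the overall ordering: first the remainder of
          // the node's own priority level, then lower priority levels only.
          Entry next(Entry node) {
              for (Entry e : subLists[node.priority]) {
                  if (e.id > node.id) {
                      return e;
                  }
              }
              for (int level = node.priority - 1; level >= 0; level--) {
                  if (!subLists[level].isEmpty()) {
                      return subLists[level].peekFirst();
                  }
              }
              return null; // a higher-priority entry added after 'node' is never reached from here
          }
      }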

      Case 1)

      1) I create a priority Queue (with 10 levels) - Q1
      2) I create a listener for Q1 and attach it to a consumer.
      3) I enqueue a message to the priority Queue with priority 0.
      4) At this point, since the subscriber is free/available, the enqueued message is delivered to the subscriber synchronously. QueueContext (Last Seen -> msg1 in P0(acquired state), released entry -> null)
      5) Enqueue a message to P4 ~ delivery is async as the subscriber is busy. QueueContext(last seen -> msg1 in p0(acquired state), released entry -> msg1 in p4(available state)).
      6) Commit the consumer session and return from the handler.
      7) msg1 in p4 is dispatched, QueueContext(last seen -> msg1 in p4(acquired state)).
      8) QueueRunner runs again and updates the QueueContext(last seen -> msg1 in p0) ~ as msg1 in P0 is still in the acquired state.
      9) msg2 is now enqueued to the P4 queue. QueueContext(last seen -> msg1 in p0, msg2 in p4).
      10) Now the subscriber cannot make progress: the comparison for the next message [QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);] always falls through to _entries.next(lastSeen), which is at the last priority level and concludes there is nothing left to process (see the sketch after this case).

      Example sequence for the problem described above:
      P4 Q -> msg1 -> msg2 (arrives after msg1 of p4 is dispatched, but before the status of msg1 of p0 changes to deleted).
      p0 Q -> msg1

      In the above scenario, I'm still somewhat confused about the race in the state change from acquired -> deleted.
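
      To make the failing check in step 10 concrete, here is a small self-contained sketch (hypothetical values and names, not broker code) of how a comparison based only on entry IDs decides against delivering the released higher-priority entry:

      public class ReleasedEntryCheckSketch {
          public static void main(String[] args) {
              // msg1 in P0 was enqueued first, so it carries the lower entry ID;
              // msg2 in P4 arrived later and therefore carries the higher entry ID.
              long lastSeenId = 1;   // msg1 in P0, still acquired by the consumer
              long releasedId = 2;   // msg2 in P4, available and recorded as the released entry

              // The check [lastSeen.compareTo(releasedNode) >= 0] effectively compares entry IDs:
              boolean deliverReleased = Long.compare(lastSeenId, releasedId) >= 0;

              // Prints false: the released P4 entry is judged to lie 'after' the last-seen
              // position, so the code falls back to _entries.next(lastSeen), which starts at
              // the P0 entry and never revisits the higher-priority P4 list.
              System.out.println("deliver released entry? " + deliverReleased);
          }
      }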

      Case 2)

      1) I create a priority Queue (with 10 levels) - Q1
      2) I enqueue a message with priority P4.
      3) I consume the message synchronously (consumer.receive()), so that the message is now marked deleted.
      4) I commit/close the consumer.
      5) I create a new listener for Q1 and attach it to a consumer.
      6) I enqueue a message to the priority Queue with priority 0.
      7) At this point, since the subscriber is free/available, the enqueued message is delivered to the subscriber synchronously. QueueContext (Last Seen -> msg1 in P0(acquired state), released entry -> null)
      8) Enqueue a message to P4 ~ delivery is async as the subscriber is busy. QueueContext(last seen -> msg1 in p0(acquired state), released entry -> msg2 in p4(available state)).
      9) Commit the consumer session and return from the handler.
      10) Now the QueueRunner tries to make progress, but the subscriber cannot: the comparison for the next message [QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);] always falls through to _entries.next(lastSeen), which is at the last priority level and concludes there is nothing left to process.

      Example sequence for the problem described above; here msg1 is consumed synchronously before the asynchronous consumer is started:
      P4 Q -> msg1(deleted) -> msg2
      p0 Q -> msg1

      I hope the notes above are clear; in any case, I've attached a test which reliably reproduces the issue described here.

      Attachments

      1. QpidPriorityQueueRaceIssue2.java
        6 kB
        Praveen Murugesan
      2. QpidPriorityQueueRaceIssue2.java
        6 kB
        Praveen Murugesan
      3. QpidPriorityQueueRaceIssue.java
        6 kB
        Praveen Murugesan

        Activity

        Robbie Gemmell added a comment -

        Changes merged to the 0.16 branch, updating the fix-for version accordingly.

        Justin Ross added a comment -

        Reviewed by Alex. Approved for 0.16.

        Alex Rudyy added a comment -

        Reviewed with no comments

        Robbie Gemmell added a comment -

        That's right; if the priority differs then the decision can/must be made solely based on that, since it means the entries are within entirely different QueueEntryLists. However, if both are of the same priority then the entry IDs must be used to determine the relative position within a particular QueueEntryList.
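
        For illustration, a minimal sketch of the ordering rule described above; the class and field names are hypothetical, not the broker's actual QueueEntry code:

        final class EntryPositionSketch implements Comparable<EntryPositionSketch> {
            final int priority;  // higher value = higher priority, delivered first
            final long entryId;  // increases with enqueue order within a single priority list

            EntryPositionSketch(int priority, long entryId) {
                this.priority = priority;
                this.entryId = entryId;
            }

            @Override
            public int compareTo(EntryPositionSketch other) {
                if (priority != other.priority) {
                    // Different priorities mean entirely different QueueEntryLists, so the
                    // decision is made solely on priority (higher priority sorts earlier).
                    return Integer.compare(other.priority, priority);
                }
                // Same priority: entry IDs determine the relative position within that list.
                return Long.compare(entryId, other.entryId);
            }
        }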

        Praveen Murugesan added a comment -

        Awesome, thanks a lot Robbie for the update. Now I understand the cause quite well.

        I looked at your CL and it makes sense. I see that you're now actually comparing against the priority value itself, unlike earlier where the IDs of the last-seen/released messages were compared.

        Thanks a ton. I will play around with the updated code in trunk and let you know if I find any issues (I suspect I won't).

        Robbie Gemmell added a comment -

        Hi Alex, can you click the review button please?

        Robbie Gemmell added a comment -

        Hi Praveen,

        The second consumer being present meant it would never work as it was, but it did indeed have nothing to do with why it failed to work originally, which in the end actually seems to be identical in both test cases above.

        The out-of-order queues (Priority, Sorted) make a specific attempt during enqueues to ensure that, when a message is added in a way that would violate traditional FIFO behaviour, the consumer subscriptions are updated to reflect that they need to 'go back' and progress delivery from this 'earlier' point. This is ultimately done by leveraging the same underlying mechanism used to handle messages being acquired by consumers and then released back onto the queue without actually being consumed (hence the 'released' field name references scattered throughout the process). There was a fairly clear defect in the way the need for such updates was checked for the PriorityQueues, meaning it could fail to take the proper course of action and lead to the situation you observed: a subscription had effectively already passed one or more higher-priority messages and wouldn't be able to 'go back' for them, so they would just sit on the queue.

        The update I made seems like it should fix the problem, and did allow both your examples to run reliably (after the modifications as mentioned above). In addition to the unit test(s) added with the update earlier, I have now also added a system test based on the process in your example which would sporadically fail to receive all the messages without the fix in place, and then reliably receives them all with it present.

        This is all on trunk now so you can try it out for yourself if you like. I forced the build of the nightly release job for the Java artefacts (https://builds.apache.org/view/M-R/view/Qpid/job/Qpid-Java-Artefact-Release) to run early, so the broker artefacts at this URL should now contain the fix: https://builds.apache.org/view/M-R/view/Qpid/job/Qpid-Java-Artefact-Release/lastSuccessfulBuild/artifact/trunk/qpid/java/broker/release/

        Robbie

        Praveen Murugesan added a comment -

        Hi Robbie,

        Thanks for the update.

        In the second case, I meant to close the consumer in the failing test that I sent you.

        I've updated the test and added it again. Even when I close the initial consumer the issue exists. Would that also be fixed by your change?

        Thanks a lot for taking this up immediately. It'd be great if we can get this in 0.16.

        Thanks,
        Praveen

        Robbie Gemmell added a comment -

        I have taken a look at this today and noticed an issue in the PriorityQueue, which looks to have possibly been exposed by a change made since 0.14 to resolve a not entirely dissimilar issue in the SortedQueue implementation (see QPID-3704).

        I have committed a candidate fix to trunk; I'll request its inclusion in 0.16 once I do some further testing, although it is Justin's call whether it makes the release.

        The example for your second case wasn't closing its initial consumer, which causes it to hold a prefetched message. Both examples were also leaving messages on the broker when they were 'complete'. After modifying them to resolve those issues, I was able to run them both repeatedly and successfully with the above-mentioned change in place.


          People

          • Assignee:
            Alex Rudyy
          • Reporter:
            Praveen Murugesan
          • Votes:
            0
          • Watchers:
            0
