Details
Description
The following steps reproduce the issue.
1) I create a priority Queue (with 10 levels) - Q1
2) I create a listener for Q1 and attach it to a consumer.
3) I enqueue a message to the priority Queue with priority 0.
4) The subscribed consumer dequeues the message and, in the handler callback, I re-enqueue that message to the same queue Q1 with the default priority (4), using another session, and commit that session.
5) Commit the consumer session.
6) Ideally, the message I re-enqueued should be delivered to the consumer when the handler returns. In practice the delivery is intermittent: sometimes it happens, sometimes it does not.
I decided to take a swing at this and get to the bottom of the problem, so I looked into the Java Broker code. This is what I see happening:
The problem appears to be in the logic of the method
public SimpleQueueEntryImpl next(SimpleQueueEntryImpl node); in PriorityQueueList.java
This method uses the node passed in as the base node and walks the priority list from there to the end. This causes a problem under race conditions where new messages arrive at a higher priority while a message of lower priority is still being worked on.
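To make the walk concrete, here is a minimal stand-alone sketch of how I understand it. This is not the broker's code: the class PriorityListModel, its Entry type, and the rule "finish the current level, then move only to lower levels" are my own simplification, assumed for illustration. It shows that once the base node is the tail of p0, next() has nowhere to go, even though p4 holds a message.

```java
// Hypothetical stand-alone model; NOT the broker's PriorityQueueList.
public class PriorityListModel {

    // One entry in one priority sub-list.
    static final class Entry {
        final String name;
        final int priority;
        Entry nextInLevel; // next entry at the same priority, or null at the tail

        Entry(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }
    }

    private final Entry[] heads; // sentinel head per priority level
    private final Entry[] tails; // current tail per priority level

    PriorityListModel(int levels) {
        heads = new Entry[levels];
        tails = new Entry[levels];
        for (int p = 0; p < levels; p++) {
            heads[p] = tails[p] = new Entry("head-p" + p, p);
        }
    }

    // Append an entry at the tail of its priority level.
    Entry add(String name, int priority) {
        Entry e = new Entry(name, priority);
        tails[priority].nextInLevel = e;
        tails[priority] = e;
        return e;
    }

    // ASSUMED walk: rest of the node's own level first, then strictly
    // lower levels. Higher levels are never revisited.
    Entry next(Entry node) {
        if (node.nextInLevel != null) {
            return node.nextInLevel;
        }
        for (int p = node.priority - 1; p >= 0; p--) {
            if (heads[p].nextInLevel != null) {
                return heads[p].nextInLevel;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        PriorityListModel q = new PriorityListModel(10);
        Entry p0msg1 = q.add("p0-msg1", 0);
        q.add("p4-msg1", 4); // arrives at a higher priority afterwards
        Entry n = q.next(p0msg1);
        System.out.println(n == null ? "null" : n.name); // prints "null"
    }
}
```

In this model the p4 message is only reachable by starting the walk again from a higher level, which is the role the released entry is supposed to play in the cases below.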
Case 1)
1) I create a priority Queue (with 10 levels) - Q1
2) I create a listener for Q1 and attach it to a consumer.
3) I enqueue a message to the priority Queue with priority 0.
4) At this point, since the subscriber is free/available, the enqueued message is delivered to the subscriber synchronously. QueueContext(last seen -> msg1 in p0 (acquired state), released entry -> null)
5) Enqueue a message to p4. Delivery is asynchronous, as the subscriber is busy. QueueContext(last seen -> msg1 in p0 (acquired state), released entry -> msg1 in p4 (available state))
6) Commit the consumer session and return from the handler.
7) The message in p4 is dispatched. QueueContext(last seen -> msg1 in p4 (acquired state))
8) QueueRunner runs again and updates the QueueContext(last seen -> msg1 in p0), as msg1 in p0 is still in the acquired state.
9) msg2 is now enqueued to the p4 queue. QueueContext(last seen -> msg1 in p0, msg2 in p4)
10) Now the subscriber cannot make progress, because the selection of the next message, [QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);], always falls through to _entries.next(lastSeen). Since lastSeen is at the last priority level, the walk finds nothing and concludes that there is nothing to process.
Example sequence for the problem described above:
p4 Q -> msg1 -> msg2 (arrives after msg1 of p4 is dispatched, but before the status of msg1 of p0 changes to deleted)
p0 Q -> msg1
In the above scenario, I'm still unsure about the race in the state change from acquired to deleted.
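The selection expression quoted in step 10 of Case 1 can be exercised in isolation with the sketch below. Only the shape of the ternary is taken from the broker code quoted above; everything else is hypothetical. In particular, I am assuming that compareTo orders entries by enqueue sequence alone and ignores priority (I have not confirmed this), and nextInList is a stand-in for _entries.next(lastSeen) when lastSeen is the tail of the lowest priority level.

```java
// Hypothetical model of the next-entry selection; NOT the broker's code.
public class NextEntrySelection {

    static final class Entry implements Comparable<Entry> {
        final long id;      // global enqueue sequence number (assumed)
        final int priority; // priority level of the entry
        final String name;

        Entry(long id, int priority, String name) {
            this.id = id;
            this.priority = priority;
            this.name = name;
        }

        // ASSUMPTION: ordering by enqueue sequence only, ignoring priority.
        @Override
        public int compareTo(Entry other) {
            return Long.compare(id, other.id);
        }
    }

    // Stand-in for _entries.next(lastSeen) when lastSeen is the tail of
    // the lowest priority level: the walk has nothing left to return.
    static Entry nextInList(Entry lastSeen) {
        return null;
    }

    // Same shape as the expression quoted in step 10.
    static Entry select(Entry lastSeen, Entry releasedNode) {
        return (releasedNode != null && lastSeen.compareTo(releasedNode) >= 0)
                ? releasedNode
                : nextInList(lastSeen);
    }

    public static void main(String[] args) {
        Entry lastSeen = new Entry(1, 0, "p0-msg1"); // enqueued first, lowest priority
        Entry released = new Entry(3, 4, "p4-msg2"); // enqueued later, higher priority
        Entry picked = select(lastSeen, released);
        // prints "nothing to deliver"
        System.out.println(picked == null ? "nothing to deliver" : picked.name);
    }
}
```

Under that assumption the later-enqueued p4 message never satisfies lastSeen.compareTo(releasedNode) >= 0, so the released entry is ignored and the walk from p0 returns nothing, which matches the stall I observe.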
Case 2)
1) I create a priority Queue (with 10 levels) - Q1
2) I enqueue a message with priority P4.
3) I consume the message synchronously (consumer.receive), so that the message is now marked deleted.
4) I commit/close the consumer
5) I create a new listener for Q1 and attach it to a consumer.
6) I enqueue a message to the priority Queue with priority 0.
7) At this point, since the subscriber is free/available, the enqueued message is delivered to the subscriber synchronously. QueueContext(last seen -> msg1 in p0 (acquired state), released entry -> null)
8) Enqueue a message to p4. Delivery is asynchronous, as the subscriber is busy. QueueContext(last seen -> msg1 in p0 (acquired state), released entry -> msg2 in p4 (available state))
9) Commit the consumer session and return from the handler.
10) Now QueueRunner tries to make progress, but the subscriber cannot make progress, because the selection of the next message, [QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);], always falls through to _entries.next(lastSeen). Since lastSeen is at the last priority level, the walk finds nothing and concludes that there is nothing to process.
Example sequence for the problem described above. Here msg1 is consumed synchronously before the asynchronous consumer is started.
p4 Q -> msg1 (deleted) -> msg2
p0 Q -> msg1
I hope the notes above are clear. Either way, I've attached a test that reproduces the issue described here.