Description
Symptom
=======
We have a system based on ActiveMQ that stores messages in a non-persistent queue. Frequently, we have to move a specific message from this queue to another queue. The message to be moved may be anywhere in the queue, and is identified by a selector on a custom JMS integer property.
To perform the selection and move, we use org.apache.activemq.broker.region.Queue#moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest).
We've found that once our queue grows past 10K messages, moving a single message takes over 10s. When the queue grows past 20K messages, a move takes 70s. Testing shows that the time to move a message grows much faster than linearly with queue size, to the point that moveMatchingMessagesTo becomes unusable.
Cause
=====
AMQ 5.5.0 has this implementation for moveMatchingMessagesTo:
    public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter,
            ActiveMQDestination dest, int maximumMessages) throws Exception {
        int movedCounter = 0;
        Set<QueueMessageReference> set = new CopyOnWriteArraySet<QueueMessageReference>();
        do {
            doPageIn(true);
            pagedInMessagesLock.readLock().lock();
            try {
                set.addAll(pagedInMessages.values());
            } finally {
                pagedInMessagesLock.readLock().unlock();
            }
            List<QueueMessageReference> list = new ArrayList<QueueMessageReference>(set);
            for (QueueMessageReference ref : list) {
                if (filter.evaluate(context, ref)) {
                    // We should only move messages that can be locked.
                    moveMessageTo(context, ref, dest);
                    set.remove(ref);
                    if (++movedCounter >= maximumMessages && maximumMessages > 0) {
                        return movedCounter;
                    }
                }
            }
        } while (set.size() < this.destinationStatistics.getMessages().getCount()
                && set.size() < maximumMessages);
        return movedCounter;
    }
In the context that we use, maximumMessages is Integer.MAX_VALUE:
    public int moveMatchingMessagesTo(ConnectionContext context, String selector,
            ActiveMQDestination dest) throws Exception {
        return moveMatchingMessagesTo(context, selector, dest, Integer.MAX_VALUE);
    }
Since moveMatchingMessagesTo instantiates the set variable as a CopyOnWriteArraySet, each iteration of the doPageIn loop creates a new array, copies the existing set members, and linearly scans the array for each insertion. The result is that moveMatchingMessagesTo is an O(n^2) algorithm with respect to message copying, where n is the size of the queue.
Solution
========
The set variable is scoped to a single call of moveMatchingMessagesTo and is only accessed by a single thread, so there is no benefit to using CopyOnWriteArraySet. Simply changing set to a HashSet removes the need for the doPageIn loop to copy the set on each iteration, and insertion becomes an O(1) operation on average.
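The difference between the two set types can be reproduced outside the broker with a rough sketch like the one below. It is not the attached unit test and it does not use any ActiveMQ code. The class SetAccumulationSketch, its methods, and the sizes are hypothetical names and values chosen for illustration, and the loop only assumes that each pass re-adds every paged-in item to the set, as set.addAll(pagedInMessages.values()) does.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

public class SetAccumulationSketch {

    // Hypothetical stand-in for the paging loop: each pass "pages in" up to
    // pageSize more items, then adds the whole paged-in list to the set again.
    // pageSize must be greater than zero, otherwise the loop never advances.
    static int accumulate(Set<Integer> set, int total, int pageSize) {
        List<Integer> pagedIn = new ArrayList<>();
        while (pagedIn.size() < total) {
            int next = Math.min(total, pagedIn.size() + pageSize);
            for (int i = pagedIn.size(); i < next; i++) {
                pagedIn.add(i);
            }
            // With CopyOnWriteArraySet this scans the backing array for every
            // element; with HashSet each element is a hash lookup.
            set.addAll(pagedIn);
        }
        return set.size();
    }

    // Wall-clock time of one accumulate run, in milliseconds.
    static long timeMillis(Set<Integer> set, int total, int pageSize) {
        long start = System.nanoTime();
        accumulate(set, total, pageSize);
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        int total = 5_000;   // illustrative queue size, not the 10K from the report
        int pageSize = 200;  // illustrative page size, an assumption
        System.out.println("CopyOnWriteArraySet: "
                + timeMillis(new CopyOnWriteArraySet<Integer>(), total, pageSize) + " ms");
        System.out.println("HashSet: "
                + timeMillis(new HashSet<Integer>(), total, pageSize) + " ms");
    }
}
```

Absolute timings depend on the machine and JVM, so none are quoted here. Both sets end up with the same contents; only the cost of getting there differs.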
Attached is a unit test that demonstrates how moving the last message from a queue of 10K messages takes 8s (on our machine). Also included is a patch to Queue that changes set from a CopyOnWriteArraySet to a HashSet; with this patch, the same unit test completes in under 200ms.
Fixed, patch applied along with fixing the other two methods that had this same issue. Test updated to cover all three.