Description
Symptom
=======
We have a system based on ActiveMQ that stores messages in a non-persistent queue. Frequently, we have to move a specific message from this queue to another queue. The message to be moved may be anywhere in the queue, and is identified by a selector on a custom JMS integer property.
To facilitate the selection and move, we use org.apache.activemq.broker.region.Queue#moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest).
We've found that once our queue grows past 10K messages, moving a single message takes over 10s. When the queue grows past 20K messages, a move takes 70s. Testing shows that the time to move a message grows far faster than linearly with queue size, to the point that moveMatchingMessagesTo becomes unusable.
Cause
=====
AMQ 5.5.0 has this implementation for moveMatchingMessagesTo:
    public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter,
            ActiveMQDestination dest, int maximumMessages) throws Exception {
        int movedCounter = 0;
        Set<QueueMessageReference> set = new CopyOnWriteArraySet<QueueMessageReference>();
        do {
            doPageIn(true);
            pagedInMessagesLock.readLock().lock();
            try {
                set.addAll(pagedInMessages.values());
            } finally {
                pagedInMessagesLock.readLock().unlock();
            }
            List<QueueMessageReference> list = new ArrayList<QueueMessageReference>(set);
            for (QueueMessageReference ref : list) {
                if (filter.evaluate(context, ref)) {
                    // We should only move messages that can be locked.
                    moveMessageTo(context, ref, dest);
                    set.remove(ref);
                    if (++movedCounter >= maximumMessages && maximumMessages > 0) {
                        return movedCounter;
                    }
                }
            }
        } while (set.size() < this.destinationStatistics.getMessages().getCount()
                && set.size() < maximumMessages);
        return movedCounter;
    }
In the overload that we call, maximumMessages is Integer.MAX_VALUE:
    public int moveMatchingMessagesTo(ConnectionContext context, String selector,
            ActiveMQDestination dest) throws Exception {
        return moveMatchingMessagesTo(context, selector, dest, Integer.MAX_VALUE);
    }
Since moveMatchingMessagesTo instantiates the set variable as a CopyOnWriteArraySet, each iteration of the doPageIn loop creates a new backing array, copies the existing set members into it, and linearly scans the array for every element being inserted. The result is that moveMatchingMessagesTo is an O(n^2) algorithm with respect to message copying, where n is the size of the queue.
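The cost described above can be reproduced outside the broker. The following is a minimal sketch, not ActiveMQ code: Ref is a hypothetical stand-in for QueueMessageReference that counts equals() calls, and countEqualsCalls imitates the do/while loop under the assumption that each pass re-adds everything paged in so far. The page size of 200 is chosen for illustration only.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

public class SetGrowthDemo {
    /** Hypothetical stand-in for a message reference; counts equals() calls. */
    static final class Ref {
        static long equalsCalls = 0;
        final int id;

        Ref(int id) {
            this.id = id;
        }

        @Override
        public boolean equals(Object o) {
            equalsCalls++;
            return o instanceof Ref && ((Ref) o).id == id;
        }

        @Override
        public int hashCode() {
            return id;
        }
    }

    /**
     * Imitates the paging loop: every pass "pages in" up to pageSize new
     * references and then re-adds ALL paged-in references to the set.
     * Returns how many equals() calls the set performed in total.
     */
    static long countEqualsCalls(Set<Ref> set, int total, int pageSize) {
        Ref.equalsCalls = 0;
        List<Ref> pagedIn = new ArrayList<Ref>();
        for (int next = 0; next < total; ) {
            for (int i = 0; i < pageSize && next < total; i++) {
                pagedIn.add(new Ref(next++));
            }
            set.addAll(pagedIn);
        }
        return Ref.equalsCalls;
    }

    public static void main(String[] args) {
        long cow = countEqualsCalls(new CopyOnWriteArraySet<Ref>(), 2000, 200);
        long hash = countEqualsCalls(new HashSet<Ref>(), 2000, 200);
        System.out.println("CopyOnWriteArraySet equals() calls: " + cow);
        System.out.println("HashSet equals() calls:             " + hash);
    }
}
```

Counting comparisons rather than measuring wall-clock time keeps the result independent of the machine. Doubling total while keeping pageSize fixed shows how quickly the CopyOnWriteArraySet figure grows.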
Solution
========
set is scoped to a single call of moveMatchingMessagesTo, and is only accessed by a single thread, so there is no benefit to using CopyOnWriteArraySet. Simply changing set to a HashSet prevents the need for the doPageIn loop to copy the set on each iteration, and insertion becomes an O(1) operation.
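As an illustration of the shape of the change, here is a simplified sketch of the select-and-move loop with a method-local HashSet. It is not the attached patch: the pagedIn map, the Predicate filter and the dest list are hypothetical stand-ins for pagedInMessages, MessageReferenceFilter and the target destination, and paging and locking are left out.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

public class PatchedMoveSketch {
    /**
     * Moves up to maximumMessages matching entries from pagedIn to dest and
     * returns the number moved. All parameter types are illustrative only.
     */
    static <M> int moveMatching(Map<String, M> pagedIn, Predicate<M> filter,
                                List<M> dest, int maximumMessages) {
        int movedCounter = 0;
        // Local to this call and touched by one thread: HashSet is sufficient.
        Set<M> set = new HashSet<M>(pagedIn.values());
        // Iterate over a snapshot so the set can be modified inside the loop.
        List<M> list = new ArrayList<M>(set);
        for (M ref : list) {
            if (filter.test(ref)) {
                dest.add(ref);                 // stand-in for moveMessageTo(...)
                pagedIn.values().remove(ref);
                set.remove(ref);
                if (++movedCounter >= maximumMessages && maximumMessages > 0) {
                    return movedCounter;
                }
            }
        }
        return movedCounter;
    }

    public static void main(String[] args) {
        Map<String, Integer> queue = new LinkedHashMap<String, Integer>();
        for (int i = 0; i < 10; i++) {
            queue.put("msg-" + i, i);
        }
        List<Integer> target = new ArrayList<Integer>();
        int moved = moveMatching(queue, x -> x == 7, target, Integer.MAX_VALUE);
        System.out.println("moved=" + moved + " target=" + target
                + " remaining=" + queue.size());
    }
}
```

The explicit snapshot list mirrors the original code, which already copies the set into an ArrayList before iterating. That copy is what makes the in-loop set.remove(ref) safe with a HashSet.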
Attached is a unit test that demonstrates how moving the last message from a queue of 10K messages takes 8s (on our machine). Also included is a patch to Queue that changes set from a CopyOnWriteArraySet to a HashSet; with this patch, the same unit test completes in under 200ms.
Unit test that can be run against the unpatched AMQ trunk to demonstrate how long moving a message from a queue can take. The same unit test can be used to verify that the patch improves performance of this operation by a considerable amount.