Uploaded image for project: 'ActiveMQ Classic'
  1. ActiveMQ Classic
  2. AMQ-3312

Queue's moveMatchingMessagesTo method is extremely slow to the point of being unusable as Queue size increases

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 5.5.0
    • 5.6.0
    • Broker
    • None
    • Patch Available

    Description

      Symptom
      =======
      We have a system based on ActiveMQ that stores messages in a non-peristent queue. Frequently, we have to move a specific message from this queue to another queue. The message to be moved may be anywhere in the queue, and is identified by a selector on a custom JMS integer property.

      To facilitate the selection and move, we use org.apache.activemq.broker.region.Queue#moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest)

      We've found that once our queue grows past 10K messages, moving a single message takes over 10s. When the queue grows past 20K messages, a move takes 70s. It's clear from testing that the time to move a message grows exponentially as the queue size increases, to the point that moveMatchingMessagesTo becomes unusable.

      Cause
      =====
      AMQ 5.5.0 has this implementation for moveMatchingMessagesTo:

      Queue#moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter
          public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter,
                  ActiveMQDestination dest, int maximumMessages) throws Exception {
              int movedCounter = 0;
              Set<QueueMessageReference> set = new CopyOnWriteArraySet<QueueMessageReference>();
              do {
                  doPageIn(true);
                  pagedInMessagesLock.readLock().lock();
                  try{
                      set.addAll(pagedInMessages.values());
                  }finally {
                      pagedInMessagesLock.readLock().unlock();
                  }
                  List<QueueMessageReference> list = new ArrayList<QueueMessageReference>(set);
                  for (QueueMessageReference ref : list) {
                      if (filter.evaluate(context, ref)) {
                          // We should only move messages that can be locked.
                          moveMessageTo(context, ref, dest);
                          set.remove(ref);
                          if (++movedCounter >= maximumMessages && maximumMessages > 0) {
                              return movedCounter;
                          }
                      }
                  }
              } while (set.size() < this.destinationStatistics.getMessages().getCount() && set.size() < maximumMessages);
              return movedCounter;
          }
      

      In the context that we use, maximumMessages is Integer.MAXINT:

      moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest)
          public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest)
                  throws Exception {
              return moveMatchingMessagesTo(context, selector, dest, Integer.MAX_VALUE);
          }
      

      Since moveMatchingMessagesTo instantiates the set variable as a CopyOnWriteArraySet, each doPageIn loop creates a new array, copies the existing set members, and then linearly scans the array for the insertion. The result is that moveMatchingMessagesTo is an O(n^2) algorithm with respect to message copying, where n is the size of the queue.

      Solution
      ========
      set is scoped to a single call of moveMatchingMessagesTo, and is only accessed by a single thread, so there is no benefit to using CopyOnWriteArraySet. Simply changing set to a HashSet prevents the need for the doPageIn loop to copy the set on each iteration, and insertion becomes an O(1) operation.

      Attached is a unit test that demonstrates how moving the last message from a queue of 10K messages takes 8s (on our machine). Included is a patch Queue that changes set from a CopyOnWriteArraySet to a HashSet; with this patch, the same unit test completes in under 200ms.

      Attachments

        1. AMQ3312.patch
          0.8 kB
          Stirling Chow
        2. LargeQueueSparseDeleteTest.java
          3 kB
          Stirling Chow

        Activity

          People

            tabish Timothy A. Bish
            stirlingc Stirling Chow
            Votes:
            0 Vote for this issue
            Watchers:
            0 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: