ActiveMQ Classic / AMQ-3312

Queue's moveMatchingMessagesTo method is extremely slow to the point of being unusable as Queue size increases

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 5.5.0
    • Fix Version/s: 5.6.0
    • Component/s: Broker
    • Labels: None
    • Patch Info: Patch Available

    Description

      Symptom
      =======
      We have a system based on ActiveMQ that stores messages in a non-persistent queue. Frequently, we have to move a specific message from this queue to another queue. The message to be moved may be anywhere in the queue, and is identified by a selector on a custom JMS integer property.

      To facilitate the selection and move, we use org.apache.activemq.broker.region.Queue#moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest)

      We've found that once our queue grows past 10K messages, moving a single message takes over 10s. When the queue grows past 20K messages, a move takes 70s. It's clear from testing that the time to move a message grows much faster than linearly with queue size, to the point that moveMatchingMessagesTo becomes unusable.

      Cause
      =====
      AMQ 5.5.0 has this implementation for moveMatchingMessagesTo:

      Queue#moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter
          public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter,
                  ActiveMQDestination dest, int maximumMessages) throws Exception {
              int movedCounter = 0;
              Set<QueueMessageReference> set = new CopyOnWriteArraySet<QueueMessageReference>();
              do {
                  doPageIn(true);
                  pagedInMessagesLock.readLock().lock();
                  try{
                      set.addAll(pagedInMessages.values());
                  }finally {
                      pagedInMessagesLock.readLock().unlock();
                  }
                  List<QueueMessageReference> list = new ArrayList<QueueMessageReference>(set);
                  for (QueueMessageReference ref : list) {
                      if (filter.evaluate(context, ref)) {
                          // We should only move messages that can be locked.
                          moveMessageTo(context, ref, dest);
                          set.remove(ref);
                          if (++movedCounter >= maximumMessages && maximumMessages > 0) {
                              return movedCounter;
                          }
                      }
                  }
              } while (set.size() < this.destinationStatistics.getMessages().getCount() && set.size() < maximumMessages);
              return movedCounter;
          }
      

      In our usage, maximumMessages is Integer.MAX_VALUE:

      moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest)
          public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest)
                  throws Exception {
              return moveMatchingMessagesTo(context, selector, dest, Integer.MAX_VALUE);
          }
      

      Because moveMatchingMessagesTo instantiates set as a CopyOnWriteArraySet, every pass through the doPageIn loop allocates a new backing array, copies the existing set members into it, and linearly scans the array for each element being inserted. The result is that moveMatchingMessagesTo is an O(n^2) algorithm with respect to message copying, where n is the size of the queue.
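
The cost described above can be made visible without a broker. The following is a minimal sketch, not the attached unit test: `Ref`, `countEqualsCalls`, and the page counts are hypothetical names and values chosen for illustration. It mimics the loop's pattern of repeatedly calling `addAll` with an ever-growing paged-in collection and counts how many `equals()` comparisons each set type performs. The exact counts depend on the JDK's implementation of the two collections.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

public class CopyOnWriteCostDemo {

    /** Hypothetical stand-in for a message reference; counts equals() calls. */
    static final class Ref {
        static long equalsCalls = 0;
        final int id;

        Ref(int id) { this.id = id; }

        @Override
        public boolean equals(Object o) {
            equalsCalls++;
            return o instanceof Ref && ((Ref) o).id == id;
        }

        @Override
        public int hashCode() { return id; }
    }

    /**
     * Pages in pageSize new references per iteration and re-adds the whole
     * paged-in collection to the set, as the loop in the issue does.
     * Returns the number of equals() calls made along the way.
     */
    static long countEqualsCalls(Set<Ref> set, int pages, int pageSize) {
        List<Ref> pagedIn = new ArrayList<>();
        Ref.equalsCalls = 0;
        for (int page = 0; page < pages; page++) {
            for (int i = 0; i < pageSize; i++) {
                pagedIn.add(new Ref(page * pageSize + i));
            }
            set.addAll(pagedIn); // includes everything already added earlier
        }
        return Ref.equalsCalls;
    }

    public static void main(String[] args) {
        long cow = countEqualsCalls(new CopyOnWriteArraySet<Ref>(), 20, 50);
        long hash = countEqualsCalls(new HashSet<Ref>(), 20, 50);
        System.out.println("CopyOnWriteArraySet equals() calls: " + cow);
        System.out.println("HashSet equals() calls: " + hash);
    }
}
```

With only 1,000 references the array-backed set already performs several hundred thousand comparisons, while the hash-based set locates each element by its hash code and needs few or none.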

      Solution
      ========
      set is scoped to a single call of moveMatchingMessagesTo, and is only accessed by a single thread, so there is no benefit to using CopyOnWriteArraySet. Simply changing set to a HashSet removes the need for the doPageIn loop to copy the set on each iteration, and insertion becomes an expected O(1) operation.
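
To illustrate the shape of the change, here is a simplified, broker-free model of the loop with set declared as a HashSet. It is a sketch under stated assumptions rather than the attached patch: `MoveLoopSketch`, `Msg`, `send`, `size`, the in-memory `store`, and the page size of 200 are all hypothetical, and locking, persistence, and destination statistics are omitted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

public class MoveLoopSketch {

    /** Hypothetical message with a custom integer property. */
    static final class Msg {
        final int id;
        final int customProp;

        Msg(int id, int customProp) {
            this.id = id;
            this.customProp = customProp;
        }
    }

    private final Deque<Msg> store = new ArrayDeque<>();             // not yet paged in
    private final Map<Integer, Msg> pagedIn = new LinkedHashMap<>(); // paged in
    private final int pageSize;

    MoveLoopSketch(int pageSize) { this.pageSize = pageSize; }

    void send(Msg m) { store.addLast(m); }

    int size() { return store.size() + pagedIn.size(); }

    /** Moves up to pageSize messages from the store into pagedIn. */
    private void doPageIn() {
        for (int i = 0; i < pageSize && !store.isEmpty(); i++) {
            Msg m = store.removeFirst();
            pagedIn.put(m.id, m);
        }
    }

    int moveMatchingMessagesTo(Predicate<Msg> filter, List<Msg> dest, int maximumMessages) {
        int movedCounter = 0;
        // Local to this call and touched by one thread, so a plain HashSet suffices.
        Set<Msg> set = new HashSet<>();
        do {
            doPageIn();
            set.addAll(pagedIn.values());
            for (Msg ref : new ArrayList<>(set)) {
                if (filter.test(ref)) {
                    pagedIn.remove(ref.id); // stands in for moveMessageTo(...)
                    dest.add(ref);
                    set.remove(ref);
                    if (++movedCounter >= maximumMessages && maximumMessages > 0) {
                        return movedCounter;
                    }
                }
            }
        } while (set.size() < size() && set.size() < maximumMessages);
        return movedCounter;
    }

    public static void main(String[] args) {
        MoveLoopSketch queue = new MoveLoopSketch(200);
        for (int i = 0; i < 10_000; i++) {
            queue.send(new Msg(i, i));
        }
        List<Msg> dest = new ArrayList<>();
        // Move the last message, selected by its custom integer property.
        int moved = queue.moveMatchingMessagesTo(m -> m.customProp == 9_999, dest, Integer.MAX_VALUE);
        System.out.println("moved=" + moved + ", remaining=" + queue.size());
    }
}
```

The model keeps the original loop structure, including re-evaluating the filter over everything seen so far on each pass; only the set type differs from the 5.5.0 code quoted earlier.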

      Attached is a unit test that demonstrates how moving the last message from a queue of 10K messages takes 8s (on our machine). Included is a patch to Queue that changes set from a CopyOnWriteArraySet to a HashSet; with this patch, the same unit test completes in under 200ms.

          People

            Assignee: Timothy A. Bish (tabish)
            Reporter: Stirling Chow (stirlingc)