ActiveMQ / AMQ-6851

Messages using Message Groups can arrive out of order when using CachedMessageGroupMap

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Feedback Received
    • Affects Version/s: 5.12.0, 5.15.2
    • Fix Version/s: None
    • Component/s: Broker
    • Labels:
      None
    • Environment:

      Linux, CentOS 7

      openjdk version "1.8.0_151"
      OpenJDK Runtime Environment (build 1.8.0_151-b12)
      OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)

    Description

      By default, the broker tracks message groups with a CachedMessageGroupMap, which is backed by a least-recently-used (LRU) cache with a capacity of 1024. When more than 1024 group IDs are in use, messages can be consumed out of order.
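The following is a minimal sketch of the eviction described above. It assumes the cache behaves like a plain access-ordered LRU map. `LruGroupMap`, `put`, `get` and `ownerOfGroupZeroAfterScenario` are hypothetical names built on `java.util.LinkedHashMap`; they are not ActiveMQ's `CachedMessageGroupMap` source. Replaying the shape of the scenario below with a capacity of 1024 leaves group '0' with no owner, so the broker is free to hand its next message to another consumer.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical LRU group-to-consumer map modelled on the behaviour described
// for CachedMessageGroupMap (an illustration, not ActiveMQ's implementation).
class LruGroupMap {
    private final Map<String, String> owners;

    LruGroupMap(final int capacity) {
        // accessOrder = true: iteration runs from least- to most-recently-used.
        this.owners = new LinkedHashMap<String, String>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                // Evict the least-recently-used group once capacity is exceeded,
                // whether or not that group still has messages in flight.
                return size() > capacity;
            }
        };
    }

    /** Returns the consumer pinned to the group, or null if none (or evicted). */
    String get(String groupId) {
        return owners.get(groupId);
    }

    void put(String groupId, String consumerId) {
        owners.put(groupId, consumerId);
    }

    /** Group '0' is pinned first, then groups '1'..'1024' are each pinned once. */
    static String ownerOfGroupZeroAfterScenario(int capacity) {
        LruGroupMap map = new LruGroupMap(capacity);
        map.put("0", "consumer-1"); // slow first message pins group 0
        for (int g = 1; g <= 1024; g++) {
            map.put(Integer.toString(g), "consumer-2");
        }
        return map.get("0"); // lookup when the second group-0 message arrives
    }

    public static void main(String[] args) {
        System.out.println(ownerOfGroupZeroAfterScenario(1024)); // null: group 0 was evicted
        System.out.println(ownerOfGroupZeroAfterScenario(2048)); // consumer-1
    }
}
```

With a capacity of 2048 the mapping for group '0' survives. That is the effect of raising the cache size. The cost is that the map can then hold that many entries.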

      Scenario:
      Configure two consumers for a queue.
      Send a message with group ID '0' that requires a long time to consume.
      Send 1024 additional messages with group IDs '1' through '1024' that require a short time to consume.
      Send a message of group ID '0' that requires a short time to consume.

      Expected:
      The second message in group '0' is consumed after the first message in group '0'

      Actual:
      The second message in group '0' is consumed before the first message in group '0' has finished.

      The LRU cache evicts the group-to-consumer mapping for group '0' before the second message arrives, so the second message in group '0' can be dispatched to a different consumer than the first.
      Using the MessageGroupHashBucket or the SimpleMessageGroupMap results in the expected behavior.

      package com.example.outoforderjms;
      
      import java.io.Serializable;
      import java.time.Instant;
      import java.time.ZoneId;
      import java.time.format.DateTimeFormatter;
      import java.util.Locale;
      import javax.jms.ConnectionFactory;
      import org.apache.activemq.ActiveMQConnectionFactory;
      import org.apache.activemq.pool.PooledConnectionFactory;
      import org.springframework.context.annotation.AnnotationConfigApplicationContext;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      import org.springframework.jms.annotation.EnableJms;
      import org.springframework.jms.annotation.JmsListener;
      import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
      import org.springframework.jms.core.JmsTemplate;
      import org.springframework.jms.core.MessagePostProcessor;
      
      @EnableJms
      @Configuration
      public class OutOfOrderJms {
      
        // Group IDs cycle through '0'..'1024' (1025 groups), so the last message (1025) reuses group '0'.
        private static final int MODULUS = 1025;
        private static final int COUNT = MODULUS + 1;
        private static final String QUEUE_NAME = "MessageGroupTest";
      
        public static void main(String[] args) {
          AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext();
      
          ctx.register(OutOfOrderJms.class);
          ctx.refresh();
      
          JmsTemplate template = new JmsTemplate();
          template.setConnectionFactory(CONNECTION_FACTORY);
          for (int i = 0; i < COUNT; i++) {
            SomeMessage someMessage = new SomeMessage(i, Integer.toString(i % MODULUS));
            if (someMessage.getGroup().equals("0")) {
              System.out.println(getTimeStamp() + " " + Thread.currentThread().getName() + " producing message " + someMessage);
            }
            template.convertAndSend(QUEUE_NAME, someMessage, getMessageGroupPostProcessor(someMessage));
          }
        }
      
        private static String getTimeStamp() {
          DateTimeFormatter formatter =
              DateTimeFormatter.ofPattern("hh:mm:ss:SSSS")
                  .withLocale(Locale.US)
                  .withZone(ZoneId.systemDefault());
          return formatter.format(Instant.now());
        }
      
        private static MessagePostProcessor getMessageGroupPostProcessor(SomeMessage someMessage) {
          // JMSXGroupID is the JMS-defined group property; ActiveMQ pins each group ID to one consumer.
          return message -> {
            message.setStringProperty("JMSXGroupID", someMessage.getGroup());
            return message;
          };
        }
      
        @JmsListener(destination = QUEUE_NAME, containerFactory = "containerFactory")
        public void process(SomeMessage someMessage) throws InterruptedException {
          // Simulate a long-running consumer for the first message produced (message 0).
          if (someMessage.getMessage() == 0) {
            for (int i = 10; i > 0; i--) {
              Thread.sleep(1000);
              System.out.println(i + " ");
            }
          }
          // Only log groups '0' and '1' to keep the output readable.
          if (someMessage.getGroup().equals("0") || someMessage.getGroup().equals("1")) {
            System.out.println(getTimeStamp() + " " + Thread.currentThread().getName() + " consuming message " + someMessage);
          }
        }
      
        @Bean
        public DefaultJmsListenerContainerFactory containerFactory() {
          DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
          factory.setConnectionFactory(CONNECTION_FACTORY);
          factory.setConcurrency("1-2");
          return factory;
        }
      
        private static final ConnectionFactory CONNECTION_FACTORY = new PooledConnectionFactory(
            new ActiveMQConnectionFactory(
                "admin",
                "admin",
                "failover:tcp://localhost:61616")
        );
      
        private static class SomeMessage implements Serializable {
          private final int message;
          private final String group;
      
          private SomeMessage(int message, String group) {
            this.message = message;
            this.group = group;
          }
      
      
          int getMessage() {
            return message;
          }
      
          String getGroup() {
            return group;
          }
      
          @Override
          public String toString() {
            return "SomeMessage{" +
                "message=" + message +
                ", group='" + group + '\'' +
                '}';
          }
        }
      }
      
      

      The output shows message 1025 (group '0') being consumed before message 0 has finished:

      03:11:15:1730 main producing message SomeMessage{message=0, group='0'}
      03:11:15:2220 DefaultMessageListenerContainer-2 consuming message SomeMessage{message=1, group='1'}
      10 
      9 
      8 
      03:11:18:9530 main producing message SomeMessage{message=1025, group='0'}
      03:11:18:9540 DefaultMessageListenerContainer-2 consuming message SomeMessage{message=1025, group='0'}
      7 
      6 
      5 
      4 
      3 
      2 
      1 
      03:11:25:2130 DefaultMessageListenerContainer-1 consuming message SomeMessage{message=0, group='0'}
      


          Activity

          Gary Tully (gtully) added a comment:

          Makes sense. I pushed some of your observations to the documentation; see the last paragraph: https://cwiki.apache.org/confluence/display/ACTIVEMQ/Message+Groups

          Joshua Montgomery (jlmont) added a comment:

          The default behavior is not what I expected from the documentation.
          http://activemq.apache.org/message-groups.html
          I expected that if I used a group ID for messages, all messages in that group were guaranteed to be processed in order.
          My use case is keeping messages ordered within each of thousands of groups. When I ran the code, messages were processed out of order instead of in the guaranteed order.
          I can see from the implementations of MessageGroupHashBucket and SimpleMessageGroupMap that message groups work by associating each group with a consumer.
          It looks like SimpleMessageGroupMap keeps track of every group but suffers from unbounded memory use.
          MessageGroupHashBucket keeps track of every group and has bounded memory use.
          CachedMessageGroupMap has bounded memory use, but only keeps track of up to 1024 groups (or the configured maximum) and loses track of any group that is not among the 1024 most recently used. That becomes a serious problem when there are messages for 2000 groups in the queue and each group has several messages waiting.
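For comparison, here is a sketch of the bounded approach the comment attributes to MessageGroupHashBucket, which keeps memory fixed but never forgets a group. It assumes a fixed array of owner slots indexed by the group ID's hash. `HashBucketGroupMap` and `assignIfAbsent` are hypothetical names, not ActiveMQ's implementation. Memory stays fixed at the bucket count and no bucket is ever evicted, so group '0' keeps its consumer. The trade-off is that unrelated groups hashing to the same bucket are pinned to the same consumer, which limits load balancing.

```java
// Hypothetical hash-bucket group map in the spirit of MessageGroupHashBucket
// (an illustration, not ActiveMQ's implementation).
class HashBucketGroupMap {
    private final String[] buckets; // one consumer slot per bucket

    HashBucketGroupMap(int bucketCount) {
        this.buckets = new String[bucketCount];
    }

    private int indexOf(String groupId) {
        // floorMod keeps the index non-negative even when hashCode() is negative.
        return Math.floorMod(groupId.hashCode(), buckets.length);
    }

    String get(String groupId) {
        return buckets[indexOf(groupId)];
    }

    /** Returns the pinned consumer, assigning the candidate only if the bucket is free. */
    String assignIfAbsent(String groupId, String candidateConsumer) {
        int i = indexOf(groupId);
        if (buckets[i] == null) {
            buckets[i] = candidateConsumer;
        }
        return buckets[i];
    }

    /** Group '0' is pinned first, then groups '1'..'1024' are each assigned once. */
    static String ownerOfGroupZeroAfterScenario(int bucketCount) {
        HashBucketGroupMap map = new HashBucketGroupMap(bucketCount);
        map.assignIfAbsent("0", "consumer-1");
        for (int g = 1; g <= 1024; g++) {
            map.assignIfAbsent(Integer.toString(g), "consumer-2");
        }
        return map.get("0");
    }

    public static void main(String[] args) {
        System.out.println(ownerOfGroupZeroAfterScenario(1024)); // consumer-1
    }
}
```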

          I noticed that https://issues.apache.org/jira/browse/AMQ-3565 mentions the idea of closing groups for which there are no messages in the queue, and the CachedMessageGroupMap is mentioned as doing this. However, the LRU map evicts old groups but does not wait until there are no queued messages associated with the group. I consider this incorrect behavior because queued messages will not be kept in order. I would consider it correct behavior if the LRU map only evicted groups for which no messages were queued, but retained all groups for which there were still messages queued.
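The eviction rule proposed above can be sketched as an LRU map that skips any group that still has unacknowledged messages. This is a hypothetical design, not existing ActiveMQ code. `PendingAwareGroupMap`, `onEnqueue` and `onAck` are illustrative names, and a real broker would need to call equivalent hooks from its dispatch and acknowledgement paths.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical LRU group map that never evicts a group with queued messages.
class PendingAwareGroupMap {
    private final int capacity;
    // accessOrder = true: iteration runs from least- to most-recently-used.
    private final LinkedHashMap<String, String> owners = new LinkedHashMap<>(16, 0.75f, true);
    // Number of enqueued but not yet acknowledged messages per group.
    private final Map<String, Integer> pending = new HashMap<>();

    PendingAwareGroupMap(int capacity) {
        this.capacity = capacity;
    }

    void onEnqueue(String groupId) {
        pending.merge(groupId, 1, Integer::sum);
    }

    void onAck(String groupId) {
        // Returning null removes the entry once the last pending message is acked.
        pending.computeIfPresent(groupId, (g, n) -> n > 1 ? n - 1 : null);
    }

    String get(String groupId) {
        return owners.get(groupId);
    }

    void put(String groupId, String consumerId) {
        owners.put(groupId, consumerId);
        evictIdleGroups();
    }

    int size() {
        return owners.size();
    }

    private void evictIdleGroups() {
        Iterator<String> it = owners.keySet().iterator();
        // Walk from the least-recently-used end, evicting only idle groups.
        while (owners.size() > capacity && it.hasNext()) {
            if (!pending.containsKey(it.next())) {
                it.remove();
            }
        }
        // If every group is busy, the map stays over capacity instead of breaking ordering.
    }

    /** Group '0' stays unacknowledged while groups '1'..'1024' are processed quickly. */
    static String ownerOfGroupZeroAfterScenario(int capacity) {
        PendingAwareGroupMap map = new PendingAwareGroupMap(capacity);
        map.onEnqueue("0"); // slow message 0 is still in flight
        map.put("0", "consumer-1");
        for (int g = 1; g <= 1024; g++) {
            String id = Integer.toString(g);
            map.onEnqueue(id);
            map.put(id, "consumer-2");
            map.onAck(id); // fast messages are acknowledged immediately
        }
        return map.get("0");
    }

    public static void main(String[] args) {
        System.out.println(ownerOfGroupZeroAfterScenario(1024)); // consumer-1
    }
}
```

When every cached group still has queued messages, this sketch lets the map grow past its capacity rather than break ordering. Memory is therefore only soft-bounded, which is part of the memory-versus-ordering trade-off noted in the comment.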

          I also found that https://issues.apache.org/jira/browse/AMQ-5942 mentions the same issue and the comments say the documentation should be changed. If the decision is made to retain the current behavior, then certainly the documentation should make it clear that the guaranteed ordering of messages is limited in the default configuration to 1024 groups (or the configured maximum), and no guarantees apply beyond that.

          Thank you for your quick reply. I appreciate that there are competing demands of memory consumption, load balancing, complexity, etc.

          Gary Tully (gtully) added a comment:

          You can also increase the cache size: org.apache.activemq.broker.region.group.CachedMessageGroupMapFactory#setCacheSize

          Are you requesting a different default behaviour?


            People

            • Assignee: Unassigned
            • Reporter: Joshua Montgomery (jlmont)
            • Votes: 0
            • Watchers: 2
