Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Feedback Received
-
5.12.0, 5.15.2
-
None
-
None
-
Linux, CentOS 7
openjdk version "1.8.0_151"
OpenJDK Runtime Environment (build 1.8.0_151-b12)
OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)
Description
The default broker behavior for message groups uses a CachedMessageGroupMap with a least-recently-used cache with a capacity of 1024. When more than 1024 group IDs are used, messages can be consumed out of order.
Scenario.
Configure two consumers for a queue.
Send a message with group ID '0' that requires a long time to consume.
Send 1024 additional messages with group IDs '1' through '1024' that require a short time to consume.
Send a message of group ID '0' that requires a short time to consume.
Expected:
The second message in group '0' is consumed after the first message in group '0'
Actual:
The second message in group '0' is consumed before the first message in group '0' has finished.
The LRU cache is evicting the group-to-consumer mapping for group '0' before the second message arrives, allowing the second message in group '0' to be processed by a different consumer than the first message.
Using the MessageGroupHashBucket or the SimpleMessageGroupMap results in the expected behavior.
package com.example.outoforderjms; import java.io.Serializable; import java.time.Instant; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.Locale; import javax.jms.ConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.pool.PooledConnectionFactory; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.annotation.EnableJms; import org.springframework.jms.annotation.JmsListener; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessagePostProcessor; @EnableJms @Configuration public class OutOfOrderJms { private static final int MODULUS = 1025; private static final int COUNT = MODULUS + 1; private static final String QUEUE_NAME = "MessageGroupTest"; public static void main(String[] args) { AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(); ctx.register(OutOfOrderJms.class); ctx.refresh(); JmsTemplate template = new JmsTemplate(); template.setConnectionFactory(CONNECTION_FACTORY); for (int i = 0; i < COUNT; i++) { SomeMessage someMessage = new SomeMessage(i, Integer.toString(i % MODULUS)); if (someMessage.getGroup().equals("0")) { System.out.println(getTimeStamp() + " " + Thread.currentThread().getName() + " producing message " + someMessage); } template.convertAndSend(QUEUE_NAME, someMessage, getMessageGroupPostProcessor(someMessage)); } } private static String getTimeStamp() { DateTimeFormatter formatter = DateTimeFormatter.ofPattern("hh:mm:ss:SSSS") .withLocale(Locale.US) .withZone(ZoneId.systemDefault()); return formatter.format(Instant.now()); } private static MessagePostProcessor getMessageGroupPostProcessor(Serializable object) { return message -> { SomeMessage m = 
((SomeMessage) object); message.setStringProperty( "JMSXGroupID", m.getGroup()); return message; }; } @JmsListener(destination = QUEUE_NAME, containerFactory = "containerFactory") private void process(SomeMessage someMessage) throws InterruptedException { // Simulate long-processing message for first message produced. if (someMessage.getMessage() == 0) { for (int i = 10; i > 0; i--) { Thread.sleep(1000); System.out.println(i + " "); } } if (someMessage.getGroup().equals("0") || someMessage.getGroup().equals("1")) { System.out.println(getTimeStamp() + " " + Thread.currentThread().getName() + " consuming message " + someMessage); } } @Bean public DefaultJmsListenerContainerFactory containerFactory() { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(CONNECTION_FACTORY); factory.setConcurrency("1-2"); return factory; } private static ConnectionFactory CONNECTION_FACTORY = new PooledConnectionFactory( new ActiveMQConnectionFactory( "admin", "admin", "failover:tcp://localhost:61616") ); private static class SomeMessage implements Serializable { private final int message; private final String group; private SomeMessage(int message, String group) { this.message = message; this.group = group; } int getMessage() { return message; } String getGroup() { return group; } @Override public String toString() { return "SomeMessage{" + "message=" + message + ", group='" + group + '\'' + '}'; } } }
Output shows message 1025 finishing before message 0
03:11:15:1730 main producing message SomeMessage{message=0, group='0'} 03:11:15:2220 DefaultMessageListenerContainer-2 consuming message SomeMessage{message=1, group='1'} 10 9 8 03:11:18:9530 main producing message SomeMessage{message=1025, group='0'} 03:11:18:9540 DefaultMessageListenerContainer-2 consuming message SomeMessage{message=1025, group='0'} 7 6 5 4 3 2 1 03:11:25:2130 DefaultMessageListenerContainer-1 consuming message SomeMessage{message=0, group='0'}