Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 5.15.8
None
None
ActiveMQ 5.15.8 (conf/activemq.xml attached)
KahaDB
STOMP text messages (xml bodies)
Messages sent to topics and routed to multiple queues by Camel and then consumed from those queues. Typical routing rules look like this:
<route id="routeABC">
  <from uri="activemq:topic:someTopic"/>
  <filter>
    <xpath>$SomeHeader = 'X' or $SomeHeader = 'Y'</xpath>
    <to uri="activemq:queue:queueABC?jmsKeyFormatStrategy=passthrough"/>
  </filter>
</route>
<route id="routeXYZ">
  <from uri="activemq:topic:someTopic"/>
  <filter>
    <xpath>$SomeHeader = 'X'</xpath>
    <to uri="activemq:queue:queueXYZ?jmsKeyFormatStrategy=passthrough"/>
  </filter>
</route>
Description
I have been trying to track down an issue that is hard to reproduce and pinpoint, where messages appear not to be delivered to queue consumers properly. The best clue I have so far is that we occasionally see exceptions like the one below, and I can trigger them fairly easily by putting a heavier load on our system.
2019-04-25 14:04:00,419 | DEBUG | Async client internal exception occurred with no exception listener registered: java.util.ConcurrentModificationException | org.apache.activemq.ActiveMQConnection | ActiveMQ VMTransport: vm://localhost#108
java.util.ConcurrentModificationException
    at java.util.HashMap$HashIterator.nextNode(HashMap.java:1445)
    at java.util.HashMap$EntryIterator.next(HashMap.java:1479)
    at java.util.HashMap$EntryIterator.next(HashMap.java:1477)
    at java.util.HashMap.putMapEntries(HashMap.java:512)
    at java.util.HashMap.<init>(HashMap.java:490)
    at org.apache.activemq.command.Message.copy(Message.java:160)
    at org.apache.activemq.command.ActiveMQMessage.copy(ActiveMQMessage.java:69)
    at org.apache.activemq.command.ActiveMQTextMessage.copy(ActiveMQTextMessage.java:58)
    at org.apache.activemq.command.ActiveMQTextMessage.copy(ActiveMQTextMessage.java:53)
    at org.apache.activemq.ActiveMQConnection$3.processMessageDispatch(ActiveMQConnection.java:1840)
    at org.apache.activemq.command.MessageDispatch.visit(MessageDispatch.java:113)
    at org.apache.activemq.ActiveMQConnection.onCommand(ActiveMQConnection.java:1828)
    at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)
    at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
    at org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:275)
    at org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:112)
    at org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:42)
I found the vaguely similar issues AMQ-4092, AMQ-5664, and AMQ-5934. Those, along with https://stackoverflow.com/questions/12644272/activemq-message-groups-concurrentmodificationexception, also indicate that messages are being dispatched concurrently and that there is a potential workaround: the KahaDB setting concurrentStoreAndDispatchQueues="false", which, according to what I've read, has potentially serious throughput implications.
I looked briefly at the ActiveMQ code from git (the activemq-5.15.x branch). Based on the stack trace above and the way the current code looks, it seems to me that the incoming ActiveMQTextMessage instance may be handed to N concurrent dispatchers, each of which then tries to copy the message, and that this copying is subject to a race condition.
Looking at the Message base class, the exception happens on line 160:
copy.properties = new HashMap<String, Object>(properties);
- Should it be using ConcurrentHashMap perhaps?
- Is there something in the parent thread, which delegates message dispatching to other threads, that can end up modifying or even clearing the source object before all dispatchers have finished copying its data? I did not dig deep enough to figure out the details of how this dispatching is done and what degree of synchronization exists there.
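To make the two questions above concrete, here is a minimal standalone sketch of the suspected race. It uses only the JDK, not ActiveMQ. The class name PropertyCopyRace and its methods are hypothetical names made up for illustration; only the copy-constructor call mirrors the line quoted from Message.java. One thread repeatedly copies a shared map the same way Message.copy() does, while a second thread keeps adding and removing entries. This is a sketch under the assumption that the properties map is mutated by another thread during the copy, which I have not confirmed in the broker code.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of ActiveMQ.
final class PropertyCopyRace {

    // Same shape as the copy on Message.java line 160.
    static Map<String, Object> copyProperties(Map<String, Object> properties) {
        return new HashMap<String, Object>(properties);
    }

    // Copies `shared` `rounds` times while a second thread mutates it, and
    // returns how many copies failed with ConcurrentModificationException.
    static int countFailedCopies(Map<String, Object> shared, int rounds) {
        AtomicBoolean stop = new AtomicBoolean(false);
        Thread writer = new Thread(() -> {
            int i = 0;
            while (!stop.get()) {
                // Each put is followed by a remove, so the map ends up
                // with the same entries it started with.
                String key = "header" + (i++ % 64);
                shared.put(key, i);
                shared.remove(key);
            }
        });
        writer.start();
        int failures = 0;
        try {
            for (int r = 0; r < rounds; r++) {
                try {
                    copyProperties(shared);
                } catch (ConcurrentModificationException e) {
                    failures++;
                }
            }
        } finally {
            stop.set(true);
            try {
                writer.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        Map<String, Object> plain = new HashMap<>();
        Map<String, Object> concurrent = new ConcurrentHashMap<>();
        for (int i = 0; i < 32; i++) {
            plain.put("k" + i, i);
            concurrent.put("k" + i, i);
        }
        // The HashMap count is timing dependent; the ConcurrentHashMap
        // count is 0 because its iterators never throw this exception.
        System.out.println("HashMap source, failed copies: "
                + countFailedCopies(plain, 100_000));
        System.out.println("ConcurrentHashMap source, failed copies: "
                + countFailedCopies(concurrent, 100_000));
    }
}
```

With a HashMap source the failure count depends on timing and is usually, but not necessarily, greater than zero. With a ConcurrentHashMap source the copy never throws, because its iterators are weakly consistent, but the resulting copy may reflect a partially updated state, and ConcurrentHashMap rejects null keys and values. So even if swapping the map type silences the exception, it would not by itself answer whether something is modifying the source message while dispatchers are still copying it.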