Details
- Bug
- Status: Resolved
- Major
- Resolution: Fixed
- 3.4.3
- None
- Unknown
Description
There is a race condition between the SJMS Batch Consumer route start thread and the batch consumption loop thread. When it triggers, the batch consumption loop exits early and the SJMS Batch Consumer does not read any JMS messages.
In short:
- The AtomicBoolean running is used as a flag to shut down the batch consumption loop
- The batch consumption loop is submitted to another thread and only after that running is changed to true
- This means sometimes the batch consumption loop sees running as false during startup
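The ordering problem described in the bullets above can be shown in isolation. The following is a minimal sketch, not Camel code: the class name StartupRaceDemo, the method iterationsWhenFlagSetLate and the iteration cap of 3 are made up for illustration, and a CountDownLatch is used to force the unlucky interleaving that is only occasional in the real consumer.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the consumer startup; none of these names exist in Camel.
class StartupRaceDemo {

    // Returns how many iterations the loop completed when the flag is set
    // only after the loop has already evaluated its condition.
    static int iterationsWhenFlagSetLate() {
        AtomicBoolean running = new AtomicBoolean(false);
        CountDownLatch loopExited = new CountDownLatch(1);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // The loop is handed to another thread while running is still false.
            Future<Integer> result = executor.submit(() -> {
                int iterations = 0;
                while (running.get() && iterations < 3) {
                    iterations++;
                }
                loopExited.countDown();
                return iterations;
            });

            // Force the bad interleaving: wait until the loop has seen the flag.
            if (!loopExited.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("loop task did not run");
            }

            // Too late: the loop has already exited.
            running.set(true);
            return result.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // prints "iterations = 0"
        System.out.println("iterations = " + iterationsWhenFlagSetLate());
    }
}
```

The worker thread stays alive in the pool after the task returns, which matches an executor thread that is idle and waiting for a new task rather than consuming messages.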
The easiest way to reproduce it is to add a sleep into SjmsBatchConsumer$StartConsumerTask#run:

    final List<AtomicBoolean> triggers = new ArrayList<>();
    for (int i = 0; i < consumerCount; i++) {
        BatchConsumptionLoop loop = new BatchConsumptionLoop();
        loop.setKeepAliveDelay(keepAliveDelay);
        triggers.add(loop.getCompletionTimeoutTrigger());
        /*
         * Note: Batch consumption loop is submitted to another thread here
         */
        jmsConsumerExecutors.submit(loop);
    }

    if (completionInterval > 0) {
        // trigger completion based on interval
        timeoutCheckerExecutorService.scheduleAtFixedRate(new CompletionIntervalTask(triggers), completionInterval, completionInterval, TimeUnit.MILLISECONDS);
    }

    if (attempt > 1) {
        LOG.info("Successfully refreshed connection after {} attempts.", attempt);
    }

    /*
     * Note: Add this sleep to reproduce the race condition, simulating
     * this thread being pre-empted by other work
     */
    Thread.sleep(100);

    LOG.info("Started {} consumer(s) for {}:{}", consumerCount, destinationName, completionSize);

    /*
     * Note: running is only changed to true here but the batch consumption loop
     * that reads this value was submitted to another thread earlier
     */
    running.set(true);
    return;
The batch consumption loop checks the running flag like this:
    private void consumeBatchesOnLoop(final Session session, final MessageConsumer consumer) throws JMSException {
        final boolean usingTimeout = completionTimeout > 0;

        LOG.trace("BatchConsumptionTask +++ start +++");

        while (running.get()) {
Usually there's a second check that would cause everything to loop again: the loop may see running as false but isStarting() as true.

    } while (running.get() || isStarting());
But with asyncStartListener enabled I think that isStarting() is likely to be false as well.
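Since neither check is guaranteed to hold during the startup window, one way to close the window is to set the flag before the loop is submitted. The following is a minimal sketch of that ordering only. It is an assumption for illustration, not taken from the Camel code base and not necessarily the fix that was applied; the class name FlagBeforeSubmitDemo, the method name and the iteration cap of 3 are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the consumer startup; none of these names exist in Camel.
class FlagBeforeSubmitDemo {

    // Returns how many iterations the loop completed when the flag is
    // published before the loop is handed to the executor.
    static int iterationsWhenFlagSetFirst() {
        AtomicBoolean running = new AtomicBoolean(false);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Set the flag first, so the loop can never observe a stale false.
            running.set(true);

            Future<Integer> result = executor.submit(() -> {
                int iterations = 0;
                while (running.get() && iterations < 3) {
                    iterations++;
                }
                return iterations;
            });
            return result.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            // Assumed cleanup: reset the flag if startup fails after it was set.
            running.set(false);
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // prints "iterations = 3"
        System.out.println("iterations = " + iterationsWhenFlagSetFirst());
    }
}
```

The trade-off is that the flag is true slightly before the consumers exist, so any failure between setting it and submitting the loops has to reset it.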
I believe this issue is causing fairly frequent intermittent test failures in our CI environment (Jenkins agents in Kubernetes, Linux), but I've been unable to reproduce it on my laptop (Windows) without adding the artificial delay on the main thread.
I've been able to get thread dumps from the CI environment showing the executor waiting for a task instead of executing the batch consumption loop:
    "Camel (camel-8) thread #125 - SjmsBatchConsumer"
       java.lang.Thread.State: WAITING
            at sun.misc.Unsafe.park(Native Method)
            at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
            at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
            at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
Usually they should look like this:
    "Camel (camel-8) thread #123 - SjmsBatchConsumer"
       java.lang.Thread.State: TIMED_WAITING
            at java.lang.Object.wait(Native Method)
            at org.apache.activemq.FifoMessageDispatchChannel.dequeue(FifoMessageDispatchChannel.java:74)
            at org.apache.activemq.ActiveMQMessageConsumer.dequeue(ActiveMQMessageConsumer.java:486)
            at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:653)
            at org.apache.camel.component.sjms.batch.SjmsBatchConsumer$BatchConsumptionLoop$BatchConsumptionTask.consumeBatchesOnLoop(SjmsBatchConsumer.java:429)
            at org.apache.camel.component.sjms.batch.SjmsBatchConsumer$BatchConsumptionLoop$BatchConsumptionTask.access$1300(SjmsBatchConsumer.java:383)
            at org.apache.camel.component.sjms.batch.SjmsBatchConsumer$BatchConsumptionLoop.run(SjmsBatchConsumer.java:326)
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
            at java.util.concurrent.FutureTask.run(FutureTask.java:266)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
I also get trace logs where the batch consumption task starts and ends very quickly:
    Line 4377: 2020-09-24 03:16:41.567 DEBUG||| 4604 --- [artStopListener] o.a.c.c.sjms.batch.SjmsBatchConsumer : Attempt #1. Starting 1 consumer(s) for myqueue:300
    Line 4415: 2020-09-24 03:16:41.576 TRACE||| 4604 --- [msBatchConsumer] o.a.c.c.sjms.batch.SjmsBatchConsumer : BatchConsumptionTask +++ start +++
    Line 4416: 2020-09-24 03:16:41.576 TRACE||| 4604 --- [msBatchConsumer] o.a.c.c.sjms.batch.SjmsBatchConsumer : BatchConsumptionTask +++ end +++
    Line 4435: 2020-09-24 03:16:41.568 INFO ||| 4604 --- [artStopListener] o.a.c.c.sjms.batch.SjmsBatchConsumer : Started 1 consumer(s) for myqueue:300
Side note: Could the queue name be added to the thread name? The JMS component consumers do that.