  1. Camel
  2. CAMEL-15580

SJMS Batch Consumer startup race condition

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.4.3
    • Fix Version/s: 3.6.0, 3.4.5
    • Component/s: camel-sjms
    • Labels: None
    • Estimated Complexity: Unknown

    Description

      There is a race condition between the SJMS Batch Consumer route start thread and the batch consumption loop thread. When it is triggered, the batch consumption loop exits early and the SJMS Batch Consumer does not read any JMS messages.

      In short:

      • The AtomicBoolean running is used as a flag to shut down the batch consumption loop
      • The batch consumption loop is submitted to another thread and only after that running is changed to true
      • This means sometimes the batch consumption loop sees running as false during startup
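The ordering described in the bullets above can be modelled outside Camel. The following is a minimal sketch, not the component's code: the class `StartupRaceDemo`, the method `iterationsRun`, and the `MAX_ITERATIONS` bound are hypothetical names introduced for illustration, and a `CountDownLatch` stands in for the start thread being pre-empted so that the outcome is deterministic.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-alone model of the startup ordering; not Camel code.
class StartupRaceDemo {

    // Upper bound so the loop terminates in this demo.
    static final int MAX_ITERATIONS = 3;

    /** Returns how many iterations the submitted loop completed. */
    static int iterationsRun(boolean setRunningBeforeSubmit) {
        AtomicBoolean running = new AtomicBoolean(false);
        AtomicInteger iterations = new AtomicInteger();
        CountDownLatch loopExited = new CountDownLatch(1);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            if (setRunningBeforeSubmit) {
                // Alternative ordering: the flag is set before the loop can start.
                running.set(true);
            }
            executor.submit(() -> {
                // Same shape as the check in consumeBatchesOnLoop.
                while (running.get() && iterations.get() < MAX_ITERATIONS) {
                    iterations.incrementAndGet();
                }
                loopExited.countDown();
            });
            // Stand-in for the start thread being pre-empted: do not continue
            // until the loop on the other thread has already exited.
            if (!loopExited.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("loop did not exit in time");
            }
            // Reported ordering: the flag is set only now, after the loop has gone.
            running.set(true);
            return iterations.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("flag set after submit:  " + iterationsRun(false));
        System.out.println("flag set before submit: " + iterationsRun(true));
    }
}
```

With the flag set after submission the loop completes 0 iterations; with the flag set before submission it completes all 3. Setting the flag first is shown only as a contrast and is not necessarily the change that was applied to resolve this issue.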

      The easiest way to reproduce it is to add a sleep to SjmsBatchConsumer$StartConsumerTask#run:

       

      final List<AtomicBoolean> triggers = new ArrayList<>();
      for (int i = 0; i < consumerCount; i++) {
          BatchConsumptionLoop loop = new BatchConsumptionLoop();
          loop.setKeepAliveDelay(keepAliveDelay);
          triggers.add(loop.getCompletionTimeoutTrigger());
          /*
           * Note: Batch consumption loop is submitted to another thread here
           */
          jmsConsumerExecutors.submit(loop);
      }
      
      if (completionInterval > 0) {
          // trigger completion based on interval
          timeoutCheckerExecutorService.scheduleAtFixedRate(new CompletionIntervalTask(triggers), completionInterval, completionInterval, TimeUnit.MILLISECONDS);
      }
      
      if (attempt > 1) {
          LOG.info("Successfully refreshed connection after {} attempts.", attempt);
      }
      /*
       * Note: Add this sleep to reproduce the race condition, simulating
       * this thread being pre-empted by other work
       */
      Thread.sleep(100);  
      LOG.info("Started {} consumer(s) for {}:{}", consumerCount, destinationName, completionSize);
      /*
       * Note: running is only changed to true here, but the batch consumption loop
       * that reads this value was submitted to another thread earlier
       */
      running.set(true);
      return;
       

       

      The batch consumption loop checks the running flag like this:

      private void consumeBatchesOnLoop(final Session session, final MessageConsumer consumer) throws JMSException {
          final boolean usingTimeout = completionTimeout > 0;

          LOG.trace("BatchConsumptionTask +++ start +++");

          while (running.get()) {

       

      Normally a second check causes everything to loop again: the loop may see running as false but still see isStarting() as true.

      } while (running.get() || isStarting());

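      With asyncStartListener enabled, however, I think isStarting() is likely to be false as well, presumably because the start task runs on a separate thread after the consumer itself has finished starting.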

       

      I believe this issue is causing fairly frequent intermittent test failures in our CI environment (Jenkins slaves in Kubernetes, Linux). However, I have been unable to reproduce it on my laptop (Windows) without adding the artificial delay on the main thread.

      I've been able to get thread dumps from the CI environment showing the executor thread waiting for a task instead of executing the batch consumption loop:

      "Camel (camel-8) thread #125 - SjmsBatchConsumer" 
         java.lang.Thread.State: WAITING
              at sun.misc.Unsafe.park(Native Method)
              at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
              at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
              at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
              at java.lang.Thread.run(Thread.java:748)
       

       

      Normally the thread dump should look like this, with the thread blocked in the JMS receive call:

      "Camel (camel-8) thread #123 - SjmsBatchConsumer" 
         java.lang.Thread.State: TIMED_WAITING
              at java.lang.Object.wait(Native Method)
              at org.apache.activemq.FifoMessageDispatchChannel.dequeue(FifoMessageDispatchChannel.java:74)
              at org.apache.activemq.ActiveMQMessageConsumer.dequeue(ActiveMQMessageConsumer.java:486)
              at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:653)
              at org.apache.camel.component.sjms.batch.SjmsBatchConsumer$BatchConsumptionLoop$BatchConsumptionTask.consumeBatchesOnLoop(SjmsBatchConsumer.java:429)
              at org.apache.camel.component.sjms.batch.SjmsBatchConsumer$BatchConsumptionLoop$BatchConsumptionTask.access$1300(SjmsBatchConsumer.java:383)
              at org.apache.camel.component.sjms.batch.SjmsBatchConsumer$BatchConsumptionLoop.run(SjmsBatchConsumer.java:326)
              at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
              at java.util.concurrent.FutureTask.run(FutureTask.java:266)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
              at java.lang.Thread.run(Thread.java:748) 
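The difference between the two dumps above can also be checked programmatically, for example from a test. The following is a rough sketch using only JDK classes; `IdleWorkerCheck`, `isWaitingForTask`, and `workerGoesIdleAfterEarlyExit` are hypothetical names, and the task that returns immediately stands in for a batch consumption loop that saw running as false.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical diagnostic helper; not part of Camel.
class IdleWorkerCheck {

    /** True if the thread has a ThreadPoolExecutor.getTask frame, i.e. it is idle. */
    static boolean isWaitingForTask(Thread thread) {
        for (StackTraceElement frame : thread.getStackTrace()) {
            if ("java.util.concurrent.ThreadPoolExecutor".equals(frame.getClassName())
                    && "getTask".equals(frame.getMethodName())) {
                return true;
            }
        }
        return false;
    }

    /** Runs a task that returns at once, then reports whether its worker went idle. */
    static boolean workerGoesIdleAfterEarlyExit() {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        try {
            // The task returns immediately, like a loop that exited early.
            Callable<Thread> task = Thread::currentThread;
            Thread worker = executor.submit(task).get(5, TimeUnit.SECONDS);
            // Poll briefly: the worker needs a moment to get back to getTask().
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (System.nanoTime() < deadline) {
                if (isWaitingForTask(worker)) {
                    return true;
                }
                Thread.sleep(10);
            }
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

In a real test the threads to inspect could be found by name via `Thread.getAllStackTraces()`, filtering on the "SjmsBatchConsumer" suffix seen in the dumps.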

       

      I also get trace logs in which the batch consumption task starts and ends almost immediately:

      	Line 4377: 2020-09-24 03:16:41.567 DEBUG||| 4604 --- [artStopListener] o.a.c.c.sjms.batch.SjmsBatchConsumer     : Attempt #1. Starting 1 consumer(s) for myqueue:300
      	Line 4415: 2020-09-24 03:16:41.576 TRACE||| 4604 --- [msBatchConsumer] o.a.c.c.sjms.batch.SjmsBatchConsumer     : BatchConsumptionTask +++ start +++
      	Line 4416: 2020-09-24 03:16:41.576 TRACE||| 4604 --- [msBatchConsumer] o.a.c.c.sjms.batch.SjmsBatchConsumer     : BatchConsumptionTask +++ end +++
      	Line 4435: 2020-09-24 03:16:41.568 INFO ||| 4604 --- [artStopListener] o.a.c.c.sjms.batch.SjmsBatchConsumer     : Started 1 consumer(s) for myqueue:300 

       

      Side note: Could the queue name be added to the thread name?  The JMS component consumers do that.
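To illustrate the side note, here is a sketch of a thread factory that embeds the destination name in the thread name. It uses plain JDK classes only and does not reflect how Camel names its threads internally; `QueueNamedThreads`, `forQueue`, `nameOfWorkerFor`, and the name format are all assumptions made for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration; not the Camel thread naming mechanism.
class QueueNamedThreads {

    /** Creates threads named "SjmsBatchConsumer[<destination>] #<n>". */
    static ThreadFactory forQueue(String destinationName) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread thread = new Thread(runnable);
            thread.setName("SjmsBatchConsumer[" + destinationName + "] #"
                    + counter.incrementAndGet());
            return thread;
        };
    }

    /** Returns the name of the worker thread an executor uses for the destination. */
    static String nameOfWorkerFor(String destinationName) {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(forQueue(destinationName));
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return executor.submit(task).get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

With this factory, `nameOfWorkerFor("myqueue")` returns `SjmsBatchConsumer[myqueue] #1`, which would make thread dumps like the ones above easier to match to a destination.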

       

       

      Attachments

        1. potentialPatch.txt
          1 kB
          Brad Harvey

            People

              Assignee: Viral Gohel (vrlgohel)
              Reporter: Brad Harvey (bradhgbst)