ActiveMQ / AMQ-2955

Message getting stuck on queue, leading to KahaDB log files not being deleted and disk running out of space

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 5.4.1
    • Fix Version/s: 5.5.0
    • Component/s: Message Store
    • Labels: None
    • Environment: Red Hat Enterprise Linux 5

      Description

      Using the following test client, we see a single message getting stuck on the queue. This then prevents the KahaDB files from being cleaned up. Once this message gets stuck, we need to restart the broker before it can be consumed. This is a total show-stopper for us: when it occurs in our system, the large number of messages we produce and consume each second causes the disk to run out of space within an hour. We also see the same behaviour using synchronous sending and without failover.

      This doesn't happen every time with the test client - the most reliable way I have found to reproduce it is to start the broker and wait for the first MessageDatabase checkpoint to finish before starting the test client.

      Test Client
      import java.io.BufferedWriter;
      import java.io.FileWriter;
      import java.util.Random;
      
      import javax.jms.Connection;
      import javax.jms.Message;
      import javax.jms.MessageConsumer;
      import javax.jms.MessageListener;
      import javax.jms.MessageProducer;
      import javax.jms.Queue;
      import javax.jms.Session;
      import javax.jms.ConnectionFactory;
      
      import org.apache.activemq.ActiveMQConnectionFactory;
      
      public class Test {
              public static final void main(String[] args) throws Exception {
                      ConnectionFactory cf = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616)?jms.useAsyncSend=true&trackMessages=true");
                      final Connection producerConn = cf.createConnection();
                      final Connection consumerConn = cf.createConnection();
      
                      final BufferedWriter producerLog = new BufferedWriter(new FileWriter("produced.log"));
                      final BufferedWriter consumerLog = new BufferedWriter(new FileWriter("consumed.log"));
      
                      new Thread(new Runnable() {
                              public void run() {
                                      try {
                                              producerConn.start();
                                              Session session = producerConn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                                              Queue queue = session.createQueue("TEST_QUEUE");
                                              MessageProducer producer = session.createProducer(queue);
                                              Random random = new Random();
                                              byte[] messageBytes = new byte[1024];
      
                                              for (int i = 0; i < 100000; i++) {
                                              //while (true) {
                                                      random.nextBytes(messageBytes);
                                                      Message message = session.createObjectMessage(messageBytes);
                                                      producer.send(message);
                                                      producerLog.write(message.getJMSMessageID());
                                                      producerLog.newLine();
                                                      producerLog.flush();
                                              }
                                              System.out.println("Produced 100000 messages...");
                                              producerLog.close();
                                      }
                                      catch (Exception e) {
                                              e.printStackTrace();
                                      }
                              }
                      }).start();
      
                      System.out.println("Started producer...");
      
                      new Thread(new Runnable() {
                              public void run() {
                                      try {
                                              consumerConn.start();
                                              Session session = consumerConn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                                              Queue queue = session.createQueue("TEST_QUEUE");
                                              MessageConsumer consumer = session.createConsumer(queue);
                                              consumer.setMessageListener(new MessageListener() {
                                                      public void onMessage(Message message) {
                                                              try {
                                                                      message.acknowledge();
                                                                      consumerLog.write(message.getJMSMessageID());
                                                                      consumerLog.newLine();
                                                                      consumerLog.flush();
                                                              }
                                                              catch (Exception e) {
                                                                      e.printStackTrace();
                                                              }
                                                      }
                                              });
                                      }
                                      catch (Exception e) {
                                              e.printStackTrace();
                                      }
                              }
                      }).start();
      
                      System.out.println("Started consumer...");
              }
      }
      

      After the 100,000 messages have been produced, we can see the following difference in the log files:

      [pblackburn@xxxx test]$ diff produced.log consumed.log
      10394d10393
      < ID:xxxx-35451-1285948546531-0:0:1:1:10394
      [pblackburn@xxxx test]$
      

      Looking in the activemq log file, at around this point we see:

      2010-10-01 15:55:51 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 205, pagedInMessages.size 349, enqueueSize: 10390
      2010-10-01 15:55:51 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 205, pagedInMessages.size 350, enqueueSize: 10391
      2010-10-01 15:55:51 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 205, pagedInMessages.size 351, enqueueSize: 10392
      2010-10-01 15:55:51 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 205, pagedInMessages.size 352, enqueueSize: 10393
      2010-10-01 15:55:51 Usage [DEBUG] Main:memory:queue://TEST_QUEUE:memory: usage change from: 69% of available memory, to: 70% of available memory
      2010-10-01 15:55:51 Usage [DEBUG] Main:memory:queue://TEST_QUEUE:memory: usage change from: 70% of available memory, to: 69% of available memory
      2010-10-01 15:55:51 AbstractStoreCursor [DEBUG] TEST_QUEUE disabling cache on size:0, lastCachedIdSeq: 10398 current node seqId: 10399
      2010-10-01 15:55:51 Usage [DEBUG] Main:memory:queue://TEST_QUEUE:memory: usage change from: 69% of available memory, to: 70% of available memory
      2010-10-01 15:55:51 Queue [DEBUG] TEST_QUEUE toPageIn: 2, Inflight: 353, pagedInMessages.size 353, enqueueSize: 10395
      2010-10-01 15:55:51 Usage [DEBUG] Main:memory:queue://TEST_QUEUE:memory: usage change from: 70% of available memory, to: 69% of available memory
      2010-10-01 15:55:51 Usage [DEBUG] Main:memory:queue://TEST_QUEUE:memory: usage change from: 69% of available memory, to: 70% of available memory
      

      At the end of the log file, where we have a single message stuck on the queue, we see:

      2010-10-01 15:56:10 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 0, pagedInMessages.size 0, enqueueSize: 100000
      2010-10-01 15:56:10 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 0, pagedInMessages.size 0, enqueueSize: 100000
      2010-10-01 15:56:10 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 0, pagedInMessages.size 0, enqueueSize: 100000
      2010-10-01 15:56:10 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 0, pagedInMessages.size 0, enqueueSize: 100000
      2010-10-01 15:56:10 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 0, pagedInMessages.size 0, enqueueSize: 100000
      2010-10-01 15:56:10 Queue [DEBUG] TEST_QUEUE toPageIn: 1, Inflight: 0, pagedInMessages.size 0, enqueueSize: 100000
      

      We can see the checkpoint failing to clean up the log files:

      2010-10-01 15:56:36 MessageDatabase [DEBUG] Checkpoint started.
      2010-10-01 15:56:36 MessageDatabase [DEBUG] not removing data file: 2 as contained ack(s) refer to referenced file: [1, 2]
      2010-10-01 15:56:36 MessageDatabase [DEBUG] not removing data file: 3 as contained ack(s) refer to referenced file: [2, 3]
      2010-10-01 15:56:36 MessageDatabase [DEBUG] not removing data file: 4 as contained ack(s) refer to referenced file: [3, 4]
      2010-10-01 15:56:36 MessageDatabase [DEBUG] not removing data file: 5 as contained ack(s) refer to referenced file: [4, 5]
      2010-10-01 15:56:36 MessageDatabase [DEBUG] Checkpoint done.
      

      At this point our consumer had consumed all of the messages except the single stuck message.

      We are using a clean, out-of-the-box setup; we have made no changes to the default activemq.xml file.

      Attachments

      1. activemq.xml (5 kB, Peter Blackburn)
      2. TrackedMessageDequeuer.java (3 kB, Peter Blackburn)
      3. TrackedMessageEnqueuer.java (3 kB, Peter Blackburn)

        Activity

        Peter Blackburn added a comment -

        With a different test harness, we can get this failure to occur at the same point every time (for a 1K message size it always happens at the 286th message sent, for a 10K message size at the 45th message, and for a 100K message size at the 5th message).

        Further investigation shows that when this occurs, the following conditions hold:

        In class Queue, method doPageIn, the following loop body does not get executed because messages.hasNext() returns false:

        while (messages.hasNext() && count < toPageIn) {
            MessageReference node = messages.next();
            messages.remove();
        // snipped for brevity
        

        At this point, count=0, toPageIn=1 and messages.size()=1.

        Following the code through to the BTreeNode class, we find that the leaf node contains a single key with value 44. When the BTreeNode.BTreeIterator class is instantiated, it is passed a value of 535 as the default cursor position because the batchResetNeeded flag is false. This causes the loop in the findNextPage method to exit before it sets the nextEntry field, leaving it null.
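
        As a rough, self-contained illustration of the effect (this is not ActiveMQ code; the values 44 and 535 are simply those observed above), an iterator seeded past the only remaining key sees nothing at all:

        import java.util.SortedMap;
        import java.util.TreeMap;

        public class StaleCursorSketch {
            public static void main(String[] args) {
                // Index keyed by message sequence id; only sequence 44 is still stored.
                TreeMap<Long, String> index = new TreeMap<Long, String>();
                index.put(44L, "stuck message");

                // The cursor believes it has already handed out everything up to 535,
                // so it only asks the index for entries after that position.
                long staleCursorPosition = 535L;
                SortedMap<Long, String> visible = index.tailMap(staleCursorPosition + 1);

                // Prints 0: the remaining message is never returned, mirroring
                // messages.hasNext() == false while messages.size() == 1.
                System.out.println("entries visible to the cursor: " + visible.size());
            }
        }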

        If we put a quick hack into the doPageIn method in class Queue, the problem seems to go away, but we still don't know what the underlying cause is and we are wary of changing code that we don't fully understand:

        if (messages.size() > 0 && !messages.hasNext()) {
            store.resetBatching();
        }
        
        while (messages.hasNext() && count < toPageIn) {
            MessageReference node = messages.next();
            messages.remove();
        // snipped for brevity
        
        Gary Tully added a comment -

        My suspicion is the operation of the setBatch method; disabling the cache will help pinpoint it (policy entry useCache="false").

        One thing that looks odd is org.apache.activemq.store.kahadb.KahaDBStore.KahaDBMessageStore#resetBatching: it does not obtain the indexLock, whereas it should be comparable to org.apache.activemq.store.kahadb.KahaDBStore.KahaDBMessageStore#setBatch in this regard. This could be related to your problem if a batch reset was missed due to contention.
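
        A minimal sketch of that suggestion (not the actual KahaDBStore source; apart from indexLock and resetBatching, which are referenced above, the names here are illustrative):

        // Illustrative only: take the index lock around the batch reset so it
        // cannot interleave with setBatch or a concurrent cursor read.
        public void resetBatching() {
            indexLock.writeLock().lock();   // indexLock: the store's read/write lock
            try {
                // hypothetical cached-position field; clearing it forces the next
                // page-in batch to start again from the head of the stored messages
                lastBatchPosition = null;
            } finally {
                indexLock.writeLock().unlock();
            }
        }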

        btw: What is the difference between your test case above and the other test harness?

        Peter Blackburn added a comment - edited

        Attached test harness.

        Compile with the following:

        javac -cp .:activemq-core-5.4.1.jar:geronimo-j2ee-management_1.1_spec-1.0.1.jar:log4j-1.2.15.jar:commons-logging.jar:jms.jar *.java
        

        When reproducing the error, we restart the ActiveMQ broker and wait until we see the MessageDatabase "Checkpoint done" message in the log, then kick off the enqueuer as follows:

        java -cp .:jms.jar:activemq-core-5.4.1.jar:geronimo-j2ee-management_1.1_spec-1.0.1.jar:log4j-1.2.15.jar:commons-logging.jar jms.TrackedMessageEnqueuer 'tcp://localhost:61616' 1 10 0
        

        and then immediately kick off the dequeuer as follows:

        java -cp .:jms.jar:lib/activemq-core-5.4.1.jar:geronimo-j2ee-management_1.1_spec-1.0.1.jar:log4j-1.2.15.jar:commons-logging.jar jms.TrackedMessageDequeuer 'tcp://localhost:61616'
        

        We don't always get the error, but when it occurs it is always on the 45th message sent, using the 10K message size as shown.

        Peter Blackburn added a comment -

        Thanks for that Gary. I added some additional debugging - the method KahaDBStore.KahaDBMessageStore#resetBatching is called once as the broker starts up and is not called again during my test run (I removed my hack from the Queue class).

        I then tried the following three variations of this method:

        1. Added call to indexLock.readLock().lock()
        2. Added call to lockAsyncJobQueue and indexLock.readLock().lock()
        3. Added call to indexLock.writeLock().lock()

        The stuck message is still observed with each variation.

        I'll keep digging.

        Peter Blackburn added a comment - edited

        As suggested, I tried adding useCache="false" to the queues' policyEntry element in activemq.xml. After doing this I have been unable to reproduce the issue.
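
        For reference, the change amounts to a destination policy entry along these lines in activemq.xml (the queue pattern here is illustrative; adjust it to match the configuration in use):

        <destinationPolicy>
          <policyMap>
            <policyEntries>
              <!-- disable the store cursor cache for all queues -->
              <policyEntry queue=">" useCache="false"/>
            </policyEntries>
          </policyMap>
        </destinationPolicy>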

        Gary Tully added a comment -

        That is good, in that it points to org.apache.activemq.store.kahadb.KahaDBStore.KahaDBMessageStore#setBatch and the cursor and store being out of sync. The intention is that when the cache is full and eventually exhausted, reading from the store can commence from the point in the store that corresponds to the last entry in the cache.
        Can you attach your activemq.xml?

        Peter Blackburn added a comment -

        Attached activemq.xml file used.

        Dejan Bosanac added a comment -

        I cannot reproduce this problem in the 5.5.0 version (while it is easily reproduced in 5.4.1 and 5.4.2). There was some work in this area for the 5.5.0 release, so it seems that this has been fixed as well.

        Can you please retest and reopen the issue if you still experience problems?


          People

          • Assignee: Dejan Bosanac
          • Reporter: Peter Blackburn
          • Votes: 3
          • Watchers: 8
