ActiveMQ / AMQ-1918

AbstractStoreCursor.size gets out of synch with Store size and blocks consumers

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 5.1.0
    • Fix Version/s: 5.3.0
    • Component/s: Message Store
    • Labels:
      None

      Description

      In version 5.1.0, we are seeing our queue consumers stop consuming for no apparent reason.
      We have a staged queue environment and we occasionally see one queue display negative pending message counts that hover around -x, rise gradually to -x+n and then fall back to -x abruptly. The messages are building up and being processed in bunches, but it's not easy to see because the counts are negative. We see the same behavior in the messages coming out of the system: outbound messages come out in bunches, synchronized with the queue pending count dropping to -x.

      This issue does not happen all of the time. It happens about once a week and the only way to fix it is to bounce the broker. It doesn't happen to the same queue every time, so it is not our consuming code.

      Although we don't have a reproducible scenario, we have been able to debug the issue in our test environment.
      We traced the problem to the cached store size in the AbstractStoreCursor.
      This value becomes 0 or negative and prevents the AbstractStoreCursor from retrieving more messages from the store (see AbstractStoreCursor.fillBatch()).
      We have seen the size value go lower than -1000.
      We have also forced it to fix itself by sending in n+1 messages. Once the size goes above zero, the cached value is refreshed and things work OK again.
      Unfortunately, during low volume times, it could be hours before n+1 messages are received, so our message latency can rise considerably.
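
The stall described above can be illustrated with a toy model. This is only a sketch under assumptions, not the ActiveMQ implementation: the class `CachedSizeCursorSketch`, its `driftBy` helper and the resynchronisation step are hypothetical stand-ins for a cursor that gates fetching on a cached counter.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model (not ActiveMQ code): a cursor that trusts a cached size counter. */
class CachedSizeCursorSketch {
    private final Deque<String> store = new ArrayDeque<>(); // stands in for the message store
    private int cachedSize;                                  // stands in for the cached store size

    void add(String msg) {
        store.addLast(msg);
        cachedSize++;
    }

    /** Simulates the counter drifting below the real store size (cause unknown here). */
    void driftBy(int n) {
        cachedSize -= n;
    }

    /** Hands out at most batchSize messages, but only while the cached counter is positive. */
    int fillBatch(int batchSize) {
        if (cachedSize <= 0) {
            return 0; // the store may still hold messages, yet nothing is fetched
        }
        int fetched = 0;
        while (fetched < batchSize && !store.isEmpty()) {
            store.removeFirst();
            fetched++;
        }
        cachedSize = store.size(); // assumption: the counter is refreshed after a real fetch
        return fetched;
    }

    int cachedSize() { return cachedSize; }
    int storeSize() { return store.size(); }

    public static void main(String[] args) {
        CachedSizeCursorSketch cursor = new CachedSizeCursorSketch();
        cursor.driftBy(3);                        // counter is -3, store is empty
        cursor.add("m1");
        cursor.add("m2");
        cursor.add("m3");                         // counter is 0, store holds 3 messages
        System.out.println(cursor.fillBatch(10)); // 0: consumers are starved
        cursor.add("m4");                         // the (n+1)-th message lifts the counter to 1
        System.out.println(cursor.fillBatch(10)); // 4: everything drains at once
    }
}
```

With the counter at -n, the first n new messages only bring it back to zero, which is why delivery resumes in a burst once message n+1 arrives.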

      I have attached our broker config.

      1. testdata.zip
        156 kB
        Nicusor Tanase
      2. testAMQMessageStore.zip
        4.52 MB
        Richard Yarger
      3. NegativeQueueCursorSupport.java
        15 kB
        Richard Yarger
      4. ASF.LICENSE.NOT.GRANTED--activemq.xml
        6 kB
        Richard Yarger

        Activity

        Richard Yarger added a comment -

        I have a reproducible scenario for this issue.
        I cannot guarantee that this is how it always happens, but AbstractStoreCursor.size does go negative every time I follow this procedure.
        I am sorry I could not make it into a nice little test case. I tried to break it down as best I could.

        I wrapped up my eclipse project. It has 2 consumers, and 2 producers.
        producer_flood sends a larger number of messages to test.queue.1 every 3 seconds
        producer_steady sends a small number of messages to test.queue.1 every 3 seconds
        consumer1 consumes messages off test.queue.1 and forwards them to test.queue.2
        consumer2 consumes messages off test.queue.2 and prints text to system.out
        My broker config file is included in the amq_broker_config dir. It sets the memory limits low so that they can be reached faster.
        The queues are set up for 1MB each and the broker is set up for 3MB.

        Here is the script:
        1) Start a clean broker
        2) Start consumer1 and consumer2
        3) Start producer_flood - this will push messages in faster than consumer1 can handle.
        Let the queue size build up to 1000 messages. This should be 100% of the 1MB allowed for that queue.
        4) Stop consumer2 - this will allow messages to build up in queue2.
        If you watch this queue in JConsole, you will notice that the percent memory used does not rise, even after you pass the memory limit.
        Let the queue2 size grow to around 2000.
        This will put you near the memory limit for the broker.
        5) Start consumer2 - if you look at jconsole now, the percent memory used is updated and > 100%.
        6) Wait for queue2 to fall below 1000 messages and stop the producer. Let the messages drain and one or both of the queues should now have negative counts.
        7) If you start the producer_steady now you'll notice that messages do not reach consumer2 at the rate that they go in.
        If you debug the broker now and look at AbstractStoreCursor.size, it will be negative.

        Please let me know if you need more info.
        Thanks.

        Richard Yarger added a comment -

        eclipse project with test files

        Rob Davies added a comment -

        Thanks Richard - I can't reproduce on Trunk - so I'm going to mark it fixed in 5.2 - if it's still a problem - reopen and I'll investigate further

        Richard Yarger added a comment -

        I tested with apache-activemq-5.2-20080903.231704-53 and was able to reproduce the problem.
        Is there much code difference between yesterday's snapshot and trunk?
        I'll try the snapshot tomorrow to be sure.

        Did you happen to try the scenario against 5.1? If so, were you able to reproduce?

        Richard Yarger added a comment -

        I tested the same scenario against activemq-5.2-SNAPSHOT-20080904.231544-55.
        I was able to reproduce the error.
        Is it possible the code you tested on trunk was different from what is in the snapshots?

        Rob Davies added a comment -

        This is a timing issue - so it's been difficult for me to reproduce on any version. I'll try to get a slower box

        Nicusor Tanase added a comment -

        Hi,

        You can use the test cases from the attached archive to reproduce the problem.
        jmeterjms_nic-1.0.jar - modified version of the existing JMS sampler from JMeter project
        JMS Request Only.jmx - test plan for sender
        JMS Receive Only.jmx - test plan for receiver.
        You need to use version 2.3.3 of JMeter and replace ApacheJMeter_jms.jar with the modified one.

        The sampler code does not close its connections, so you need to restart the JMeter clients between tests.
        I hope the JMS sampler will make testing easier.

        I consistently get the following problem, using the attached test cases (actual figures may vary).
        The broker is configured to use Oracle10 for persistence. I have created a trigger that copies the records deleted from ACTIVEMQ_MSGS into ACTIVEMQ_HISTORY table so I can see what has been produced and consumed at the persistence layer.

        1. Start JMeter with 3 producer threads, each sending 3333 messages.
        2. Start JMeter with a consumer thread.
        3. Let the producers finish (9999 messages sent to the broker).
        4. Wait for the consumer to fetch all 9999 messages.
        5. Consumer stops receiving after 9860 messages (as reported by the JMeter Summary Report).
        6. Table ACTIVEMQ_MSGS contains 304 messages.
        7. Table ACTIVEMQ_HISTORY contains 9695 messages.
        8. JMX statistics show that the queue contains 139 messages.
        9. Trying to browse the queue via JConsole does not return any messages.

        Nic
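
The figures in the list above can be cross-checked with a few subtractions. The sketch below only restates the reported numbers; the class and method names are hypothetical, and the reading of the last difference (messages the consumer reports as received whose rows had not yet been deleted) is an interpretation, not something the report states.

```java
/** Reconciles the figures reported in the JMeter run; every constant is taken from the list. */
class CountReconciliationSketch {
    static final int SENT = 9999;              // 3 producers x 3333 messages
    static final int RECEIVED = 9860;          // JMeter Summary Report
    static final int IN_MSGS_TABLE = 304;      // rows left in ACTIVEMQ_MSGS
    static final int IN_HISTORY_TABLE = 9695;  // deleted rows copied to ACTIVEMQ_HISTORY
    static final int JMX_QUEUE_SIZE = 139;     // queue size reported over JMX

    /** Messages the consumer never saw. */
    static int undelivered() { return SENT - RECEIVED; }

    /** Rows the database can account for, stored plus deleted. */
    static int rowsAccountedFor() { return IN_MSGS_TABLE + IN_HISTORY_TABLE; }

    /** Received by the consumer, but the row was not (yet) deleted from ACTIVEMQ_MSGS. */
    static int receivedMinusDeleted() { return RECEIVED - IN_HISTORY_TABLE; }

    public static void main(String[] args) {
        System.out.println(undelivered() == JMX_QUEUE_SIZE); // true: JMX agrees with the consumer count
        System.out.println(rowsAccountedFor() == SENT);      // true: no rows were lost in the database
        System.out.println(receivedMinusDeleted());          // 165
        System.out.println(IN_MSGS_TABLE - JMX_QUEUE_SIZE);  // 165: the same gap seen from the table side
    }
}
```

So in this run the database still holds every message, and the broker's own count matches what the consumer is missing; the messages exist in the store but are not being handed out.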

        Nicusor Tanase added a comment -

        I found a way to work around this issue, by changing the way messages are loaded from the database.
        I ran tests with several queues, producers and consumers and did not get any undelivered messages anymore.

        DefaultJDBCAdapter.doRecoverNextMessages() recovers the messages with an ID higher than that of the last recovered message.
        The SQL statement is:

        org.apache.activemq.store.jdbc.Statements.java
        findNextMessagesStatement = "SELECT ID, MSG FROM " + getFullMessageTableName()
                                                + " WHERE CONTAINER=? AND ID > ? ORDER BY ID";
        

        However, it can happen that messages with lower IDs are inserted into the DB after messages with higher IDs.
        Such messages do not get recovered from DB.
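
The effect of the `ID > ?` bound can be shown with an in-memory stand-in for the table. This is a sketch under assumptions, not ActiveMQ code: `ForwardOnlyPagingSketch` and its methods are hypothetical, and a `TreeSet` plays the role of the rows returned by `ORDER BY ID`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Toy model (not ActiveMQ code) of paging with "WHERE ID > ? ORDER BY ID". */
class ForwardOnlyPagingSketch {
    private final TreeSet<Long> table = new TreeSet<>(); // IDs of rows visible in the table
    private long lastRecovered = 0;                       // the "?" bound for the next query

    void commit(long id) {
        table.add(id);
    }

    /** Returns up to max IDs strictly greater than the last recovered ID, in ascending order. */
    List<Long> recoverNext(int max) {
        List<Long> batch = new ArrayList<>();
        for (long id : table.tailSet(lastRecovered, false)) {
            if (batch.size() == max) {
                break;
            }
            batch.add(id);
        }
        if (!batch.isEmpty()) {
            lastRecovered = batch.get(batch.size() - 1); // the bound only ever moves forward
        }
        return batch;
    }

    public static void main(String[] args) {
        ForwardOnlyPagingSketch db = new ForwardOnlyPagingSketch();
        db.commit(1);
        db.commit(2);
        db.commit(4);                           // row 3 is not visible yet
        System.out.println(db.recoverNext(10)); // [1, 2, 4]
        db.commit(3);                           // row 3 becomes visible late
        System.out.println(db.recoverNext(10)); // []: 3 is below the bound and is never returned
    }
}
```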

        I have changed the DefaultJDBCAdapter on my local copy to act retroactively, looking back maxReturned rows for any missed messages.
        I am not familiar with the ActiveMQ code, so you might want to have a look at the modified DefaultJDBCAdapter.doRecoverNextMessages() below:

        org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter.java
           public class DefaultJDBCAdapter implements JDBCAdapter {
        
           private Set<Long> lastRecoveredMessagesIds = new TreeSet<Long>();
           // ... (rest of the class omitted) ...
        
            public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq,
                                              int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
                PreparedStatement s = null;
                ResultSet rs = null;
                long id = 0;
                List<Long> cleanupIds = new ArrayList<Long>();
                int index = 0;
                try {
                    s = c.getConnection().prepareStatement(statements.getFindNextMessagesStatement());
                    s.setMaxRows(maxReturned*2);
                    s.setString(1, destination.getQualifiedName());
                    s.setLong(2, nextSeq - maxReturned);
                    rs = s.executeQuery();
                    int count = 0;
                    if (statements.isUseExternalMessageReferences()) {
                        while (rs.next() && count < maxReturned) {
                            id = rs.getLong(1);
                            if (lastRecoveredMessagesIds.contains(id)) {
                                // this message was already recovered
                                cleanupIds.add(id);
                                continue;
                            }
                            if (listener.recoverMessageReference(rs.getString(1))) {
                                count++;
                                lastRecoveredMessagesIds.add(id);
                            } else {
                                LOG.debug("Stopped recover next messages");
                            }
                        }
                    } else {
                        while (rs.next() && count < maxReturned) {
                            id = rs.getLong(1);
                            if (lastRecoveredMessagesIds.contains(id)) {
                                // this message was already recovered
                                cleanupIds.add(id);
                                continue;
                            }
                            if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
                                count++;
                                lastRecoveredMessagesIds.add(id);
                            } else {
                                LOG.debug("Stopped recover next messages");
                            }
                        }
                    }
                    
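                    // now clean up the list of recovered messages
                    // NOTE: index is never incremented, so every id collected in
                    // cleanupIds is removed whenever count > 0
                    index = 0;
                    Iterator<Long> it = cleanupIds.iterator();
                    while (it.hasNext() && index < count) {
                        lastRecoveredMessagesIds.remove(it.next());
                    }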
                    
                    
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    close(rs);
                    close(s);
                }
            }
        
        }
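
The look-back idea in the modified method can be reduced to a small in-memory model. This is a simplified sketch, not the patch itself: `LookbackPagingSketch` is hypothetical, the window is measured in ID values (as with `nextSeq - maxReturned` above), and the set of already recovered IDs is never pruned here, which a real implementation would need to handle.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Toy model (not ActiveMQ code): re-scan a window behind the cursor and skip known IDs. */
class LookbackPagingSketch {
    private final TreeSet<Long> table = new TreeSet<>();       // IDs of rows visible in the table
    private final Set<Long> alreadyRecovered = new HashSet<>(); // IDs handed out so far (unbounded here)
    private long nextSeq = 0;                                   // highest ID recovered so far

    void commit(long id) {
        table.add(id);
    }

    /** Queries from (nextSeq - maxReturned) instead of nextSeq and filters out duplicates. */
    List<Long> recoverNext(int maxReturned) {
        List<Long> batch = new ArrayList<>();
        long lowerBound = nextSeq - maxReturned; // look back maxReturned IDs
        for (long id : table.tailSet(lowerBound, false)) {
            if (batch.size() == maxReturned) {
                break;
            }
            if (alreadyRecovered.add(id)) {      // add() returns false for IDs seen before
                batch.add(id);
                nextSeq = Math.max(nextSeq, id);
            }
        }
        return batch;
    }

    public static void main(String[] args) {
        LookbackPagingSketch db = new LookbackPagingSketch();
        db.commit(1);
        db.commit(2);
        db.commit(4);
        System.out.println(db.recoverNext(10)); // [1, 2, 4]
        db.commit(3);                           // late row inside the look-back window
        System.out.println(db.recoverNext(10)); // [3]
    }
}
```

A row that shows up more than maxReturned IDs behind the cursor would still be missed, so this narrows the window for the problem rather than closing it.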
        
        Gary Tully added a comment -

        The following fix may be relevant, the root cause was the cursor and store being out of sync like you describe in the description.
        https://issues.apache.org/activemq/browse/AMQ-1984

        Rob Davies added a comment -

        Fixed by SVN revision 729803

        Richard Yarger added a comment -

        I applied my test scenario to apache-activemq-5.3-20090113.084327-5.
        It was actually worse.
        I still got a negative queue.
        And I was unable to consume from or produce to queue1.
        The queue was left with 131 messages that consumer1 will not consume, even after I stop and restart the consumer.
        I ran my producers and no messages are added to queue1.

        I restarted the broker and the 131 messages were consumed.
        The following error was in the log:
        ERROR Service - Async error occurred: javax.jms.JMSException: Unmatched acknowledege: MessageAck {commandId = 839, responseRequired = false, ackType = 2, consumerId = ID:vibes-richyarger-1501-1231882347001-0:0:2:1, firstMessageId = null, lastMessageId = ID:vibes-richyarger-3948-1231881217090-0:5200:1:1:1, destination = queue://test.queue.1, transactionId = TX:ID:vibes-richyarger-1501-1231882347001-0:0:138, messageCount = 1}; Could not find Message-ID ID:vibes-richyarger-3948-1231881217090-0:5200:1:1:1 in dispatched-list (end of ack)
        javax.jms.JMSException: Unmatched acknowledege: MessageAck {commandId = 839, responseRequired = false, ackType = 2, consumerId = ID:vibes-richyarger-1501-1231882347001-0:0:2:1, firstMessageId = null, lastMessageId = ID:vibes-richyarger-3948-1231881217090-0:5200:1:1:1, destination = queue://test.queue.1, transactionId = TX:ID:vibes-richyarger-1501-1231882347001-0:0:138, messageCount = 1}; Could not find Message-ID ID:vibes-richyarger-3948-1231881217090-0:5200:1:1:1 in dispatched-list (end of ack)
        at org.apache.activemq.broker.region.PrefetchSubscription.assertAckMatchesDispatched(PrefetchSubscription.java:439)
        at org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:192)
        at org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:377)
        at org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:462)
        at org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:194)
        at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
        at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
        at org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:85)
        at org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:458)
        at org.apache.activemq.command.MessageAck.visit(MessageAck.java:205)
        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
        at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
        at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
        at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
        at java.lang.Thread.run(Thread.java:595)

        Richard Yarger added a comment -

        I also have the following errors during startup:
        ERROR Service - Async error occurred: javax.jms.JMSException: Transaction 'TX:ID:vibes-richyarger-1501-1231882347001-0:0:1' has not been started.
        javax.jms.JMSException: Transaction 'TX:ID:vibes-richyarger-1501-1231882347001-0:0:1' has not been started.
        at org.apache.activemq.broker.TransactionBroker.getTransaction(TransactionBroker.java:270)
        at org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:190)
        at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
        at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
        at org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:85)
        at org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:458)
        at org.apache.activemq.command.MessageAck.visit(MessageAck.java:205)
        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
        at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
        at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
        at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
        at java.lang.Thread.run(Thread.java:595)
        ERROR Service - Async error occurred: javax.jms.JMSException: Could not correlate acknowledgment with dispatched message: MessageAck {commandId = 25, responseRequired = false, ackType = 3, consumerId = ID:vibes-richyarger-1501-1231882347001-0:0:1:1, firstMessageId = null, lastMessageId = ID:vibes-richyarger-3948-1231881217090-0:5298:1:1:1, destination = queue://test.queue.1, transactionId = null, messageCount = 1}
        javax.jms.JMSException: Could not correlate acknowledgment with dispatched message: MessageAck {commandId = 25, responseRequired = false, ackType = 3, consumerId = ID:vibes-richyarger-1501-1231882347001-0:0:1:1, firstMessageId = null, lastMessageId = ID:vibes-richyarger-3948-1231881217090-0:5298:1:1:1, destination = queue://test.queue.1, transactionId = null, messageCount = 1}
        at org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:330)
        at org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:377)
        at org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:462)
        at org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:194)
        at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
        at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
        at org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:85)
        at org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:458)
        at org.apache.activemq.command.MessageAck.visit(MessageAck.java:205)
        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
        at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
        at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
        at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
        at java.lang.Thread.run(Thread.java:595)
        INFO WebConsoleStarter - ActiveMQ WebConsole initialized.
        ERROR Service - Async error occurred: javax.jms.JMSException: Unmatched acknowledege: MessageAck {commandId = 49, responseRequired = false, ackType = 2, consumerId = ID:vibes-richyarger-1501-1231882347001-0:0:2:1, firstMessageId = null, lastMessageId = ID:vibes-richyarger-3948-1231881217090-0:4731:1:1:1, destination = queue://test.queue.1, transactionId = TX:ID:vibes-richyarger-1501-1231882347001-0:0:7, messageCount = 1}; Could not find Message-ID ID:vibes-richyarger-3948-1231881217090-0:4731:1:1:1 in dispatched-list (end of ack)
        javax.jms.JMSException: Unmatched acknowledege: MessageAck {commandId = 49, responseRequired = false, ackType = 2, consumerId = ID:vibes-richyarger-1501-1231882347001-0:0:2:1, firstMessageId = null, lastMessageId = ID:vibes-richyarger-3948-1231881217090-0:4731:1:1:1, destination = queue://test.queue.1, transactionId = TX:ID:vibes-richyarger-1501-1231882347001-0:0:7, messageCount = 1}; Could not find Message-ID ID:vibes-richyarger-3948-1231881217090-0:4731:1:1:1 in dispatched-list (end of ack)
        at org.apache.activemq.broker.region.PrefetchSubscription.assertAckMatchesDispatched(PrefetchSubscription.java:439)
        at org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:192)
        at org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:377)
        at org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:462)
        at org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:194)
        at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
        at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
        at org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:85)
        at org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:458)
        at org.apache.activemq.command.MessageAck.visit(MessageAck.java:205)
        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
        at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
        at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
        at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
        at java.lang.Thread.run(Thread.java:595)

        Richard Yarger added a comment -

        I have created a unit test that can reproduce the issue.
        It takes around 5 minutes to complete.

        I modeled the test on the CursorSupport test case,
        adding a second queue and more specific memory settings.
        I also included tests with different prefetch values.
        Lowering prefetch seems to have a direct impact on the issue.

        testWithDefaultPrefetch() and testWithDefaultPrefetchFiveConsumers()
        are usually the ones to fail.

        I am reproducing the issue quite easily with this test case.
        So let me know if you cannot.
        Thanks.

        Richard Yarger added a comment -

        I also noticed another ticket that is most likely related to this issue: AMQ-1940.

        Norbert Pfistner added a comment -

        Our problem with hanging persistent JDBC messages may also be related to this. Please see https://issues.apache.org/activemq/browse/AMQ-2184 for details.
        We have to restart the broker in our production environment at least once every 24 hours.

        Richard Yarger added a comment -

        I should have mentioned that this test case has been written against tag 5.1.0.
        I have been debugging the code and I have some concerns about whether the unit test is producing the same issue that we are seeing in our production environment.

        Although the unit test decrements the pending message count too many times, driving it negative, it does not cause a cursor size to go negative.

        The unit test produces a negative queue count because duplicate messages are being dispatched to the consumer from the RecoveryDispatch.
        PagedInMessages are cached in a RecoveryDispatch when a new subscriber is added.
        I was able to resolve the issue by adding a lock check to the Queue.iterate() method,
        which I thought was great until I saw that the RecoveryDispatch had been removed in later versions.

        I am going to run the unit test in trunk.
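
The duplicate-dispatch effect described in this comment can be illustrated with a small, self-contained toy model. This is only a sketch under stated assumptions, not ActiveMQ code: the class PendingCountSketch and its methods are hypothetical, and the "guard" stands in loosely for the lock check mentioned above. It shows how acknowledging a message that was dispatched twice (once on the normal path, once replayed for a new subscriber) decrements a pending counter one time too many.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model only; hypothetical names, not taken from the ActiveMQ code base. */
final class PendingCountSketch {
    private final Set<String> dispatched = new HashSet<>();
    private final boolean guardDuplicates;
    private int pending;

    PendingCountSketch(int storedMessages, boolean guardDuplicates) {
        this.pending = storedMessages;
        this.guardDuplicates = guardDuplicates;
    }

    /** Returns true if the message is delivered; a guarded queue suppresses repeats. */
    synchronized boolean dispatch(String messageId) {
        boolean firstTime = dispatched.add(messageId);
        return firstTime || !guardDuplicates;
    }

    /** Every delivered copy is acknowledged and decrements the pending count. */
    synchronized void acknowledge(String messageId) {
        pending--;
    }

    synchronized int pending() {
        return pending;
    }

    /** Dispatches three stored messages, then replays two of them as a recovery path would. */
    static int run(boolean guardDuplicates) {
        PendingCountSketch queue = new PendingCountSketch(3, guardDuplicates);
        String[] normalPath = {"m1", "m2", "m3"};
        String[] recoveryPath = {"m2", "m3"}; // same messages replayed for a new subscriber
        for (String id : normalPath) {
            if (queue.dispatch(id)) queue.acknowledge(id);
        }
        for (String id : recoveryPath) {
            if (queue.dispatch(id)) queue.acknowledge(id);
        }
        return queue.pending();
    }

    public static void main(String[] args) {
        System.out.println("unguarded pending = " + run(false)); // -2
        System.out.println("guarded pending = " + run(true));    // 0
    }
}
```

In the unguarded run the two replayed messages are acknowledged a second time, so the count ends at -2; with the guard the replays are suppressed and the count ends at 0.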

        Rob Davies added a comment -

        Added test case in SVN revision 778622.
        Looks fixed in trunk as of 26 May 2009.


          People

          • Assignee:
            Rob Davies
            Reporter:
            Richard Yarger
          • Votes:
            3
            Watchers:
            3
