Uploaded image for project: 'ActiveMQ Classic'
  1. ActiveMQ Classic
  2. AMQ-3664

Not all messages will be acknowledged when optimizeAcknowledge is true

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Critical
    • Resolution: Fixed
    • 5.5.1
    • 5.7.0
    • Broker
    • None
    • Windows 7 and Linux Debian with JRE 1.6.24 or JRE 1.6.27

    Description

      I make performance test with activemq. When I set optimizeAcknowledge = true I get a dramatic performance improvement, but when I shut down the producer the consumer does not acknowledge all messages! If I stop the consumer and then I start the consumer a second time the consumer recieves messages again and again not all messages will be acknoledged in the queue.

      I am using camel 2.9.0 to produce and consume the messages.
      I am using the consumer Template with asyncSendBody.
      The following route is configured in the camelContext:

          <camel:camelContext id="camelContext">
          	<camel:template id="producerTemplate"/>
          	<camel:consumerTemplate id="consumerTemplate"/>
          	<camel:route>
          		<camel:from uri="jms:queue0?concurrentConsumers=3&amp;maxConcurrentConsumers=10&amp;asyncConsumer=true"/>
          		<camel:to uri="beanConsumer"/>
          	</camel:route>
          </camel:camelContext>
      
      The config for the ActiveMQComponent:
          <bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent">
      		<property name="connectionFactory">		
      			<bean class="org.apache.activemq.pool.PooledConnectionFactory">
         				<property name="connectionFactory">
        					<bean class="org.apache.activemq.spring.ActiveMQConnectionFactory">
         						<property name="optimizeAcknowledge" value="true"/>
         						<property name="dispatchAsync" value="true"/>
        						<property name="sendAcksAsync" value="true"/>
        						<property name="useAsyncSend" value="true"/>
      				 		<property name="brokerURL" value="nio://138-ham-de:61616"/>				 		
      				 		<property name="useDedicatedTaskRunner" value="false"/> 
          				</bean>	
            			</property>
            		</bean>
            	</property>
          </bean>
      
      

      I think, the problem is here:
      Class ActiveMQMessageConsumer:

          private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
              if (unconsumedMessages.isClosed()) {
                  return;
              }
              if (messageExpired) {
                  synchronized (deliveredMessages) {
                      deliveredMessages.remove(md);
                  }
                  stats.getExpiredMessageCount().increment();
                  ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
              } else {
                  stats.onMessage();
                  if (session.getTransacted()) {
                      // Do nothing.
                  } else if (isAutoAcknowledgeEach()) {
                      if (deliveryingAcknowledgements.compareAndSet(false, true)) {
                          synchronized (deliveredMessages) {
                              if (!deliveredMessages.isEmpty()) {
                                  if (optimizeAcknowledge) {
                                      ackCounter++;
                                      if (ackCounter >= (info.getPrefetchSize() * .65) || System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAckTimeout)) {
                                      	MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
                                      	if (ack != null) {
                                  		    deliveredMessages.clear();
                                  		    ackCounter = 0;
                                  		    session.sendAck(ack);
                                  		    optimizeAckTimestamp = System.currentTimeMillis();
                                      	}
                                      }
                                  } else {
                                      MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
                                      if (ack!=null) {
                                          deliveredMessages.clear();
                                          session.sendAck(ack);
                                      }
                                  }
                              }
                          }
                          deliveryingAcknowledgements.set(false);
                      }
                  } else if (isAutoAcknowledgeBatch()) {
                      ackLater(md, MessageAck.STANDARD_ACK_TYPE);
                  } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) {
                      boolean messageUnackedByConsumer = false;
                      synchronized (deliveredMessages) {
                          messageUnackedByConsumer = deliveredMessages.contains(md);
                      }
                      if (messageUnackedByConsumer) {
                          ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
                      }
                  } 
                  else {
                      throw new IllegalStateException("Invalid session state.");
                  }
              }
          }
      
      

      What will happen when no producer will send a message to this queue so that no message will pass this method? When will the deliveredMessages been acked?

      Attachments

        Activity

          People

            tabish Timothy A. Bish
            matw Matthias Wessel
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: