Description
I am running performance tests with ActiveMQ. Setting optimizeAcknowledge = true gives a dramatic performance improvement, but when I shut down the producer, the consumer does not acknowledge all messages. If I stop the consumer and start it a second time, it receives the unacknowledged messages again, and once more not all messages in the queue are acknowledged.
I am using Camel 2.9.0 to produce and consume the messages.
I am using the producer template with asyncSendBody.
The following route is configured in the camelContext:
<camel:camelContext id="camelContext">
    <camel:template id="producerTemplate"/>
    <camel:consumerTemplate id="consumerTemplate"/>
    <camel:route>
        <camel:from uri="jms:queue0?concurrentConsumers=3&amp;maxConcurrentConsumers=10&amp;asyncConsumer=true"/>
        <camel:to uri="beanConsumer"/>
    </camel:route>
</camel:camelContext>

The config for the ActiveMQComponent:

<bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="connectionFactory">
        <bean class="org.apache.activemq.pool.PooledConnectionFactory">
            <property name="connectionFactory">
                <bean class="org.apache.activemq.spring.ActiveMQConnectionFactory">
                    <property name="optimizeAcknowledge" value="true"/>
                    <property name="dispatchAsync" value="true"/>
                    <property name="sendAcksAsync" value="true"/>
                    <property name="useAsyncSend" value="true"/>
                    <property name="brokerURL" value="nio://138-ham-de:61616"/>
                    <property name="useDedicatedTaskRunner" value="false"/>
                </bean>
            </property>
        </bean>
    </property>
</bean>
I think the problem is here:
Class ActiveMQMessageConsumer:
private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
    if (unconsumedMessages.isClosed()) {
        return;
    }
    if (messageExpired) {
        synchronized (deliveredMessages) {
            deliveredMessages.remove(md);
        }
        stats.getExpiredMessageCount().increment();
        ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
    } else {
        stats.onMessage();
        if (session.getTransacted()) {
            // Do nothing.
        } else if (isAutoAcknowledgeEach()) {
            if (deliveryingAcknowledgements.compareAndSet(false, true)) {
                synchronized (deliveredMessages) {
                    if (!deliveredMessages.isEmpty()) {
                        if (optimizeAcknowledge) {
                            ackCounter++;
                            if (ackCounter >= (info.getPrefetchSize() * .65)
                                    || System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAckTimeout)) {
                                MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
                                if (ack != null) {
                                    deliveredMessages.clear();
                                    ackCounter = 0;
                                    session.sendAck(ack);
                                    optimizeAckTimestamp = System.currentTimeMillis();
                                }
                            }
                        } else {
                            MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
                            if (ack != null) {
                                deliveredMessages.clear();
                                session.sendAck(ack);
                            }
                        }
                    }
                }
                deliveryingAcknowledgements.set(false);
            }
        } else if (isAutoAcknowledgeBatch()) {
            ackLater(md, MessageAck.STANDARD_ACK_TYPE);
        } else if (session.isClientAcknowledge() || session.isIndividualAcknowledge()) {
            boolean messageUnackedByConsumer = false;
            synchronized (deliveredMessages) {
                messageUnackedByConsumer = deliveredMessages.contains(md);
            }
            if (messageUnackedByConsumer) {
                ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
            }
        } else {
            throw new IllegalStateException("Invalid session state.");
        }
    }
}
What happens when no producer sends any further messages to this queue, so that no message passes through this method? When will the remaining deliveredMessages be acknowledged?
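To illustrate the concern, here is a minimal standalone sketch of the optimizeAcknowledge branch quoted above. It is not ActiveMQ code: the class OptimizeAckSketch, its methods, and the prefetch size (10) and timeout (300 ms) are assumptions chosen for illustration, and the clock is passed in explicitly so the run is deterministic. The point it models is that the threshold and timeout are only checked when a message passes through afterMessageIsConsumed.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the optimizeAcknowledge branch; not ActiveMQ code. */
public class OptimizeAckSketch {
    private final int prefetchSize;
    private final long optimizeAckTimeout;
    private final List<Integer> deliveredMessages = new ArrayList<>();
    private int ackCounter = 0;
    private long optimizeAckTimestamp;
    private int ackedTotal = 0;

    public OptimizeAckSketch(int prefetchSize, long optimizeAckTimeout, long now) {
        this.prefetchSize = prefetchSize;
        this.optimizeAckTimeout = optimizeAckTimeout;
        this.optimizeAckTimestamp = now;
    }

    /** Same condition as the quoted method; evaluated only when a message is consumed. */
    public void afterMessageIsConsumed(int messageId, long now) {
        deliveredMessages.add(messageId);
        ackCounter++;
        if (ackCounter >= (prefetchSize * .65) || now >= (optimizeAckTimestamp + optimizeAckTimeout)) {
            // Stands in for sending one batched ack for everything delivered so far.
            ackedTotal += deliveredMessages.size();
            deliveredMessages.clear();
            ackCounter = 0;
            optimizeAckTimestamp = now;
        }
    }

    public int pendingCount() { return deliveredMessages.size(); }

    public int ackedCount() { return ackedTotal; }

    public static void main(String[] args) {
        OptimizeAckSketch consumer = new OptimizeAckSketch(10, 300, 0);
        // The producer sends 10 messages in quick succession and then stops.
        for (int id = 1; id <= 10; id++) {
            consumer.afterMessageIsConsumed(id, id);
        }
        // prints "acked=7 pending=3"
        System.out.println("acked=" + consumer.ackedCount() + " pending=" + consumer.pendingCount());
    }
}
```

In this model the last three messages stay pending no matter how much time passes, because nothing re-evaluates the timeout until another message is consumed. Whether the real consumer flushes them elsewhere (for example on close) is exactly the question raised above.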