Just a thought but this sounds like the message delivery mode is not persistent. For reliable message delivery in the event of network failures, the message producers need to set deliveryMode to javax.jms.DeliveryMode.PERSISTENT.
Session session = connection.createSession(...);
MessageProducer producer = session.createProducer(...);
producer.setDeliveryMode(javax.jms.DeliveryMode.PERSISTENT);
....
No, we didn't set the delivery mode attribute on the producer, which means that, according to the JMS API, it defaults to DeliveryMode.PERSISTENT.
Every message is successfully persisted in the local broker on the producer side if the network is down before the producer starts sending. But if the network goes down while messages are being transferred between the two brokers, there is always message loss.
OK, we need some help to reproduce this; my efforts so far have not succeeded. In my test scenario I use a simple software socket proxy to control the network connection between both brokers and have it drop the connection during the test. I cannot reproduce message loss, however.
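For anyone who wants to try the same kind of test, a droppable TCP proxy can be sketched in plain Java. This is only a minimal sketch and not the proxy used in the test above: the class name DroppableProxy, its methods, and the ports and host in the usage comment are all hypothetical. It forwards bytes between a listening port and a target broker port and closes every live connection when dropAll() is called.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Minimal TCP proxy whose live connections can be cut on demand (test aid only). */
class DroppableProxy implements AutoCloseable {
    private final ServerSocket listener;
    private final String targetHost;
    private final int targetPort;
    private final List<Socket> open = new CopyOnWriteArrayList<>();

    DroppableProxy(int listenPort, String targetHost, int targetPort) throws IOException {
        this.listener = new ServerSocket(listenPort); // port 0 picks a free port
        this.targetHost = targetHost;
        this.targetPort = targetPort;
        Thread t = new Thread(this::acceptLoop, "proxy-accept");
        t.setDaemon(true);
        t.start();
    }

    int port() {
        return listener.getLocalPort();
    }

    private void acceptLoop() {
        while (!listener.isClosed()) {
            try {
                Socket client = listener.accept();
                try {
                    Socket upstream = new Socket(targetHost, targetPort);
                    open.add(client);
                    open.add(upstream);
                    pump(client, upstream);
                    pump(upstream, client);
                } catch (IOException e) {
                    closeQuietly(client); // target unreachable
                }
            } catch (IOException e) {
                // accept() failed, normally because the listener was closed
            }
        }
    }

    /** Copy bytes one way until either side closes, then close both sockets. */
    private void pump(Socket from, Socket to) {
        Thread t = new Thread(() -> {
            byte[] buf = new byte[8192];
            try (InputStream in = from.getInputStream();
                 OutputStream out = to.getOutputStream()) {
                int n;
                while ((n = in.read(buf)) != -1) {
                    out.write(buf, 0, n);
                    out.flush();
                }
            } catch (IOException e) {
                // expected when dropAll() closes the sockets
            } finally {
                closeQuietly(from);
                closeQuietly(to);
                open.remove(from);
                open.remove(to);
            }
        });
        t.setDaemon(true);
        t.start();
    }

    /** Simulate a network failure by closing every proxied connection. */
    void dropAll() {
        for (Socket s : open) {
            closeQuietly(s);
            open.remove(s);
        }
    }

    @Override
    public void close() throws IOException {
        listener.close();
        dropAll();
    }

    private static void closeQuietly(Socket s) {
        try {
            s.close();
        } catch (IOException ignored) {
            // nothing useful to do in a test aid
        }
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical usage: java DroppableProxy 61617 producer-host 61616
        try (DroppableProxy proxy = new DroppableProxy(
                Integer.parseInt(args[0]), args[1], Integer.parseInt(args[2]))) {
            System.out.println("Proxy on port " + proxy.port() + "; press Enter to drop connections");
            while (System.in.read() != -1) {
                proxy.dropAll(); // runs once per byte read from stdin
            }
        }
    }
}
```

With something like this in place, the network connector on one broker would point at the proxy port instead of the remote broker, and connections can be dropped while messages are being forwarded.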
Can you describe your setup in some detail or submit a test case? One other thing, have you verified this with trunk?
This test can reproduce the message loss using ActiveMQ 5.0 and 5.1.
The producer side has its own broker and a fixed IP address.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jee="http://www.springframework.org/schema/jee"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
                           http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-2.0.xsd
                           http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
    <amq:broker brokerName="producer" useJmx="true" persistent="true">
        <amq:transportConnectors>
            <amq:transportConnector uri="tcp://localhost:61616" />
        </amq:transportConnectors>
    </amq:broker>
    <!-- Jms ConnectionFactory -->
    <bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="vm://localhost:61616" />
    </bean>
    <!-- Spring JMS SimpleConverter -->
    <bean id="simpleConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter" />
    <!-- JMS Queue Template -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="jmsFactory" />
        <property name="messageConverter" ref="converter" />
    </bean>
    <!-- Message Converter -->
    <bean id="converter" class="com.al.SimpleConverter">
        <property name="converter">
            <ref local="simpleConverter" />
        </property>
    </bean>
    <!-- Message producer -->
    <bean id="sender" class="com.al.DefaultSender">
        <property name="template" ref="jmsTemplate" />
        <property name="destinationName" value="test-out" />
    </bean>
</beans>
The test app on the producer side: TestApp.java
package com.al;

import org.apache.log4j.Logger;

public class TestApp {
    /**
     * Logger for this class
     */
    private static final Logger logger = Logger.getLogger(TestApp.class);

    public static void main(String[] args) {
        logger.debug("start test...");
        //Initializing spring context
        Context.init();
        // uncomment to send messages
        DefaultSender sender = Context.getBean("sender");
        int idx = 1;
        int count = 3000;
        while (idx <= count) {
            sender.sendMessage(SimpleMessageHelper.genSimpleMessage(idx));
            logger.debug("send out message : payload is " + idx);
            idx++;
        }
        /* Infinitely hold main thread to keep the spring context running */
        Object lock = new Object();
        synchronized (lock) {
            try {
                lock.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
The test client is very simple: it sends 3000 messages to the queue "test-out" on the producer-side broker.
============================================================================================
The consumer side uses a dynamic IP address and is deployed on another machine.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jee="http://www.springframework.org/schema/jee"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
                           http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-2.0.xsd
                           http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
    <amq:broker brokerName="brokerA" useJmx="true" persistent="true">
        <amq:networkConnectors>
            <amq:networkConnector uri="static://(tcp://【ip-for-producer-machine】:61616)" duplex="true"/>
        </amq:networkConnectors>
        <amq:transportConnectors>
            <amq:transportConnector uri="tcp://localhost:61616" />
        </amq:transportConnectors>
    </amq:broker>
    <!-- Jms ConnectionFactory -->
    <bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="vm://localhost:61616" />
    </bean>
    <!-- Spring JMS SimpleConverter -->
    <bean id="simpleConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter" />
    <!-- Message Converter -->
    <bean id="converter" class="com.al.SimpleConverter">
        <property name="converter">
            <ref local="simpleConverter" />
        </property>
    </bean>
    <!-- MDP -->
    <!-- consumer 1 -->
    <bean id="listener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
        <constructor-arg>
            <bean class="com.al.DefaultListener" />
        </constructor-arg>
        <property name="defaultListenerMethod" value="onMessage" />
        <property name="messageConverter" ref="converter" />
    </bean>
    <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="jmsFactory" />
        <property name="destinationName" value="test-out" />
        <property name="messageListener" ref="listener" />
        <property name="sessionTransacted" value="true" />
    </bean>
</beans>
Default listener code: DefaultListener.java
package com.al;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class DefaultListener {
    private static Log logger = LogFactory.getLog(DefaultListener.class);
    static int i = 1;

    public void onMessage(SimpleMessage message) {
        int j = Integer.valueOf(message.getPayLoad());
        logger.debug("receive : j = " + j);
        if (i != j) {
            logger.debug("warn : i=" + i + ",j=" + j);
        }
        i++;
    }
}
The default listener prints out the message and compares the index in the message with the number of messages received (they should match if no message loss occurs).
===========================================================================================
And by the way, this is also reproduced when we try to avoid the duplex connection by defining a network connector on both sides. This is achieved by having the fixed-IP node connect to the dynamic-IP node using a domain name (simulated by using the hosts file to provide routing information).
Great, thanks for the detail. From looking at the consumer, my guess is that you are getting duplicate messages.
If you change your test to use a map and verify that map.size matches what was produced, I think your test will pass. Duplicates are expected in this failure case: any message that was dispatched but not fully acknowledged when the network dies will eventually be redispatched as a duplicate. It is seen as in-flight by the broker.
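The map-based check suggested above could look roughly like the following. This is only a sketch: ReceiptTracker and its methods are hypothetical names, and it assumes the payload index runs from 1 to the number of messages produced, as in the TestApp loop. Counting deliveries per index lets the test tell duplicates apart from real gaps.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Counts how often each message index arrives, so duplicates and gaps can be told apart. */
class ReceiptTracker {
    // index from the payload -> number of times it was delivered
    private final Map<Integer, Integer> seen = new TreeMap<>();

    /** Record one delivery of the message whose payload index is idx. */
    synchronized void record(int idx) {
        seen.merge(idx, 1, Integer::sum);
    }

    /** Number of distinct indices received (the "map.size" check). */
    synchronized int distinct() {
        return seen.size();
    }

    /** Indices that were delivered more than once, in ascending order. */
    synchronized List<Integer> duplicates() {
        List<Integer> result = new ArrayList<>();
        for (Map.Entry<Integer, Integer> e : seen.entrySet()) {
            if (e.getValue() > 1) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    /** Indices in 1..expected that never arrived, in ascending order. */
    synchronized List<Integer> missing(int expected) {
        List<Integer> result = new ArrayList<>();
        for (int idx = 1; idx <= expected; idx++) {
            if (!seen.containsKey(idx)) {
                result.add(idx);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        ReceiptTracker tracker = new ReceiptTracker();
        // Simulated deliveries: index 2 arrives twice, index 3 never arrives.
        for (int idx : new int[] {1, 2, 2, 4, 5}) {
            tracker.record(idx);
        }
        System.out.println("distinct=" + tracker.distinct());     // distinct=4
        System.out.println("duplicates=" + tracker.duplicates()); // duplicates=[2]
        System.out.println("missing=" + tracker.missing(5));      // missing=[3]
    }
}
```

In the listener, record(j) would replace the i != j comparison; after the run, an empty missing(3000) with a non-empty duplicates() would point to redelivery rather than loss.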
Unfortunately, the mismatch between the message idx and the received-message count indicates message loss, not duplication.
The evidence is that the message count < the message idx in the payload, which means some messages were lost.
To Bryan Shaw:
Your code has a problem. It works in your case because DefaultMessageListenerContainer creates 1 consumer by default. In general, you have to change static int i = 1; to static AtomicInteger i = new AtomicInteger(1); because i++ isn't an atomic operation in Java; it is translated to i = i + 1.
To Dima:
This is not production code, and we only mean to use one listener instance. This is only a test to reveal the message loss problem. By the way, thanks for your comment.
The problem is that the consumer uses a transacted session but does not commit the transaction. If you examine the consumer broker using JMX/jconsole, you will see that the queue has all its messages in flight: delivered but not acked. They won't be committed until the transaction associated with the cached consumer session is committed.
The Spring JMS template does quite a bit of work under the hood; it may make sense to strip your test application down to pure JMS API calls.
<bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="jmsFactory" />
    <property name="destinationName" value="test-out" />
    <property name="messageListener" ref="listener" />
    <property name="sessionTransacted" value="false" />
</bean>
That is, do not use a transacted session. (Or stick with a transacted session but commit the transaction in your listener via session.commit or via the Spring JMS prescribed way.)
Resolving this, as the issue seems to be related to Spring JMS template usage. Reopen if this is not the case.
So the duplex network connector is used to connect from broker B to broker A.
I have tested with a different topology: two brokers connecting to each other using non-duplex connections on both sides.
The message loss situation does not change.
This means ActiveMQ can't provide reliable message delivery when doing store-and-forward style message transfer. It is huge!
Please resolve this issue ASAP.