import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQPrefetchPolicy; import org.apache.activemq.util.JMSExceptionSupport; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportListener; import javax.jms.*; import java.util.Date; import java.net.URI; import java.io.IOException; /** * FailoverTest */ public class FailoverTest { private static final String s_queueName = "failover_test"; private static final String s_brokerURL = "failover:(tcp://localhost:61616,tcp://localhost:51616)"; private Connection m_connection; private Session m_session; private MessageProducer m_producer; public static void main(String [] args) { try { FailoverTest fot = new FailoverTest(); fot.testForDuplicateMessage(); } catch (Exception e) { e.printStackTrace(); } } public void testForDuplicateMessage() throws Exception { Thread mc1 = messageConsumer("consumer-1"); Thread mc2 = messageConsumer("consumer-2"); produceMessages(); while (true) { Thread.sleep(1000); } } void produceMessages() throws Exception { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, s_brokerURL); m_connection = connectionFactory.createConnection(); m_connection.setExceptionListener(new ExceptionListener() { public void onException(JMSException jmsException) { System.err.println("onException caught JMSException: " + jmsException); } }); m_connection.start(); // Create the m_session m_session = m_connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = m_session.createQueue(s_queueName); // Create the m_producer. m_producer = m_session.createProducer(destination); m_producer.setDeliveryMode(DeliveryMode.PERSISTENT); MapMessage message = m_session.createMapMessage(); message.setStringProperty("prop_name", "prop_value_1:" + System.currentTimeMillis()); message.setStringProperty("JMSXGroupID", "g1"); m_producer.send(message); message = m_session.createMapMessage(); message.setStringProperty("prop_name", "prop_value_2:" + System.currentTimeMillis()); message.setStringProperty("JMSXGroupID", "g1"); m_producer.send(message); m_connection.close(); } Thread messageConsumer(final String clientID) throws Exception { Thread t = new Thread(new Runnable() { public void run() { Connection connection = null; try { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, s_brokerURL); ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); prefetchPolicy.setQueuePrefetch(1); connectionFactory.setPrefetchPolicy(prefetchPolicy); connection = connectionFactory.createConnection(); connection.setExceptionListener(new ExceptionListener() { public void onException(JMSException jmsException) { System.err.println("onException caught JMSException for [" + clientID + "]: " + jmsException); } }); ActiveMQConnection ac = (ActiveMQConnection)connection; ac.addTransportListener(new TransportListener(){ public void onCommand(Object command) { // System.out.println("[" + clientID + "] onCommand: " + command); } public void onException(IOException error) { System.out.println("[" + clientID + "] onException: " + error); } public void transportInterupted() { System.out.println("[" + clientID + "] transportInterupted"); } public void transportResumed() { System.out.println("[" + clientID + "] transportResumed"); } }); connection.setClientID(clientID); connection.start(); // Create the m_session Session session = connection.createSession(true, Session.SESSION_TRANSACTED); // Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); Destination destination = session.createQueue(s_queueName); // Create the m_consumer. MessageConsumer consumer = session.createConsumer(destination); String lastMessageID = null; while (true) { Message message = consumer.receive(1000); if (message != null) { if (message.getJMSMessageID().equals(lastMessageID)) { System.out.println("[" + clientID + "] " + new Date() + ": Found duplicate message [ID="+message.getJMSMessageID()+"]"); session.commit(); continue; } lastMessageID= message.getJMSMessageID(); MapMessage mm = (MapMessage) message; String value = mm.getStringProperty("prop_name"); String groupID = mm.getStringProperty("JMSXGroupID"); System.out.println("[" + clientID + "] " + new Date() + ": Retrieved Message, Group=" + groupID + ", message [ID="+message.getJMSMessageID()+"], value=" + value); try { Thread.sleep(60000); } catch (Exception e) {} System.out.println("[" + clientID + "] " + new Date() + ": Committing transaction..."); try { session.commit(); } catch (JMSException e) { System.err.println("Exception caught: " + e); } message = null; } } } catch (Exception e) { System.err.println("Exception caught [" + clientID + "]: " + e); } System.out.println("[" + clientID + "] " + new Date() + ": Thread terminating."); try { connection.close(); } catch (JMSException e) { } } }); t.setName(clientID); t.start(); return t; } }