Details
Description
We are unable to read the message with message selector if we follow the below steps in ActiveMQ 5.11.1
- Start ActiveMQ
- Send Two messages
- Read one message with our selector,Using consumer.recieve() API
- Send One more message, And hold the messageId
- Read the message with the above message Id which we are holding as a selector("JMSMessageID='" + messageId + "'") immediately.
- Now message we wont able to read, consumer thread will be waiting infinitely
Note : Its required activemq-all-5.11.1.jar,Junit jars need to be in classpath.
Issue reproducible prgoram
import java.util.Properties; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.naming.Context; import javax.naming.InitialContext; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; /** * We are unable to read the message with message selector if we follow the * below steps in ActiveMQ 5.11.1 * <ul> * <li>Start ActiveMQ</li> * <li>Send Two messages</li> * <li>Read one message with our selector,Using consumer.recieve() API</li> * <li>Send One more message, And hold the messageId</li> * <li>Read the message with the above message Id which we are holding as a * selector("JMSMessageID='" + messageId + "'") immediately.</li> * <li>Now message we wont able to read, consumer thread will be waiting * infinitely</li> * </ul> * <b>Note : </b> Its required activemq-all-5.11.1.jar,Junit jars need to be in classpath. * */ public final class MessageSelectorJunit { private static Connection connection = null; private static final String CONTEXT_FACTORY = "org.apache.activemq.jndi.ActiveMQInitialContextFactory"; private static final String URL = "tcp://localhost:61616"; private static final String QUEUE_NAME = "TestQueue"; @BeforeClass public static void oneTimeSetUp() throws Exception { Properties properties = new Properties(); properties.setProperty(Context.INITIAL_CONTEXT_FACTORY, CONTEXT_FACTORY); properties.setProperty(Context.PROVIDER_URL, URL); // Connection Creation Context context = new InitialContext(properties); ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("QueueConnectionFactory"); connection = connectionFactory.createConnection(); connection.start(); } @Test public void testScenario() throws Exception { // Message1 sendJMSMessage(); sendJMSMessage(); getJMSMessage(null); // Message2 String messageId2 = sendJMSMessage(); String messageSelector = "JMSMessageID='" + messageId2 + "'"; Assert.assertNotNull("Expected message to be read with message selector : " + messageSelector, getJMSMessage(messageSelector)); } private String sendJMSMessage() throws Exception { String messageId = null; Session session = null; MessageProducer producer = null; try { session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(session.createQueue(QUEUE_NAME)); Message msg = session.createTextMessage("Test Message"); producer.send(msg); messageId = msg.getJMSMessageID(); System.out.println("Send MessageId : " + messageId); } finally { producer.close(); session.close(); } return messageId; } private String getJMSMessage(String messageSelector) throws Exception { String recievedId = null; MessageConsumer consumer = null; Session session = null; try { session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(QUEUE_NAME); System.out.println("Started Recieving the message with the message selector : " + messageSelector); consumer = session.createConsumer(queue, messageSelector); connection.start(); Message msg = consumer.receive(1000); if (msg != null) { recievedId = msg.getJMSMessageID(); System.out.println("Message Recieved, Id : " + msg.getJMSMessageID()); } else { System.out.println("Message not recieved."); } } finally { consumer.close(); session.close(); } return recievedId; } @AfterClass public static void oneTimeTearDown() throws Exception { if (connection != null) { connection.close(); } } }