Uploaded image for project: 'ActiveMQ'
  1. ActiveMQ
  2. AMQ-5877

The queue hangs in the scenario given in the description with version 5.11.1, but the same scenario works with version 5.5.1

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 5.11.1
    • Fix Version/s: 5.12.0
    • Component/s: Broker
    • Labels:
      None
    • Environment:

      Window 7 Enterprise Edition, 64 bit
      JDK 1.8.0_25, 64 bit
      apache-activemq-5.11.1

    • Regression:
      Unit Test Broken

      Description

      We are unable to read the message with message selector if we follow the below steps in ActiveMQ 5.11.1

      • Start ActiveMQ
      • Send Two messages
      • Read one message with our selector, using the consumer.receive() API
      • Send One more message, And hold the messageId
      • Read the message with the above message Id which we are holding as a selector("JMSMessageID='" + messageId + "'") immediately.
      • Now we won't be able to read the message; the consumer thread will wait indefinitely

      Note: activemq-all-5.11.1.jar and the JUnit jars need to be on the classpath.

      Program to reproduce the issue

      import java.util.Properties;
      
      import javax.jms.Connection;
      import javax.jms.ConnectionFactory;
      import javax.jms.Message;
      import javax.jms.MessageConsumer;
      import javax.jms.MessageProducer;
      import javax.jms.Queue;
      import javax.jms.Session;
      import javax.naming.Context;
      import javax.naming.InitialContext;
      
      import org.junit.AfterClass;
      import org.junit.Assert;
      import org.junit.BeforeClass;
      import org.junit.Test;
      
      /**
       * Reproduction for a reported problem in ActiveMQ 5.11.1: a message cannot be
       * read through a {@code JMSMessageID} selector after the following steps.
       * <ul>
       * <li>Start ActiveMQ</li>
       * <li>Send two messages</li>
       * <li>Read one message without a selector, using {@code consumer.receive()}</li>
       * <li>Send one more message and keep its message id</li>
       * <li>Immediately read that message using the kept id as the selector
       * ({@code "JMSMessageID='" + messageId + "'"})</li>
       * <li>The message is not delivered to the selecting consumer</li>
       * </ul>
       * Because the consumer uses {@code receive(1000)}, the failure shows up in
       * this test as a {@code null} result and a failed assertion rather than as a
       * thread that blocks forever.
       * <p>
       * <b>Note:</b> activemq-all-5.11.1.jar and the JUnit jars need to be on the
       * classpath, and a broker must be reachable at {@link #URL}.
       */
      public final class MessageSelectorJunit {
      
      	/** Connection shared by every send/receive in this class; opened once in {@link #oneTimeSetUp()}. */
      	private static Connection connection = null;
      	private static final String CONTEXT_FACTORY = "org.apache.activemq.jndi.ActiveMQInitialContextFactory";
      	private static final String URL = "tcp://localhost:61616";
      	private static final String QUEUE_NAME = "TestQueue";
      
      	/**
      	 * Looks up the connection factory through JNDI, then opens and starts the
      	 * shared connection.
      	 *
      	 * @throws Exception if the JNDI lookup or the connection setup fails
      	 */
      	@BeforeClass
      	public static void oneTimeSetUp() throws Exception {
      		Properties properties = new Properties();
      		properties.setProperty(Context.INITIAL_CONTEXT_FACTORY, CONTEXT_FACTORY);
      		properties.setProperty(Context.PROVIDER_URL, URL);
      
      		// Connection Creation
      		Context context = new InitialContext(properties);
      		ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("QueueConnectionFactory");
      		connection = connectionFactory.createConnection();
      		connection.start();
      	}
      
      	/**
      	 * Sends two messages, consumes one without a selector, sends a third and
      	 * then expects to receive that third message through a
      	 * {@code JMSMessageID} selector.
      	 *
      	 * @throws Exception if any JMS operation fails
      	 */
      	@Test
      	public void testScenario() throws Exception {
      		// Message1
      		sendJMSMessage();
      		sendJMSMessage();
      		getJMSMessage(null);
      
      		// Message2
      		String messageId2 = sendJMSMessage();
      		String messageSelector = "JMSMessageID='" + messageId2 + "'";
      		Assert.assertNotNull("Expected message to be read with message selector : " + messageSelector,
      				getJMSMessage(messageSelector));
      	}
      
      	/**
      	 * Sends one text message to {@link #QUEUE_NAME} on a fresh session.
      	 *
      	 * @return the JMS message id read from the message after the send
      	 * @throws Exception if creating the session/producer or sending fails
      	 */
      	private String sendJMSMessage() throws Exception {
      		String messageId = null;
      		Session session = null;
      		MessageProducer producer = null;
      		try {
      			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
      			producer = session.createProducer(session.createQueue(QUEUE_NAME));
      
      			Message msg = session.createTextMessage("Test Message");
      			producer.send(msg);
      
      			messageId = msg.getJMSMessageID();
      			System.out.println("Send MessageId : " + messageId);
      		} finally {
      			// Null guards keep a failed createSession/createProducer from being
      			// masked by a NullPointerException; the nested finally guarantees the
      			// session is closed even when closing the producer throws.
      			try {
      				if (producer != null) {
      					producer.close();
      				}
      			} finally {
      				if (session != null) {
      					session.close();
      				}
      			}
      		}
      		return messageId;
      	}
      
      	/**
      	 * Receives at most one message from {@link #QUEUE_NAME} on a fresh session,
      	 * waiting up to one second.
      	 *
      	 * @param messageSelector JMS selector expression, or {@code null} to
      	 *                        create the consumer without a selector
      	 * @return the id of the received message, or {@code null} when nothing
      	 *         arrived within the timeout
      	 * @throws Exception if creating the session/consumer or receiving fails
      	 */
      	private String getJMSMessage(String messageSelector) throws Exception {
      		String receivedId = null;
      		MessageConsumer consumer = null;
      		Session session = null;
      		try {
      			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
      			Queue queue = session.createQueue(QUEUE_NAME);
      			System.out.println("Started Recieving the message with the message selector : " + messageSelector);
      			consumer = session.createConsumer(queue, messageSelector);
      			// The connection was already started in oneTimeSetUp(); kept so the
      			// reproduction matches the reported steps exactly.
      			connection.start();
      			Message msg = consumer.receive(1000);
      			if (msg != null) {
      				receivedId = msg.getJMSMessageID();
      				System.out.println("Message Recieved, Id : " + receivedId);
      			} else {
      				System.out.println("Message not recieved.");
      			}
      		} finally {
      			// Same cleanup discipline as sendJMSMessage(): never dereference a
      			// resource that was not created, and always attempt to close the
      			// session even if closing the consumer fails.
      			try {
      				if (consumer != null) {
      					consumer.close();
      				}
      			} finally {
      				if (session != null) {
      					session.close();
      				}
      			}
      		}
      		return receivedId;
      	}
      
      	/**
      	 * Closes the shared connection if it was opened.
      	 *
      	 * @throws Exception if closing the connection fails
      	 */
      	@AfterClass
      	public static void oneTimeTearDown() throws Exception {
      		if (connection != null) {
      			connection.close();
      		}
      	}
      }
      

        Attachments

          Activity

            People

            • Assignee:
              Unassigned
              Reporter:
              sivaphani.kothuri@gmail.com siva phani kumar
            • Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved: