ActiveMQ / AMQ-2745

Deadlock or Performance Bottleneck when reading messages with Correlation

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 5.3.1, 5.3.2, 5.4.0, 5.4.1
    • Fix Version/s: NEEDS_REVIEW
    • Component/s: Broker
    • Labels:
      None
    • Environment:

      Java 64-bit, Windows 2008 Server, Centos 5 64bit, Ubuntu 110 64-bit, OS X 10.5, 10.6

      Description

      We are posting messages to a queue with two different correlation IDs, each intended to reach one of two clients that subscribe with a message selector for the appropriate correlation. The clients read with message listeners. When one client stops reading, the expected behavior, and the behavior we saw on 5.3.0, is that the messages with the correlation for the stopped client back up on the queue and do not affect the performance of the second client, which is still reading the messages with the other correlation. With our memory config, messages can back up into the hundreds of thousands before there is any noticeable performance impact on the active client.

      However, this is not the case in 5.3.1 or 5.3.2. With 5.3.1, once enough messages back up for the stopped client, the active client's performance suddenly drops drastically, from 20 ms reads to 30,000 ms reads. We see this within a few hundred messages. I believe some kind of deadlock or buffering bottleneck was introduced on the client side.

      1. activemq.xml
        5 kB
        Brad Willard
      2. activemq.xml
        5 kB
        Brad Willard
      3. PutMessages.java
        2 kB
        Brad Willard
      4. ReadMessages.java
        2 kB
        Brad Willard

          Activity

          Gary Tully added a comment -

          It would be great if you could provide a test case for this so we can see your configuration and setup. Of interest is the maxPageSize for the queue and the distribution of messages across the two selectors.

          Brad Willard added a comment -

          I have created two classes that show the problem: test.PutMessages and test.ReadMessages. Steps to reproduce:

          1) Start a 5.3.0 broker.

          2) Start two message readers for two different correlations on the same queue:
          java -cp <yourclasspath> test.ReadMessages tcp://localhost:61616 TestQueue ForReader1
          java -cp <yourclasspath> test.ReadMessages tcp://localhost:61616 TestQueue ForReader2

          3) Start two message producers for the two different correlations:
          java -cp <yourclasspath> test.PutMessages tcp://localhost:61616 TestQueue ForReader1
          java -cp <yourclasspath> test.PutMessages tcp://localhost:61616 TestQueue ForReader2

          4) Looking at the output of the readers you started in step 2, both will read the messages for their correlation, with the time on the broker about 1ms.

          5) Stop the reader ForReader1. You will notice that the program ForReader2 is unaffected. Messages with correlation "ForReader1" back up on the queue, and the program ForReader2 continues reading normally.

          6) Stop all classes and stop the 5.3.0 broker. Start a 5.3.2 broker.

          7) Repeat steps 2-5. This time, once you stop ForReader1, ForReader2 is affected, which it shouldn't be. ForReader2 will basically stop being able to read messages until you start ForReader1 again. ForReader2 will occasionally get messages, but incredibly slowly, and performance is ruined.

          package test;

          import java.net.*;
          import javax.jms.*;
          import org.apache.activemq.ActiveMQConnectionFactory;

          /**
           * Sends a persistent message tagged with one correlation ID every 5 ms.
           *
           * @author bwillard
           */
          public class PutMessages extends Thread {

              private final MessageProducer producer;
              private final String correlationID;
              private final Session session;

              public PutMessages(URI uri, String queueName, String correlationID) throws Exception {
                  this.correlationID = correlationID;
                  ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
                  Connection connection = factory.createConnection();
                  connection.start();
                  session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                  Queue queue = session.createQueue(queueName);
                  producer = session.createProducer(queue);
                  producer.setDeliveryMode(DeliveryMode.PERSISTENT);
              }

              @Override
              public void run() {
                  long counter = 0;

                  while (true) {
                      try {
                          Thread.sleep(5); // sleep is static, so call it on Thread rather than on this
                          counter++;
                          ObjectMessage message = session.createObjectMessage();
                          message.setJMSCorrelationID(correlationID);
                          String text = "Message " + counter + " for consumer " + correlationID;
                          message.setObject(text);
                          producer.send(message);
                      } catch (Exception exc) {
                          System.err.println("Error sending message");
                          exc.printStackTrace(System.err);
                      }
                  }
              }

              public static void main(String[] args) {
                  try {
                      URI uri = URI.create(args[0]);
                      String queueName = args[1];
                      String correlationID = args[2];
                      new PutMessages(uri, queueName, correlationID).start();
                  } catch (Exception exc) {
                      exc.printStackTrace();
                  }
              }
          }

          package test;

          import java.net.*;
          import javax.jms.*;
          import org.apache.activemq.ActiveMQConnectionFactory;

          /**
           * Listens on a queue with a JMSCorrelationID selector and prints how long
           * each message waited on the broker.
           *
           * @author bwillard
           */
          public class ReadMessages implements MessageListener {

              private final MessageConsumer consumer;
              private final String correlationID;
              private final Session session;

              public ReadMessages(URI uri, String queueName, String correlationID) throws Exception {
                  this.correlationID = correlationID;
                  ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
                  Connection connection = factory.createConnection();
                  connection.start();
                  session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                  Queue queue = session.createQueue(queueName);
                  // Only messages whose correlation ID matches are delivered to this consumer.
                  consumer = session.createConsumer(queue, "JMSCorrelationID='" + correlationID + "'");
                  consumer.setMessageListener(this);
              }

              public void onMessage(Message msg) {
                  long inTime, outTime, brokerTime;

                  try {
                      if (msg instanceof ObjectMessage) {
                          ObjectMessage txt = (ObjectMessage) msg;
                          inTime = txt.getLongProperty("JMSActiveMQBrokerInTime");
                          outTime = txt.getLongProperty("JMSActiveMQBrokerOutTime");
                          brokerTime = outTime - inTime;
                          System.out.println("Message waited " + brokerTime + "ms : " + txt.getObject().toString());
                      }
                  } catch (Exception exc) {
                      System.err.println("Error reading message");
                      exc.printStackTrace();
                  }
              }

              public static void main(String[] args) {
                  try {
                      URI uri = URI.create(args[0]);
                      String queueName = args[1];
                      String correlationID = args[2];
                      new ReadMessages(uri, queueName, correlationID);
                  } catch (Exception exc) {
                      exc.printStackTrace();
                  }
              }
          }

          Brad Willard added a comment -

          I updated the issue because it's a major performance issue that also exists in 5.3.2 when using message selectors.

          Brad Willard added a comment -

          This is the config file I used in both brokers to show the problem

          Brad Willard added a comment -

          Attached the source files instead of pasting them into the comment, sorry.

          Brad Willard added a comment -

          Has anyone had a chance to verify this is a real problem or if there is something wrong with my config? I am unable to upgrade past broker 5.3.0 because of it, and really want to be able to upgrade to resolve other issues I've been seeing. I also want to make sure this isn't also an issue in the 5.4 broker due out.

          Brad Willard added a comment -

          Confirmed this is still an issue in the 5.4 snapshot https://repository.apache.org/content/repositories/snapshots/org/apache/activemq/apache-activemq/5.4-SNAPSHOT/apache-activemq-5.4-SNAPSHOT-bin.tar.gz

          Brad Willard added a comment -

          I wanted to confirm that this is still an issue in the 5.4 release. I can reproduce it with the default properties file it ships with as well. Increasing the maxPageSize in the policyEntry seems to help, but eventually the consumer on a separate correlation will lock up.

          Any ideas? This has prevented us from upgrading since 5.3.0.

          Brad Willard added a comment -

          Modifications to 5.4.0 default config file that still show the problem running the provided sample code.

          Gary Tully added a comment -

          This is another case of https://issues.apache.org/activemq/browse/AMQ-2217 - there are some strategies using named queues or virtual queues that can help as outlined in the comments on AMQ-2217
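
One way to read the named-queue strategy is to give each consumer its own queue, so that no selector is needed and a stopped consumer's backlog never sits in front of another consumer's messages. The sketch below only shows the routing decision in plain Java. The class PerConsumerQueues, the method queueFor, and the "<base>.<correlationID>" naming scheme are hypothetical and not taken from AMQ-2217; the JMS calls in the comments are the same ones the attached test classes already use.

```java
public class PerConsumerQueues {

    /**
     * Hypothetical naming scheme: one queue per correlation ID,
     * e.g. "TestQueue.ForReader1" instead of a selector on "TestQueue".
     */
    public static String queueFor(String baseQueue, String correlationID) {
        if (correlationID == null || correlationID.isEmpty()) {
            throw new IllegalArgumentException("correlationID is required");
        }
        return baseQueue + "." + correlationID;
    }

    public static void main(String[] args) {
        // Producer side (assumed usage):
        //   Queue queue = session.createQueue(queueFor(queueName, correlationID));
        //   producer = session.createProducer(queue);
        // Consumer side (assumed usage), with no selector argument:
        //   consumer = session.createConsumer(session.createQueue(queueFor(queueName, correlationID)));
        System.out.println(queueFor("TestQueue", "ForReader1"));
        System.out.println(queueFor("TestQueue", "ForReader2"));
    }
}
```

Whether this fits depends on how many distinct correlations exist; it trades selector evaluation on one shared queue for a larger number of destinations.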

          5.3.0 had a bug in this regard that could lead to an out-of-memory exception: there was no limit on the size of the in-memory dispatch queue, as if maxPageSize == MAX_INT. With a very sparse selector, the broker would exhaust available memory.
          Set maxPageSize to MAX_INT to replicate the 5.3.0 behavior.
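
To make the maxPageSize explanation concrete, here is a toy model in plain Java. It is not ActiveMQ's dispatch code: the class PageWindowModel and the method delivered are invented for illustration, and the model simply assumes that the broker keeps at most maxPageSize messages paged in at once and pages in more only when there is room. Messages that match no active consumer stay in the window.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

public class PageWindowModel {

    /**
     * Counts how many messages reach the one active consumer, whose selector
     * matches activeId, before the paged-in window is full of messages that
     * nobody is consuming. Each message is represented by its correlation ID.
     */
    public static int delivered(List<String> store, int maxPageSize, String activeId) {
        Deque<String> pending = new ArrayDeque<>(store); // still in the store, in order
        List<String> window = new ArrayList<>();          // paged in, awaiting dispatch
        int delivered = 0;
        boolean progress = true;

        while (progress) {
            progress = false;
            // Page in from the head of the store while the window has room.
            while (window.size() < maxPageSize && !pending.isEmpty()) {
                window.add(pending.poll());
                progress = true;
            }
            // Dispatch whatever the active consumer's selector matches.
            Iterator<String> it = window.iterator();
            while (it.hasNext()) {
                if (it.next().equals(activeId)) {
                    it.remove();
                    delivered++;
                    progress = true;
                }
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        // Interleaved backlog: 1000 messages for each reader; ForReader1 is stopped.
        List<String> store = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            store.add("ForReader1");
            store.add("ForReader2");
        }
        System.out.println("maxPageSize=200: " + delivered(store, 200, "ForReader2"));
        System.out.println("unbounded:       " + delivered(store, Integer.MAX_VALUE, "ForReader2"));
    }
}
```

In this model the bounded run delivers 199 messages to ForReader2 and then stalls, because the window holds 200 ForReader1 messages that nothing consumes. The unbounded run delivers all 1000, but only by holding every unmatched message in memory, which is the out-of-memory risk described for 5.3.0.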

          Brad Willard added a comment -

          I have confirmed this is still a problem in 5.4.1. I have also confirmed that you can reproduce it with the default activemq configuration file. Disabling flow control has no effect, and neither does lowering the message pre-fetch. This is a real performance issue for anyone using message selectors.

          Timothy Bish added a comment -

          Relates to the same issue of maxPageSize


            People

            • Assignee: Unassigned
            • Reporter: Brad Willard
            • Votes: 1
            • Watchers: 3
