Uploaded image for project: 'ActiveMQ Artemis'
  1. ActiveMQ Artemis
  2. ARTEMIS-1881

Queue autocreate together with multiple producers/consumers on the same session stopped working with AMQP protocol

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Cannot Reproduce
    • 2.5.0, 2.6.0
    • None
    • AMQP
    • None

    Description

      Usage of a JMS Pool ConnectionFactory means that the pattern where there is multiple producers and consumers created on a single session is fairly natural.

      I am using an autocreated address. For every send or receive operation in the following sequence, a new Message{Producer,Consumer} is created: send, receive, receive, send, receive.

      This (and the attached test for AMQP) used to work as expected in 2.4.0, but broke in 2.5.0 and is broken since.

      The OpenWire version of test usually passes, but sometimes fails. Log is provided in comment.

      According to git bisect the AMQP breakage occurred due to

      1d1d6c8b4686f869df0ca5fc09c20128f8481cff is the first bad commit
      commit 1d1d6c8b4686f869df0ca5fc09c20128f8481cff
      Author: Clebert Suconic clebertsuconic@apache.org
      Date: Tue Nov 21 10:12:20 2017 -0500
      
      ARTEMIS-1416 Implementing cache on queue and address querying
      
      This will cache the last query, optimizing most of the cases
      This won't optimize the case where you are sending producers with different address,
      but this is not the one I'm after now.
      
      
      :040000 040000 315357d1cfdfb9a71a081b8976ad54f5f5070ab3 50eb2ba0ae07fdb77b46575d9d7a5f84bd4f6483 M artemis-protocols

      Test

      activemq-artemis/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AutoCreateTests.java
      package org.apache.activemq.artemis.tests.integration.amqp;
      
      import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
      import org.apache.qpid.jms.JmsConnectionFactory;
      import org.junit.Test;
      import org.junit.runner.RunWith;
      import org.junit.runners.Parameterized;
      
      import javax.jms.*;
      import java.util.Arrays;
      import java.util.Collection;
      import java.util.UUID;
      
      import static org.hamcrest.CoreMatchers.notNullValue;
      import static org.hamcrest.CoreMatchers.nullValue;
      
      @RunWith(Parameterized.class)
      public class AutoCreateTests extends JMSClientTestSupport {
          enum Protocol {
              AMQP, CORE, OPENWIRE
          }
      
      //    @Parameterized.Parameters
      //    public static Object[] data() {
      //        return new Object[]{Protocol.AMQP, Protocol.CORE, Protocol.OPENWIRE};
      //    }
      
          @Parameterized.Parameters(name = "protocol={0}")
          public static Collection<Object[]> data() {
              return Arrays.asList(new Object[][]{
                      {Protocol.AMQP}, {Protocol.CORE}, {Protocol.OPENWIRE}
              });
          }
      
          @Override
          protected String getConfiguredProtocols() {
              return "AMQP,OPENWIRE,CORE";
          }
      
          @Parameterized.Parameter
          public Protocol protocol;
      
          ConnectionFactory getConnectionFactory() {
              switch (protocol) {
                  case AMQP:
                      return new JmsConnectionFactory("amqp://127.0.0.1:" + AMQP_PORT);
                  case CORE:
                      return new ActiveMQConnectionFactory("tcp://127.0.0.1:" + AMQP_PORT);
                  case OPENWIRE:
                      return new org.apache.activemq.ActiveMQConnectionFactory("tcp://127.0.0.1:" + AMQP_PORT);
              }
              throw new IllegalArgumentException("Unknown value of protocol:" + protocol);
          }
      
          @Test
          public void autocreateSendReceiveReceiveSendReceiveOnTheSameSession() throws JMSException {
              ConnectionFactory f = getConnectionFactory();
              Connection c = f.createConnection();
              c.start();
              Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
              Queue d = s.createQueue(UUID.randomUUID().toString());
      
              try {
                  {
                      MessageProducer p = s.createProducer(d);
                      p.send(s.createMessage());
                      p.close();
                  }
      
                  {
                      MessageConsumer co = s.createConsumer(d);
                      assertThat(co.receive(2000), notNullValue());
                      co.close();
                  }
      
                  {
                      MessageConsumer co = s.createConsumer(d);
                      assertThat(co.receive(2000), nullValue());
                      co.close();
                  }
      
                  {
                      MessageProducer p = s.createProducer(d);
                      p.send(s.createMessage());
                      p.close();
                  }
      
                  {
                      MessageConsumer co = s.createConsumer(d);
                      assertThat(co.receive(2000), notNullValue());  // <-- AMQP fails here
                      co.close();
                  }
              } finally {
                  s.close();
                  c.close();
              }
          }
      }
      

      See the "// <-- AMQP fails here" comment for the assert which is broken for AMQP. No message is received by the last consumer.

      Attachments

        Activity

          People

            Unassigned Unassigned
            jdanek Jiri Daněk
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: