Uploaded image for project: 'ActiveMQ Classic'
  1. ActiveMQ Classic
  2. AMQ-2580

Durable subscribers receive nothing when reconnecting with a prefetch size smaller than the number of messages that don't match the message selector

    XML | Word | Printable | JSON

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 5.3.0
    • Fix Version/s: 5.4.0
    • Component/s: Selector
    • Labels: None
    • Patch Info: Patch Available

    Description

      1. Create a connection factory with a message prefetch size of PREFETCH_SIZE.
      2. Create a durable subscriber to a Topic with a message selector of "a=X".
      3. Disconnect.
      4. Put more than PREFETCH_SIZE messages onto the Topic, each with the string property "a=Y".
      5. Put just one message onto the Topic with the string property "a=X".
      6. The durable subscriber connects again but it does not get the message with string property "a=X". In fact, it gets nothing.

      It appears that, upon reconnecting, the message selector is not respected when retrieving messages from storage.

      I've got a unit test to demonstrate this plus a proposed fix.

      ### Eclipse Workspace Patch 1.0
      #P activemq
      Index: activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
      ===================================================================
      --- activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java       (revision 900353)
      +++ activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java       (working copy)
      @@ -306,7 +306,7 @@
                                       count++;
                                       container.setBatchEntry(msg.getMessageId(), entry);
                                   } else {
      -                                break;
      +                                //break;
                                   }
                               } else {
                                   container.reset();
      Index: activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java
      ===================================================================
      --- activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java      (revision 900353)
      +++ activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java      (working copy)
      @@ -46,10 +46,11 @@
           
           public boolean recoverMessage(Message message) throws Exception {
               if (listener.hasSpace()) {
      -            listener.recoverMessage(message);
      -            lastRecovered = message.getMessageId();
      -            count++;
      -            return true;
      +            if (listener.recoverMessage(message)) {
      +                lastRecovered = message.getMessageId();
      +                count++;
      +                return true;
      +            }
               }
               return false;
           }
      Index: activemq-pool/src/test/java/org/apache/activemq/pool/PrefetchTest.java
      ===================================================================
      --- activemq-pool/src/test/java/org/apache/activemq/pool/PrefetchTest.java      (revision 0)
      +++ activemq-pool/src/test/java/org/apache/activemq/pool/PrefetchTest.java      (revision 0)
      @@ -0,0 +1,174 @@
      +package org.apache.activemq.pool;
      +
      +import java.io.File;
      +
      +import javax.jms.JMSException;
      +import javax.jms.MessageConsumer;
      +import javax.jms.MessageProducer;
      +import javax.jms.Session;
      +import javax.jms.TextMessage;
      +import javax.jms.Topic;
      +import javax.jms.TopicConnection;
      +import javax.jms.TopicSession;
      +
      +import junit.framework.TestCase;
      +
      +import org.apache.activemq.ActiveMQConnectionFactory;
      +import org.apache.activemq.ActiveMQPrefetchPolicy;
      +import org.apache.activemq.broker.BrokerService;
      +import org.apache.activemq.store.PersistenceAdapter;
      +
      +public class PrefetchTest extends TestCase {
      +
      +       private static final String TOPIC_NAME = "topicName";
      +       private static final String CLIENT_ID = "client_id";
      +       private static final String textOfSelectedMsg = "good_message";
      +
      +       protected TopicConnection connection;
      +
      +       private Topic topic;
      +       private Session session;
      +       private MessageProducer producer;
      +       private PooledConnectionFactory connectionFactory;
      +       private TopicConnection topicConnection;
      +       private String bindAddress;
      +       private BrokerService service;
      +
      +       protected void setUp() throws Exception {
      +               bindAddress = "tcp://localhost:61616";
      +               super.setUp();
      +               initDurableBroker();
      +               initConnectionFactory();
      +               initTopic();
      +
      +       }
      +
      +       protected void tearDown() throws Exception {
      +           shutdownClient();
      +               connectionFactory.stop();
      +               service.stop();
      +               super.tearDown();
      +       }
      +
      +       private void initConnection() throws JMSException {
      +           System.out.println("Initializing connection");
      +               connection = (TopicConnection) connectionFactory.createConnection(); 
      +               connection.start();
      +       }
      +
      +    public void testTopicIsDurableSmokeTest() throws Exception {
      +        
      +       initClient();
      +       MessageConsumer consumer = createMessageConsumer();
      +       System.out.println("Consuming message");
      +       assertNull(consumer.receive(1));
      +       shutdownClient();
      +       consumer.close();
      +    
      +       sendMessages();
      +       shutdownClient();
      +    
      +       initClient();
      +       consumer = createMessageConsumer();
      +    
      +       System.out.println("Consuming message");
      +       TextMessage answer1 = (TextMessage)consumer.receive(1000);
      +       assertNotNull(answer1);
      +    
      +       consumer.close();
      +    }
      +    
      +    private MessageConsumer createMessageConsumer() throws JMSException {
      +        System.out.println("creating durable subscriber");
      +       return session.createDurableSubscriber(topic, 
      +               TOPIC_NAME, 
      +               "name='value'", 
      +               false);
      +    }
      +
      +       private void initClient() throws JMSException {
      +           System.out.println("Initializing client");
      +           
      +               initConnection();
      +       initSession();
      +       }
      +
      +       private void shutdownClient()
      +                       throws JMSException {
      +           System.out.println("Closing session and connection");
      +        session.close();
      +        connection.close();
      +        session = null;
      +        connection = null;
      +       }
      +
      +       private void sendMessages()
      +                       throws JMSException {
      +           initConnection();
      +
      +               initSession();
      +
      +               System.out.println("Creating producer");
      +               producer = session.createProducer(topic);
      +
      +               sendMessageThatFailsSelection();
      +        
      +               sendMessage(textOfSelectedMsg, "value");
      +       }
      +
      +       private void initSession() throws JMSException {
      +           System.out.println("Initializing session");
      +               session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
      +       }
      +
      +       private void sendMessageThatFailsSelection() throws JMSException {
      +               for (int i = 0 ; i < 5 ; i++) {
      +                       String textOfNotSelectedMsg = "Msg_" + i;
      +                       sendMessage(textOfNotSelectedMsg, "not_value");
      +                       System.out.println("#");
      +               }
      +       }
      +
      +       private void sendMessage(
      +                       String msgText,
      +                       String propertyValue) throws JMSException {
      +           System.out.println("Creating message: " + msgText);
      +               TextMessage messageToSelect = session.createTextMessage(msgText);
      +        messageToSelect.setStringProperty("name", propertyValue);
      +        System.out.println("Sending message");
      +               producer.send(messageToSelect);
      +       }
      +
      +       protected void initConnectionFactory() {
      +               ActiveMQConnectionFactory activeMqConnectionFactory = createActiveMqConnectionFactory();
      +               connectionFactory = new PooledConnectionFactory(activeMqConnectionFactory);
      +       }
      +
      +
      +       private ActiveMQConnectionFactory createActiveMqConnectionFactory() {
      +               ActiveMQConnectionFactory activeMqConnectionFactory = new ActiveMQConnectionFactory();
      +        ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
      +        prefetchPolicy.setDurableTopicPrefetch(2);
      +               activeMqConnectionFactory.setPrefetchPolicy(prefetchPolicy );
      +        activeMqConnectionFactory.setClientID(CLIENT_ID);
      +               return activeMqConnectionFactory;
      +       }
      +
      +       private void initDurableBroker() throws Exception {
      +               service = new BrokerService();
      +               PersistenceAdapter persistenceAdaptor = service.getPersistenceAdapter();
      +               File file = new File("phills_durable_dir");
      +               persistenceAdaptor.setDirectory(file);
      +               service.setTransportConnectorURIs(new String[] { bindAddress } );
      +               service.setPersistent(true);
      +               service.setUseJmx(true);
      +               service.start();
      +
      +       }
      +
      +       private void initTopic() throws JMSException {
      +               topicConnection = (TopicConnection) connectionFactory.createConnection();
      +               TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
      +               topic = topicSession.createTopic(TOPIC_NAME);
      +       }
      +}
      

      Attachments

        1. selector_patch_and_test.zip
          7 kB
          Phillip Henry

        Activity

          People

            gtully Gary Tully
            philliph Phillip Henry
            Votes:
            3 Vote for this issue
            Watchers:
            4 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: