Uploaded image for project: 'ActiveMQ Artemis'
  1. ActiveMQ Artemis
  2. ARTEMIS-4050

Last value queue not working as expected

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Not A Problem
    • 2.26.0
    • None
    • STOMP
    • None

    Description

      Currently in some cases last value queues deliver all messages to consumers as opposed to only the last one.

      I wrote the following test which is working fine:

      /*
       * Licensed to the Apache Software Foundation (ASF) under one or more
       * contributor license agreements. See the NOTICE file distributed with
       * this work for additional information regarding copyright ownership.
       * The ASF licenses this file to You under the Apache License, Version 2.0
       * (the "License"); you may not use this file except in compliance with
       * the License. You may obtain a copy of the License at
       *
       *     http://www.apache.org/licenses/LICENSE-2.0
       *
       * Unless required by applicable law or agreed to in writing, software
       * distributed under the License is distributed on an "AS IS" BASIS,
       * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       * See the License for the specific language governing permissions and
       * limitations under the License.
       */
      package org.apache.activemq.artemis.tests.integration.stomp;
      
      import org.apache.activemq.artemis.api.core.Message;
      import org.apache.activemq.artemis.api.core.QueueConfiguration;
      import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
      import org.apache.activemq.artemis.core.server.ActiveMQServer;
      import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
      import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
      import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
      import org.junit.After;
      import org.junit.Assert;
      import org.junit.Before;
      import org.junit.Test;
      import org.junit.runner.RunWith;
      import org.junit.runners.Parameterized;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      
      import java.lang.invoke.MethodHandles;
      import java.util.ArrayList;
      import java.util.List;
      import java.util.UUID;
      
      @RunWith(Parameterized.class)
      public class StompLVQTest extends StompTestBase {
      
       private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
      
       protected StompClientConnection producerConn;
       protected StompClientConnection consumerConn;
      
       @Override
       protected ActiveMQServer createServer() throws Exception {
        ActiveMQServer server = super.createServer();
        server.getConfiguration().setAddressQueueScanPeriod(100);
        return server;
       }
      
       @Override
       @Before
       public void setUp() throws Exception {
        super.setUp();
      
        server.createQueue(new QueueConfiguration("lvq").setAddress("lvq").setLastValue(true).setDurable(true));
      
        producerConn = StompClientConnectionFactory.createClientConnection(uri);
        consumerConn = StompClientConnectionFactory.createClientConnection(uri);
       }
      
       @Override
       @After
       public void tearDown() throws Exception {
        try {
         boolean connected = producerConn != null && producerConn.isConnected();
         logger.debug("Connection 1.0 connected: {}", connected);
         if (connected) {
          try {
           producerConn.disconnect();
          } catch (Exception e) {
           // ignore
          }
         }
        } finally {
         super.tearDown();
         producerConn.closeTransport();
        }
      
        try {
         boolean connected = consumerConn != null && consumerConn.isConnected();
         logger.debug("Connection 1.0 connected: {}", connected);
         if (connected) {
          try {
           consumerConn.disconnect();
          } catch (Exception e) {
           // ignore
          }
         }
        } finally {
         super.tearDown();
         consumerConn.closeTransport();
        }
       }
      
       @Test
       public void testLVQ() throws Exception {
        final String name = "lvq";
      
        producerConn.connect(defUser, defPass);
        consumerConn.connect(defUser, defPass);
      
        subscribe(consumerConn, "lvqtest", Stomp.Headers.Subscribe.AckModeValues.CLIENT, null, null, name, true, 0);
      
        Thread producer = new Thread() {
         @Override
         public void run() {
          try {
           for (int i = 1; i <= 100; i++) {
            String uuid = UUID.randomUUID().toString();
      
            ClientStompFrame frame = producerConn.createFrame(Stomp.Commands.SEND)
              .addHeader(Stomp.Headers.Send.DESTINATION, name)
              .addHeader(Message.HDR_LAST_VALUE_NAME.toString(), "test")
              .addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid)
              // .addHeader(Stomp.Headers.Send.PERSISTENT, "true")
              .setBody(String.valueOf(i));
      
            frame = producerConn.sendFrame(frame);
      
            assertEquals(Stomp.Responses.RECEIPT, frame.getCommand());
            assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
           }
          } catch(Exception e) {
           logger.error(null, e);
          }
         }
        };
      
        Thread consumer = new Thread() {
         @Override
         public void run() {
          try {
           List<ClientStompFrame> messages = new ArrayList<>();
           ClientStompFrame frame;
      
           while((frame = consumerConn.receiveFrame(10000)) != null)
           {
            assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
      
            ack(consumerConn, null, frame);
      
            messages.add(frame);
           }
      
           logger.info("Received messages: {}", messages);
      
           Assert.assertEquals(2, messages.size());
           Assert.assertEquals("1", messages.get(0).getBody());
           Assert.assertEquals("100", messages.get(1).getBody());
          } catch(Exception e) {
           logger.error(null, e);
          }
         }
        };
      
        producer.start();
        producer.join();
      
        consumer.start();
        consumer.join();
       }
      } 

      The client subscribes before the producer starts sending the messages and receives only the first and the last message which is expected – the first one is in delivery and for the next ones the very last one will be kept.

      In the actual application (via stomp-php) it is not working: all messages get delivered to the consumer.

      The queue with the issue is created by defining:

      <address name="queue">
          <anycast>
              <queue name="queue" last-value="true">
                  <durable>true</durable>
              </queue>
          </anycast>
      </address>

      which should be the equivalent of the one in the test.

      The issue seems to be in QueueImpl::deliver:

      ConsumerHolder<? extends Consumer> holder;
      if (consumers.hasNext()) {
         holder = consumers.next();
      } else {
         pruneLastValues();
         break;
      } 

      where pruneLastValues() should always be called (or rather each message checked for duplicates).

      If that is the expected behaviour, then I suggest to add an optional feature to always deduplicate the messages.

      I propose the following as the solution:

      diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
      index e67ae7dab1..82bebeaf19 100644
      --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
      +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
      @@ -199,21 +199,31 @@ public class LastValueQueue extends QueueImpl {
             // called with synchronized(this) from super.deliver()
             try (LinkedListIterator<MessageReference> iter = messageReferences.iterator()) {
                while (iter.hasNext()) {
      -            MessageReference ref = iter.next();
      -            if (!currentLastValue(ref)) {
      +            MessageReference ref = interceptMessage(iter.next());
      +            if (ref == null) {
                      iter.remove();
      -               try {
      -                  referenceHandled(ref);
      -                  super.refRemoved(ref);
      -                  ref.acknowledge(null, AckReason.REPLACED, null);
      -               } catch (Exception e) {
      -                  ActiveMQServerLogger.LOGGER.errorAckingOldReference(e);
      -               }
                   }
                }
             }
          }
       
      +   @Override
      +   protected MessageReference interceptMessage(MessageReference ref) {
      +      if (!currentLastValue(ref)) {
      +         try {
      +            referenceHandled(ref);
      +            super.refRemoved(ref);
      +            ref.acknowledge(null, AckReason.REPLACED, null);
      +         } catch (Exception e) {
      +            ActiveMQServerLogger.LOGGER.errorAckingOldReference(e);
      +         }
      +
      +         return null;
      +      }
      +
      +      return super.interceptMessage(ref);
      +   }
      +
          private boolean currentLastValue(final MessageReference ref) {
             boolean currentLastValue = false;
             SimpleString lastValueProp = ref.getLastValueProperty();
      diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
      index 66d6ff789b..77cdc0db90 100644
      --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
      +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
      @@ -3035,15 +3035,21 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       
                   Consumer consumer = holder.consumer;
                   Consumer groupConsumer = null;
      +            ref = null;
       
                   if (holder.iter == null) {
                      holder.iter = messageReferences.iterator();
                   }
       
      -            if (holder.iter.hasNext()) {
      -               ref = holder.iter.next();
      -            } else {
      -               ref = null;
      +            while (holder.iter.hasNext()) {
      +               ref = interceptMessage(holder.iter.next());
      +
      +               if (ref == null) {
      +                  holder.iter.remove();
      +                  handled++;
      +               } else {
      +                  break;
      +               }
                   }
       
                   if (ref == null) {
      @@ -3154,6 +3160,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
             // interception point for LVQ
          }
       
      +   protected MessageReference interceptMessage(MessageReference ref) {
      +      return ref;
      +   }
      +
          protected void removeMessageReference(ConsumerHolder<? extends Consumer> holder, MessageReference ref) {
             holder.iter.remove();
             refRemoved(ref);

      currentLastValue() is lightweight, so there is no major performance impact checking whether each message is the last version or not.

      Attachments

        Issue Links

          Activity

            People

              jbertram Justin Bertram
              siilike9 Lauri Keel
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: