Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Not A Problem
-
2.26.0
-
None
-
None
Description
Currently in some cases last value queues deliver all messages to consumers as opposed to only the last one.
I wrote the following test which is working fine:
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.activemq.artemis.tests.integration.stomp; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.core.protocol.stomp.Stomp; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame; import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection; import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.List; import java.util.UUID; @RunWith(Parameterized.class) public class StompLVQTest extends StompTestBase { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); protected StompClientConnection producerConn; protected StompClientConnection consumerConn; @Override protected ActiveMQServer createServer() throws Exception { ActiveMQServer server = super.createServer(); server.getConfiguration().setAddressQueueScanPeriod(100); return server; } @Override @Before public void setUp() throws Exception { super.setUp(); server.createQueue(new QueueConfiguration("lvq").setAddress("lvq").setLastValue(true).setDurable(true)); producerConn = StompClientConnectionFactory.createClientConnection(uri); consumerConn = StompClientConnectionFactory.createClientConnection(uri); } @Override @After public void tearDown() throws Exception { try { boolean connected = producerConn != null && producerConn.isConnected(); logger.debug("Connection 1.0 connected: {}", connected); if (connected) { try { producerConn.disconnect(); } catch (Exception e) { // ignore } } } finally { super.tearDown(); producerConn.closeTransport(); } try { boolean connected = consumerConn != null && consumerConn.isConnected(); logger.debug("Connection 1.0 connected: {}", connected); if (connected) { try { consumerConn.disconnect(); } catch (Exception e) { // ignore } } } finally { super.tearDown(); consumerConn.closeTransport(); } } @Test public void testLVQ() throws Exception { final String name = "lvq"; producerConn.connect(defUser, defPass); consumerConn.connect(defUser, defPass); subscribe(consumerConn, "lvqtest", Stomp.Headers.Subscribe.AckModeValues.CLIENT, null, null, name, true, 0); Thread producer = new Thread() { @Override public void run() { try { for (int i = 1; i <= 100; i++) { String uuid = UUID.randomUUID().toString(); ClientStompFrame frame = producerConn.createFrame(Stomp.Commands.SEND) .addHeader(Stomp.Headers.Send.DESTINATION, name) .addHeader(Message.HDR_LAST_VALUE_NAME.toString(), "test") .addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid) // .addHeader(Stomp.Headers.Send.PERSISTENT, "true") .setBody(String.valueOf(i)); frame = producerConn.sendFrame(frame); assertEquals(Stomp.Responses.RECEIPT, frame.getCommand()); assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID)); } } catch(Exception e) { logger.error(null, e); } } }; Thread consumer = new Thread() { @Override public void run() { try { List<ClientStompFrame> messages = new ArrayList<>(); ClientStompFrame frame; while((frame = consumerConn.receiveFrame(10000)) != null) { assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); ack(consumerConn, null, frame); messages.add(frame); } logger.info("Received messages: {}", messages); Assert.assertEquals(2, messages.size()); Assert.assertEquals("1", messages.get(0).getBody()); Assert.assertEquals("100", messages.get(1).getBody()); } catch(Exception e) { logger.error(null, e); } } }; producer.start(); producer.join(); consumer.start(); consumer.join(); } }
The client subscribes before the producer starts sending the messages and receives only the first and the last message which is expected – the first one is in delivery and for the next ones the very last one will be kept.
In the actual application (via stomp-php) it is not working: all messages get delivered to the consumer.
The queue with the issue is created by defining:
<address name="queue"> <anycast> <queue name="queue" last-value="true"> <durable>true</durable> </queue> </anycast> </address>
which should be the equivalent of the one in the test.
The issue seems to be in QueueImpl::deliver:
ConsumerHolder<? extends Consumer> holder; if (consumers.hasNext()) { holder = consumers.next(); } else { pruneLastValues(); break; }
where pruneLastValues() should always be called (or rather each message checked for duplicates).
If that is the expected behaviour, then I suggest to add an optional feature to always deduplicate the messages.
I propose the following as the solution:
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java index e67ae7dab1..82bebeaf19 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java @@ -199,21 +199,31 @@ public class LastValueQueue extends QueueImpl { // called with synchronized(this) from super.deliver() try (LinkedListIterator<MessageReference> iter = messageReferences.iterator()) { while (iter.hasNext()) { - MessageReference ref = iter.next(); - if (!currentLastValue(ref)) { + MessageReference ref = interceptMessage(iter.next()); + if (ref == null) { iter.remove(); - try { - referenceHandled(ref); - super.refRemoved(ref); - ref.acknowledge(null, AckReason.REPLACED, null); - } catch (Exception e) { - ActiveMQServerLogger.LOGGER.errorAckingOldReference(e); - } } } } } + @Override + protected MessageReference interceptMessage(MessageReference ref) { + if (!currentLastValue(ref)) { + try { + referenceHandled(ref); + super.refRemoved(ref); + ref.acknowledge(null, AckReason.REPLACED, null); + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.errorAckingOldReference(e); + } + + return null; + } + + return super.interceptMessage(ref); + } + private boolean currentLastValue(final MessageReference ref) { boolean currentLastValue = false; SimpleString lastValueProp = ref.getLastValueProperty(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 66d6ff789b..77cdc0db90 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -3035,15 +3035,21 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { Consumer consumer = holder.consumer; Consumer groupConsumer = null; + ref = null; if (holder.iter == null) { holder.iter = messageReferences.iterator(); } - if (holder.iter.hasNext()) { - ref = holder.iter.next(); - } else { - ref = null; + while (holder.iter.hasNext()) { + ref = interceptMessage(holder.iter.next()); + + if (ref == null) { + holder.iter.remove(); + handled++; + } else { + break; + } } if (ref == null) { @@ -3154,6 +3160,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { // interception point for LVQ } + protected MessageReference interceptMessage(MessageReference ref) { + return ref; + } + protected void removeMessageReference(ConsumerHolder<? extends Consumer> holder, MessageReference ref) { holder.iter.remove(); refRemoved(ref);
currentLastValue() is lightweight, so there is no major performance impact checking whether each message is the last version or not.
Attachments
Issue Links
- relates to
-
ARTEMIS-4085 Exclusive LVQ not working as expected
- Closed