Details
New Feature
Status: Resolved
Major
Resolution: Fixed
5.7.0
None
None
Patch Available
Description
This description uses queues as the example, but the same applies to topics.
When a queue is created, it inherits the system-wide (broker) usage manager and its limits. A policy can be applied to the queue to specify a destination-specific memory usage limit:
<policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb"> ...
This policy entry is still dependent on the existing org.apache.activemq.usage.MemoryUsage class:
public void configure(Broker broker, Queue queue) {
    baseConfiguration(broker, queue);
    if (dispatchPolicy != null) {
        queue.setDispatchPolicy(dispatchPolicy);
    }
    queue.setDeadLetterStrategy(getDeadLetterStrategy());
    queue.setMessageGroupMapFactory(getMessageGroupMapFactory());
    if (memoryLimit > 0) {
        queue.getMemoryUsage().setLimit(memoryLimit);
    }
    ...
We wanted to create a usage policy that would limit the number of messages in a queue by count (i.e., max 1000 messages) rather than by memory usage.
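To make the goal concrete, here is a minimal, self-contained sketch of what limiting by count rather than by bytes might look like. The class MessageCountUsage and all of its methods are hypothetical names invented for illustration; they are not part of ActiveMQ or of the attached patch, and the sketch does not model parent usages or listeners.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical count-based usage tracker; illustration only, not ActiveMQ API. */
class MessageCountUsage {
    private final long limit;                          // maximum number of messages, e.g. 1000
    private final AtomicLong count = new AtomicLong(); // messages currently held

    MessageCountUsage(long limit) {
        if (limit <= 0) {
            throw new IllegalArgumentException("limit must be positive");
        }
        this.limit = limit;
    }

    /** Record one message entering the queue. */
    void increase() {
        count.incrementAndGet();
    }

    /** Record one message leaving the queue; the count never drops below zero. */
    void decrease() {
        count.updateAndGet(c -> c > 0 ? c - 1 : 0);
    }

    long getCount() {
        return count.get();
    }

    /** Usage as a whole-number percentage of the limit (integer division). */
    int getPercentUsage() {
        return (int) (count.get() * 100 / limit);
    }

    /** The queue counts as full once the message count reaches the limit. */
    boolean isFull() {
        return count.get() >= limit;
    }
}
```

A producer-side check would consult isFull() before accepting a message, in the same way that a byte-based limit is consulted today; how that hooks into producer flow control is outside this sketch.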
The existing usage strategies are memory-centric. It would be nice if a destination had a generic Usage property rather than a specific MemoryUsage property. Short of refactoring the interfaces, though, we could achieve our goal if we were able to set the MemoryUsage property of a queue. This is as simple as adding a Destination.setMemoryUsage(MemoryUsage) setter to go with the existing Destination.getMemoryUsage() getter.
The attached patch contains the trivial changes to expose Destination.setMemoryUsage. The patch also includes an example of an extended policy that allows a discrete (i.e., count-based) usage policy on queues. Rather than enforcing a fixed limit on the number of messages in a queue, this policy determines the limit as a multiple of the number of consumers. It would be even easier to do the former, but our use case required the latter. Also note that this consumer-ratio policy is orthogonal to the consumer prefetch limit: we needed it to prevent a distributed queue with many messages from being completely drained to a remote broker during startup (i.e., before additional brokers had started).
<bean class="com.invoqsystems.foundation.activemq.ExtendedPolicyEntry">
    <property name="queue" value="shared.notification.*" />
    <property name="messageToConsumerRatioLimit" value="99" />
</bean>
public class ExtendedPolicyEntry extends PolicyEntry {

    private long messageToConsumerRatioLimit;

    /**
     * This is called by AMQ when a queue is first created. If a message-to-consumer ratio is specified, a
     * {@link DiscreteMemoryUsage} class with a {@link ConsumerRatioUsageCapacity} limiter replaces the queue's
     * byte-based MemoryUsage class. The original parent of the queue's byte-based MemoryUsage becomes the parent of the
     * {@link DiscreteMemoryUsage}, so it will also receive updates and can signal when the queue is full (e.g., because
     * of queue or system memory limits).
     */
    @Override
    public void configure(Broker broker, Queue queue) {
        super.configure(broker, queue);
        if (messageToConsumerRatioLimit > 0) {
            DiscreteMemoryUsage ratioUsage = new DiscreteMemoryUsage(queue.getMemoryUsage().getParent(), ":ratio");
            ratioUsage.setLimiter(new ConsumerRatioUsageCapacity(queue));
            ratioUsage.setLimit(messageToConsumerRatioLimit);
            ratioUsage.setParent(queue.getMemoryUsage());
            ratioUsage.setExecutor(queue.getMemoryUsage().getExecutor());
            queue.setMemoryUsage(ratioUsage);
        }
    }
    ...
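The ratio rule described above (limit = ratio × number of consumers) can be illustrated independently of the broker. The following is a rough sketch under stated assumptions, not the code from the patch: ConsumerRatioLimit and its methods are hypothetical names, the consumer count is supplied by a caller-provided IntSupplier rather than read from a real Queue, and the choice that zero consumers yields a limit of zero is an assumption of this sketch.

```java
import java.util.function.IntSupplier;

/** Hypothetical limiter illustrating a message-to-consumer ratio; not the patch's classes. */
class ConsumerRatioLimit {
    private final long ratio;                  // allowed messages per consumer, e.g. 99
    private final IntSupplier consumerCount;   // assumed source of the live consumer count

    ConsumerRatioLimit(long ratio, IntSupplier consumerCount) {
        if (ratio <= 0) {
            throw new IllegalArgumentException("ratio must be positive");
        }
        this.ratio = ratio;
        this.consumerCount = consumerCount;
    }

    /** Effective limit right now; recomputed on every call so it follows consumer churn. */
    long currentLimit() {
        int consumers = Math.max(0, consumerCount.getAsInt());
        return ratio * consumers;
    }

    /** Full when the queue holds at least ratio * consumers messages (assumption: 0 consumers means full). */
    boolean isFull(long messageCount) {
        return messageCount >= currentLimit();
    }
}
```

With a ratio of 99, a queue with one consumer would report full at 99 messages, and adding a second consumer would raise the limit to 198. Because the limit is derived on demand, nothing needs to be reconfigured when consumers connect or disconnect.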