Uploaded image for project: 'ActiveMQ Classic'
  1. ActiveMQ Classic
  2. AMQ-4121

Expose Destination.setMemoryUsage so that custom policies can override default MemoryUsage (e.g., to specify message count limits)

    XMLWordPrintableJSON

Details

    • New Feature
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 5.7.0
    • 5.8.0
    • None
    • None
    • Patch Available

    Description

      Using queues as an example, but this also applies to topics...

      When a queue is created, it inherits the System-wide (broker) usage manager and its limits. A policy can be applied to the queue in order to specify a destination-specific memory usage limit:

      <policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb"> 
      ...
      

      This policy entry is still dependent on the existing org.apache.activemq.usage.MemoryUsage class:

      org.apache.activemq.broker.region.policy.PolicyEntry
      public void configure(Broker broker,Queue queue) {
          baseConfiguration(broker,queue);
          if (dispatchPolicy != null) {
              queue.setDispatchPolicy(dispatchPolicy);
          }
          queue.setDeadLetterStrategy(getDeadLetterStrategy());
          queue.setMessageGroupMapFactory(getMessageGroupMapFactory());
          if (memoryLimit > 0) {
              queue.getMemoryUsage().setLimit(memoryLimit);
          }
      ...
      

      We wanted to create a usage policy that would limit the number of messages in a queue by count (i.e., max 1000 messages) rather than by memory usage.

      The existing usage strategies are memory-centric. It would be nice if a destination had a generic Usage property rather than a specific MemoryUsage property, but barring refactoring the interfaces, it would be possible to achieve our goal if we could set the MemoryUsage property of a queue; this is as simple as adding a Destination.setMemoryUsage(MemoryUsage) property go with the existing Destination.getMemoryUsage(MemoryUsage) property.

      The attached patch contains the trivial changes to expose Destination.setMemoryUsage. The patch also includes an example of an extended policy that allows a discrete (i.e., count based) usage policy on queues. Rather than enforcing a specific limit to the number of messages in a queue, this policy determines the limit as a multiple of the number of consumers. It would be even easier to do the former, but our use case required the latter. Also note that this consumer-ratio policy is orthogonal to the consumer prefetch limit — it's a policy that we needed to prevent a distributed queue with many messages from being completely drained to a remote broker during startup (i.e., before additional brokers had started).

      Example of ExtendedPolicy configuration
      <bean class="com.invoqsystems.foundation.activemq.ExtendedPolicyEntry">
          <property name="queue" value="shared.notification.*" />
          <property name="messageToConsumerRatioLimit" value="99" />
      </bean>
      
      com.invoqsystems.foundation.activemq.ExtendedPolicyEntry
      public class ExtendedPolicyEntry extends PolicyEntry {
          private long messageToConsumerRatioLimit;
      
          /**
           * This is called by AMQ when a queue is first created. If a message-to-consumer ratio is specified, a
           * {@link DiscreteMemoryUsage} class with a {@link ConsumerRatioUsageCapacity} limiter replaces the queue's
           * byte-based MemoryUsage class. The original parent of the queue's byte-based MemoryUsage becomes the parent of the
           * {@link DiscreteMemoryUsage}, so it will also receive updates and can signal when the queue is full (e.g., because
           * of queue or system memory limits).
           */
          @Override
          public void configure(Broker broker, Queue queue) {
              super.configure(broker, queue);
      
              if (messageToConsumerRatioLimit > 0) {
                  DiscreteMemoryUsage ratioUsage = new DiscreteMemoryUsage(queue.getMemoryUsage().getParent(), ":ratio");
                  ratioUsage.setLimiter(new ConsumerRatioUsageCapacity(queue));
                  ratioUsage.setLimit(messageToConsumerRatioLimit);
                  ratioUsage.setParent(queue.getMemoryUsage());
                  ratioUsage.setExecutor(queue.getMemoryUsage().getExecutor());
                  queue.setMemoryUsage(ratioUsage);
              }
          }
      ...
      

      Attachments

        1. AMQ4121.patch
          2 kB
          Stirling Chow
        2. ConsumerRatioUsageCapacity.java
          3 kB
          Stirling Chow
        3. DiscreteMemoryUsage.java
          2 kB
          Stirling Chow
        4. ExtendedPolicyEntry.java
          3 kB
          Stirling Chow

        Activity

          People

            tabish Timothy A. Bish
            stirlingc Stirling Chow
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: