  1. ActiveMQ Classic
  2. AMQ-4148

Static subscriptions from network bridges do not respect TTL (off by one in calculation), resulting in duplicate consumers.

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 5.8.0
    • Fix Version/s: 5.8.0
    • Component/s: None
    • Labels: None
    • Patch Info: Patch Available

    Description

      Symptom
      =======
      The remote broker path is not set on network consumers that result from static subscriptions; as a result, they are forwarded to other bridges even when the network TTL on the bridges is 1. In an n+1 hub-and-spoke network, the next broker to join receives n subscriptions instead of 1.

      Cause
      =====
      A consumer for a static subscription is created by the following code:

      DemandForwardingBridgeSupport.java
      private void startLocalBridge() throws Throwable {
      ...
          if (!disposed.get()) {
                  setupStaticDestinations();
              } else {
                  LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") was interrupted during establishment.");
              }
          }
      }
      
      /**
       * Subscriptions for these destinations are always created
       */
      protected void setupStaticDestinations() {
      ...
                  DemandSubscription sub = createDemandSubscription(dest);
                  try {
                      addSubscription(sub);
                  } catch (IOException e) {
                      LOG.error("Failed to add static destination " + dest, e);
                  }
                  if (LOG.isTraceEnabled()) {
                      LOG.trace("bridging messages for static destination: " + dest);
                  }
              }
          }
      }
      
      final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination) {
          ConsumerInfo info = new ConsumerInfo();
          info.setDestination(destination);
      
          // the remote info held by the DemandSubscription holds the original consumerId,
          // the local info get's overwritten
          info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
          DemandSubscription result = null;
          try {
              result = createDemandSubscription(info);
          } catch (IOException e) {
              LOG.error("Failed to create DemandSubscription ", e);
          }
          return result;
      }
      

      Note how the broker path is not set on the ConsumerInfo that is used for the subscription.

      In contrast, a consumer for a dynamic subscription does have its broker path updated to indicate that it is from a remote broker:

      DemandForwardingBridgeSupport.java
      protected void serviceRemoteCommand(Command command) {
          if (!disposed.get()) {
              try {
                  if (command.isMessageDispatch()) {
                      safeWaitUntilStarted();
                      MessageDispatch md = (MessageDispatch) command;
                      serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
                      ackAdvisory(md.getMessage());
      ...
      
      private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException {
      ...
              if (addConsumerInfo(info)) {
                      if (LOG.isDebugEnabled()) {
                          LOG.debug(configuration.getBrokerName() + " bridged sub on " + localBroker + " from " + remoteBrokerName + " : " + info);
                      }
                  } else {
                      if (LOG.isDebugEnabled()) {
                          LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + " as already subscribed to matching destination : " + info);
                      }
                  }
              }
      ...
      
      protected boolean addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException {
          boolean consumerAdded = false;
          ConsumerInfo info = consumerInfo.copy();
          addRemoteBrokerToBrokerPath(info);
      ...
      

      Because of this difference, a static subscription will be forwarded to new bridges with a null brokerPath while a dynamic subscription to the same queue will be forwarded with a singleton brokerPath. As a result, static subscriptions will be propagated one further hop than their dynamic counterparts. In the case of a network TTL of 1, the static subscription consumers from existing bridges are unexpectedly propagated to new bridges, while the dynamic subscription consumers are correctly suppressed.
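
      The off-by-one can be reproduced in a small stand-alone model. This is a sketch, not ActiveMQ code: broker IDs are plain strings, and the rule "ignore the consumer once its broker path is at least as long as the network TTL" is an assumption about how a bridge applies the TTL, chosen because it matches the behavior described above. BrokerPathTtlModel and its method names are hypothetical.

```java
import java.util.Arrays;

/** Simplified model of the TTL check; not the ActiveMQ implementation. */
class BrokerPathTtlModel {

    /** Number of hops already recorded; a null path counts as zero hops. */
    static int hops(String[] brokerPath) {
        return brokerPath == null ? 0 : brokerPath.length;
    }

    /** Assumed rule: ignore the consumer once its path is at least as long as the TTL. */
    static boolean isSuppressed(String[] brokerPath, int networkTtl) {
        return hops(brokerPath) >= networkTtl;
    }

    public static void main(String[] args) {
        int networkTtl = 1;
        String[] staticPath = null;                 // static subscription: broker path never set
        String[] dynamicPath = { "remoteBroker" };  // dynamic subscription: remote broker appended

        System.out.println("static  " + Arrays.toString(staticPath)
                + " suppressed=" + isSuppressed(staticPath, networkTtl));
        System.out.println("dynamic " + Arrays.toString(dynamicPath)
                + " suppressed=" + isSuppressed(dynamicPath, networkTtl));
    }
}
```

      With a TTL of 1, the model suppresses the consumer whose path holds one broker and lets the consumer with the null path through, which is the one-hop discrepancy between dynamic and static subscriptions.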

      Solution
      ========
      There should be no logical difference between a network consumer created for a static subscription vs. a dynamic subscription. In either case, the bridge creates a consumer on behalf of the remote broker. As such, the consumer for a static subscription should have the remote broker in its broker path because it represents a subscription from that remote broker (even if there is no consumer).
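
      As a rough illustration of the proposed behavior, the stand-alone sketch below models a hub whose bridges each hold a static subscription to the same queue, and counts how many of those consumers a newly joined bridge would forward. It is not the attached patch and does not use ActiveMQ classes: StaticSubscriptionModel, ModelConsumer, and all method names are hypothetical, and the forwarding rule (path length less than the network TTL) is an assumption consistent with the description above.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of a hub with static subscriptions; not ActiveMQ code and not the attached patch. */
class StaticSubscriptionModel {

    /** Stand-in for a network consumer: a destination plus the brokers it has passed through. */
    static final class ModelConsumer {
        final String destination;
        final String[] brokerPath; // null models "broker path not set"

        ModelConsumer(String destination, String[] brokerPath) {
            this.destination = destination;
            this.brokerPath = brokerPath;
        }
    }

    /** Creates the consumer a bridge registers for a static destination. */
    static ModelConsumer createStaticConsumer(String destination, String remoteBroker, boolean setBrokerPath) {
        String[] path = setBrokerPath ? new String[] { remoteBroker } : null;
        return new ModelConsumer(destination, path);
    }

    /** Builds the hub's consumers for n existing spokes, all statically subscribed to one queue. */
    static List<ModelConsumer> hubWithSpokes(int n, boolean setBrokerPath) {
        List<ModelConsumer> consumers = new ArrayList<>();
        for (int i = 1; i <= n; i++) {
            consumers.add(createStaticConsumer("queue://example", "spoke" + i, setBrokerPath));
        }
        return consumers;
    }

    /** Counts the hub consumers a newly joined bridge would forward (assumed rule: hops < TTL). */
    static int forwardedToNewBridge(List<ModelConsumer> hubConsumers, int networkTtl) {
        int forwarded = 0;
        for (ModelConsumer c : hubConsumers) {
            int hops = c.brokerPath == null ? 0 : c.brokerPath.length;
            if (hops < networkTtl) {
                forwarded++;
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        int n = 3;
        int networkTtl = 1;
        System.out.println("broker path unset: forwarded="
                + forwardedToNewBridge(hubWithSpokes(n, false), networkTtl));
        System.out.println("broker path set:   forwarded="
                + forwardedToNewBridge(hubWithSpokes(n, true), networkTtl));
    }
}
```

      The model counts only consumers that originate from the existing bridges; any subscription the new bridge is expected to see on its own behalf is outside its scope. With the path unset, all n consumers pass the TTL check. With the remote broker recorded in the path, none do at a TTL of 1.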

      Attachments

        1. AMQ4148.patch
          0.9 kB
          Stirling Chow
        2. AMQ4148Test.java
          3 kB
          Stirling Chow

          People

            Assignee: Unassigned
            Reporter: Stirling Chow (stirlingc)
            Votes: 0
            Watchers: 3