ActiveMQ / AMQ-6863

NPE when expiring messages with FilePendingMessageCursor and durable topic subscriptions

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 5.15.2
    • Fix Version/s: None
    • Component/s: Broker
    • Labels:
      None

      Description

      I am using a file-based cursor with durable topic subscriptions because, in
      my tests, the broker ran out of memory when large numbers of messages
      accumulated without an active consumer.
      I have run into a NullPointerException when messages expire on a topic that
      has an active durable subscription. Here is part of the stack trace:

      java.lang.NullPointerException: null
          at org.apache.activemq.broker.region.Topic.acknowledge(Topic.java:586) [activemq-broker-5.15.2.jar:5.15.2]
          at org.apache.activemq.broker.region.Topic.messageExpired(Topic.java:810) [activemq-broker-5.15.2.jar:5.15.2]
          at org.apache.activemq.broker.region.cursors.FilePendingMessageCursor.discardExpiredMessage(FilePendingMessageCursor.java:489) [activemq-broker-5.15.2.jar:5.15.2]
          at org.apache.activemq.broker.region.cursors.FilePendingMessageCursor.tryAddMessageLast(FilePendingMessageCursor.java:247) [activemq-broker-5.15.2.jar:5.15.2]
          at org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor.addMessageLast(AbstractPendingMessageCursor.java:93) [activemq-broker-5.15.2.jar:5.15.2]
          at org.apache.activemq.broker.region.PrefetchSubscription.add(PrefetchSubscription.java:157) [activemq-broker-5.15.2.jar:5.15.2]
          at org.apache.activemq.broker.region.DurableTopicSubscription.add(DurableTopicSubscription.java:279) [activemq-broker-5.15.2.jar:5.15.2]
          at org.apache.activemq.broker.region.Topic$2.recoverMessage(Topic.java:314) [activemq-broker-5.15.2.jar:5.15.2]
          at org.apache.activemq.store.kahadb.KahaDBStore$KahaDBTopicMessageStore$6.execute(KahaDBStore.java:1012) [activemq-kahadb-store-5.15.2.jar:5.15.2]
          at org.apache.activemq.store.kahadb.disk.page.Transaction.execute(Transaction.java:779) [activemq-kahadb-store-5.15.2.jar:5.15.2]
          at org.apache.activemq.store.kahadb.KahaDBStore$KahaDBTopicMessageStore.recoverSubscription(KahaDBStore.java:999) [activemq-kahadb-store-5.15.2.jar:5.15.2]
          at org.apache.activemq.store.ProxyTopicMessageStore.recoverSubscription(ProxyTopicMessageStore.java:108) [activemq-broker-5.15.2.jar:5.15.2]
          at org.apache.activemq.broker.region.Topic.activate(Topic.java:307) [activemq-broker-5.15.2.jar:5.15.2]
          at org.apache.activemq.broker.region.DurableTopicSubscription.add(DurableTopicSubscription.java:123) [activemq-broker-5.15.2.jar:5.15.2]
          at org.apache.activemq.broker.region.Topic.addSubscription(Topic.java:164) [activemq-broker-5.15.2.jar:5.15.2]
          at org.apache.activemq.broker.region.TopicRegion.addSubscriptionsForDestination(TopicRegion.java:287) [activemq-broker-5.15.2.jar:5.15.2]
          at org.apache.activemq.broker.region.AbstractRegion.addDestination(AbstractRegion.java:162) [activemq-broker-5.15.2.jar:5.15.2]
          at org.apache.activemq.broker.region.RegionBroker.addDestination(RegionBroker.java:339) [activemq-broker-5.15.2.jar:5.15.2]
          at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:174) [activemq-broker-5.15.2.jar:5.15.2]
          at org.apache.activemq.advisory.AdvisoryBroker.addDestination(AdvisoryBroker.java:239) [activemq-broker-5.15.2.jar:5.15.2]
          at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:174) [activemq-broker-5.15.2.jar:5.15.2]
          at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:174) [activemq-broker-5.15.2.jar:5.15.2]
          at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:174) [activemq-broker-5.15.2.jar:5.15.2]
          at org.apache.activemq.broker.region.AbstractRegion.start(AbstractRegion.java:104) [activemq-broker-5.15.2.jar:5.15.2]
          at org.apache.activemq.broker.region.RegionBroker.start(RegionBroker.java:200) [activemq-broker-5.15.2.jar:5.15.2]
          at org.apache.activemq.broker.BrokerFilter.start(BrokerFilter.java:189) [activemq-broker-5.15.2.jar:5.15.2]
          at org.apache.activemq.broker.BrokerFilter.start(BrokerFilter.java:189) [activemq-broker-5.15.2.jar:5.15.2]
          at org.apache.activemq.broker.TransactionBroker.start(TransactionBroker.java:119) [activemq-broker-5.15.2.jar:5.15.2]
          at org.apache.activemq.broker.BrokerService$6.start(BrokerService.java:2370) [activemq-broker-5.15.2.jar:5.15.2]
          at org.apache.activemq.broker.BrokerService.doStartBroker(BrokerService.java:747) [activemq-broker-5.15.2.jar:5.15.2]
          at org.apache.activemq.broker.BrokerService.startBroker(BrokerService.java:733) [activemq-broker-5.15.2.jar:5.15.2]
          at org.apache.activemq.broker.BrokerService.start(BrokerService.java:636) [activemq-broker-5.15.2.jar:5.15.2]

      I looked at the code and it seems to me that this is caused by the method
      org.apache.activemq.broker.region.cursors.FilePendingMessageCursor#discardExpiredMessage:

      private void discardExpiredMessage(MessageReference reference) {
          LOG.debug("Discarding expired message {}", reference);
          if (reference.isExpired() && broker.isExpired(reference)) {
              ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
              context.setBroker(broker);
              ((Destination)reference.getRegionDestination()).messageExpired(context, null, new IndirectMessageReference(reference.getMessage()));
          }
      }

      There the subscription passed to Destination#messageExpired is set to null.
      If the destination is a topic, then later in
      org.apache.activemq.broker.region.Topic#acknowledge:

      @Override
      public void acknowledge(ConnectionContext context, Subscription sub,
              final MessageAck ack,
              final MessageReference node) throws IOException {
          if (topicStore != null && node.isPersistent()) {
              DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
              SubscriptionKey key = dsub.getSubscriptionKey();
              topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(), convertToNonRangedAck(ack, node));
          }
          messageConsumed(context, node);
      }

      Because sub is null, the cast yields a null dsub and the call
      dsub.getSubscriptionKey() throws the NPE.
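
      The mechanism can be shown without a broker. The following is a minimal, self-contained sketch, not ActiveMQ code: Subscription, DurableSubscription, storeAcks and both acknowledge methods are hypothetical stand-ins that only mirror the shape of the quoted Topic#acknowledge. It shows that casting a null reference succeeds and the failure only appears on the following dereference. The guarded variant is just one possible way to avoid the exception; whether skipping the store acknowledgement is the right behaviour for an expired message in the broker is an open question that this sketch does not answer.

```java
import java.util.ArrayList;
import java.util.List;

public class NullSubscriptionSketch {

    /** Hypothetical stand-in for the broker's Subscription type. */
    interface Subscription {}

    /** Hypothetical stand-in for DurableTopicSubscription; only the key accessor is modelled. */
    static final class DurableSubscription implements Subscription {
        private final String key;

        DurableSubscription(String key) {
            this.key = key;
        }

        String getSubscriptionKey() {
            return key;
        }
    }

    /** Records store acknowledgements so the effect of each variant is observable. */
    static final List<String> storeAcks = new ArrayList<>();

    /** Same shape as the quoted method: cast first, then dereference. */
    static void acknowledgeUnguarded(Subscription sub, String messageId) {
        DurableSubscription dsub = (DurableSubscription) sub; // casting null succeeds
        storeAcks.add(dsub.getSubscriptionKey() + ":" + messageId); // throws NPE when sub is null
    }

    /** Assumed guard (not the project's fix): only touch the store when a durable subscription is given. */
    static void acknowledgeGuarded(Subscription sub, String messageId) {
        if (sub instanceof DurableSubscription) { // instanceof is false for null
            DurableSubscription dsub = (DurableSubscription) sub;
            storeAcks.add(dsub.getSubscriptionKey() + ":" + messageId);
        }
    }

    public static void main(String[] args) {
        try {
            acknowledgeUnguarded(null, "msg-1");
        } catch (NullPointerException e) {
            System.out.println("unguarded call threw NullPointerException");
        }
        acknowledgeGuarded(null, "msg-1");
        System.out.println("store acks after guarded call: " + storeAcks.size());
    }
}
```

      In the sketch the guarded call with a null subscription records nothing, while a call with a real DurableSubscription records one acknowledgement under its key.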

      I suspect that this problem also applies to the default
      org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor, which
      uses org.apache.activemq.broker.region.cursors.FilePendingMessageCursor
      internally, but so far I have not been able to reproduce it.

        Attachments

        1. TestCursors.java
          4 kB
          Tomas Pavelka
        2. TestActiveSubscriptions.java
          6 kB
          Tomas Pavelka
        3. TestHighMemoryWaterMark.java
          11 kB
          Tomas Pavelka

            People

            • Assignee: Unassigned
            • Reporter: Tomas Pavelka (tpavelka)
            • Votes: 1
            • Watchers: 2
