Uploaded image for project: 'ActiveMQ Classic'
  1. ActiveMQ Classic
  2. AMQ-5872

MQTT subscribe fails after unsubscribe

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Closed
    • Major
    • Resolution: Fixed
    • 5.11.1
    • 5.12.0
    • KahaDB
    • None
    • Windows server 2012

    Description

      We have a problem subscribing after unsubscribing (same topic). The messages are put in a queue, but not delivered to the client (the queue is created on the server).

      We use .NET M2MQTT as our client library.

      Full scenario:

      1) Connect to server.
      2) Subscribe to topic "TEST"
      3) Publish message "TEST 1" to topic "TEST" => message is received
      4) Unsubscribe from topic "TEST"
      5) Subscribe to topic "TEST"
      6) Publish message "TEST 2" to topic "TEST" => message is not received
      7) Subscribe to topic "TEST" => message "TEST 2" is received
      8) Publish message "TEST 3" to topic "TEST" => message is received

      On our server, we see the following error:
      2015-07-02 10:57:05,621 | WARN | Error subscribing to TEST | org.apache.activemq.transport.mqtt.strategy.AbstractMQTTSubscriptionStrategy | ActiveMQ Transport: tcp:///10.2.6.86:56778@8884
      java.lang.NullPointerException
      at org.apache.activemq.store.kahadb.MessageDatabase.addAckLocationForRetroactiveSub(MessageDatabase.java:2220)[activemq-kahadb-store-5.11.1.jar:5.11.1]
      at org.apache.activemq.store.kahadb.MessageDatabase.updateIndex(MessageDatabase.java:1472)[activemq-kahadb-store-5.11.1.jar:5.11.1]
      at org.apache.activemq.store.kahadb.MessageDatabase$15.execute(MessageDatabase.java:1207)[activemq-kahadb-store-5.11.1.jar:5.11.1]
      at org.apache.activemq.store.kahadb.disk.page.Transaction.execute(Transaction.java:779)[activemq-kahadb-store-5.11.1.jar:5.11.1]
      at org.apache.activemq.store.kahadb.MessageDatabase.process(MessageDatabase.java:1204)[activemq-kahadb-store-5.11.1.jar:5.11.1]
      at org.apache.activemq.store.kahadb.MessageDatabase$10.visit(MessageDatabase.java:1103)[activemq-kahadb-store-5.11.1.jar:5.11.1]
      at org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand.visit(KahaSubscriptionCommand.java:187)[activemq-kahadb-store-5.11.1.jar:5.11.1]
      at org.apache.activemq.store.kahadb.MessageDatabase.process(MessageDatabase.java:1070)[activemq-kahadb-store-5.11.1.jar:5.11.1]
      at org.apache.activemq.store.kahadb.MessageDatabase.store(MessageDatabase.java:977)[activemq-kahadb-store-5.11.1.jar:5.11.1]
      at org.apache.activemq.store.kahadb.MessageDatabase.store(MessageDatabase.java:957)[activemq-kahadb-store-5.11.1.jar:5.11.1]
      at org.apache.activemq.store.kahadb.KahaDBStore$KahaDBTopicMessageStore.addSubscription(KahaDBStore.java:796)[activemq-kahadb-store-5.11.1.jar:5.11.1]
      at org.apache.activemq.store.ProxyTopicMessageStore.addSubscription(ProxyTopicMessageStore.java:98)[activemq-broker-5.11.1.jar:5.11.1]
      at org.apache.activemq.broker.region.Topic.activate(Topic.java:258)[activemq-broker-5.11.1.jar:5.11.1]
      at org.apache.activemq.broker.region.DurableTopicSubscription.add(DurableTopicSubscription.java:121)[activemq-broker-5.11.1.jar:5.11.1]
      at org.apache.activemq.broker.region.Topic.addSubscription(Topic.java:160)[activemq-broker-5.11.1.jar:5.11.1]
      at org.apache.activemq.broker.region.AbstractRegion.addConsumer(AbstractRegion.java:319)[activemq-broker-5.11.1.jar:5.11.1]
      at org.apache.activemq.broker.region.TopicRegion.addConsumer(TopicRegion.java:163)[activemq-broker-5.11.1.jar:5.11.1]
      at org.apache.activemq.broker.region.RegionBroker.addConsumer(RegionBroker.java:427)[activemq-broker-5.11.1.jar:5.11.1]
      at org.apache.activemq.broker.jmx.ManagedRegionBroker.addConsumer(ManagedRegionBroker.java:244)[activemq-broker-5.11.1.jar:5.11.1]
      at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:102)[activemq-broker-5.11.1.jar:5.11.1]
      at org.apache.activemq.advisory.AdvisoryBroker.addConsumer(AdvisoryBroker.java:104)[activemq-broker-5.11.1.jar:5.11.1]
      at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:102)[activemq-broker-5.11.1.jar:5.11.1]
      at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:102)[activemq-broker-5.11.1.jar:5.11.1]
      at org.apache.activemq.broker.MutableBrokerFilter.addConsumer(MutableBrokerFilter.java:107)[activemq-broker-5.11.1.jar:5.11.1]
      at org.apache.activemq.broker.TransportConnection.processAddConsumer(TransportConnection.java:663)[activemq-broker-5.11.1.jar:5.11.1]
      at org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:348)[activemq-client-5.11.1.jar:5.11.1]
      at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:334)[activemq-broker-5.11.1.jar:5.11.1]
      at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:188)[activemq-broker-5.11.1.jar:5.11.1]
      at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:45)[activemq-client-5.11.1.jar:5.11.1]
      at org.apache.activemq.transport.mqtt.MQTTInactivityMonitor.onCommand(MQTTInactivityMonitor.java:147)[activemq-mqtt-5.11.1.jar:5.11.1]
      at org.apache.activemq.transport.mqtt.MQTTTransportFilter.sendToActiveMQ(MQTTTransportFilter.java:106)[activemq-mqtt-5.11.1.jar:5.11.1]
      at org.apache.activemq.transport.mqtt.MQTTProtocolConverter.sendToActiveMQ(MQTTProtocolConverter.java:173)[activemq-mqtt-5.11.1.jar:5.11.1]
      at org.apache.activemq.transport.mqtt.strategy.AbstractMQTTSubscriptionStrategy.doSubscribe(AbstractMQTTSubscriptionStrategy.java:200)[activemq-mqtt-5.11.1.jar:5.11.1]
      at org.apache.activemq.transport.mqtt.strategy.MQTTDefaultSubscriptionStrategy.onSubscribe(MQTTDefaultSubscriptionStrategy.java:87)[activemq-mqtt-5.11.1.jar:5.11.1]
      at org.apache.activemq.transport.mqtt.strategy.AbstractMQTTSubscriptionStrategy.onSubscribe(AbstractMQTTSubscriptionStrategy.java:108)[activemq-mqtt-5.11.1.jar:5.11.1]
      at org.apache.activemq.transport.mqtt.MQTTProtocolConverter.onSubscribe(MQTTProtocolConverter.java:352)[activemq-mqtt-5.11.1.jar:5.11.1]
      at org.apache.activemq.transport.mqtt.MQTTProtocolConverter.onMQTTCommand(MQTTProtocolConverter.java:204)[activemq-mqtt-5.11.1.jar:5.11.1]
      at org.apache.activemq.transport.mqtt.MQTTTransportFilter.onCommand(MQTTTransportFilter.java:94)[activemq-mqtt-5.11.1.jar:5.11.1]
      at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)[activemq-client-5.11.1.jar:5.11.1]
      at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:214)[activemq-client-5.11.1.jar:5.11.1]
      at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196)[activemq-client-5.11.1.jar:5.11.1]
      at java.lang.Thread.run(Unknown Source)[:1.8.0_31]

      Attachments

        1. MQTTNullPointerTest.java
          4 kB
          Timothy A. Bish
        2. MQTTNullPointerTest.java
          4 kB
          Christopher L. Shannon

        Activity

          People

            tabish Timothy A. Bish
            mathia.vandepoel Mathia Van De Poel
            Votes:
            0 Vote for this issue
            Watchers:
            4 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: