Uploaded image for project: 'ActiveMQ Classic'
  1. ActiveMQ Classic
  2. AMQ-5298

MQTT Transport can generate class cast exception when subscription is to a Virtual Topic

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 5.10.0
    • 5.11.0
    • MQTT
    • None

    Description

      When a client subscribes to a Virtual Topic and sends a duplicate subscription request the method that resends the old retained messages tries to cast the destination from the TopicRegion is finds to a Topic but in this case it would be a VirtualTopicIntercepter and an exception is thrown disconnecting the client.

      Attachments

        Issue Links

          Activity

            The test to demonstrate this is a modification of testRetainedMessage in MQTTTest.java

                @Test(timeout = 120 * 1000)
                public void testRetainedMessageOnVirtualTopic() throws Exception {
                    MQTT mqtt = createMQTTConnection();
                    mqtt.setKeepAlive((short) 2);
            
                    final String RETAIN = "RETAIN";
                    final String TOPICA = "VirtualTopic/TopicA";
            
                    final String[] clientIds = { null, "foo", "durable" };
                    for (String clientId : clientIds) {
            
                        LOG.info("Running test loop with Client ID: {}", clientId);
            
                        mqtt.setClientId(clientId);
                        mqtt.setCleanSession(!"durable".equals(clientId));
            
                        BlockingConnection connection = mqtt.blockingConnection();
                        connection.connect();
            
                        // set retained message and check
                        connection.publish(TOPICA, RETAIN.getBytes(), QoS.EXACTLY_ONCE, true);
                        connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
                        Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
                        assertNotNull("No retained message for " + clientId, msg);
                        assertEquals(RETAIN, new String(msg.getPayload()));
                        msg.ack();
                        assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
            
                        // test duplicate subscription
                        connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
                        msg = connection.receive(15000, TimeUnit.MILLISECONDS);
                        assertNotNull("No retained message on duplicate subscription for " + clientId, msg);
                        assertEquals(RETAIN, new String(msg.getPayload()));
                        msg.ack();
                        assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
                        connection.unsubscribe(new String[]{"TopicA"});
            
                        // clear retained message and check that we don't receive it
                        connection.publish(TOPICA, "".getBytes(), QoS.AT_MOST_ONCE, true);
                        connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
                        msg = connection.receive(500, TimeUnit.MILLISECONDS);
                        assertNull("Retained message not cleared for " + clientId, msg);
                        connection.unsubscribe(new String[]{"TopicA"});
            
                        // set retained message again and check
                        connection.publish(TOPICA, RETAIN.getBytes(), QoS.EXACTLY_ONCE, true);
                        connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
                        msg = connection.receive(5000, TimeUnit.MILLISECONDS);
                        assertNotNull("No reset retained message for " + clientId, msg);
                        assertEquals(RETAIN, new String(msg.getPayload()));
                        msg.ack();
                        assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
            
                        // re-connect and check
                        connection.disconnect();
                        connection = mqtt.blockingConnection();
                        connection.connect();
                        connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
                        msg = connection.receive(5000, TimeUnit.MILLISECONDS);
                        assertNotNull("No reset retained message for " + clientId, msg);
                        assertEquals(RETAIN, new String(msg.getPayload()));
                        msg.ack();
                        assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
            
                        connection.unsubscribe(new String[]{"TopicA"});
            
                        connection.disconnect();
                    }
                }
            
            tabish Timothy A. Bish added a comment - The test to demonstrate this is a modification of testRetainedMessage in MQTTTest.java @Test(timeout = 120 * 1000) public void testRetainedMessageOnVirtualTopic() throws Exception { MQTT mqtt = createMQTTConnection(); mqtt.setKeepAlive(( short ) 2); final String RETAIN = "RETAIN" ; final String TOPICA = "VirtualTopic/TopicA" ; final String [] clientIds = { null , "foo" , "durable" }; for ( String clientId : clientIds) { LOG.info( "Running test loop with Client ID: {}" , clientId); mqtt.setClientId(clientId); mqtt.setCleanSession(! "durable" .equals(clientId)); BlockingConnection connection = mqtt.blockingConnection(); connection.connect(); // set retained message and check connection.publish(TOPICA, RETAIN.getBytes(), QoS.EXACTLY_ONCE, true ); connection.subscribe( new Topic[]{ new Topic(TOPICA, QoS.AT_LEAST_ONCE)}); Message msg = connection.receive(5000, TimeUnit.MILLISECONDS); assertNotNull( "No retained message for " + clientId, msg); assertEquals(RETAIN, new String (msg.getPayload())); msg.ack(); assertNull(connection.receive(500, TimeUnit.MILLISECONDS)); // test duplicate subscription connection.subscribe( new Topic[]{ new Topic(TOPICA, QoS.AT_LEAST_ONCE)}); msg = connection.receive(15000, TimeUnit.MILLISECONDS); assertNotNull( "No retained message on duplicate subscription for " + clientId, msg); assertEquals(RETAIN, new String (msg.getPayload())); msg.ack(); assertNull(connection.receive(500, TimeUnit.MILLISECONDS)); connection.unsubscribe( new String []{ "TopicA" }); // clear retained message and check that we don't receive it connection.publish(TOPICA, "".getBytes(), QoS.AT_MOST_ONCE, true ); connection.subscribe( new Topic[]{ new Topic(TOPICA, QoS.AT_LEAST_ONCE)}); msg = connection.receive(500, TimeUnit.MILLISECONDS); assertNull( "Retained message not cleared for " + clientId, msg); connection.unsubscribe( new String []{ "TopicA" }); // set retained message again and check connection.publish(TOPICA, RETAIN.getBytes(), QoS.EXACTLY_ONCE, true ); connection.subscribe( new Topic[]{ new Topic(TOPICA, QoS.AT_LEAST_ONCE)}); msg = connection.receive(5000, TimeUnit.MILLISECONDS); assertNotNull( "No reset retained message for " + clientId, msg); assertEquals(RETAIN, new String (msg.getPayload())); msg.ack(); assertNull(connection.receive(500, TimeUnit.MILLISECONDS)); // re-connect and check connection.disconnect(); connection = mqtt.blockingConnection(); connection.connect(); connection.subscribe( new Topic[]{ new Topic(TOPICA, QoS.AT_LEAST_ONCE)}); msg = connection.receive(5000, TimeUnit.MILLISECONDS); assertNotNull( "No reset retained message for " + clientId, msg); assertEquals(RETAIN, new String (msg.getPayload())); msg.ack(); assertNull(connection.receive(500, TimeUnit.MILLISECONDS)); connection.unsubscribe( new String []{ "TopicA" }); connection.disconnect(); } }
            tabish Timothy A. Bish added a comment - - edited

            This patch fixes the crash but there are issues with the retained messages because it's a VurtualTopic.

            diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
            index cc51ce7..af5f003 100644
            --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
            +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
            @@ -40,6 +40,7 @@
             import org.apache.activemq.broker.region.Subscription;
             import org.apache.activemq.broker.region.TopicRegion;
             import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy;
            +import org.apache.activemq.broker.region.virtual.VirtualTopicInterceptor;
             import org.apache.activemq.command.ActiveMQBytesMessage;
             import org.apache.activemq.command.ActiveMQDestination;
             import org.apache.activemq.command.ActiveMQMapMessage;
            @@ -503,7 +504,11 @@
                         for (Subscription subscription : dest.getConsumers()) {
                             if (subscription.getConsumerInfo().getConsumerId().equals(consumerId)) {
                                 try {
            -                        ((org.apache.activemq.broker.region.Topic)dest).recoverRetroactiveMessages(connectionContext, subscription);
            +                        if (dest instanceof org.apache.activemq.broker.region.Topic) {
            +                            ((org.apache.activemq.broker.region.Topic)dest).recoverRetroactiveMessages(connectionContext, subscription);
            +                        } else if (dest instanceof VirtualTopicInterceptor) {
            +                            ((VirtualTopicInterceptor)dest).getTopic().recoverRetroactiveMessages(connectionContext, subscription);
            +                        }
                                     if (subscription instanceof PrefetchSubscription) {
                                         // request dispatch for prefetch subs
                                         PrefetchSubscription prefetchSubscription = (PrefetchSubscription) subscription;
            
            tabish Timothy A. Bish added a comment - - edited This patch fixes the crash but there are issues with the retained messages because it's a VurtualTopic. diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java index cc51ce7..af5f003 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -40,6 +40,7 @@ import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.TopicRegion; import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy; +import org.apache.activemq.broker.region.virtual.VirtualTopicInterceptor; import org.apache.activemq.command.ActiveMQBytesMessage; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMapMessage; @@ -503,7 +504,11 @@ for (Subscription subscription : dest.getConsumers()) { if (subscription.getConsumerInfo().getConsumerId().equals(consumerId)) { try { - ((org.apache.activemq.broker.region.Topic)dest).recoverRetroactiveMessages(connectionContext, subscription); + if (dest instanceof org.apache.activemq.broker.region.Topic) { + ((org.apache.activemq.broker.region.Topic)dest).recoverRetroactiveMessages(connectionContext, subscription); + } else if (dest instanceof VirtualTopicInterceptor) { + ((VirtualTopicInterceptor)dest).getTopic().recoverRetroactiveMessages(connectionContext, subscription); + } if (subscription instanceof PrefetchSubscription) { // request dispatch for prefetch subs PrefetchSubscription prefetchSubscription = (PrefetchSubscription) subscription;

            Fixed the NPE scenario. The issue with retained messages is not really related to this code so a separate issue will be opened to cover that.

            tabish Timothy A. Bish added a comment - Fixed the NPE scenario. The issue with retained messages is not really related to this code so a separate issue will be opened to cover that.

            People

              Unassigned Unassigned
              tabish Timothy A. Bish
              Votes:
              0 Vote for this issue
              Watchers:
              1 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: