Uploaded image for project: 'ActiveMQ Artemis'
  1. ActiveMQ Artemis
  2. ARTEMIS-3801

Not getting messages on MQTT subscriptions with $

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Closed
    • Major
    • Resolution: Fixed
    • None
    • 2.22.0
    • None
    • None

    Description

      Switching from Artemis broker 2.20 to 2.21 we experienced an issue about message delivering.
      It looks like MQTT devices don’t receive the messages matching their subscriptions.

      The test code (***) run with an Artemis broker 2.19 or 2.20 as target (wildcard addresses modified as (**)) produces the correct output:

      waiting for messages
      Client: test-client-1 - Delivery completed: $EDC/kapua-sys/test-client-1/MQTT/BIRTH
      Client: test-client-admin - Message arrived on topic: $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test
      Client: test-client-1 - Message arrived on topic: $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test
      ===
      Client: test-client-2 - Delivery completed: $EDC/kapua-sys/test-client-2/MQTT/BIRTH
      Client: test-client-2 - Message arrived on topic: $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test
      Client: test-client-admin - Message arrived on topic: $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test
      ===
      Client: test-client-admin - Delivery completed: $EDC/kapua-sys/test-client-1/MQTT/APPS
      Client: test-client-admin - Message arrived on topic: $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test
      Client: test-client-1 - Message arrived on topic: $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test
      …

      With broker 2.21 or 2.22 (configuration changes described in (*) ) as target the output is:

      waiting for messages
      ===
      Client: test-client-1 - Delivery completed: $EDC/kapua-sys/test-client-1/MQTT/BIRTH
      ===
      Client: test-client-2 - Delivery completed: $EDC/kapua-sys/test-client-2/MQTT/BIRTH
      ===
      Client: test-client-admin - Delivery completed: $EDC/kapua-sys/test-client-1/MQTT/APPS
      …

      So the broker doesn’t send any message to the clients. May be we missed to configure something needed by 2.21 versions onward?

      *:
      The 2.21 and 2.22-SNAPSHOT default broker.xml configuration file has changed in this way:

      • set the broker name (message-broker)
      • removed double connector bound to 1883 (the broker with the default configuration crashed)
      • allow only MQTT protocol for connector bound to 1883 port
      • removed broadcast connector and configuration
      • added custom wildcard configuration (**)

      **:

       <wildcard-addresses>
          <routing-enabled>true</routing-enabled>
          <delimiter>/</delimiter>
          <any-words>#</any-words>
          <single-word>+</single-word>
      </wildcard-addresses>

      ***:

      /*******************************************************************************
      * Copyright (c) 2021, 2022 Eurotech and/or its affiliates and others
      *
      * This program and the accompanying materials are made
      * available under the terms of the Eclipse Public License 2.0
      * which is available at https://www.eclipse.org/legal/epl-2.0/
      *
      * SPDX-License-Identifier: EPL-2.0
      *
      * Contributors:
      *     Eurotech - initial API and implementation
      *******************************************************************************/
      package org.eclipse.kapua.qa.common;
      
      import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
      import org.eclipse.paho.client.mqttv3.MqttCallback;
      import org.eclipse.paho.client.mqttv3.MqttClient;
      import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
      import org.eclipse.paho.client.mqttv3.MqttException;
      import org.eclipse.paho.client.mqttv3.MqttMessage;
      import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      
      public class TestMqttClient {
      
          protected static Logger logger = LoggerFactory.getLogger(TestMqttClient.class);
      
          private static final String SERVER_URI = "tcp://localhost:1883";
          private static final String CLIENT_ID_ADMIN = "test-client-admin";
          private static final String CLIENT_ID_1 = "test-client-1";
          private static final String CLIENT_ID_2 = "test-client-2";
          private static final String USERNAME = "kapua-broker";
          private static final String PASSWORD = "kapua-password";
          private static final String USERNAME_ADMIN = "kapua-sys";
          private static final String PASSWORD_ADMIN = "kapua-password";
      
          private TestMqttClient() {
          }
      
          public static void main(String argv[]) throws MqttException {
              MqttClient clientAdmin = new MqttClient(SERVER_URI, CLIENT_ID_ADMIN, new MemoryPersistence());
              MqttClient client1 = new MqttClient(SERVER_URI, CLIENT_ID_1, new MemoryPersistence());
              MqttClient client2 = new MqttClient(SERVER_URI, CLIENT_ID_2, new MemoryPersistence());
              clientAdmin.setCallback(new TestMqttClientCallback(CLIENT_ID_ADMIN));
              client1.setCallback(new TestMqttClientCallback(CLIENT_ID_1));
              client2.setCallback(new TestMqttClientCallback(CLIENT_ID_2));
      
              clientAdmin.connect(getMqttConnectOptions(USERNAME_ADMIN, PASSWORD_ADMIN));
              client1.connect(getMqttConnectOptions(USERNAME, PASSWORD));
              client2.connect(getMqttConnectOptions(USERNAME, PASSWORD));
              System.out.println("waiting for messages");
              client1.subscribe("$EDC/kapua-sys/" + CLIENT_ID_1 + "/#");
              client2.subscribe("$EDC/kapua-sys/" + CLIENT_ID_2 + "/#");
              clientAdmin.subscribe("#");
      
              client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/BIRTH", new MqttMessage("test".getBytes()));
              System.out.println("===");
              client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/BIRTH", new MqttMessage("test".getBytes()));
              System.out.println("===");
              clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/APPS", new MqttMessage("test".getBytes()));
              System.out.println("===");
              clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/APPS", new MqttMessage("test".getBytes()));
              System.out.println("===");
      
              client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/DC", new MqttMessage("test".getBytes()));
              System.out.println("===");
              client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/DC", new MqttMessage("test".getBytes()));
              System.out.println("===");
              clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/DC", new MqttMessage("test".getBytes()));
              System.out.println("===");
              clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/DC", new MqttMessage("test".getBytes()));
              System.out.println("===");
      
              clientAdmin.disconnect();
              client1.disconnect();
              client2.disconnect();
          }
      
          private static MqttConnectOptions getMqttConnectOptions(String username, String password) {
              MqttConnectOptions options = new MqttConnectOptions();
              options.setCleanSession(true);
              options.setUserName(username);
              options.setPassword(password.toCharArray());
              return options;
          }
      }
      
      class TestMqttClientCallback implements MqttCallback {
      
          private String clientId;
      
          TestMqttClientCallback(String clientId) {
              this.clientId = clientId;
          }
      
          @Override
          public void messageArrived(String topic, MqttMessage message) throws Exception {
              System.out.println("Client: " + clientId + " - Message arrived on topic: " + topic + " - message: " + new String(message.getPayload()));
          }
      
          @Override
          public void deliveryComplete(IMqttDeliveryToken token) {
              System.out.println("Client: " + clientId + " - Delivery completed: " + token.getTopics()[0]);
          }
      
          @Override
          public void connectionLost(Throwable cause) {
              System.out.println("Client: " + clientId + " - Connection lost: " + cause.getMessage());
              cause.printStackTrace();
          }
      }

      Attachments

        Issue Links

          Activity

            People

              jbertram Justin Bertram
              jbertram Justin Bertram
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 10m
                  10m