Details
-
Bug
-
Status: Closed
-
Major
-
Resolution: Fixed
-
None
-
None
-
None
Description
Switching from Artemis broker 2.20 to 2.21 we experienced an issue about message delivering.
It looks like MQTT devices don’t receive the messages matching their subscriptions.
The test code (***) run with an Artemis broker 2.19 or 2.20 as target (wildcard addresses modified as (**)) produces the correct output:
waiting for messages Client: test-client-1 - Delivery completed: $EDC/kapua-sys/test-client-1/MQTT/BIRTH Client: test-client-admin - Message arrived on topic: $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test Client: test-client-1 - Message arrived on topic: $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test === Client: test-client-2 - Delivery completed: $EDC/kapua-sys/test-client-2/MQTT/BIRTH Client: test-client-2 - Message arrived on topic: $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test Client: test-client-admin - Message arrived on topic: $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test === Client: test-client-admin - Delivery completed: $EDC/kapua-sys/test-client-1/MQTT/APPS Client: test-client-admin - Message arrived on topic: $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test Client: test-client-1 - Message arrived on topic: $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test …
With broker 2.21 or 2.22 (configuration changes described in (*) ) as target the output is:
waiting for messages === Client: test-client-1 - Delivery completed: $EDC/kapua-sys/test-client-1/MQTT/BIRTH === Client: test-client-2 - Delivery completed: $EDC/kapua-sys/test-client-2/MQTT/BIRTH === Client: test-client-admin - Delivery completed: $EDC/kapua-sys/test-client-1/MQTT/APPS …
So the broker doesn’t send any message to the clients. May be we missed to configure something needed by 2.21 versions onward?
*:
The 2.21 and 2.22-SNAPSHOT default broker.xml configuration file has changed in this way:
- set the broker name (message-broker)
- removed double connector bound to 1883 (the broker with the default configuration crashed)
- allow only MQTT protocol for connector bound to 1883 port
- removed broadcast connector and configuration
- added custom wildcard configuration (**)
**:
<wildcard-addresses> <routing-enabled>true</routing-enabled> <delimiter>/</delimiter> <any-words>#</any-words> <single-word>+</single-word> </wildcard-addresses>
***:
/******************************************************************************* * Copyright (c) 2021, 2022 Eurotech and/or its affiliates and others * * This program and the accompanying materials are made * available under the terms of the Eclipse Public License 2.0 * which is available at https://www.eclipse.org/legal/epl-2.0/ * * SPDX-License-Identifier: EPL-2.0 * * Contributors: * Eurotech - initial API and implementation *******************************************************************************/ package org.eclipse.kapua.qa.common; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class TestMqttClient { protected static Logger logger = LoggerFactory.getLogger(TestMqttClient.class); private static final String SERVER_URI = "tcp://localhost:1883"; private static final String CLIENT_ID_ADMIN = "test-client-admin"; private static final String CLIENT_ID_1 = "test-client-1"; private static final String CLIENT_ID_2 = "test-client-2"; private static final String USERNAME = "kapua-broker"; private static final String PASSWORD = "kapua-password"; private static final String USERNAME_ADMIN = "kapua-sys"; private static final String PASSWORD_ADMIN = "kapua-password"; private TestMqttClient() { } public static void main(String argv[]) throws MqttException { MqttClient clientAdmin = new MqttClient(SERVER_URI, CLIENT_ID_ADMIN, new MemoryPersistence()); MqttClient client1 = new MqttClient(SERVER_URI, CLIENT_ID_1, new MemoryPersistence()); MqttClient client2 = new MqttClient(SERVER_URI, CLIENT_ID_2, new MemoryPersistence()); clientAdmin.setCallback(new TestMqttClientCallback(CLIENT_ID_ADMIN)); client1.setCallback(new TestMqttClientCallback(CLIENT_ID_1)); client2.setCallback(new TestMqttClientCallback(CLIENT_ID_2)); clientAdmin.connect(getMqttConnectOptions(USERNAME_ADMIN, PASSWORD_ADMIN)); client1.connect(getMqttConnectOptions(USERNAME, PASSWORD)); client2.connect(getMqttConnectOptions(USERNAME, PASSWORD)); System.out.println("waiting for messages"); client1.subscribe("$EDC/kapua-sys/" + CLIENT_ID_1 + "/#"); client2.subscribe("$EDC/kapua-sys/" + CLIENT_ID_2 + "/#"); clientAdmin.subscribe("#"); client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/BIRTH", new MqttMessage("test".getBytes())); System.out.println("==="); client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/BIRTH", new MqttMessage("test".getBytes())); System.out.println("==="); clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/APPS", new MqttMessage("test".getBytes())); System.out.println("==="); clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/APPS", new MqttMessage("test".getBytes())); System.out.println("==="); client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/DC", new MqttMessage("test".getBytes())); System.out.println("==="); client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/DC", new MqttMessage("test".getBytes())); System.out.println("==="); clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/DC", new MqttMessage("test".getBytes())); System.out.println("==="); clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/DC", new MqttMessage("test".getBytes())); System.out.println("==="); clientAdmin.disconnect(); client1.disconnect(); client2.disconnect(); } private static MqttConnectOptions getMqttConnectOptions(String username, String password) { MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(true); options.setUserName(username); options.setPassword(password.toCharArray()); return options; } } class TestMqttClientCallback implements MqttCallback { private String clientId; TestMqttClientCallback(String clientId) { this.clientId = clientId; } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println("Client: " + clientId + " - Message arrived on topic: " + topic + " - message: " + new String(message.getPayload())); } @Override public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("Client: " + clientId + " - Delivery completed: " + token.getTopics()[0]); } @Override public void connectionLost(Throwable cause) { System.out.println("Client: " + clientId + " - Connection lost: " + cause.getMessage()); cause.printStackTrace(); } }
Attachments
Issue Links
- links to