Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
Version: 5.8.0
Description
When more than one topic is passed to BlockingConnection.subscribe, the subsequent call to BlockingConnection.receive fails with the following exception:
java.io.IOException: Could not connect: CONNECTION_REFUSED_SERVER_UNAVAILABLE
    at org.fusesource.mqtt.client.CallbackConnection$LoginHandler$1.onTransportCommand(CallbackConnection.java:331)
    at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
    at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
    at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
    at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
    at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
    at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
The server logs the following messages:
2013-06-06 15:06:00,125 WARN [org.apache.activemq.transport.mqtt.MQTTProtocolConverter] (ActiveMQ BrokerService[localhost] Task-1) Exception occurred processing: null: javax.jms.JMSException: Durable consumer is in use for client: 6056@3232261834SOC and subscriptionName: 6056@3232261834SOC
2013-06-06 15:06:00,130 WARN [org.apache.activemq.broker.TransportConnection] (ActiveMQ Transport: tcp:///127.0.0.1:53389@1883) Failed to add Connection ID:LTD-SFW004-53303-1370527418664-2:14, reason: javax.jms.InvalidClientIDException: Broker: localhost - Client: 6056@3232261834SOC already connected from tcp://127.0.0.1:53388
2013-06-06 15:06:00,130 WARN [org.apache.activemq.broker.TransportConnection.Transport] (ActiveMQ Transport: tcp:///127.0.0.1:53389@1883) Transport Connection to: tcp://127.0.0.1:53389 failed: java.io.IOException: Broker: localhost - Client: 6056@3232261834SOC already connected from tcp://127.0.0.1:53388
2013-06-06 15:06:00,130 ERROR [pt.intellicare.onecare.mqtt.OneCareFuseMqttClient] (DefaultQuartzScheduler_Worker-8) Problem receiving mqtt messages: java.io.IOException: Could not connect: CONNECTION_REFUSED_SERVER_UNAVAILABLE
    at org.fusesource.mqtt.client.CallbackConnection$LoginHandler$1.onTransportCommand(CallbackConnection.java:331) [:1.5-SNAPSHOT]
    at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659) [:1.17]
    at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538) [:1.17]
    at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209) [:1.17]
    at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100) [:1.17]
    at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77) [:1.17]
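In the first broker warning above, the subscriptionName is identical to the client ID. One possible reading, offered here only as an assumption, is that both topics end up registered under the same durable subscription name, so the second registration collides with the first. The sketch below is a toy model of that collision and is not ActiveMQ code: the class DurableSubscriptionSketch, its register method, and the per-topic naming scheme (client ID plus topic) are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: NOT ActiveMQ code. All names here are hypothetical.
public class DurableSubscriptionSketch {

    // Maps "clientId:subscriptionName" to the topic that holds the subscription.
    private final Map<String, String> active = new HashMap<>();

    // Returns true if the subscription was registered,
    // false if the (clientId, subscriptionName) pair is already in use.
    public boolean register(String clientId, String subscriptionName, String topic) {
        String key = clientId + ":" + subscriptionName;
        return active.putIfAbsent(key, topic) == null;
    }

    public static void main(String[] args) {
        String clientId = "6056@3232261834SOC"; // client ID taken from the log

        // Assumed failing case: every topic reuses the client ID as its name.
        DurableSubscriptionSketch sameName = new DurableSubscriptionSketch();
        System.out.println(sameName.register(clientId, clientId, "TopicA")); // true
        System.out.println(sameName.register(clientId, clientId, "TopicB")); // false

        // Hypothetical alternative: derive a distinct name per topic.
        DurableSubscriptionSketch perTopic = new DurableSubscriptionSketch();
        System.out.println(perTopic.register(clientId, clientId + ":TopicA", "TopicA")); // true
        System.out.println(perTopic.register(clientId, clientId + ":TopicB", "TopicB")); // true
    }
}
```

The model only shows why a name shared across topics would produce a "Durable consumer is in use" style rejection; whether the broker actually names subscriptions this way is not confirmed by the report.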
Code example:
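import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;

// url, clientId, user and password are supplied by the caller.
MQTT mqtt = new MQTT();
mqtt.setHost(url);
mqtt.setClientId(clientId);
mqtt.setUserName(user);
mqtt.setPassword(password);
mqtt.setCleanSession(false); // keep the session, so subscriptions are durable

BlockingConnection connection = mqtt.blockingConnection();
connection.connect();

// Subscribing to two topics in a single call triggers the failure.
Topic[] topics = {
    new Topic("TopicA", QoS.EXACTLY_ONCE),
    new Topic("TopicB", QoS.EXACTLY_ONCE)
};
byte[] qoses = connection.subscribe(topics);

while (true) {
    Message message = connection.receive(); // throws the IOException shown above
    byte[] payload = message.getPayload();
    String messageContent = new String(payload);
    System.out.println("Received message from topic: " + message.getTopic()
            + " Message content: " + messageContent);
    message.ack();
}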
The test fails with the current FuseSource MQTT client (1.5) against ActiveMQ 5.9. Against a Mosquitto MQTT broker, the same code works correctly.