Details
Type: Bug
Status: Closed
Priority: Major
Resolution: Fixed
5.8.0
None
None
Description
The system throws at least three different types of exception when a subscriber that reconnects without a clean session receives the first pending message. The test case is: the subscriber receives several messages from a publisher, closes its connection, then reconnects with setCleanSession(false) and attempts to read the messages published while it was disconnected.
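The behaviour the test expects from a persistent (non-clean) session can be sketched with a toy in-memory model. This is only an illustration of the expected semantics, not ActiveMQ or fusesource code: the class SessionModel and its methods are hypothetical, every connected client is assumed to be subscribed to the single topic, and delivery to online clients is left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

/** Toy model of MQTT session persistence (hypothetical, not broker code). */
class SessionModel {
    // Messages queued per client id while that client is offline.
    private final Map<String, Queue<String>> sessions = new HashMap<>();
    // Client ids that currently hold an open connection.
    private final Set<String> online = new HashSet<>();
    // Client ids whose current connection asked for a clean session.
    private final Set<String> cleanClients = new HashSet<>();

    /** Connects a client and returns the messages queued while it was offline. */
    List<String> connect(String clientId, boolean cleanSession) {
        if (cleanSession) {
            sessions.remove(clientId);      // discard any stored state
            cleanClients.add(clientId);
        } else {
            cleanClients.remove(clientId);
        }
        Queue<String> pending = sessions.computeIfAbsent(clientId, k -> new ArrayDeque<>());
        List<String> delivered = new ArrayList<>(pending);
        pending.clear();
        online.add(clientId);
        return delivered;
    }

    /** Disconnects a client; a clean session is forgotten at this point. */
    void disconnect(String clientId) {
        online.remove(clientId);
        if (cleanClients.contains(clientId)) {
            sessions.remove(clientId);
        }
    }

    /** Queues a payload for every known session whose client is offline. */
    void publish(String payload) {
        for (Map.Entry<String, Queue<String>> e : sessions.entrySet()) {
            if (!online.contains(e.getKey())) {
                e.getValue().add(payload);
            }
        }
    }

    public static void main(String[] args) {
        SessionModel broker = new SessionModel();
        broker.connect("sub", false);
        broker.disconnect("sub");
        broker.publish("TestMessage: 0");
        broker.publish("TestMessage: 1");
        // The reconnect with cleanSession=false should hand over the backlog.
        System.out.println(broker.connect("sub", false));
    }
}
```

In this model, reconnecting with cleanSession=false returns the messages published during the disconnect, which is the step at which the exceptions below are thrown in the real test.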
The exceptions thrown:
java.net.ProtocolException: Command from server contained an invalid message id: 1
    at org.fusesource.mqtt.client.CallbackConnection.completeRequest(CallbackConnection.java:723)
    at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:762)
    at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
    at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
    at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
    at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
    at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
    at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
    at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
    at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
java.lang.ArrayIndexOutOfBoundsException: 0
    at org.fusesource.mqtt.codec.MessageSupport$AckBase.decode(MessageSupport.java:81)
    at org.fusesource.mqtt.codec.PUBREC.decode(PUBREC.java:40)
    at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:749)
    at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
    at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
    at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
    at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
    at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
    at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
    at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
    at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
java.net.ProtocolException: Unexpected MQTT command type: 0
    at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:775)
    at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
    at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
    at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
    at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
    at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
    at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
    at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
    at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
No message is shown on the server. The problem does not always occur, but it does occur most of the time on the first reconnection attempt. With setCleanSession(true) the system works fine.
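For reading the traces above, it may help to recall how an MQTT 3.1 fixed header is laid out: the message type sits in the upper four bits of the first byte (0 and 15 are reserved), and acknowledgements such as PUBREC carry a two-byte message id. The following is a minimal sketch of that decoding; the class FixedHeader and its methods are hypothetical helpers written for this illustration and are not part of the fusesource client. It is not a diagnosis of the bug, only a way to see what "command type: 0" and an index-0 failure while decoding a PUBREC would correspond to on the wire.

```java
/** Decodes parts of an MQTT 3.1 frame header (illustrative helper only). */
class FixedHeader {
    // Message type names indexed by the 4-bit type value; 0 and 15 are reserved.
    private static final String[] NAMES = {
        "RESERVED", "CONNECT", "CONNACK", "PUBLISH", "PUBACK", "PUBREC",
        "PUBREL", "PUBCOMP", "SUBSCRIBE", "SUBACK", "UNSUBSCRIBE",
        "UNSUBACK", "PINGREQ", "PINGRESP", "DISCONNECT", "RESERVED"
    };

    /** Upper four bits of the first header byte. */
    static int messageType(int firstByte) {
        return (firstByte >> 4) & 0x0F;
    }

    /** QoS level stored in bits 2..1 of the first header byte. */
    static int qos(int firstByte) {
        return (firstByte >> 1) & 0x03;
    }

    static String typeName(int firstByte) {
        return NAMES[messageType(firstByte)];
    }

    /** Reads the big-endian 2-byte message id of an acknowledgement body. */
    static int messageId(byte[] body) {
        if (body.length < 2) {
            throw new IllegalArgumentException("acknowledgement body has no message id");
        }
        return ((body[0] & 0xFF) << 8) | (body[1] & 0xFF);
    }

    public static void main(String[] args) {
        System.out.println(typeName(0x50));                  // a PUBREC header byte
        System.out.println(typeName(0x00));                  // a reserved type
        System.out.println(messageId(new byte[] {0, 1}));    // message id 1
    }
}
```

Under this layout, a first byte whose upper nibble is 0 is not a valid command, and a PUBREC whose body is empty has no message id to read.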
Code sample (publisher, permanently running):
MQTT mqtt = new MQTT();
mqtt.setHost(url);
mqtt.setUserName(user);
mqtt.setPassword(password);
mqtt.setClientId("test_id");
int i = 0;
while (true) {
    BlockingConnection connection = mqtt.blockingConnection();
    connection.connect();
    String message = "TestMessage: " + i;
    connection.publish("TopicA", message.getBytes(), QoS.AT_LEAST_ONCE, false);
    System.out.println("Vendor: Sent message.");
    Thread.sleep(2500);
    connection.disconnect();
    Thread.sleep(2500);
    i++;
}
Code sample (subscriber, fails multiple times when restarting after the connection is closed):
BlockingConnection connection = null;
try {
    MQTT mqtt = new MQTT();
    mqtt.setHost(url);
    mqtt.setClientId(clientId);
    mqtt.setUserName(user);
    mqtt.setPassword(password);
    // Keep the session so that messages published while offline are delivered on reconnect.
    mqtt.setCleanSession(false);
    connection = mqtt.blockingConnection();
    connection.connect();
    Topic[] topics = {new Topic("TopicA", QoS.EXACTLY_ONCE)};
    byte[] qoses = connection.subscribe(topics);
    int numMessages = 1;
    // Receive nine messages, then fall through to disconnect.
    while (numMessages % 10 != 0) {
        Message message = connection.receive();
        byte[] payload = message.getPayload();
        String messageContent = new String(payload);
        System.out.println("Received message from topic: " + message.getTopic() + " Message content: " + messageContent);
        message.ack();
        numMessages++;
    }
} finally {
    if (connection != null) {
        try {
            connection.disconnect();
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}
The test failed when using the current fusesource client (1.5) against ActiveMQ 5.9. Against Mosquitto, the same code works correctly.