  1. ActiveMQ Classic
  2. AMQ-4585

MQTT BlockingConnection.receive fails when receiving pending messages after reconnect without cleaning session

Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 5.8.0
    • Fix Version/s: 5.10.0
    • Component/s: None
    • Labels: None

    Description

      At least three different types of exceptions are thrown when a subscriber that reconnects without cleaning its session receives its first pending message. The test case is: the subscriber receives several messages from a publisher, closes its connection, then reconnects with setCleanSession(false) and attempts to read the messages published while it was disconnected.
      The exceptions thrown:

      java.net.ProtocolException: Command from server contained an invalid message id: 1
      	at org.fusesource.mqtt.client.CallbackConnection.completeRequest(CallbackConnection.java:723)
      	at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:762)
      	at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
      	at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
      	at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
      	at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
      	at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
      	at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
      	at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
      	at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
      
      java.lang.ArrayIndexOutOfBoundsException: 0
      	at org.fusesource.mqtt.codec.MessageSupport$AckBase.decode(MessageSupport.java:81)
      	at org.fusesource.mqtt.codec.PUBREC.decode(PUBREC.java:40)
      	at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:749)
      	at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
      	at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
      	at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
      	at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
      	at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
      	at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
      	at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
      	at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
      
      java.net.ProtocolException: Unexpected MQTT command type: 0
      	at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:775)
      	at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
      	at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
      	at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
      	at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
      	at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
      	at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
      	at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
      	at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
      

      Nothing is logged on the server. The problem does not occur every time, but it does occur on most first reconnection attempts. With setCleanSession(true) everything works fine.
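
      As an aid to reading the second and third stack traces, here is a minimal sketch of how an MQTT 3.1 fixed header and acknowledgement body are laid out. The class and method names (FixedHeaderSketch, messageType, messageId) are hypothetical and are not the fusesource client's code. The sketch only shows that a first byte whose upper four bits are 0 denotes a reserved message type, and that an acknowledgement such as PUBREC needs a two-byte body to carry a message id. It does not establish why such frames were seen in this report.

```java
/** Illustrative decoder for MQTT 3.1 fixed headers; not the fusesource client's code. */
final class FixedHeaderSketch {
    // Message type names indexed by the 4-bit type value defined in MQTT 3.1.
    private static final String[] TYPE_NAMES = {
        "RESERVED", "CONNECT", "CONNACK", "PUBLISH", "PUBACK", "PUBREC",
        "PUBREL", "PUBCOMP", "SUBSCRIBE", "SUBACK", "UNSUBSCRIBE",
        "UNSUBACK", "PINGREQ", "PINGRESP", "DISCONNECT", "RESERVED"
    };

    /** The message type is the upper four bits of the first header byte. */
    static int messageType(byte firstHeaderByte) {
        return (firstHeaderByte & 0xF0) >> 4;
    }

    /** Types 0 and 15 are reserved and must not appear on the wire. */
    static boolean isReserved(int type) {
        return type == 0 || type == 15;
    }

    static String typeName(int type) {
        return TYPE_NAMES[type & 0x0F];
    }

    /** PUBACK, PUBREC, PUBREL and PUBCOMP carry a 2-byte message id, most significant byte first. */
    static int messageId(byte[] ackBody) {
        if (ackBody.length < 2) {
            // An unchecked read of ackBody[0] on an empty body would fail with an index error instead.
            throw new IllegalArgumentException("ack body too short: " + ackBody.length + " bytes");
        }
        return ((ackBody[0] & 0xFF) << 8) | (ackBody[1] & 0xFF);
    }
}
```

      For example, a first byte of 0x50 decodes to type 5 (PUBREC), while any first byte below 0x10 decodes to the reserved type 0.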
      Code sample (publisher, permanently running):

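      import org.fusesource.mqtt.client.BlockingConnection;
      import org.fusesource.mqtt.client.MQTT;
      import org.fusesource.mqtt.client.QoS;
      
      // url, user and password are defined elsewhere in the test.
      MQTT mqtt = new MQTT();
      mqtt.setHost(url);
      mqtt.setUserName(user);
      mqtt.setPassword(password);
      mqtt.setClientId("test_id");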
      
      int i = 0;
      while (true) {
      	BlockingConnection connection = mqtt.blockingConnection();
      	connection.connect();
      	String message = "TestMessage: " + i;
      	connection.publish("TopicA", message.getBytes(), QoS.AT_LEAST_ONCE, false);
      	System.out.println("Vendor: Sent message.");
      
      	Thread.sleep(2500);
      	connection.disconnect();
      	Thread.sleep(2500);
      	i++;
      }
      

      Code sample (subscriber, fails multiple times when restarting after the connection is closed):

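      import org.fusesource.mqtt.client.BlockingConnection;
      import org.fusesource.mqtt.client.MQTT;
      import org.fusesource.mqtt.client.Message;
      import org.fusesource.mqtt.client.QoS;
      import org.fusesource.mqtt.client.Topic;
      
      // url, clientId, user and password are defined elsewhere in the test.
      BlockingConnection connection = null;
      try {
          MQTT mqtt = new MQTT();
          mqtt.setHost(url);
          mqtt.setClientId(clientId);
          mqtt.setUserName(user);
          mqtt.setPassword(password);
          // Keep the session so messages published while disconnected are delivered on reconnect.
          mqtt.setCleanSession(false);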
      
          connection = mqtt.blockingConnection();
          connection.connect();
          Topic[] topics = {new Topic("TopicA", QoS.EXACTLY_ONCE)};
          byte[] qoses = connection.subscribe(topics);
          int numMessages = 1;
          while (numMessages % 10 != 0) {
              Message message = connection.receive();
              byte[] payload = message.getPayload();
              String messageContent = new String(payload);
              System.out.println("Received message from topic: " + message.getTopic() + " Message content: " + messageContent);
              message.ack();
              numMessages++;
          }
      } finally {
          if(connection != null) {
              try {
                  connection.disconnect();
              } catch (Exception e) {
                  // Best-effort cleanup: report the failure and continue.
                  e.printStackTrace();
              }
          }
      }
      

      The test failed with the current fusesource client (1.5) against ActiveMQ 5.9; against a Mosquitto MQTT broker the same code works correctly.
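
      For reference, the behaviour the subscriber test expects from setCleanSession(false) can be modelled with a toy in-memory broker. This is only a sketch under stated assumptions: SessionModel and its methods are hypothetical names, there is a single topic, every subscription is assumed to be QoS 1 or higher, and delivery to connected clients is not modelled. It is not ActiveMQ code and says nothing about where the actual defect lies.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of MQTT session persistence for a single topic; hypothetical, not ActiveMQ code. */
final class SessionModel {
    private static final class Session {
        boolean clean;       // value of the clean-session flag at connect time
        boolean connected;
        boolean subscribed;
        final Deque<String> pending = new ArrayDeque<>(); // payloads queued while offline
    }

    private final Map<String, Session> sessions = new HashMap<>();

    /** Connects a client and returns the payloads queued while it was offline. */
    List<String> connect(String clientId, boolean cleanSession) {
        Session s = sessions.get(clientId);
        if (s == null || cleanSession) {
            // A clean session discards any earlier subscription and backlog.
            s = new Session();
            sessions.put(clientId, s);
        }
        s.clean = cleanSession;
        s.connected = true;
        List<String> backlog = new ArrayList<>(s.pending);
        s.pending.clear();
        return backlog;
    }

    void subscribe(String clientId) {
        Session s = sessions.get(clientId);
        if (s != null) {
            s.subscribed = true;
        }
    }

    void disconnect(String clientId) {
        Session s = sessions.get(clientId);
        if (s == null) {
            return;
        }
        s.connected = false;
        if (s.clean) {
            sessions.remove(clientId); // nothing survives a clean session
        }
    }

    /** Queues the payload for every subscribed client that is currently offline. */
    void publish(String payload) {
        for (Session s : sessions.values()) {
            if (s.subscribed && !s.connected) {
                s.pending.add(payload);
            }
        }
    }
}
```

      In this model, a client that connects with cleanSession false, subscribes, disconnects, and reconnects after two messages were published gets both payloads back from connect, in publication order. The same sequence with cleanSession true returns an empty backlog.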

      Attachments

        1. MQTTTest.java
          9 kB
          Pedro Marques

          People

            Assignee: Unassigned
            Reporter: Pedro Marques (pmarques)
            Votes: 0
            Watchers: 3
