Uploaded image for project: 'ActiveMQ Classic'
  1. ActiveMQ Classic
  2. AMQ-4392

MQTT BlockingConnection disconnect doesn't disconnects the client connection

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 5.8.0
    • 5.9.0
    • None
    • None

    Description

      The disconnect method doesn't work (especially when a client id is supplied).

      If the connection object is reused, the client throws the following exception:

      java.lang.IllegalStateException: Already connected
      	at org.fusesource.mqtt.client.CallbackConnection.connect(CallbackConnection.java:109)
      	at org.fusesource.mqtt.client.FutureConnection$2.run(FutureConnection.java:94)
      	at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:96)
      	at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
      

      If the connection object is not reused and the client id is, the client throws the following exception:

      java.io.IOException: Could not connect: CONNECTION_REFUSED_SERVER_UNAVAILABLE
      	at org.fusesource.mqtt.client.CallbackConnection$LoginHandler$1.onTransportCommand(CallbackConnection.java:313)
      	at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:660)
      	at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
      	at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:226)
      	at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:96)
      	at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
      

      and, the server logs the following messages:

      2013-03-20 11:36:04,893 WARN  [org.apache.activemq.broker.TransportConnection] (ActiveMQ Transport: tcp:///192.168.205.2:59401@1883) Failed to add Connection ID:TestServer-58505-1363685123521-2:34, reason: javax.jms.InvalidClientIDException: Broker: localhost - Client: test_id already connected from tcp://192.168.205.2:59398
      2013-03-20 11:36:04,893 WARN  [org.apache.activemq.broker.TransportConnection.Transport] (ActiveMQ Transport: tcp:///192.168.205.2:59401@1883) Transport Connection to: tcp://192.168.205.2:59401 failed: java.io.IOException: Broker: localhost - Client: test_id already connected from tcp://192.168.205.2:59398
      2013-03-20 11:37:59,867 WARN  [org.apache.activemq.broker.TransportConnection.Transport] (MQTTInactivityMonitor Async Task: java.util.concurrent.ThreadPoolExecutor$Worker@c7892e[State = 0, empty queue]) Transport Connection to: tcp://192.168.205.2:59398 failed: org.apache.activemq.transport.InactivityIOException: Channel was inactive for too (>45000) long: tcp://192.168.205.2:59398
      

      Code example (reusing connection):

      MQTT mqtt = new MQTT();
      mqtt.setHost(url);
      mqtt.setUserName(user);
      mqtt.setPassword(password);
      BlockingConnection connection = mqtt.blockingConnection();
      
      int i = 0;
      while (true) {
      	connection.connect();
      	String message = "TestMessage: " + i;
      	connection.publish("VendorOrderTopic", message.getBytes(), QoS.AT_LEAST_ONCE, false);
      	System.out.println("Vendor: Sent message.");
      
      	Thread.sleep(2500);
      	connection.disconnect();
      	Thread.sleep(2500);
      	i++;
      }
      

      Code example (multiple connections):

      MQTT mqtt = new MQTT();
      mqtt.setHost(url);
      mqtt.setUserName(user);
      mqtt.setPassword(password);
      mqtt.setClientId("test_id");
      
      int i = 0;
      while (true) {
      	BlockingConnection connection = mqtt.blockingConnection();
      	connection.connect();
      	String message = "TestMessage: " + i;
      	connection.publish("VendorOrderTopic", message.getBytes(), QoS.AT_LEAST_ONCE, false);
      	System.out.println("Vendor: Sent message.");
      
      	Thread.sleep(2500);
      	connection.disconnect();
      	Thread.sleep(2500);
      	i++;
      }
      

      This problem also occurs when using the eclipse paho client API.

      Attachments

        Activity

          People

            tabish Timothy A. Bish
            pmarques Pedro Marques
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: