Uploaded image for project: 'ActiveMQ Classic'
  1. ActiveMQ Classic
  2. AMQ-4576

MQTT BlockingConnection.receive fails when subscribing multiple topics

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 5.8.0
    • 5.9.0
    • None
    • None

    Description

      When more than one topic is supplied to BlockingConnection.subscribe the BlockingConnection.receive fails and the following exception is thrown:

      java.io.IOException: Could not connect: CONNECTION_REFUSED_SERVER_UNAVAILABLE
      	at org.fusesource.mqtt.client.CallbackConnection$LoginHandler$1.onTransportCommand(CallbackConnection.java:331)
      	at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
      	at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
      	at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
      	at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
      	at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
      	at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
      

      On the server shows the following messages:

      2013-06-06 15:06:00,125 WARN  [org.apache.activemq.transport.mqtt.MQTTProtocolConverter] (ActiveMQ BrokerService[localhost] Task-1) Exception occurred processing: 
      null: javax.jms.JMSException: Durable consumer is in use for client: 6056@3232261834SOC and subscriptionName: 6056@3232261834SOC
      2013-06-06 15:06:00,130 WARN  [org.apache.activemq.broker.TransportConnection] (ActiveMQ Transport: tcp:///127.0.0.1:53389@1883) Failed to add Connection ID:LTD-SFW004-53303-1370527418664-2:14, reason: javax.jms.InvalidClientIDException: Broker: localhost - Client: 6056@3232261834SOC already connected from tcp://127.0.0.1:53388
      2013-06-06 15:06:00,130 WARN  [org.apache.activemq.broker.TransportConnection.Transport] (ActiveMQ Transport: tcp:///127.0.0.1:53389@1883) Transport Connection to: tcp://127.0.0.1:53389 failed: java.io.IOException: Broker: localhost - Client: 6056@3232261834SOC already connected from tcp://127.0.0.1:53388
      2013-06-06 15:06:00,130 ERROR [pt.intellicare.onecare.mqtt.OneCareFuseMqttClient] (DefaultQuartzScheduler_Worker-8) Problem receiving mqtt messages: java.io.IOException: Could not connect: CONNECTION_REFUSED_SERVER_UNAVAILABLE
      	at org.fusesource.mqtt.client.CallbackConnection$LoginHandler$1.onTransportCommand(CallbackConnection.java:331) [:1.5-SNAPSHOT]
      	at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659) [:1.17]
      	at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538) [:1.17]
      	at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209) [:1.17]
      	at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100) [:1.17]
      	at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77) [:1.17]
      

      Code example:

      MQTT = new MQTT();
      mqtt.setHost(url);
      mqtt.setClientId(clientId);
      mqtt.setUserName(user);
      mqtt.setPassword(password);
      mqtt.setCleanSession(false);
      
      BlockingConnection connection = mqtt.blockingConnection();
      connection.connect();
      Topic[] topics = {new Topic("TopicA", QoS.EXACTLY_ONCE), new Topic("TopicB", QoS.EXACTLY_ONCE)};
      byte[] qoses = connection.subscribe(topics);
      while (true) {
          Message message = connection.receive();
          byte[] payload = message.getPayload();
          String messageContent = new String(payload);
          System.out.println("Received message from topic: " + message.getTopic() + " Message content: " + messageContent);
          message.ack();
      }
      

      The test failed when using the current fusesource client (1.5) on ActiveMQ 5.9, on Mosquitto mqtt the code works correctly.

      Attachments

        Activity

          People

            tabish Timothy A. Bish
            pmarques Pedro Marques
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: