Uploaded image for project: 'ActiveMQ Classic'
  1. ActiveMQ Classic
  2. AMQ-9592

Incorrect MQTTv3.1.1 behaviour: QoS2 messages were delivered to unintended consumers

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Major
    • Resolution: Unresolved
    • 5.18.5, 6.1.3
    • 6.2.0, 5.18.7, 6.1.5
    • MQTT
    • None

    Description

      To replicate:

      • Run a subscriber with QOS2 to a topic with client ID say "1"and stop it
      • Publish 5 messages to the topic, do not consume these yet.
      • Run another subscriber with QOS2 to the topic with a different client ID say "2'.
      • This subscriber with different cliend ID receives 5 messages that were intended for the clientID "1". However, this does not mean that client ID "1" will not receive the 5 messages when it comes back online. I noticed that client ID "1" also received the 5 message.

      Now that both clientID 1 and 2 received messages, subscribing a new clientID doesn't receive these message. However, if we send 5 message again to the topic and do not consume it for all clientIDs, it would delivered to any new subscription.

      For new client Id broker must create new session, so messages sent after client 1 disconnected should route to client 1 only since client 2 because it was connected after those 5 messages were sent (hence the new session of client 2 shouldn't contain old messages).

      Here are sample codes to reproduce:

      The producer Python script

      import paho.mqtt.client as mqttClient
      import time
      import ssl
      
      # context = ssl.create_default_context()
      
      Connected = False 
      broker_address= "localhost" 
      port = 1883
      user = "admin"
      password = "admin"
      
      def on_connect(client, userdata, flags, rc):
      
          if rc == 0:
              print("Connected to broker")
              global Connected                #Use global variable
              Connected = True                #Signal connection  
          else:
              print("Connection failed")
      
      client = mqttClient.Client(client_id="producer",transport="tcp") #
      client.username_pw_set(user, password=password) #set username and password
      client.on_connect=on_connect
      # client.tls_set_context(context=context)
      client.connect(broker_address, port=port)
      client.loop_start()
      i=0
      while True:
          time.sleep(1)
          i=i+1
          client.publish("test/data", "This is my test msg 123.",2,False)
          print("Sent"+str(i))
      

      The consumer Python script

      import paho.mqtt.client as mqttClient
      import time
      import ssl
      
      Connected = False 
      broker_address= "localhost" 
      port = 1883
      user = "admin"
      password = "admin"
      
      def on_connect(client, userdata, flags, rc):
      
          if rc == 0:
              print("Connected to broker")
              global Connected                #Use global variable
              Connected = True                #Signal connection  
              print(flags)
              client.subscribe("test/data",2)
          else:
              print("Connection failed")
      
      def on_message(client, userdata, msg):
          print(msg.topic+" "+str(msg.payload))
      
      client = mqttClient.Client(client_id="id2",transport="tcp",clean_session=False) #
      client.username_pw_set(user, password=password) #set username and password
      client.on_connect=on_connect
      client.on_message=on_message
      # client.tls_set_context(context=context)
      client.connect(broker_address, port=port)
      client.loop_start()
      
      while Connected != True:
          time.sleep(0.1)
      
      time.sleep(1000)
      

      Attachments

        Activity

          People

            jbonofre Jean-Baptiste Onofré
            kenliao Ken Liao
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated: