  1. ActiveMQ Classic
  2. AMQ-5390

MQTT pending durable subscriber messages are not delivered after broker restart

Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: 5.11.0
    • Fix Version/s: None
    • Component/s: MQTT
    • Labels: None

    Description

      If messages are pending delivery to a durable subscriber and the broker is restarted at that point, the pending messages are not delivered to the subscriber when it reconnects after the restart.
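
      The expected behavior can be illustrated with a toy in-memory model. This is a sketch only and not ActiveMQ code: ToyBroker and its methods are hypothetical names, and the boolean flag merely stands in for whether the subscription and its pending messages are written to a persistent store.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a broker that queues messages for offline durable subscribers. */
final class ToyBroker {
    // Backlog per client id; survives restart() only when 'persistent' is true.
    private final Map<String, Deque<String>> pending = new HashMap<>();
    private final boolean persistent;

    ToyBroker(boolean persistent) {
        this.persistent = persistent;
    }

    /** Registers a durable subscription for the given client id. */
    void subscribe(String clientId) {
        pending.computeIfAbsent(clientId, id -> new ArrayDeque<>());
    }

    /** Queues the message for every registered subscriber (all treated as offline here). */
    void publish(String message) {
        for (Deque<String> backlog : pending.values()) {
            backlog.add(message);
        }
    }

    /** Simulates a restart: subscriptions and backlogs are lost unless they were persisted. */
    void restart() {
        if (!persistent) {
            pending.clear();
        }
    }

    /** Simulates the subscriber reconnecting and draining whatever backlog is left. */
    List<String> reconnectAndDrain(String clientId) {
        List<String> delivered = new ArrayList<>();
        Deque<String> backlog = pending.get(clientId);
        if (backlog != null) {
            delivered.addAll(backlog);
            backlog.clear();
        }
        return delivered;
    }

    public static void main(String[] args) {
        for (boolean persistent : new boolean[] { true, false }) {
            ToyBroker broker = new ToyBroker(persistent);
            broker.subscribe("MQTT-Sub-Client");
            broker.publish("m1");
            broker.publish("m2");
            broker.restart();
            System.out.println(persistent + " -> " + broker.reconnectAndDrain("MQTT-Sub-Client"));
        }
    }
}
```

      In this model the persistent broker hands both messages to the reconnecting subscriber, while the non-persistent one hands over nothing, which is the symptom reported here.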

      I modified the existing test case testReceiveMessageSentWhileOffline() and added the test case testReceiveMessageSentWhileOfflineAndBrokerRestart(), shown below.
      Changes:

      • Use a standalone broker, because I was not sure whether the embedded broker persists messages to a permanent store.
      • Restart the broker manually when the test prompts for it.
          @Test(timeout = 60 * 1000)
          public void testReceiveMessageSentWhileOfflineAndBrokerRestart() throws Exception {
              // 32 KiB payload filled with the character '2'.
              final byte[] payload = new byte[1024 * 32];
              for (int i = 0; i < payload.length; i++) {
                  payload[i] = '2';
              }

              int numberOfRuns = 100;
              int messagesPerRun = 2;

              // Both clients connect to the standalone broker on the MQTT port.
              final MQTT mqttPub = createMQTTConnection("MQTT-Pub-Client", true);
              final MQTT mqttSub = createMQTTConnection("MQTT-Sub-Client", false);
              mqttPub.setHost("tcp://localhost:1883");
              mqttSub.setHost("tcp://localhost:1883");

              final BlockingConnection connectionPub = mqttPub.blockingConnection();
              connectionPub.connect();

              BlockingConnection connectionSub = mqttSub.blockingConnection();
              connectionSub.connect();

              Topic[] topics = { new Topic("TopicA", QoS.EXACTLY_ONCE) };
              connectionSub.subscribe(topics);

              // Initial batch: published and received while the subscriber is online.
              for (int i = 0; i < messagesPerRun; ++i) {
                  connectionPub.publish(topics[0].name().toString(), payload, QoS.AT_LEAST_ONCE, false);
              }

              int received = 0;
              for (int i = 0; i < messagesPerRun; ++i) {
                  Message message = connectionSub.receive(5, TimeUnit.SECONDS);
                  assertNotNull(message);
                  received++;
                  assertTrue(Arrays.equals(payload, message.getPayload()));
                  message.ack();
              }
              connectionSub.disconnect();

              for (int j = 0; j < numberOfRuns; j++) {

                  // Publish while the subscriber is offline.
                  for (int i = 0; i < messagesPerRun; ++i) {
                      connectionPub.publish(topics[0].name().toString(), payload, QoS.AT_LEAST_ONCE, false);
                  }

                  // Manual step: restart the standalone broker during the sleep below.
                  System.out.println("Restart broker here.....");

                  Thread.sleep(30000);

                  // Reconnect the subscriber; the pending messages should now be delivered.
                  connectionSub = mqttSub.blockingConnection();
                  connectionSub.connect();
                  connectionSub.subscribe(topics);

                  for (int i = 0; i < messagesPerRun; ++i) {
                      Message message = connectionSub.receive(5, TimeUnit.SECONDS);
                      assertNotNull(message);
                      received++;
                      assertTrue(Arrays.equals(payload, message.getPayload()));
                      message.ack();
                  }
                  connectionSub.disconnect();
              }
              // The initial batch plus one batch per run.
              assertEquals("Should have received " + (messagesPerRun * (numberOfRuns + 1)) + " messages", (messagesPerRun * (numberOfRuns + 1)), received);
          }
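
      A quick budget check on the constants used above: each of the 100 runs sleeps for 30 seconds, while the test method is annotated with a 60 second timeout, so the test as written cannot complete all of its runs. The helper below is a hypothetical illustration (RestartTestBudget is not part of the ActiveMQ test suite) that reproduces this arithmetic, along with the message count checked by the final assertEquals.

```java
/** Arithmetic on the constants of the restart test; names here are illustrative only. */
final class RestartTestBudget {

    /** Lower bound on runtime in ms: one fixed sleep per run, ignoring publish and receive time. */
    static long minimumRuntimeMillis(int numberOfRuns, long sleepPerRunMillis) {
        return numberOfRuns * sleepPerRunMillis;
    }

    /** Upper bound on how many per-run sleeps fit into the timeout. */
    static long sleepsThatFit(long timeoutMillis, long sleepPerRunMillis) {
        return timeoutMillis / sleepPerRunMillis;
    }

    /** Messages expected by the final assertion: the initial batch plus one batch per run. */
    static int expectedMessages(int messagesPerRun, int numberOfRuns) {
        return messagesPerRun * (numberOfRuns + 1);
    }

    public static void main(String[] args) {
        System.out.println(minimumRuntimeMillis(100, 30_000L)); // prints 3000000
        System.out.println(sleepsThatFit(60_000L, 30_000L));    // prints 2
        System.out.println(expectedMessages(2, 100));           // prints 202
    }
}
```

      With these values the sleeps alone take at least 50 minutes, so either the timeout would need to be raised or numberOfRuns lowered for the loop to run to completion.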
      

          People

            Assignee: Unassigned
            Reporter: workanandr AR
            Votes: 0
            Watchers: 3
