Details
-
Bug
-
Status: Closed
-
Major
-
Resolution: Not A Problem
-
5.11.0
-
None
-
None
Description
If there are pending messages to be delivered to a subscriber and if the broker is restarted at this point, the pending messages are not delivered to the subscriber when it connects after broker restart.
I modified existing test case testReceiveMessageSentWhileOffline() and added test case testReceiveMessageSentWhileOfflineAndBrokerRestart() shown below:
changes:
- use standalone broker as I was not sure if embedded broker persists messages on permanent store.
- manually need to restart when test prompts to restart broker
@Test(timeout = 60 * 1000) public void testReceiveMessageSentWhileOfflineAndBrokerRestart() throws Exception { final byte[] payload = new byte[1024 * 32]; for (int i = 0; i < payload.length; i++) { payload[i] = '2'; } int numberOfRuns = 100; int messagesPerRun = 2; final MQTT mqttPub = createMQTTConnection("MQTT-Pub-Client", true); final MQTT mqttSub = createMQTTConnection("MQTT-Sub-Client", false); mqttPub.setHost("tcp://localhost:1883"); mqttSub.setHost("tcp://localhost:1883"); final BlockingConnection connectionPub = mqttPub.blockingConnection(); connectionPub.connect(); BlockingConnection connectionSub = mqttSub.blockingConnection(); connectionSub.connect(); Topic[] topics = { new Topic("TopicA", QoS.EXACTLY_ONCE) }; connectionSub.subscribe(topics); for (int i = 0; i < messagesPerRun; ++i) { connectionPub.publish(topics[0].name().toString(), payload, QoS.AT_LEAST_ONCE, false); } int received = 0; for (int i = 0; i < messagesPerRun; ++i) { Message message = connectionSub.receive(5, TimeUnit.SECONDS); assertNotNull(message); received++; assertTrue(Arrays.equals(payload, message.getPayload())); message.ack(); } connectionSub.disconnect(); for (int j = 0; j < numberOfRuns; j++) { for (int i = 0; i < messagesPerRun; ++i) { connectionPub.publish(topics[0].name().toString(), payload, QoS.AT_LEAST_ONCE, false); } System.out.println("Restart broker here....."); Thread.sleep(30000); connectionSub = mqttSub.blockingConnection(); connectionSub.connect(); connectionSub.subscribe(topics); for (int i = 0; i < messagesPerRun; ++i) { Message message = connectionSub.receive(5, TimeUnit.SECONDS); assertNotNull(message); received++; assertTrue(Arrays.equals(payload, message.getPayload())); message.ack(); } connectionSub.disconnect(); } assertEquals("Should have received " + (messagesPerRun * (numberOfRuns + 1)) + " messages", (messagesPerRun * (numberOfRuns + 1)), received); }