/**
 * Verifies retained-message handling on a topic named {@code VirtualTopic/TopicA}.
 *
 * <p>The same scenario is run for three client ids: {@code null}, {@code "foo"} and
 * {@code "durable"}. Only the {@code "durable"} client connects with
 * {@code cleanSession == false}. Each iteration checks that:
 * <ol>
 *   <li>a retained message is delivered exactly once to a new subscription;</li>
 *   <li>it is delivered again when the same subscription is repeated;</li>
 *   <li>publishing an empty retained payload leaves nothing to deliver to a new
 *       subscription;</li>
 *   <li>a newly published retained message is delivered again;</li>
 *   <li>the retained message is still delivered after disconnecting and reconnecting.</li>
 * </ol>
 *
 * @throws Exception if any MQTT operation fails
 */
@Test(timeout = 120 * 1000)
public void testRetainedMessageOnVirtualTopic() throws Exception {
    MQTT mqtt = createMQTTConnection();
    mqtt.setKeepAlive((short) 2);

    final String RETAIN = "RETAIN";
    final String TOPICA = "VirtualTopic/TopicA";

    final String[] clientIds = { null, "foo", "durable" };
    for (String clientId : clientIds) {
        LOG.info("Running test loop with Client ID: {}", clientId);

        mqtt.setClientId(clientId);
        // Only the "durable" client keeps its session across connections.
        mqtt.setCleanSession(!"durable".equals(clientId));

        BlockingConnection connection = mqtt.blockingConnection();
        connection.connect();

        // Set the retained message and check that a new subscription receives it once.
        connection.publish(TOPICA, RETAIN.getBytes(), QoS.EXACTLY_ONCE, true);
        connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
        Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
        assertNotNull("No retained message for " + clientId, msg);
        assertEquals(RETAIN, new String(msg.getPayload()));
        msg.ack();
        assertNull(connection.receive(500, TimeUnit.MILLISECONDS));

        // A duplicate subscription must trigger delivery of the retained message again.
        connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
        msg = connection.receive(15000, TimeUnit.MILLISECONDS);
        assertNotNull("No retained message on duplicate subscription for " + clientId, msg);
        assertEquals(RETAIN, new String(msg.getPayload()));
        msg.ack();
        assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
        // Unsubscribe from the topic that was actually subscribed (TOPICA), not the
        // bare "TopicA" literal, so the following checks start from a fresh subscription.
        connection.unsubscribe(new String[]{TOPICA});

        // Publish an empty retained payload and check that nothing is delivered.
        connection.publish(TOPICA, "".getBytes(), QoS.AT_MOST_ONCE, true);
        connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
        msg = connection.receive(500, TimeUnit.MILLISECONDS);
        assertNull("Retained message not cleared for " + clientId, msg);
        connection.unsubscribe(new String[]{TOPICA});

        // Set the retained message again and check delivery.
        connection.publish(TOPICA, RETAIN.getBytes(), QoS.EXACTLY_ONCE, true);
        connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
        msg = connection.receive(5000, TimeUnit.MILLISECONDS);
        assertNotNull("No reset retained message for " + clientId, msg);
        assertEquals(RETAIN, new String(msg.getPayload()));
        msg.ack();
        assertNull(connection.receive(500, TimeUnit.MILLISECONDS));

        // Reconnect and check that the retained message is still delivered.
        connection.disconnect();
        connection = mqtt.blockingConnection();
        connection.connect();
        connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
        msg = connection.receive(5000, TimeUnit.MILLISECONDS);
        assertNotNull("No reset retained message for " + clientId, msg);
        assertEquals(RETAIN, new String(msg.getPayload()));
        msg.ack();
        assertNull(connection.receive(500, TimeUnit.MILLISECONDS));

        // Remove the subscription before disconnecting so it does not linger in
        // the session of the non-clean-session client.
        connection.unsubscribe(new String[]{TOPICA});
        connection.disconnect();
    }
}
The test that demonstrates this is a modified version of testRetainedMessage in MQTTTest.java.