diff --git a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java index a15b350..ec7e4be 100644 --- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java +++ b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java @@ -385,8 +385,10 @@ public class KafkaMigrationTool { try{ while(true) { KeyedMessage data = producerDataChannel.receiveRequest(); - if(!data.equals(shutdownMessage)) + if(!data.equals(shutdownMessage)) { producer.send(data); + logger.debug("Sending message %s".format(new String(data.message()))); + } else break; } @@ -410,6 +412,7 @@ public class KafkaMigrationTool { public void awaitShutdown() { try { shutdownComplete.await(); + producer.close(); logger.info("Producer thread " + threadName + " shutdown complete"); } catch(InterruptedException ie) { logger.warn("Interrupt during shutdown of ProducerThread", ie);