diff --git a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java index 7f0d1ce..95fbe46 100644 --- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java +++ b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java @@ -134,6 +134,13 @@ public class KafkaMigrationTool { .describedAs("Java regex (String)") .ofType(String.class); + ArgumentAcceptingOptionSpec queueSizeOpt + = parser.accepts("queue.size", "Number of messages that are buffered between the 0.7 consumer and 0.8 producer") + .withRequiredArg() + .describedAs("Queue size in terms of number of messages") + .ofType(Integer.class) + .defaultsTo(10000); + OptionSpecBuilder helpOpt = parser.accepts("help", "Print this message."); @@ -212,7 +219,8 @@ public class KafkaMigrationTool { kafkaProducerProperties_08.load(new FileInputStream(producerConfigFile_08)); kafkaProducerProperties_08.setProperty("serializer.class", "kafka.serializer.DefaultEncoder"); // create a producer channel instead - ProducerDataChannel> producerDataChannel = new ProducerDataChannel>(numProducers); + int queueSize = options.valueOf(queueSizeOpt); + ProducerDataChannel> producerDataChannel = new ProducerDataChannel>(queueSize); int threadId = 0; Runtime.getRuntime().addShutdownHook(new Thread() {