diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index e455cb9..0ac0640 100644 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -83,6 +83,10 @@ object ConsoleConsumer extends Logging { .withRequiredArg .describedAs("metrics dictory") .ofType(classOf[java.lang.String]) + val statusIntervalOpt = parser.accepts("print-every", "Prints the number of messages consumed every specified message interval") + .withOptionalArg + .ofType(classOf[java.lang.Integer]) + .defaultsTo(0) if(args.length == 0) CommandLineUtils.printUsageAndDie(parser, "The console consumer is a tool that reads data from Kafka and outputs it to standard output.") @@ -156,6 +160,7 @@ object ConsoleConsumer extends Logging { var numMessages = 0L val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter] formatter.init(formatterArgs) + val statusInterval = options.valueOf(statusIntervalOpt) try { val stream = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder()).get(0) val iter = if(maxMessages >= 0) @@ -167,6 +172,8 @@ object ConsoleConsumer extends Logging { try { formatter.writeTo(messageAndTopic.key, messageAndTopic.message, System.out) numMessages += 1 + if (statusInterval > 0 && numMessages % statusInterval == 0) + System.err.println("Consumed %d messages".format(numMessages)); } catch { case e: Throwable => if (skipMessageOnError)