diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala index 7e84043..d6c4a51 100644 --- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala @@ -290,6 +290,11 @@ class DefaultMessageFormatter extends MessageFormatter { } } +class NoOpMessageFormatter extends MessageFormatter { + override def init(props: Properties) {} + def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {} +} + class ChecksumMessageFormatter extends MessageFormatter { private var topicStr: String = _ diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala index 8f274df..3cfa384 100644 --- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala +++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala @@ -84,6 +84,11 @@ object SimpleConsumerShell extends Logging { .describedAs("ms") .ofType(classOf[java.lang.Integer]) .defaultsTo(1000) + val maxMessagesOpt = parser.accepts("max-messages", "The number of messages to consume") + .withRequiredArg + .describedAs("max-messages") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(Integer.MAX_VALUE) val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " + "skip it instead of halt.") val noWaitAtEndOfLogOpt = parser.accepts("no-wait-at-logend", @@ -105,6 +110,7 @@ object SimpleConsumerShell extends Logging { val fetchSize = options.valueOf(fetchSizeOpt).intValue val clientId = options.valueOf(clientIdOpt).toString val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue() + val maxMessages = options.valueOf(maxMessagesOpt).intValue val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false val printOffsets = if(options.has(printOffsetOpt)) true else false @@ -181,14 +187,16 @@ object SimpleConsumerShell extends Logging { val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter] formatter.init(formatterArgs) - info("Starting simple consumer shell to partition [%s, %d], replica [%d], host and port: [%s, %d], from offset [%d]" - .format(topic, partitionId, replicaId, fetchTargetBroker.host, fetchTargetBroker.port, startingOffset)) + val replicaString = if(replicaId > 0) "leader" else "replica" + info("Starting simple consumer shell to partition [%s, %d], %s [%d], host and port: [%s, %d], from offset [%d]" + .format(topic, partitionId, replicaString, replicaId, fetchTargetBroker.host, fetchTargetBroker.port, startingOffset)) val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, 10000, 64*1024, clientId) val thread = Utils.newThread("kafka-simpleconsumer-shell", new Runnable() { def run() { var offset = startingOffset + var numMessagesConsumed = 0 try { - while(true) { + while(numMessagesConsumed < maxMessages) { val fetchRequest = fetchRequestBuilder .addFetch(topic, partitionId, offset, fetchSize) .build() @@ -199,7 +207,7 @@ object SimpleConsumerShell extends Logging { return } debug("multi fetched " + messageSet.sizeInBytes + " bytes from offset " + offset) - for(messageAndOffset <- messageSet) { + for(messageAndOffset <- messageSet if(numMessagesConsumed < maxMessages)) { try { offset = messageAndOffset.nextOffset if(printOffsets) @@ -207,6 +215,7 @@ object SimpleConsumerShell extends Logging { val message = messageAndOffset.message val key = if(message.hasKey) Utils.readBytes(message.key) else null formatter.writeTo(key, Utils.readBytes(message.payload), System.out) + numMessagesConsumed += 1 } catch { case e => if (skipMessageOnError) @@ -226,6 +235,8 @@ object SimpleConsumerShell extends Logging { } catch { case e: Throwable => error("Error consuming topic, partition, replica (%s, %d, %d) with offset [%d]".format(topic, partitionId, replicaId, offset), e) + }finally { + info("Consumed " + numMessagesConsumed + " messages") } } }, false)