diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala index 0f62819..6cccd16 100644 --- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala @@ -104,6 +104,7 @@ object ConsoleConsumer extends Logging { .withRequiredArg .describedAs("prop") .ofType(classOf[String]) + val deleteConsumerPathOpt = parser.accepts("delete-consumer-path", "deletes zookeeper consumers dir"); val resetBeginningOpt = parser.accepts("from-beginning", "If the consumer does not already have an established offset to consume from, " + "start with the earliest message present in the log rather than the latest message.") val autoCommitIntervalOpt = parser.accepts("autocommit.interval.ms", "The time interval at which to save the current offset in ms") @@ -191,14 +192,14 @@ object ConsoleConsumer extends Logging { val connector = Consumer.create(config) - if(options.has(resetBeginningOpt)) + if(options.has(deleteConsumerPathOpt)) ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + options.valueOf(groupIdOpt)) Runtime.getRuntime.addShutdownHook(new Thread() { override def run() { connector.shutdown() // if there is no group specified then avoid polluting zookeeper with persistent group data, this is a hack - if(!options.has(groupIdOpt)) + if(!options.has(groupIdOpt)) ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + options.valueOf(groupIdOpt)) } }) @@ -291,7 +292,7 @@ class DefaultMessageFormatter extends MessageFormatter { var printKey = false var keySeparator = "\t".getBytes var lineSeparator = "\n".getBytes - + override def init(props: Properties) { if(props.containsKey("print.key")) printKey = props.getProperty("print.key").trim.toLowerCase.equals("true") @@ -300,7 +301,7 @@ class DefaultMessageFormatter extends MessageFormatter { if(props.containsKey("line.separator")) lineSeparator = props.getProperty("line.separator").getBytes } - + def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) { if(printKey) { output.write(if (key == null) "null".getBytes() else key)