diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala index 0f62819..73668cd 100644 --- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala @@ -104,6 +104,7 @@ object ConsoleConsumer extends Logging { .withRequiredArg .describedAs("prop") .ofType(classOf[String]) + val deleteConsumerOffsetsOpt = parser.accepts("delete-consumer-offsets", "If specified, the consumer path in zookeeper is deleted when starting up"); val resetBeginningOpt = parser.accepts("from-beginning", "If the consumer does not already have an established offset to consume from, " + "start with the earliest message present in the log rather than the latest message.") val autoCommitIntervalOpt = parser.accepts("autocommit.interval.ms", "The time interval at which to save the current offset in ms") @@ -159,6 +160,16 @@ object ConsoleConsumer extends Logging { KafkaMetricsReporter.startReporters(verifiableProps) } + if (!options.has(deleteConsumerOffsetsOpt) && options.has(resetBeginningOpt) && + checkZkPathExists(options.valueOf(zkConnectOpt),"/consumers/" + options.valueOf(groupIdOpt)+ "/offsets")) { + error("Found previous offset information for this group "+options.valueOf(groupIdOpt) + +". Please use --delete-consumer-offsets to delete previous offsets metadata") + System.exit(1) + } + + if(options.has(deleteConsumerOffsetsOpt)) + ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + options.valueOf(groupIdOpt)) + val offsetsStorage = options.valueOf(offsetsStorageOpt) val props = new Properties() props.put("group.id", options.valueOf(groupIdOpt)) @@ -191,14 +202,12 @@ object ConsoleConsumer extends Logging { val connector = Consumer.create(config) - if(options.has(resetBeginningOpt)) - ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + options.valueOf(groupIdOpt)) Runtime.getRuntime.addShutdownHook(new Thread() { override def run() { connector.shutdown() // if there is no group specified then avoid polluting zookeeper with persistent group data, this is a hack - if(!options.has(groupIdOpt)) + if(!options.has(groupIdOpt)) ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + options.valueOf(groupIdOpt)) } }) @@ -253,20 +262,16 @@ object ConsoleConsumer extends Logging { } } - def tryCleanupZookeeper(zkUrl: String, groupId: String) { + def checkZkPathExists(zkUrl: String, path: String): Boolean = { try { - val dir = "/consumers/" + groupId - info("Cleaning up temporary zookeeper data under " + dir + ".") - val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer) - zk.deleteRecursive(dir) - zk.close() + val zk = new ZkClient(zkUrl, 30*1000,30*1000, ZKStringSerializer); + zk.exists(path) } catch { - case _: Throwable => // swallow + case _: Throwable => false } } } - object MessageFormatter { def tryParseFormatterArgs(args: Iterable[String]): Properties = { val splits = args.map(_ split "=").filterNot(_ == null).filterNot(_.length == 0) @@ -291,7 +296,7 @@ class DefaultMessageFormatter extends MessageFormatter { var printKey = false var keySeparator = "\t".getBytes var lineSeparator = "\n".getBytes - + override def init(props: Properties) { if(props.containsKey("print.key")) printKey = props.getProperty("print.key").trim.toLowerCase.equals("true") @@ -300,7 +305,7 @@ class DefaultMessageFormatter extends MessageFormatter { if(props.containsKey("line.separator")) lineSeparator = props.getProperty("line.separator").getBytes } - + def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) { if(printKey) { output.write(if (key == null) "null".getBytes() else key)