diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala index 24c9287..1a16c69 100644 --- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala @@ -54,47 +54,11 @@ object ConsoleConsumer extends Logging { .withRequiredArg .describedAs("urls") .ofType(classOf[String]) - val groupIdOpt = parser.accepts("group", "The group id to consume on.") + + val consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file.") .withRequiredArg - .describedAs("gid") - .defaultsTo("console-consumer-" + new Random().nextInt(100000)) + .describedAs("config file") .ofType(classOf[String]) - val fetchSizeOpt = parser.accepts("fetch-size", "The amount of data to fetch in a single request.") - .withRequiredArg - .describedAs("size") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1024 * 1024) - val minFetchBytesOpt = parser.accepts("min-fetch-bytes", "The min number of bytes each fetch request waits for.") - .withRequiredArg - .describedAs("bytes") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1) - val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.") - .withRequiredArg - .describedAs("ms") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(100) - val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.") - .withRequiredArg - .describedAs("size") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(2 * 1024 * 1024) - val socketTimeoutMsOpt = parser.accepts("socket-timeout-ms", "The socket timeout used for the connection to the broker") - .withRequiredArg - .describedAs("ms") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(ConsumerConfig.SocketTimeout) - val refreshMetadataBackoffMsOpt = parser.accepts("refresh-leader-backoff-ms", "Backoff time before refreshing metadata") - .withRequiredArg - .describedAs("ms") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(ConsumerConfig.RefreshMetadataBackoffMs) - val consumerTimeoutMsOpt = parser.accepts("consumer-timeout-ms", "consumer throws timeout exception after waiting this much " + - "of time without incoming messages") - .withRequiredArg - .describedAs("prop") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(-1) val messageFormatterOpt = parser.accepts("formatter", "The name of a class to use for formatting kafka messages for display.") .withRequiredArg .describedAs("class") @@ -107,11 +71,6 @@ object ConsoleConsumer extends Logging { val deleteConsumerOffsetsOpt = parser.accepts("delete-consumer-offsets", "If specified, the consumer path in zookeeper is deleted when starting up"); val resetBeginningOpt = parser.accepts("from-beginning", "If the consumer does not already have an established offset to consume from, " + "start with the earliest message present in the log rather than the latest message.") - val autoCommitIntervalOpt = parser.accepts("autocommit.interval.ms", "The time interval at which to save the current offset in ms") - .withRequiredArg - .describedAs("ms") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(ConsumerConfig.AutoCommitInterval) val maxMessagesOpt = parser.accepts("max-messages", "The maximum number of messages to consume before exiting. If not set, consumption is continual.") .withRequiredArg .describedAs("num_messages") @@ -124,14 +83,8 @@ object ConsoleConsumer extends Logging { .withRequiredArg .describedAs("metrics dictory") .ofType(classOf[java.lang.String]) - val includeInternalTopicsOpt = parser.accepts("include-internal-topics", "Allow consuming internal topics.") - val offsetsStorageOpt = parser.accepts("offsets-storage", "Specify offsets storage backend (kafka/zookeeper).") - .withRequiredArg - .describedAs("Offsets storage method.") - .ofType(classOf[String]) - .defaultsTo("zookeeper") - val dualCommitEnabledOpt = parser.accepts("dual-commit-enabled", "If offsets storage is kafka and this is set, then commit to zookeeper as well.") + var groupIdPassed = true val options: OptionSet = tryParse(parser, args) CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt) val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has) @@ -160,55 +113,47 @@ object ConsoleConsumer extends Logging { KafkaMetricsReporter.startReporters(verifiableProps) } + + + val consumerProps = if (options.has(consumerConfigOpt)) + Utils.loadProps(options.valueOf(consumerConfigOpt)) + else + new Properties() + + if(!consumerProps.containsKey("group.id")) { + consumerProps.put("group.id","console-consumer-" + new Random().nextInt(100000)) + groupIdPassed=false + } + consumerProps.put("auto.offset.reset", if(options.has(resetBeginningOpt)) "smallest" else "largest") + consumerProps.put("zookeeper.connect", options.valueOf(zkConnectOpt)) + if(!consumerProps.containsKey("dual.commit.enabled")) + consumerProps.put("dual.commit.enabled","false") + if(!consumerProps.containsKey("offsets.storage")) + consumerProps.put("offsets.storage","zookeeper") + if (!options.has(deleteConsumerOffsetsOpt) && options.has(resetBeginningOpt) && - checkZkPathExists(options.valueOf(zkConnectOpt),"/consumers/" + options.valueOf(groupIdOpt)+ "/offsets")) { - System.err.println("Found previous offset information for this group "+options.valueOf(groupIdOpt) + checkZkPathExists(options.valueOf(zkConnectOpt),"/consumers/" + consumerProps.getProperty("group.id")+ "/offsets")) { + System.err.println("Found previous offset information for this group "+consumerProps.getProperty("group.id") +". Please use --delete-consumer-offsets to delete previous offsets metadata") System.exit(1) } if(options.has(deleteConsumerOffsetsOpt)) - ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + options.valueOf(groupIdOpt)) - - val offsetsStorage = options.valueOf(offsetsStorageOpt) - val props = new Properties() - props.put("group.id", options.valueOf(groupIdOpt)) - props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString) - props.put("socket.timeout.ms", options.valueOf(socketTimeoutMsOpt).toString) - props.put("fetch.message.max.bytes", options.valueOf(fetchSizeOpt).toString) - props.put("fetch.min.bytes", options.valueOf(minFetchBytesOpt).toString) - props.put("fetch.wait.max.ms", options.valueOf(maxWaitMsOpt).toString) - props.put("auto.commit.enable", "true") - props.put("auto.commit.interval.ms", options.valueOf(autoCommitIntervalOpt).toString) - props.put("auto.offset.reset", if(options.has(resetBeginningOpt)) "smallest" else "largest") - props.put("zookeeper.connect", options.valueOf(zkConnectOpt)) - props.put("consumer.timeout.ms", options.valueOf(consumerTimeoutMsOpt).toString) - props.put("refresh.leader.backoff.ms", options.valueOf(refreshMetadataBackoffMsOpt).toString) - props.put("offsets.storage", offsetsStorage) - if (options.has(includeInternalTopicsOpt)) - props.put("exclude.internal.topics", "false") - if (options.has(dualCommitEnabledOpt)) - props.put("dual.commit.enabled", "true") - else - props.put("dual.commit.enabled", "false") + ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + consumerProps.getProperty("group.id")) - val config = new ConsumerConfig(props) + val config = new ConsumerConfig(consumerProps) val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false - val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt)) val formatterArgs = MessageFormatter.tryParseFormatterArgs(options.valuesOf(messageFormatterArgOpt)) - val maxMessages = if(options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1 - val connector = Consumer.create(config) - Runtime.getRuntime.addShutdownHook(new Thread() { override def run() { connector.shutdown() // if there is no group specified then avoid polluting zookeeper with persistent group data, this is a hack - if(!options.has(groupIdOpt)) - ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + options.valueOf(groupIdOpt)) + if(!groupIdPassed) + ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + consumerProps.get("group.id")) } })