diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 8953640..6cecfe3 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -163,6 +163,7 @@ object ConsoleConsumer extends Logging { val props = new Properties props.putAll(config.consumerProps) + props.putAll(config.extraConsumerProps) props.put("auto.offset.reset", if (config.fromBeginning) "smallest" else "largest") props.put("zookeeper.connect", config.zkConnectionStr) @@ -185,6 +186,7 @@ object ConsoleConsumer extends Logging { val props = new Properties props.putAll(config.consumerProps) + props.putAll(config.extraConsumerProps) props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if (config.options.has(config.resetBeginningOpt)) "earliest" else "latest") props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer) props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, if (config.keyDeserializer != null) config.keyDeserializer else "org.apache.kafka.common.serialization.ByteArrayDeserializer") @@ -212,7 +214,11 @@ object ConsoleConsumer extends Logging { .withRequiredArg .describedAs("urls") .ofType(classOf[String]) - val consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file.") + val consumerPropertyOpt = parser.accepts("consumer-property", "A mechanism to pass user-defined properties in the form key=value to the consumer.") + .withRequiredArg + .describedAs("consumer_prop") + .ofType(classOf[String]) + val consumerConfigOpt = parser.accepts("consumer.config", s"Consumer config properties file. Note that ${consumerPropertyOpt} takes precedence over this config.") .withRequiredArg .describedAs("config file") .ofType(classOf[String]) @@ -287,6 +293,7 @@ object ConsoleConsumer extends Logging { topicArg = options.valueOf(topicOrFilterOpt.head) filterSpec = if (options.has(blacklistOpt)) new Blacklist(topicArg) else new Whitelist(topicArg) } + val extraConsumerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerPropertyOpt).asScala) val consumerProps = if (options.has(consumerConfigOpt)) Utils.loadProps(options.valueOf(consumerConfigOpt)) else