From d913d17b58e2d9fb18ab5954b69280e8504840ff Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Thu, 18 Dec 2014 11:07:59 -0800 Subject: [PATCH 1/5] KAFKA-1824 - fix ConsoleProducer so parse.key and key.separator will work again --- core/src/main/scala/kafka/tools/ConsoleProducer.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala index 1061cc7..e011e42 100644 --- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala @@ -36,10 +36,8 @@ object ConsoleProducer { val reader = Class.forName(config.readerClass).newInstance().asInstanceOf[MessageReader] val props = new Properties props.putAll(config.cmdLineProps) - - val readerProps = new Properties(props) - readerProps.put("topic", config.topic) - reader.init(System.in, readerProps) + props.put("topic", config.topic) + reader.init(System.in, props) try { val producer = -- 1.9.3 (Apple Git-50) From 94e65ca1f7a3302f2bdc921b886f62de0f4ae4e8 Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Thu, 18 Dec 2014 17:55:15 -0800 Subject: [PATCH 2/5] fixing accidental return of "WARN Property topic is not valid" --- core/src/main/scala/kafka/tools/ConsoleProducer.scala | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala index e011e42..fc11dd7 100644 --- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala @@ -264,12 +264,19 @@ object ConsoleProducer { override def init(inputStream: InputStream, props: Properties) { topic = props.getProperty("topic") - if(props.containsKey("parse.key")) + props.remove("topic") + if(props.containsKey("parse.key")) { parseKey = props.getProperty("parse.key").trim.toLowerCase.equals("true") - if(props.containsKey("key.separator")) + props.remove("parse.key") + } + if(props.containsKey("key.separator")) { keySeparator = props.getProperty("key.separator") - if(props.containsKey("ignore.error")) + props.remove("key.separator") + } + if(props.containsKey("ignore.error")) { ignoreError = props.getProperty("ignore.error").trim.toLowerCase.equals("true") + props.remove("ignore.error") + } reader = new BufferedReader(new InputStreamReader(inputStream)) } -- 1.9.3 (Apple Git-50) From 34bee6072ad3a8637972f9881e3e71b36786a891 Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Mon, 22 Dec 2014 16:16:47 -0800 Subject: [PATCH 3/5] refactoring ConsoleProducer to allow testing and added unit tests --- .../main/scala/kafka/tools/ConsoleProducer.scala | 116 +++++++++++---------- .../scala/kafka/tools/ConsoleProducerTest.scala | 79 ++++++++++++++ 2 files changed, 142 insertions(+), 53 deletions(-) create mode 100644 core/src/test/scala/kafka/tools/ConsoleProducerTest.scala diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala index fc11dd7..fb71ec5 100644 --- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala @@ -27,6 +27,7 @@ import java.util.Properties import java.io._ import joptsimple._ +import org.apache.kafka.clients.producer.ProducerConfig object ConsoleProducer { @@ -34,52 +35,14 @@ object ConsoleProducer { val config = new ProducerConfig(args) val reader = Class.forName(config.readerClass).newInstance().asInstanceOf[MessageReader] - val props = new Properties - props.putAll(config.cmdLineProps) - props.put("topic", config.topic) - reader.init(System.in, props) + reader.init(System.in, getReaderProps(config)) try { val producer = if(config.useNewProducer) { - import org.apache.kafka.clients.producer.ProducerConfig - - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList) - props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec) - props.put(ProducerConfig.SEND_BUFFER_CONFIG, config.socketBuffer.toString) - props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.retryBackoffMs.toString) - props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, config.metadataExpiryMs.toString) - props.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, config.metadataFetchTimeoutMs.toString) - props.put(ProducerConfig.ACKS_CONFIG, config.requestRequiredAcks.toString) - props.put(ProducerConfig.TIMEOUT_CONFIG, config.requestTimeoutMs.toString) - props.put(ProducerConfig.RETRIES_CONFIG, config.messageSendMaxRetries.toString) - props.put(ProducerConfig.LINGER_MS_CONFIG, config.sendTimeout.toString) - if(config.queueEnqueueTimeoutMs != -1) - props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false") - props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.maxMemoryBytes.toString) - props.put(ProducerConfig.BATCH_SIZE_CONFIG, config.maxPartitionMemoryBytes.toString) - props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer") - - new NewShinyProducer(props) + new NewShinyProducer(getNewProducerProps(config)) } else { - props.put("metadata.broker.list", config.brokerList) - props.put("compression.codec", config.compressionCodec) - props.put("producer.type", if(config.sync) "sync" else "async") - props.put("batch.num.messages", config.batchSize.toString) - props.put("message.send.max.retries", config.messageSendMaxRetries.toString) - props.put("retry.backoff.ms", config.retryBackoffMs.toString) - props.put("queue.buffering.max.ms", config.sendTimeout.toString) - props.put("queue.buffering.max.messages", config.queueSize.toString) - props.put("queue.enqueue.timeout.ms", config.queueEnqueueTimeoutMs.toString) - props.put("request.required.acks", config.requestRequiredAcks.toString) - props.put("request.timeout.ms", config.requestTimeoutMs.toString) - props.put("key.serializer.class", config.keyEncoderClass) - props.put("serializer.class", config.valueEncoderClass) - props.put("send.buffer.bytes", config.socketBuffer.toString) - props.put("topic.metadata.refresh.interval.ms", config.metadataExpiryMs.toString) - props.put("client.id", "console-producer") - - new OldProducer(props) + new OldProducer(getOldProducerProps(config)) } Runtime.getRuntime.addShutdownHook(new Thread() { @@ -102,6 +65,60 @@ object ConsoleProducer { System.exit(0) } + def getReaderProps(config: ProducerConfig): Properties = { + val props = new Properties + props.put("topic",config.topic) + props.putAll(config.cmdLineProps) + props + } + + def getOldProducerProps(config: ProducerConfig): Properties = { + + val props = new Properties; + + props.put("metadata.broker.list", config.brokerList) + props.put("compression.codec", config.compressionCodec) + props.put("producer.type", if(config.sync) "sync" else "async") + props.put("batch.num.messages", config.batchSize.toString) + props.put("message.send.max.retries", config.messageSendMaxRetries.toString) + props.put("retry.backoff.ms", config.retryBackoffMs.toString) + props.put("queue.buffering.max.ms", config.sendTimeout.toString) + props.put("queue.buffering.max.messages", config.queueSize.toString) + props.put("queue.enqueue.timeout.ms", config.queueEnqueueTimeoutMs.toString) + props.put("request.required.acks", config.requestRequiredAcks.toString) + props.put("request.timeout.ms", config.requestTimeoutMs.toString) + props.put("key.serializer.class", config.keyEncoderClass) + props.put("serializer.class", config.valueEncoderClass) + props.put("send.buffer.bytes", config.socketBuffer.toString) + props.put("topic.metadata.refresh.interval.ms", config.metadataExpiryMs.toString) + props.put("client.id", "console-producer") + + props + } + + def getNewProducerProps(config: ProducerConfig): Properties = { + + val props = new Properties; + + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList) + props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec) + props.put(ProducerConfig.SEND_BUFFER_CONFIG, config.socketBuffer.toString) + props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.retryBackoffMs.toString) + props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, config.metadataExpiryMs.toString) + props.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, config.metadataFetchTimeoutMs.toString) + props.put(ProducerConfig.ACKS_CONFIG, config.requestRequiredAcks.toString) + props.put(ProducerConfig.TIMEOUT_CONFIG, config.requestTimeoutMs.toString) + props.put(ProducerConfig.RETRIES_CONFIG, config.messageSendMaxRetries.toString) + props.put(ProducerConfig.LINGER_MS_CONFIG, config.sendTimeout.toString) + if(config.queueEnqueueTimeoutMs != -1) + props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false") + props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.maxMemoryBytes.toString) + props.put(ProducerConfig.BATCH_SIZE_CONFIG, config.maxPartitionMemoryBytes.toString) + props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer") + + props + } + class ProducerConfig(args: Array[String]) { val parser = new OptionParser val topicOpt = parser.accepts("topic", "REQUIRED: The topic id to produce messages to.") @@ -132,13 +149,13 @@ object ConsoleProducer { .ofType(classOf[java.lang.Integer]) .defaultsTo(100) val sendTimeoutOpt = parser.accepts("timeout", "If set and the producer is running in asynchronous mode, this gives the maximum amount of time" + - " a message will queue awaiting suffient batch size. The value is given in ms.") + " a message will queue awaiting sufficient batch size. The value is given in ms.") .withRequiredArg .describedAs("timeout_ms") .ofType(classOf[java.lang.Integer]) .defaultsTo(1000) val queueSizeOpt = parser.accepts("queue-size", "If set and the producer is running in asynchronous mode, this gives the maximum amount of " + - " messages will queue awaiting suffient batch size.") + " messages will queue awaiting sufficient batch size.") .withRequiredArg .describedAs("queue_size") .ofType(classOf[java.lang.Integer]) @@ -264,19 +281,12 @@ object ConsoleProducer { override def init(inputStream: InputStream, props: Properties) { topic = props.getProperty("topic") - props.remove("topic") - if(props.containsKey("parse.key")) { + if(props.containsKey("parse.key")) parseKey = props.getProperty("parse.key").trim.toLowerCase.equals("true") - props.remove("parse.key") - } - if(props.containsKey("key.separator")) { + if(props.containsKey("key.separator")) keySeparator = props.getProperty("key.separator") - props.remove("key.separator") - } - if(props.containsKey("ignore.error")) { + if(props.containsKey("ignore.error")) ignoreError = props.getProperty("ignore.error").trim.toLowerCase.equals("true") - props.remove("ignore.error") - } reader = new BufferedReader(new InputStreamReader(inputStream)) } diff --git a/core/src/test/scala/kafka/tools/ConsoleProducerTest.scala b/core/src/test/scala/kafka/tools/ConsoleProducerTest.scala new file mode 100644 index 0000000..6d1f51c --- /dev/null +++ b/core/src/test/scala/kafka/tools/ConsoleProducerTest.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.tools + +import kafka.producer +import kafka.tools.ConsoleProducer.{LineMessageReader, MessageReader,ProducerConfig} +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig} +import joptsimple.UnrecognizedOptionException +import org.junit.Assert +import org.junit.Test + + +class ConsoleProducerTest { + + val validArgs: Array[String] = Array( + "--broker-list", + "localhost:1001,localhost:1002", + "--topic", + "t3", + "--property", + "parse.key=true", + "--property", + "key.separator=#" + ) + + val invalidArgs: Array[String] = Array( + "--t", // not a valid argument + "t3" + ) + + @Test + def testValidConfigsNewProducer() { + val config = new ConsoleProducer.ProducerConfig(validArgs) + // New ProducerConfig constructor is package private, so we can't call it directly + // Creating new Producer to validate instead + new KafkaProducer[Array[Byte],Array[Byte]](ConsoleProducer.getNewProducerProps(config)) + } + + @Test + def testValidConfigsOldProducer() { + val config = new ConsoleProducer.ProducerConfig(validArgs) + new producer.ProducerConfig(ConsoleProducer.getOldProducerProps(config)); + } + + @Test + def testInvalidConfigs() { + try { + val config = new ConsoleProducer.ProducerConfig(invalidArgs) + Assert.fail("Should have thrown an UnrecognizedOptionException") + } catch { + case e: joptsimple.OptionException => // expected exception + } + } + + @Test + def testParseKeyProp(): Unit = { + val config = new ConsoleProducer.ProducerConfig(validArgs) + val reader = Class.forName(config.readerClass).newInstance().asInstanceOf[LineMessageReader]; + reader.init(System.in,ConsoleProducer.getReaderProps(config)) + assert(reader.keySeparator == "#") + assert(reader.parseKey == true) + } + +} -- 1.9.3 (Apple Git-50) From cf3f4e46b19d09a04b29aff569edf3ec09b83015 Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Mon, 22 Dec 2014 17:02:00 -0800 Subject: [PATCH 4/5] added producer-properties to ConsoleProducer but no tests yet --- core/src/main/scala/kafka/tools/ConsoleProducer.scala | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala index fb71ec5..ff6ff80 100644 --- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala @@ -76,6 +76,8 @@ object ConsoleProducer { val props = new Properties; + props.putAll(config.extraProducerProps) + props.put("metadata.broker.list", config.brokerList) props.put("compression.codec", config.compressionCodec) props.put("producer.type", if(config.sync) "sync" else "async") @@ -100,6 +102,8 @@ object ConsoleProducer { val props = new Properties; + props.putAll(config.extraProducerProps) + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList) props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec) props.put(ProducerConfig.SEND_BUFFER_CONFIG, config.socketBuffer.toString) @@ -226,6 +230,10 @@ object ConsoleProducer { .withRequiredArg .describedAs("prop") .ofType(classOf[String]) + val producerPropertyOpt = parser.accepts("producer-property", "A mechanism to pass user-defined properties in the form key=value to the producer. ") + .withRequiredArg + .describedAs("producer_prop") + .ofType(classOf[String]) val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.") val options = parser.parse(args : _*) @@ -258,6 +266,7 @@ object ConsoleProducer { val readerClass = options.valueOf(messageReaderOpt) val socketBuffer = options.valueOf(socketBufferSizeOpt) val cmdLineProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt)) + val extraProducerProps = CommandLineUtils.parseKeyValueArgs(options.valueOf(producerPropertyOpt)) /* new producer related configs */ val maxMemoryBytes = options.valueOf(maxMemoryBytesOpt) val maxPartitionMemoryBytes = options.valueOf(maxPartitionMemoryBytesOpt) -- 1.9.3 (Apple Git-50) From ccb61c96541136588eea505ddd17dcae8a18de41 Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Mon, 22 Dec 2014 17:12:10 -0800 Subject: [PATCH 5/5] fixing an issue with producer properties. --- core/src/main/scala/kafka/tools/ConsoleProducer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala index ff6ff80..fc7de27 100644 --- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala @@ -266,7 +266,7 @@ object ConsoleProducer { val readerClass = options.valueOf(messageReaderOpt) val socketBuffer = options.valueOf(socketBufferSizeOpt) val cmdLineProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt)) - val extraProducerProps = CommandLineUtils.parseKeyValueArgs(options.valueOf(producerPropertyOpt)) + val extraProducerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(producerPropertyOpt)) /* new producer related configs */ val maxMemoryBytes = options.valueOf(maxMemoryBytesOpt) val maxPartitionMemoryBytes = options.valueOf(maxPartitionMemoryBytesOpt) -- 1.9.3 (Apple Git-50)