diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index 7fefc2ed6fed18bd172e35fc98b10e1207781e92..00265f9f4a4b6c6a9aa023e5be5faf297f77bf31 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -27,6 +27,7 @@
 import java.util.Properties
 import java.io._
 import joptsimple._
+import org.apache.kafka.clients.producer.ProducerConfig
 
 object ConsoleProducer {
 
@@ -34,54 +35,14 @@ object ConsoleProducer {
 
     val config = new ProducerConfig(args)
     val reader = Class.forName(config.readerClass).newInstance().asInstanceOf[MessageReader]
-    val props = new Properties
-    props.putAll(config.cmdLineProps)
-    props.put("topic", config.topic)
-    reader.init(System.in, props)
+    reader.init(System.in, getReaderProps(config))
 
     try {
         val producer =
          if(config.useNewProducer) {
-            import org.apache.kafka.clients.producer.ProducerConfig
-
-            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList)
-            props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec)
-            props.put(ProducerConfig.SEND_BUFFER_CONFIG, config.socketBuffer.toString)
-            props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.retryBackoffMs.toString)
-            props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, config.metadataExpiryMs.toString)
-            props.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, config.metadataFetchTimeoutMs.toString)
-            props.put(ProducerConfig.ACKS_CONFIG, config.requestRequiredAcks.toString)
-            props.put(ProducerConfig.TIMEOUT_CONFIG, config.requestTimeoutMs.toString)
-            props.put(ProducerConfig.RETRIES_CONFIG, config.messageSendMaxRetries.toString)
-            props.put(ProducerConfig.LINGER_MS_CONFIG, config.sendTimeout.toString)
-            if(config.queueEnqueueTimeoutMs != -1)
-              props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false")
-            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.maxMemoryBytes.toString)
-            props.put(ProducerConfig.BATCH_SIZE_CONFIG, config.maxPartitionMemoryBytes.toString)
-            props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer")
-            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
-            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
-
-            new NewShinyProducer(props)
+            new NewShinyProducer(getNewProducerProps(config))
          } else {
-            props.put("metadata.broker.list", config.brokerList)
-            props.put("compression.codec", config.compressionCodec)
-            props.put("producer.type", if(config.sync) "sync" else "async")
-            props.put("batch.num.messages", config.batchSize.toString)
-            props.put("message.send.max.retries", config.messageSendMaxRetries.toString)
-            props.put("retry.backoff.ms", config.retryBackoffMs.toString)
-            props.put("queue.buffering.max.ms", config.sendTimeout.toString)
-            props.put("queue.buffering.max.messages", config.queueSize.toString)
-            props.put("queue.enqueue.timeout.ms", config.queueEnqueueTimeoutMs.toString)
-            props.put("request.required.acks", config.requestRequiredAcks.toString)
-            props.put("request.timeout.ms", config.requestTimeoutMs.toString)
-            props.put("key.serializer.class", config.keyEncoderClass)
-            props.put("serializer.class", config.valueEncoderClass)
-            props.put("send.buffer.bytes", config.socketBuffer.toString)
-            props.put("topic.metadata.refresh.interval.ms", config.metadataExpiryMs.toString)
-            props.put("client.id", "console-producer")
-
-            new OldProducer(props)
+            new OldProducer(getOldProducerProps(config))
          }
 
        Runtime.getRuntime.addShutdownHook(new Thread() {
@@ -104,6 +65,66 @@ object ConsoleProducer {
     System.exit(0)
   }
 
+  def getReaderProps(config: ProducerConfig): Properties = {
+    val props = new Properties
+    props.put("topic", config.topic)
+    props.putAll(config.cmdLineProps)
+    props
+  }
+
+  def getOldProducerProps(config: ProducerConfig): Properties = {
+
+    val props = new Properties
+
+    props.putAll(config.extraProducerProps)
+
+    props.put("metadata.broker.list", config.brokerList)
+    props.put("compression.codec", config.compressionCodec)
+    props.put("producer.type", if(config.sync) "sync" else "async")
+    props.put("batch.num.messages", config.batchSize.toString)
+    props.put("message.send.max.retries", config.messageSendMaxRetries.toString)
+    props.put("retry.backoff.ms", config.retryBackoffMs.toString)
+    props.put("queue.buffering.max.ms", config.sendTimeout.toString)
+    props.put("queue.buffering.max.messages", config.queueSize.toString)
+    props.put("queue.enqueue.timeout.ms", config.queueEnqueueTimeoutMs.toString)
+    props.put("request.required.acks", config.requestRequiredAcks.toString)
+    props.put("request.timeout.ms", config.requestTimeoutMs.toString)
+    props.put("key.serializer.class", config.keyEncoderClass)
+    props.put("serializer.class", config.valueEncoderClass)
+    props.put("send.buffer.bytes", config.socketBuffer.toString)
+    props.put("topic.metadata.refresh.interval.ms", config.metadataExpiryMs.toString)
+    props.put("client.id", "console-producer")
+
+    props
+  }
+
+  def getNewProducerProps(config: ProducerConfig): Properties = {
+
+    val props = new Properties
+
+    props.putAll(config.extraProducerProps)
+
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList)
+    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec)
+    props.put(ProducerConfig.SEND_BUFFER_CONFIG, config.socketBuffer.toString)
+    props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.retryBackoffMs.toString)
+    props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, config.metadataExpiryMs.toString)
+    props.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, config.metadataFetchTimeoutMs.toString)
+    props.put(ProducerConfig.ACKS_CONFIG, config.requestRequiredAcks.toString)
+    props.put(ProducerConfig.TIMEOUT_CONFIG, config.requestTimeoutMs.toString)
+    props.put(ProducerConfig.RETRIES_CONFIG, config.messageSendMaxRetries.toString)
+    props.put(ProducerConfig.LINGER_MS_CONFIG, config.sendTimeout.toString)
+    if(config.queueEnqueueTimeoutMs != -1)
+      props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false")
+    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.maxMemoryBytes.toString)
+    props.put(ProducerConfig.BATCH_SIZE_CONFIG, config.maxPartitionMemoryBytes.toString)
+    props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer")
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
+
+    props
+  }
+
   class ProducerConfig(args: Array[String]) {
     val parser = new OptionParser
     val topicOpt = parser.accepts("topic", "REQUIRED: The topic id to produce messages to.")
@@ -211,6 +232,10 @@
       .withRequiredArg
       .describedAs("prop")
       .ofType(classOf[String])
+    val producerPropertyOpt = parser.accepts("producer-property", "A mechanism to pass user-defined properties in the form key=value to the producer.")
") + .withRequiredArg + .describedAs("producer_prop") + .ofType(classOf[String]) val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.") val options = parser.parse(args : _*) @@ -243,6 +268,7 @@ object ConsoleProducer { val readerClass = options.valueOf(messageReaderOpt) val socketBuffer = options.valueOf(socketBufferSizeOpt) val cmdLineProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt)) + val extraProducerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(producerPropertyOpt)) /* new producer related configs */ val maxMemoryBytes = options.valueOf(maxMemoryBytesOpt) val maxPartitionMemoryBytes = options.valueOf(maxPartitionMemoryBytesOpt) diff --git a/core/src/test/scala/kafka/tools/ConsoleProducerTest.scala b/core/src/test/scala/kafka/tools/ConsoleProducerTest.scala new file mode 100644 index 0000000000000000000000000000000000000000..6d1f51c866e237ba85bd8f6d2523d01d7d63f321 --- /dev/null +++ b/core/src/test/scala/kafka/tools/ConsoleProducerTest.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package kafka.tools
+
+import kafka.producer
+import kafka.tools.ConsoleProducer.{LineMessageReader, MessageReader, ProducerConfig}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
+import joptsimple.UnrecognizedOptionException
+import org.junit.Assert
+import org.junit.Test
+
+
+class ConsoleProducerTest {
+
+  val validArgs: Array[String] = Array(
+    "--broker-list",
+    "localhost:1001,localhost:1002",
+    "--topic",
+    "t3",
+    "--property",
+    "parse.key=true",
+    "--property",
+    "key.separator=#"
+  )
+
+  val invalidArgs: Array[String] = Array(
+    "--t", // not a valid argument
+    "t3"
+  )
+
+  @Test
+  def testValidConfigsNewProducer() {
+    val config = new ConsoleProducer.ProducerConfig(validArgs)
+    // The new producer's ProducerConfig constructor is package private, so we can't call it directly.
+    // Create a KafkaProducer to validate the properties instead.
+    new KafkaProducer[Array[Byte], Array[Byte]](ConsoleProducer.getNewProducerProps(config))
+  }
+
+  @Test
+  def testValidConfigsOldProducer() {
+    val config = new ConsoleProducer.ProducerConfig(validArgs)
+    new producer.ProducerConfig(ConsoleProducer.getOldProducerProps(config))
+  }
+
+  @Test
+  def testInvalidConfigs() {
+    try {
+      val config = new ConsoleProducer.ProducerConfig(invalidArgs)
+      Assert.fail("Should have thrown an UnrecognizedOptionException")
+    } catch {
+      case e: joptsimple.OptionException => // expected exception
+    }
+  }
+
+  @Test
+  def testParseKeyProp(): Unit = {
+    val config = new ConsoleProducer.ProducerConfig(validArgs)
+    val reader = Class.forName(config.readerClass).newInstance().asInstanceOf[LineMessageReader]
+    reader.init(System.in, ConsoleProducer.getReaderProps(config))
+    assert(reader.keySeparator == "#")
+    assert(reader.parseKey == true)
+  }
+
+}
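
A minimal usage sketch (not part of the patch) of the new --producer-property option introduced above. The broker address, topic name, and property values are made up for illustration. Because getNewProducerProps applies extraProducerProps before the console producer's own settings, keys the tool sets itself (client.id, acks, the serializers, and so on) still win on conflicts, while keys the tool does not set pass through to the producer.

    import kafka.tools.ConsoleProducer

    object ProducerPropertyExample {
      def main(args: Array[String]): Unit = {
        // Build the tool's config the same way the console producer does,
        // passing extra producer-level settings via --producer-property.
        val config = new ConsoleProducer.ProducerConfig(Array(
          "--broker-list", "localhost:9092",               // hypothetical broker
          "--topic", "test",                               // hypothetical topic
          "--new-producer",
          "--producer-property", "acks=1",                 // overridden by the tool's own acks setting
          "--producer-property", "max.request.size=2097152"))

        // extraProducerProps is merged first, then the tool's explicit puts are applied,
        // so max.request.size survives while acks is replaced by config.requestRequiredAcks.
        val props = ConsoleProducer.getNewProducerProps(config)
        println(props.getProperty("max.request.size")) // prints 2097152
      }
    }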