diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
index 27b0ec8..b19ab49 100644
--- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
@@ -32,7 +32,10 @@ object ConsoleProducer {
     val config = new ProducerConfig(args)
     val reader = Class.forName(config.readerClass).newInstance().asInstanceOf[MessageReader]
-    reader.init(System.in, config.cmdLineProps)
+    val props = new Properties
+    props.put("topic", config.topic)
+    props.putAll(config.cmdLineProps)
+    reader.init(System.in, props)
 
     try {
         val producer =
@@ -201,7 +204,6 @@ object ConsoleProducer {
     val readerClass = options.valueOf(messageReaderOpt)
     val socketBuffer = options.valueOf(socketBufferSizeOpt)
     val cmdLineProps = CommandLineUtils.parseCommandLineArgs(options.valuesOf(propertyOpt))
-    cmdLineProps.put("topic", topic)
     /* new producer related configs */
     val maxMemoryBytes = options.valueOf(maxMemoryBytesOpt)
     val maxPartitionMemoryBytes = options.valueOf(maxPartitionMemoryBytesOpt)
@@ -262,22 +264,24 @@ object ConsoleProducer {
   }
 
   class NewShinyProducer(producerConfig: ProducerConfig) extends Producer {
+    import org.apache.kafka.clients.producer.ProducerConfig
     val props = new Properties()
-    props.put("metadata.broker.list", producerConfig.brokerList)
-    props.put("compression.type", producerConfig.compressionCodec)
-    props.put("send.buffer.bytes", producerConfig.socketBuffer.toString)
-    props.put("metadata.fetch.backoff.ms", producerConfig.retryBackoffMs.toString)
-    props.put("metadata.expiry.ms", producerConfig.metadataExpiryMs.toString)
-    props.put("metadata.fetch.timeout.ms", producerConfig.metadataFetchTimeoutMs.toString)
-    props.put("request.required.acks", producerConfig.requestRequiredAcks.toString)
-    props.put("request.timeout.ms", producerConfig.requestTimeoutMs.toString)
-    props.put("request.retries", producerConfig.messageSendMaxRetries.toString)
-    props.put("linger.ms", producerConfig.sendTimeout.toString)
+    props.putAll(producerConfig.cmdLineProps)
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerConfig.brokerList)
+    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, producerConfig.compressionCodec)
+    props.put(ProducerConfig.SEND_BUFFER_CONFIG, producerConfig.socketBuffer.toString)
+    props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, producerConfig.retryBackoffMs.toString)
+    props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, producerConfig.metadataExpiryMs.toString)
+    props.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, producerConfig.metadataFetchTimeoutMs.toString)
+    props.put(ProducerConfig.ACKS_CONFIG, producerConfig.requestRequiredAcks.toString)
+    props.put(ProducerConfig.TIMEOUT_CONFIG, producerConfig.requestTimeoutMs.toString)
+    props.put(ProducerConfig.RETRIES_CONFIG, producerConfig.messageSendMaxRetries.toString)
+    props.put(ProducerConfig.LINGER_MS_CONFIG, producerConfig.sendTimeout.toString)
     if(producerConfig.queueEnqueueTimeoutMs != -1)
-      props.put("block.on.buffer.full", "false")
-    props.put("total.memory.bytes", producerConfig.maxMemoryBytes.toString)
-    props.put("max.partition.bytes", producerConfig.maxPartitionMemoryBytes.toString)
-    props.put("client.id", "console-producer")
+      props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false")
+    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, producerConfig.maxMemoryBytes.toString)
+    props.put(ProducerConfig.BATCH_SIZE_CONFIG, producerConfig.maxPartitionMemoryBytes.toString)
+    props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer")
     val producer = new KafkaProducer(props)
 
     def send(topic: String, key: Array[Byte], bytes: Array[Byte]) {
@@ -294,6 +298,7 @@ object ConsoleProducer {
 
   class OldProducer(producerConfig: ConsoleProducer.ProducerConfig) extends Producer {
     val props = new Properties()
+    props.putAll(producerConfig.cmdLineProps)
     props.put("metadata.broker.list", producerConfig.brokerList)
     props.put("compression.codec", producerConfig.compressionCodec)
     props.put("producer.type", if(producerConfig.sync) "sync" else "async")
diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
index f2246f9..eb71e49 100644
--- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
+++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
@@ -23,7 +23,7 @@ import java.util.Properties
 import kafka.consumer._
 import kafka.utils.{CommandLineUtils, Logging, ZkUtils}
 import kafka.api.OffsetRequest
-import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
+import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer, ProducerConfig}
 
 object ReplayLogProducer extends Logging {
 
@@ -122,7 +122,7 @@ object ReplayLogProducer extends Logging {
     val isSync = options.has(syncOpt)
     import scala.collection.JavaConversions._
     val producerProps = CommandLineUtils.parseCommandLineArgs(options.valuesOf(propertyOpt))
-    producerProps.put("metadata.broker.list", brokerList)
+    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
   }
 
   class ZKConsumerThread(config: Config, stream: KafkaStream[Array[Byte], Array[Byte]]) extends Thread with Logging {
diff --git a/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala b/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
index ea856c7..37a9ec2 100644
--- a/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
+++ b/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
@@ -19,7 +19,7 @@ package kafka.tools
 
 import java.util.Properties
 import kafka.consumer._
-import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
+import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord, KafkaProducer}
 
 object TestEndToEndLatency {
   def main(args: Array[String]) {
@@ -46,9 +46,9 @@ object TestEndToEndLatency {
     val iter = stream.iterator
 
     val producerProps = new Properties()
-    producerProps.put("metadata.broker.list", brokerList)
-    producerProps.put("linger.ms", "0")
-    producerProps.put("block.on.buffer.full", "true")
+    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0")
+    producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true")
     val producer = new KafkaProducer(producerProps)
 
     val message = "hello there beautiful".getBytes
diff --git a/core/src/main/scala/kafka/tools/TestLogCleaning.scala b/core/src/main/scala/kafka/tools/TestLogCleaning.scala
index edb6e5f..595dc7c 100644
--- a/core/src/main/scala/kafka/tools/TestLogCleaning.scala
+++ b/core/src/main/scala/kafka/tools/TestLogCleaning.scala
@@ -26,7 +26,7 @@ import kafka.serializer._
 import kafka.utils._
 import kafka.log.FileMessageSet
 import kafka.log.Log
-import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
+import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer, ProducerConfig}
 
 /**
  * This is a torture test that runs against an existing broker. Here is how it works:
@@ -240,8 +240,8 @@ object TestLogCleaning {
                          dups: Int,
                          percentDeletes: Int): File = {
     val producerProps = new Properties
-    producerProps.setProperty("block.on.buffer.full", "true")
-    producerProps.setProperty("metadata.broker.list", brokerUrl)
+    producerProps.setProperty(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true")
+    producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
     val producer = new KafkaProducer(producerProps)
     val rand = new Random(1)
     val keyCount = (messages / dups).toInt