From 8913ecbfdcd4c95951c4f3055a5abb2b91d68824 Mon Sep 17 00:00:00 2001 From: Balaji Seshadri Date: Thu, 4 Sep 2014 15:37:44 -0600 Subject: [PATCH] Patch for KAFKA-1618 --- core/src/main/scala/kafka/tools/ConsoleProducer.scala | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala index da4dad4..ac0bbdc 100644 --- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala @@ -22,11 +22,10 @@ import kafka.message._ import kafka.serializer._ import kafka.utils.CommandLineUtils import kafka.producer.{NewShinyProducer,OldProducer,KeyedMessage} - import java.util.Properties import java.io._ - import joptsimple._ +import scala.util.matching.Regex object ConsoleProducer { @@ -112,6 +111,7 @@ object ConsoleProducer { .withRequiredArg .describedAs("broker-list") .ofType(classOf[String]) + val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.") val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'gzip' or 'snappy'." + "If specified without value, than it defaults to 'gzip'") @@ -219,7 +219,12 @@ object ConsoleProducer { import scala.collection.JavaConversions._ val useNewProducer = options.has(useNewProducerOpt) val topic = options.valueOf(topicOpt) - val brokerList = options.valueOf(brokerListOpt) + val regex = new Regex(":[0-9].*") + val brokerList = if(regex.findAllMatchIn(options.valueOf(brokerListOpt)).size==0) + options.valueOf(brokerListOpt)+":9091" + else + options.valueOf(brokerListOpt) + val sync = options.has(syncOpt) val compressionCodecOptionValue = options.valueOf(compressionCodecOpt) val compressionCodec = if (options.has(compressionCodecOpt)) -- 1.8.5.2 (Apple Git-48)