diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
index a6e3573..c7a1271 100644
--- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
@@ -89,7 +89,7 @@ object ConsoleConsumer extends Logging {
           .withRequiredArg
           .describedAs("class")
           .ofType(classOf[String])
-          .defaultsTo(classOf[NewlineMessageFormatter].getName)
+          .defaultsTo(classOf[DefaultMessageFormatter].getName)
   val messageFormatterArgOpt = parser.accepts("property")
           .withRequiredArg
           .describedAs("prop")
@@ -256,10 +256,27 @@ trait MessageFormatter {
   def close() {}
 }
 
-class NewlineMessageFormatter extends MessageFormatter {
+class DefaultMessageFormatter extends MessageFormatter {
+  var printKey = false
+  var keySeparator = "\t".getBytes
+  var lineSeparator = "\n".getBytes
+
+  override def init(props: Properties) {
+    if(props.containsKey("print.key"))
+      printKey = props.getProperty("print.key").trim.toLowerCase.equals("true")
+    if(props.containsKey("key.separator"))
+      keySeparator = props.getProperty("key.separator").getBytes
+    if(props.containsKey("line.separator"))
+      lineSeparator = props.getProperty("line.separator").getBytes
+  }
+
   def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
+    if(printKey) {
+      output.write(key)
+      output.write(keySeparator)
+    }
     output.write(value)
-    output.write('\n')
+    output.write(lineSeparator)
   }
 }
 
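Usage sketch (illustrative, not part of the patch): the new DefaultMessageFormatter can be exercised directly to see how print.key, key.separator and line.separator are honored. The property values and the object wrapper below are assumptions for the example; the class, its methods and the property names come from the hunk above.

    // Sketch only: drives the patched formatter outside the console tool.
    import java.util.Properties
    import kafka.consumer.DefaultMessageFormatter

    object DefaultMessageFormatterSketch {
      def main(args: Array[String]) {
        val props = new Properties()
        props.put("print.key", "true")
        props.put("key.separator", ":")      // illustrative; the default is "\t"
        val formatter = new DefaultMessageFormatter
        formatter.init(props)
        // writes "k1:v1" followed by the configured line separator to stdout
        formatter.writeTo("k1".getBytes, "v1".getBytes, System.out)
        formatter.close()
      }
    }
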
" + - "By default each line is read as a seperate message.") + "By default each line is read as a separate message.") .withRequiredArg .describedAs("reader_class") .ofType(classOf[java.lang.String]) @@ -82,9 +89,11 @@ object ConsoleProducer { val compress = options.has(compressOpt) val batchSize = options.valueOf(batchSizeOpt) val sendTimeout = options.valueOf(sendTimeoutOpt) - val encoderClass = options.valueOf(messageEncoderOpt) + val keyEncoderClass = options.valueOf(keyEncoderOpt) + val valueEncoderClass = options.valueOf(valueEncoderOpt) val readerClass = options.valueOf(messageReaderOpt) val cmdLineProps = parseLineReaderArgs(options.valuesOf(propertyOpt)) + cmdLineProps.put("topic", topic) val props = new Properties() props.put("broker.list", brokerList) @@ -94,12 +103,13 @@ object ConsoleProducer { if(options.has(batchSizeOpt)) props.put("batch.size", batchSize.toString) props.put("queue.time", sendTimeout.toString) - props.put("serializer.class", encoderClass) + props.put("key.serializer.class", keyEncoderClass) + props.put("serializer.class", valueEncoderClass) - val reader = Class.forName(readerClass).newInstance().asInstanceOf[MessageReader] + val reader = Class.forName(readerClass).newInstance().asInstanceOf[MessageReader[AnyRef, AnyRef]] reader.init(System.in, cmdLineProps) - val producer = new Producer[Any, Any](new ProducerConfig(props)) + val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props)) Runtime.getRuntime.addShutdownHook(new Thread() { override def run() { @@ -107,11 +117,11 @@ object ConsoleProducer { } }) - var message: AnyRef = null + var message: KeyedMessage[AnyRef, AnyRef] = null do { message = reader.readMessage() if(message != null) - producer.send(new KeyedMessage(topic, message)) + producer.send(message) } while(message != null) } @@ -127,19 +137,49 @@ object ConsoleProducer { props } - trait MessageReader { + trait MessageReader[K,V] { def init(inputStream: InputStream, props: Properties) {} - def readMessage(): AnyRef + def readMessage(): KeyedMessage[K,V] def close() {} } - class LineMessageReader extends MessageReader { + class LineMessageReader extends MessageReader[String, String] { + var topic: String = null var reader: BufferedReader = null - - override def init(inputStream: InputStream, props: Properties) { + var parseKey = false + var keySeparator = "\t" + var ignoreError = false + var lineNumber = 0 + + override def init(inputStream: InputStream, props: Properties) { + topic = props.getProperty("topic") + if(props.containsKey("parse.key")) + parseKey = props.getProperty("parse.key").trim.toLowerCase.equals("true") + if(props.containsKey("key.seperator")) + keySeparator = props.getProperty("key.separator") + if(props.containsKey("ignore.error")) + ignoreError = props.getProperty("ignore.error").trim.toLowerCase.equals("true") reader = new BufferedReader(new InputStreamReader(inputStream)) } - override def readMessage() = reader.readLine() + override def readMessage() = { + lineNumber += 1 + val line = reader.readLine() + if(parseKey) { + line.indexOf(keySeparator) match { + case -1 => + if(ignoreError) + new KeyedMessage(topic, line) + else + throw new KafkaException("No key found on line " + lineNumber + ": " + line) + case n => + new KeyedMessage(topic, + line.substring(0, n), + if(n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)) + } + } else { + new KeyedMessage(topic, line) + } + } } } diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala 
diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
index 46ea7d4..c900c45 100644
--- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
+++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
@@ -19,7 +19,7 @@ package kafka.producer.async
 
 import kafka.utils.{SystemTime, Logging}
 import java.util.concurrent.{TimeUnit, CountDownLatch, BlockingQueue}
-import collection.mutable.ListBuffer
+import collection.mutable.ArrayBuffer
 import kafka.producer.KeyedMessage
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
@@ -57,7 +57,7 @@ class ProducerSendThread[K,V](val threadName: String,
 
   private def processEvents() {
     var lastSend = SystemTime.milliseconds
-    var events = new ListBuffer[KeyedMessage[K,V]]
+    var events = new ArrayBuffer[KeyedMessage[K,V]]
     var full: Boolean = false
 
     // drain the queue until you get a shutdown command
@@ -85,7 +85,7 @@ class ProducerSendThread[K,V](val threadName: String,
           // if either queue time has reached or batch size has reached, dispatch to event handler
           tryToHandle(events)
           lastSend = SystemTime.milliseconds
-          events = new ListBuffer[KeyedMessage[K,V]]
+          events = new ArrayBuffer[KeyedMessage[K,V]]
         }
       }
     // send the last batch of events
diff --git a/core/src/main/scala/kafka/serializer/Decoder.scala b/core/src/main/scala/kafka/serializer/Decoder.scala
index 9da8370..6fc3b1d 100644
--- a/core/src/main/scala/kafka/serializer/Decoder.scala
+++ b/core/src/main/scala/kafka/serializer/Decoder.scala
@@ -37,13 +37,6 @@ class DefaultDecoder(props: VerifiableProperties = null) extends Decoder[Array[B
 }
 
 /**
- * Decode messages without any key
- */
-class KeylessMessageDecoder(props: VerifiableProperties = null) extends Decoder[Message] {
-  def fromBytes(bytes: Array[Byte]) = new Message(bytes)
-}
-
-/**
  * The string encoder translates strings into bytes. It uses UTF8 by default but takes
  * an optional property serializer.encoding to control this.
  */
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index dac7056..e34a432 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -74,7 +74,7 @@ object SimpleConsumerShell extends Logging {
           .withRequiredArg
           .describedAs("class")
           .ofType(classOf[String])
-          .defaultsTo(classOf[NewlineMessageFormatter].getName)
+          .defaultsTo(classOf[DefaultMessageFormatter].getName)
   val messageFormatterArgOpt = parser.accepts("property")
           .withRequiredArg
           .describedAs("prop")
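End-to-end sketch (illustrative, not part of the patch): with both key.serializer.class and serializer.class set, as the patched ConsoleProducer now does from --key-serializer and --value-serializer, a keyed send through the regular producer API looks like the following. The broker address, topic, key and value are placeholders; the property names and StringEncoder come from the patch.

    // Sketch only: a keyed send using the serializer settings the console producer builds.
    import java.util.Properties
    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
    import kafka.serializer.StringEncoder

    object KeyedSendSketch {
      def main(args: Array[String]) {
        val props = new Properties()
        props.put("broker.list", "localhost:9092")                    // placeholder broker
        props.put("key.serializer.class", classOf[StringEncoder].getName)
        props.put("serializer.class", classOf[StringEncoder].getName)
        val producer = new Producer[String, String](new ProducerConfig(props))
        producer.send(new KeyedMessage("test", "user42", "hello"))    // placeholder topic, key, value
        producer.close()
      }
    }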