Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-807

LineMessageReader doesn't correctly parse the key separator



    • Bug
    • Status: Resolved
    • Trivial
    • Resolution: Fixed
    • 0.8.0
    • 0.8.0
    • tools


      Typo in key name prevents extracting the key separator. The patch follows; what's the recommended way to submit patches?

      Index: core/src/main/scala/kafka/producer/ConsoleProducer.scala
      IDEA additional info:
      Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
      Subsystem: com.intellij.openapi.diff.impl.patch.BaseRevisionTextPatchEP
      <+>/*\n * Licensed to the Apache Software Foundation (ASF) under one or more\n * contributor license agreements. See the NOTICE file distributed with\n * this work for additional information regarding copyright ownership.\n * The ASF licenses this file to You under the Apache License, Version 2.0\n * (the \"License\"); you may not use this file except in compliance with\n * the License. You may obtain a copy of the License at\n * \n * http://www.apache.org/licenses/LICENSE-2.0\n *\n * Unless required by applicable law or agreed to in writing, software\n * distributed under the License is distributed on an \"AS IS\" BASIS,\n * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n * See the License for the specific language governing permissions and\n * limitations under the License.\n */\n\npackage kafka.producer\n\nimport scala.collection.JavaConversions.\nimport joptsimple.\nimport java.util.Properties\nimport java.io.\nimport kafka.common.\nimport kafka.message.\nimport kafka.serializer.\n\nobject ConsoleProducer { \n\n def main(args: Array[String]) { \n val parser = new OptionParser\n val topicOpt = parser.accepts(\"topic\", \"REQUIRED: The topic id to produce messages to.\")\n .withRequiredArg\n .describedAs(\"topic\")\n .ofType(classOf[String])\n val brokerListOpt = parser.accepts(\"broker-list\", \"REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2.\")\n .withRequiredArg\n .describedAs(\"broker-list\")\n .ofType(classOf[String])\n val syncOpt = parser.accepts(\"sync\", \"If set message send requests to the brokers are synchronously, one at a time as they arrive.\")\n val compressOpt = parser.accepts(\"compress\", \"If set, messages batches are sent compressed\")\n val batchSizeOpt = parser.accepts(\"batch-size\", \"Number of messages to send in a single batch if they are not being sent synchronously.\")\n .withRequiredArg\n .describedAs(\"size\")\n .ofType(classOf[java.lang.Integer])\n .defaultsTo(200)\n val 
sendTimeoutOpt = parser.accepts(\"timeout\", \"If set and the producer is running in asynchronous mode, this gives the maximum amount of time\" + \n \" a message will queue awaiting suffient batch size. The value is given in ms.\")\n .withRequiredArg\n .describedAs(\"timeout_ms\")\n .ofType(classOf[java.lang.Long])\n .defaultsTo(1000)\n val queueSizeOpt = parser.accepts(\"queue-size\", \"If set and the producer is running in asynchronous mode, this gives the maximum amount of \" + \n \" messages will queue awaiting suffient batch size.\")\n .withRequiredArg\n .describedAs(\"queue_size\")\n .ofType(classOf[java.lang.Long])\n .defaultsTo(10000)\n val queueEnqueueTimeoutMsOpt = parser.accepts(\"queue-enqueuetimeout-ms\", \"Timeout for event enqueue\")\n .withRequiredArg\n .describedAs(\"queue enqueuetimeout ms\")\n .ofType(classOf[java.lang.Long])\n .defaultsTo(0)\n val requestRequiredAcksOpt = parser.accepts(\"request-required-acks\", \"The required acks of the producer requests\")\n .withRequiredArg\n .describedAs(\"request required acks\")\n .ofType(classOf[java.lang.Integer])\n .defaultsTo(0)\n val requestTimeoutMsOpt = parser.accepts(\"request-timeout-ms\", \"The ack timeout of the producer requests. 
Value must be non-negative and non-zero\")\n .withRequiredArg\n .describedAs(\"request timeout ms\")\n .ofType(classOf[java.lang.Integer])\n .defaultsTo(1500)\n val valueEncoderOpt = parser.accepts(\"value-serializer\", \"The class name of the message encoder implementation to use for serializing values.\")\n .withRequiredArg\n .describedAs(\"encoder_class\")\n .ofType(classOf[java.lang.String])\n .defaultsTo(classOf[StringEncoder].getName)\n val keyEncoderOpt = parser.accepts(\"key-serializer\", \"The class name of the message encoder implementation to use for serializing keys.\")\n .withRequiredArg\n .describedAs(\"encoder_class\")\n .ofType(classOf[java.lang.String])\n .defaultsTo(classOf[StringEncoder].getName)\n val messageReaderOpt = parser.accepts(\"line-reader\", \"The class name of the class to use for reading lines from standard in. \" + \n \"By default each line is read as a separate message.\")\n .withRequiredArg\n .describedAs(\"reader_class\")\n .ofType(classOf[java.lang.String])\n .defaultsTo(classOf[LineMessageReader].getName)\n val socketBufferSizeOpt = parser.accepts(\"socket-buffer-size\", \"The size of the tcp RECV size.\")\n .withRequiredArg\n .describedAs(\"size\")\n .ofType(classOf[java.lang.Integer])\n .defaultsTo(1024*100)\n val propertyOpt = parser.accepts(\"property\", \"A mechanism to pass user-defined properties in the form key=value to the message reader. \" +\n \"This allows custom configuration for a user-defined message reader.\")\n .withRequiredArg\n .describedAs(\"prop\")\n .ofType(classOf[String])\n\n\n val options = parser.parse(args : _)\n for(arg <- List(topicOpt, brokerListOpt)) {\n if(!options.has(arg))

      {\n System.err.println(\"Missing required argument \\\"\" + arg + \"\\\"\")\n parser.printHelpOn(System.err)\n System.exit(1)\n }

      \n }\n\n val topic = options.valueOf(topicOpt)\n val brokerList = options.valueOf(brokerListOpt)\n val sync = options.has(syncOpt)\n val compress = options.has(compressOpt)\n val batchSize = options.valueOf(batchSizeOpt)\n val sendTimeout = options.valueOf(sendTimeoutOpt)\n val queueSize = options.valueOf(queueSizeOpt)\n val queueEnqueueTimeoutMs = options.valueOf(queueEnqueueTimeoutMsOpt)\n val requestRequiredAcks = options.valueOf(requestRequiredAcksOpt)\n val requestTimeoutMs = options.valueOf(requestTimeoutMsOpt)\n val keyEncoderClass = options.valueOf(keyEncoderOpt)\n val valueEncoderClass = options.valueOf(valueEncoderOpt)\n val readerClass = options.valueOf(messageReaderOpt)\n val socketBuffer = options.valueOf(socketBufferSizeOpt)\n val cmdLineProps = parseLineReaderArgs(options.valuesOf(propertyOpt))\n cmdLineProps.put(\"topic\", topic)\n\n val props = new Properties()\n props.put(\"broker.list\", brokerList)\n val codec = if(compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec\n props.put(\"compression.codec\", codec.toString)\n props.put(\"producer.type\", if(sync) \"sync\" else \"async\")\n if(options.has(batchSizeOpt))\n props.put(\"batch.num.messages\", batchSize.toString)\n props.put(\"queue.buffering.max.ms\", sendTimeout.toString)\n props.put(\"queue.buffering.max.messages\", queueSize.toString)\n props.put(\"queue.enqueue.timeout.ms\", queueEnqueueTimeoutMs.toString)\n props.put(\"request.required.acks\", requestRequiredAcks.toString)\n props.put(\"request.timeout.ms\", requestTimeoutMs.toString)\n props.put(\"key.serializer.class\", keyEncoderClass)\n props.put(\"serializer.class\", valueEncoderClass)\n props.put(\"send.buffer.bytes\", socketBuffer.toString)\n val reader = Class.forName(readerClass).newInstance().asInstanceOf[MessageReader[AnyRef, AnyRef]]\n reader.init(System.in, cmdLineProps)\n\n try {\n val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))\n\n Runtime.getRuntime.addShutdownHook(new 
Thread() {\n override def run()

      {\n producer.close()\n }

      \n })\n\n var message: KeyedMessage[AnyRef, AnyRef] = null\n do

      {\n message = reader.readMessage()\n if(message != null)\n producer.send(message)\n }

      while(message != null)\n } catch

      {\n case e: Exception =>\n e.printStackTrace\n System.exit(1)\n }

      \n System.exit(0)\n }\n\n def parseLineReaderArgs(args: Iterable[String]): Properties = {\n val splits = args.map(_ split \"=\").filterNot(_ == null).filterNot(.length == 0)\n if(!splits.forall(.length == 2))

      {\n System.err.println(\"Invalid line reader properties: \" + args.mkString(\" \"))\n System.exit(1)\n }

      \n val props = new Properties\n for(a <- splits)\n props.put(a(0), a(1))\n props\n }\n\n trait MessageReader[K,V] { \n def init(inputStream: InputStream, props: Properties) {}\n def readMessage(): KeyedMessage[K,V]\n def close() {}\n }\n\n class LineMessageReader extends MessageReader[String, String] {\n var topic: String = null\n var reader: BufferedReader = null\n var parseKey = false\n var keySeparator = \"\\t\"\n var ignoreError = false\n var lineNumber = 0\n\n override def init(inputStream: InputStream, props: Properties)

      {\n topic = props.getProperty(\"topic\")\n if(props.containsKey(\"parse.key\"))\n parseKey = props.getProperty(\"parse.key\").trim.toLowerCase.equals(\"true\")\n if(props.containsKey(\"key.seperator\"))\n keySeparator = props.getProperty(\"key.separator\")\n if(props.containsKey(\"ignore.error\"))\n ignoreError = props.getProperty(\"ignore.error\").trim.toLowerCase.equals(\"true\")\n reader = new BufferedReader(new InputStreamReader(inputStream))\n }

      \n\n override def readMessage() = {\n lineNumber += 1\n (reader.readLine(), parseKey) match {\n case (null, _) => null\n case (line, true) =>\n line.indexOf(keySeparator) match

      {\n case -1 =>\n if(ignoreError)\n new KeyedMessage(topic, line)\n else\n throw new KafkaException(\"No key found on line \" + lineNumber + \": \" + line)\n case n =>\n new KeyedMessage(topic,\n line.substring(0, n), \n if(n + keySeparator.size > line.size) \"\" else line.substring(n + keySeparator.size))\n }

      \n case (line, false) =>\n new KeyedMessage(topic, line)\n }\n }\n }\n}\n
      --- core/src/main/scala/kafka/producer/ConsoleProducer.scala (revision 290d5e0eac38e9917c64353a131154821b899f26)
      +++ core/src/main/scala/kafka/producer/ConsoleProducer.scala (revision )
      @@ -196,7 +196,7 @@
      topic = props.getProperty("topic")
      parseKey = props.getProperty("parse.key").trim.toLowerCase.equals("true")

      - if(props.containsKey("key.seperator"))
      + if(props.containsKey("key.separator"))
        keySeparator = props.getProperty("key.separator")
        ignoreError = props.getProperty("ignore.error").trim.toLowerCase.equals("true")




            dragosm Dragos Manolescu
            dragosm Dragos Manolescu
            0 Vote for this issue
            2 Start watching this issue