Kafka
  1. Kafka
  2. KAFKA-279

kafka-console-producer does not take in customized values of --batch-size or --timeout

    Details

    • Type: Bug Bug
    • Status: Resolved
    • Priority: Minor Minor
    • Resolution: Fixed
    • Affects Version/s: 0.7
    • Fix Version/s: 0.7.1
    • Component/s: contrib
    • Labels:
      None
    • Environment:
      Ubuntu 10.04, openjdk1.6 with default installation of 0.7

      Description

      1. While the default console-producer, console-consumer paradigm works great, when I try modiying the batch size

      bin/kafka-console-producer.sh --batch-size 300 --zookeeper localhost:2181 --topic test1

      it gives me a

      Exception in thread "main" java.lang.NumberFormatException: null
      at java.lang.Integer.parseInt(Integer.java:443)
      at java.lang.Integer.parseInt(Integer.java:514)
      at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:207)
      at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
      at kafka.utils.Utils$.getIntInRange(Utils.scala:189)
      at kafka.utils.Utils$.getInt(Utils.scala:174)
      at kafka.producer.async.AsyncProducerConfigShared$class.$init$(AsyncProducerConfig.scala:45)
      at kafka.producer.ProducerConfig.<init>(ProducerConfig.scala:25)
      at kafka.producer.ConsoleProducer$.main(ConsoleProducer.scala:108)
      at kafka.producer.ConsoleProducer.main(ConsoleProducer.scala)

      I have looked at the code and can't figure out what's wrong

      2. When I do bin/kafka-console-producer.sh --timeout 30000 --zookeeper localhost:2181 --topic test1

      I would think that console-producer would wait for 30s if the batch size (default 200) is not full. It doesn't. It takes the same time without the timeout parameter (default 1000) and dumps whatever the batch size.

      Resolution from Jun

      1. The code does the following to set batch size
      props.put("batch.size", batchSize)
      Instead, it should do
      props.put("batch.size", batchSize.toString)

      2. It sets the wrong property name for timeout. Instead of doing
      props.put("queue.enqueueTimeout.ms", sendTimeout.toString)
      it should do
      props.put("queue.time", sendTimeout.toString)

        Activity

        Hide
        Jun Rao added a comment -

        Edward,

        Thanks for the review. Just committed the patch to trunk.

        Show
        Jun Rao added a comment - Edward, Thanks for the review. Just committed the patch to trunk.
        Hide
        Edward Smith added a comment -

        I tested this and verified that this fixes the error for --batch-size.

        Show
        Edward Smith added a comment - I tested this and verified that this fixes the error for --batch-size.
        Hide
        Neha Narkhede added a comment -

        Milind, does the attached patch fix your problem ?

        Show
        Neha Narkhede added a comment - Milind, does the attached patch fix your problem ?
        Hide
        Jun Rao added a comment -

        That typically means that you are sending data at a rate faster than the broker can persist. Try increasing flush.interval to sth like 10000 on the broker to increase server throughput.

        Show
        Jun Rao added a comment - That typically means that you are sending data at a rate faster than the broker can persist. Try increasing flush.interval to sth like 10000 on the broker to increase server throughput.
        Hide
        Steve Arch added a comment -

        I tried the patch and issuing the same command get the following stack trace part-way through an import of a 100000 line file (--batchsize 300):
        [2012-02-27 16:44:37,911] ERROR Event queue is full of unsent messages, could not send event: 7300043|103|60|1329080400|en|1987973|269118099490000000000000103153898086 (kafka.producer.async.AsyncProducer)
        Exception in thread "main" kafka.producer.async.QueueFullException: Event queue is full of unsent messages, could not send event: 7300043|103|60|1329080400|en|1987973|269118099490000000000000103153898086
        at kafka.producer.async.AsyncProducer.send(AsyncProducer.scala:121)
        at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1$$anonfun$apply$2.apply(ProducerPool.scala:131)
        at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1$$anonfun$apply$2.apply(ProducerPool.scala:131)
        at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
        at scala.collection.immutable.List.foreach(List.scala:45)
        at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1.apply(ProducerPool.scala:131)
        at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1.apply(ProducerPool.scala:130)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
        at kafka.producer.ProducerPool$$anonfun$send$1.apply$mcVI$sp(ProducerPool.scala:130)
        at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
        at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
        at kafka.producer.ProducerPool.send(ProducerPool.scala:102)
        at kafka.producer.Producer.zkSend(Producer.scala:143)
        at kafka.producer.Producer.send(Producer.scala:105)
        at kafka.producer.ConsoleProducer$.main(ConsoleProducer.scala:120)
        at kafka.producer.ConsoleProducer.main(ConsoleProducer.scala)

        Show
        Steve Arch added a comment - I tried the patch and issuing the same command get the following stack trace part-way through an import of a 100000 line file (--batchsize 300): [2012-02-27 16:44:37,911] ERROR Event queue is full of unsent messages, could not send event: 7300043|103|60|1329080400|en|1987973|269118099490000000000000103153898086 (kafka.producer.async.AsyncProducer) Exception in thread "main" kafka.producer.async.QueueFullException: Event queue is full of unsent messages, could not send event: 7300043|103|60|1329080400|en|1987973|269118099490000000000000103153898086 at kafka.producer.async.AsyncProducer.send(AsyncProducer.scala:121) at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1$$anonfun$apply$2.apply(ProducerPool.scala:131) at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1$$anonfun$apply$2.apply(ProducerPool.scala:131) at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61) at scala.collection.immutable.List.foreach(List.scala:45) at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1.apply(ProducerPool.scala:131) at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1.apply(ProducerPool.scala:130) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43) at kafka.producer.ProducerPool$$anonfun$send$1.apply$mcVI$sp(ProducerPool.scala:130) at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102) at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43) at kafka.producer.ProducerPool.send(ProducerPool.scala:102) at kafka.producer.Producer.zkSend(Producer.scala:143) at kafka.producer.Producer.send(Producer.scala:105) at kafka.producer.ConsoleProducer$.main(ConsoleProducer.scala:120) at kafka.producer.ConsoleProducer.main(ConsoleProducer.scala)
        Hide
        Jun Rao added a comment -

        Milind,

        Attached is a patch to trunk. Could you try it out and see if it fixes your problem?

        Show
        Jun Rao added a comment - Milind, Attached is a patch to trunk. Could you try it out and see if it fixes your problem?

          People

          • Assignee:
            Jun Rao
            Reporter:
            milind parikh
          • Votes:
            0 Vote for this issue
            Watchers:
            0 Start watching this issue

            Dates

            • Created:
              Updated:
              Resolved:

              Development