Kafka / KAFKA-709

Default queue.enqueue.timeout.ms to -1

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: None
    • Component/s: producer
    • Labels: None

      Description

      Hey Guys,

      It seems that, by default, producers in 0.8 are async and have a default queue.enqueue.timeout.ms of 0. This means that anyone who reads messages faster than they can produce them will eventually hit this exception:

      Exception in thread "Thread-3" kafka.common.QueueFullException: Event queue is full of unsent messages, could not send event: KeyedMessage(PageViewEventByGroupJson,Missing Page Group,java.nio.HeapByteBuffer[pos=0 lim=125 cap=125])
      at kafka.producer.Producer$$anonfun$asyncSend$1.apply(Producer.scala:111)
      at kafka.producer.Producer$$anonfun$asyncSend$1.apply(Producer.scala:89)
      at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
      at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:32)
      at kafka.producer.Producer.asyncSend(Producer.scala:89)
      at kafka.producer.Producer.send(Producer.scala:77)

      As it says in https://cwiki.apache.org/KAFKA/kafka-mirroring.html, this can result in losing messages, and nasty exceptions in the logs. I think the better default is setting queue.enqueue.timeout.ms to -1, which will just block until the queue frees up.
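
      To make the difference between the two settings concrete, here is a minimal sketch of the enqueue semantics described above. It is not the Kafka producer code: it assumes the async producer's event queue behaves like a bounded java.util.concurrent.BlockingQueue, and the class EnqueueTimeoutSketch and its enqueue helper are hypothetical names used only for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class EnqueueTimeoutSketch {

    /**
     * Hypothetical helper (not a Kafka API) that mimics the three cases of
     * queue.enqueue.timeout.ms on a bounded queue.
     * Returns false when the event could not be queued, which is the
     * situation reported as QueueFullException in the trace above.
     */
    public static <T> boolean enqueue(BlockingQueue<T> queue, T event, long timeoutMs)
            throws InterruptedException {
        if (timeoutMs == 0) {
            // 0: try once and give up immediately if the queue is full.
            return queue.offer(event);
        } else if (timeoutMs < 0) {
            // -1 (any negative value here): block until space frees up.
            queue.put(event);
            return true;
        } else {
            // Positive: wait up to timeoutMs for space, then give up.
            return queue.offer(event, timeoutMs, TimeUnit.MILLISECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Capacity 1 so the queue fills after a single event.
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);

        System.out.println(enqueue(queue, "first", 0));   // true: queue had room
        System.out.println(enqueue(queue, "second", 0));  // false: queue full, event dropped

        // Simulate the background send thread draining the queue a bit later.
        Thread drainer = new Thread(() -> {
            try {
                Thread.sleep(100);
                queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        drainer.start();

        System.out.println(enqueue(queue, "third", -1));  // true: waited for the drainer
        drainer.join();
    }
}
```

      With a timeout of 0 the caller has to handle the rejected event itself; with -1 the calling thread simply slows down to the rate at which the queue drains, which is the behaviour proposed as the default here.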

      1. kafka-709.patch
        0.8 kB
        Neha Narkhede

        Activity

        Joe Stein added a comment -

        Found a test failing related to this.

        I think the best fix is to update the AsyncProducerTest case so that it explicitly sets the timeout to 0 rather than relying on the default. That way the test keeps going through offer instead of put, even now that the default has changed.

        The change is adding the line props.put("queue.enqueue.timeout.ms", "0")

        Going to commit this change to AsyncProducerTest.testProducerQueueSize (line 68) so it passes again.
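
        As an illustration of pinning the old behaviour explicitly, the properties for such a test might be built as in the sketch below. Only the queue.enqueue.timeout.ms line comes from the comment above; the class and method names are hypothetical, producer.type is included on the assumption that the test exercises the async path, and no producer is actually constructed.

```java
import java.util.Properties;

public class AsyncProducerTestProps {

    /**
     * Hypothetical helper: builds producer properties that keep the
     * non-blocking enqueue behaviour regardless of the default value.
     */
    public static Properties nonBlockingAsyncProps() {
        Properties props = new Properties();
        // Assumption: the test targets the async producer path.
        props.put("producer.type", "async");
        // From the comment above: 0 means "do not wait when the queue is full".
        props.put("queue.enqueue.timeout.ms", "0");
        return props;
    }

    public static void main(String[] args) {
        Properties props = nonBlockingAsyncProps();
        System.out.println(props.getProperty("queue.enqueue.timeout.ms")); // prints 0
    }
}
```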

        Neha Narkhede added a comment -

        Thanks for the review

        Jay Kreps added a comment -

        +1

        Neha Narkhede added a comment -

        Minor one-line change to fix the default for queue.enqueue.timeout.ms

        Jay Kreps added a comment -

        Yeah, I agree. Everyone who tries to use it hits this and then basically just thinks Kafka is broken because the error message isn't very clear.


          People

          • Assignee: Neha Narkhede
          • Reporter: Chris Riccomini
          • Votes: 0
          • Watchers: 4

            Dates

            • Created:
              Updated:
              Resolved:
