KAFKA-129

ZK-based producer can throw an unexpected exception when sending a message

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.7
    • Fix Version/s: 0.7
    • Component/s: None
    • Labels: None

      Description

      Here is a log trace when that happens.

      2011/08/26 11:25:20.104 FATAL [EmbeddedConsumer] [kafka-embedded-consumer-firehoseActivity-0] [kafka] java.util.NoSuchElementException: None.get
      at scala.None$.get(Option.scala:185)
      at scala.None$.get(Option.scala:183)
      at kafka.producer.Producer$$anonfun$3.apply(Producer.scala:115)
      at kafka.producer.Producer$$anonfun$3.apply(Producer.scala:101)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
      at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
      at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:32)
      at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
      at scala.collection.mutable.WrappedArray.map(WrappedArray.scala:32)
      at kafka.producer.Producer.send(Producer.scala:101)
      at kafka.server.EmbeddedConsumer$$anonfun$startNewConsumerThreads$1$$anonfun$apply$1$$anon$1$$anonfun$run$1.apply(KafkaServerStartable.scala:136)
      at kafka.server.EmbeddedConsumer$$anonfun$startNewConsumerThreads$1$$anonfun$apply$1$$anon$1$$anonfun$run$1.apply(KafkaServerStartable.scala:134)
      at scala.collection.Iterator$class.foreach(Iterator.scala:631)
      at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
      at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
      at kafka.consumer.KafkaMessageStream.foreach(KafkaMessageStream.scala:29)
      at kafka.server.EmbeddedConsumer$$anonfun$startNewConsumerThreads$1$$anonfun$apply$1$$anon$1.run(KafkaServerStartable.scala:134)
      at java.lang.Thread.run(Thread.java:619)
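      For context, the failure in the trace comes from the Scala Option API: calling .get on a
      None throws exactly this NoSuchElementException. A minimal, hypothetical illustration of
      the failure mode (not the actual Producer.scala code):

      // Illustration only: a stand-in for the producer's ZK cache of broker partitions.
      object NoneGetDemo extends App {
        val cache: Map[Int, String] = Map(0 -> "broker-0:9092")

        // A lookup for a partition that is momentarily missing returns None ...
        val missing: Option[String] = cache.get(7)

        // ... and calling .get on that None throws
        // java.util.NoSuchElementException: None.get, as in the trace above.
        // missing.get

        // Safer pattern: handle the None case explicitly.
        missing match {
          case Some(broker) => println("send to " + broker)
          case None         => println("broker partition missing: refresh the ZK cache and retry")
        }
      }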

      Attachments

      1. KAFKA-129.patch (32 kB, Neha Narkhede)
      2. KAFKA-129.patch (32 kB, Neha Narkhede)
      3. KAFKA-129.patch (33 kB, Neha Narkhede)

        Activity

        Neha Narkhede added a comment -

        The producer using the zookeeper software load balancer maintains a ZK cache that gets updated by the zookeeper watcher listeners. During some events, like a broker bounce, the producer's ZK cache can get into an inconsistent state for a short period. During that window, it can end up picking a broker partition that is unavailable to complete the send operation.

        When this happens, the ZK cache needs to be refreshed and the process of picking a broker partition for the current event repeated. This is retried for a configurable number of attempts, defaulting to 3. Arguably, if it takes more than 1-2 retries to get consistent data from zookeeper, something is really wrong, and we should throw an exception back to the user and fail the send operation.

        This patch also adds a check around the debug and trace level logging in the producer.

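        A minimal sketch of the retry-and-refresh behaviour described above; the names
        (refreshZkCache, pickBrokerPartition, numRetries) are illustrative stand-ins and
        not taken from the actual patch:

        import scala.annotation.tailrec

        // Sketch only: stand-in for the producer-side retry loop, not the real Producer code.
        class ZkSendRetrier(numRetries: Int = 3) {        // configurable, defaulting to 3
          def refreshZkCache(): Unit = ()                 // placeholder: re-read broker info from ZK
          def pickBrokerPartition(topic: String): Option[String] = None  // placeholder cache lookup

          @tailrec
          final def send(topic: String, message: String, attempt: Int = 0): Unit =
            pickBrokerPartition(topic) match {
              case Some(broker) =>
                println("sending to " + broker)           // placeholder for the real send
              case None if attempt < numRetries =>
                refreshZkCache()                          // cache may be inconsistent: refresh and retry
                send(topic, message, attempt + 1)
              case None =>
                // after the configured number of retries, fail the send and surface the error
                throw new RuntimeException(
                  "no available broker partition after " + numRetries + " retries")
            }
        }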
        Jun Rao added a comment -

        1. import collection.SortedSet is not used in Producer.

        2. In ZKBrokerPartitionInfo, it seems that we need to synchronize on zkWatcherLock in getBrokerInfo, to prevent seeing inconsistent info between allBrokers and the syncProducer list in ProducerPool (a sketch of this idea follows the comment).

        3. In ProducerTest.testSendSingleMessage, the comment says that we want to send the request to a random partition; why is the partition number changed from -1 to 0?

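        A rough sketch of the locking that item 2 above asks for; the class and field names
        here are illustrative stand-ins, not the actual ZKBrokerPartitionInfo code:

        // Sketch only: reads take the same lock that the ZK watcher callbacks take when
        // they update the cached state, so a reader never observes a half-applied update.
        class BrokerInfoCache {
          private val zkWatcherLock = new Object
          private var allBrokers: Map[Int, String] = Map.empty
          private var producerPool: Map[Int, String] = Map.empty  // stand-in for the SyncProducer list

          // watcher callback path: updates both structures under the lock
          def onZkEvent(brokers: Map[Int, String]): Unit = zkWatcherLock.synchronized {
            allBrokers = brokers
            producerPool = brokers
          }

          // read path: also synchronized, so allBrokers and the producer pool always
          // come from the same update
          def getBrokerInfo(brokerId: Int): Option[(String, String)] = zkWatcherLock.synchronized {
            for (b <- allBrokers.get(brokerId); p <- producerPool.get(brokerId)) yield (b, p)
          }
        }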
        Neha Narkhede added a comment -

        2. Good catch. Updated the patch to include this.
        3. While I was making this change, I found a bug in the partitioning approach of the producer when the broker.list option is used. Previously, it chose a random broker and then created a produce request with -1 as the partition. That is not, however, how we intend to do partitioning: we let the default partitioner pick the right broker partition from amongst all the available ones, so we never end up with a request with -1 as the partition. That test was also written against this buggy logic. It uses the StaticPartitioner, so the broker partition is deterministically selected. I fixed the bug as well as the tests to expose the right behavior (a rough sketch follows this comment).

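        A rough sketch of the distinction made in item 3, using made-up types; the real
        producer's partitioner interface and broker-partition bookkeeping differ, so this
        only illustrates the idea that the request carries a concrete partition, never -1:

        // Sketch only: a partitioner deterministically picks one of the available
        // broker partitions, instead of a random broker plus partition -1.
        trait SimplePartitioner[K] {
          def partition(key: K, numPartitions: Int): Int
        }

        class StaticLikePartitioner extends SimplePartitioner[String] {
          // deterministic choice, in the spirit of the test's StaticPartitioner
          def partition(key: String, numPartitions: Int): Int = key.length % numPartitions
        }

        object PartitionDemo extends App {
          val availablePartitions = Vector("broker-0/part-0", "broker-0/part-1", "broker-1/part-0")
          val partitioner = new StaticLikePartitioner
          val idx = partitioner.partition("some-key", availablePartitions.size)
          println("send to " + availablePartitions(idx) + " (partition " + idx + ")")
        }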
        Jun Rao added a comment -

        OK, for item 3, could you change the comment accordingly?

        Neha Narkhede added a comment -

        Yeah, the comment in the test didn't quite match the new behavior. Fixed it.

        Neha Narkhede added a comment -

        This patch fixes the comments that didn't match the expected behavior in the code.

        Jun Rao added a comment -

        +1. Thanks for the patch.


          People

          • Assignee: Unassigned
          • Reporter: Jun Rao
          • Votes: 0
          • Watchers: 0
