Kafka: KAFKA-899

LeaderNotAvailableException the first time a new message for a partition is processed.

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.0
    • Component/s: core
    • Labels:
      None

      Description

      I'm porting some unit tests from 0.7.2 to 0.8.0. The test does the following, all embedded in the same Java process:

      – spins up a zk instance
      – spins up a kafka server using a fresh log directory
      – creates a producer and sends a message
      – creates a high-level consumer and verifies that it can consume the message
      – shuts down the consumer
      – stops the kafka server
      – stops zk
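Assume the warnings come from the topic being auto-created by the first send, so that no leader exists yet when the producer fetches metadata. In that case, one way a test like the one above could avoid racing leader election is to create the topic up front and poll until a leader is reported before producing. A minimal polling helper follows. `WaitUtil` and `waitUntil` are hypothetical names, not Kafka APIs. The condition you pass in (for example, "topic metadata lists a leader for [test-topic,0]") would need to be built from whatever client calls your Kafka version provides. This sketch does not assume any particular call.

```java
import java.util.function.BooleanSupplier;

// Hypothetical test helper (not part of Kafka): poll a condition until it holds or a timeout expires.
final class WaitUtil {
    private WaitUtil() {}

    /** Returns true as soon as condition is true; false if timeoutMs elapses first or the thread is interrupted. */
    static boolean waitUntil(BooleanSupplier condition, long timeoutMs, long pollMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            // Overflow-safe comparison of nanoTime values.
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status for the caller
                return false;
            }
        }
    }
}
```

Returning a boolean instead of throwing lets the test decide whether a timeout should fail the test or only be logged.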

      The test seems to work fine now. However, I consistently see the following exceptions (which, from poking around the mailing list, seem to be expected?). If they are expected, can we suppress the logging of these exceptions? They clutter the test output and presumably also clutter the logs of the running server and consumers during a clean startup and shutdown.

      When I call producer.send(), I get:

      1071 [main] WARN kafka.producer.BrokerPartitionInfo - Error while fetching metadata partition 0 leader: none replicas: isr: isUnderReplicated: false for topic partition [test-topic,0]: [class kafka.common.LeaderNotAvailableException]
      1081 [main] WARN kafka.producer.async.DefaultEventHandler - Failed to collate messages by topic,partition due to
      kafka.common.LeaderNotAvailableException: No leader for any partition
      at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartition(DefaultEventHandler.scala:212)
      at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
      at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:148)
      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
      at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:148)
      at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:94)
      at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
      at kafka.producer.Producer.send(Producer.scala:74)
      at kafka.javaapi.producer.Producer.send(Producer.scala:32)
      at com.squareup.kafka.server.KafkaServerTest.produceAndConsumeMessage(KafkaServerTest.java:98)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
      at java.lang.reflect.Method.invoke(Method.java:597)
      at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
      at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
      at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
      at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
      at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
      at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
      at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:69)
      at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:48)
      at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
      at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
      at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
      at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
      at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
      at org.junit.runners.ParentRunner.run(ParentRunner.java:292)
      at org.junit.runner.JUnitCore.run(JUnitCore.java:157)
      at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:77)
      at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:195)
      at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:63)
      1133 [kafka-request-handler-1] WARN kafka.server.HighwaterMarkCheckpoint - No highwatermark file is found. Returning 0 as the highwatermark for partition [test-topic,0]
      ...
      ...

      It would be great if, instead of this exception, it just logged a meaningful message, like:

      "No leader was available for partition X, one will now be created"
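The request comes down to logging one readable line at WARN and leaving the exception, along with its stack trace, out of the log call. Below is a minimal sketch using `java.util.logging`. The names `LeaderWarnings`, `noLeaderMessage` and `warnNoLeader` are hypothetical. The message text is the wording proposed above. This illustrates the idea and does not show how Kafka's own logging is wired.

```java
import java.util.logging.Logger;

// Hypothetical helper: emit a one-line warning instead of logging the exception object.
final class LeaderWarnings {
    private static final Logger LOG = Logger.getLogger(LeaderWarnings.class.getName());

    /** Builds the proposed message for a topic/partition pair. */
    static String noLeaderMessage(String topic, int partition) {
        return String.format(
            "No leader was available for partition [%s,%d], one will now be created",
            topic, partition);
    }

    /**
     * Logs only the message at WARNING. Passing the exception as well, e.g.
     * LOG.log(Level.WARNING, msg, e), would print the full stack trace again.
     */
    static void warnNoLeader(String topic, int partition) {
        LOG.warning(noLeaderMessage(topic, partition));
    }
}
```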

      Jason

      Attachments

      1. kafka-899_v2.patch (5 kB, Jun Rao)
      2. kafka-899_v3.patch (6 kB, Jun Rao)
      3. kafka-899.patch (5 kB, Jun Rao)

        Activity

        Jason Rosenberg created issue -
        Jun Rao added a comment -

        Attach a patch. Jason, could you give it a try?

        Jun Rao made changes -
        Field Original Value New Value
        Attachment kafka-899.patch [ 12582535 ]
        Jun Rao made changes -
        Status Open [ 1 ] Patch Available [ 10002 ]
        Assignee Jun Rao [ junrao ]
        Jun Rao added a comment -

        Attach patch v2 after the rebase.

        Jun Rao made changes -
        Attachment kafka-899_v2.patch [ 12585236 ]
        Neha Narkhede added a comment -

        Thanks for the patch!

        1. There is another place where we can make it easier for the user to see the reason for a send failure:

        error("Produce request with correlation id %d failed due to response %s. List of failed topic partitions is %s"
        .format(currentCorrelationId, response.toString, failedTopicPartitions.mkString(",")))

        Here, we print the entire response. Instead, we should print only the partitions with a non-zero error code, together with their error status.

        2. When we do print the error status above, we should print the text name of the error instead of the integer error code. For this, we can override toString() in ProducerResponseStatus.
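The two review points can be sketched together as follows: a status type whose toString() prints the error's name rather than its number, and a formatter that keeps only the partitions with a non-zero error code. Everything here is a hypothetical Java stand-in. `PartitionStatus` plays the role of ProducerResponseStatus, and partition keys are plain strings such as "[test-topic,0]". The numeric codes follow Kafka's protocol error table as far as I know, but treat them as illustrative.

```java
import java.util.Map;
import java.util.StringJoiner;

// Illustrative subset of error codes (values assumed from the Kafka protocol error table).
enum ErrorCode {
    NO_ERROR((short) 0),
    UNKNOWN_TOPIC_OR_PARTITION((short) 3),
    LEADER_NOT_AVAILABLE((short) 5),
    NOT_LEADER_FOR_PARTITION((short) 6),
    REQUEST_TIMED_OUT((short) 7);

    final short code;

    ErrorCode(short code) { this.code = code; }

    /** Maps a numeric code to its name, falling back to UNKNOWN(n) for codes not listed here. */
    static String nameOf(short code) {
        for (ErrorCode e : values()) {
            if (e.code == code) return e.name();
        }
        return "UNKNOWN(" + code + ")";
    }
}

// Hypothetical stand-in for ProducerResponseStatus with a human-readable toString().
final class PartitionStatus {
    final short error;
    final long offset;

    PartitionStatus(short error, long offset) { this.error = error; this.offset = offset; }

    @Override
    public String toString() {
        return "error: " + ErrorCode.nameOf(error) + ", offset: " + offset;
    }
}

final class FailedPartitions {
    /** Joins only the entries whose error code is non-zero; successful partitions are skipped. */
    static String describe(Map<String, PartitionStatus> statusByPartition) {
        StringJoiner joiner = new StringJoiner(", ");
        for (Map.Entry<String, PartitionStatus> entry : statusByPartition.entrySet()) {
            if (entry.getValue().error != 0) {
                joiner.add(entry.getKey() + " -> " + entry.getValue());
            }
        }
        return joiner.toString();
    }
}
```

With a LEADER_NOT_AVAILABLE status on [test-topic,0] and a successful [test-topic,1], only "[test-topic,0] -> error: LEADER_NOT_AVAILABLE, offset: -1" would be logged.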

        Jun Rao added a comment -

        Thanks for the review. Attached patch v3, which addresses the above issues. Changed the logging level from error to warning, since the real error will be reported when all retries have failed.
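The logging policy described above can be shown with a small sketch. Each failed attempt that will be retried is logged at WARN, and a single ERROR is logged only once every attempt has failed. `RetryingSender` is a hypothetical class, not the producer's actual event handler. Log lines are collected in a list instead of going to a real logger, and the backoff between attempts is omitted for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical sketch of "warn per retry, error only when all retries are exhausted".
final class RetryingSender {
    final List<String> log = new ArrayList<>();

    /** Makes up to 1 + maxRetries attempts; returns true on the first success. */
    boolean send(BooleanSupplier attempt, int maxRetries) {
        for (int i = 0; i <= maxRetries; i++) {
            if (attempt.getAsBoolean()) {
                return true;
            }
            if (i < maxRetries) {
                // Transient failure (e.g. no leader yet): only a warning, since a retry follows.
                log.add("WARN attempt " + (i + 1) + " failed, retrying");
            }
        }
        // Every attempt failed: this is the real error worth reporting.
        log.add("ERROR failed to send after " + (maxRetries + 1) + " attempts");
        return false;
    }
}
```

Under this policy, the first-send LeaderNotAvailableException from the description would surface only as a warning whenever a later retry succeeds.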

        Jun Rao made changes -
        Attachment kafka-899_v3.patch [ 12585349 ]
        Neha Narkhede added a comment -

        +1 on the latest patch

        Jun Rao added a comment -

        Thanks for the review. Committed to 0.8 after fixing a bug and a typo.

        Jun Rao made changes -
        Status Patch Available [ 10002 ] Resolved [ 5 ]
        Fix Version/s 0.8 [ 12317244 ]
        Resolution Fixed [ 1 ]

          People

          • Assignee: Jun Rao
          • Reporter: Jason Rosenberg
          • Votes: 0
          • Watchers: 4
