Kafka / KAFKA-899

LeaderNotAvailableException the first time a new message for a partition is processed.

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.2.0
    • Component/s: core
    • Labels:
      None

      Description

      I'm porting some unit tests from 0.7.2 to 0.8.0. The test does the following, all embedded in the same java process:

      – spins up a zk instance
      – spins up a kafka server using a fresh log directory
      – creates a producer and sends a message
      – creates a high-level consumer and verifies that it can consume the message
      – shuts down the consumer
      – stops the kafka server
      – stops zk

      The test seems to be working fine now; however, I consistently see the following exceptions (which, from poking around the mailing list, seem to be expected?). If these are expected, can we suppress the logging of these exceptions? They clutter the output of tests and, presumably, the logs of the running server/consumers during clean startup and shutdown.

      When I call producer.send(), I get:

      1071 [main] WARN kafka.producer.BrokerPartitionInfo - Error while fetching metadata partition 0 leader: none replicas: isr: isUnderReplicated: false for topic partition [test-topic,0]: [class kafka.common.LeaderNotAvailableException]
      1081 [main] WARN kafka.producer.async.DefaultEventHandler - Failed to collate messages by topic,partition due to
      kafka.common.LeaderNotAvailableException: No leader for any partition
      at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartition(DefaultEventHandler.scala:212)
      at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
      at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:148)
      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
      at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:148)
      at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:94)
      at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
      at kafka.producer.Producer.send(Producer.scala:74)
      at kafka.javaapi.producer.Producer.send(Producer.scala:32)
      at com.squareup.kafka.server.KafkaServerTest.produceAndConsumeMessage(KafkaServerTest.java:98)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
      at java.lang.reflect.Method.invoke(Method.java:597)
      at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
      at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
      at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
      at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
      at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
      at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
      at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:69)
      at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:48)
      at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
      at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
      at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
      at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
      at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
      at org.junit.runners.ParentRunner.run(ParentRunner.java:292)
      at org.junit.runner.JUnitCore.run(JUnitCore.java:157)
      at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:77)
      at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:195)
      at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:63)
      1133 [kafka-request-handler-1] WARN kafka.server.HighwaterMarkCheckpoint - No highwatermark file is found. Returning 0 as the highwatermark for partition [test-topic,0]
      ...
      ...

      It would be great if instead of this exception, it would just log a meaningful message, like:

      "No leader was available for partition X, one will now be created"

      Jason

      1. kafka-899.patch
        5 kB
        Jun Rao
      2. kafka-899_v2.patch
        5 kB
        Jun Rao
      3. kafka-899_v3.patch
        6 kB
        Jun Rao

        Activity

        Jun Rao added a comment -

        Attached a patch. Jason, could you give it a try?

        Jun Rao added a comment -

        Attached patch v2 after the rebase.

        Neha Narkhede added a comment -

        Thanks for the patch!

        1. Another place where we can make it easier for the user to know the reason for the send failure -

        error("Produce request with correlation id %d failed due to response %s. List of failed topic partitions is %s"
        .format(currentCorrelationId, response.toString, failedTopicPartitions.mkString(",")))

        Here, we print the entire response. Instead, we should print only the partitions and their corresponding error statuses for the partitions with a non-zero error code.

        2. When we do print the error status above, we should print the text name of the error instead of the integer error code. For this, we can override toString() in ProducerResponseStatus.
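
        Both suggestions could be sketched together: filter the response down to the partitions whose status has a non-zero error code, and print a text name for each code instead of the raw integer. This is an illustrative sketch only; `errorName`, `failedPartitionSummary`, and the hard-coded code values are stand-ins, not the actual `ProducerResponseStatus` or `kafka.common.ErrorMapping` code:

        ```java
        import java.util.LinkedHashMap;
        import java.util.Map;
        import java.util.stream.Collectors;

        public class ErrorStatusDemo {
            // Hypothetical mapping of a few error codes to names, for
            // illustration only.
            public static String errorName(short code) {
                switch (code) {
                    case 0:  return "NoError";
                    case 5:  return "LeaderNotAvailable";
                    case 6:  return "NotLeaderForPartition";
                    default: return "UnknownError(" + code + ")";
                }
            }

            // Keep only partitions whose status carries a non-zero error
            // code and render them as "partition -> error name", rather
            // than dumping the whole response.
            public static String failedPartitionSummary(Map<String, Short> statusByPartition) {
                return statusByPartition.entrySet().stream()
                        .filter(e -> e.getValue() != 0)
                        .map(e -> e.getKey() + " -> " + errorName(e.getValue()))
                        .collect(Collectors.joining(", "));
            }

            public static void main(String[] args) {
                Map<String, Short> status = new LinkedHashMap<>();
                status.put("[test-topic,0]", (short) 5);
                status.put("[test-topic,1]", (short) 0);
                System.out.println(failedPartitionSummary(status));
                // prints: [test-topic,0] -> LeaderNotAvailable
            }
        }
        ```

        With something along these lines, the log line would read `[test-topic,0] -> LeaderNotAvailable` instead of the full response dump.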

        Jun Rao added a comment -

        Thanks for the review. Attached patch v3, which addresses the above issues. Changed the logging level from error to warning, since the real error will be reported when all retries have failed.

        Neha Narkhede added a comment -

        +1 on the latest patch

        Jun Rao added a comment -

        Thanks for the review. Committed to 0.8 after fixing a bug and a typo.

        Andras Hatvani added a comment - - edited

        This isn't fixed in 0.8.1.1 as the behavior is the same.

        As a workaround, increase retry.backoff.ms from the default 100 ms to 1000 ms.
        If that is not enough for you, you can try changing the values of

        • message.send.max.retries from the default 5 to e.g. 10, and
        • topic.metadata.refresh.interval.ms to 0.
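
        The workaround boils down to three producer properties. A minimal sketch of the corresponding config, assuming the old (0.8) producer's property names and a placeholder broker list:

        ```java
        import java.util.Properties;

        public class WorkaroundConfig {
            public static Properties producerProps() {
                Properties props = new Properties();
                // Placeholder broker list, for illustration only.
                props.put("metadata.broker.list", "localhost:9092");
                // Wait longer before retrying after a failed send
                // (up from 100 ms).
                props.put("retry.backoff.ms", "1000");
                // If that is not enough, retry more times...
                props.put("message.send.max.retries", "10");
                // ...and always refresh metadata after a failed send.
                props.put("topic.metadata.refresh.interval.ms", "0");
                return props;
            }
        }
        ```

        These properties would then typically be passed to the producer via a ProducerConfig.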

        This is the expected behavior, therefore an exception mustn't be thrown; rather, it has to be communicated that the leader election is in progress. Furthermore, suggestions about changing the values of the variables I mentioned should be mandatory.

        Jun Rao added a comment -

        This fix actually wasn't included in 0.8.1. Changed the fix version in the jira.

        Andras Hatvani added a comment -

        Jun, can I help with this issue in any way (e.g. by verifying the implementation)?

        Jun Rao added a comment -

        Andras,

        It seems that your issue is a bit different from this jira. This jira is about removing the stack trace in the producer log when the metadata is not available. Your issue seems to be that the metadata is not propagated as quickly as you expect. Normally, 100 ms should be long enough for a new topic to be created and its metadata to be propagated to all brokers. In your case, it seems that this process takes more than 1 sec. Could you look at the controller and the state-change log to see where the delay is? For example, is the write to ZK slow, or is the propagation of metadata from the controller to the broker slow?

        Andras Hatvani added a comment - - edited

        Jun,

        Although the reasons may be different, the objective is identical (see my last post in the thread "LeaderNotAvailableException, although leader elected" on the Kafka user mailing list): there shouldn't be any exception when no leader can be communicated to the producer (whether because of metadata propagation delay, an incomplete leader election, or any other valid, non-erroneous cause), but rather a status message enabling the producer to be tuned.
        This exception should really cover only exceptional cases.

        But you're right, my case will be covered exactly by KAFKA-1494. I'll provide further data in that issue.

        Jun Rao added a comment -

        Andras,

        If a message couldn't be sent (after all retries), we need to indicate this to the producer client. We currently do that by throwing an exception back to the caller, who can decide what to do. Are you suggesting something else?

        Andras Hatvani added a comment -

        Jun,

        Yes, I suggest a classification of the server's response so that the client can distinguish between technical failures (e.g. network unavailable) and functional state (e.g. leader election for a partition in progress). For example, a topic's state could be: non-existent, being created, existent, leader election in progress, or failed (and in this case the reason for the failure, such as no disk space).
        Furthermore, in the case of topic auto-creation I'd separate the fact of creation from the message sending, communicate it, and handle the results and failures separately, too.
        Returning a value instead of void would support both mechanisms. What do you think?
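
        The proposed classification could be sketched as an enum; all of the names below are hypothetical, since no such type exists in Kafka — they just mirror the states listed in the comment:

        ```java
        public class TopicStateDemo {
            // Hypothetical classification of a topic's state, as proposed
            // above; none of these names exist in Kafka itself.
            public enum TopicState {
                NON_EXISTENT,
                BEING_CREATED,
                EXISTENT,
                LEADER_ELECTION_IN_PROGRESS,
                FAILED;

                // Transient states: the client should back off and retry
                // rather than treat the send as a hard failure.
                public boolean isTransient() {
                    return this == BEING_CREATED
                        || this == LEADER_ELECTION_IN_PROGRESS;
                }
            }

            public static void main(String[] args) {
                TopicState s = TopicState.LEADER_ELECTION_IN_PROGRESS;
                System.out.println(s + " transient=" + s.isTransient());
            }
        }
        ```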

        Jun Rao added a comment -

        We started doing that classification in the new Java producer. For example, certain exceptions are subclasses of RetriableException; transient failures like leader not available are in that category, while exceptions like MessageTooLarge are in a different one. Perhaps you can take a look at that in the new producer and see if it makes sense.
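
        The idea can be illustrated with a small stand-in hierarchy mirroring the new producer's exception classification; the classes below are local stand-ins (not the real `org.apache.kafka.common.errors` types) so the sketch compiles without the kafka-clients jar:

        ```java
        public class RetryClassificationDemo {
            // Stand-in exception hierarchy, defined locally for
            // illustration only.
            public static class KafkaException extends RuntimeException {}
            public static class RetriableException extends KafkaException {}
            public static class LeaderNotAvailableException extends RetriableException {}
            public static class MessageTooLargeException extends KafkaException {}

            // A retriable exception signals a transient condition: back
            // off and retry. Anything else is a permanent failure for
            // this record.
            public static boolean shouldRetry(KafkaException e) {
                return e instanceof RetriableException;
            }

            public static void main(String[] args) {
                System.out.println(shouldRetry(new LeaderNotAvailableException())); // true
                System.out.println(shouldRetry(new MessageTooLargeException()));    // false
            }
        }
        ```

        The client code then only needs one `instanceof` check to decide between retrying and surfacing the error to the caller.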


          People

          • Assignee: Jun Rao
          • Reporter: Jason Rosenberg
          • Votes: 0
          • Watchers: 6
