Kafka / KAFKA-820

Topic metadata request handling fails to return all metadata about replicas

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: None
    • Component/s: core
    • Labels:

      Description

      The admin utility that fetches topic metadata needs better error handling. While fetching replica and ISR broker information, if one of the replicas is offline, it fails to fetch the replica and ISR info for the rest of them. This confuses the client, because it then looks as if the rest of the brokers are offline as well.

      Attachments

      1. kafka-820-v1.patch (4 kB, Neha Narkhede)
      2. kafka-820-v2.patch (5 kB, Neha Narkhede)
      3. kafka-820-v3.patch (5 kB, Neha Narkhede)

        Activity

        Neha Narkhede added a comment -

        Two changes -

        1. Modified topic metadata request handling to return the broker metadata for the rest of the brokers, even if one of them is offline. Also modified it to throw an exception if even one of the brokers is offline, so that an error code is returned to the client.
        2. ReplicaNotAvailableException is irrelevant to the producer, so BrokerPartitionInfo.updateInfo() should not log its stack trace. The stack trace should only be logged for UnknownTopicOrPartitionException and LeaderNotAvailableException.
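As a rough illustration of change 2, the decision of whether to log a full stack trace can be made by matching on the exception type. This is a minimal Scala sketch, not the code from the patch: the three exception classes are hypothetical local stand-ins for Kafka's own, and `MetadataErrorLogging`, `shouldLogStackTrace` and `report` are illustrative names.

```scala
// Hypothetical stand-ins for Kafka's exception classes (not the real ones).
class UnknownTopicOrPartitionException(msg: String) extends RuntimeException(msg)
class LeaderNotAvailableException(msg: String) extends RuntimeException(msg)
class ReplicaNotAvailableException(msg: String) extends RuntimeException(msg)

object MetadataErrorLogging {
  // True only for metadata errors that matter to a producer;
  // ReplicaNotAvailableException falls through to false.
  def shouldLogStackTrace(e: Throwable): Boolean = e match {
    case _: UnknownTopicOrPartitionException => true
    case _: LeaderNotAvailableException      => true
    case _                                   => false
  }

  // Logs the full stack trace for relevant errors and only the message otherwise.
  def report(e: Throwable, log: String => Unit): Unit =
    if (shouldLogStackTrace(e)) {
      val trace = new java.io.StringWriter
      e.printStackTrace(new java.io.PrintWriter(trace))
      log(trace.toString)
    } else {
      log("Metadata error (no stack trace): " + e.getMessage)
    }
}
```

Matching on type keeps the list of "interesting" errors in one place, so adding or removing an exception from it is a one-line change.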

        Jun Rao added a comment -

        Thanks for the patch. Not sure that I understand the changes in AdminUtils.getBrokerInfoFromCache(). It seems to me that with or without the patch, the method will throw an exception if at least one of the items in brokerIds can't be converted to a Broker object, in which case the return value is irrelevant.

        Neha Narkhede added a comment -

        The return value is not irrelevant. As I explained, even if one broker is down, it aborts sending the broker data for the rest of the brokers, which gives the client the impression that all the brokers are dead. It throws the exception at the end because we need to send the appropriate error code to the client if at least one broker is down.

        Jun Rao added a comment -

        When an exception is thrown in getBrokerInfoFromCache(), no value is returned, right?

        Neha Narkhede added a comment -

        You are right. Attached a v2 patch to fix the issue.
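Jun's point is that a thrown exception discards the return value. One way to keep both the partial broker list and the error signal is to return them together and let the caller turn a non-empty "missing" list into an error code; the v3 code quoted later in this thread does something similar by comparing replicaInfo.size against replicas.size. A minimal Scala sketch, assuming a simplified, hypothetical `Broker` case class and a plain `Map` as the cache (not Kafka's actual AdminUtils API):

```scala
// Hypothetical, simplified broker record; Kafka's kafka.cluster.Broker differs.
final case class Broker(id: Int, host: String, port: Int)

// Carries the partial metadata and the missing broker ids together,
// so the caller gets both instead of losing the data to a thrown exception.
final case class BrokerLookupResult(available: Seq[Broker], missingIds: Seq[Int]) {
  def isComplete: Boolean = missingIds.isEmpty
}

object BrokerLookup {
  def lookup(cache: Map[Int, Broker], brokerIds: Seq[Int]): BrokerLookupResult = {
    // flatMap over Option keeps only the brokers present in the cache.
    val available = brokerIds.flatMap(id => cache.get(id))
    val foundIds = available.map(_.id).toSet
    BrokerLookupResult(available, brokerIds.filterNot(foundIds.contains))
  }
}
```

A caller could then fill the response with `available` and set an error code whenever `isComplete` is false.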

        Neha Narkhede added a comment -
        • Fixed a bug in BrokerPartitionInfo when printing the error code. In v2, I was printing the topic's error code instead of the partition's error code.
        • As Jun pointed out offline, dropWhile does not work the way I thought it did. Replaced it with filterNot.
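The dropWhile pitfall is easy to reproduce. dropWhile removes only the leading run of elements that satisfy the predicate and stops at the first one that does not, whereas filterNot removes every matching element. A small sketch with a hypothetical list in which `None` stands for an offline broker:

```scala
object DropWhileVsFilterNot {
  // Stand-in for per-broker lookups: None marks a broker that is offline.
  val brokerMetadata: List[Option[Int]] = List(Some(1), None, Some(3))

  // dropWhile stops at the first Some, so the None in the middle survives.
  val viaDropWhile: List[Option[Int]] = brokerMetadata.dropWhile(!_.isDefined)

  // filterNot removes every None, wherever it sits.
  val viaFilterNot: List[Int] = brokerMetadata.filterNot(!_.isDefined).map(_.get)

  // dropWhile only looks right when the Nones happen to be at the front.
  val leadingNonesOnly: List[Option[Int]] = List(None, Some(2)).dropWhile(!_.isDefined)
}
```

With dropWhile, the surviving `None` makes a subsequent `.map(_.get)` throw `NoSuchElementException`.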
        Jun Rao added a comment -

        Thanks for patch v3. A few minor comments. Once addressed, the patch can be checked in.

        30. AdminUtils.getBrokerInfoFromCache(): The following line uses a double negation:
            brokerMetadata.filterNot(!_.isDefined).map(_.get)
        It will be easier to read if we do:
            brokerMetadata.filter(_.isDefined).map(_.get)
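The two forms in comment 30 produce the same result; the second just avoids the double negation. A quick check with a hypothetical `brokerMetadata` sequence, also showing `flatten` from the Scala standard library as a further alternative that was not part of the review:

```scala
object FilterForms {
  // Hypothetical per-broker lookup results; None marks an offline broker.
  val brokerMetadata: Seq[Option[String]] = Seq(Some("broker-1"), None, Some("broker-3"))

  // Double negation, as in patch v3: drop elements that are not defined.
  val doubleNegation: Seq[String] = brokerMetadata.filterNot(!_.isDefined).map(_.get)

  // Suggested form: keep elements that are defined.
  val positive: Seq[String] = brokerMetadata.filter(_.isDefined).map(_.get)

  // Standard-library alternative: flatten drops every None and unwraps
  // every Some in one step, without calling .get.
  val flattened: Seq[String] = brokerMetadata.flatten
}
```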

        31. AdminUtils.fetchTopicMetadataFromZk():
        31.1 In the following statement, should the catch clause be moved to after the first two statements inside try? Otherwise, we will unnecessarily wrap a ReplicaNotAvailableException over another ReplicaNotAvailableException.
            try {
              replicaInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas.map(id => id.toInt))
              isrInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas)
              if(replicaInfo.size < replicas.size)
                throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
                  replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
              if(isrInfo.size < inSyncReplicas.size)
                throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
                  inSyncReplicas.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
            } catch {
              case e => throw new ReplicaNotAvailableException(e)
            }
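To see the double wrapping that 31.1 describes, compare a try that encloses the size check with one that encloses only the lookup. This Scala sketch uses a hypothetical `lookup` helper and a local ReplicaNotAvailableException stand-in instead of Kafka's getBrokerInfoFromCache() and exception class:

```scala
// Hypothetical stand-in; not Kafka's kafka.common.ReplicaNotAvailableException.
class ReplicaNotAvailableException(message: String, cause: Throwable = null)
  extends RuntimeException(message, cause)

object ReplicaLookupScope {
  // Hypothetical lookup: fails outright if the cache is unavailable,
  // otherwise returns only the broker ids it knows about.
  def lookup(cache: Map[Int, String], ids: Seq[Int]): Seq[Int] = {
    if (cache.isEmpty) throw new IllegalStateException("broker cache unavailable")
    ids.filter(cache.contains)
  }

  private def missingMessage(replicas: Seq[Int], found: Seq[Int]): String =
    "Replica information not available for following brokers: " +
      replicas.filterNot(found.contains(_)).mkString(",")

  // Original shape: the catch also wraps the exception thrown by the size check.
  def wideTry(cache: Map[Int, String], replicas: Seq[Int]): Seq[Int] =
    try {
      val found = lookup(cache, replicas)
      if (found.size < replicas.size)
        throw new ReplicaNotAvailableException(missingMessage(replicas, found))
      found
    } catch {
      case e: Exception => throw new ReplicaNotAvailableException(e.getMessage, e)
    }

  // Suggested shape: only the lookup is wrapped; the size check throws directly.
  def narrowTry(cache: Map[Int, String], replicas: Seq[Int]): Seq[Int] = {
    val found =
      try lookup(cache, replicas)
      catch { case e: Exception => throw new ReplicaNotAvailableException(e.getMessage, e) }
    if (found.size < replicas.size)
      throw new ReplicaNotAvailableException(missingMessage(replicas, found))
    found
  }
}
```

In `wideTry`, the exception from the size check is caught by its own catch clause and becomes the cause of a second ReplicaNotAvailableException; in `narrowTry` it propagates unchanged, while genuine lookup failures are still wrapped once.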

        31.2 Similarly, in the following try/catch clause, should we move the try/catch clause inside case Some(l)? Otherwise, we will be wrapping a LeaderNotAvailableException over another LeaderNotAvailableException.
            try {
              leaderInfo = leader match {
                case Some(l) => Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head)
                case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition)
              }
            } catch {
              case e => throw new LeaderNotAvailableException("Leader not available for topic %s partition %d".format(topic, partition), e)
            }
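The same idea applies to 31.2: with the try/catch moved inside `case Some(l)`, the `case None` branch throws a single LeaderNotAvailableException rather than one wrapped in another. A minimal sketch, assuming a hypothetical `brokerInfo` helper in place of getBrokerInfoFromCache() and a local exception stand-in:

```scala
// Hypothetical stand-in; not Kafka's kafka.common.LeaderNotAvailableException.
class LeaderNotAvailableException(message: String, cause: Throwable = null)
  extends RuntimeException(message, cause)

object LeaderLookupScope {
  // Hypothetical lookup that fails when the leader's broker is not cached.
  def brokerInfo(cache: Map[Int, String], id: Int): String =
    cache.getOrElse(id, throw new NoSuchElementException("broker " + id + " not cached"))

  // The try/catch lives inside `case Some(l)`, so only lookup failures are
  // wrapped and the `case None` exception is thrown exactly once.
  def leaderInfo(cache: Map[Int, String], leader: Option[Int],
                 topic: String, partition: Int): Option[String] =
    leader match {
      case Some(l) =>
        try Some(brokerInfo(cache, l))
        catch {
          case e: Exception =>
            throw new LeaderNotAvailableException(
              "Leader not available for topic %s partition %d".format(topic, partition), e)
        }
      case None =>
        throw new LeaderNotAvailableException("No leader exists for partition " + partition)
    }
}
```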
        Neha Narkhede added a comment -

        Checked in after suggested fixes


          People

          • Assignee: Neha Narkhede
          • Reporter: Neha Narkhede
          • Votes: 0
          • Watchers: 2
