Kafka / KAFKA-820

Topic metadata request handling fails to return all metadata about replicas

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: None
    • Component/s: core
    • Labels:

      Description

      The admin utility that fetches topic metadata needs better error handling. While fetching replica and ISR broker information, if one of the replicas is offline, it fails to return the replica and ISR info for all of the others. This confuses the client, because it appears that the remaining brokers are offline as well.
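A minimal Scala sketch of the failure mode described above. The broker registry is modeled as a plain Map from broker id to "host:port", and the names BrokerLookupSketch, lookupFailFast and lookupTolerant are hypothetical; this is not Kafka's AdminUtils code. The fail-fast version throws on the first missing id, so the caller learns nothing about the healthy brokers; the tolerant version returns whatever it resolved plus the ids it could not.

```scala
object BrokerLookupSketch {
  // Hypothetical stand-in for the broker registry: broker id -> "host:port".
  type Registry = Map[Int, String]

  // Fail-fast lookup: the first unknown id aborts the whole call,
  // so brokers that are online are never reported either.
  def lookupFailFast(registry: Registry, ids: Seq[Int]): Seq[(Int, String)] =
    ids.map { id =>
      registry.get(id) match {
        case Some(host) => (id, host)
        case None       => throw new NoSuchElementException("Broker " + id + " is offline")
      }
    }

  // Tolerant lookup: keep every broker that could be resolved and report
  // the ids that could not, so the caller can still set an error code.
  def lookupTolerant(registry: Registry, ids: Seq[Int]): (Seq[(Int, String)], Seq[Int]) = {
    val found   = ids.flatMap(id => registry.get(id).map(host => (id, host)))
    val missing = ids.filterNot(id => registry.contains(id))
    (found, missing)
  }
}
```

With a registry of Map(1 -> "h1:9092", 3 -> "h3:9092") and ids Seq(1, 2, 3), lookupFailFast throws on broker 2, while lookupTolerant still reports brokers 1 and 3 and lists 2 as missing.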

      1. kafka-820-v3.patch
        5 kB
        Neha Narkhede
      2. kafka-820-v2.patch
        5 kB
        Neha Narkhede
      3. kafka-820-v1.patch
        4 kB
        Neha Narkhede

        Activity

        Transition | Time In Source Status | Execution Times | Last Executer | Last Execution Date
        Open -> In Progress | 19m 5s | 1 | Neha Narkhede | 20/Mar/13 00:52
        In Progress -> Patch Available | 11m 26s | 1 | Neha Narkhede | 20/Mar/13 01:03
        Patch Available -> Resolved | 2d 15h 9m | 1 | Neha Narkhede | 22/Mar/13 16:13
        Resolved -> Closed | 2s | 1 | Neha Narkhede | 22/Mar/13 16:13
        Tony Stevenson made changes -
        Workflow Apache Kafka Workflow [ 13053007 ] no-reopen-closed, patch-avail [ 13055565 ]
        Tony Stevenson made changes -
        Workflow no-reopen-closed, patch-avail [ 12772281 ] Apache Kafka Workflow [ 13053007 ]
        Neha Narkhede made changes -
        Status Resolved [ 5 ] Closed [ 6 ]
        Neha Narkhede made changes -
        Status Patch Available [ 10002 ] Resolved [ 5 ]
        Resolution Fixed [ 1 ]
        Neha Narkhede added a comment -

        Checked in after suggested fixes

        Jun Rao added a comment -
        Jun Rao added a comment -

        Thanks for patch v3. A few minor comments. Once addressed, the patch can be checked in.

        30. AdminUtils.getBrokerInfoFromCache(): The following line uses double negation:
            brokerMetadata.filterNot(!_.isDefined).map(_.get)
        It will be easier to read if we do:
            brokerMetadata.filter(_.isDefined).map(_.get)
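Both forms keep exactly the same elements; the suggested one just drops the double negation. In this small sketch brokerMetadata is modeled as a Seq[Option[Int]] (an assumption for illustration; the thread does not show its real element type). The last line shows flatten, an equivalent standard-library idiom for Option elements; it is offered as an aside, not as what the patch uses.

```scala
object OptionFilterSketch {
  // Stand-in for per-broker lookup results: None means the broker was not found.
  val brokerMetadata: Seq[Option[Int]] = Seq(Some(1), None, Some(3))

  // Original form: keep elements that are "not not defined".
  val doubleNegation: Seq[Int] = brokerMetadata.filterNot(!_.isDefined).map(_.get)

  // Form suggested in the review: keep elements that are defined.
  val positive: Seq[Int] = brokerMetadata.filter(_.isDefined).map(_.get)

  // Equivalent idiom that avoids calling .get explicitly.
  val flattened: Seq[Int] = brokerMetadata.flatten
}
```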

        31. AdminUtils.fetchTopicMetadataFromZk():
        31.1 In the following statement, should the catch clause be moved to after the first two statements inside try? Otherwise, we will unnecessarily wrap a ReplicaNotAvailableException over another ReplicaNotAvailableException.
            try {
              replicaInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas.map(id => id.toInt))
              isrInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas)
              if(replicaInfo.size < replicas.size)
                throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
                  replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
              if(isrInfo.size < inSyncReplicas.size)
                throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
                  inSyncReplicas.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
            } catch {
              case e => throw new ReplicaNotAvailableException(e)
            }
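A minimal sketch of the restructuring 31.1 suggests: only the lookup sits inside the try, so an unexpected failure there is wrapped once, while the size check throws ReplicaNotAvailableException directly and is never re-wrapped. The exception class and the lookup function are local stand-ins defined in the sketch (broker ids as Int, live brokers as a Set), not Kafka's classes.

```scala
object ReplicaCheckSketch {
  // Local stand-in for Kafka's ReplicaNotAvailableException.
  class ReplicaNotAvailableException(message: String, cause: Throwable = null)
    extends RuntimeException(message, cause)

  // Hypothetical lookup: returns only the brokers it could resolve.
  // In a real system this call could also fail for other reasons.
  def lookup(live: Set[Int], ids: Seq[Int]): Seq[Int] = ids.filter(live.contains)

  def checkReplicas(live: Set[Int], replicas: Seq[Int]): Seq[Int] = {
    // Only the lookup is guarded, so an unexpected failure is wrapped exactly once.
    val replicaInfo =
      try lookup(live, replicas)
      catch { case e: Exception => throw new ReplicaNotAvailableException("Replica lookup failed", e) }

    // The size check throws outside the try, so this exception is not wrapped again.
    if (replicaInfo.size < replicas.size)
      throw new ReplicaNotAvailableException(
        "Replica information not available for following brokers: " +
          replicas.filterNot(id => replicaInfo.contains(id)).mkString(","))
    replicaInfo
  }
}
```

When broker 2 is offline, the caller receives a single ReplicaNotAvailableException whose cause is null, rather than one exception nested inside another.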

        31.2 Similarly, in the following try/catch clause, should we move the try/catch clause inside case Some(l)? Otherwise, we will be wrapping a LeaderNotAvailableException over another LeaderNotAvailableException.
            try {
              leaderInfo = leader match {
                case Some(l) => Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head)
                case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition)
              }
            } catch {
              case e => throw new LeaderNotAvailableException("Leader not available for topic %s partition %d".format(topic, partition), e)
            }
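Following 31.2, the try/catch can move inside case Some(l): only the broker lookup is wrapped, and the None branch's LeaderNotAvailableException propagates unwrapped. This sketch uses a local stand-in for the exception class and models the broker cache as a hypothetical Map from broker id to host, returning a String instead of Kafka's Broker.

```scala
object LeaderLookupSketch {
  // Local stand-in for Kafka's LeaderNotAvailableException.
  class LeaderNotAvailableException(message: String, cause: Throwable = null)
    extends RuntimeException(message, cause)

  def leaderInfo(brokers: Map[Int, String], leader: Option[Int], topic: String, partition: Int): String =
    leader match {
      case Some(l) =>
        // Only the broker lookup is guarded; a missing broker is wrapped once, with context.
        try brokers(l)
        catch {
          case e: NoSuchElementException =>
            throw new LeaderNotAvailableException(
              "Leader not available for topic %s partition %d".format(topic, partition), e)
        }
      case None =>
        // Thrown directly, so it is not wrapped in a second LeaderNotAvailableException.
        throw new LeaderNotAvailableException("No leader exists for partition " + partition)
    }
}
```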
        Neha Narkhede made changes -
        Attachment kafka-820-v3.patch [ 12574812 ]
        Neha Narkhede added a comment -
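        • Fixed a bug in BrokerPartitionInfo when printing the error code. In v2, I was printing the topic's error code instead of the partition's error code.
        • As Jun pointed out offline, dropWhile does not work the way I thought it did: it only drops the leading run of elements that match the predicate and keeps everything after the first non-match. Replaced it with filterNot, which removes every matching element.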
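To see why dropWhile was the wrong tool, compare the two on a sequence of lookup results. The results are modeled here as Seq[Option[Int]], with None for a broker that could not be found; this element type is an assumption for illustration, since the patch's actual types are not shown in this thread.

```scala
object DropWhileSketch {
  // None marks a broker whose metadata could not be found.
  val lookups: Seq[Option[Int]] = Seq(None, Some(1), None, Some(3))

  // dropWhile removes only the longest leading run of matching elements,
  // then stops, so the None after Some(1) survives.
  val withDropWhile: Seq[Option[Int]] = lookups.dropWhile(_.isEmpty)

  // filterNot tests every element, so all Nones are removed.
  val withFilterNot: Seq[Option[Int]] = lookups.filterNot(_.isEmpty)
}
```

Here withDropWhile still contains a None, while withFilterNot keeps only Some(1) and Some(3).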
        Neha Narkhede made changes -
        Attachment kafka-820-v2.patch [ 12574588 ]
        Neha Narkhede added a comment -

        You are right; attached a v2 patch to fix the issue.

        Jun Rao added a comment -
        Jun Rao added a comment -

        When an exception is thrown in getBrokerInfoFromCache(), no value is returned, right?

        Neha Narkhede added a comment -
        Neha Narkhede added a comment -

        The return value is not irrelevant. As I explained, if even one broker is down, the lookup aborts and no broker data is sent for the rest of the brokers, which gives the client the impression that all the brokers are dead. It throws the exception at the end because we still need to send the appropriate error code to the client if at least one broker is down.
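One way "throw at the end" and "still return the live brokers" can coexist is sketched below: the replica list is assigned before the check that throws, so the catch clause can still return it together with an error code. This is a minimal sketch under assumed types (broker ids as Int, a local PartitionInfo case class, local error-code constants whose values are hypothetical and are not Kafka's ErrorMapping codes); it is not the code that was checked in.

```scala
object PartialMetadataSketch {
  // Hypothetical error codes for this sketch only.
  val NoError: Short = 0
  val ReplicaNotAvailable: Short = 1

  final case class PartitionInfo(partition: Int, replicas: Seq[Int], errorCode: Short)

  // Local stand-in for Kafka's ReplicaNotAvailableException.
  class ReplicaNotAvailableException(message: String) extends RuntimeException(message)

  def partitionInfo(live: Set[Int], partition: Int, assigned: Seq[Int]): PartitionInfo = {
    var replicaInfo: Seq[Int] = Nil
    try {
      // Assigned before the check below can throw, so it survives into the catch clause.
      replicaInfo = assigned.filter(live.contains)
      if (replicaInfo.size < assigned.size)
        throw new ReplicaNotAvailableException(
          "Missing replicas: " + assigned.filterNot(live.contains).mkString(","))
      PartitionInfo(partition, replicaInfo, NoError)
    } catch {
      // The client still gets the live replicas, plus an error code saying some are missing.
      case _: ReplicaNotAvailableException => PartitionInfo(partition, replicaInfo, ReplicaNotAvailable)
    }
  }
}
```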

        Jun Rao added a comment -
        Jun Rao added a comment -

        Thanks for the patch. I'm not sure I understand the changes in AdminUtils.getBrokerInfoFromCache(). It seems to me that, with or without the patch, the method will throw an exception if at least one of the items in brokerIds can't be converted to a Broker object, in which case the return value is irrelevant.

        Neha Narkhede made changes -
        Status In Progress [ 3 ] Patch Available [ 10002 ]
        Neha Narkhede made changes -
        Attachment kafka-820-v1.patch [ 12574467 ]
        Neha Narkhede added a comment -

        Two changes:

        1. Modified topic metadata request handling to return the broker metadata for the rest of the brokers, even if one of them is offline. Also modified it to throw an exception if even one of the brokers is offline, so that an error code is returned to the client.
        2. ReplicaNotAvailableException is irrelevant to the producer, so BrokerPartitionInfo.updateInfo() should not log its stack trace. The stack trace should only be logged for UnknownTopicOrPartitionException and LeaderNotAvailableException.
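The logging rule in change 2 amounts to a type match on the exception. This sketch defines local stand-ins for the three exception classes named above and a hypothetical helper, shouldLogStackTrace; it is not the actual BrokerPartitionInfo.updateInfo() code.

```scala
object ProducerLoggingSketch {
  // Local stand-ins for the Kafka exception classes named in the comment.
  class UnknownTopicOrPartitionException(msg: String) extends RuntimeException(msg)
  class LeaderNotAvailableException(msg: String) extends RuntimeException(msg)
  class ReplicaNotAvailableException(msg: String) extends RuntimeException(msg)

  // Only these two exceptions get a stack trace in the producer's log.
  def shouldLogStackTrace(e: Throwable): Boolean = e match {
    case _: UnknownTopicOrPartitionException | _: LeaderNotAvailableException => true
    // Everything else, including ReplicaNotAvailableException, is logged without one.
    case _ => false
  }
}
```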

        Neha Narkhede made changes -
        Status Open [ 1 ] In Progress [ 3 ]
        Neha Narkhede made changes -
        Field Original Value New Value
        Assignee Neha Narkhede [ nehanarkhede ]
        Neha Narkhede created issue -

          People

          • Assignee:
            Neha Narkhede
            Reporter:
            Neha Narkhede
          • Votes:
            0
            Watchers:
            2
