From 9c89d31a57762258644783091eae6c87f6e4ffd5 Mon Sep 17 00:00:00 2001 From: Dong Lin Date: Fri, 22 Aug 2014 16:20:11 -0700 Subject: [PATCH] KAFKA-1609; New producer metadata response handling should not exclude a PartitionInfo when there is partition error. --- .../kafka/common/requests/MetadataResponse.java | 27 ++++++++++------------ 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java index 7d90fce..d97962d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java @@ -117,21 +117,18 @@ public class MetadataResponse extends AbstractRequestResponse { Object[] partitionInfos = (Object[]) topicInfo.get(PARTITION_METADATA_KEY_NAME); for (int j = 0; j < partitionInfos.length; j++) { Struct partitionInfo = (Struct) partitionInfos[j]; - short partError = partitionInfo.getShort(PARTITION_ERROR_CODE_KEY_NAME); - if (partError == Errors.NONE.code()) { - int partition = partitionInfo.getInt(PARTITION_KEY_NAME); - int leader = partitionInfo.getInt(LEADER_KEY_NAME); - Node leaderNode = leader == -1 ? null : brokers.get(leader); - Object[] replicas = (Object[]) partitionInfo.get(REPLICAS_KEY_NAME); - Node[] replicaNodes = new Node[replicas.length]; - for (int k = 0; k < replicas.length; k++) - replicaNodes[k] = brokers.get(replicas[k]); - Object[] isr = (Object[]) partitionInfo.get(ISR_KEY_NAME); - Node[] isrNodes = new Node[isr.length]; - for (int k = 0; k < isr.length; k++) - isrNodes[k] = brokers.get(isr[k]); - partitions.add(new PartitionInfo(topic, partition, leaderNode, replicaNodes, isrNodes)); - } + int partition = partitionInfo.getInt(PARTITION_KEY_NAME); + int leader = partitionInfo.getInt(LEADER_KEY_NAME); + Node leaderNode = leader == -1 ? null : brokers.get(leader); + Object[] replicas = (Object[]) partitionInfo.get(REPLICAS_KEY_NAME); + Node[] replicaNodes = new Node[replicas.length]; + for (int k = 0; k < replicas.length; k++) + replicaNodes[k] = brokers.get(replicas[k]); + Object[] isr = (Object[]) partitionInfo.get(ISR_KEY_NAME); + Node[] isrNodes = new Node[isr.length]; + for (int k = 0; k < isr.length; k++) + isrNodes[k] = brokers.get(isr[k]); + partitions.add(new PartitionInfo(topic, partition, leaderNode, replicaNodes, isrNodes)); } } else { errors.put(topic, Errors.forCode(topicError)); -- 1.7.12.4