diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 5a81f35..9687757 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -23,12 +23,13 @@
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.requests.RequestSend;
 import org.apache.kafka.common.requests.ResponseHeader;
 import org.apache.kafka.common.utils.Time;
-
 /**
  * The background thread that handles the sending of produce requests to the Kafka cluster. This thread makes metadata
  * requests to renew its view of the cluster and then sends produce requests to the appropriate nodes.
@@ -284,8 +285,12 @@ public class Sender implements Runnable {
 
     private void handleMetadataResponse(Struct body, long now) {
         this.metadataFetchInProgress = false;
-        Cluster cluster = ProtoUtils.parseMetadataResponse(body);
-        this.metadata.update(cluster, now);
+        MetadataResponse response = new MetadataResponse(body);
+        Cluster cluster = response.cluster();
+        // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
+        // created which means we will get errors and no nodes until it exists
+        if (cluster.nodes().size() > 0)
+            this.metadata.update(cluster, now);
     }
 
     /**
@@ -322,11 +327,8 @@
      * Create a metadata request for the given topics
      */
    private InFlightRequest metadataRequest(int node, Set<String> topics) {
-        String[] ts = new String[topics.size()];
-        topics.toArray(ts);
-        Struct body = new Struct(ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id));
-        body.set("topics", topics.toArray());
-        RequestSend send = new RequestSend(node, new RequestHeader(ApiKeys.METADATA.id, clientId, correlation++), body);
+        MetadataRequest metadata = new MetadataRequest(new ArrayList<String>(topics));
+        RequestSend send = new RequestSend(node, header(ApiKeys.METADATA), metadata.toStruct());
         return new InFlightRequest(true, send, null);
     }
 
@@ -387,11 +389,14 @@
 
         }
         produce.set("topic_data", topicDatas.toArray());
-        RequestHeader header = new RequestHeader(ApiKeys.PRODUCE.id, clientId, correlation++);
-        RequestSend send = new RequestSend(destination, header, produce);
+        RequestSend send = new RequestSend(destination, header(ApiKeys.PRODUCE), produce);
         return new InFlightRequest(acks != 0, send, batchesByPartition);
     }
 
+    private RequestHeader header(ApiKeys key) {
+        return new RequestHeader(key.id, clientId, correlation++);
+    }
+
     /**
      * Wake up the selector associated with this send thread
      */
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java b/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
index b34bf79..440caa8 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
@@ -1,18 +1,10 @@
 package org.apache.kafka.common.protocol;
 
 import java.nio.ByteBuffer;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
-
 
 public class ProtoUtils {
 
     private static Schema schemaFor(Schema[][] schemas, int apiKey, int version) {
@@ -54,45 +46,4 @@
         return (Struct) currentResponseSchema(apiKey).read(buffer);
     }
 
-    public static Cluster parseMetadataResponse(Struct response) {
-        Map<Integer, Node> brokers = new HashMap<Integer, Node>();
-        Object[] brokerStructs = (Object[]) response.get("brokers");
-        for (int i = 0; i < brokerStructs.length; i++) {
-            Struct broker = (Struct) brokerStructs[i];
-            int nodeId = (Integer) broker.get("node_id");
-            String host = (String) broker.get("host");
-            int port = (Integer) broker.get("port");
-            brokers.put(nodeId, new Node(nodeId, host, port));
-        }
-        List<PartitionInfo> partitions = new ArrayList<PartitionInfo>();
-        Object[] topicInfos = (Object[]) response.get("topic_metadata");
-        for (int i = 0; i < topicInfos.length; i++) {
-            Struct topicInfo = (Struct) topicInfos[i];
-            short topicError = topicInfo.getShort("topic_error_code");
-            if (topicError == Errors.NONE.code()) {
-                String topic = topicInfo.getString("topic");
-                Object[] partitionInfos = (Object[]) topicInfo.get("partition_metadata");
-                for (int j = 0; j < partitionInfos.length; j++) {
-                    Struct partitionInfo = (Struct) partitionInfos[j];
-                    short partError = partitionInfo.getShort("partition_error_code");
-                    if (partError == Errors.NONE.code()) {
-                        int partition = partitionInfo.getInt("partition_id");
-                        int leader = partitionInfo.getInt("leader");
-                        Node leaderNode = leader == -1 ? null : brokers.get(leader);
-                        Object[] replicas = (Object[]) partitionInfo.get("replicas");
-                        Node[] replicaNodes = new Node[replicas.length];
-                        for (int k = 0; k < replicas.length; k++)
-                            replicaNodes[k] = brokers.get(replicas[k]);
-                        Object[] isr = (Object[]) partitionInfo.get("isr");
-                        Node[] isrNodes = new Node[isr.length];
-                        for (int k = 0; k < isr.length; k++)
-                            isrNodes[k] = brokers.get(isr[k]);
-                        partitions.add(new PartitionInfo(topic, partition, leaderNode, replicaNodes, isrNodes));
-                    }
-                }
-            }
-        }
-        return new Cluster(brokers.values(), partitions);
-    }
-
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
new file mode 100644
index 0000000..91b9d64
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -0,0 +1,25 @@
+package org.apache.kafka.common.requests;
+
+import java.util.List;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Struct;
+
+public class MetadataRequest {
+
+    private final List<String> topics;
+
+    public MetadataRequest(List<String> topics) {
+        this.topics = topics;
+    }
+
+    public Struct toStruct() {
+        String[] ts = new String[topics.size()];
+        topics.toArray(ts);
+        Struct body = new Struct(ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id));
+        body.set("topics", ts);
+        return body;
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
new file mode 100644
index 0000000..73b7006
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -0,0 +1,77 @@
+package org.apache.kafka.common.requests;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+public class MetadataResponse {
+
+    private final Cluster cluster;
+    private final Map<String, Errors> errors;
+
+    public MetadataResponse(Cluster cluster, Map<String, Errors> errors) {
+        this.cluster = cluster;
+        this.errors = errors;
+    }
+
+    public MetadataResponse(Struct struct) {
+        Map<String, Errors> errors = new HashMap<String, Errors>();
+        Map<Integer, Node> brokers = new HashMap<Integer, Node>();
+        Object[] brokerStructs = (Object[]) struct.get("brokers");
+        for (int i = 0; i < brokerStructs.length; i++) {
+            Struct broker = (Struct) brokerStructs[i];
+            int nodeId = (Integer) broker.get("node_id");
+            String host = (String) broker.get("host");
+            int port = (Integer) broker.get("port");
+            brokers.put(nodeId, new Node(nodeId, host, port));
+        }
+        List<PartitionInfo> partitions = new ArrayList<PartitionInfo>();
+        Object[] topicInfos = (Object[]) struct.get("topic_metadata");
+        for (int i = 0; i < topicInfos.length; i++) {
+            Struct topicInfo = (Struct) topicInfos[i];
+            short topicError = topicInfo.getShort("topic_error_code");
+            String topic = topicInfo.getString("topic");
+            if (topicError == Errors.NONE.code()) {
+                Object[] partitionInfos = (Object[]) topicInfo.get("partition_metadata");
+                for (int j = 0; j < partitionInfos.length; j++) {
+                    Struct partitionInfo = (Struct) partitionInfos[j];
+                    short partError = partitionInfo.getShort("partition_error_code");
+                    if (partError == Errors.NONE.code()) {
+                        int partition = partitionInfo.getInt("partition_id");
+                        int leader = partitionInfo.getInt("leader");
+                        Node leaderNode = leader == -1 ? null : brokers.get(leader);
+                        Object[] replicas = (Object[]) partitionInfo.get("replicas");
+                        Node[] replicaNodes = new Node[replicas.length];
+                        for (int k = 0; k < replicas.length; k++)
+                            replicaNodes[k] = brokers.get(replicas[k]);
+                        Object[] isr = (Object[]) partitionInfo.get("isr");
+                        Node[] isrNodes = new Node[isr.length];
+                        for (int k = 0; k < isr.length; k++)
+                            isrNodes[k] = brokers.get(isr[k]);
+                        partitions.add(new PartitionInfo(topic, partition, leaderNode, replicaNodes, isrNodes));
+                    }
+                }
+            } else {
+                errors.put(topic, Errors.forCode(topicError));
+            }
+        }
+        this.errors = errors;
+        this.cluster = new Cluster(brokers.values(), partitions);
+    }
+
+    public Map<String, Errors> errors() {
+        return this.errors;
+    }
+
+    public Cluster cluster() {
+        return this.cluster;
+    }
+
+}
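
Reviewer note: a minimal sketch of how the refactored classes compose, using only what this diff introduces (MetadataRequest.toStruct(), the MetadataResponse(Struct) constructor, cluster(), and errors()). The class name, method names, and "my-topic" below are hypothetical illustration; the Struct passed to parseResponse is assumed to be a metadata response body already read off the wire.

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.common.protocol.types.Struct;
    import org.apache.kafka.common.requests.MetadataRequest;
    import org.apache.kafka.common.requests.MetadataResponse;

    public class MetadataRoundTripSketch {

        // Request side: Sender.metadataRequest() now builds the wire Struct
        // through MetadataRequest instead of setting schema fields inline.
        public static Struct buildRequest(List<String> topics) {
            return new MetadataRequest(topics).toStruct();
        }

        // Response side: parsing moved from ProtoUtils.parseMetadataResponse()
        // into the MetadataResponse(Struct) constructor, which also collects
        // per-topic error codes instead of silently dropping errored topics.
        public static Cluster parseResponse(Struct body) {
            MetadataResponse response = new MetadataResponse(body);
            for (Map.Entry<String, Errors> e : response.errors().entrySet())
                System.out.println("topic " + e.getKey() + ": " + e.getValue());
            Cluster cluster = response.cluster();
            // Mirror of the guard in handleMetadataResponse(): while a topic is
            // still being auto-created the broker returns errors and no nodes,
            // so the caller should keep its previous cluster view (null here).
            return cluster.nodes().size() > 0 ? cluster : null;
        }

        public static void main(String[] args) {
            Struct requestBody = buildRequest(Arrays.asList("my-topic"));
            System.out.println(requestBody);
        }
    }

The refactor also centralizes header construction in the new private header(ApiKeys) helper, so the correlation id is incremented in exactly one place, and it exposes the per-topic error map that the old ProtoUtils parser discarded.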