From 0c164d6d640e16dab0828d7c9d549816b7dacb1d Mon Sep 17 00:00:00 2001 From: jqin Date: Tue, 24 Mar 2015 01:17:05 -0700 Subject: [PATCH 1/2] Patch for KAFKA-2042. Do not update metadata for empty topic set in new producer --- .../main/java/org/apache/kafka/clients/NetworkClient.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index f429502..2d4f89c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -469,11 +469,14 @@ public class NetworkClient implements KafkaClient { if (connectionStates.isConnected(node.id()) && inFlightRequests.canSendMore(node.id())) { Set topics = metadata.topics(); - this.metadataFetchInProgress = true; - ClientRequest metadataRequest = metadataRequest(now, node.id(), topics); - log.debug("Sending metadata request {} to node {}", metadataRequest, node.id()); - this.selector.send(metadataRequest.request()); - this.inFlightRequests.add(metadataRequest); + // Only update metadata if topics is not empty + if (!topics.isEmpty()) { + this.metadataFetchInProgress = true; + ClientRequest metadataRequest = metadataRequest(now, node.id(), topics); + log.debug("Sending metadata request {} to node {}", metadataRequest, node.id()); + this.selector.send(metadataRequest.request()); + this.inFlightRequests.add(metadataRequest); + } } else if (connectionStates.canConnect(node.id(), now)) { // we don't have a connection to this node right now, make one log.debug("Initialize connection to node {} for sending metadata request", node.id()); -- 1.8.3.4 (Apple Git-47) From e2fc1397caf7c9b33f3b5542470274e5dbabcb10 Mon Sep 17 00:00:00 2001 From: jqin Date: Tue, 24 Mar 2015 13:36:48 -0700 Subject: [PATCH 2/2] Move the change to KafkaProducer after talking to Guozhang offline. --- .../main/java/org/apache/kafka/clients/NetworkClient.java | 13 +++++-------- .../org/apache/kafka/clients/producer/KafkaProducer.java | 4 +++- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 2d4f89c..f429502 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -469,14 +469,11 @@ public class NetworkClient implements KafkaClient { if (connectionStates.isConnected(node.id()) && inFlightRequests.canSendMore(node.id())) { Set topics = metadata.topics(); - // Only update metadata if topics is not empty - if (!topics.isEmpty()) { - this.metadataFetchInProgress = true; - ClientRequest metadataRequest = metadataRequest(now, node.id(), topics); - log.debug("Sending metadata request {} to node {}", metadataRequest, node.id()); - this.selector.send(metadataRequest.request()); - this.inFlightRequests.add(metadataRequest); - } + this.metadataFetchInProgress = true; + ClientRequest metadataRequest = metadataRequest(now, node.id(), topics); + log.debug("Sending metadata request {} to node {}", metadataRequest, node.id()); + this.selector.send(metadataRequest.request()); + this.inFlightRequests.add(metadataRequest); } else if (connectionStates.canConnect(node.id(), now)) { // we don't have a connection to this node right now, make one log.debug("Initialize connection to node {} for sending metadata request", node.id()); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index feda9c9..0f15c25 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -406,6 +406,9 @@ public class KafkaProducer implements Producer { * @param maxWaitMs The maximum time in ms for waiting on the metadata */ private void waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException { + // Add topic to metadata topic list if it is not there already. + if (!this.metadata.topics().contains(topic)) + this.metadata.add(topic); if (metadata.fetch().partitionsForTopic(topic) != null) { return; } else { @@ -414,7 +417,6 @@ public class KafkaProducer implements Producer { while (metadata.fetch().partitionsForTopic(topic) == null) { log.trace("Requesting metadata update for topic {}.", topic); int version = metadata.requestUpdate(); - metadata.add(topic); sender.wakeup(); metadata.awaitUpdate(version, remainingWaitMs); long elapsed = time.milliseconds() - begin; -- 1.8.3.4 (Apple Git-47)