From 0c164d6d640e16dab0828d7c9d549816b7dacb1d Mon Sep 17 00:00:00 2001 From: jqin Date: Tue, 24 Mar 2015 01:17:05 -0700 Subject: [PATCH 1/3] Patch for KAFKA-2042. Do not update metadata for empty topic set in new producer --- .../main/java/org/apache/kafka/clients/NetworkClient.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index f429502..2d4f89c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -469,11 +469,14 @@ public class NetworkClient implements KafkaClient { if (connectionStates.isConnected(node.id()) && inFlightRequests.canSendMore(node.id())) { Set topics = metadata.topics(); - this.metadataFetchInProgress = true; - ClientRequest metadataRequest = metadataRequest(now, node.id(), topics); - log.debug("Sending metadata request {} to node {}", metadataRequest, node.id()); - this.selector.send(metadataRequest.request()); - this.inFlightRequests.add(metadataRequest); + // Only update metadata if topics is not empty + if (!topics.isEmpty()) { + this.metadataFetchInProgress = true; + ClientRequest metadataRequest = metadataRequest(now, node.id(), topics); + log.debug("Sending metadata request {} to node {}", metadataRequest, node.id()); + this.selector.send(metadataRequest.request()); + this.inFlightRequests.add(metadataRequest); + } } else if (connectionStates.canConnect(node.id(), now)) { // we don't have a connection to this node right now, make one log.debug("Initialize connection to node {} for sending metadata request", node.id()); -- 1.8.3.4 (Apple Git-47) From e2fc1397caf7c9b33f3b5542470274e5dbabcb10 Mon Sep 17 00:00:00 2001 From: jqin Date: Tue, 24 Mar 2015 13:36:48 -0700 Subject: [PATCH 2/3] Move the change to KafkaProducer after talking to Guozhang offline. --- .../main/java/org/apache/kafka/clients/NetworkClient.java | 13 +++++-------- .../org/apache/kafka/clients/producer/KafkaProducer.java | 4 +++- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 2d4f89c..f429502 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -469,14 +469,11 @@ public class NetworkClient implements KafkaClient { if (connectionStates.isConnected(node.id()) && inFlightRequests.canSendMore(node.id())) { Set topics = metadata.topics(); - // Only update metadata if topics is not empty - if (!topics.isEmpty()) { - this.metadataFetchInProgress = true; - ClientRequest metadataRequest = metadataRequest(now, node.id(), topics); - log.debug("Sending metadata request {} to node {}", metadataRequest, node.id()); - this.selector.send(metadataRequest.request()); - this.inFlightRequests.add(metadataRequest); - } + this.metadataFetchInProgress = true; + ClientRequest metadataRequest = metadataRequest(now, node.id(), topics); + log.debug("Sending metadata request {} to node {}", metadataRequest, node.id()); + this.selector.send(metadataRequest.request()); + this.inFlightRequests.add(metadataRequest); } else if (connectionStates.canConnect(node.id(), now)) { // we don't have a connection to this node right now, make one log.debug("Initialize connection to node {} for sending metadata request", node.id()); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index feda9c9..0f15c25 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -406,6 +406,9 @@ public class KafkaProducer implements Producer { * @param maxWaitMs The maximum time in ms for waiting on the metadata */ private void waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException { + // Add topic to metadata topic list if it is not there already. + if (!this.metadata.topics().contains(topic)) + this.metadata.add(topic); if (metadata.fetch().partitionsForTopic(topic) != null) { return; } else { @@ -414,7 +417,6 @@ public class KafkaProducer implements Producer { while (metadata.fetch().partitionsForTopic(topic) == null) { log.trace("Requesting metadata update for topic {}.", topic); int version = metadata.requestUpdate(); - metadata.add(topic); sender.wakeup(); metadata.awaitUpdate(version, remainingWaitMs); long elapsed = time.milliseconds() - begin; -- 1.8.3.4 (Apple Git-47) From 9821d1aa0090724f5e3eed331030154ef639f479 Mon Sep 17 00:00:00 2001 From: jqin Date: Tue, 24 Mar 2015 13:56:29 -0700 Subject: [PATCH 3/3] A less expensive fix --- clients/src/main/java/org/apache/kafka/clients/Metadata.java | 9 +++++++++ .../java/org/apache/kafka/clients/producer/KafkaProducer.java | 2 +- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index c8bde8b..d0d2b3a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -132,6 +132,15 @@ public final class Metadata { } /** + * Check if a topic is already in the topic set. + * @param topic topic to check + * @return true if the topic exists, false otherwise + */ + public synchronized boolean checkTopic(String topic) { + return this.topics.contains(topic); + } + + /** * Update the cluster metadata */ public synchronized void update(Cluster cluster, long now) { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 0f15c25..ee4bc22 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -407,7 +407,7 @@ public class KafkaProducer implements Producer { */ private void waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException { // Add topic to metadata topic list if it is not there already. - if (!this.metadata.topics().contains(topic)) + if (!this.metadata.checkTopic(topic)) this.metadata.add(topic); if (metadata.fetch().partitionsForTopic(topic) != null) { return; -- 1.8.3.4 (Apple Git-47)