From 7080c386684ae700868567adad3dabb82b6847a2 Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Mon, 1 Dec 2014 10:53:39 -0800 Subject: [PATCH 1/4] KAFKA-1642 While waiting for metadata fetch to complete, use long poll timeouts to avoid busy looping. --- clients/src/main/java/org/apache/kafka/clients/NetworkClient.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 525b95e..76f346b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -180,9 +180,10 @@ public class NetworkClient implements KafkaClient { // should we update our metadata? long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now); - long timeToNextReconnectAttempt = this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now; + long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0); + long waitForMetadataFetch = (this.metadataFetchInProgress ? Integer.MAX_VALUE : 0); // if there is no node available to connect, back off refreshing metadata - long metadataTimeout = Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt); + long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt), waitForMetadataFetch); if (!this.metadataFetchInProgress && metadataTimeout == 0) maybeUpdateMetadata(sends, now); -- 2.1.3 From 5cd719947a731c6760e4b52042a6ef6e81052703 Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Mon, 1 Dec 2014 13:31:40 -0800 Subject: [PATCH 2/4] KAFKA-1642 Make nodes as connecting before making connect request so exceptions are handled properly. --- clients/src/main/java/org/apache/kafka/clients/NetworkClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 76f346b..47c8856 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -401,8 +401,8 @@ public class NetworkClient implements KafkaClient { private void initiateConnect(Node node, long now) { try { log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port()); - selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer); this.connectionStates.connecting(node.id(), now); + selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer); } catch (IOException e) { /* attempt failed, we'll try again after the backoff */ connectionStates.disconnected(node.id()); -- 2.1.3 From 62a956e652a40ad59687967f2cece135ce935c13 Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Mon, 1 Dec 2014 13:32:20 -0800 Subject: [PATCH 3/4] KAFKA-1642 Update last time no nodes were available when updating metadata has to initiate a connection. --- clients/src/main/java/org/apache/kafka/clients/NetworkClient.java | 1 + 1 file changed, 1 insertion(+) diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 47c8856..5150c18 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -392,6 +392,7 @@ public class NetworkClient implements KafkaClient { // we don't have a connection to this node right now, make one log.debug("Init connection to node {} for sending metadata request in the next iteration", node.id()); initiateConnect(node, now); + this.lastNoNodeAvailableMs = now; } } -- 2.1.3 From a95ebf6435d2d757a5aff791ea17e27dca9ce679 Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Mon, 1 Dec 2014 15:42:27 -0800 Subject: [PATCH 4/4] KAFKA-1642 Don't busy wait trying to send metadata to a node that is connected but has too many in flight requests. --- clients/src/main/java/org/apache/kafka/clients/NetworkClient.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 5150c18..ef41d85 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -393,6 +393,8 @@ public class NetworkClient implements KafkaClient { log.debug("Init connection to node {} for sending metadata request in the next iteration", node.id()); initiateConnect(node, now); this.lastNoNodeAvailableMs = now; + } else { // connected, but can't send more + this.lastNoNodeAvailableMs = now; } } -- 2.1.3