From 3302ab555388f051acebffe08a2e3873493b5a47 Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Mon, 1 Dec 2014 10:53:39 -0800 Subject: [PATCH 1/5] KAFKA-1642 While waiting for metadata fetch to complete, use long poll timeouts to avoid busy looping. --- clients/src/main/java/org/apache/kafka/clients/NetworkClient.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 525b95e..76f346b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -180,9 +180,10 @@ public class NetworkClient implements KafkaClient { // should we update our metadata? long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now); - long timeToNextReconnectAttempt = this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now; + long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0); + long waitForMetadataFetch = (this.metadataFetchInProgress ? Integer.MAX_VALUE : 0); // if there is no node available to connect, back off refreshing metadata - long metadataTimeout = Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt); + long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt), waitForMetadataFetch); if (!this.metadataFetchInProgress && metadataTimeout == 0) maybeUpdateMetadata(sends, now); -- 2.2.1 From 5fecd760a1028f07cdcc0597d153668b84f32d46 Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Mon, 1 Dec 2014 13:31:40 -0800 Subject: [PATCH 2/5] KAFKA-1642 Make nodes as connecting before making connect request so exceptions are handled properly. --- clients/src/main/java/org/apache/kafka/clients/NetworkClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 76f346b..47c8856 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -401,8 +401,8 @@ public class NetworkClient implements KafkaClient { private void initiateConnect(Node node, long now) { try { log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port()); - selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer); this.connectionStates.connecting(node.id(), now); + selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer); } catch (IOException e) { /* attempt failed, we'll try again after the backoff */ connectionStates.disconnected(node.id()); -- 2.2.1 From a09556f939d8ed473e4449bf6acaad8da7059f4d Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Mon, 1 Dec 2014 13:32:20 -0800 Subject: [PATCH 3/5] KAFKA-1642 Update last time no nodes were available when updating metadata has to initiate a connection. --- clients/src/main/java/org/apache/kafka/clients/NetworkClient.java | 1 + 1 file changed, 1 insertion(+) diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 47c8856..5150c18 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -392,6 +392,7 @@ public class NetworkClient implements KafkaClient { // we don't have a connection to this node right now, make one log.debug("Init connection to node {} for sending metadata request in the next iteration", node.id()); initiateConnect(node, now); + this.lastNoNodeAvailableMs = now; } } -- 2.2.1 From 0a5cad6250b24aa0b605c506b0dbeca03807d63e Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Mon, 1 Dec 2014 15:42:27 -0800 Subject: [PATCH 4/5] KAFKA-1642 Don't busy wait trying to send metadata to a node that is connected but has too many in flight requests. --- clients/src/main/java/org/apache/kafka/clients/NetworkClient.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 5150c18..ef41d85 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -393,6 +393,8 @@ public class NetworkClient implements KafkaClient { log.debug("Init connection to node {} for sending metadata request in the next iteration", node.id()); initiateConnect(node, now); this.lastNoNodeAvailableMs = now; + } else { // connected, but can't send more + this.lastNoNodeAvailableMs = now; } } -- 2.2.1 From 48f120437b5ed31cda5f991e76f115a496d7f3b8 Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Mon, 5 Jan 2015 18:46:43 -0800 Subject: [PATCH 5/5] Improve handling of fast-fail connection attempts in NetworkClient and add some comments explaining how each branch of maybeUpdateMetadata works. --- .../src/main/java/org/apache/kafka/clients/NetworkClient.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index ef41d85..6746275 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -372,6 +372,8 @@ public class NetworkClient implements KafkaClient { * Add a metadata request to the list of sends if we can make one */ private void maybeUpdateMetadata(List sends, long now) { + // Beware that the behavior of this method and the computation of timeouts for poll() are + // highly dependent on the behavior of leastLoadedNode. Node node = this.leastLoadedNode(now); if (node == null) { log.debug("Give up sending metadata request since no node is available"); @@ -392,8 +394,13 @@ public class NetworkClient implements KafkaClient { // we don't have a connection to this node right now, make one log.debug("Init connection to node {} for sending metadata request in the next iteration", node.id()); initiateConnect(node, now); - this.lastNoNodeAvailableMs = now; - } else { // connected, but can't send more + // If initiateConnect failed immediately, this node will be put into blackout and we + // should allow immediately retrying in case there is another candidate node. If it + // is still connecting, the worst case is that we end up setting a longer timeout + // on the next round and then wait for the response. + } else { // connected, but can't send more OR connecting + // In either case, we just need to wait for a network event to let us know the selected + // connection might be usable again. this.lastNoNodeAvailableMs = now; } } -- 2.2.1