From 9cd6b60025aabc790565323e7f38ad90da995ac7 Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Sun, 19 Oct 2014 18:32:37 -0700 Subject: [PATCH 1/3] KAFKA-1642 Compute correct poll timeout when some nodes are not ready for sending or there are multiple partitions per node. Fixes two issues with the computation of ready nodes and poll timeouts in Sender/RecordAccumulator: 1. The timeout was computed incorrectly because it took into account all nodes, even if they had data to send such that their timeout would be 0. However, nodes were then filtered based on whether it was possible to send (i.e. their connection was still good) which could result in nothing to send and a 0 timeout, resulting in busy looping. Instead, the timeout needs to be computed only using data that cannot be immediately sent, i.e. where the timeout will be greater than 0. This timeout is only used if, after filtering by whether connections are ready for sending, there is no data to be sent. Other events can wake the thread up earlier, e.g. a client reconnects and becomes ready again. 2. One of the conditions indicating whether data is sendable is whether a timeout has expired -- either the linger time or the retry backoff. This condition wasn't accounting for both cases properly, always using the linger time. This means the retry backoff was probably not being respected. --- .../producer/internals/RecordAccumulator.java | 18 ++++-- .../kafka/clients/producer/internals/Sender.java | 17 +++++- .../clients/producer/RecordAccumulatorTest.java | 64 ++++++++++++++++++---- 3 files changed, 79 insertions(+), 20 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index c5d4700..c15485d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -183,9 +183,9 @@ public final class RecordAccumulator { } /** - * Get a list of nodes whose partitions are ready to be sent, and the time to when any partition will be ready if no - * partitions are ready yet; If the ready nodes list is non-empty, the timeout value will be 0. Also return the flag - * for whether there are any unknown leaders for the accumulated partition batches. + * Get a list of nodes whose partitions are ready to be sent, and the earliest time at which any non-sendable + * partition will be ready; Also return the flag for whether there are any unknown leaders for the accumulated + * partition batches. *
<p>
* A destination node is ready to send data if ANY one of its partition is not backing off the send and ANY of the * following are true : @@ -219,11 +219,17 @@ public final class RecordAccumulator { long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs; long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0); boolean full = deque.size() > 1 || batch.records.isFull(); - boolean expired = waitedTimeMs >= lingerMs; + boolean expired = waitedTimeMs >= timeToWaitMs; boolean sendable = full || expired || exhausted || closed; - if (sendable && !backingOff) + if (sendable && !backingOff) { readyNodes.add(leader); - nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs); + } + else { + // Note that this results in a conservative estimate since an un-sendable partition may have + // a leader that will later be found to have sendable data. However, this is good enough + // since we'll just wake up and then sleep again for the remaining time. + nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs); + } } } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 8ebe7ed..329ebad 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -151,11 +151,14 @@ public class Sender implements Runnable { this.metadata.requestUpdate(); // remove any nodes we aren't ready to send to + boolean filteredNodes = false; Iterator<Node> iter = result.readyNodes.iterator(); while (iter.hasNext()) { Node node = iter.next(); - if (!this.client.ready(node, now)) + if (!this.client.ready(node, now)) { iter.remove(); + filteredNodes = true; + } } // create produce requests @@ -163,16 +166,26 @@ List<ClientRequest> requests = createProduceRequests(batches, now); sensors.updateProduceRequestMetrics(requests); + // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately + // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data + // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes + // with sendable data that aren't ready to send since they would cause busy looping. + long pollTimeout = result.nextReadyCheckDelayMs; if (result.readyNodes.size() > 0) { log.trace("Nodes with data ready to send: {}", result.readyNodes); log.trace("Created {} produce requests: {}", requests.size(), requests); + pollTimeout = 0; + } else if (filteredNodes) { + // If we ended up filtering out all sendable nodes, make sure we wake up again. + // (Initial connection, reconnecting, need to check what events trigger wakeup). 
+ pollTimeout = 100; } // if some partitions are already ready to be sent, the select time would be 0; // otherwise if some partition already has some data accumulated but not ready yet, // the select time will be the time difference between now and its linger expiry time; // otherwise the select time will be the time difference between now and the metadata expiry time; - List<ClientResponse> responses = this.client.poll(requests, result.nextReadyCheckDelayMs, now); + List<ClientResponse> responses = this.client.poll(requests, pollTimeout, now); for (ClientResponse response : responses) { if (response.wasDisconnected()) handleDisconnect(response, now); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java index 0762b35..b61448e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java @@ -14,6 +14,7 @@ package org.apache.kafka.clients.producer; import static java.util.Arrays.asList; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; import java.nio.ByteBuffer; @@ -43,16 +44,20 @@ public class RecordAccumulatorTest { private String topic = "test"; private int partition1 = 0; private int partition2 = 1; - private Node node = new Node(0, "localhost", 1111); + private int partition3 = 2; + private Node node1 = new Node(0, "localhost", 1111); + private Node node2 = new Node(1, "localhost", 1112); private TopicPartition tp1 = new TopicPartition(topic, partition1); private TopicPartition tp2 = new TopicPartition(topic, partition2); - private PartitionInfo part1 = new PartitionInfo(topic, partition1, node, null, null); - private PartitionInfo part2 = new PartitionInfo(topic, partition2, node, null, null); + private TopicPartition tp3 = new TopicPartition(topic, partition3); + private PartitionInfo part1 = new PartitionInfo(topic, partition1, node1, null, null); + private PartitionInfo part2 = new PartitionInfo(topic, partition2, node1, null, null); + private PartitionInfo part3 = new PartitionInfo(topic, partition3, node2, null, null); private MockTime time = new MockTime(); private byte[] key = "key".getBytes(); private byte[] value = "value".getBytes(); private int msgSize = Records.LOG_OVERHEAD + Record.recordSize(key, value); - private Cluster cluster = new Cluster(Collections.singleton(node), Arrays.asList(part1, part2)); + private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3)); private Metrics metrics = new Metrics(time); @Test @@ -65,8 +70,8 @@ public class RecordAccumulatorTest { assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size()); } accum.append(tp1, key, value, CompressionType.NONE, null); - assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()).readyNodes); - List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node), Integer.MAX_VALUE, 0).get(node.id()); + assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); + List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id()); assertEquals(1, batches.size()); RecordBatch batch = batches.get(0); Iterator<LogEntry> iter = batch.records.iterator(); @@ -83,7 +88,7 
@@ public class RecordAccumulatorTest { int batchSize = 512; RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, false, metrics, time); accum.append(tp1, key, new byte[2 * batchSize], CompressionType.NONE, null); - assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()).readyNodes); + assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); } @Test @@ -93,8 +98,8 @@ public class RecordAccumulatorTest { accum.append(tp1, key, value, CompressionType.NONE, null); assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); time.sleep(10); - assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()).readyNodes); - List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node), Integer.MAX_VALUE, 0).get(node.id()); + assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); + List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id()); assertEquals(1, batches.size()); RecordBatch batch = batches.get(0); Iterator<LogEntry> iter = batch.records.iterator(); @@ -113,9 +118,9 @@ public class RecordAccumulatorTest { for (int i = 0; i < appends; i++) accum.append(tp, key, value, CompressionType.NONE, null); } - assertEquals("Partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()).readyNodes); + assertEquals("Partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); - List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node), 1024, 0).get(node.id()); + List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), 1024, 0).get(node1.id()); assertEquals("But due to size bound only one partition should have been retrieved", 1, batches.size()); } @@ -145,7 +150,7 @@ public class RecordAccumulatorTest { long now = time.milliseconds(); while (read < numThreads * msgs) { Set<Node> nodes = accum.ready(cluster, now).readyNodes; - List<RecordBatch> batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node.id()); + List<RecordBatch> batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node1.id()); if (batches != null) { for (RecordBatch batch : batches) { for (LogEntry entry : batch.records) @@ -159,4 +164,39 @@ } t.join(); } + + @Test + public void testNextReadyCheckDelay() throws Exception { + // Next check time will use lingerMs since this test won't trigger any retries/backoff + long lingerMs = 10L; + RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time); + // Just short of going over the limit so we trigger linger time + int appends = 1024 / msgSize; + + // Partition on node1 only + for (int i = 0; i < appends; i++) + accum.append(tp1, key, value, CompressionType.NONE, null); + RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); + assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); + assertEquals("Next check time should be the linger time", lingerMs, result.nextReadyCheckDelayMs); + + time.sleep(lingerMs / 2); + + // Add partition on node2 only + for (int i = 0; i < appends; i++) + accum.append(tp3, key, value, CompressionType.NONE, null); + result = 
accum.ready(cluster, time.milliseconds()); + assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); + assertEquals("Next check time should be defined by node1, half remaining linger time", lingerMs / 2, result.nextReadyCheckDelayMs); + + // Add data for another partition on node1, enough to make data sendable immediately + for (int i = 0; i < appends + 1; i++) + accum.append(tp2, key, value, CompressionType.NONE, null); + result = accum.ready(cluster, time.milliseconds()); + assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes); + // Note this can actually be < linger time because it may use delays from partitions that aren't sendable + // but have leaders with other sendable data. + assertTrue("Next check time should be defined by node2, at most linger time", result.nextReadyCheckDelayMs <= lingerMs); + } + } -- 2.1.2 From 5a9877242c26e20c76702c273675e2742ca963cf Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Mon, 20 Oct 2014 13:39:32 -0700 Subject: [PATCH 2/3] KAFKA-1642 Compute correct poll timeout when all nodes have sendable data but none can send data because they are in a connection backoff period. --- .../kafka/clients/ClusterConnectionStates.java | 21 +++++++++++++++++++++ .../java/org/apache/kafka/clients/KafkaClient.java | 10 ++++++++++ .../org/apache/kafka/clients/NetworkClient.java | 13 +++++++++++++ .../kafka/clients/producer/internals/Sender.java | 10 +++------- .../java/org/apache/kafka/clients/MockClient.java | 5 +++++ 5 files changed, 52 insertions(+), 7 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java index d304660..8aece7e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java @@ -57,6 +57,27 @@ final class ClusterConnectionStates { } /** + * Returns the number of milliseconds to wait, based on the connection state, before attempting to send data. When + * disconnected, this respects the reconnect backoff time. When connecting or connected, this handles slow/stalled + * connections. + * @param node The node to check + * @param now The current time in ms + */ + public long connectionDelay(int node, long now) { + NodeConnectionState state = nodeState.get(node); + if (state == null) return 0; + long timeWaited = now - state.lastConnectAttemptMs; + if (state.state == ConnectionState.DISCONNECTED) { + return Math.max(this.reconnectBackoffMs - timeWaited, 0); + } + else { + // When connecting or connected, we should be able to delay indefinitely since other events (connection or + // data acked) will cause a wakeup once data can be sent. + return Long.MAX_VALUE; + } + } + + /** * Enter the connecting state for the given node. * @param node The id of the node we are connecting to * @param now The current time. diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java index 29658d4..3976955 100644 --- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java @@ -41,6 +41,16 @@ public interface KafkaClient { public boolean ready(Node node, long now); /** + * Returns the number of milliseconds to wait, based on the connection state, before attempting to send data. 
When + * disconnected, this respects the reconnect backoff time. When connecting or connected, this handles slow/stalled + * connections. + * @param node The node to check + * @param now The current timestamp + * @return The number of milliseconds to wait. + */ + public long connectionDelay(Node node, long now); + + /** * Initiate the sending of the given requests and return any completed responses. Requests can only be sent on ready * connections. * @param requests The requests to send diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index eea270a..525b95e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -119,6 +119,19 @@ public class NetworkClient implements KafkaClient { } /** + * Returns the number of milliseconds to wait, based on the connection state, before attempting to send data. When + * disconnected, this respects the reconnect backoff time. When connecting or connected, this handles slow/stalled + * connections. + * @param node The node to check + * @param now The current timestamp + * @return The number of milliseconds to wait. + */ + @Override + public long connectionDelay(Node node, long now) { + return connectionStates.connectionDelay(node.id(), now); + } + + /** * Check if the node with the given id is ready to send more requests. * @param node The given node id * @param now The current time in ms diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 329ebad..84a7a07 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -151,13 +151,13 @@ public class Sender implements Runnable { this.metadata.requestUpdate(); // remove any nodes we aren't ready to send to - boolean filteredNodes = false; Iterator<Node> iter = result.readyNodes.iterator(); + long notReadyTimeout = Long.MAX_VALUE; while (iter.hasNext()) { Node node = iter.next(); if (!this.client.ready(node, now)) { iter.remove(); - filteredNodes = true; + notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now)); } } @@ -170,15 +170,11 @@ // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes // with sendable data that aren't ready to send since they would cause busy looping. - long pollTimeout = result.nextReadyCheckDelayMs; + long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout); if (result.readyNodes.size() > 0) { log.trace("Nodes with data ready to send: {}", result.readyNodes); log.trace("Created {} produce requests: {}", requests.size(), requests); pollTimeout = 0; - } else if (filteredNodes) { - // If we ended up filtering out all sendable nodes, make sure we wake up again. - // (Initial connection, reconnecting, need to check what events trigger wakeup). 
- pollTimeout = 100; } // if some partitions are already ready to be sent, the select time would be 0; diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index aae8d4a..47b5d4a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -41,6 +41,11 @@ public class MockClient implements KafkaClient { return found; } + @Override + public long connectionDelay(Node node, long now) { + return 0; + } + public void disconnect(Integer node) { Iterator<ClientRequest> iter = requests.iterator(); while (iter.hasNext()) { -- 2.1.2 From 01d72707b4caff420d636cf89c455cc55456c9d6 Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Thu, 23 Oct 2014 16:17:29 -0700 Subject: [PATCH 3/3] Addressing Jun's comments. --- .../main/java/org/apache/kafka/clients/ClusterConnectionStates.java | 3 ++- .../apache/kafka/clients/producer/internals/RecordAccumulator.java | 2 +- .../main/java/org/apache/kafka/clients/producer/internals/Sender.java | 4 ---- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java index 8aece7e..4765439 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java @@ -65,7 +65,8 @@ final class ClusterConnectionStates { */ public long connectionDelay(int node, long now) { NodeConnectionState state = nodeState.get(node); - if (state == null) return 0; + if (state == null) + return 0; long timeWaited = now - state.lastConnectAttemptMs; if (state.state == ConnectionState.DISCONNECTED) { return Math.max(this.reconnectBackoffMs - timeWaited, 0); } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index c15485d..f5bba4c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -219,7 +219,7 @@ public final class RecordAccumulator { long timeToWaitMs = backingOff ? 
retryBackoffMs : lingerMs; long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0); boolean full = deque.size() > 1 || batch.records.isFull(); - boolean expired = waitedTimeMs >= timeToWaitMs; + boolean expired = timeLeftMs <= 0; boolean sendable = full || expired || exhausted || closed; if (sendable && !backingOff) { readyNodes.add(leader); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 84a7a07..f2da04f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -177,10 +177,6 @@ public class Sender implements Runnable { pollTimeout = 0; } - // if some partitions are already ready to be sent, the select time would be 0; - // otherwise if some partition already has some data accumulated but not ready yet, - // the select time will be the time difference between now and its linger expiry time; - // otherwise the select time will be the time difference between now and the metadata expiry time; List<ClientResponse> responses = this.client.poll(requests, pollTimeout, now); for (ClientResponse response : responses) { if (response.wasDisconnected()) -- 2.1.2
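
To summarize the behavior these three patches converge on, below is a minimal, self-contained Java sketch of the combined ready-check and poll-timeout computation. The names PartitionState, leaderReady, connectionDelayMs, and the pollTimeout() helper are illustrative stand-ins, not the actual producer internals; in the real code this logic is split between RecordAccumulator.ready() and the Sender loop.

import java.util.List;

class PollTimeoutSketch {
    static class PartitionState {
        long waitedTimeMs;      // ms since the batch was created or since the last retry
        int attempts;           // send attempts so far; > 0 means this batch is a retry
        boolean full;           // batch is full, so the linger time no longer applies
        boolean leaderReady;    // the leader's connection can accept a send right now
        long connectionDelayMs; // client.connectionDelay() for the leader, if not ready
    }

    /** Returns 0 if anything can be sent right now, otherwise the earliest time
     *  at which some partition or connection becomes actionable. */
    static long pollTimeout(List<PartitionState> parts, long lingerMs, long retryBackoffMs) {
        long nextReadyCheckDelayMs = Long.MAX_VALUE;
        long notReadyTimeout = Long.MAX_VALUE;
        boolean readyAndSendable = false;
        for (PartitionState p : parts) {
            boolean backingOff = p.attempts > 0 && p.waitedTimeMs < retryBackoffMs;
            // Fix #2 (patches 1 and 3): expire against the wait that actually applies --
            // the retry backoff for retried batches, the linger time otherwise.
            long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
            long timeLeftMs = Math.max(timeToWaitMs - p.waitedTimeMs, 0);
            boolean sendable = p.full || timeLeftMs <= 0;
            if (sendable && !backingOff) {
                if (p.leaderReady)
                    readyAndSendable = true;
                else
                    // Patch 2: a sendable partition behind an unready connection bounds
                    // the timeout by the connection's own backoff instead of busy looping.
                    notReadyTimeout = Math.min(notReadyTimeout, p.connectionDelayMs);
            } else {
                // Fix #1 (patch 1): only partitions that are NOT yet sendable feed the
                // delay; a sendable partition would contribute 0 and cause busy looping.
                nextReadyCheckDelayMs = Math.min(nextReadyCheckDelayMs, timeLeftMs);
            }
        }
        return readyAndSendable ? 0 : Math.min(nextReadyCheckDelayMs, notReadyTimeout);
    }
}

For example, with lingerMs = 10 and a single unfull, never-retried partition that has waited 4 ms, the sketch returns 6, the remaining linger time, which matches the nextReadyCheckDelayMs assertions in testNextReadyCheckDelay.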
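
The Long.MAX_VALUE case above mirrors ClusterConnectionStates.connectionDelay() from patch 2: while a connection is in flight or established, the sender may block indefinitely, because a connection-established or response event will wake it. Below is a trimmed, runnable restatement of that method; the field names match the diff, but the surrounding class is stripped down and the disconnected() helper is a hypothetical mutator added only so the sketch can be exercised.

import java.util.HashMap;
import java.util.Map;

class ConnectionDelaySketch {
    enum ConnectionState { DISCONNECTED, CONNECTING, CONNECTED }

    static class NodeConnectionState {
        ConnectionState state;
        long lastConnectAttemptMs;
        NodeConnectionState(ConnectionState state, long lastConnectAttemptMs) {
            this.state = state;
            this.lastConnectAttemptMs = lastConnectAttemptMs;
        }
    }

    private final Map<Integer, NodeConnectionState> nodeState = new HashMap<>();
    private final long reconnectBackoffMs;

    ConnectionDelaySketch(long reconnectBackoffMs) {
        this.reconnectBackoffMs = reconnectBackoffMs;
    }

    // hypothetical helper for this sketch: mark a node disconnected at time now
    public void disconnected(int node, long now) {
        nodeState.put(node, new NodeConnectionState(ConnectionState.DISCONNECTED, now));
    }

    public long connectionDelay(int node, long now) {
        NodeConnectionState state = nodeState.get(node);
        if (state == null)
            return 0; // never connected: a connection attempt can start immediately
        long timeWaited = now - state.lastConnectAttemptMs;
        if (state.state == ConnectionState.DISCONNECTED)
            // respect whatever remains of the reconnect backoff
            return Math.max(this.reconnectBackoffMs - timeWaited, 0);
        // connecting or connected: delay indefinitely; a connection-established or
        // response event will wake the sender once data can actually be sent
        return Long.MAX_VALUE;
    }
}

With reconnectBackoffMs = 10, calling disconnected(0, 100) and then connectionDelay(0, 104) yields 6 (the remaining backoff), while an unknown node yields 0, so Sender never spins on a node it cannot reach yet.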