From 4d75671553d86318e668907244a227cb4bd4dac4 Mon Sep 17 00:00:00 2001
From: Ewen Cheslack-Postava
Date: Fri, 17 Oct 2014 11:22:57 -0700
Subject: [PATCH] KAFKA-1642: Compute correct poll timeout when some nodes are not ready for sending or there are multiple partitions per node.

Fixes two issues with the computation of poll timeouts in
Sender/RecordAccumulator.

First, the timeout was computed by RecordAccumulator while it looked up
which nodes had data to send, but it cannot be computed until after
nodes that aren't ready for sending have been filtered out; otherwise a
node that is currently unreachable could always yield a timeout of 0
and trigger a busy loop. The fixed version computes per-node timeouts
and derives the final timeout only after nodes that aren't ready for
sending are removed.

Second, the timeout was based only on the first TopicAndPartition
encountered for each node, which could produce an incorrect value if
that partition didn't have the minimum timeout for the node. The fix
evaluates every TopicAndPartition with a known leader and takes the
minimum.
---
 .../producer/internals/RecordAccumulator.java    | 28 ++++++-----
 .../kafka/clients/producer/internals/Sender.java |  2 +-
 .../clients/producer/RecordAccumulatorTest.java  | 55 +++++++++++++++++-----
 3 files changed, 60 insertions(+), 25 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index c5d4700..d8b658c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -18,7 +18,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -198,8 +197,7 @@ public final class RecordAccumulator {
      *
      */
     public ReadyCheckResult ready(Cluster cluster, long nowMs) {
-        Set<Node> readyNodes = new HashSet<Node>();
-        long nextReadyCheckDelayMs = Long.MAX_VALUE;
+        Map<Node, Long> readyNodes = new HashMap<Node, Long>();
         boolean unknownLeadersExist = false;
 
         boolean exhausted = this.free.queued() > 0;
@@ -210,7 +208,7 @@ public final class RecordAccumulator {
             Node leader = cluster.leaderFor(part);
             if (leader == null) {
                 unknownLeadersExist = true;
-            } else if (!readyNodes.contains(leader)) {
+            } else {
                 synchronized (deque) {
                     RecordBatch batch = deque.peekFirst();
                     if (batch != null) {
@@ -221,15 +219,17 @@ public final class RecordAccumulator {
                         boolean full = deque.size() > 1 || batch.records.isFull();
                         boolean expired = waitedTimeMs >= lingerMs;
                         boolean sendable = full || expired || exhausted || closed;
-                        if (sendable && !backingOff)
-                            readyNodes.add(leader);
-                        nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
+                        if (sendable && !backingOff) {
+                            Long nextReadyCheckDelayMs = readyNodes.get(leader);
+                            if (nextReadyCheckDelayMs == null) nextReadyCheckDelayMs = Long.MAX_VALUE;
+                            readyNodes.put(leader, Math.min(timeLeftMs, nextReadyCheckDelayMs));
+                        }
                     }
                 }
             }
         }
-        return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);
+        return new ReadyCheckResult(readyNodes, unknownLeadersExist);
     }
@@ -337,14 +337,18 @@ public final class RecordAccumulator {
     }
 
     public final static class ReadyCheckResult {
+        public final Map<Node, Long> readyNodesNextCheckDelay;
         public final Set<Node> readyNodes;
-        public final long nextReadyCheckDelayMs;
         public final boolean unknownLeadersExist;
 
-        public ReadyCheckResult(Set<Node> readyNodes, long nextReadyCheckDelayMs, boolean unknownLeadersExist) {
-            this.readyNodes = readyNodes;
-            this.nextReadyCheckDelayMs = nextReadyCheckDelayMs;
+        public ReadyCheckResult(Map<Node, Long> readyNodesNextCheckDelay, boolean unknownLeadersExist) {
+            this.readyNodesNextCheckDelay = readyNodesNextCheckDelay;
+            this.readyNodes = this.readyNodesNextCheckDelay.keySet();
             this.unknownLeadersExist = unknownLeadersExist;
         }
+
+        public long nextReadyCheckDelayMs() {
+            return (this.readyNodesNextCheckDelay.isEmpty() ? Long.MAX_VALUE : Collections.min(this.readyNodesNextCheckDelay.values()));
+        }
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 8ebe7ed..aff5087 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -172,7 +172,7 @@ public class Sender implements Runnable {
         // otherwise if some partition already has some data accumulated but not ready yet,
         // the select time will be the time difference between now and its linger expiry time;
         // otherwise the select time will be the time difference between now and the metadata expiry time;
-        List<ClientResponse> responses = this.client.poll(requests, result.nextReadyCheckDelayMs, now);
+        List<ClientResponse> responses = this.client.poll(requests, result.nextReadyCheckDelayMs(), now);
         for (ClientResponse response : responses) {
             if (response.wasDisconnected())
                 handleDisconnect(response, now);
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
index 0762b35..ff19f09 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
@@ -43,16 +43,20 @@ public class RecordAccumulatorTest {
     private String topic = "test";
     private int partition1 = 0;
     private int partition2 = 1;
-    private Node node = new Node(0, "localhost", 1111);
+    private int partition3 = 2;
+    private Node node1 = new Node(0, "localhost", 1111);
+    private Node node2 = new Node(1, "localhost", 1112);
     private TopicPartition tp1 = new TopicPartition(topic, partition1);
     private TopicPartition tp2 = new TopicPartition(topic, partition2);
-    private PartitionInfo part1 = new PartitionInfo(topic, partition1, node, null, null);
-    private PartitionInfo part2 = new PartitionInfo(topic, partition2, node, null, null);
+    private TopicPartition tp3 = new TopicPartition(topic, partition3);
+    private PartitionInfo part1 = new PartitionInfo(topic, partition1, node1, null, null);
+    private PartitionInfo part2 = new PartitionInfo(topic, partition2, node1, null, null);
+    private PartitionInfo part3 = new PartitionInfo(topic, partition3, node2, null, null);
     private MockTime time = new MockTime();
     private byte[] key = "key".getBytes();
     private byte[] value = "value".getBytes();
     private int msgSize = Records.LOG_OVERHEAD + Record.recordSize(key, value);
-    private Cluster cluster = new Cluster(Collections.singleton(node), Arrays.asList(part1, part2));
+    private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3));
     private Metrics metrics = new Metrics(time);
 
     @Test
@@ -65,8 +69,8 @@ public class RecordAccumulatorTest {
             assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
         }
         accum.append(tp1, key, value, CompressionType.NONE, null);
-        assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()).readyNodes);
-        List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node), Integer.MAX_VALUE, 0).get(node.id());
+        assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
+        List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
         assertEquals(1, batches.size());
         RecordBatch batch = batches.get(0);
         Iterator<LogEntry> iter = batch.records.iterator();
@@ -83,7 +87,7 @@ public class RecordAccumulatorTest {
         int batchSize = 512;
         RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, false, metrics, time);
         accum.append(tp1, key, new byte[2 * batchSize], CompressionType.NONE, null);
-        assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()).readyNodes);
+        assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
     }
 
     @Test
@@ -93,8 +97,8 @@ public class RecordAccumulatorTest {
         accum.append(tp1, key, value, CompressionType.NONE, null);
         assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
         time.sleep(10);
-        assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()).readyNodes);
-        List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node), Integer.MAX_VALUE, 0).get(node.id());
+        assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
+        List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
         assertEquals(1, batches.size());
         RecordBatch batch = batches.get(0);
         Iterator<LogEntry> iter = batch.records.iterator();
@@ -113,9 +117,9 @@ public class RecordAccumulatorTest {
             for (int i = 0; i < appends; i++)
                 accum.append(tp, key, value, CompressionType.NONE, null);
         }
-        assertEquals("Partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()).readyNodes);
+        assertEquals("Partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
 
-        List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node), 1024, 0).get(node.id());
+        List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), 1024, 0).get(node1.id());
         assertEquals("But due to size bound only one partition should have been retrieved", 1, batches.size());
     }
 
@@ -145,7 +149,7 @@ public class RecordAccumulatorTest {
         long now = time.milliseconds();
         while (read < numThreads * msgs) {
             Set<Node> nodes = accum.ready(cluster, now).readyNodes;
-            List<RecordBatch> batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node.id());
+            List<RecordBatch> batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node1.id());
             if (batches != null) {
                 for (RecordBatch batch : batches) {
                     for (LogEntry entry : batch.records)
@@ -159,4 +163,31 @@ public class RecordAccumulatorTest {
             t.join();
     }
 
+    @Test
+    public void testNextReadyCheckDelay() throws Exception {
+        // Next check time will use lingerMs since this test won't trigger any retries/backoff
+        long lingerMs = 10L;
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time);
+        int appends = 1024 / msgSize + 1;
+
+        // Partition on node1 only
+        for (int i = 0; i < appends; i++)
+            accum.append(tp1, key, value, CompressionType.NONE, null);
+        RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
+        assertEquals("Only node1 should be ready.", 1, result.readyNodes.size());
+        assertEquals("Next check time should be the linger time", lingerMs, result.nextReadyCheckDelayMs());
+
+        time.sleep(lingerMs/2);
+
+        // Add partition on node2 only
+        for (int i = 0; i < appends; i++)
+            accum.append(tp3, key, value, CompressionType.NONE, null);
+        result = accum.ready(cluster, time.milliseconds());
+        assertEquals("Both nodes should be ready.", 2, result.readyNodes.size());
+        assertEquals("Next check time should be defined by node1", lingerMs/2, result.nextReadyCheckDelayMs());
+
+        // Simulate a node that isn't ready for sending
+        result.readyNodes.remove(node1);
+        assertEquals("Removing a node should remove it from next ready check calculation", lingerMs, result.nextReadyCheckDelayMs());
+    }
 }
-- 
2.1.2
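Note (not part of the patch): below is a minimal, self-contained sketch of the timeout logic this change introduces, i.e. keep one delay per ready node, drop any node that turns out not to be ready for sending, and only then take the minimum as the poll timeout. The class name, string node keys, and delay values are illustrative assumptions, not code from the Kafka tree.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only (hypothetical class). It mirrors what
// ReadyCheckResult.nextReadyCheckDelayMs() computes in the patch: each ready node
// carries its own "time until next check", a node that isn't ready for sending is
// removed first (as testNextReadyCheckDelay simulates with readyNodes.remove(node1)),
// and only then is the minimum taken, so an unreachable node can no longer force a
// 0 ms poll timeout and a busy loop.
public class PollTimeoutSketch {
    public static void main(String[] args) {
        Map<String, Long> delayByReadyNode = new HashMap<String, Long>();
        delayByReadyNode.put("node1", 5L);   // half of a 10 ms lingerMs already elapsed
        delayByReadyNode.put("node2", 10L);  // full lingerMs still left to wait

        // Suppose node1 turns out not to be ready for sending: drop it before
        // computing the timeout instead of letting it drive the poll delay.
        delayByReadyNode.remove("node1");

        long pollTimeoutMs = delayByReadyNode.isEmpty()
                ? Long.MAX_VALUE // nothing ready: no per-node delay constrains the poll
                : Collections.min(delayByReadyNode.values());
        System.out.println("poll timeout = " + pollTimeoutMs + " ms"); // prints 10 ms
    }
}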