From f1b623f9f354cd0b54d8ff42ba4cf375c8eba027 Mon Sep 17 00:00:00 2001 From: jqin Date: Fri, 24 Apr 2015 13:57:14 -0700 Subject: [PATCH 1/8] Patch for KAFKA-2142 --- .../org/apache/kafka/clients/NetworkClient.java | 4 +- .../producer/internals/RecordAccumulator.java | 130 ++++++++++++--------- .../clients/producer/internals/RecordBatch.java | 2 + .../kafka/clients/producer/internals/Sender.java | 50 +++----- .../producer/internals/RecordAccumulatorTest.java | 69 +++++------ clients/src/test/resources/log4j.properties | 1 - core/src/test/resources/log4j.properties | 4 +- 7 files changed, 130 insertions(+), 130 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index b7ae595..b8ed203 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -155,10 +155,12 @@ public class NetworkClient implements KafkaClient { @Override public boolean isReady(Node node, long now) { int nodeId = node.id(); - if (!this.metadataFetchInProgress && this.metadata.timeToNextUpdate(now) == 0) + if (!this.metadataFetchInProgress || this.metadata.timeToNextUpdate(now) == 0) { // if we need to update our metadata now declare all requests unready to make metadata requests first // priority + log.info("Node not ready! metadataFetchInProgress=" + this.metadataFetchInProgress); return false; + } else // otherwise we are ready if we are connected and can send more requests return isSendable(nodeId); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 49a9883..23a5e30 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -196,58 +196,89 @@ public final class RecordAccumulator { } /** - * Get a list of nodes whose partitions are ready to be sent, and the earliest time at which any non-sendable - * partition will be ready; Also return the flag for whether there are any unknown leaders for the accumulated - * partition batches. + * Get the time from now that at least one partition in the accumulator will be ready for sending. + * This time will be used to calculate the timeout for poll() in the sender thread. *

- * A destination node is ready to send data if ANY one of its partition is not backing off the send and ANY of the - * following are true : + * All partitions will be considered as ready to send if: *

    - *
  1. The record set is full - *
  2. The record set has sat in the accumulator for at least lingerMs milliseconds *
  3. The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions are * immediately considered ready). *
  4. The accumulator has been closed + *
  5. A flush is in progress. *
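+ * The returned value is a delay in milliseconds: 0 means at least one batch is sendable right away; otherwise it is the time until the earliest batch becomes sendable (its remaining linger or retry backoff time).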
+ * If no partition has any batch, returns Long.Max_Value. */ - public ReadyCheckResult ready(Cluster cluster, long nowMs) { - Set readyNodes = new HashSet(); - long nextReadyCheckDelayMs = Long.MAX_VALUE; - boolean unknownLeadersExist = false; - - boolean exhausted = this.free.queued() > 0; + public long ready(Cluster cluster, long now) { + long nextDataReadyCheckDelayMs = Long.MAX_VALUE; for (Map.Entry> entry : this.batches.entrySet()) { - TopicPartition part = entry.getKey(); + TopicPartition tp = entry.getKey(); Deque deque = entry.getValue(); - - Node leader = cluster.leaderFor(part); - if (leader == null) { - unknownLeadersExist = true; - } else if (!readyNodes.contains(leader)) { - synchronized (deque) { - RecordBatch batch = deque.peekFirst(); - if (batch != null) { - boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs; - long waitedTimeMs = nowMs - batch.lastAttemptMs; - long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs; - long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0); - boolean full = deque.size() > 1 || batch.records.isFull(); - boolean expired = waitedTimeMs >= timeToWaitMs; - boolean sendable = full || expired || exhausted || closed || flushInProgress(); - if (sendable && !backingOff) { - readyNodes.add(leader); - } else { - // Note that this results in a conservative estimate since an un-sendable partition may have - // a leader that will later be found to have sendable data. However, this is good enough - // since we'll just wake up and then sleep again for the remaining time. - nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs); - } - } - } + // If leader for partition is not available, ready time will be Long.MAX_VALUE. + if (cluster.leaderFor(tp) != null) { + long partitionNextReadyCheckDelayMs = partitionReady(deque, now); + nextDataReadyCheckDelayMs = Math.min(nextDataReadyCheckDelayMs, partitionNextReadyCheckDelayMs); } + // Save some iteration if we already know there is a ready batch + if (nextDataReadyCheckDelayMs == 0) + break; } + return nextDataReadyCheckDelayMs; + } - return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist); + /** + * Get the time in milliseconds from now to when the partition will have at least one batch ready to send. + * If the partition does not have any batch. The time to ready is Long.Max_Value. Otherwise it will be the time + * before the first batch is ready. + * @param deque The deque corresponding to the partition. + * @param now current time + * @return The time to wait before the partition has at least one batch ready to send. + * Return Long.Max_Value if there is no batch for this partition. + */ + public long partitionReady(Deque deque, long now) { + long partitionNextReadyCheckDelayMs = Long.MAX_VALUE; + synchronized (deque) { + if (!deque.isEmpty()) + partitionNextReadyCheckDelayMs = batchReady(deque.peekFirst(), now); + } + return partitionNextReadyCheckDelayMs; + } + + /** + * Get the time in milliseconds of how long we should wait until the batch is ready. Return 0 if the batch is ready. + * The waiting time of batches will be one of the followings: + * 1. For non-retry batch + * 1.1 The batch is ready if it is full. + * 1.2 The batch is ready if flush is in progress + * 1.3 The batch is ready if it has sat in accumulator for more than linger.ms + * 1.4 The batch should wait until linger.ms + * 2. For retrying batch + * 2.1 The batch is ready if retry backoff time has passed. 
+ * 2.2 The batch should wait until batch retry backoff time. + * + * + * @param batch The batch to check + * @param now current time + * @return The time to wait before the batch is ready. Return 0 if the batch is ready. + */ + public long batchReady(RecordBatch batch, long now) { + boolean exhausted = this.free.queued() > 0; + if (exhausted || closed || flushInProgress()) + return 0; + // Is record set full? + boolean full = batch.records.isFull(); + // Is it a retrying batch? + boolean retrying = batch.attempts > 0; + // Situation 1.1 and 1.2 + if (!retrying && (full || flushInProgress())) + return 0; + + // How long it should wait? + long timeToWaitMs = retrying ? retryBackoffMs : lingerMs; + // How long has it been waiting? + long waitedTimeMs = now - batch.lastAttemptMs; + // How long more should it wait? + long timeLeftMs = timeToWaitMs - waitedTimeMs; + return Math.max(timeLeftMs, 0); } /** @@ -290,6 +321,7 @@ public final class RecordAccumulator { Deque deque = dequeFor(new TopicPartition(part.topic(), part.partition())); if (deque != null) { synchronized (deque) { + assert false; RecordBatch first = deque.peekFirst(); if (first != null) { boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now; @@ -313,7 +345,8 @@ public final class RecordAccumulator { } this.drainIndex = (this.drainIndex + 1) % parts.size(); } while (start != drainIndex); - batches.put(node.id(), ready); + if (ready.size() > 0) + batches.put(node.id(), ready); } return batches; } @@ -382,21 +415,6 @@ public final class RecordAccumulator { this.newBatchCreated = newBatchCreated; } } - - /* - * The set of nodes that have at least one complete record batch in the accumulator - */ - public final static class ReadyCheckResult { - public final Set readyNodes; - public final long nextReadyCheckDelayMs; - public final boolean unknownLeadersExist; - - public ReadyCheckResult(Set readyNodes, long nextReadyCheckDelayMs, boolean unknownLeadersExist) { - this.readyNodes = readyNodes; - this.nextReadyCheckDelayMs = nextReadyCheckDelayMs; - this.unknownLeadersExist = unknownLeadersExist; - } - } /* * A threadsafe helper class to hold RecordBatches that haven't been ack'd yet diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java index 06182db..8a2776f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java @@ -54,11 +54,13 @@ public final class RecordBatch { /** * Append the record to the current record set and return the relative offset within that record set + * Close the batch if no enough space is left. * * @return The RecordSend corresponding to this record or null if there isn't sufficient room. 
*/ public FutureRecordMetadata tryAppend(byte[] key, byte[] value, Callback callback) { if (!this.records.hasRoomFor(key, value)) { + this.records.close(); return null; } else { this.records.append(0L, key, value); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index b2db91c..847f278 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -13,12 +13,7 @@ package org.apache.kafka.clients.producer.internals; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import org.apache.kafka.clients.ClientRequest; import org.apache.kafka.clients.ClientResponse; @@ -155,46 +150,39 @@ public class Sender implements Runnable { * The current POSIX time in milliseconds */ public void run(long now) { + // Check node readiness + Set readyNodes = new HashSet(); Cluster cluster = metadata.fetch(); - // get the list of partitions with data ready to send - RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now); - - // if there are any partitions whose leaders are not known yet, force metadata update - if (result.unknownLeadersExist) - this.metadata.requestUpdate(); - - // remove any nodes we aren't ready to send to - Iterator iter = result.readyNodes.iterator(); - long notReadyTimeout = Long.MAX_VALUE; - while (iter.hasNext()) { - Node node = iter.next(); - if (!this.client.ready(node, now)) { - iter.remove(); - notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now)); - } + long nextNodeReadyCheckDelay = Long.MAX_VALUE; + for (Node node: cluster.nodes()) { + if (this.client.ready(node, now)) + readyNodes.add(node); + else + nextNodeReadyCheckDelay = Math.min(nextNodeReadyCheckDelay, this.client.connectionDelay(node, now)); } - + log.info("ready nodes:" + readyNodes); + long beforeDrain = this.accumulator.ready(cluster, now); // create produce requests Map> batches = this.accumulator.drain(cluster, - result.readyNodes, + readyNodes, this.maxRequestSize, now); + if (beforeDrain == 0) + assert batches.size() > 0; sensors.updateProduceRequestMetrics(batches); List requests = createProduceRequests(batches, now); + // When should we start next drain? + long nextDrainDelayMs = this.accumulator.ready(cluster, now); + log.info("NextReadyTime = " + nextDrainDelayMs + ", next node ready check = " + nextNodeReadyCheckDelay); // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes // with sendable data that aren't ready to send since they would cause busy looping. 
- long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout); - if (result.readyNodes.size() > 0) { - log.trace("Nodes with data ready to send: {}", result.readyNodes); - log.trace("Created {} produce requests: {}", requests.size(), requests); - pollTimeout = 0; - } + long pollTimeout = Math.min(nextDrainDelayMs, nextNodeReadyCheckDelay); + for (ClientRequest request : requests) client.send(request); - // if some partitions are already ready to be sent, the select time would be 0; // otherwise if some partition already has some data accumulated but not ready yet, // the select time will be the time difference between now and its linger expiry time; diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index baa48e7..4fb5ceb 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -13,19 +13,11 @@ package org.apache.kafka.clients.producer.internals; import static java.util.Arrays.asList; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertFalse; +import static org.junit.Assert.*; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; +//import java.util.Set; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; @@ -68,10 +60,10 @@ public class RecordAccumulatorTest { int appends = 1024 / msgSize; for (int i = 0; i < appends; i++) { accum.append(tp1, key, value, null); - assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size()); + assertTrue("No partitions should be ready.", accum.ready(cluster, now) > 0); } accum.append(tp1, key, value, null); - assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); + assertEquals("Our partition's leader should be ready", 0, accum.ready(cluster, time.milliseconds())); List batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id()); assertEquals(1, batches.size()); RecordBatch batch = batches.get(0); @@ -90,18 +82,22 @@ public class RecordAccumulatorTest { int batchSize = 512; RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, CompressionType.NONE, 0L, 100L, false, metrics, time, metricTags); accum.append(tp1, key, new byte[2 * batchSize], null); - assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); + assertEquals("Our partition's leader should be ready", 0, accum.ready(cluster, time.milliseconds())); } @Test public void testLinger() throws Exception { long lingerMs = 10L; RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags); + long beforeAppend = time.milliseconds(); accum.append(tp1, key, value, null); - assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); + long afterAppend = time.milliseconds(); + long readyTime = accum.ready(cluster, 
afterAppend); + assertTrue("No partitions should be ready", + readyTime >= lingerMs - (afterAppend - beforeAppend) && readyTime <= lingerMs); time.sleep(10); - assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); - List batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id()); + assertEquals("Our partition's leader should be ready", 0, accum.ready(cluster, time.milliseconds())); + List batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, time.milliseconds()).get(node1.id()); assertEquals(1, batches.size()); RecordBatch batch = batches.get(0); batch.records.flip(); @@ -121,7 +117,7 @@ public class RecordAccumulatorTest { for (int i = 0; i < appends; i++) accum.append(tp, key, value, null); } - assertEquals("Partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); + assertEquals("Partition's leader should be ready", 0, accum.ready(cluster, time.milliseconds())); List batches = accum.drain(cluster, Collections.singleton(node1), 1024, 0).get(node1.id()); assertEquals("But due to size bound only one partition should have been retrieved", 1, batches.size()); @@ -152,9 +148,9 @@ public class RecordAccumulatorTest { t.start(); int read = 0; long now = time.milliseconds(); + Set readyNodes = new HashSet(cluster.nodes()); while (read < numThreads * msgs) { - Set nodes = accum.ready(cluster, now).readyNodes; - List batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node1.id()); + List batches = accum.drain(cluster, readyNodes, 5 * 1024, time.milliseconds()).get(node1.id()); if (batches != null) { for (RecordBatch batch : batches) { batch.records.flip(); @@ -169,7 +165,6 @@ public class RecordAccumulatorTest { t.join(); } - @Test public void testNextReadyCheckDelay() throws Exception { // Next check time will use lingerMs since this test won't trigger any retries/backoff @@ -181,27 +176,23 @@ public class RecordAccumulatorTest { // Partition on node1 only for (int i = 0; i < appends; i++) accum.append(tp1, key, value, null); - RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); - assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); - assertEquals("Next check time should be the linger time", lingerMs, result.nextReadyCheckDelayMs); + assertEquals("Next check time should be the linger time", lingerMs, accum.ready(cluster, time.milliseconds())); time.sleep(lingerMs / 2); // Add partition on node2 only for (int i = 0; i < appends; i++) accum.append(tp3, key, value, null); - result = accum.ready(cluster, time.milliseconds()); - assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); - assertEquals("Next check time should be defined by node1, half remaining linger time", lingerMs / 2, result.nextReadyCheckDelayMs); + assertEquals("Next check time should be defined by node1, half remaining linger time", lingerMs / 2, accum.ready(cluster, time.milliseconds())); // Add data for another partition on node1, enough to make data sendable immediately for (int i = 0; i < appends + 1; i++) accum.append(tp2, key, value, null); - result = accum.ready(cluster, time.milliseconds()); - assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes); - // Note this can actually be < linger time because it may use delays from partitions that aren't sendable - // but have leaders with other sendable data. 
- assertTrue("Next check time should be defined by node2, at most linger time", result.nextReadyCheckDelayMs <= lingerMs); + assertEquals("Node1 should be ready", 0, accum.ready(cluster, time.milliseconds())); + Set readyNodes = new HashSet(cluster.nodes()); + accum.drain(cluster, readyNodes, Integer.MAX_VALUE, time.milliseconds()); + // After drain the messages from tp2, now the next check delay should be determined by tp1 again. + assertTrue("Next check time should be defined by node2, at most linger time", accum.ready(cluster, time.milliseconds()) <= lingerMs / 2); } @Test @@ -248,18 +239,18 @@ public class RecordAccumulatorTest { final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags); for (int i = 0; i < 100; i++) accum.append(new TopicPartition(topic, i % 3), key, value, null); - RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); - assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); - + assertTrue("No partition should be ready.", accum.ready(cluster, time.milliseconds()) > 0); + accum.beginFlush(); - result = accum.ready(cluster, time.milliseconds()); - + assertEquals("All partitions should be ready", 0, accum.ready(cluster, time.milliseconds())); + // drain and deallocate all batches - Map> results = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); + Set readyNodes = new HashSet(cluster.nodes()); + Map> results = accum.drain(cluster, readyNodes, Integer.MAX_VALUE, time.milliseconds()); for (List batches: results.values()) for (RecordBatch batch: batches) accum.deallocate(batch); - + // should be complete with no unsent records. accum.awaitFlushCompletion(); assertFalse(accum.hasUnsent()); diff --git a/clients/src/test/resources/log4j.properties b/clients/src/test/resources/log4j.properties index b1d5b7f..6f48238 100644 --- a/clients/src/test/resources/log4j.properties +++ b/clients/src/test/resources/log4j.properties @@ -18,4 +18,3 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n -log4j.logger.org.apache.kafka=ERROR diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties index 1b7d5d8..bb3c2b7 100644 --- a/core/src/test/resources/log4j.properties +++ b/core/src/test/resources/log4j.properties @@ -12,14 +12,14 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-log4j.rootLogger=OFF, stdout +log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n log4j.logger.kafka=ERROR -log4j.logger.org.apache.kafka=ERROR +log4j.logger.org.apache.kafka=DEBUG, stdout # zkclient can be verbose, during debugging it is common to adjust is separately log4j.logger.org.I0Itec.zkclient.ZkClient=WARN -- 1.8.3.4 (Apple Git-47) From 8edbfb094cacf5bd70e0e84cbee46131aa0ebbd6 Mon Sep 17 00:00:00 2001 From: jqin Date: Fri, 24 Apr 2015 18:33:49 -0700 Subject: [PATCH 2/8] Patch for KAFKA-2142 --- core/src/test/resources/log4j.properties | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties index bb3c2b7..d7d03ea 100644 --- a/core/src/test/resources/log4j.properties +++ b/core/src/test/resources/log4j.properties @@ -12,14 +12,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -log4j.rootLogger=INFO, stdout +log4j.rootLogger=OFF, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n log4j.logger.kafka=ERROR -log4j.logger.org.apache.kafka=DEBUG, stdout # zkclient can be verbose, during debugging it is common to adjust is separately log4j.logger.org.I0Itec.zkclient.ZkClient=WARN -- 1.8.3.4 (Apple Git-47) From 2b98ccfb68a304b156b61f11c392060dfb236246 Mon Sep 17 00:00:00 2001 From: jqin Date: Sat, 25 Apr 2015 10:42:03 -0700 Subject: [PATCH 3/8] Patch for KAFAK-2142 --- .../java/org/apache/kafka/clients/Metadata.java | 12 +- .../org/apache/kafka/clients/NetworkClient.java | 7 +- .../kafka/clients/consumer/KafkaConsumer.java | 2 +- .../kafka/clients/producer/KafkaProducer.java | 2 +- .../producer/internals/RecordAccumulator.java | 197 ++++++++++++++------- .../kafka/clients/producer/internals/Sender.java | 24 +-- .../org/apache/kafka/clients/MetadataTest.java | 6 +- .../apache/kafka/clients/NetworkClientTest.java | 2 +- .../consumer/internals/CoordinatorTest.java | 7 +- .../clients/consumer/internals/FetcherTest.java | 7 +- .../producer/internals/RecordAccumulatorTest.java | 98 +++++----- .../clients/producer/internals/SenderTest.java | 2 +- 12 files changed, 224 insertions(+), 142 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 07f1cdb..643a7d7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -12,6 +12,7 @@ */ package org.apache.kafka.clients; +import java.util.Collections; import java.util.HashSet; import java.util.Set; @@ -34,11 +35,12 @@ public final class Metadata { private final long refreshBackoffMs; private final long metadataExpireMs; + private final Set topics; private int version; private long lastRefreshMs; private Cluster cluster; private boolean needUpdate; - private final Set topics; + private Set errorTopics; /** * Create a metadata instance with reasonable defaults @@ -61,6 +63,7 @@ public final class Metadata { this.cluster = Cluster.empty(); this.needUpdate = false; this.topics = new HashSet(); + this.errorTopics = Collections.emptySet(); } /** @@ 
-143,11 +146,12 @@ public final class Metadata { /** * Update the cluster metadata */ - public synchronized void update(Cluster cluster, long now) { + public synchronized void update(Cluster cluster, Set errorTopics, long now) { this.needUpdate = false; this.lastRefreshMs = now; this.version += 1; this.cluster = cluster; + this.errorTopics = errorTopics; notifyAll(); log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster); } @@ -180,4 +184,8 @@ public final class Metadata { public long refreshBackoff() { return refreshBackoffMs; } + + public synchronized Set errorTopics() { + return errorTopics; + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index b8ed203..301b290 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -155,12 +155,10 @@ public class NetworkClient implements KafkaClient { @Override public boolean isReady(Node node, long now) { int nodeId = node.id(); - if (!this.metadataFetchInProgress || this.metadata.timeToNextUpdate(now) == 0) { + if (this.metadataFetchInProgress || this.metadata.timeToNextUpdate(now) == 0) // if we need to update our metadata now declare all requests unready to make metadata requests first // priority - log.info("Node not ready! metadataFetchInProgress=" + this.metadataFetchInProgress); return false; - } else // otherwise we are ready if we are connected and can send more requests return isSendable(nodeId); @@ -393,10 +391,11 @@ public class NetworkClient implements KafkaClient { this.metadataFetchInProgress = false; MetadataResponse response = new MetadataResponse(body); Cluster cluster = response.cluster(); + Set errorTopics = response.errors().keySet(); // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being // created which means we will get errors and no nodes until it exists if (cluster.nodes().size() > 0) { - this.metadata.update(cluster, now); + this.metadata.update(cluster, errorTopics, now); } else { log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId()); this.metadata.failedUpdate(now); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index d301be4..5b10415 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -467,7 +467,7 @@ public class KafkaConsumer implements Consumer { this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG)); List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); - this.metadata.update(Cluster.bootstrap(addresses), 0); + this.metadata.update(Cluster.bootstrap(addresses), null, 0); String metricGrpPrefix = "consumer"; Map metricsTags = new LinkedHashMap(); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 42b1292..aae3b59 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -226,7 +226,7 @@ public class KafkaProducer implements Producer { time, metricTags); List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); - this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds()); + this.metadata.update(Cluster.bootstrap(addresses), null, time.milliseconds()); NetworkClient client = new NetworkClient(new Selector(this.metrics, time, "producer", metricTags), this.metadata, diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 23a5e30..0ab527f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -65,6 +65,7 @@ public final class RecordAccumulator { private final BufferPool free; private final Time time; private final ConcurrentMap> batches; + private final ConcurrentMap, Long>> batchesByTopic; private final IncompleteRecordBatches incomplete; /** @@ -101,6 +102,7 @@ public final class RecordAccumulator { this.lingerMs = lingerMs; this.retryBackoffMs = retryBackoffMs; this.batches = new CopyOnWriteMap>(); + this.batchesByTopic = new CopyOnWriteMap, Long>>(); String metricGrpName = "producer-metrics"; this.free = new BufferPool(totalSize, batchSize, blockOnBufferFull, metrics, time , metricGrpName , metricTags); this.incomplete = new IncompleteRecordBatches(); @@ -196,31 +198,26 @@ public final class RecordAccumulator { } /** - * Get the time from now that at least one partition in the accumulator will be ready for sending. + * Get the time from now that at least one partition for ready nodes in the accumulator will be ready for sending. * This time will be used to calculate the timeout for poll() in the sender thread. - *

- * All partitions will be considered as ready to send if: - *

    - *
  1. The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions are - * immediately considered ready). - *
  2. The accumulator has been closed - *
  3. A flush is in progress. - *
+ * * If no partition has any batch, returns Long.Max_Value. */ - public long ready(Cluster cluster, long now) { + public long ready(Cluster cluster, Set readyNodes, long now) { long nextDataReadyCheckDelayMs = Long.MAX_VALUE; - for (Map.Entry> entry : this.batches.entrySet()) { - TopicPartition tp = entry.getKey(); - Deque deque = entry.getValue(); - // If leader for partition is not available, ready time will be Long.MAX_VALUE. - if (cluster.leaderFor(tp) != null) { - long partitionNextReadyCheckDelayMs = partitionReady(deque, now); - nextDataReadyCheckDelayMs = Math.min(nextDataReadyCheckDelayMs, partitionNextReadyCheckDelayMs); + // Only check the data readiness for the ready nodes. + for (Node node: readyNodes) { + for (PartitionInfo part: cluster.partitionsForNode(node.id())) { + TopicPartition tp = new TopicPartition(part.topic(), part.partition()); + // If leader of the partition is not available, ready time will be Long.MAX_VALUE. + if (cluster.leaderFor(tp) != null) { + long partitionNextReadyCheckDelayMs = partitionReady(tp, now); + nextDataReadyCheckDelayMs = Math.min(nextDataReadyCheckDelayMs, partitionNextReadyCheckDelayMs); + } + // Save some iterations if we already know there is a ready batch + if (nextDataReadyCheckDelayMs == 0) + return nextDataReadyCheckDelayMs; } - // Save some iteration if we already know there is a ready batch - if (nextDataReadyCheckDelayMs == 0) - break; } return nextDataReadyCheckDelayMs; } @@ -229,48 +226,57 @@ public final class RecordAccumulator { * Get the time in milliseconds from now to when the partition will have at least one batch ready to send. * If the partition does not have any batch. The time to ready is Long.Max_Value. Otherwise it will be the time * before the first batch is ready. - * @param deque The deque corresponding to the partition. + * @param tp The partition to check * @param now current time * @return The time to wait before the partition has at least one batch ready to send. * Return Long.Max_Value if there is no batch for this partition. */ - public long partitionReady(Deque deque, long now) { + public long partitionReady(TopicPartition tp, long now) { long partitionNextReadyCheckDelayMs = Long.MAX_VALUE; - synchronized (deque) { - if (!deque.isEmpty()) - partitionNextReadyCheckDelayMs = batchReady(deque.peekFirst(), now); + Deque deque = batches.get(tp); + if (deque != null) { + synchronized (deque) { + if (!deque.isEmpty()) + partitionNextReadyCheckDelayMs = batchReady(deque.peekFirst(), now); + } } return partitionNextReadyCheckDelayMs; } /** * Get the time in milliseconds of how long we should wait until the batch is ready. Return 0 if the batch is ready. - * The waiting time of batches will be one of the followings: - * 1. For non-retry batch - * 1.1 The batch is ready if it is full. - * 1.2 The batch is ready if flush is in progress - * 1.3 The batch is ready if it has sat in accumulator for more than linger.ms - * 1.4 The batch should wait until linger.ms - * 2. For retrying batch - * 2.1 The batch is ready if retry backoff time has passed. - * 2.2 The batch should wait until batch retry backoff time. + *

+ * All non-retrying batches will be considered as ready to send if: + *

    + *
  1. The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions are + * immediately considered ready). + *
  2. The accumulator has been closed + *
  3. A flush is in progress. + *
* + *

+ * The waiting time of batches will be one of the following: + *

    + *
  1. A non-retry batch is ready if it is full. + *
  2. A non-retry batch is ready if a flush is in progress. + *
  3. A non-retry batch is ready if it has sat in the accumulator for more than linger.ms + *
  4. A non-retry batch should wait until linger.ms + *
  5. A retrying batch is ready if retry backoff time has passed. + *
  6. A retrying batch should wait until batch retry backoff time. *
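+ * For example (illustrative numbers): with linger.ms = 5, a non-retry batch that is not full and was created 2 ms ago returns a wait of 3 ms; once 5 ms have elapsed it returns 0 and can be drained.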
* @param batch The batch to check * @param now current time * @return The time to wait before the batch is ready. Return 0 if the batch is ready. */ public long batchReady(RecordBatch batch, long now) { - boolean exhausted = this.free.queued() > 0; - if (exhausted || closed || flushInProgress()) - return 0; - // Is record set full? - boolean full = batch.records.isFull(); - // Is it a retrying batch? boolean retrying = batch.attempts > 0; - // Situation 1.1 and 1.2 - if (!retrying && (full || flushInProgress())) - return 0; + // Is it a retrying batch? + if (!retrying) { + boolean exhausted = this.free.queued() > 0; + boolean full = batch.records.isFull(); + if (exhausted || closed || full || flushInProgress()) + return 0; + } // How long it should wait? long timeToWaitMs = retrying ? retryBackoffMs : lingerMs; @@ -298,47 +304,70 @@ public final class RecordAccumulator { /** * Drain all the data for the given nodes and collate them into a list of batches that will fit within the specified * size on a per-node basis. This method attempts to avoid choosing the same topic-node over and over. + * + * This method will also check if there is data for the partitions whose metadata is missing. If so, + * we return a {@link DrainResult} with no data and unknownLeaderExist = true to trigger a metadata update. * * @param cluster The current cluster metadata + * @param errorTopics The topics with error in their metadata * @param nodes The list of node to drain * @param maxSize The maximum number of bytes to drain * @param now The current unix time in milliseconds - * @return A list of {@link RecordBatch} for each node specified with total size less than the requested maxSize. + * @return {@link DrainResult} */ - public Map> drain(Cluster cluster, Set nodes, int maxSize, long now) { - if (nodes.isEmpty()) - return Collections.emptyMap(); + public DrainResult drain(Cluster cluster, Set errorTopics, Set nodes, int maxSize, long now) { + if (nodes.isEmpty()) { + Map> batches = Collections.emptyMap(); + return new DrainResult(false, batches); + } + + // Now let's check the error topics to see if there is a partition who does not have leader but has data. + // Metadata only needs to be updated if we have data to send but don't know where we should send it. + if (errorTopics != null) { + for (String topic : errorTopics) { + for (Deque deque : dequesForTopic(topic).keySet()) { + synchronized (deque) { + if (deque.peekFirst() != null) { + Map> batches = Collections.emptyMap(); + return new DrainResult(true, batches); + } + } + } + } + } Map> batches = new HashMap>(); for (Node node : nodes) { int size = 0; List parts = cluster.partitionsForNode(node.id()); + // The node has no partition + if (parts.isEmpty()) + continue; + List ready = new ArrayList(); /* to make starvation less likely this loop doesn't start at 0 */ int start = drainIndex = drainIndex % parts.size(); do { PartitionInfo part = parts.get(drainIndex); - Deque deque = dequeFor(new TopicPartition(part.topic(), part.partition())); + TopicPartition tp = new TopicPartition(part.topic(), part.partition()); + Deque deque = dequeFor(tp); if (deque != null) { synchronized (deque) { - assert false; + updateLastLeaderAvailableTime(tp, now); RecordBatch first = deque.peekFirst(); - if (first != null) { - boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now; - // Only drain the batch if it is not during backoff period. 
- if (!backoff) { - if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) { - // there is a rare case that a single batch size is larger than the request size due - // to compression; in this case we will still eventually send this batch in a single - // request - break; - } else { - RecordBatch batch = deque.pollFirst(); - batch.records.close(); - size += batch.records.sizeInBytes(); - ready.add(batch); - batch.drainedMs = now; - } + // only send the batch if the batch is ready + if (first != null && batchReady(first, now) == 0) { + if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) { + // there is a rare case that a single batch size is larger than the request size due + // to compression; in this case we will still eventually send this batch in a single + // request + break; + } else { + RecordBatch batch = deque.pollFirst(); + batch.records.close(); + size += batch.records.sizeInBytes(); + ready.add(batch); + batch.drainedMs = now; } } } @@ -348,19 +377,51 @@ public final class RecordAccumulator { if (ready.size() > 0) batches.put(node.id(), ready); } - return batches; + + return new DrainResult(false, batches); + } + + /** + * A container class that holds the following information. + *
    + *
  1. A list of {@link RecordBatch} for each node specified with total size less than the requested maxSize. + *
  2. Whether an unknown leader exists. + *
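+ * When unknownLeaderExist is true, no batches are drained and the sender will request a metadata update (see the Sender.run() change below).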
+ */ + public class DrainResult { + public final boolean unknownLeaderExist; + public final Map> batches; + + public DrainResult(boolean unknownLeaderExist, Map> batches) { + this.unknownLeaderExist = unknownLeaderExist; + this.batches = batches; + } } /** * Get the deque for the given topic-partition, creating it if necessary. Since new topics will only be added rarely - * we copy-on-write the hashmap + * we copy-on-write the hashmap. */ private Deque dequeFor(TopicPartition tp) { Deque d = this.batches.get(tp); if (d != null) return d; this.batches.putIfAbsent(tp, new ArrayDeque()); - return this.batches.get(tp); + + Deque deque = this.batches.get(tp); + this.batchesByTopic.putIfAbsent(tp.topic(), new CopyOnWriteMap, Long>()); + dequesForTopic(tp.topic()).putIfAbsent(deque, time.milliseconds()); + + return deque; + } + + private ConcurrentMap, Long> dequesForTopic(String topic) { + return this.batchesByTopic.get(topic); + } + + private void updateLastLeaderAvailableTime(TopicPartition tp, long now) { + Deque deque = dequeFor(tp); + dequesForTopic(tp.topic()).replace(deque, now); } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 847f278..c8e7a91 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -144,13 +144,13 @@ public class Sender implements Runnable { } /** - * Run a single iteration of sending + * Run a single iteration of sending. * * @param now * The current POSIX time in milliseconds */ public void run(long now) { - // Check node readiness + // Check node readiness and get the next node ready check delay Set readyNodes = new HashSet(); Cluster cluster = metadata.fetch(); long nextNodeReadyCheckDelay = Long.MAX_VALUE; @@ -160,23 +160,25 @@ public class Sender implements Runnable { else nextNodeReadyCheckDelay = Math.min(nextNodeReadyCheckDelay, this.client.connectionDelay(node, now)); } - log.info("ready nodes:" + readyNodes); - long beforeDrain = this.accumulator.ready(cluster, now); + log.trace("Ready nodes: " + readyNodes); // create produce requests - Map> batches = this.accumulator.drain(cluster, + RecordAccumulator.DrainResult drainResult = this.accumulator.drain(cluster, + this.metadata.errorTopics(), readyNodes, this.maxRequestSize, now); - if (beforeDrain == 0) - assert batches.size() > 0; + Map> batches = drainResult.batches; + log.trace("Drained batches for {} nodes", batches.size()); + if (drainResult.unknownLeaderExist) + this.metadata.requestUpdate(); sensors.updateProduceRequestMetrics(batches); List requests = createProduceRequests(batches, now); // When should we start next drain? - long nextDrainDelayMs = this.accumulator.ready(cluster, now); - log.info("NextReadyTime = " + nextDrainDelayMs + ", next node ready check = " + nextNodeReadyCheckDelay); - // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately - // loop and try sending more data. 
Otherwise, the timeout is determined by nodes that have partitions with data + //If we have any nodes that are ready to send + have sendable data, poll time will be 0 + long nextDrainDelayMs = this.accumulator.ready(cluster, readyNodes, now); + log.trace("Drain data in = " + nextDrainDelayMs + "ms, check node ready in = " + nextNodeReadyCheckDelay); + // Otherwise, the timeout is determined by nodes that have partitions with data // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes // with sendable data that aren't ready to send since they would cause busy looping. long pollTimeout = Math.min(nextDrainDelayMs, nextNodeReadyCheckDelay); diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java index 928087d..9d4f15c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java @@ -37,7 +37,7 @@ public class MetadataTest { @Test public void testMetadata() throws Exception { long time = 0; - metadata.update(Cluster.empty(), time); + metadata.update(Cluster.empty(), null, time); assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0); metadata.requestUpdate(); assertFalse("Still no updated needed due to backoff", metadata.timeToNextUpdate(time) == 0); @@ -48,7 +48,7 @@ public class MetadataTest { Thread t2 = asyncFetch(topic); assertTrue("Awaiting update", t1.isAlive()); assertTrue("Awaiting update", t2.isAlive()); - metadata.update(TestUtils.singletonCluster(topic, 1), time); + metadata.update(TestUtils.singletonCluster(topic, 1), null, time); t1.join(); t2.join(); assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0); @@ -66,7 +66,7 @@ public class MetadataTest { @Test public void testMetadataUpdateWaitTime() throws Exception { long time = 0; - metadata.update(Cluster.empty(), time); + metadata.update(Cluster.empty(), null, time); assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0); // first try with a max wait time of 0 and ensure that this returns back without waiting forever try { diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java index 8b27889..9fa0596 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -55,7 +55,7 @@ public class NetworkClientTest { @Before public void setup() { - metadata.update(cluster, time.milliseconds()); + metadata.update(cluster, null, time.milliseconds()); } @Test diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java index b06c4a7..9b31c3a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java @@ -35,10 +35,7 @@ import org.apache.kafka.common.requests.OffsetFetchResponse; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.test.TestUtils; -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import org.junit.Before; import org.junit.Test; @@ -75,7 +72,7 @@ public class CoordinatorTest { @Before public void setup() 
{ - metadata.update(cluster, time.milliseconds()); + metadata.update(cluster, null, time.milliseconds()); client.setNode(node); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 4195410..7b26619 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -36,10 +36,7 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.test.TestUtils; import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import org.junit.Before; import org.junit.Test; @@ -83,7 +80,7 @@ public class FetcherTest { @Before public void setup() throws Exception { - metadata.update(cluster, time.milliseconds()); + metadata.update(cluster, null, time.milliseconds()); client.setNode(node); records.append(1L, "key".getBytes(), "value-1".getBytes()); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 4fb5ceb..dfd496a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -60,11 +60,11 @@ public class RecordAccumulatorTest { int appends = 1024 / msgSize; for (int i = 0; i < appends; i++) { accum.append(tp1, key, value, null); - assertTrue("No partitions should be ready.", accum.ready(cluster, now) > 0); + assertTrue("No partitions should be ready.", accum.ready(cluster, Collections.singleton(node1), now) > 0); } accum.append(tp1, key, value, null); - assertEquals("Our partition's leader should be ready", 0, accum.ready(cluster, time.milliseconds())); - List batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id()); + assertEquals("Our partition's leader should be ready", 0, accum.ready(cluster, Collections.singleton(node1), time.milliseconds())); + List batches = accum.drain(cluster, null, Collections.singleton(node1), Integer.MAX_VALUE, 0).batches.get(node1.id()); assertEquals(1, batches.size()); RecordBatch batch = batches.get(0); batch.records.flip(); @@ -82,7 +82,7 @@ public class RecordAccumulatorTest { int batchSize = 512; RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, CompressionType.NONE, 0L, 100L, false, metrics, time, metricTags); accum.append(tp1, key, new byte[2 * batchSize], null); - assertEquals("Our partition's leader should be ready", 0, accum.ready(cluster, time.milliseconds())); + assertEquals("Our partition's leader should be ready", 0, accum.ready(cluster, Collections.singleton(node1), time.milliseconds())); } @Test @@ -92,12 +92,12 @@ public class RecordAccumulatorTest { long beforeAppend = time.milliseconds(); accum.append(tp1, key, value, null); long afterAppend = time.milliseconds(); - long readyTime = accum.ready(cluster, afterAppend); + long readyTime = accum.ready(cluster, Collections.singleton(node1), afterAppend); assertTrue("No partitions should be ready", readyTime >= lingerMs - (afterAppend - beforeAppend) && readyTime <= lingerMs); time.sleep(10); - assertEquals("Our partition's leader should be ready", 0, accum.ready(cluster, time.milliseconds())); 
- List batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, time.milliseconds()).get(node1.id()); + assertEquals("Our partition's leader should be ready", 0, accum.ready(cluster, Collections.singleton(node1), time.milliseconds())); + List batches = accum.drain(cluster, null, Collections.singleton(node1), Integer.MAX_VALUE, time.milliseconds()).batches.get(node1.id()); assertEquals(1, batches.size()); RecordBatch batch = batches.get(0); batch.records.flip(); @@ -117,9 +117,9 @@ public class RecordAccumulatorTest { for (int i = 0; i < appends; i++) accum.append(tp, key, value, null); } - assertEquals("Partition's leader should be ready", 0, accum.ready(cluster, time.milliseconds())); + assertEquals("Partition's leader should be ready", 0, accum.ready(cluster, Collections.singleton(node1), time.milliseconds())); - List batches = accum.drain(cluster, Collections.singleton(node1), 1024, 0).get(node1.id()); + List batches = accum.drain(cluster, null, Collections.singleton(node1), 1024, 0).batches.get(node1.id()); assertEquals("But due to size bound only one partition should have been retrieved", 1, batches.size()); } @@ -150,7 +150,7 @@ public class RecordAccumulatorTest { long now = time.milliseconds(); Set readyNodes = new HashSet(cluster.nodes()); while (read < numThreads * msgs) { - List batches = accum.drain(cluster, readyNodes, 5 * 1024, time.milliseconds()).get(node1.id()); + List batches = accum.drain(cluster, null, readyNodes, 5 * 1024, time.milliseconds()).batches.get(node1.id()); if (batches != null) { for (RecordBatch batch : batches) { batch.records.flip(); @@ -169,84 +169,102 @@ public class RecordAccumulatorTest { public void testNextReadyCheckDelay() throws Exception { // Next check time will use lingerMs since this test won't trigger any retries/backoff long lingerMs = 10L; + Set readyNodes = new HashSet(cluster.nodes()); RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags); // Just short of going over the limit so we trigger linger time int appends = 1024 / msgSize; - // Partition on node1 only for (int i = 0; i < appends; i++) accum.append(tp1, key, value, null); - assertEquals("Next check time should be the linger time", lingerMs, accum.ready(cluster, time.milliseconds())); + assertEquals("Next check time should be the linger time", lingerMs, accum.ready(cluster, readyNodes, time.milliseconds())); time.sleep(lingerMs / 2); // Add partition on node2 only for (int i = 0; i < appends; i++) accum.append(tp3, key, value, null); - assertEquals("Next check time should be defined by node1, half remaining linger time", lingerMs / 2, accum.ready(cluster, time.milliseconds())); + assertEquals("Next check time should be defined by node1, half remaining linger time", + lingerMs / 2, accum.ready(cluster, readyNodes, time.milliseconds())); // Add data for another partition on node1, enough to make data sendable immediately for (int i = 0; i < appends + 1; i++) accum.append(tp2, key, value, null); - assertEquals("Node1 should be ready", 0, accum.ready(cluster, time.milliseconds())); - Set readyNodes = new HashSet(cluster.nodes()); - accum.drain(cluster, readyNodes, Integer.MAX_VALUE, time.milliseconds()); + assertEquals("Node1 should be ready", 0, accum.ready(cluster, readyNodes, time.milliseconds())); + accum.drain(cluster, null, readyNodes, Integer.MAX_VALUE, time.milliseconds()); // After drain the messages from tp2, now the next check delay should be determined by tp1 again. 
- assertTrue("Next check time should be defined by node2, at most linger time", accum.ready(cluster, time.milliseconds()) <= lingerMs / 2); + assertTrue("Next check time should be defined by node2, at most linger time", + accum.ready(cluster, readyNodes, time.milliseconds()) <= lingerMs / 2); + assertTrue("If only node1 is ready, the next ready check time should be less than lingerMs/2.", + accum.ready(cluster, Collections.singleton(node1), time.milliseconds()) <= lingerMs / 2); + long readyTime = accum.ready(cluster, Collections.singleton(node2), time.milliseconds()); + assertTrue("If only node2 is ready, the next ready check time should be between lingerMs/2 and lingerMs", + lingerMs / 2 <= readyTime && readyTime <= lingerMs); } - + @Test public void testRetryBackoff() throws Exception { long lingerMs = Long.MAX_VALUE / 4; long retryBackoffMs = Long.MAX_VALUE / 2; final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, false, metrics, time, metricTags); - + Set readyNodes = Collections.singleton(node1); long now = time.milliseconds(); accum.append(tp1, key, value, null); - RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, now + lingerMs + 1); - assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes); - Map> batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, now + lingerMs + 1); - assertEquals("Node1 should be the only ready node.", 1, batches.size()); - assertEquals("Partition 0 should only have one batch drained.", 1, batches.get(0).size()); + assertEquals("Our batch should be ready", 0, accum.ready(cluster, readyNodes, now + lingerMs + 1)); + Map> batches = accum.drain(cluster, null, readyNodes, Integer.MAX_VALUE, now + lingerMs + 1).batches; + assertEquals("Our batch should get drained.", 1, batches.size()); + assertEquals("Node1 should only have one batch drained.", 1, batches.get(0).size()); // Reenqueue the batch now = time.milliseconds(); accum.reenqueue(batches.get(0).get(0), now); + assertEquals("Our batch for tp1 should back off.", retryBackoffMs, accum.ready(cluster, readyNodes, now)); - // Put message for partition 1 into accumulator + // Put message for tp2 into accumulator accum.append(tp2, key, value, null); - result = accum.ready(cluster, now + lingerMs + 1); - assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes); + assertEquals("Our batch for tp2 should be ready", 0, accum.ready(cluster, readyNodes, now + lingerMs + 1)); // tp1 should backoff while tp2 should not - batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, now + lingerMs + 1); - assertEquals("Node1 should be the only ready node.", 1, batches.size()); + batches = accum.drain(cluster, null, readyNodes, Integer.MAX_VALUE, now + lingerMs + 1).batches; + assertEquals("Node1 should be the only node having data.", 1, batches.size()); assertEquals("Node1 should only have one batch drained.", 1, batches.get(0).size()); - assertEquals("Node1 should only have one batch for partition 1.", tp2, batches.get(0).get(0).topicPartition); + assertEquals("Node1 should only have one batch for tp2.", tp2, batches.get(0).get(0).topicPartition); // Partition 0 can be drained after retry backoff - result = accum.ready(cluster, now + retryBackoffMs + 1); - assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes); - batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, now + retryBackoffMs + 1); - assertEquals("Node1 
should be the only ready node.", 1, batches.size()); + assertEquals("Node1 should be ready", 0, accum.ready(cluster, readyNodes, now + retryBackoffMs + 1)); + batches = accum.drain(cluster, null, readyNodes, Integer.MAX_VALUE, now + retryBackoffMs + 1).batches; + assertEquals("Node1 should be the only node having data.", 1, batches.size()); assertEquals("Node1 should only have one batch drained.", 1, batches.get(0).size()); - assertEquals("Node1 should only have one batch for partition 0.", tp1, batches.get(0).get(0).topicPartition); + assertEquals("Node1 should only have one batch for tp1.", tp1, batches.get(0).get(0).topicPartition); } - + + @Test + public void unknownLeaderExistTest() throws Exception { + final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, false, metrics, time, metricTags); + long now = time.milliseconds(); + accum.append(tp1, key, value, null); + assertEquals("Our batch for tp1 should be ready", + 0, accum.ready(cluster, Collections.singleton(node1), now + 10L)); + Cluster clusterWithUnknownLeader = new Cluster(asList(node1, node2), asList(part2, part3)); + Set errorTopics = new HashSet(asList(tp1.topic())); + RecordAccumulator.DrainResult result = accum.drain(clusterWithUnknownLeader, errorTopics, Collections.singleton(node1), Integer.MAX_VALUE, now + 10L); + assertTrue("unknownLeaderExist should be true.", result.unknownLeaderExist); + assertTrue("No batch should be drained.", result.batches.isEmpty()); + } + @Test public void testFlush() throws Exception { long lingerMs = Long.MAX_VALUE; + Set readyNodes = new HashSet(cluster.nodes()); final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags); for (int i = 0; i < 100; i++) accum.append(new TopicPartition(topic, i % 3), key, value, null); - assertTrue("No partition should be ready.", accum.ready(cluster, time.milliseconds()) > 0); + assertTrue("No partition should be ready.", accum.ready(cluster, readyNodes, time.milliseconds()) > 0); accum.beginFlush(); - assertEquals("All partitions should be ready", 0, accum.ready(cluster, time.milliseconds())); + assertEquals("All partitions should be ready", 0, accum.ready(cluster, readyNodes, time.milliseconds())); // drain and deallocate all batches - Set readyNodes = new HashSet(cluster.nodes()); - Map> results = accum.drain(cluster, readyNodes, Integer.MAX_VALUE, time.milliseconds()); + Map> results = accum.drain(cluster, null, readyNodes, Integer.MAX_VALUE, time.milliseconds()).batches; for (List batches: results.values()) for (RecordBatch batch: batches) accum.deallocate(batch); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 8b1805d..e23ab69 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -66,7 +66,7 @@ public class SenderTest { @Before public void setup() { - metadata.update(cluster, time.milliseconds()); + metadata.update(cluster, null, time.milliseconds()); } @Test -- 1.8.3.4 (Apple Git-47) From d980920585cd0fabec4166c6522b3dff26137c6f Mon Sep 17 00:00:00 2001 From: jqin Date: Sat, 25 Apr 2015 11:12:20 -0700 Subject: [PATCH 4/8] Minor change in comments --- .../clients/producer/internals/RecordAccumulator.java | 18 +++++++++++------- 1 file changed, 11 
insertions(+), 7 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 0ab527f..da86b8a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -198,10 +198,16 @@ public final class RecordAccumulator { } /** - * Get the time from now that at least one partition for ready nodes in the accumulator will be ready for sending. - * This time will be used to calculate the timeout for poll() in the sender thread. - * + * Get the time in milliseconds from now to when at least one partition in the accumulator will be ready for sending. + * This time will be used to calculate the timeout for poll() in the sender thread. We only check the partitions + * for ready nodes because if the node is not ready, waiting time should be determined by node ready time instead + * of data ready time. + *

 * If no partition has any batch, returns Long.MAX_VALUE.
+ * @param cluster The current cluster metadata
+ * @param readyNodes The nodes that are ready to send data
+ * @param now The current time in milliseconds
+ * @return The time to wait until the next batch is ready for sending
  */
 public long ready(Cluster cluster, Set<Node> readyNodes, long now) {
     long nextDataReadyCheckDelayMs = Long.MAX_VALUE;
@@ -246,7 +252,7 @@ public final class RecordAccumulator {
 /**
  * Get the time in milliseconds of how long we should wait until the batch is ready. Return 0 if the batch is ready.
  *
- * All non-retrying bathes will be considered as ready to send if:
+ * All non-retry batches will be considered ready to send if:
  * 1. The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions are
  *    immediately considered ready).
@@ -255,10 +261,8 @@ public final class RecordAccumulator {
  *
- * The waiting time of batches will be one of the followings:
+ * The waiting time of batches is calculated by the following rules:
- * 1. A non-retry batch is ready if it is full.
- * 2. A non-retry batch is ready if flush is in progress.
  * 3. A non-retry batch is ready if it has sat in the accumulator for more than linger.ms.
  * 4. A non-retry batch should wait until linger.ms has passed.
  5. A retrying batch is ready if retry backoff time has passed. -- 1.8.3.4 (Apple Git-47) From 6ee859bf7032d8562333c42cebe9a0cab001ea04 Mon Sep 17 00:00:00 2001 From: jqin Date: Sat, 25 Apr 2015 11:47:40 -0700 Subject: [PATCH 5/8] Fix Null pointer --- .../kafka/clients/producer/internals/RecordAccumulator.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index da86b8a..c4ae09d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -329,11 +329,14 @@ public final class RecordAccumulator { // Metadata only needs to be updated if we have data to send but don't know where we should send it. if (errorTopics != null) { for (String topic : errorTopics) { - for (Deque deque : dequesForTopic(topic).keySet()) { - synchronized (deque) { - if (deque.peekFirst() != null) { - Map> batches = Collections.emptyMap(); - return new DrainResult(true, batches); + Map, Long> deques = dequesForTopic(topic); + if (deques != null) { + for (Deque deque : dequesForTopic(topic).keySet()) { + synchronized (deque) { + if (deque.peekFirst() != null) { + Map> batches = Collections.emptyMap(); + return new DrainResult(true, batches); + } } } } -- 1.8.3.4 (Apple Git-47) From 1e599cb4f9af35533a44c0b0071d608775921c17 Mon Sep 17 00:00:00 2001 From: jqin Date: Wed, 29 Apr 2015 13:23:32 -0700 Subject: [PATCH 6/8] Add fix to KAFKA-1788 --- .../kafka/clients/producer/KafkaProducer.java | 1 + .../producer/internals/RecordAccumulator.java | 48 +++++++++++++++++----- 2 files changed, 39 insertions(+), 10 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index aae3b59..f8e5898 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -221,6 +221,7 @@ public class KafkaProducer implements Producer { this.compressionType, config.getLong(ProducerConfig.LINGER_MS_CONFIG), retryBackoffMs, + this.metadataFetchTimeoutMs, config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG), metrics, time, diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index c4ae09d..3e558d8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; @@ -62,6 +63,7 @@ public final class RecordAccumulator { private final CompressionType compression; private final long lingerMs; private final long retryBackoffMs; + private final long metadataFetchTimeout; private final BufferPool free; private final Time 
time; private final ConcurrentMap> batches; @@ -90,6 +92,7 @@ public final class RecordAccumulator { CompressionType compression, long lingerMs, long retryBackoffMs, + long metadataFetchTimeout, boolean blockOnBufferFull, Metrics metrics, Time time, @@ -101,6 +104,7 @@ public final class RecordAccumulator { this.compression = compression; this.lingerMs = lingerMs; this.retryBackoffMs = retryBackoffMs; + this.metadataFetchTimeout = metadataFetchTimeout; this.batches = new CopyOnWriteMap>(); this.batchesByTopic = new CopyOnWriteMap, Long>>(); String metricGrpName = "producer-metrics"; @@ -329,16 +333,13 @@ public final class RecordAccumulator { // Metadata only needs to be updated if we have data to send but don't know where we should send it. if (errorTopics != null) { for (String topic : errorTopics) { - Map, Long> deques = dequesForTopic(topic); - if (deques != null) { - for (Deque deque : dequesForTopic(topic).keySet()) { - synchronized (deque) { - if (deque.peekFirst() != null) { - Map> batches = Collections.emptyMap(); - return new DrainResult(true, batches); - } - } - } + Map, Long> dequesLastLeaderAvailableMs = dequesForTopic(topic); + // We check if there is batch waiting for leader information. If the batch has been waiting + // for too long, we expire it. Otherwise we continue to wait for metadata. + if (dequesLastLeaderAvailableMs != null && + checkUnknownLeaderAndMaybeExpireBatches(dequesLastLeaderAvailableMs, now)) { + Map> batches = Collections.emptyMap(); + return new DrainResult(false, batches); } } } @@ -422,6 +423,33 @@ public final class RecordAccumulator { return deque; } + private boolean checkUnknownLeaderAndMaybeExpireBatches(Map, Long> dequesLastLeaderAvailableMs, long now) { + boolean unknownLeaderExist = false; + for (Deque deque : dequesLastLeaderAvailableMs.keySet()) { + synchronized (deque) { + if (deque.peekFirst() != null) { + // We have data to send, check if we should continue to wait for metadata + long lastLeaderAvailableMs = dequesLastLeaderAvailableMs.get(deque); + if (lastLeaderAvailableMs + metadataFetchTimeout > now) + expireBatches(deque); + else + unknownLeaderExist = true; + } + } + } + return unknownLeaderExist; + } + + private void expireBatches(Deque deque) { + synchronized (deque) { + for (RecordBatch batch: deque) { + batch.records.close(); + batch.done(-1L, new TimeoutException("Failed to send messages for " + batch.topicPartition + " due to unknown leader.")); + deallocate(batch); + } + } + } + private ConcurrentMap, Long> dequesForTopic(String topic) { return this.batchesByTopic.get(topic); } -- 1.8.3.4 (Apple Git-47) From dca5f66cb7b1e5c902890771218c98ecebfa1be2 Mon Sep 17 00:00:00 2001 From: jqin Date: Sat, 2 May 2015 11:46:49 -0700 Subject: [PATCH 7/8] Rebase on trunk --- .../clients/producer/internals/RecordAccumulator.java | 12 +++++++++--- .../producer/internals/RecordAccumulatorTest.java | 18 +++++++++--------- .../kafka/clients/producer/internals/SenderTest.java | 2 +- 3 files changed, 19 insertions(+), 13 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 3e558d8..9f7a965 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -328,7 +328,7 @@ public final class RecordAccumulator { Map> batches = Collections.emptyMap(); return new 
DrainResult(false, batches); } - + // Now let's check the error topics to see if there is a partition who does not have leader but has data. // Metadata only needs to be updated if we have data to send but don't know where we should send it. if (errorTopics != null) { @@ -339,7 +339,7 @@ public final class RecordAccumulator { if (dequesLastLeaderAvailableMs != null && checkUnknownLeaderAndMaybeExpireBatches(dequesLastLeaderAvailableMs, now)) { Map> batches = Collections.emptyMap(); - return new DrainResult(false, batches); + return new DrainResult(true, batches); } } } @@ -423,6 +423,12 @@ public final class RecordAccumulator { return deque; } + /** + * Helper function to check if unknown leader exists and also expire the batches if needed. + * @param dequesLastLeaderAvailableMs Last time we know the leader of this deque + * @param now current time + * @return true if unknown leader exists, false otherwise. + */ private boolean checkUnknownLeaderAndMaybeExpireBatches(Map, Long> dequesLastLeaderAvailableMs, long now) { boolean unknownLeaderExist = false; for (Deque deque : dequesLastLeaderAvailableMs.keySet()) { @@ -430,7 +436,7 @@ public final class RecordAccumulator { if (deque.peekFirst() != null) { // We have data to send, check if we should continue to wait for metadata long lastLeaderAvailableMs = dequesLastLeaderAvailableMs.get(deque); - if (lastLeaderAvailableMs + metadataFetchTimeout > now) + if (lastLeaderAvailableMs + metadataFetchTimeout < now) expireBatches(deque); else unknownLeaderExist = true; diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index dfd496a..4cb1f4e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -56,7 +56,7 @@ public class RecordAccumulatorTest { @Test public void testFull() throws Exception { long now = time.milliseconds(); - RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, false, metrics, time, metricTags); + RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, 6000, false, metrics, time, metricTags); int appends = 1024 / msgSize; for (int i = 0; i < appends; i++) { accum.append(tp1, key, value, null); @@ -80,7 +80,7 @@ public class RecordAccumulatorTest { @Test public void testAppendLarge() throws Exception { int batchSize = 512; - RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, CompressionType.NONE, 0L, 100L, false, metrics, time, metricTags); + RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, CompressionType.NONE, 0L, 100L, 6000, false, metrics, time, metricTags); accum.append(tp1, key, new byte[2 * batchSize], null); assertEquals("Our partition's leader should be ready", 0, accum.ready(cluster, Collections.singleton(node1), time.milliseconds())); } @@ -88,7 +88,7 @@ public class RecordAccumulatorTest { @Test public void testLinger() throws Exception { long lingerMs = 10L; - RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags); + RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, 6000, false, metrics, time, metricTags); long beforeAppend = time.milliseconds(); accum.append(tp1, 
key, value, null); long afterAppend = time.milliseconds(); @@ -110,7 +110,7 @@ public class RecordAccumulatorTest { @Test public void testPartialDrain() throws Exception { - RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, false, metrics, time, metricTags); + RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, 6000, false, metrics, time, metricTags); int appends = 1024 / msgSize + 1; List partitions = asList(tp1, tp2); for (TopicPartition tp : partitions) { @@ -129,7 +129,7 @@ public class RecordAccumulatorTest { final int numThreads = 5; final int msgs = 10000; final int numParts = 2; - final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 0L, 100L, true, metrics, time, metricTags); + final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 0L, 100L, 6000, true, metrics, time, metricTags); List threads = new ArrayList(); for (int i = 0; i < numThreads; i++) { threads.add(new Thread() { @@ -170,7 +170,7 @@ public class RecordAccumulatorTest { // Next check time will use lingerMs since this test won't trigger any retries/backoff long lingerMs = 10L; Set readyNodes = new HashSet(cluster.nodes()); - RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags); + RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, 6000, false, metrics, time, metricTags); // Just short of going over the limit so we trigger linger time int appends = 1024 / msgSize; // Partition on node1 only @@ -205,7 +205,7 @@ public class RecordAccumulatorTest { public void testRetryBackoff() throws Exception { long lingerMs = Long.MAX_VALUE / 4; long retryBackoffMs = Long.MAX_VALUE / 2; - final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, false, metrics, time, metricTags); + final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, 6000, false, metrics, time, metricTags); Set readyNodes = Collections.singleton(node1); long now = time.milliseconds(); accum.append(tp1, key, value, null); @@ -239,7 +239,7 @@ public class RecordAccumulatorTest { @Test public void unknownLeaderExistTest() throws Exception { - final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, false, metrics, time, metricTags); + final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, 6000, false, metrics, time, metricTags); long now = time.milliseconds(); accum.append(tp1, key, value, null); assertEquals("Our batch for tp1 should be ready", @@ -255,7 +255,7 @@ public class RecordAccumulatorTest { public void testFlush() throws Exception { long lingerMs = Long.MAX_VALUE; Set readyNodes = new HashSet(cluster.nodes()); - final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags); + final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, 6000, false, metrics, time, metricTags); for (int i = 0; i < 100; i++) accum.append(new TopicPartition(topic, i % 3), key, value, null); assertTrue("No partition should be ready.", accum.ready(cluster, readyNodes, time.milliseconds()) > 0); diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index e23ab69..ce96c00 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -52,7 +52,7 @@ public class SenderTest { private Cluster cluster = TestUtils.singletonCluster("test", 1); private Metrics metrics = new Metrics(time); Map metricTags = new LinkedHashMap(); - private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, false, metrics, time, metricTags); + private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, 6000, false, metrics, time, metricTags); private Sender sender = new Sender(client, metadata, this.accumulator, -- 1.8.3.4 (Apple Git-47) From f88fcb60c4f9a46a8c8fa625605763c9bd424c62 Mon Sep 17 00:00:00 2001 From: jqin Date: Mon, 4 May 2015 18:14:11 -0700 Subject: [PATCH 8/8] Addressed Guozhang's comments. --- .../org/apache/kafka/clients/NetworkClient.java | 2 +- .../producer/internals/RecordAccumulator.java | 12 +++---- .../kafka/clients/producer/internals/Sender.java | 10 ++++-- .../producer/internals/RecordAccumulatorTest.java | 38 +++++++++++----------- 4 files changed, 34 insertions(+), 28 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 301b290..502b808 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -155,7 +155,7 @@ public class NetworkClient implements KafkaClient { @Override public boolean isReady(Node node, long now) { int nodeId = node.id(); - if (this.metadataFetchInProgress || this.metadata.timeToNextUpdate(now) == 0) + if (!this.metadataFetchInProgress && this.metadata.timeToNextUpdate(now) == 0) // if we need to update our metadata now declare all requests unready to make metadata requests first // priority return false; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 9f7a965..7cf72df 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -213,7 +213,7 @@ public final class RecordAccumulator { * @param now current time * @return Time to wait until next batch is ready for sending */ - public long ready(Cluster cluster, Set readyNodes, long now) { + public long timeToReady(Cluster cluster, Set readyNodes, long now) { long nextDataReadyCheckDelayMs = Long.MAX_VALUE; // Only check the data readiness for the ready nodes. for (Node node: readyNodes) { @@ -221,7 +221,7 @@ public final class RecordAccumulator { TopicPartition tp = new TopicPartition(part.topic(), part.partition()); // If leader of the partition is not available, ready time will be Long.MAX_VALUE. 
if (cluster.leaderFor(tp) != null) { - long partitionNextReadyCheckDelayMs = partitionReady(tp, now); + long partitionNextReadyCheckDelayMs = timeToPartitionReady(tp, now); nextDataReadyCheckDelayMs = Math.min(nextDataReadyCheckDelayMs, partitionNextReadyCheckDelayMs); } // Save some iterations if we already know there is a ready batch @@ -241,13 +241,13 @@ public final class RecordAccumulator { * @return The time to wait before the partition has at least one batch ready to send. * Return Long.Max_Value if there is no batch for this partition. */ - public long partitionReady(TopicPartition tp, long now) { + public long timeToPartitionReady(TopicPartition tp, long now) { long partitionNextReadyCheckDelayMs = Long.MAX_VALUE; Deque deque = batches.get(tp); if (deque != null) { synchronized (deque) { if (!deque.isEmpty()) - partitionNextReadyCheckDelayMs = batchReady(deque.peekFirst(), now); + partitionNextReadyCheckDelayMs = timeToBatchReady(deque.peekFirst(), now); } } return partitionNextReadyCheckDelayMs; @@ -276,7 +276,7 @@ public final class RecordAccumulator { * @param now current time * @return The time to wait before the batch is ready. Return 0 if the batch is ready. */ - public long batchReady(RecordBatch batch, long now) { + public long timeToBatchReady(RecordBatch batch, long now) { boolean retrying = batch.attempts > 0; // Is it a retrying batch? if (!retrying) { @@ -364,7 +364,7 @@ public final class RecordAccumulator { updateLastLeaderAvailableTime(tp, now); RecordBatch first = deque.peekFirst(); // only send the batch if the batch is ready - if (first != null && batchReady(first, now) == 0) { + if (first != null && timeToBatchReady(first, now) == 0) { if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) { // there is a rare case that a single batch size is larger than the request size due // to compression; in this case we will still eventually send this batch in a single diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index c8e7a91..74fab93 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -13,7 +13,13 @@ package org.apache.kafka.clients.producer.internals; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.kafka.clients.ClientRequest; import org.apache.kafka.clients.ClientResponse; @@ -176,7 +182,7 @@ public class Sender implements Runnable { // When should we start next drain? //If we have any nodes that are ready to send + have sendable data, poll time will be 0 - long nextDrainDelayMs = this.accumulator.ready(cluster, readyNodes, now); + long nextDrainDelayMs = this.accumulator.timeToReady(cluster, readyNodes, now); log.trace("Drain data in = " + nextDrainDelayMs + "ms, check node ready in = " + nextNodeReadyCheckDelay); // Otherwise, the timeout is determined by nodes that have partitions with data // that isn't yet sendable (e.g. lingering, backing off). 
Note that this specifically does not include nodes diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 4cb1f4e..5e1a982 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -60,10 +60,10 @@ public class RecordAccumulatorTest { int appends = 1024 / msgSize; for (int i = 0; i < appends; i++) { accum.append(tp1, key, value, null); - assertTrue("No partitions should be ready.", accum.ready(cluster, Collections.singleton(node1), now) > 0); + assertTrue("No partitions should be ready.", accum.timeToReady(cluster, Collections.singleton(node1), now) > 0); } accum.append(tp1, key, value, null); - assertEquals("Our partition's leader should be ready", 0, accum.ready(cluster, Collections.singleton(node1), time.milliseconds())); + assertEquals("Our partition's leader should be ready", 0, accum.timeToReady(cluster, Collections.singleton(node1), time.milliseconds())); List batches = accum.drain(cluster, null, Collections.singleton(node1), Integer.MAX_VALUE, 0).batches.get(node1.id()); assertEquals(1, batches.size()); RecordBatch batch = batches.get(0); @@ -82,7 +82,7 @@ public class RecordAccumulatorTest { int batchSize = 512; RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, CompressionType.NONE, 0L, 100L, 6000, false, metrics, time, metricTags); accum.append(tp1, key, new byte[2 * batchSize], null); - assertEquals("Our partition's leader should be ready", 0, accum.ready(cluster, Collections.singleton(node1), time.milliseconds())); + assertEquals("Our partition's leader should be ready", 0, accum.timeToReady(cluster, Collections.singleton(node1), time.milliseconds())); } @Test @@ -92,11 +92,11 @@ public class RecordAccumulatorTest { long beforeAppend = time.milliseconds(); accum.append(tp1, key, value, null); long afterAppend = time.milliseconds(); - long readyTime = accum.ready(cluster, Collections.singleton(node1), afterAppend); + long readyTime = accum.timeToReady(cluster, Collections.singleton(node1), afterAppend); assertTrue("No partitions should be ready", readyTime >= lingerMs - (afterAppend - beforeAppend) && readyTime <= lingerMs); time.sleep(10); - assertEquals("Our partition's leader should be ready", 0, accum.ready(cluster, Collections.singleton(node1), time.milliseconds())); + assertEquals("Our partition's leader should be ready", 0, accum.timeToReady(cluster, Collections.singleton(node1), time.milliseconds())); List batches = accum.drain(cluster, null, Collections.singleton(node1), Integer.MAX_VALUE, time.milliseconds()).batches.get(node1.id()); assertEquals(1, batches.size()); RecordBatch batch = batches.get(0); @@ -117,7 +117,7 @@ public class RecordAccumulatorTest { for (int i = 0; i < appends; i++) accum.append(tp, key, value, null); } - assertEquals("Partition's leader should be ready", 0, accum.ready(cluster, Collections.singleton(node1), time.milliseconds())); + assertEquals("Partition's leader should be ready", 0, accum.timeToReady(cluster, Collections.singleton(node1), time.milliseconds())); List batches = accum.drain(cluster, null, Collections.singleton(node1), 1024, 0).batches.get(node1.id()); assertEquals("But due to size bound only one partition should have been retrieved", 1, batches.size()); @@ -176,7 +176,7 @@ public class RecordAccumulatorTest { // 
Partition on node1 only for (int i = 0; i < appends; i++) accum.append(tp1, key, value, null); - assertEquals("Next check time should be the linger time", lingerMs, accum.ready(cluster, readyNodes, time.milliseconds())); + assertEquals("Next check time should be the linger time", lingerMs, accum.timeToReady(cluster, readyNodes, time.milliseconds())); time.sleep(lingerMs / 2); @@ -184,19 +184,19 @@ public class RecordAccumulatorTest { for (int i = 0; i < appends; i++) accum.append(tp3, key, value, null); assertEquals("Next check time should be defined by node1, half remaining linger time", - lingerMs / 2, accum.ready(cluster, readyNodes, time.milliseconds())); + lingerMs / 2, accum.timeToReady(cluster, readyNodes, time.milliseconds())); // Add data for another partition on node1, enough to make data sendable immediately for (int i = 0; i < appends + 1; i++) accum.append(tp2, key, value, null); - assertEquals("Node1 should be ready", 0, accum.ready(cluster, readyNodes, time.milliseconds())); + assertEquals("Node1 should be ready", 0, accum.timeToReady(cluster, readyNodes, time.milliseconds())); accum.drain(cluster, null, readyNodes, Integer.MAX_VALUE, time.milliseconds()); // After drain the messages from tp2, now the next check delay should be determined by tp1 again. assertTrue("Next check time should be defined by node2, at most linger time", - accum.ready(cluster, readyNodes, time.milliseconds()) <= lingerMs / 2); + accum.timeToReady(cluster, readyNodes, time.milliseconds()) <= lingerMs / 2); assertTrue("If only node1 is ready, the next ready check time should be less than lingerMs/2.", - accum.ready(cluster, Collections.singleton(node1), time.milliseconds()) <= lingerMs / 2); - long readyTime = accum.ready(cluster, Collections.singleton(node2), time.milliseconds()); + accum.timeToReady(cluster, Collections.singleton(node1), time.milliseconds()) <= lingerMs / 2); + long readyTime = accum.timeToReady(cluster, Collections.singleton(node2), time.milliseconds()); assertTrue("If only node2 is ready, the next ready check time should be between lingerMs/2 and lingerMs", lingerMs / 2 <= readyTime && readyTime <= lingerMs); } @@ -209,7 +209,7 @@ public class RecordAccumulatorTest { Set readyNodes = Collections.singleton(node1); long now = time.milliseconds(); accum.append(tp1, key, value, null); - assertEquals("Our batch should be ready", 0, accum.ready(cluster, readyNodes, now + lingerMs + 1)); + assertEquals("Our batch should be ready", 0, accum.timeToReady(cluster, readyNodes, now + lingerMs + 1)); Map> batches = accum.drain(cluster, null, readyNodes, Integer.MAX_VALUE, now + lingerMs + 1).batches; assertEquals("Our batch should get drained.", 1, batches.size()); assertEquals("Node1 should only have one batch drained.", 1, batches.get(0).size()); @@ -217,11 +217,11 @@ public class RecordAccumulatorTest { // Reenqueue the batch now = time.milliseconds(); accum.reenqueue(batches.get(0).get(0), now); - assertEquals("Our batch for tp1 should back off.", retryBackoffMs, accum.ready(cluster, readyNodes, now)); + assertEquals("Our batch for tp1 should back off.", retryBackoffMs, accum.timeToReady(cluster, readyNodes, now)); // Put message for tp2 into accumulator accum.append(tp2, key, value, null); - assertEquals("Our batch for tp2 should be ready", 0, accum.ready(cluster, readyNodes, now + lingerMs + 1)); + assertEquals("Our batch for tp2 should be ready", 0, accum.timeToReady(cluster, readyNodes, now + lingerMs + 1)); // tp1 should backoff while tp2 should not batches = accum.drain(cluster, null, 
readyNodes, Integer.MAX_VALUE, now + lingerMs + 1).batches; @@ -230,7 +230,7 @@ public class RecordAccumulatorTest { assertEquals("Node1 should only have one batch for tp2.", tp2, batches.get(0).get(0).topicPartition); // Partition 0 can be drained after retry backoff - assertEquals("Node1 should be ready", 0, accum.ready(cluster, readyNodes, now + retryBackoffMs + 1)); + assertEquals("Node1 should be ready", 0, accum.timeToReady(cluster, readyNodes, now + retryBackoffMs + 1)); batches = accum.drain(cluster, null, readyNodes, Integer.MAX_VALUE, now + retryBackoffMs + 1).batches; assertEquals("Node1 should be the only node having data.", 1, batches.size()); assertEquals("Node1 should only have one batch drained.", 1, batches.get(0).size()); @@ -243,7 +243,7 @@ public class RecordAccumulatorTest { long now = time.milliseconds(); accum.append(tp1, key, value, null); assertEquals("Our batch for tp1 should be ready", - 0, accum.ready(cluster, Collections.singleton(node1), now + 10L)); + 0, accum.timeToReady(cluster, Collections.singleton(node1), now + 10L)); Cluster clusterWithUnknownLeader = new Cluster(asList(node1, node2), asList(part2, part3)); Set errorTopics = new HashSet(asList(tp1.topic())); RecordAccumulator.DrainResult result = accum.drain(clusterWithUnknownLeader, errorTopics, Collections.singleton(node1), Integer.MAX_VALUE, now + 10L); @@ -258,10 +258,10 @@ public class RecordAccumulatorTest { final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, 6000, false, metrics, time, metricTags); for (int i = 0; i < 100; i++) accum.append(new TopicPartition(topic, i % 3), key, value, null); - assertTrue("No partition should be ready.", accum.ready(cluster, readyNodes, time.milliseconds()) > 0); + assertTrue("No partition should be ready.", accum.timeToReady(cluster, readyNodes, time.milliseconds()) > 0); accum.beginFlush(); - assertEquals("All partitions should be ready", 0, accum.ready(cluster, readyNodes, time.milliseconds())); + assertEquals("All partitions should be ready", 0, accum.timeToReady(cluster, readyNodes, time.milliseconds())); // drain and deallocate all batches Map> results = accum.drain(cluster, null, readyNodes, Integer.MAX_VALUE, time.milliseconds()).batches; -- 1.8.3.4 (Apple Git-47)
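A note for reviewers on the readiness rework (PATCH 4/8 and PATCH 8/8): the per-batch rule documented in the updated javadoc reduces to "a full batch, a flush in progress, or a closed accumulator means sendable now; a fresh batch waits out linger.ms; a retried batch waits out the retry backoff", and the accumulator-level check is the minimum of that wait over the first batch of each partition on a ready node. Below is a minimal, self-contained sketch of that rule. It is an illustration only, not the patched Kafka code; the standalone class and the simplified signature are assumptions made for the example.

// Minimal sketch of the per-batch wait-time rule described in the RecordAccumulator
// javadoc above. NOT the patched code; all names and the signature here are assumptions.
final class BatchReadySketch {
    private BatchReadySketch() { }

    // Returns how many milliseconds to wait before the batch may be sent; 0 means sendable now.
    static long timeToBatchReady(boolean retrying, long lastAttemptMs, boolean full,
                                 boolean flushInProgress, boolean closed,
                                 long lingerMs, long retryBackoffMs, long now) {
        // A non-retry batch is sendable immediately if it is full, a flush is in progress,
        // or the accumulator is closed.
        if (!retrying && (full || flushInProgress || closed))
            return 0L;
        // Otherwise wait out linger.ms (fresh batch) or the retry backoff (retried batch),
        // counting from the batch's last attempt.
        long timeToWaitMs = retrying ? retryBackoffMs : lingerMs;
        long waitedMs = now - lastAttemptMs;
        return Math.max(timeToWaitMs - waitedMs, 0L);
    }
}

Under this sketch, the accumulator-level value the patch describes would simply be the minimum of timeToBatchReady() over the first batch of every relevant partition, short-circuiting once it reaches 0, which matches how the Sender comments say the poll timeout is computed.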
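The unknown-leader handling introduced for KAFKA-1788 (PATCH 6/8) and corrected in PATCH 7/8 comes down to a single comparison per partition in checkUnknownLeaderAndMaybeExpireBatches(): once the leader has been unknown for longer than the producer's metadata fetch timeout, the waiting batches are failed with a TimeoutException; otherwise drain() keeps reporting that an unknown leader exists so metadata is refreshed. A sketch of just that decision follows, with the standalone class and method name assumed for illustration.

// Illustrative sketch of the expiry decision only; not the patched code.
// lastLeaderAvailableMs mirrors the per-deque timestamp the patch maintains.
final class UnknownLeaderSketch {
    private UnknownLeaderSketch() { }

    // True when batches for this partition should be expired instead of waiting longer
    // for leader metadata. PATCH 7/8 fixed the comparison so expiry happens only after
    // the metadata fetch timeout has actually elapsed.
    static boolean shouldExpire(long lastLeaderAvailableMs, long metadataFetchTimeoutMs, long now) {
        return lastLeaderAvailableMs + metadataFetchTimeoutMs < now;
    }
}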