diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 616e100..7a03f38 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -205,6 +205,20 @@ public final class RecordAccumulator { } /** + * @return Whether there is any unsent record in the accumulator. + */ + public boolean hasUnsent() { + for (Map.Entry> entry : this.batches.entrySet()) { + Deque deque = entry.getValue(); + synchronized (deque) { + if (deque.size() > 0) + return true; + } + } + return false; + } + + /** * Drain all the data for the given topic-partitions that will fit within the specified size. This method attempts * to avoid choosing the same topic-partitions over and over. * diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 2acb96d..7b5d144 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -159,14 +159,13 @@ public class Sender implements Runnable { // okay we stopped accepting requests but there may still be // requests in the accumulator or waiting for acknowledgment, // wait until these are completed. - int unsent = 0; do { try { - unsent = run(time.milliseconds()); + run(time.milliseconds()); } catch (Exception e) { log.error("Uncaught error in kafka producer I/O thread: ", e); } - } while (unsent > 0 || this.inFlightRequests.totalInFlightRequests() > 0); + } while (this.accumulator.hasUnsent() || this.inFlightRequests.totalInFlightRequests() > 0); // close all the connections this.selector.close(); @@ -178,9 +177,8 @@ public class Sender implements Runnable { * Run a single iteration of sending * * @param now The current time - * @return The total number of topic/partitions that had data ready (regardless of what we actually sent) */ - public int run(long now) { + public void run(long now) { Cluster cluster = metadata.fetch(); // get the list of partitions with data ready to send List ready = this.accumulator.ready(now); @@ -220,8 +218,6 @@ public class Sender implements Runnable { handleResponses(this.selector.completedReceives(), time.milliseconds()); handleDisconnects(this.selector.disconnected(), time.milliseconds()); handleConnects(this.selector.connected()); - - return ready.size(); } /** diff --git a/system_test/replication_testsuite/config/server.properties b/system_test/replication_testsuite/config/server.properties index c628412..6becbab 100644 --- a/system_test/replication_testsuite/config/server.properties +++ b/system_test/replication_testsuite/config/server.properties @@ -136,5 +136,5 @@ replica.socket.timeout.ms=30000 replica.socket.receive.buffer.bytes=65536 replica.fetch.max.bytes=1048576 replica.fetch.wait.max.ms=500 -replica.fetch.min.bytes=4096 +replica.fetch.min.bytes=1 num.replica.fetchers=1