diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index e8c194c..34f474e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -101,7 +101,9 @@ public class Sender implements Runnable { } } - // send anything left in the accumulator + // okay we stopped accepting requests but there may still be + // requests in the accumulator or waiting for acknowledgment, + // wait until these are completed. int unsent = 0; do { try { @@ -109,7 +111,7 @@ public class Sender implements Runnable { } catch (Exception e) { e.printStackTrace(); } - } while (unsent > 0); + } while (unsent > 0 || this.inFlightRequests.totalInFlightRequests() > 0); // close all the connections this.selector.close(); @@ -518,6 +520,13 @@ public class Sender implements Runnable { return requests.remove(node); } } + + public int totalInFlightRequests() { + int total = 0; + for (Deque deque : this.requests.values()) + total += deque.size(); + return total; + } } } diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties index d7d03ea..df572f2 100644 --- a/core/src/test/resources/log4j.properties +++ b/core/src/test/resources/log4j.properties @@ -12,7 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -log4j.rootLogger=OFF, stdout +log4j.rootLogger=DEBUG, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout