From f8a5af362e1ef763610deb0918249dc791073fa1 Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Thu, 4 Dec 2014 18:13:55 -0800 Subject: [PATCH] KAFKA-1807 Improve accuracy of ProducerPerformance target throughput. --- .../apache/kafka/clients/tools/ProducerPerformance.java | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java index ac86150..28175fb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java +++ b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java @@ -55,6 +55,7 @@ public class ProducerPerformance { long sleepTime = NS_PER_SEC / throughput; long sleepDeficitNs = 0; Stats stats = new Stats(numRecords, 5000); + long start = System.currentTimeMillis(); for (int i = 0; i < numRecords; i++) { long sendStart = System.currentTimeMillis(); Callback cb = stats.nextCompletion(sendStart, payload.length, stats); @@ -66,12 +67,15 @@ public class ProducerPerformance { * and then make up the whole deficit in one longer sleep. */ if (throughput > 0) { - sleepDeficitNs += sleepTime; - if (sleepDeficitNs >= MIN_SLEEP_NS) { - long sleepMs = sleepDeficitNs / 1000000; - long sleepNs = sleepDeficitNs - sleepMs * 1000000; - Thread.sleep(sleepMs, (int) sleepNs); - sleepDeficitNs = 0; + float elapsed = (sendStart - start)/1000.f; + if (elapsed > 0 && i/elapsed > throughput) { + sleepDeficitNs += sleepTime; + if (sleepDeficitNs >= MIN_SLEEP_NS) { + long sleepMs = sleepDeficitNs / 1000000; + long sleepNs = sleepDeficitNs - sleepMs * 1000000; + Thread.sleep(sleepMs, (int) sleepNs); + sleepDeficitNs = 0; + } } } } -- 2.1.3