From 886c6d664c6f8725624f9cc1e0e70bcb1c5ea0eb Mon Sep 17 00:00:00 2001 From: Jay Kreps Date: Tue, 19 May 2015 10:10:26 -0700 Subject: [PATCH] Fix 0 ellapsed time rate bug. --- .../apache/kafka/common/metrics/stats/Rate.java | 23 ++++++++++-------- .../apache/kafka/common/metrics/MetricsTest.java | 27 +++++++++++++++------- 2 files changed, 32 insertions(+), 18 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java index 98429da..7c06b0a 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java @@ -58,26 +58,29 @@ public class Rate implements MeasurableStat { @Override public double measure(MetricConfig config, long now) { double value = stat.measure(config, now); - double elapsed = convert(now - stat.oldest(now).lastWindowMs); - return value / elapsed; + // the elapsed time is always N-1 complete windows plus whatever fraction of the final + // window is complete + long ellapsedCurrentWindowMs = (now - stat.current(now).lastWindowMs); + long ellapsedPriorWindowsMs = config.timeWindowMs() * (config.samples() - 1); + return value / convert(ellapsedCurrentWindowMs + ellapsedPriorWindowsMs); } - private double convert(long time) { + private double convert(long timeMs) { switch (unit) { case NANOSECONDS: - return time * 1000.0 * 1000.0; + return timeMs * 1000.0 * 1000.0; case MICROSECONDS: - return time * 1000.0; + return timeMs * 1000.0; case MILLISECONDS: - return time; + return timeMs; case SECONDS: - return time / 1000.0; + return timeMs / 1000.0; case MINUTES: - return time / (60.0 * 1000.0); + return timeMs / (60.0 * 1000.0); case HOURS: - return time / (60.0 * 60.0 * 1000.0); + return timeMs / (60.0 * 60.0 * 1000.0); case DAYS: - return time / (24.0 * 60.0 * 60.0 * 1000.0); + return timeMs / (24.0 * 60.0 * 60.0 * 1000.0); default: throw new IllegalStateException("Unknown unit: " + unit); } diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java index 544e120..6d9099a 100644 --- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java @@ -38,8 +38,9 @@ public class MetricsTest { private static final double EPS = 0.000001; - MockTime time = new MockTime(); - Metrics metrics = new Metrics(new MetricConfig(), Arrays.asList((MetricsReporter) new JmxReporter()), time); + private MockTime time = new MockTime(); + private MetricConfig config = new MetricConfig(); + private Metrics metrics = new Metrics(config, Arrays.asList((MetricsReporter) new JmxReporter()), time); @Test public void testMetricName() { @@ -77,19 +78,29 @@ public class MetricsTest { s2.add(new MetricName("s2.total", "grp1"), new Total()); s2.record(5.0); - for (int i = 0; i < 10; i++) + int sum = 0; + int count = 10; + for (int i = 0; i < count; i++) { s.record(i); + sum += i; + } + + // prior to any time passing + double ellapsedSecs = (config.timeWindowMs() * (config.samples() - 1)) / 1000.0; + assertEquals("Occurences(0...9) = 5", count / ellapsedSecs, metrics.metrics().get(new MetricName("test.occurences", "grp1")).value(), EPS); // pretend 2 seconds passed... - time.sleep(2000); + long sleepTime = 2; + time.sleep(sleepTime * 1000); + ellapsedSecs += sleepTime; assertEquals("s2 reflects the constant value", 5.0, metrics.metrics().get(new MetricName("s2.total", "grp1")).value(), EPS); assertEquals("Avg(0...9) = 4.5", 4.5, metrics.metrics().get(new MetricName("test.avg", "grp1")).value(), EPS); - assertEquals("Max(0...9) = 9", 9.0, metrics.metrics().get(new MetricName("test.max", "grp1")).value(), EPS); + assertEquals("Max(0...9) = 9", count - 1.0, metrics.metrics().get(new MetricName("test.max", "grp1")).value(), EPS); assertEquals("Min(0...9) = 0", 0.0, metrics.metrics().get(new MetricName("test.min", "grp1")).value(), EPS); - assertEquals("Rate(0...9) = 22.5", 22.5, metrics.metrics().get(new MetricName("test.rate", "grp1")).value(), EPS); - assertEquals("Occurences(0...9) = 5", 5.0, metrics.metrics().get(new MetricName("test.occurences", "grp1")).value(), EPS); - assertEquals("Count(0...9) = 10", 10.0, metrics.metrics().get(new MetricName("test.count", "grp1")).value(), EPS); + assertEquals("Rate(0...9) = 1.40625", sum / ellapsedSecs, metrics.metrics().get(new MetricName("test.rate", "grp1")).value(), EPS); + assertEquals("Occurences(0...9) = 5", count / ellapsedSecs, metrics.metrics().get(new MetricName("test.occurences", "grp1")).value(), EPS); + assertEquals("Count(0...9) = 10", (double) count, metrics.metrics().get(new MetricName("test.count", "grp1")).value(), EPS); } @Test -- 2.3.2 (Apple Git-55)