From 97c219878c16715269013167ddb4efa34c42df5f Mon Sep 17 00:00:00 2001 From: Dong Lin Date: Wed, 13 May 2015 08:46:24 -0700 Subject: [PATCH] KAFKA-2191; Measured rate should not be infinite --- .../apache/kafka/common/metrics/stats/Rate.java | 2 ++ .../kafka/common/metrics/stats/SampledStat.java | 10 +++++----- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java index 98429da..422a2be 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java @@ -59,6 +59,8 @@ public class Rate implements MeasurableStat { public double measure(MetricConfig config, long now) { double value = stat.measure(config, now); double elapsed = convert(now - stat.oldest(now).lastWindowMs); + if (elapsed == 0) + return 0; return value / elapsed; } diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java index b341b7d..7c9a674 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java @@ -42,8 +42,8 @@ public abstract class SampledStat implements MeasurableStat { @Override public void record(MetricConfig config, double value, long timeMs) { Sample sample = current(timeMs); - if (sample.isComplete(timeMs, config)) - sample = advance(config, timeMs); + while (sample.isComplete(timeMs, config)) + sample = advance(config, sample.lastWindowMs + config.timeWindowMs()); update(sample, config, value, timeMs); sample.eventCount += 1; } @@ -71,7 +71,7 @@ public abstract class SampledStat implements MeasurableStat { return combine(this.samples, config, now); } - public Sample current(long timeMs) { + private Sample current(long timeMs) { if (samples.size() == 0) this.samples.add(newSample(timeMs)); return this.samples.get(this.current); @@ -91,7 +91,7 @@ public abstract class SampledStat implements MeasurableStat { protected abstract void update(Sample sample, MetricConfig config, double value, long timeMs); - public abstract double combine(List samples, MetricConfig config, long now); + protected abstract double combine(List samples, MetricConfig config, long now); /* Timeout any windows that have expired in the absence of any events */ protected void purgeObsoleteSamples(MetricConfig config, long now) { @@ -123,7 +123,7 @@ public abstract class SampledStat implements MeasurableStat { } public boolean isComplete(long timeMs, MetricConfig config) { - return timeMs - lastWindowMs >= config.timeWindowMs() || eventCount >= config.eventWindow(); + return timeMs - lastWindowMs >= config.timeWindowMs(); } } -- 1.7.9.5