diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java index 33d62a4..190ef30 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java @@ -36,7 +36,7 @@ public final class Metadata { private final long refreshBackoffMs; private final long metadataExpireMs; - private long lastRefresh; + private long lastRefresh; // The POSIX time in milliseconds at which the last metadata refresh occurs private Cluster cluster; private boolean forceUpdate; private final Set topics; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 9f2b2e9..689c023 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -414,7 +414,7 @@ public class Sender implements Runnable { * Handle responses from the server */ private void handleResponses(List receives, long now) { - long ns = time.nanoseconds(); + long ms = time.milliseconds(); for (NetworkReceive receive : receives) { int source = receive.source(); InFlightRequest req = inFlightRequests.nextCompleted(source); @@ -432,7 +432,7 @@ public class Sender implements Runnable { } else { throw new IllegalStateException("Unexpected response type: " + req.request.header().apiKey()); } - this.sensors.recordLatency(receive.source(), now - req.created, ns); + this.sensors.recordLatency(receive.source(), now - req.created, ms); } } @@ -810,7 +810,7 @@ public class Sender implements Runnable { }); metrics.addMetric("metadata-age", "The age in seconds of the current producer metadata being used.", new Measurable() { public double measure(MetricConfig config, long now) { - return (TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS) - metadata.lastUpdate()) / 1000.0; + return (now - metadata.lastUpdate()) / 1000.0; } }); } @@ -839,7 +839,7 @@ public class Sender implements Runnable { } public void updateProduceRequestMetrics(List requests) { - long ns = time.nanoseconds(); + long ms = time.milliseconds(); for (int i = 0; i < requests.size(); i++) { InFlightRequest request = requests.get(i); int records = 0; @@ -862,12 +862,12 @@ public class Sender implements Runnable { topicByteRate.record(batch.records.sizeInBytes()); // global metrics - this.batchSizeSensor.record(batch.records.sizeInBytes(), ns); - this.queueTimeSensor.record(batch.drained - batch.created, ns); - this.maxRecordSizeSensor.record(batch.maxRecordSize, ns); + this.batchSizeSensor.record(batch.records.sizeInBytes(), ms); + this.queueTimeSensor.record(batch.drained - batch.created, ms); + this.maxRecordSizeSensor.record(batch.maxRecordSize, ms); records += batch.recordCount; } - this.recordsPerRequestSensor.record(records, ns); + this.recordsPerRequestSensor.record(records, ms); } } } @@ -886,12 +886,12 @@ public class Sender implements Runnable { if (topicErrorSensor != null) topicErrorSensor.record(count); } - public void recordLatency(int node, long latency, long nowNs) { - this.requestTimeSensor.record(latency, nowNs); + public void recordLatency(int node, long latency, long nowMs) { + this.requestTimeSensor.record(latency, nowMs); if (node >= 0) { String nodeTimeName = "node-" + node + ".latency"; Sensor nodeRequestTime = this.metrics.getSensor(nodeTimeName); - if (nodeRequestTime != null) nodeRequestTime.record(latency, nowNs); + if (nodeRequestTime != null) nodeRequestTime.record(latency, nowMs); } } } diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java index b2426ac..64e3177 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java @@ -55,7 +55,7 @@ public final class KafkaMetric implements Metric { @Override public double value() { synchronized (this.lock) { - return value(time.nanoseconds()); + return value(time.milliseconds()); } } diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Measurable.java b/clients/src/main/java/org/apache/kafka/common/metrics/Measurable.java index 0f405c3..49db85e 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Measurable.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Measurable.java @@ -24,7 +24,7 @@ public interface Measurable { /** * Measure this quantity and return the result as a double * @param config The configuration for this metric - * @param now The time the measurement is being taken + * @param now The POSIX time in milliseconds the measurement is being taken * @return The measured value */ public double measure(MetricConfig config, long now); diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java b/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java index 4d14fbc..dfa1b0a 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java @@ -26,7 +26,7 @@ public class MetricConfig { private Quota quota; private int samples; private long eventWindow; - private long timeWindowNs; + private long timeWindowMs; private TimeUnit unit; public MetricConfig() { @@ -34,7 +34,7 @@ public class MetricConfig { this.quota = null; this.samples = 2; this.eventWindow = Long.MAX_VALUE; - this.timeWindowNs = TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS); + this.timeWindowMs = TimeUnit.MILLISECONDS.convert(30, TimeUnit.SECONDS); this.unit = TimeUnit.SECONDS; } @@ -56,12 +56,12 @@ public class MetricConfig { return this; } - public long timeWindowNs() { - return timeWindowNs; + public long timeWindowMs() { + return timeWindowMs; } public MetricConfig timeWindow(long window, TimeUnit unit) { - this.timeWindowNs = TimeUnit.NANOSECONDS.convert(window, unit); + this.timeWindowMs = TimeUnit.MILLISECONDS.convert(window, unit); return this; } diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java index d68349b..2fab8bb 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java @@ -78,14 +78,14 @@ public final class Sensor { * bound */ public void record(double value) { - record(value, time.nanoseconds()); + record(value, time.milliseconds()); } /** * Record a value at a known time. This method is slightly faster than {@link #record(double)} since it will reuse * the time stamp. * @param value The value we are recording - * @param time The time in nanoseconds + * @param time The current POSIX time in milliseconds * @throws QuotaViolationException if recording this value moves a metric beyond its configured maximum or minimum * bound */ diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Stat.java b/clients/src/main/java/org/apache/kafka/common/metrics/Stat.java index e02389c..5e98326 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Stat.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Stat.java @@ -25,7 +25,7 @@ public interface Stat { * Record the given value * @param config The configuration to use for this metric * @param value The value to record - * @param time The time this value occurred + * @param time The POSIX time in milliseconds this value occurred */ public void record(MetricConfig config, double value, long time); diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java index 7f5cc53..0c573b1 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java @@ -65,19 +65,19 @@ public class Rate implements MeasurableStat { private double convert(long time) { switch (unit) { case NANOSECONDS: - return time; + return time * 1000.0 * 1000.0; case MICROSECONDS: - return time / 1000.0; + return time * 1000.0; case MILLISECONDS: - return time / (1000.0 * 1000.0); + return time; case SECONDS: - return time / (1000.0 * 1000.0 * 1000.0); + return time / (1000.0); case MINUTES: - return time / (60.0 * 1000.0 * 1000.0 * 1000.0); + return time / (60.0 * 1000.0); case HOURS: - return time / (60.0 * 60.0 * 1000.0 * 1000.0 * 1000.0); + return time / (60.0 * 60.0 * 1000.0); case DAYS: - return time / (24.0 * 60.0 * 60.0 * 1000.0 * 1000.0 * 1000.0); + return time / (24.0 * 60.0 * 60.0 * 1000.0); default: throw new IllegalStateException("Unknown unit: " + unit); } diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java index 776f3a1..685b1e4 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java @@ -95,7 +95,7 @@ public abstract class SampledStat implements MeasurableStat { /* Timeout any windows that have expired in the absence of any events */ protected void purgeObsoleteSamples(MetricConfig config, long now) { - long expireAge = config.samples() * config.timeWindowNs(); + long expireAge = config.samples() * config.timeWindowMs(); for (int i = 0; i < samples.size(); i++) { Sample sample = this.samples.get(i); if (now - sample.lastWindow >= expireAge) @@ -123,7 +123,7 @@ public abstract class SampledStat implements MeasurableStat { } public boolean isComplete(long now, MetricConfig config) { - return now - lastWindow >= config.timeWindowNs() || eventCount >= config.eventWindow(); + return now - lastWindow >= config.timeWindowMs() || eventCount >= config.eventWindow(); } } diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 6027cb2..00c8b57 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -210,7 +210,7 @@ public class Selector implements Selectable { long startSelect = time.nanoseconds(); int readyKeys = select(timeout); long endSelect = time.nanoseconds(); - this.sensors.selectTime.record(endSelect - startSelect, endSelect); + this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds()); if (readyKeys > 0) { Set keys = this.selector.selectedKeys(); @@ -268,7 +268,7 @@ public class Selector implements Selectable { } } long endIo = time.nanoseconds(); - this.sensors.ioTime.record(endIo - endSelect, endIo); + this.sensors.ioTime.record(endIo - endSelect, time.milliseconds()); } @Override diff --git a/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java b/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java index 6582c73..d682bd4 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java @@ -26,6 +26,7 @@ public class SystemTime implements Time { return System.currentTimeMillis(); } + @Override public long nanoseconds() { return System.nanoTime(); } diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java index 9ff73f4..19bea0f 100644 --- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java @@ -117,24 +117,24 @@ public class MetricsTest { public void testEventWindowing() { Count count = new Count(); MetricConfig config = new MetricConfig().eventWindow(1).samples(2); - count.record(config, 1.0, time.nanoseconds()); - count.record(config, 1.0, time.nanoseconds()); - assertEquals(2.0, count.measure(config, time.nanoseconds()), EPS); - count.record(config, 1.0, time.nanoseconds()); // first event times out - assertEquals(2.0, count.measure(config, time.nanoseconds()), EPS); + count.record(config, 1.0, time.milliseconds()); + count.record(config, 1.0, time.milliseconds()); + assertEquals(2.0, count.measure(config, time.milliseconds()), EPS); + count.record(config, 1.0, time.milliseconds()); // first event times out + assertEquals(2.0, count.measure(config, time.milliseconds()), EPS); } @Test public void testTimeWindowing() { Count count = new Count(); MetricConfig config = new MetricConfig().timeWindow(1, TimeUnit.MILLISECONDS).samples(2); - count.record(config, 1.0, time.nanoseconds()); + count.record(config, 1.0, time.milliseconds()); time.sleep(1); - count.record(config, 1.0, time.nanoseconds()); - assertEquals(2.0, count.measure(config, time.nanoseconds()), EPS); + count.record(config, 1.0, time.milliseconds()); + assertEquals(2.0, count.measure(config, time.milliseconds()), EPS); time.sleep(1); - count.record(config, 1.0, time.nanoseconds()); // oldest event times out - assertEquals(2.0, count.measure(config, time.nanoseconds()), EPS); + count.record(config, 1.0, time.milliseconds()); // oldest event times out + assertEquals(2.0, count.measure(config, time.milliseconds()), EPS); } @Test @@ -143,9 +143,9 @@ public class MetricsTest { long windowMs = 100; int samples = 2; MetricConfig config = new MetricConfig().timeWindow(windowMs, TimeUnit.MILLISECONDS).samples(samples); - max.record(config, 50, time.nanoseconds()); + max.record(config, 50, time.milliseconds()); time.sleep(samples * windowMs); - assertEquals(Double.NEGATIVE_INFINITY, max.measure(config, time.nanoseconds()), EPS); + assertEquals(Double.NEGATIVE_INFINITY, max.measure(config, time.milliseconds()), EPS); } @Test(expected = IllegalArgumentException.class) diff --git a/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java b/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java index 6aab854..b24d4de 100644 --- a/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java +++ b/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java @@ -84,7 +84,7 @@ public class Microbenchmarks { counter++; } } - System.out.println("synchronized: " + ((System.nanoTime() - start) / iters)); + System.out.println("synchronized: " + ((time.nanoseconds() - start) / iters)); System.out.println(counter); done.set(true); } @@ -121,7 +121,7 @@ public class Microbenchmarks { counter++; lock2.unlock(); } - System.out.println("lock: " + ((System.nanoTime() - start) / iters)); + System.out.println("lock: " + ((time.nanoseconds() - start) / iters)); System.out.println(counter); done.set(true); }