diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java index 33d62a4..f114ffd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java @@ -36,7 +36,7 @@ public final class Metadata { private final long refreshBackoffMs; private final long metadataExpireMs; - private long lastRefresh; + private long lastRefreshMs; private Cluster cluster; private boolean forceUpdate; private final Set topics; @@ -57,7 +57,7 @@ public final class Metadata { public Metadata(long refreshBackoffMs, long metadataExpireMs) { this.refreshBackoffMs = refreshBackoffMs; this.metadataExpireMs = metadataExpireMs; - this.lastRefresh = 0L; + this.lastRefreshMs = 0L; this.cluster = Cluster.empty(); this.forceUpdate = false; this.topics = new HashSet(); @@ -105,8 +105,8 @@ public final class Metadata { * since our last update and either (1) an update has been requested or (2) the current metadata has expired (more * than metadataExpireMs has passed since the last refresh) */ - public synchronized boolean needsUpdate(long now) { - long msSinceLastUpdate = now - this.lastRefresh; + public synchronized boolean needsUpdate(long nowMs) { + long msSinceLastUpdate = nowMs - this.lastRefreshMs; boolean updateAllowed = msSinceLastUpdate >= this.refreshBackoffMs; boolean updateNeeded = this.forceUpdate || msSinceLastUpdate >= this.metadataExpireMs; return updateAllowed && updateNeeded; @@ -129,9 +129,9 @@ public final class Metadata { /** * Update the cluster metadata */ - public synchronized void update(Cluster cluster, long now) { + public synchronized void update(Cluster cluster, long nowMs) { this.forceUpdate = false; - this.lastRefresh = now; + this.lastRefreshMs = nowMs; this.cluster = cluster; notifyAll(); log.debug("Updated cluster metadata to {}", cluster); @@ -141,7 +141,7 @@ public final class Metadata { * The last time metadata was updated. */ public synchronized long lastUpdate() { - return this.lastRefresh; + return this.lastRefreshMs; } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index ffd13ff..2d7e52d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -93,27 +93,27 @@ public final class RecordAccumulator { metrics.addMetric("waiting-threads", "The number of user threads blocked waiting for buffer memory to enqueue their records", new Measurable() { - public double measure(MetricConfig config, long now) { + public double measure(MetricConfig config, long nowMs) { return free.queued(); } }); metrics.addMetric("buffer-total-bytes", "The maximum amount of buffer memory the client can use (whether or not it is currently used).", new Measurable() { - public double measure(MetricConfig config, long now) { + public double measure(MetricConfig config, long nowMs) { return free.totalMemory(); } }); metrics.addMetric("buffer-available-bytes", "The total amount of buffer memory that is not being used (either unallocated or in the free list).", new Measurable() { - public double measure(MetricConfig config, long now) { + public double measure(MetricConfig config, long nowMs) { return free.availableMemory(); } }); metrics.addMetric("ready-partitions", "The number of topic-partitions with buffered data ready to be sent.", new Measurable() { - public double measure(MetricConfig config, long now) { - return ready(now).size(); + public double measure(MetricConfig config, long nowMs) { + return ready(nowMs).size(); } }); } @@ -170,9 +170,9 @@ public final class RecordAccumulator { /** * Re-enqueue the given record batch in the accumulator to retry */ - public void reenqueue(RecordBatch batch, long now) { + public void reenqueue(RecordBatch batch, long nowMs) { batch.attempts++; - batch.lastAttempt = now; + batch.lastAttemptMs = nowMs; Deque deque = dequeFor(batch.topicPartition); synchronized (deque) { deque.addFirst(batch); @@ -191,7 +191,7 @@ public final class RecordAccumulator { *
  • The accumulator has been closed * */ - public List ready(long now) { + public List ready(long nowMs) { List ready = new ArrayList(); boolean exhausted = this.free.queued() > 0; for (Map.Entry> entry : this.batches.entrySet()) { @@ -199,9 +199,9 @@ public final class RecordAccumulator { synchronized (deque) { RecordBatch batch = deque.peekFirst(); if (batch != null) { - boolean backingOff = batch.attempts > 0 && batch.lastAttempt + retryBackoffMs > now; + boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs; boolean full = deque.size() > 1 || batch.records.isFull(); - boolean expired = now - batch.created >= lingerMs; + boolean expired = nowMs - batch.createdMs >= lingerMs; boolean sendable = full || expired || exhausted || closed; if (sendable && !backingOff) ready.add(batch.topicPartition); @@ -231,11 +231,11 @@ public final class RecordAccumulator { * * @param partitions The list of partitions to drain * @param maxSize The maximum number of bytes to drain - * @param now The current unix time + * @param nowMs The current unix time in milliseconds * @return A list of {@link RecordBatch} for partitions specified with total size less than the requested maxSize. * TODO: There may be a starvation issue due to iteration order */ - public List drain(List partitions, int maxSize, long now) { + public List drain(List partitions, int maxSize, long nowMs) { if (partitions.isEmpty()) return Collections.emptyList(); int size = 0; @@ -258,7 +258,7 @@ public final class RecordAccumulator { batch.records.close(); size += batch.records.sizeInBytes(); ready.add(batch); - batch.drained = now; + batch.drainedMs = nowMs; } } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java index 94157f7..5ee5455 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java @@ -34,17 +34,17 @@ public final class RecordBatch { public int recordCount = 0; public int maxRecordSize = 0; public volatile int attempts = 0; - public final long created; - public long drained; - public long lastAttempt; + public final long createdMs; + public long drainedMs; + public long lastAttemptMs; public final MemoryRecords records; public final TopicPartition topicPartition; private final ProduceRequestResult produceFuture; private final List thunks; - public RecordBatch(TopicPartition tp, MemoryRecords records, long now) { - this.created = now; - this.lastAttempt = now; + public RecordBatch(TopicPartition tp, MemoryRecords records, long nowMs) { + this.createdMs = nowMs; + this.lastAttemptMs = nowMs; this.records = records; this.topicPartition = tp; this.produceFuture = new ProduceRequestResult(); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 9f2b2e9..f0152fa 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -24,7 +24,6 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; -import java.util.concurrent.TimeUnit; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; @@ -188,23 +187,23 @@ public class Sender implements Runnable { /** * Run a single iteration of sending * - * @param now The current time + * @param nowMs The current POSIX time in milliseconds */ - public void run(long now) { + public void run(long nowMs) { Cluster cluster = metadata.fetch(); // get the list of partitions with data ready to send - List ready = this.accumulator.ready(now); + List ready = this.accumulator.ready(nowMs); // should we update our metadata? List sends = new ArrayList(); - maybeUpdateMetadata(cluster, sends, now); + maybeUpdateMetadata(cluster, sends, nowMs); // prune the list of ready topics to eliminate any that we aren't ready to send yet - List sendable = processReadyPartitions(cluster, ready, now); + List sendable = processReadyPartitions(cluster, ready, nowMs); // create produce requests - List batches = this.accumulator.drain(sendable, this.maxRequestSize, now); - List requests = collate(cluster, batches, now); + List batches = this.accumulator.drain(sendable, this.maxRequestSize, nowMs); + List requests = collate(cluster, batches, nowMs); sensors.updateProduceRequestMetrics(requests); if (ready.size() > 0) { @@ -228,16 +227,16 @@ public class Sender implements Runnable { // handle responses, connections, and disconnections handleSends(this.selector.completedSends()); - handleResponses(this.selector.completedReceives(), now); - handleDisconnects(this.selector.disconnected(), now); + handleResponses(this.selector.completedReceives(), nowMs); + handleDisconnects(this.selector.disconnected(), nowMs); handleConnects(this.selector.connected()); } /** * Add a metadata request to the list of sends if we need to make one */ - private void maybeUpdateMetadata(Cluster cluster, List sends, long now) { - if (this.metadataFetchInProgress || !metadata.needsUpdate(now)) + private void maybeUpdateMetadata(Cluster cluster, List sends, long nowMs) { + if (this.metadataFetchInProgress || !metadata.needsUpdate(nowMs)) return; Node node = selectMetadataDestination(cluster); @@ -247,13 +246,13 @@ public class Sender implements Runnable { if (nodeStates.isConnected(node.id())) { Set topics = metadata.topics(); this.metadataFetchInProgress = true; - InFlightRequest metadataRequest = metadataRequest(now, node.id(), topics); + InFlightRequest metadataRequest = metadataRequest(nowMs, node.id(), topics); log.debug("Sending metadata request {} to node {}", metadataRequest, node.id()); sends.add(metadataRequest.request); this.inFlightRequests.add(metadataRequest); - } else if (nodeStates.canConnect(node.id(), now)) { + } else if (nodeStates.canConnect(node.id(), nowMs)) { // we don't have a connection to this node right now, make one - initiateConnect(node, now); + initiateConnect(node, nowMs); } } @@ -315,7 +314,7 @@ public class Sender implements Runnable { * it to the returned set. For any partitions we have no connection to either make one, fetch the appropriate * metadata to be able to do so */ - private List processReadyPartitions(Cluster cluster, List ready, long now) { + private List processReadyPartitions(Cluster cluster, List ready, long nowMs) { List sendable = new ArrayList(ready.size()); for (TopicPartition tp : ready) { Node node = cluster.leaderFor(tp); @@ -324,9 +323,9 @@ public class Sender implements Runnable { metadata.forceUpdate(); } else if (nodeStates.isConnected(node.id()) && inFlightRequests.canSendMore(node.id())) { sendable.add(tp); - } else if (nodeStates.canConnect(node.id(), now)) { + } else if (nodeStates.canConnect(node.id(), nowMs)) { // we don't have a connection to this node right now, make one - initiateConnect(node, now); + initiateConnect(node, nowMs); } } return sendable; @@ -335,11 +334,11 @@ public class Sender implements Runnable { /** * Initiate a connection to the given node */ - private void initiateConnect(Node node, long now) { + private void initiateConnect(Node node, long nowMs) { try { log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port()); selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer); - this.nodeStates.connecting(node.id(), now); + this.nodeStates.connecting(node.id(), nowMs); } catch (IOException e) { /* attempt failed, we'll try again after the backoff */ nodeStates.disconnected(node.id()); @@ -352,7 +351,7 @@ public class Sender implements Runnable { /** * Handle any closed connections */ - private void handleDisconnects(List disconnects, long now) { + private void handleDisconnects(List disconnects, long nowMs) { // clear out the in-flight requests for the disconnected broker for (int node : disconnects) { nodeStates.disconnected(node); @@ -364,7 +363,7 @@ public class Sender implements Runnable { case PRODUCE: int correlation = request.request.header().correlationId(); for (RecordBatch batch : request.batches.values()) - completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, correlation, now); + completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, correlation, nowMs); break; case METADATA: metadataFetchInProgress = false; @@ -413,8 +412,7 @@ public class Sender implements Runnable { /** * Handle responses from the server */ - private void handleResponses(List receives, long now) { - long ns = time.nanoseconds(); + private void handleResponses(List receives, long nowMs) { for (NetworkReceive receive : receives) { int source = receive.source(); InFlightRequest req = inFlightRequests.nextCompleted(source); @@ -424,27 +422,27 @@ public class Sender implements Runnable { correlate(req.request.header(), header); if (req.request.header().apiKey() == ApiKeys.PRODUCE.id) { log.trace("Received produce response from node {} with correlation id {}", source, req.request.header().correlationId()); - handleProduceResponse(req, req.request.header(), body, now); + handleProduceResponse(req, req.request.header(), body, nowMs); } else if (req.request.header().apiKey() == ApiKeys.METADATA.id) { log.trace("Received metadata response response from node {} with correlation id {}", source, req.request.header() .correlationId()); - handleMetadataResponse(req.request.header(), body, now); + handleMetadataResponse(req.request.header(), body, nowMs); } else { throw new IllegalStateException("Unexpected response type: " + req.request.header().apiKey()); } - this.sensors.recordLatency(receive.source(), now - req.created, ns); + this.sensors.recordLatency(receive.source(), nowMs - req.createdMs); } } - private void handleMetadataResponse(RequestHeader header, Struct body, long now) { + private void handleMetadataResponse(RequestHeader header, Struct body, long nowMs) { this.metadataFetchInProgress = false; MetadataResponse response = new MetadataResponse(body); Cluster cluster = response.cluster(); // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being // created which means we will get errors and no nodes until it exists if (cluster.nodes().size() > 0) - this.metadata.update(cluster, now); + this.metadata.update(cluster, nowMs); else log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId()); } @@ -452,7 +450,7 @@ public class Sender implements Runnable { /** * Handle a produce response */ - private void handleProduceResponse(InFlightRequest request, RequestHeader header, Struct body, long now) { + private void handleProduceResponse(InFlightRequest request, RequestHeader header, Struct body, long nowMs) { ProduceResponse pr = new ProduceResponse(body); for (Map responses : pr.responses().values()) { for (Map.Entry entry : responses.entrySet()) { @@ -462,7 +460,7 @@ public class Sender implements Runnable { if (error.exception() instanceof InvalidMetadataException) metadata.forceUpdate(); RecordBatch batch = request.batches.get(tp); - completeBatch(batch, error, response.baseOffset, header.correlationId(), now); + completeBatch(batch, error, response.baseOffset, header.correlationId(), nowMs); } } } @@ -473,9 +471,9 @@ public class Sender implements Runnable { * @param error The error (or null if none) * @param baseOffset The base offset assigned to the records if successful * @param correlationId The correlation id for the request - * @param now The current time stamp + * @param nowMs The current POSIX time stamp in milliseconds */ - private void completeBatch(RecordBatch batch, Errors error, long baseOffset, long correlationId, long now) { + private void completeBatch(RecordBatch batch, Errors error, long baseOffset, long correlationId, long nowMs) { if (error != Errors.NONE && canRetry(batch, error)) { // retry log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}", @@ -483,7 +481,7 @@ public class Sender implements Runnable { batch.topicPartition, this.retries - batch.attempts - 1, error); - this.accumulator.reenqueue(batch, now); + this.accumulator.reenqueue(batch, nowMs); this.sensors.recordRetries(batch.topicPartition.topic(), batch.recordCount); } else { // tell the user the result of their request @@ -515,16 +513,16 @@ public class Sender implements Runnable { /** * Create a metadata request for the given topics */ - private InFlightRequest metadataRequest(long now, int node, Set topics) { + private InFlightRequest metadataRequest(long nowMs, int node, Set topics) { MetadataRequest metadata = new MetadataRequest(new ArrayList(topics)); RequestSend send = new RequestSend(node, header(ApiKeys.METADATA), metadata.toStruct()); - return new InFlightRequest(now, true, send, null); + return new InFlightRequest(nowMs, true, send, null); } /** * Collate the record batches into a list of produce requests on a per-node basis */ - private List collate(Cluster cluster, List batches, long now) { + private List collate(Cluster cluster, List batches, long nowMs) { Map> collated = new HashMap>(); for (RecordBatch batch : batches) { Node node = cluster.leaderFor(batch.topicPartition); @@ -537,14 +535,14 @@ public class Sender implements Runnable { } List requests = new ArrayList(collated.size()); for (Map.Entry> entry : collated.entrySet()) - requests.add(produceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue())); + requests.add(produceRequest(nowMs, entry.getKey(), acks, requestTimeout, entry.getValue())); return requests; } /** * Create a produce request from the given record batches */ - private InFlightRequest produceRequest(long now, int destination, short acks, int timeout, List batches) { + private InFlightRequest produceRequest(long nowMs, int destination, short acks, int timeout, List batches) { Map batchesByPartition = new HashMap(); Map> batchesByTopic = new HashMap>(); for (RecordBatch batch : batches) { @@ -579,7 +577,7 @@ public class Sender implements Runnable { produce.set("topic_data", topicDatas.toArray()); RequestSend send = new RequestSend(destination, header(ApiKeys.PRODUCE), produce); - return new InFlightRequest(now, acks != 0, send, batchesByPartition); + return new InFlightRequest(nowMs, acks != 0, send, batchesByPartition); } private RequestHeader header(ApiKeys key) { @@ -605,15 +603,15 @@ public class Sender implements Runnable { */ private static final class NodeState { private ConnectionState state; - private long lastConnectAttempt; + private long lastConnectAttemptMs; public NodeState(ConnectionState state, long lastConnectAttempt) { this.state = state; - this.lastConnectAttempt = lastConnectAttempt; + this.lastConnectAttemptMs = lastConnectAttempt; } public String toString() { - return "NodeState(" + state + ", " + lastConnectAttempt + ")"; + return "NodeState(" + state + ", " + lastConnectAttemptMs + ")"; } } @@ -626,16 +624,16 @@ public class Sender implements Runnable { this.nodeState = new HashMap(); } - public boolean canConnect(int node, long now) { + public boolean canConnect(int node, long nowMs) { NodeState state = nodeState.get(node); if (state == null) return true; else - return state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttempt > this.reconnectBackoffMs; + return state.state == ConnectionState.DISCONNECTED && nowMs - state.lastConnectAttemptMs > this.reconnectBackoffMs; } - public void connecting(int node, long now) { - nodeState.put(node, new NodeState(ConnectionState.CONNECTING, now)); + public void connecting(int node, long nowMs) { + nodeState.put(node, new NodeState(ConnectionState.CONNECTING, nowMs)); } public boolean isConnected(int node) { @@ -668,19 +666,19 @@ public class Sender implements Runnable { * An request that hasn't been fully processed yet */ private static final class InFlightRequest { - public long created; + public long createdMs; public boolean expectResponse; public Map batches; public RequestSend request; /** - * @param created The unix timestamp for the time at which this request was created. + * @param createdMs The unix timestamp in milliseonds for the time at which this request was created. * @param expectResponse Should we expect a response message or is this request complete once it is sent? * @param request The request * @param batches The record batches contained in the request if it is a produce request */ - public InFlightRequest(long created, boolean expectResponse, RequestSend request, Map batches) { - this.created = created; + public InFlightRequest(long createdMs, boolean expectResponse, RequestSend request, Map batches) { + this.createdMs = createdMs; this.batches = batches; this.request = request; this.expectResponse = expectResponse; @@ -804,13 +802,13 @@ public class Sender implements Runnable { this.maxRecordSizeSensor.add("record-size-max", "The maximum record size", new Max()); this.metrics.addMetric("requests-in-flight", "The current number of in-flight requests awaiting a response.", new Measurable() { - public double measure(MetricConfig config, long now) { + public double measure(MetricConfig config, long nowMs) { return inFlightRequests.totalInFlightRequests(); } }); metrics.addMetric("metadata-age", "The age in seconds of the current producer metadata being used.", new Measurable() { - public double measure(MetricConfig config, long now) { - return (TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS) - metadata.lastUpdate()) / 1000.0; + public double measure(MetricConfig config, long nowMs) { + return (nowMs - metadata.lastUpdate()) / 1000.0; } }); } @@ -839,7 +837,7 @@ public class Sender implements Runnable { } public void updateProduceRequestMetrics(List requests) { - long ns = time.nanoseconds(); + long nowMs = time.milliseconds(); for (int i = 0; i < requests.size(); i++) { InFlightRequest request = requests.get(i); int records = 0; @@ -862,36 +860,39 @@ public class Sender implements Runnable { topicByteRate.record(batch.records.sizeInBytes()); // global metrics - this.batchSizeSensor.record(batch.records.sizeInBytes(), ns); - this.queueTimeSensor.record(batch.drained - batch.created, ns); - this.maxRecordSizeSensor.record(batch.maxRecordSize, ns); + this.batchSizeSensor.record(batch.records.sizeInBytes(), nowMs); + this.queueTimeSensor.record(batch.drainedMs - batch.createdMs, nowMs); + this.maxRecordSizeSensor.record(batch.maxRecordSize, nowMs); records += batch.recordCount; } - this.recordsPerRequestSensor.record(records, ns); + this.recordsPerRequestSensor.record(records, nowMs); } } } public void recordRetries(String topic, int count) { - this.retrySensor.record(count); + long nowMs = time.milliseconds(); + this.retrySensor.record(count, nowMs); String topicRetryName = "topic." + topic + ".record-retries"; Sensor topicRetrySensor = this.metrics.getSensor(topicRetryName); - if (topicRetrySensor != null) topicRetrySensor.record(count); + if (topicRetrySensor != null) topicRetrySensor.record(count, nowMs); } public void recordErrors(String topic, int count) { - this.errorSensor.record(count); + long nowMs = time.milliseconds(); + this.errorSensor.record(count, nowMs); String topicErrorName = "topic." + topic + ".record-errors"; Sensor topicErrorSensor = this.metrics.getSensor(topicErrorName); - if (topicErrorSensor != null) topicErrorSensor.record(count); + if (topicErrorSensor != null) topicErrorSensor.record(count, nowMs); } - public void recordLatency(int node, long latency, long nowNs) { - this.requestTimeSensor.record(latency, nowNs); + public void recordLatency(int node, long latency) { + long nowMs = time.milliseconds(); + this.requestTimeSensor.record(latency, nowMs); if (node >= 0) { String nodeTimeName = "node-" + node + ".latency"; Sensor nodeRequestTime = this.metrics.getSensor(nodeTimeName); - if (nodeRequestTime != null) nodeRequestTime.record(latency, nowNs); + if (nodeRequestTime != null) nodeRequestTime.record(latency, nowMs); } } } diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java index b2426ac..a7458b5 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java @@ -55,12 +55,12 @@ public final class KafkaMetric implements Metric { @Override public double value() { synchronized (this.lock) { - return value(time.nanoseconds()); + return value(time.milliseconds()); } } - double value(long time) { - return this.measurable.measure(config, time); + double value(long timeMs) { + return this.measurable.measure(config, timeMs); } public void config(MetricConfig config) { diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Measurable.java b/clients/src/main/java/org/apache/kafka/common/metrics/Measurable.java index 0f405c3..7c2e33c 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Measurable.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Measurable.java @@ -24,9 +24,9 @@ public interface Measurable { /** * Measure this quantity and return the result as a double * @param config The configuration for this metric - * @param now The time the measurement is being taken + * @param nowMs The POSIX time in milliseconds the measurement is being taken * @return The measured value */ - public double measure(MetricConfig config, long now); + public double measure(MetricConfig config, long nowMs); } diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java b/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java index 4d14fbc..dfa1b0a 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java @@ -26,7 +26,7 @@ public class MetricConfig { private Quota quota; private int samples; private long eventWindow; - private long timeWindowNs; + private long timeWindowMs; private TimeUnit unit; public MetricConfig() { @@ -34,7 +34,7 @@ public class MetricConfig { this.quota = null; this.samples = 2; this.eventWindow = Long.MAX_VALUE; - this.timeWindowNs = TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS); + this.timeWindowMs = TimeUnit.MILLISECONDS.convert(30, TimeUnit.SECONDS); this.unit = TimeUnit.SECONDS; } @@ -56,12 +56,12 @@ public class MetricConfig { return this; } - public long timeWindowNs() { - return timeWindowNs; + public long timeWindowMs() { + return timeWindowMs; } public MetricConfig timeWindow(long window, TimeUnit unit) { - this.timeWindowNs = TimeUnit.NANOSECONDS.convert(window, unit); + this.timeWindowMs = TimeUnit.MILLISECONDS.convert(window, unit); return this; } diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java index d68349b..25c1faf 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java @@ -78,40 +78,40 @@ public final class Sensor { * bound */ public void record(double value) { - record(value, time.nanoseconds()); + record(value, time.milliseconds()); } /** * Record a value at a known time. This method is slightly faster than {@link #record(double)} since it will reuse * the time stamp. * @param value The value we are recording - * @param time The time in nanoseconds + * @param timeMs The current POSIX time in milliseconds * @throws QuotaViolationException if recording this value moves a metric beyond its configured maximum or minimum * bound */ - public void record(double value, long time) { + public void record(double value, long timeMs) { synchronized (this) { // increment all the stats for (int i = 0; i < this.stats.size(); i++) - this.stats.get(i).record(config, value, time); - checkQuotas(time); + this.stats.get(i).record(config, value, timeMs); + checkQuotas(timeMs); } for (int i = 0; i < parents.length; i++) - parents[i].record(value, time); + parents[i].record(value, timeMs); } /** * Check if we have violated our quota for any metric that has a configured quota - * @param time + * @param timeMs */ - private void checkQuotas(long time) { + private void checkQuotas(long timeMs) { for (int i = 0; i < this.metrics.size(); i++) { KafkaMetric metric = this.metrics.get(i); MetricConfig config = metric.config(); if (config != null) { Quota quota = config.quota(); if (quota != null) { - if (!quota.acceptable(metric.value(time))) + if (!quota.acceptable(metric.value(timeMs))) throw new QuotaViolationException("Metric " + metric.name() + " is in violation of its quota of " + quota.bound()); } } diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Stat.java b/clients/src/main/java/org/apache/kafka/common/metrics/Stat.java index e02389c..0eb7ab2 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Stat.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Stat.java @@ -25,8 +25,8 @@ public interface Stat { * Record the given value * @param config The configuration to use for this metric * @param value The value to record - * @param time The time this value occurred + * @param timeMs The POSIX time in milliseconds this value occurred */ - public void record(MetricConfig config, double value, long time); + public void record(MetricConfig config, double value, long timeMs); } diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Avg.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Avg.java index 51725b2..c9963cb 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Avg.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Avg.java @@ -31,12 +31,12 @@ public class Avg extends SampledStat { } @Override - protected void update(Sample sample, MetricConfig config, double value, long now) { + protected void update(Sample sample, MetricConfig config, double value, long timeMs) { sample.value += value; } @Override - public double combine(List samples, MetricConfig config, long now) { + public double combine(List samples, MetricConfig config, long nowMs) { double total = 0.0; long count = 0; for (int i = 0; i < samples.size(); i++) { diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Count.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Count.java index 3cdd1d0..efcd61b 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Count.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Count.java @@ -31,12 +31,12 @@ public class Count extends SampledStat { } @Override - protected void update(Sample sample, MetricConfig config, double value, long now) { + protected void update(Sample sample, MetricConfig config, double value, long timeMs) { sample.value += 1.0; } @Override - public double combine(List samples, MetricConfig config, long now) { + public double combine(List samples, MetricConfig config, long nowMs) { double total = 0.0; for (int i = 0; i < samples.size(); i++) total += samples.get(i).value; diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Max.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Max.java index bba5972..c492c38 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Max.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Max.java @@ -31,12 +31,12 @@ public final class Max extends SampledStat { } @Override - protected void update(Sample sample, MetricConfig config, double value, long now) { + protected void update(Sample sample, MetricConfig config, double value, long timeMs) { sample.value = Math.max(sample.value, value); } @Override - public double combine(List samples, MetricConfig config, long now) { + public double combine(List samples, MetricConfig config, long nowMs) { double max = Double.NEGATIVE_INFINITY; for (int i = 0; i < samples.size(); i++) max = Math.max(max, samples.get(i).value); diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Min.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Min.java index d370049..bd0919c 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Min.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Min.java @@ -31,12 +31,12 @@ public class Min extends SampledStat { } @Override - protected void update(Sample sample, MetricConfig config, double value, long now) { + protected void update(Sample sample, MetricConfig config, double value, long timeMs) { sample.value = Math.min(sample.value, value); } @Override - public double combine(List samples, MetricConfig config, long now) { + public double combine(List samples, MetricConfig config, long nowMs) { double max = Double.MAX_VALUE; for (int i = 0; i < samples.size(); i++) max = Math.min(max, samples.get(i).value); diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java index b47ed88..8300978 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java @@ -65,16 +65,16 @@ public class Percentiles extends SampledStat implements CompoundStat { for (Percentile percentile : this.percentiles) { final double pct = percentile.percentile(); ms.add(new NamedMeasurable(percentile.name(), percentile.description(), new Measurable() { - public double measure(MetricConfig config, long now) { - return value(config, now, pct / 100.0); + public double measure(MetricConfig config, long nowMs) { + return value(config, nowMs, pct / 100.0); } })); } return ms; } - public double value(MetricConfig config, long now, double quantile) { - purgeObsoleteSamples(config, now); + public double value(MetricConfig config, long nowMs, double quantile) { + purgeObsoleteSamples(config, nowMs); float count = 0.0f; for (Sample sample : this.samples) count += sample.eventCount; @@ -94,17 +94,17 @@ public class Percentiles extends SampledStat implements CompoundStat { return Double.POSITIVE_INFINITY; } - public double combine(List samples, MetricConfig config, long now) { - return value(config, now, 0.5); + public double combine(List samples, MetricConfig config, long nowMs) { + return value(config, nowMs, 0.5); } @Override - protected HistogramSample newSample(long now) { - return new HistogramSample(this.binScheme, now); + protected HistogramSample newSample(long timeMs) { + return new HistogramSample(this.binScheme, timeMs); } @Override - protected void update(Sample sample, MetricConfig config, double value, long now) { + protected void update(Sample sample, MetricConfig config, double value, long timeMs) { HistogramSample hist = (HistogramSample) sample; hist.histogram.record(value); } diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java index 7f5cc53..4b481a5 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java @@ -51,33 +51,33 @@ public class Rate implements MeasurableStat { } @Override - public void record(MetricConfig config, double value, long time) { - this.stat.record(config, value, time); + public void record(MetricConfig config, double value, long timeMs) { + this.stat.record(config, value, timeMs); } @Override - public double measure(MetricConfig config, long now) { - double value = stat.measure(config, now); - double elapsed = convert(now - stat.oldest(now).lastWindow); + public double measure(MetricConfig config, long nowMs) { + double value = stat.measure(config, nowMs); + double elapsed = convert(nowMs - stat.oldest(nowMs).lastWindowMs); return value / elapsed; } private double convert(long time) { switch (unit) { case NANOSECONDS: - return time; + return time * 1000.0 * 1000.0; case MICROSECONDS: - return time / 1000.0; + return time * 1000.0; case MILLISECONDS: - return time / (1000.0 * 1000.0); + return time; case SECONDS: - return time / (1000.0 * 1000.0 * 1000.0); + return time / (1000.0); case MINUTES: - return time / (60.0 * 1000.0 * 1000.0 * 1000.0); + return time / (60.0 * 1000.0); case HOURS: - return time / (60.0 * 60.0 * 1000.0 * 1000.0 * 1000.0); + return time / (60.0 * 60.0 * 1000.0); case DAYS: - return time / (24.0 * 60.0 * 60.0 * 1000.0 * 1000.0 * 1000.0); + return time / (24.0 * 60.0 * 60.0 * 1000.0); default: throw new IllegalStateException("Unknown unit: " + unit); } @@ -90,12 +90,12 @@ public class Rate implements MeasurableStat { } @Override - protected void update(Sample sample, MetricConfig config, double value, long now) { + protected void update(Sample sample, MetricConfig config, double value, long timeMs) { sample.value += value; } @Override - public double combine(List samples, MetricConfig config, long now) { + public double combine(List samples, MetricConfig config, long nowMs) { double total = 0.0; for (int i = 0; i < samples.size(); i++) total += samples.get(i).value; diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java index 776f3a1..0d4056f 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java @@ -40,90 +40,90 @@ public abstract class SampledStat implements MeasurableStat { } @Override - public void record(MetricConfig config, double value, long now) { - Sample sample = current(now); - if (sample.isComplete(now, config)) - sample = advance(config, now); - update(sample, config, value, now); + public void record(MetricConfig config, double value, long timeMs) { + Sample sample = current(timeMs); + if (sample.isComplete(timeMs, config)) + sample = advance(config, timeMs); + update(sample, config, value, timeMs); sample.eventCount += 1; } - private Sample advance(MetricConfig config, long now) { + private Sample advance(MetricConfig config, long timeMs) { this.current = (this.current + 1) % config.samples(); if (this.current >= samples.size()) { - Sample sample = newSample(now); + Sample sample = newSample(timeMs); this.samples.add(sample); return sample; } else { - Sample sample = current(now); - sample.reset(now); + Sample sample = current(timeMs); + sample.reset(timeMs); return sample; } } - protected Sample newSample(long now) { - return new Sample(this.initialValue, now); + protected Sample newSample(long timeMs) { + return new Sample(this.initialValue, timeMs); } @Override - public double measure(MetricConfig config, long now) { - purgeObsoleteSamples(config, now); - return combine(this.samples, config, now); + public double measure(MetricConfig config, long nowMs) { + purgeObsoleteSamples(config, nowMs); + return combine(this.samples, config, nowMs); } - public Sample current(long now) { + public Sample current(long timeMs) { if (samples.size() == 0) - this.samples.add(newSample(now)); + this.samples.add(newSample(timeMs)); return this.samples.get(this.current); } - public Sample oldest(long now) { + public Sample oldest(long nowMs) { if (samples.size() == 0) - this.samples.add(newSample(now)); + this.samples.add(newSample(nowMs)); Sample oldest = this.samples.get(0); for (int i = 1; i < this.samples.size(); i++) { Sample curr = this.samples.get(i); - if (curr.lastWindow < oldest.lastWindow) + if (curr.lastWindowMs < oldest.lastWindowMs) oldest = curr; } return oldest; } - protected abstract void update(Sample sample, MetricConfig config, double value, long now); + protected abstract void update(Sample sample, MetricConfig config, double value, long timeMs); - public abstract double combine(List samples, MetricConfig config, long now); + public abstract double combine(List samples, MetricConfig config, long nowMs); /* Timeout any windows that have expired in the absence of any events */ - protected void purgeObsoleteSamples(MetricConfig config, long now) { - long expireAge = config.samples() * config.timeWindowNs(); + protected void purgeObsoleteSamples(MetricConfig config, long nowMs) { + long expireAge = config.samples() * config.timeWindowMs(); for (int i = 0; i < samples.size(); i++) { Sample sample = this.samples.get(i); - if (now - sample.lastWindow >= expireAge) - sample.reset(now); + if (nowMs - sample.lastWindowMs >= expireAge) + sample.reset(nowMs); } } protected static class Sample { public double initialValue; public long eventCount; - public long lastWindow; + public long lastWindowMs; public double value; public Sample(double initialValue, long now) { this.initialValue = initialValue; this.eventCount = 0; - this.lastWindow = now; + this.lastWindowMs = now; this.value = initialValue; } public void reset(long now) { this.eventCount = 0; - this.lastWindow = now; + this.lastWindowMs = now; this.value = initialValue; } - public boolean isComplete(long now, MetricConfig config) { - return now - lastWindow >= config.timeWindowNs() || eventCount >= config.eventWindow(); + public boolean isComplete(long timeMs, MetricConfig config) { + return timeMs - lastWindowMs >= config.timeWindowMs() || eventCount >= config.eventWindow(); } } diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Total.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Total.java index a9940ed..53dd3d5 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Total.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Total.java @@ -35,12 +35,12 @@ public class Total implements MeasurableStat { } @Override - public void record(MetricConfig config, double value, long time) { + public void record(MetricConfig config, double value, long timeMs) { this.total += value; } @Override - public double measure(MetricConfig config, long now) { + public double measure(MetricConfig config, long nowMs) { return this.total; } diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 6027cb2..3e35898 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -210,7 +210,7 @@ public class Selector implements Selectable { long startSelect = time.nanoseconds(); int readyKeys = select(timeout); long endSelect = time.nanoseconds(); - this.sensors.selectTime.record(endSelect - startSelect, endSelect); + this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds()); if (readyKeys > 0) { Set keys = this.selector.selectedKeys(); @@ -268,7 +268,7 @@ public class Selector implements Selectable { } } long endIo = time.nanoseconds(); - this.sensors.ioTime.record(endIo - endSelect, endIo); + this.sensors.ioTime.record(endIo - endSelect, time.milliseconds()); } @Override @@ -441,7 +441,7 @@ public class Selector implements Selectable { this.ioTime.add("io-ratio", "The fraction of time the I/O thread spent doing I/O", new Rate(TimeUnit.NANOSECONDS)); this.metrics.addMetric("connection-count", "The current number of active connections.", new Measurable() { - public double measure(MetricConfig config, long now) { + public double measure(MetricConfig config, long nowMs) { return keys.size(); } }); @@ -476,20 +476,22 @@ public class Selector implements Selectable { } public void recordBytesSent(int node, int bytes) { - this.bytesSent.record(bytes); + long nowMs = time.milliseconds(); + this.bytesSent.record(bytes, nowMs); if (node >= 0) { String nodeRequestName = "node-" + node + ".bytes-sent"; Sensor nodeRequest = this.metrics.getSensor(nodeRequestName); - if (nodeRequest != null) nodeRequest.record(bytes); + if (nodeRequest != null) nodeRequest.record(bytes, nowMs); } } public void recordBytesReceived(int node, int bytes) { - this.bytesReceived.record(bytes); + long nowMs = time.milliseconds(); + this.bytesReceived.record(bytes, nowMs); if (node >= 0) { String nodeRequestName = "node-" + node + ".bytes-received"; Sensor nodeRequest = this.metrics.getSensor(nodeRequestName); - if (nodeRequest != null) nodeRequest.record(bytes); + if (nodeRequest != null) nodeRequest.record(bytes, nowMs); } } } diff --git a/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java b/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java index 6582c73..d682bd4 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java @@ -26,6 +26,7 @@ public class SystemTime implements Time { return System.currentTimeMillis(); } + @Override public long nanoseconds() { return System.nanoTime(); } diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java index 9ff73f4..e4e0a04 100644 --- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java @@ -117,24 +117,24 @@ public class MetricsTest { public void testEventWindowing() { Count count = new Count(); MetricConfig config = new MetricConfig().eventWindow(1).samples(2); - count.record(config, 1.0, time.nanoseconds()); - count.record(config, 1.0, time.nanoseconds()); - assertEquals(2.0, count.measure(config, time.nanoseconds()), EPS); - count.record(config, 1.0, time.nanoseconds()); // first event times out - assertEquals(2.0, count.measure(config, time.nanoseconds()), EPS); + count.record(config, 1.0, time.milliseconds()); + count.record(config, 1.0, time.milliseconds()); + assertEquals(2.0, count.measure(config, time.milliseconds()), EPS); + count.record(config, 1.0, time.milliseconds()); // first event times out + assertEquals(2.0, count.measure(config, time.milliseconds()), EPS); } @Test public void testTimeWindowing() { Count count = new Count(); MetricConfig config = new MetricConfig().timeWindow(1, TimeUnit.MILLISECONDS).samples(2); - count.record(config, 1.0, time.nanoseconds()); + count.record(config, 1.0, time.milliseconds()); time.sleep(1); - count.record(config, 1.0, time.nanoseconds()); - assertEquals(2.0, count.measure(config, time.nanoseconds()), EPS); + count.record(config, 1.0, time.milliseconds()); + assertEquals(2.0, count.measure(config, time.milliseconds()), EPS); time.sleep(1); - count.record(config, 1.0, time.nanoseconds()); // oldest event times out - assertEquals(2.0, count.measure(config, time.nanoseconds()), EPS); + count.record(config, 1.0, time.milliseconds()); // oldest event times out + assertEquals(2.0, count.measure(config, time.milliseconds()), EPS); } @Test @@ -143,9 +143,9 @@ public class MetricsTest { long windowMs = 100; int samples = 2; MetricConfig config = new MetricConfig().timeWindow(windowMs, TimeUnit.MILLISECONDS).samples(samples); - max.record(config, 50, time.nanoseconds()); + max.record(config, 50, time.milliseconds()); time.sleep(samples * windowMs); - assertEquals(Double.NEGATIVE_INFINITY, max.measure(config, time.nanoseconds()), EPS); + assertEquals(Double.NEGATIVE_INFINITY, max.measure(config, time.milliseconds()), EPS); } @Test(expected = IllegalArgumentException.class) @@ -213,7 +213,7 @@ public class MetricsTest { public double value = 0.0; @Override - public double measure(MetricConfig config, long now) { + public double measure(MetricConfig config, long nowMs) { return value; } diff --git a/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java b/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java index 6aab854..b24d4de 100644 --- a/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java +++ b/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java @@ -84,7 +84,7 @@ public class Microbenchmarks { counter++; } } - System.out.println("synchronized: " + ((System.nanoTime() - start) / iters)); + System.out.println("synchronized: " + ((time.nanoseconds() - start) / iters)); System.out.println(counter); done.set(true); } @@ -121,7 +121,7 @@ public class Microbenchmarks { counter++; lock2.unlock(); } - System.out.println("lock: " + ((System.nanoTime() - start) / iters)); + System.out.println("lock: " + ((time.nanoseconds() - start) / iters)); System.out.println(counter); done.set(true); }