diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 1ac6943..e42d923 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -39,12 +39,15 @@ import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Rate; import org.apache.kafka.common.network.Selector; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.Records; import org.apache.kafka.common.utils.KafkaThread; import org.apache.kafka.common.utils.SystemTime; +import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,6 +72,7 @@ public class KafkaProducer implements Producer { private final Sender sender; private final Metrics metrics; private final Thread ioThread; + private final Sensor errors; /** * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings @@ -90,9 +94,11 @@ public class KafkaProducer implements Producer { private KafkaProducer(ProducerConfig config) { log.trace("Starting the Kafka producer"); - this.metrics = new Metrics(new MetricConfig(), - Collections.singletonList((MetricsReporter) new JmxReporter("kafka.producer.")), - new SystemTime()); + Time time = new SystemTime(); + String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG); + String jmxPrefix = "kafka.producer." + (clientId.length() > 0 ? clientId + "." 
: ""); + List reporters = Collections.singletonList((MetricsReporter) new JmxReporter(jmxPrefix)); + this.metrics = new Metrics(new MetricConfig(), reporters, time); this.partitioner = new Partitioner(); this.metadataFetchTimeoutMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG); this.metadata = new Metadata(config.getLong(ProducerConfig.METADATA_FETCH_BACKOFF_CONFIG), @@ -105,13 +111,13 @@ public class KafkaProducer implements Producer { config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG), metrics, - new SystemTime()); + time); List addresses = parseAndValidateAddresses(config.getList(ProducerConfig.BROKER_LIST_CONFIG)); - this.metadata.update(Cluster.bootstrap(addresses), System.currentTimeMillis()); - this.sender = new Sender(new Selector(), + this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds()); + this.sender = new Sender(new Selector(this.metrics, time), this.metadata, this.accumulator, - config.getString(ProducerConfig.CLIENT_ID_CONFIG), + clientId, config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG), (short) config.getInt(ProducerConfig.REQUIRED_ACKS_CONFIG), @@ -119,9 +125,14 @@ public class KafkaProducer implements Producer { config.getInt(ProducerConfig.REQUEST_TIMEOUT_CONFIG), config.getInt(ProducerConfig.SEND_BUFFER_CONFIG), config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG), + this.metrics, new SystemTime()); this.ioThread = new KafkaThread("kafka-producer-network-thread", this.sender, true); this.ioThread.start(); + + this.errors = this.metrics.sensor("errors"); + this.errors.add("message-error-rate", "The average number of errors per second returned to the client.", new Rate()); + config.logUnused(); log.debug("Kafka producer started"); } @@ -221,7 +232,8 @@ public class KafkaProducer implements Producer { try { Cluster cluster = metadata.fetch(record.topic(), this.metadataFetchTimeoutMs); int partition = partitioner.partition(record, cluster); - ensureValidSize(record.key(), record.value()); + int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(record.key(), record.value()); + ensureValidRecordSize(serializedSize); TopicPartition tp = new TopicPartition(record.topic(), partition); log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition); FutureRecordMetadata future = accumulator.append(tp, record.key(), record.value(), CompressionType.NONE, callback); @@ -233,8 +245,10 @@ public class KafkaProducer implements Producer { log.debug("Exception occurred during message send:", e); if (callback != null) callback.onCompletion(null, e); + this.errors.record(); return new FutureFailure(e); } catch (InterruptedException e) { + this.errors.record(); throw new KafkaException(e); } } @@ -242,15 +256,14 @@ public class KafkaProducer implements Producer { /** * Check that this key-value pair will have a serialized size small enough */ - private void ensureValidSize(byte[] key, byte[] value) { - int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(key, value); - if (serializedSize > this.maxRequestSize) - throw new RecordTooLargeException("The message is " + serializedSize + private void ensureValidRecordSize(int size) { + if (size > this.maxRequestSize) + throw new RecordTooLargeException("The message is " + size + " bytes when serialized which is larger than the maximum request size you have configured with the " + 
ProducerConfig.MAX_REQUEST_SIZE_CONFIG + " configuration."); - if (serializedSize > this.totalMemorySize) - throw new RecordTooLargeException("The message is " + serializedSize + if (size > this.totalMemorySize) + throw new RecordTooLargeException("The message is " + size + " bytes when serialized which is larger than the total memory buffer you have configured with the " + ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG + " configuration."); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java index 33d62a4..de0543a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java @@ -50,6 +50,7 @@ public final class Metadata { /** * Create a new Metadata instance + * @param metrics The metrics to use * @param refreshBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy * polling * @param metadataExpireMs The maximum amount of time that metadata can be retained without refresh diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 673b296..1faee37 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -90,27 +90,34 @@ public final class RecordAccumulator { } private void registerMetrics(Metrics metrics) { - metrics.addMetric("blocked_threads", + metrics.addMetric("waiting-threads", "The number of user threads blocked waiting for buffer memory to enqueue their records", new Measurable() { public double measure(MetricConfig config, long now) { return free.queued(); } }); - metrics.addMetric("buffer_total_bytes", - "The total amount of buffer memory that is available (not currently used for buffering records).", + metrics.addMetric("buffer-total-bytes", + "The maximum amount of buffer memory the client can use (whether or not it is currently used).", new Measurable() { public double measure(MetricConfig config, long now) { return free.totalMemory(); } }); - metrics.addMetric("buffer_available_bytes", - "The total amount of buffer memory that is available (not currently used for buffering records).", + metrics.addMetric("buffer-available-bytes", + "The total amount of buffer memory that is not being used (either unallocated or in the free list).", new Measurable() { public double measure(MetricConfig config, long now) { return free.availableMemory(); } }); + metrics.addMetric("ready-partitions", + "The number of topic-partitions with buffered data ready to be sent.", + new Measurable() { + public double measure(MetricConfig config, long now) { + return ready(now).size(); + } + }); } /** @@ -227,7 +234,7 @@ public final class RecordAccumulator { * @return A list of {@link RecordBatch} for partitions specified with total size less than the requested maxSize. 
* TODO: There may be a starvation issue due to iteration order */ - public List drain(List partitions, int maxSize) { + public List drain(List partitions, int maxSize, long now) { if (partitions.isEmpty()) return Collections.emptyList(); int size = 0; @@ -245,6 +252,7 @@ public final class RecordAccumulator { RecordBatch batch = deque.pollFirst(); size += batch.records.sizeInBytes(); ready.add(batch); + batch.drained = now; } } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java index 038a05a..e9ea86f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java @@ -32,8 +32,10 @@ public final class RecordBatch { private static final Logger log = LoggerFactory.getLogger(RecordBatch.class); public int recordCount = 0; + public int maxRecordSize = 0; public volatile int attempts = 0; public final long created; + public long drained; public long lastAttempt; public final MemoryRecords records; public final TopicPartition topicPartition; @@ -58,7 +60,10 @@ public final class RecordBatch { if (!this.records.hasRoomFor(key, value)) { return null; } else { + int start = this.records.sizeInBytes(); this.records.append(0L, key, value, compression); + int end = this.records.sizeInBytes(); + this.maxRecordSize = Math.max(this.maxRecordSize, end - start); FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount); if (callback != null) thunks.add(new Thunk(callback, future)); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 565331d..380964c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -24,13 +24,20 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.InvalidMetadataException; -import org.apache.kafka.common.errors.NetworkException; import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.metrics.Measurable; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.Max; +import org.apache.kafka.common.metrics.stats.Rate; import org.apache.kafka.common.network.NetworkReceive; import org.apache.kafka.common.network.NetworkSend; import org.apache.kafka.common.network.Selectable; @@ -108,6 +115,9 @@ public class Sender implements Runnable { /* true while the sender thread is still running */ private volatile boolean running; + /* metrics */ + private final SenderMetrics sensors; + public Sender(Selectable selector, Metadata metadata, RecordAccumulator accumulator, @@ -119,6 +129,7 @@ public class Sender implements Runnable { int requestTimeout, int socketSendBuffer, int socketReceiveBuffer, + Metrics metrics, Time time) { this.nodeStates = new NodeStates(reconnectBackoffMs); this.accumulator = accumulator; @@ 
-137,6 +148,7 @@ public class Sender implements Runnable { this.metadataFetchInProgress = false; this.time = time; this.metadataFetchNodeIndex = new Random().nextInt(); + this.sensors = new SenderMetrics(metrics); } /** @@ -191,8 +203,9 @@ public class Sender implements Runnable { List sendable = processReadyPartitions(cluster, ready, now); // create produce requests - List batches = this.accumulator.drain(sendable, this.maxRequestSize); - List requests = collate(cluster, batches); + List batches = this.accumulator.drain(sendable, this.maxRequestSize, now); + List requests = collate(cluster, batches, now); + sensors.updateProduceRequestMetrics(requests); if (ready.size() > 0) { log.trace("Partitions with complete batches: {}", ready); @@ -234,7 +247,7 @@ public class Sender implements Runnable { if (nodeStates.isConnected(node.id())) { Set topics = metadata.topics(); this.metadataFetchInProgress = true; - InFlightRequest metadataRequest = metadataRequest(node.id(), topics); + InFlightRequest metadataRequest = metadataRequest(now, node.id(), topics); log.debug("Sending metadata request {} to node {}", metadataRequest, node.id()); sends.add(metadataRequest.request); this.inFlightRequests.add(metadataRequest); @@ -349,16 +362,9 @@ public class Sender implements Runnable { ApiKeys requestKey = ApiKeys.forId(request.request.header().apiKey()); switch (requestKey) { case PRODUCE: - for (RecordBatch batch : request.batches.values()) { - if (canRetry(batch, Errors.NETWORK_EXCEPTION)) { - log.warn("Destination node disconnected for topic-partition {}, retrying ({} attempts left).", - batch.topicPartition, this.retries - batch.attempts - 1); - this.accumulator.reenqueue(batch, now); - } else { - batch.done(-1L, new NetworkException("The server disconnected unexpectedly without sending a response.")); - this.accumulator.deallocate(batch); - } - } + int correlation = request.request.header().correlationId(); + for (RecordBatch batch : request.batches.values()) + completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, correlation, now); break; case METADATA: metadataFetchInProgress = false; @@ -408,6 +414,7 @@ public class Sender implements Runnable { * Handle responses from the server */ private void handleResponses(List receives, long now) { + long ns = time.nanoseconds(); for (NetworkReceive receive : receives) { int source = receive.source(); InFlightRequest req = inFlightRequests.nextCompleted(source); @@ -420,12 +427,14 @@ public class Sender implements Runnable { handleProduceResponse(req, req.request.header(), body, now); } else if (req.request.header().apiKey() == ApiKeys.METADATA.id) { log.trace("Received metadata response response from node {} with correlation id {}", source, req.request.header() - .correlationId()); + .correlationId()); handleMetadataResponse(req.request.header(), body, now); } else { throw new IllegalStateException("Unexpected response type: " + req.request.header().apiKey()); } + this.sensors.recordLatency(receive.source(), now - req.created, ns); } + } private void handleMetadataResponse(RequestHeader header, Struct body, long now) { @@ -453,21 +462,39 @@ public class Sender implements Runnable { if (error.exception() instanceof InvalidMetadataException) metadata.forceUpdate(); RecordBatch batch = request.batches.get(tp); - if (canRetry(batch, error)) { - // retry - log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). 
Error: {}", - header.correlationId(), batch.topicPartition, this.retries - batch.attempts - 1, error); - this.accumulator.reenqueue(batch, now); - } else { - // tell the user the result of their request - batch.done(response.baseOffset, error.exception()); - this.accumulator.deallocate(batch); - } + completeBatch(batch, error, response.baseOffset, header.correlationId(), now); } } } /** + * Complete or retry the given batch of records. + * @param batch The record batch + * @param error The error (or Errors.NONE if there was no error) + * @param baseOffset The base offset assigned to the records if successful + * @param correlationId The correlation id for the request + * @param now The current time stamp + */ + private void completeBatch(RecordBatch batch, Errors error, long baseOffset, long correlationId, long now) { + if (error != Errors.NONE && canRetry(batch, error)) { + // retry + log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}", + correlationId, + batch.topicPartition, + this.retries - batch.attempts - 1, + error); + this.accumulator.reenqueue(batch, now); + this.sensors.recordRetries(batch.topicPartition.topic(), batch.recordCount); + } else { + // tell the user the result of their request + batch.done(baseOffset, error.exception()); + this.accumulator.deallocate(batch); + if (error != Errors.NONE) + this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount); + } + } + + /** * We can retry a send if the error is transient and the number of attempts taken is fewer than the maximum allowed */ private boolean canRetry(RecordBatch batch, Errors error) { @@ -488,16 +515,16 @@ /** * Create a metadata request for the given topics */ - private InFlightRequest metadataRequest(int node, Set<String> topics) { + private InFlightRequest metadataRequest(long now, int node, Set<String> topics) { MetadataRequest metadata = new MetadataRequest(new ArrayList<String>(topics)); RequestSend send = new RequestSend(node, header(ApiKeys.METADATA), metadata.toStruct()); - return new InFlightRequest(true, send, null); + return new InFlightRequest(now, true, send, null); } /** * Collate the record batches into a list of produce requests on a per-node basis */ - private List<InFlightRequest> collate(Cluster cluster, List<RecordBatch> batches) { + private List<InFlightRequest> collate(Cluster cluster, List<RecordBatch> batches, long now) { Map<Integer, List<RecordBatch>> collated = new HashMap<Integer, List<RecordBatch>>(); for (RecordBatch batch : batches) { Node node = cluster.leaderFor(batch.topicPartition); @@ -510,14 +537,14 @@ } List<InFlightRequest> requests = new ArrayList<InFlightRequest>(collated.size()); for (Map.Entry<Integer, List<RecordBatch>> entry : collated.entrySet()) - requests.add(produceRequest(entry.getKey(), acks, requestTimeout, entry.getValue())); + requests.add(produceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue())); return requests; } /** * Create a produce request from the given record batches */ - private InFlightRequest produceRequest(int destination, short acks, int timeout, List<RecordBatch> batches) { + private InFlightRequest produceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) { Map<TopicPartition, RecordBatch> batchesByPartition = new HashMap<TopicPartition, RecordBatch>(); Map<String, List<RecordBatch>> batchesByTopic = new HashMap<String, List<RecordBatch>>(); for (RecordBatch batch : batches) { @@ -552,7 +579,7 @@ produce.set("topic_data", topicDatas.toArray()); RequestSend send = new RequestSend(destination, header(ApiKeys.PRODUCE), produce); - return new InFlightRequest(acks != 0, send, batchesByPartition); + return new InFlightRequest(now, acks 
!= 0, send, batchesByPartition); } private RequestHeader header(ApiKeys key) { @@ -641,6 +668,7 @@ public class Sender implements Runnable { * An request that hasn't been fully processed yet */ private static final class InFlightRequest { + public long created; public boolean expectResponse; public Map batches; public RequestSend request; @@ -650,7 +678,8 @@ public class Sender implements Runnable { * @param request The request * @param batches The record batches contained in the request if it is a produce request */ - public InFlightRequest(boolean expectResponse, RequestSend request, Map batches) { + public InFlightRequest(long created, boolean expectResponse, RequestSend request, Map batches) { + this.created = created; this.batches = batches; this.request = request; this.expectResponse = expectResponse; @@ -728,4 +757,117 @@ public class Sender implements Runnable { } } + /** + * A collection of sensors for the sender + */ + private class SenderMetrics { + + private final Metrics metrics; + public final Sensor retrySensor; + public final Sensor errorSensor; + public final Sensor queueTimeSensor; + public final Sensor requestTimeSensor; + public final Sensor messagesPerRequestSensor; + public final Sensor batchSizeSensor; + public final Sensor maxRecordSizeSensor; + + public SenderMetrics(Metrics metrics) { + this.metrics = metrics; + this.batchSizeSensor = metrics.sensor("batch-size"); + this.queueTimeSensor = metrics.sensor("queue-time"); + this.requestTimeSensor = metrics.sensor("request-time"); + this.messagesPerRequestSensor = metrics.sensor("messages-per-request"); + this.retrySensor = metrics.sensor("message-retries"); + this.errorSensor = metrics.sensor("message-error-rate"); + this.maxRecordSizeSensor = metrics.sensor("max-record-size"); + this.batchSizeSensor.add("batch-size-avg", "The average number of bytes per partition sent in requests.", new Avg()); + this.retrySensor.add("message-retry-rate", "The average per-second number of retried message sends", new Rate()); + this.requestTimeSensor.add("request-latency-avg", "The average request latency in ms", new Avg()); + this.requestTimeSensor.add("request-latency-max", "The maximum request latency in ms", new Max()); + this.messagesPerRequestSensor.add("message-send-rate", "The average number of messages sent per second.", new Rate()); + this.messagesPerRequestSensor.add("messages-per-request", "The average number of messages per request.", new Avg()); + this.maxRecordSizeSensor.add("message-size-max", "The maximum message size", new Max()); + this.metrics.addMetric("requests-in-flight", "The current number of in-flight requests awaiting a response.", new Measurable() { + public double measure(MetricConfig config, long now) { + return inFlightRequests.totalInFlightRequests(); + } + }); + metrics.addMetric("metadata-age", "The age in seconds of the current producer metadata being used.", new Measurable() { + public double measure(MetricConfig config, long now) { + return (TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS) - metadata.lastUpdate()) / 1000.0; + } + }); + } + + public void updateProduceRequestMetrics(List requests) { + long ns = time.nanoseconds(); + for (int i = 0; i < requests.size(); i++) { + InFlightRequest request = requests.get(i); + int messages = 0; + if (request.batches != null) { + for (RecordBatch batch : request.batches.values()) { + + // per-topic message count + String topicMessageCountName = "topic." 
+ batch.topicPartition.topic() + ".messages-per-batch"; + Sensor topicMessageCount = this.metrics.getSensor(topicMessageCountName); + if (topicMessageCount == null) { + topicMessageCount = this.metrics.sensor(topicMessageCountName); + topicMessageCount.add("topic." + batch.topicPartition.topic() + ".message-rate", new Rate()); + } + topicMessageCount.record(batch.recordCount); + + // per-topic bytes-per-second + String topicByteRateName = "topic." + batch.topicPartition.topic() + ".bytes"; + Sensor topicByteRate = this.metrics.getSensor(topicByteRateName); + if (topicByteRate == null) { + topicByteRate = this.metrics.sensor(topicByteRateName); + topicByteRate.add("topic." + batch.topicPartition.topic() + ".byte-rate", new Rate()); + } + topicByteRate.record(batch.records.sizeInBytes()); + + this.batchSizeSensor.record(batch.records.sizeInBytes(), ns); + this.queueTimeSensor.record(batch.drained - batch.created, ns); + this.maxRecordSizeSensor.record(batch.maxRecordSize, ns); + messages += batch.recordCount; + } + this.messagesPerRequestSensor.record(messages, ns); + } + } + } + + public void recordRetries(String topic, int count) { + this.retrySensor.record(count); + String topicRetryName = "topic." + topic + ".message-retries"; + Sensor topicRetrySensor = this.metrics.getSensor(topicRetryName); + if (topicRetrySensor == null) { + topicRetrySensor = this.metrics.sensor(topicRetryName); + topicRetrySensor.add("topic." + topic + ".message-retry-rate", new Rate()); + } + topicRetrySensor.record(count); + } + + public void recordErrors(String topic, int count) { + this.errorSensor.record(count); + String topicErrorName = "topic." + topic + ".message-errors"; + Sensor topicErrorSensor = this.metrics.getSensor(topicErrorName); + if (topicErrorSensor == null) { + topicErrorSensor = this.metrics.sensor(topicErrorName); + topicErrorSensor.add("topic." + topic + ".message-error-rate", new Rate()); + } + topicErrorSensor.record(count); + } + + public void recordLatency(int node, long latency, long nowNs) { + this.requestTimeSensor.record(latency, nowNs); + String nodeTimeName = "server." + node + ".latency"; + Sensor nodeRequestTime = this.metrics.getSensor(nodeTimeName); + if (nodeRequestTime == null) { + nodeRequestTime = this.metrics.sensor(nodeTimeName); + nodeRequestTime.add("node-" + node + ".latency-avg", new Avg()); + nodeRequestTime.add("node-" + node + ".latency-max", new Max()); + } + nodeRequestTime.record(latency, nowNs); + } + } + } diff --git a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java index 3ebbb80..e016815 100644 --- a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java +++ b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.clients.tools; @@ -43,7 +39,7 @@ public class ProducerPerformance { props.setProperty(ProducerConfig.BROKER_LIST_CONFIG, url); props.setProperty(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, Integer.toString(5 * 1000)); props.setProperty(ProducerConfig.REQUEST_TIMEOUT_CONFIG, Integer.toString(Integer.MAX_VALUE)); - props.setProperty(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, Integer.toString(256 * 1024 * 1024)); + props.setProperty(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, Integer.toString(64 * 1024 * 1024)); props.setProperty(ProducerConfig.MAX_PARTITION_SIZE_CONFIG, Integer.toString(256 * 1024)); KafkaProducer producer = new KafkaProducer(props); @@ -79,7 +75,7 @@ public class ProducerPerformance { double msgsSec = 1000.0 * numRecords / (double) ellapsed; double mbSec = msgsSec * (recordSize + Records.LOG_OVERHEAD) / (1024.0 * 1024.0); producer.close(); - System.out.printf("%d records sent in %d ms ms. %.2f records per second (%.2f mb/sec).\n", numRecords, ellapsed, msgsSec, mbSec); + System.out.printf("%d records sent in %d ms. %.2f records per second (%.2f mb/sec).\n", numRecords, ellapsed, msgsSec, mbSec); } } diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java index 6db2dfb..09f4ba5 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java @@ -1,32 +1,27 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.metrics; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import org.apache.kafka.common.utils.CopyOnWriteMap; import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; - /** * A registry of sensors and metrics. *

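The hunks below add Metrics.getSensor(String) and a no-parent sensor(String) overload. Together they support the get-or-create idiom that the new SenderMetrics and SelectorMetrics classes use to build per-topic and per-node sensors lazily. As a rough illustration of that idiom (not part of the patch; the sensor and metric names below are made up), here is a minimal sketch using only APIs visible in this diff:

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;

public class SensorIdiomSketch {
    public static void main(String[] args) {
        Metrics metrics = new Metrics();

        // Get-or-create: look the sensor up first, register its stats only on first use.
        String name = "topic.my-topic.bytes"; // illustrative name, mirrors the per-topic pattern in SenderMetrics
        Sensor bytesSensor = metrics.getSensor(name);
        if (bytesSensor == null) {
            bytesSensor = metrics.sensor(name);
            bytesSensor.add("topic.my-topic.byte-rate", "Bytes recorded per second for my-topic.", new Rate());
            bytesSensor.add("topic.my-topic.record-size-avg", "The average recorded size.", new Avg());
            bytesSensor.add("topic.my-topic.record-size-max", "The maximum recorded size.", new Max());
        }

        // Every record() call updates all stats attached to the sensor.
        bytesSensor.record(512);
        bytesSensor.record(1024);
    }
}

Because sensor(name) is synchronized and itself does a get-or-create, the check-then-create above is a benign race: a concurrent caller simply gets the already-registered sensor back.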
@@ -87,8 +82,8 @@ public class Metrics { */ public Metrics(MetricConfig defaultConfig, List reporters, Time time) { this.config = defaultConfig; - this.sensors = new ConcurrentHashMap(); - this.metrics = new ConcurrentHashMap(); + this.sensors = new CopyOnWriteMap(); + this.metrics = new CopyOnWriteMap(); this.reporters = Utils.notNull(reporters); this.time = time; for (MetricsReporter reporter : reporters) @@ -96,8 +91,26 @@ public class Metrics { } /** - * Create a sensor with the given unique name and zero or more parent sensors. All parent sensors will receive every - * value recorded with this sensor. + * Get the sensor with the given name if it exists + * @param name The name of the sensor + * @return Return the sensor or null if no such sensor exists + */ + public Sensor getSensor(String name) { + return this.sensors.get(Utils.notNull(name)); + } + + /** + * Get or create a sensor with the given unique name and no parent sensors. + * @param name The sensor name + * @return The sensor + */ + public Sensor sensor(String name) { + return sensor(name, null, (Sensor[]) null); + } + + /** + * Get or create a sensor with the given unique name and zero or more parent sensors. All parent sensors will + * receive every value recorded with this sensor. * @param name The name of the sensor * @param parents The parent sensors * @return The sensor that is created @@ -107,15 +120,15 @@ public class Metrics { } /** - * Create a sensor with the given unique name and zero or more parent sensors. All parent sensors will receive every - * value recorded with this sensor. + * Get or create a sensor with the given unique name and zero or more parent sensors. All parent sensors will + * receive every value recorded with this sensor. * @param name The name of the sensor * @param config A default configuration to use for this sensor for metrics that don't have their own config * @param parents The parent sensors * @return The sensor that is created */ public synchronized Sensor sensor(String name, MetricConfig config, Sensor... parents) { - Sensor s = this.sensors.get(Utils.notNull(name)); + Sensor s = getSensor(name); if (s == null) { s = new Sensor(this, name, parents, config == null ? this.config : config, time); this.sensors.put(name, s); diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java index 7e4849b..d68349b 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. 
See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.metrics; @@ -26,7 +22,6 @@ import org.apache.kafka.common.metrics.CompoundStat.NamedMeasurable; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; - /** * A sensor applies a continuous sequence of numerical values to a set of associated metrics. For example a sensor on * message size would record a sequence of message sizes using the {@link #record(double)} api and would maintain a set @@ -46,7 +41,7 @@ public final class Sensor { super(); this.registry = registry; this.name = Utils.notNull(name); - this.parents = parents; + this.parents = parents == null ? new Sensor[0] : parents; this.metrics = new ArrayList(); this.stats = new ArrayList(); this.config = config; @@ -86,27 +81,39 @@ public final class Sensor { record(value, time.nanoseconds()); } - private void record(double value, long time) { + /** + * Record a value at a known time. This method is slightly faster than {@link #record(double)} since it will reuse + * the time stamp. + * @param value The value we are recording + * @param time The time in nanoseconds + * @throws QuotaViolationException if recording this value moves a metric beyond its configured maximum or minimum + * bound + */ + public void record(double value, long time) { synchronized (this) { // increment all the stats for (int i = 0; i < this.stats.size(); i++) this.stats.get(i).record(config, value, time); checkQuotas(time); - } for (int i = 0; i < parents.length; i++) parents[i].record(value, time); } + /** + * Check if we have violated our quota for any metric that has a configured quota + * @param time + */ private void checkQuotas(long time) { for (int i = 0; i < this.metrics.size(); i++) { KafkaMetric metric = this.metrics.get(i); MetricConfig config = metric.config(); if (config != null) { Quota quota = config.quota(); - if (quota != null) + if (quota != null) { if (!quota.acceptable(metric.value(time))) throw new QuotaViolationException("Metric " + metric.name() + " is in violation of its quota of " + quota.bound()); + } } } } diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java index 3b0454f..9e5bc07 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.metrics.stats; @@ -22,7 +18,6 @@ import java.util.concurrent.TimeUnit; import org.apache.kafka.common.metrics.MeasurableStat; import org.apache.kafka.common.metrics.MetricConfig; - /** * The rate of the given quanitity. By default this is the total observed over a set of samples from a sampled statistic * divided by the ellapsed time over the sample windows. Alternative {@link SampledStat} implementations can be @@ -42,6 +37,10 @@ public class Rate implements MeasurableStat { this(unit, new SampledTotal()); } + public Rate(SampledStat stat) { + this(TimeUnit.SECONDS, stat); + } + public Rate(TimeUnit unit, SampledStat stat) { this.stat = stat; this.unit = unit; @@ -58,7 +57,7 @@ public class Rate implements MeasurableStat { @Override public double measure(MetricConfig config, long now) { - double ellapsed = convert(now - stat.oldest().lastWindow); + double ellapsed = convert(now - stat.oldest(now).lastWindow); return stat.measure(config, now) / ellapsed; } diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java index f8b413a..ebc0445 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. 
See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.metrics.stats; @@ -22,7 +18,6 @@ import java.util.List; import org.apache.kafka.common.metrics.MeasurableStat; import org.apache.kafka.common.metrics.MetricConfig; - /** * A SampledStat records a single scalar value measured over one or more samples. Each sample is recorded over a * configurable window. The window can be defined by number of events or ellapsed time (or both, if both are given the @@ -82,8 +77,10 @@ public abstract class SampledStat implements MeasurableStat { return this.samples.get(this.current); } - public Sample oldest() { - return this.samples.get((this.current + 1) % this.samples.size()); + public Sample oldest(long now) { + if (samples.size() == 0) + this.samples.add(newSample(now)); + return this.samples.get((this.current + 1) % samples.size()); } protected abstract void update(Sample sample, MetricConfig config, double value, long now); diff --git a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java index 9305b61..6350424 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java +++ b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.network; @@ -29,6 +25,7 @@ public class ByteBufferSend implements Send { private final int destination; protected final ByteBuffer[] buffers; private int remaining; + private int size; public ByteBufferSend(int destination, ByteBuffer... buffers) { super(); @@ -36,6 +33,7 @@ public class ByteBufferSend implements Send { this.buffers = buffers; for (int i = 0; i < buffers.length; i++) remaining += buffers[i].remaining(); + this.size = remaining; } @Override @@ -58,6 +56,10 @@ public class ByteBufferSend implements Send { return this.remaining; } + public int size() { + return this.size; + } + @Override public long writeTo(GatheringByteChannel channel) throws IOException { long written = channel.write(buffers); diff --git a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java index 51d4892..dcc639a 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java +++ b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
*/ package org.apache.kafka.common.network; diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 9839632..6ea9486 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -25,8 +25,18 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.metrics.Measurable; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.Count; +import org.apache.kafka.common.metrics.stats.Max; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,21 +78,25 @@ public class Selector implements Selectable { private final List completedReceives; private final List disconnected; private final List connected; + private final Time time; + private final SelectorMetrics sensors; /** * Create a new selector */ - public Selector() { + public Selector(Metrics metrics, Time time) { try { this.selector = java.nio.channels.Selector.open(); } catch (IOException e) { throw new KafkaException(e); } + this.time = time; this.keys = new HashMap(); this.completedSends = new ArrayList(); this.completedReceives = new ArrayList(); this.connected = new ArrayList(); this.disconnected = new ArrayList(); + this.sensors = new SelectorMetrics(metrics); } /** @@ -192,7 +206,11 @@ public class Selector implements Selectable { } /* check ready keys */ + long startSelect = time.nanoseconds(); int readyKeys = select(timeout); + long endSelect = time.nanoseconds(); + this.sensors.selectTime.record(endSelect - startSelect, endSelect); + if (readyKeys > 0) { Set keys = this.selector.selectedKeys(); Iterator iter = keys.iterator(); @@ -208,6 +226,7 @@ public class Selector implements Selectable { channel.finishConnect(); key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ); this.connected.add(transmissions.id); + this.sensors.connectionCreated.record(); } /* read from any connections that have readable data */ @@ -218,6 +237,7 @@ public class Selector implements Selectable { if (transmissions.receive.complete()) { transmissions.receive.payload().rewind(); this.completedReceives.add(transmissions.receive); + this.sensors.recordBytesReceived(transmissions.id, transmissions.receive.payload().limit()); transmissions.clearReceive(); } } @@ -227,6 +247,7 @@ public class Selector implements Selectable { transmissions.send.writeTo(channel); if (transmissions.send.remaining() <= 0) { this.completedSends.add(transmissions.send); + this.sensors.recordBytesSent(transmissions.id, transmissions.send.size()); transmissions.clearSend(); key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); } @@ -241,6 +262,8 @@ public class Selector implements Selectable { } } } + long endIo = time.nanoseconds(); + this.sensors.ioTime.record(endIo - endSelect, endIo); } @Override @@ -309,6 +332,7 @@ public class Selector implements Selectable { } catch (IOException e) { log.error("Exception closing connection to node {}:", trans.id, e); } + this.sensors.connectionClosed.record(); } /** @@ -364,4 
+388,81 @@ public class Selector implements Selectable { } } + private class SelectorMetrics { + private final Metrics metrics; + public final Sensor connectionClosed; + public final Sensor connectionCreated; + public final Sensor bytesTransferred; + public final Sensor bytesSent; + public final Sensor bytesReceived; + public final Sensor selectTime; + public final Sensor ioTime; + + public SelectorMetrics(Metrics metrics) { + this.metrics = metrics; + this.connectionClosed = this.metrics.sensor("connections-closed"); + this.connectionCreated = this.metrics.sensor("connections-created"); + this.bytesTransferred = this.metrics.sensor("bytes-sent-received"); + this.bytesSent = this.metrics.sensor("bytes-sent", bytesTransferred); + this.bytesReceived = this.metrics.sensor("bytes-received", bytesTransferred); + this.selectTime = this.metrics.sensor("select-time"); + this.ioTime = this.metrics.sensor("io-time"); + bytesTransferred.add("network-ops-per-second", + "The average number of network operations (reads or writes) on all connections per second.", + new Rate(new Count())); + this.bytesSent.add("bytes-sent-per-second", "The average number of outgoing bytes sent per second to all servers.", new Rate()); + this.bytesSent.add("requests-sent-per-second", "The average number of requests sent per second.", new Rate(new Count())); + this.bytesSent.add("request-size-avg", "The average size of all requests in the window.", new Avg()); + this.bytesSent.add("request-size-max", "The maximum size of any request sent in the window.", new Max()); + this.bytesReceived.add("bytes-received-per-second", "Bytes/second read off all sockets.", new Rate()); + this.bytesReceived.add("responses-received-per-second", "Responses received per second.", new Rate(new Count())); + this.connectionCreated.add("connections-created-per-second", + "New connections established per second in the window.", + new Rate()); + this.connectionClosed.add("connections-closed-per-second", "Connections closed per second in the window.", new Rate()); + this.selectTime.add("select-calls-per-second", + "Number of times the I/O layer checked for new I/O to perform per second.", + new Rate(new Count())); + this.selectTime.add("select-time-avg-ns", "The average length of time per select call in nanoseconds.", new Avg()); + this.selectTime.add("select-percentage", "The fraction of time the I/O thread spent waiting.", new Rate(TimeUnit.NANOSECONDS)); + this.ioTime.add("io-time-avg-ns", "The average length of time for I/O per select call in nanoseconds.", new Avg()); + this.ioTime.add("io-percentage", "The fraction of time the I/O thread spent doing I/O.", new Rate(TimeUnit.NANOSECONDS)); + this.metrics.addMetric("connection-count", "The current number of active connections.", new Measurable() { + public double measure(MetricConfig config, long now) { + return keys.size(); + } + }); + } + + public void recordBytesSent(int node, int bytes) { + this.bytesSent.record(bytes); + String name = "node-" + node + ".bytes-sent"; + Sensor sensor = this.metrics.getSensor(name); + if (sensor == null) { + sensor = this.metrics.sensor(name); + sensor.add("node-" + node + ".bytes-sent-per-second", new Rate()); + sensor.add("node-" + node + ".requests-sent-per-second", + "The average number of requests sent per second.", + new Rate(new Count())); + sensor.add("node-" + node + ".request-size-avg", "The average size of all requests in the window.", new Avg()); + sensor.add("node-" + node + ".request-size-max", "The maximum size of any request sent in the 
window.", new Max()); + } + sensor.record(bytes); + } + + public void recordBytesReceived(int node, int bytes) { + this.bytesReceived.record(bytes); + String name = "node-" + node + ".bytes-received"; + Sensor sensor = this.metrics.getSensor(name); + if (sensor == null) { + sensor = this.metrics.sensor(name); + sensor.add("node-" + node + ".bytes-received-per-second", new Rate()); + sensor.add("node-" + node + ".responses-received-per-second", + "The average number of responses received per second.", + new Rate(new Count())); + } + sensor.record(bytes); + } + } + } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java index 09a5355..8b4ac0f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
*/ package org.apache.kafka.clients.producer; diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java index ed56906..f37ab77 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java @@ -52,7 +52,7 @@ public class RecordAccumulatorTest { } accum.append(tp, key, value, CompressionType.NONE, null); assertEquals("Our partition should be ready", asList(tp), accum.ready(time.milliseconds())); - List batches = accum.drain(asList(tp), Integer.MAX_VALUE); + List batches = accum.drain(asList(tp), Integer.MAX_VALUE, 0); assertEquals(1, batches.size()); RecordBatch batch = batches.get(0); Iterator iter = batch.records.iterator(); @@ -80,7 +80,7 @@ public class RecordAccumulatorTest { assertEquals("No partitions should be ready", 0, accum.ready(time.milliseconds()).size()); time.sleep(10); assertEquals("Our partition should be ready", asList(tp), accum.ready(time.milliseconds())); - List batches = accum.drain(asList(tp), Integer.MAX_VALUE); + List batches = accum.drain(asList(tp), Integer.MAX_VALUE, 0); assertEquals(1, batches.size()); RecordBatch batch = batches.get(0); Iterator iter = batch.records.iterator(); @@ -101,7 +101,7 @@ public class RecordAccumulatorTest { } assertEquals("Both partitions should be ready", 2, accum.ready(time.milliseconds()).size()); - List batches = accum.drain(partitions, 1024); + List batches = accum.drain(partitions, 1024, 0); assertEquals("But due to size bound only one partition should have been retrieved", 1, batches.size()); } @@ -131,7 +131,7 @@ public class RecordAccumulatorTest { long now = time.milliseconds(); while (read < numThreads * msgs) { List tps = accum.ready(now); - List batches = accum.drain(tps, 5 * 1024); + List batches = accum.drain(tps, 5 * 1024, 0); for (RecordBatch batch : batches) { for (LogEntry entry : batch.records) read++; diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java index 12c9500..a2b7722 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java @@ -70,6 +70,7 @@ public class SenderTest { REQUEST_TIMEOUT_MS, SEND_BUFFER_SIZE, RECEIVE_BUFFER_SIZE, + metrics, time); @Before @@ -114,6 +115,7 @@ public class SenderTest { REQUEST_TIMEOUT_MS, SEND_BUFFER_SIZE, RECEIVE_BUFFER_SIZE, + new Metrics(), time); Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null); RequestSend request1 = completeSend(sender); diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java index 90e2dcf..60a38e5 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.network; @@ -32,11 +28,8 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; - -import org.apache.kafka.common.network.NetworkReceive; -import org.apache.kafka.common.network.NetworkSend; -import org.apache.kafka.common.network.Selectable; -import org.apache.kafka.common.network.Selector; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.test.TestUtils; import org.junit.After; @@ -58,7 +51,7 @@ public class SelectorTest { public void setup() throws Exception { this.server = new EchoServer(); this.server.start(); - this.selector = new Selector(); + this.selector = new Selector(new Metrics(), new MockTime()); } @After diff --git a/clients/src/test/java/org/apache/kafka/test/MetricsBench.java b/clients/src/test/java/org/apache/kafka/test/MetricsBench.java index 7239b4a..a7c4835 100644 --- a/clients/src/test/java/org/apache/kafka/test/MetricsBench.java +++ b/clients/src/test/java/org/apache/kafka/test/MetricsBench.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. 
The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
 */
 package org.apache.kafka.test;
 
@@ -21,12 +17,7 @@ import java.util.Arrays;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
-import org.apache.kafka.common.metrics.stats.Count;
 import org.apache.kafka.common.metrics.stats.Max;
-import org.apache.kafka.common.metrics.stats.Percentile;
-import org.apache.kafka.common.metrics.stats.Percentiles;
-import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing;
-
 public class MetricsBench {
 
@@ -37,18 +28,18 @@ public class MetricsBench {
         Sensor child = metrics.sensor("child", parent);
         for (Sensor sensor : Arrays.asList(parent, child)) {
             sensor.add(sensor.name() + ".avg", new Avg());
-            sensor.add(sensor.name() + ".count", new Count());
+            // sensor.add(sensor.name() + ".count", new Count());
             sensor.add(sensor.name() + ".max", new Max());
-            sensor.add(new Percentiles(1024,
-                                       0.0,
-                                       iters,
-                                       BucketSizing.CONSTANT,
-                                       new Percentile(sensor.name() + ".median", 50.0),
-                                       new Percentile(sensor.name() + ".p_99", 99.0)));
+            // sensor.add(new Percentiles(1024,
+            //                            0.0,
+            //                            iters,
+            //                            BucketSizing.CONSTANT,
+            //                            new Percentile(sensor.name() + ".median", 50.0),
+            //                            new Percentile(sensor.name() + ".p_99", 99.0)));
         }
         long start = System.nanoTime();
         for (int i = 0; i < iters; i++)
-            child.record(i);
+            parent.record(i);
         double ellapsed = (System.nanoTime() - start) / (double) iters;
         System.out.println(String.format("%.2f ns per metric recording.", ellapsed));
     }
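
For reference, the Sensor/stat pattern that SelectorMetrics relies on can be exercised outside the producer with a minimal, stand-alone sketch like the one below. It is not part of this patch: the class name SensorExample and every "demo"-prefixed metric name are made up for illustration, and it uses only the Metrics, Sensor, Measurable, Rate, Count, Avg, and Max calls that already appear in the diff above.

import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;

public class SensorExample {

    public static void main(String[] args) {
        Metrics metrics = new Metrics();

        // A sensor aggregates several stats over one stream of recorded values.
        Sensor requests = metrics.sensor("demo-requests");
        requests.add("demo.requests-per-second", "Requests recorded per second.", new Rate(new Count()));
        requests.add("demo.request-size-avg", "The average recorded request size in bytes.", new Avg());
        requests.add("demo.request-size-max", "The maximum recorded request size in bytes.", new Max());

        // Sensors can form a hierarchy: a value recorded on a child also feeds its parent,
        // which is how bytes-sent and bytes-received both roll up into bytes-sent-received.
        Sensor allTraffic = metrics.sensor("demo-all-traffic");
        Sensor sent = metrics.sensor("demo-sent", allTraffic);
        allTraffic.add("demo.network-ops-per-second", "Reads or writes recorded per second.", new Rate(new Count()));
        sent.add("demo.bytes-sent-per-second", "Bytes recorded per second.", new Rate());

        // Gauge-style values can be registered directly, like connection-count in Selector.
        metrics.addMetric("demo.answer", "A fixed value, purely for illustration.", new Measurable() {
            public double measure(MetricConfig config, long now) {
                return 42.0;
            }
        });

        // The hot path only records; the stats above are computed from these observations.
        for (int size = 100; size <= 1000; size += 100) {
            requests.record(size);
            sent.record(size);
        }

        // Per-node sensors in recordBytesSent/recordBytesReceived use the same lazy idiom:
        // look the sensor up by name and create it on first use.
        String name = "demo-node-0.bytes-sent";
        Sensor nodeSensor = metrics.getSensor(name);
        if (nodeSensor == null) {
            nodeSensor = metrics.sensor(name);
            nodeSensor.add("demo-node-0.bytes-sent-per-second", new Rate());
        }
        nodeSensor.record(512);
    }
}

The lazy getSensor-then-create idiom shown last is what keeps a node's sensors from being registered until that node actually sends or receives bytes.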