diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index f1def50..d15562a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -131,6 +131,7 @@ public class KafkaProducer implements Producer { config.getInt(ProducerConfig.TIMEOUT_CONFIG), config.getInt(ProducerConfig.SEND_BUFFER_CONFIG), config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG), + config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), this.metrics, new SystemTime()); this.ioThread = new KafkaThread("kafka-producer-network-thread", this.sender, true); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index bc4074e..f9de4af 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -167,6 +167,10 @@ public class ProducerConfig extends AbstractConfig { public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters"; private static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the MetricReporter interface allows " + "plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics."; + /** max.in.flight.requests.per.connection */ + public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection"; + private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking."; + static { config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, BOOSTRAP_SERVERS_DOC) .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC) @@ -202,7 +206,13 @@ public class ProducerConfig extends AbstractConfig { atLeast(0), Importance.LOW, METRICS_SAMPLE_WINDOW_MS_DOC) - .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC); + .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC) + .define(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, + Type.INT, + 5, + atLeast(1), + Importance.LOW, + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC); } ProducerConfig(Map props) { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 3e83ae0..9b1f565 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -120,6 +120,7 @@ public class Sender implements Runnable { int requestTimeout, int socketSendBuffer, int socketReceiveBuffer, + int maxInFlightRequestsPerConnection, Metrics metrics, Time time) { this.nodeStates = new NodeStates(reconnectBackoffMs); @@ -134,7 +135,7 @@ public class Sender implements Runnable { this.retries = retries; this.socketSendBuffer = socketSendBuffer; this.socketReceiveBuffer = socketReceiveBuffer; - this.inFlightRequests = new InFlightRequests(); + this.inFlightRequests = new InFlightRequests(maxInFlightRequestsPerConnection); this.correlation = 0; this.metadataFetchInProgress = false; this.time = time; @@ -678,7 +679,13 @@ public class Sender implements Runnable { * A set of outstanding request queues for each node that have not yet received responses */ private static final class InFlightRequests { - private final Map> requests = new HashMap>(); + private final int maxInFlightRequestsPerConnection; + private final Map> requests; + + public InFlightRequests(int maxInFlightRequestsPerConnection) { + this.requests = new HashMap>(); + this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection; + } /** * Add the given request to the queue for the node it was directed to @@ -714,7 +721,8 @@ public class Sender implements Runnable { */ public boolean canSendMore(int node) { Deque queue = requests.get(node); - return queue == null || queue.isEmpty() || queue.peekFirst().request.complete(); + return queue == null || queue.isEmpty() || + (queue.peekFirst().request.complete() && queue.size() < this.maxInFlightRequestsPerConnection); } /** @@ -762,11 +770,11 @@ public class Sender implements Runnable { this.queueTimeSensor = metrics.sensor("queue-time"); this.queueTimeSensor.add("record-queue-time-avg", - "The average time in ms record batches spent in the record accumulator.", - new Avg()); + "The average time in ms record batches spent in the record accumulator.", + new Avg()); this.queueTimeSensor.add("record-queue-time-max", - "The maximum time in ms record batches spent in the record accumulator.", - new Max()); + "The maximum time in ms record batches spent in the record accumulator.", + new Max()); this.requestTimeSensor = metrics.sensor("request-time"); this.requestTimeSensor.add("request-latency-avg", "The average request latency in ms", new Avg()); @@ -859,7 +867,8 @@ public class Sender implements Runnable { this.retrySensor.record(count, nowMs); String topicRetryName = "topic." + topic + ".record-retries"; Sensor topicRetrySensor = this.metrics.getSensor(topicRetryName); - if (topicRetrySensor != null) topicRetrySensor.record(count, nowMs); + if (topicRetrySensor != null) + topicRetrySensor.record(count, nowMs); } public void recordErrors(String topic, int count) { @@ -867,7 +876,8 @@ public class Sender implements Runnable { this.errorSensor.record(count, nowMs); String topicErrorName = "topic." + topic + ".record-errors"; Sensor topicErrorSensor = this.metrics.getSensor(topicErrorName); - if (topicErrorSensor != null) topicErrorSensor.record(count, nowMs); + if (topicErrorSensor != null) + topicErrorSensor.record(count, nowMs); } public void recordLatency(int node, long latency) { @@ -876,7 +886,8 @@ public class Sender implements Runnable { if (node >= 0) { String nodeTimeName = "node-" + node + ".latency"; Sensor nodeRequestTime = this.metrics.getSensor(nodeTimeName); - if (nodeRequestTime != null) nodeRequestTime.record(latency, nowMs); + if (nodeRequestTime != null) + nodeRequestTime.record(latency, nowMs); } } } diff --git a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java index eb18739..dfc5010 100644 --- a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java +++ b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java @@ -17,68 +17,194 @@ import java.util.Properties; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.record.Records; public class ProducerPerformance { + private static final long NS_PER_MS = 1000000L; + private static final long NS_PER_SEC = 1000 * NS_PER_MS; + private static final long MIN_SLEEP_NS = 2 * NS_PER_MS; + public static void main(String[] args) throws Exception { - if (args.length < 5) { - System.err.println("USAGE: java " + ProducerPerformance.class.getName() - + " url topic_name num_records record_size acks [compression_type]"); + if (args.length < 4) { + System.err.println("USAGE: java " + ProducerPerformance.class.getName() + + " topic_name num_records record_size target_throughput [prop_name=prop_value]*"); System.exit(1); } - String url = args[0]; - String topicName = args[1]; - int numRecords = Integer.parseInt(args[2]); - int recordSize = Integer.parseInt(args[3]); - int acks = Integer.parseInt(args[4]); - Properties props = new Properties(); - props.setProperty(ProducerConfig.ACKS_CONFIG, Integer.toString(acks)); - props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, url); - props.setProperty(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, Integer.toString(5 * 1000)); - props.setProperty(ProducerConfig.TIMEOUT_CONFIG, Integer.toString(Integer.MAX_VALUE)); - props.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, Integer.toString(64 * 1024 * 1024)); - props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(64 * 1024)); - if (args.length == 6) - props.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, args[5]); + /* parse args */ + String topicName = args[0]; + long numRecords = Long.parseLong(args[1]); + int recordSize = Integer.parseInt(args[2]); + int throughput = Integer.parseInt(args[3]); + + Properties props = new Properties(); + for (int i = 4; i < args.length; i++) { + String[] pieces = args[i].split("="); + if (pieces.length != 2) + throw new IllegalArgumentException("Invalid property: " + args[i]); + props.put(pieces[0], pieces[1]); + } KafkaProducer producer = new KafkaProducer(props); - Callback callback = new Callback() { - public void onCompletion(RecordMetadata metadata, Exception e) { - if (e != null) - e.printStackTrace(); - } - }; + + /* setup perf test */ byte[] payload = new byte[recordSize]; Arrays.fill(payload, (byte) 1); ProducerRecord record = new ProducerRecord(topicName, payload); - long start = System.currentTimeMillis(); - long maxLatency = -1L; - long totalLatency = 0; - int reportingInterval = 1000000; + long sleepTime = NS_PER_SEC / throughput; + long sleepDeficitNs = 0; + Stats stats = new Stats(numRecords, 5000); for (int i = 0; i < numRecords; i++) { long sendStart = System.currentTimeMillis(); - producer.send(record, callback); - long sendElapsed = System.currentTimeMillis() - sendStart; - maxLatency = Math.max(maxLatency, sendElapsed); - totalLatency += sendElapsed; - if (i % reportingInterval == 0) { - System.out.printf("%d max latency = %d ms, avg latency = %.5f\n", - i, - maxLatency, - (totalLatency / (double) reportingInterval)); - totalLatency = 0L; - maxLatency = -1L; + Callback cb = stats.nextCompletion(sendStart, payload.length, stats); + producer.send(record, cb); + + /* maybe sleep a little to control throughput */ + if (throughput > 0) { + sleepDeficitNs += sleepTime; + if (sleepDeficitNs >= MIN_SLEEP_NS) { + long sleepMs = sleepDeficitNs / 1000000; + long sleepNs = sleepDeficitNs - sleepMs * 1000000; + Thread.sleep(sleepMs, (int) sleepNs); + sleepDeficitNs = 0; + } } } - long ellapsed = System.currentTimeMillis() - start; - double msgsSec = 1000.0 * numRecords / (double) ellapsed; - double mbSec = msgsSec * (recordSize + Records.LOG_OVERHEAD) / (1024.0 * 1024.0); + + /* print final results */ producer.close(); - System.out.printf("%d records sent in %d ms. %.2f records per second (%.2f mb/sec).\n", numRecords, ellapsed, msgsSec, mbSec); + stats.printTotal(); + } + + private static class Stats { + private long start; + private long windowStart; + private int[] latencies; + private int sampling; + private int iteration; + private int index; + private long count; + private long bytes; + private int maxLatency; + private long totalLatency; + private long windowCount; + private int windowMaxLatency; + private long windowTotalLatency; + private long windowBytes; + private long reportingInterval; + + public Stats(long numRecords, int reportingInterval) { + this.start = System.currentTimeMillis(); + this.windowStart = System.currentTimeMillis(); + this.index = 0; + this.iteration = 0; + this.sampling = (int) (numRecords / Math.min(numRecords, 500000)); + this.latencies = new int[(int) (numRecords / this.sampling) + 1]; + this.index = 0; + this.maxLatency = 0; + this.totalLatency = 0; + this.windowCount = 0; + this.windowMaxLatency = 0; + this.windowTotalLatency = 0; + this.windowBytes = 0; + this.totalLatency = 0; + this.reportingInterval = reportingInterval; + } + + public void record(int iter, int latency, int bytes, long time) { + this.count++; + this.bytes += bytes; + this.totalLatency += latency; + this.maxLatency = Math.max(this.maxLatency, latency); + this.windowCount++; + this.windowBytes += bytes; + this.windowTotalLatency += latency; + this.windowMaxLatency = Math.max(windowMaxLatency, latency); + if (iter % this.sampling == 0) { + this.latencies[index] = latency; + this.index++; + } + /* maybe report the recent perf */ + if (time - windowStart >= reportingInterval) { + printWindow(); + newWindow(); + } + } + + public Callback nextCompletion(long start, int bytes, Stats stats) { + Callback cb = new OnCompletion(this.iteration, start, bytes, stats); + this.iteration++; + return cb; + } + + public void printWindow() { + long ellapsed = System.currentTimeMillis() - windowStart; + double recsPerSec = 1000.0 * windowCount / (double) ellapsed; + double mbPerSec = 1000.0 * this.windowBytes / (double) ellapsed / (1024.0 * 1024.0); + System.out.printf("%d records sent, %.1f records/sec (%.2f MB/sec), %.1f ms avg latency, %.1f max latency.\n", + windowCount, + recsPerSec, + mbPerSec, + windowTotalLatency / (double) windowCount, + (double) windowMaxLatency); + } + + public void newWindow() { + this.windowStart = System.currentTimeMillis(); + this.windowCount = 0; + this.windowMaxLatency = 0; + this.windowTotalLatency = 0; + this.windowBytes = 0; + } + + public void printTotal() { + long ellapsed = System.currentTimeMillis() - start; + double recsPerSec = 1000.0 * count / (double) ellapsed; + double mbPerSec = 1000.0 * this.bytes / (double) ellapsed / (1024.0 * 1024.0); + int[] percs = percentiles(this.latencies, index, 0.5, 0.95, 0.99, 0.999); + System.out.printf("%d records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.\n", + count, + recsPerSec, + mbPerSec, + totalLatency / (double) count, + (double) maxLatency, + percs[0], + percs[1], + percs[2], + percs[3]); + } + + private static int[] percentiles(int[] latencies, int count, double... percentiles) { + int size = Math.min(count, latencies.length); + Arrays.sort(latencies, 0, size); + int[] values = new int[percentiles.length]; + for (int i = 0; i < percentiles.length; i++) { + int index = (int) (percentiles[i] * size); + values[i] = latencies[index]; + } + return values; + } + } + + private static final class OnCompletion implements Callback { + private final long start; + private final int iteration; + private final int bytes; + private final Stats stats; + + public OnCompletion(int iter, long start, int bytes, Stats stats) { + this.start = start; + this.stats = stats; + this.iteration = iter; + this.bytes = bytes; + } + + public void onCompletion(RecordMetadata metadata, Exception exception) { + long now = System.currentTimeMillis(); + int latency = (int) (now - start); + this.stats.record(iteration, latency, bytes, now); + } } } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java index a2b7722..3ef692c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java @@ -50,6 +50,7 @@ public class SenderTest { private static final int REQUEST_TIMEOUT_MS = 10000; private static final int SEND_BUFFER_SIZE = 64 * 1024; private static final int RECEIVE_BUFFER_SIZE = 64 * 1024; + private static final int MAX_IN_FLIGHT_REQS = Integer.MAX_VALUE; private TopicPartition tp = new TopicPartition("test", 0); private MockTime time = new MockTime(); @@ -70,6 +71,7 @@ public class SenderTest { REQUEST_TIMEOUT_MS, SEND_BUFFER_SIZE, RECEIVE_BUFFER_SIZE, + MAX_IN_FLIGHT_REQS, metrics, time); @@ -115,6 +117,7 @@ public class SenderTest { REQUEST_TIMEOUT_MS, SEND_BUFFER_SIZE, RECEIVE_BUFFER_SIZE, + MAX_IN_FLIGHT_REQS, new Metrics(), time); Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null); diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index c7508d5..ef75b67 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -215,10 +215,10 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro val replicaHighWatermarkCheckpointIntervalMs = props.getLong("replica.high.watermark.checkpoint.interval.ms", 5000L) /* the purge interval (in number of requests) of the fetch request purgatory */ - val fetchPurgatoryPurgeIntervalRequests = props.getInt("fetch.purgatory.purge.interval.requests", 10000) + val fetchPurgatoryPurgeIntervalRequests = props.getInt("fetch.purgatory.purge.interval.requests", 1000) /* the purge interval (in number of requests) of the producer request purgatory */ - val producerPurgatoryPurgeIntervalRequests = props.getInt("producer.purgatory.purge.interval.requests", 10000) + val producerPurgatoryPurgeIntervalRequests = props.getInt("producer.purgatory.purge.interval.requests", 1000) /* Enables auto leader balancing. A background thread checks and triggers leader * balance if required at regular intervals */ diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala index c064c5c..3d0ff1e 100644 --- a/core/src/main/scala/kafka/server/RequestPurgatory.scala +++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala @@ -61,7 +61,7 @@ class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, de * this function handles delayed requests that have hit their time limit without being satisfied. * */ -abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purgeInterval: Int = 10000) +abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purgeInterval: Int = 1000) extends Logging with KafkaMetricsGroup { /* a list of requests watching each key */ @@ -137,8 +137,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge */ private class Watchers { - - private val requests = new util.ArrayList[T] + private val requests = new util.LinkedList[T] def numRequests = requests.size @@ -217,6 +216,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge } } if (requestCounter.get >= purgeInterval) { // see if we need to force a full purge + debug("Beginning purgatory purge") requestCounter.set(0) val purged = purgeSatisfied() debug("Purged %d requests from delay queue.".format(purged)) diff --git a/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala b/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala index 37a9ec2..5f8f6bc 100644 --- a/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala +++ b/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala @@ -18,6 +18,7 @@ package kafka.tools import java.util.Properties +import java.util.Arrays import kafka.consumer._ import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord, KafkaProducer} @@ -35,9 +36,10 @@ object TestEndToEndLatency { val consumerProps = new Properties() consumerProps.put("group.id", topic) - consumerProps.put("auto.commit.enable", "true") + consumerProps.put("auto.commit.enable", "false") consumerProps.put("auto.offset.reset", "largest") consumerProps.put("zookeeper.connect", zkConnect) + consumerProps.put("fetch.wait.max.ms", "1") consumerProps.put("socket.timeout.ms", 1201000.toString) val config = new ConsumerConfig(consumerProps) @@ -53,19 +55,26 @@ object TestEndToEndLatency { val message = "hello there beautiful".getBytes var totalTime = 0.0 + val latencies = new Array[Long](numMessages) for (i <- 0 until numMessages) { var begin = System.nanoTime - val response = producer.send(new ProducerRecord(topic, message)) - response.get() + producer.send(new ProducerRecord(topic, message)) val received = iter.next val elapsed = System.nanoTime - begin // poor man's progress bar if (i % 1000 == 0) println(i + "\t" + elapsed / 1000.0 / 1000.0) totalTime += elapsed + latencies(i) = (elapsed / 1000 / 1000) } - println("Avg latency: " + (totalTime / numMessages / 1000.0 / 1000.0) + "ms") + println("Avg latency: %.4f ms\n".format(totalTime / numMessages / 1000.0 / 1000.0)) + Arrays.sort(latencies) + val p50 = latencies((latencies.length * 0.5).toInt) + val p99 = latencies((latencies.length * 0.99).toInt) + val p999 = latencies((latencies.length * 0.999).toInt) + println("Percentiles: 50th = %d, 99th = %d, 99.9th = %d".format(p50, p99, p999)) producer.close() + connector.commitOffsets(true) connector.shutdown() System.exit(0) }