diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index f1def50..d15562a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -131,6 +131,7 @@ public class KafkaProducer implements Producer {
config.getInt(ProducerConfig.TIMEOUT_CONFIG),
config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
+ config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
this.metrics,
new SystemTime());
this.ioThread = new KafkaThread("kafka-producer-network-thread", this.sender, true);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index bc4074e..f9de4af 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -167,6 +167,10 @@ public class ProducerConfig extends AbstractConfig {
public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
private static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the MetricReporter interface allows " + "plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.";
+ /** max.in.flight.requests.per.connection */
+ public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection";
+ private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking.";
+
static {
config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, BOOSTRAP_SERVERS_DOC)
.define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
@@ -202,7 +206,13 @@ public class ProducerConfig extends AbstractConfig {
atLeast(0),
Importance.LOW,
METRICS_SAMPLE_WINDOW_MS_DOC)
- .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC);
+ .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC)
+ .define(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
+ Type.INT,
+ 5,
+ atLeast(1),
+ Importance.LOW,
+ MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC);
}
ProducerConfig(Map<? extends Object, ? extends Object> props) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 3e83ae0..9b1f565 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -120,6 +120,7 @@ public class Sender implements Runnable {
int requestTimeout,
int socketSendBuffer,
int socketReceiveBuffer,
+ int maxInFlightRequestsPerConnection,
Metrics metrics,
Time time) {
this.nodeStates = new NodeStates(reconnectBackoffMs);
@@ -134,7 +135,7 @@ public class Sender implements Runnable {
this.retries = retries;
this.socketSendBuffer = socketSendBuffer;
this.socketReceiveBuffer = socketReceiveBuffer;
- this.inFlightRequests = new InFlightRequests();
+ this.inFlightRequests = new InFlightRequests(maxInFlightRequestsPerConnection);
this.correlation = 0;
this.metadataFetchInProgress = false;
this.time = time;
@@ -678,7 +679,13 @@ public class Sender implements Runnable {
* A set of outstanding request queues for each node that have not yet received responses
*/
private static final class InFlightRequests {
- private final Map<Integer, Deque<InFlightRequest>> requests = new HashMap<Integer, Deque<InFlightRequest>>();
+ private final int maxInFlightRequestsPerConnection;
+ private final Map<Integer, Deque<InFlightRequest>> requests;
+
+ public InFlightRequests(int maxInFlightRequestsPerConnection) {
+ this.requests = new HashMap<Integer, Deque<InFlightRequest>>();
+ this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection;
+ }
/**
* Add the given request to the queue for the node it was directed to
@@ -714,7 +721,8 @@ public class Sender implements Runnable {
*/
public boolean canSendMore(int node) {
Deque<InFlightRequest> queue = requests.get(node);
- return queue == null || queue.isEmpty() || queue.peekFirst().request.complete();
+ return queue == null || queue.isEmpty() ||
+ (queue.peekFirst().request.complete() && queue.size() < this.maxInFlightRequestsPerConnection);
}
/**
@@ -762,11 +770,11 @@ public class Sender implements Runnable {
this.queueTimeSensor = metrics.sensor("queue-time");
this.queueTimeSensor.add("record-queue-time-avg",
- "The average time in ms record batches spent in the record accumulator.",
- new Avg());
+ "The average time in ms record batches spent in the record accumulator.",
+ new Avg());
this.queueTimeSensor.add("record-queue-time-max",
- "The maximum time in ms record batches spent in the record accumulator.",
- new Max());
+ "The maximum time in ms record batches spent in the record accumulator.",
+ new Max());
this.requestTimeSensor = metrics.sensor("request-time");
this.requestTimeSensor.add("request-latency-avg", "The average request latency in ms", new Avg());
@@ -859,7 +867,8 @@ public class Sender implements Runnable {
this.retrySensor.record(count, nowMs);
String topicRetryName = "topic." + topic + ".record-retries";
Sensor topicRetrySensor = this.metrics.getSensor(topicRetryName);
- if (topicRetrySensor != null) topicRetrySensor.record(count, nowMs);
+ if (topicRetrySensor != null)
+ topicRetrySensor.record(count, nowMs);
}
public void recordErrors(String topic, int count) {
@@ -867,7 +876,8 @@ public class Sender implements Runnable {
this.errorSensor.record(count, nowMs);
String topicErrorName = "topic." + topic + ".record-errors";
Sensor topicErrorSensor = this.metrics.getSensor(topicErrorName);
- if (topicErrorSensor != null) topicErrorSensor.record(count, nowMs);
+ if (topicErrorSensor != null)
+ topicErrorSensor.record(count, nowMs);
}
public void recordLatency(int node, long latency) {
@@ -876,7 +886,8 @@ public class Sender implements Runnable {
if (node >= 0) {
String nodeTimeName = "node-" + node + ".latency";
Sensor nodeRequestTime = this.metrics.getSensor(nodeTimeName);
- if (nodeRequestTime != null) nodeRequestTime.record(latency, nowMs);
+ if (nodeRequestTime != null)
+ nodeRequestTime.record(latency, nowMs);
}
}
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
index eb18739..dfc5010 100644
--- a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
+++ b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
@@ -17,68 +17,194 @@ import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.record.Records;
public class ProducerPerformance {
+ private static final long NS_PER_MS = 1000000L;
+ private static final long NS_PER_SEC = 1000 * NS_PER_MS;
+ private static final long MIN_SLEEP_NS = 2 * NS_PER_MS;
+
public static void main(String[] args) throws Exception {
- if (args.length < 5) {
- System.err.println("USAGE: java " + ProducerPerformance.class.getName()
- + " url topic_name num_records record_size acks [compression_type]");
+ if (args.length < 4) {
+ System.err.println("USAGE: java " + ProducerPerformance.class.getName() +
+ " topic_name num_records record_size target_throughput [prop_name=prop_value]*");
System.exit(1);
}
- String url = args[0];
- String topicName = args[1];
- int numRecords = Integer.parseInt(args[2]);
- int recordSize = Integer.parseInt(args[3]);
- int acks = Integer.parseInt(args[4]);
- Properties props = new Properties();
- props.setProperty(ProducerConfig.ACKS_CONFIG, Integer.toString(acks));
- props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, url);
- props.setProperty(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, Integer.toString(5 * 1000));
- props.setProperty(ProducerConfig.TIMEOUT_CONFIG, Integer.toString(Integer.MAX_VALUE));
- props.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, Integer.toString(64 * 1024 * 1024));
- props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(64 * 1024));
- if (args.length == 6)
- props.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, args[5]);
+ /* parse args */
+ String topicName = args[0];
+ long numRecords = Long.parseLong(args[1]);
+ int recordSize = Integer.parseInt(args[2]);
+ int throughput = Integer.parseInt(args[3]);
+
+ Properties props = new Properties();
+ for (int i = 4; i < args.length; i++) {
+ String[] pieces = args[i].split("=");
+ if (pieces.length != 2)
+ throw new IllegalArgumentException("Invalid property: " + args[i]);
+ props.put(pieces[0], pieces[1]);
+ }
KafkaProducer producer = new KafkaProducer(props);
- Callback callback = new Callback() {
- public void onCompletion(RecordMetadata metadata, Exception e) {
- if (e != null)
- e.printStackTrace();
- }
- };
+
+ /* setup perf test */
byte[] payload = new byte[recordSize];
Arrays.fill(payload, (byte) 1);
ProducerRecord record = new ProducerRecord(topicName, payload);
- long start = System.currentTimeMillis();
- long maxLatency = -1L;
- long totalLatency = 0;
- int reportingInterval = 1000000;
+ long sleepTime = NS_PER_SEC / throughput;
+ long sleepDeficitNs = 0;
+ Stats stats = new Stats(numRecords, 5000);
for (int i = 0; i < numRecords; i++) {
long sendStart = System.currentTimeMillis();
- producer.send(record, callback);
- long sendElapsed = System.currentTimeMillis() - sendStart;
- maxLatency = Math.max(maxLatency, sendElapsed);
- totalLatency += sendElapsed;
- if (i % reportingInterval == 0) {
- System.out.printf("%d max latency = %d ms, avg latency = %.5f\n",
- i,
- maxLatency,
- (totalLatency / (double) reportingInterval));
- totalLatency = 0L;
- maxLatency = -1L;
+ Callback cb = stats.nextCompletion(sendStart, payload.length, stats);
+ producer.send(record, cb);
+
+ /* maybe sleep a little to control throughput */
+ if (throughput > 0) {
+ sleepDeficitNs += sleepTime;
+ if (sleepDeficitNs >= MIN_SLEEP_NS) {
+ long sleepMs = sleepDeficitNs / 1000000;
+ long sleepNs = sleepDeficitNs - sleepMs * 1000000;
+ Thread.sleep(sleepMs, (int) sleepNs);
+ sleepDeficitNs = 0;
+ }
}
}
- long ellapsed = System.currentTimeMillis() - start;
- double msgsSec = 1000.0 * numRecords / (double) ellapsed;
- double mbSec = msgsSec * (recordSize + Records.LOG_OVERHEAD) / (1024.0 * 1024.0);
+
+ /* print final results */
producer.close();
- System.out.printf("%d records sent in %d ms. %.2f records per second (%.2f mb/sec).\n", numRecords, ellapsed, msgsSec, mbSec);
+ stats.printTotal();
+ }
+
+ private static class Stats {
+ private long start;
+ private long windowStart;
+ private int[] latencies;
+ private int sampling;
+ private int iteration;
+ private int index;
+ private long count;
+ private long bytes;
+ private int maxLatency;
+ private long totalLatency;
+ private long windowCount;
+ private int windowMaxLatency;
+ private long windowTotalLatency;
+ private long windowBytes;
+ private long reportingInterval;
+
+ public Stats(long numRecords, int reportingInterval) {
+ this.start = System.currentTimeMillis();
+ this.windowStart = System.currentTimeMillis();
+ this.index = 0;
+ this.iteration = 0;
+ this.sampling = (int) (numRecords / Math.min(numRecords, 500000));
+ this.latencies = new int[(int) (numRecords / this.sampling) + 1];
+ this.index = 0;
+ this.maxLatency = 0;
+ this.totalLatency = 0;
+ this.windowCount = 0;
+ this.windowMaxLatency = 0;
+ this.windowTotalLatency = 0;
+ this.windowBytes = 0;
+ this.totalLatency = 0;
+ this.reportingInterval = reportingInterval;
+ }
+
+ public void record(int iter, int latency, int bytes, long time) {
+ this.count++;
+ this.bytes += bytes;
+ this.totalLatency += latency;
+ this.maxLatency = Math.max(this.maxLatency, latency);
+ this.windowCount++;
+ this.windowBytes += bytes;
+ this.windowTotalLatency += latency;
+ this.windowMaxLatency = Math.max(windowMaxLatency, latency);
+ if (iter % this.sampling == 0) {
+ this.latencies[index] = latency;
+ this.index++;
+ }
+ /* maybe report the recent perf */
+ if (time - windowStart >= reportingInterval) {
+ printWindow();
+ newWindow();
+ }
+ }
+
+ public Callback nextCompletion(long start, int bytes, Stats stats) {
+ Callback cb = new OnCompletion(this.iteration, start, bytes, stats);
+ this.iteration++;
+ return cb;
+ }
+
+ public void printWindow() {
+ long ellapsed = System.currentTimeMillis() - windowStart;
+ double recsPerSec = 1000.0 * windowCount / (double) ellapsed;
+ double mbPerSec = 1000.0 * this.windowBytes / (double) ellapsed / (1024.0 * 1024.0);
+ System.out.printf("%d records sent, %.1f records/sec (%.2f MB/sec), %.1f ms avg latency, %.1f max latency.\n",
+ windowCount,
+ recsPerSec,
+ mbPerSec,
+ windowTotalLatency / (double) windowCount,
+ (double) windowMaxLatency);
+ }
+
+ public void newWindow() {
+ this.windowStart = System.currentTimeMillis();
+ this.windowCount = 0;
+ this.windowMaxLatency = 0;
+ this.windowTotalLatency = 0;
+ this.windowBytes = 0;
+ }
+
+ public void printTotal() {
+ long ellapsed = System.currentTimeMillis() - start;
+ double recsPerSec = 1000.0 * count / (double) ellapsed;
+ double mbPerSec = 1000.0 * this.bytes / (double) ellapsed / (1024.0 * 1024.0);
+ int[] percs = percentiles(this.latencies, index, 0.5, 0.95, 0.99, 0.999);
+ System.out.printf("%d records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.\n",
+ count,
+ recsPerSec,
+ mbPerSec,
+ totalLatency / (double) count,
+ (double) maxLatency,
+ percs[0],
+ percs[1],
+ percs[2],
+ percs[3]);
+ }
+
+ private static int[] percentiles(int[] latencies, int count, double... percentiles) {
+ int size = Math.min(count, latencies.length);
+ Arrays.sort(latencies, 0, size);
+ int[] values = new int[percentiles.length];
+ for (int i = 0; i < percentiles.length; i++) {
+ int index = (int) (percentiles[i] * size);
+ values[i] = latencies[index];
+ }
+ return values;
+ }
+ }
+
+ private static final class OnCompletion implements Callback {
+ private final long start;
+ private final int iteration;
+ private final int bytes;
+ private final Stats stats;
+
+ public OnCompletion(int iter, long start, int bytes, Stats stats) {
+ this.start = start;
+ this.stats = stats;
+ this.iteration = iter;
+ this.bytes = bytes;
+ }
+
+ public void onCompletion(RecordMetadata metadata, Exception exception) {
+ long now = System.currentTimeMillis();
+ int latency = (int) (now - start);
+ this.stats.record(iteration, latency, bytes, now);
+ }
}
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
index a2b7722..3ef692c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
@@ -50,6 +50,7 @@ public class SenderTest {
private static final int REQUEST_TIMEOUT_MS = 10000;
private static final int SEND_BUFFER_SIZE = 64 * 1024;
private static final int RECEIVE_BUFFER_SIZE = 64 * 1024;
+ private static final int MAX_IN_FLIGHT_REQS = Integer.MAX_VALUE;
private TopicPartition tp = new TopicPartition("test", 0);
private MockTime time = new MockTime();
@@ -70,6 +71,7 @@ public class SenderTest {
REQUEST_TIMEOUT_MS,
SEND_BUFFER_SIZE,
RECEIVE_BUFFER_SIZE,
+ MAX_IN_FLIGHT_REQS,
metrics,
time);
@@ -115,6 +117,7 @@ public class SenderTest {
REQUEST_TIMEOUT_MS,
SEND_BUFFER_SIZE,
RECEIVE_BUFFER_SIZE,
+ MAX_IN_FLIGHT_REQS,
new Metrics(),
time);
Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index c7508d5..ef75b67 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -215,10 +215,10 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
val replicaHighWatermarkCheckpointIntervalMs = props.getLong("replica.high.watermark.checkpoint.interval.ms", 5000L)
/* the purge interval (in number of requests) of the fetch request purgatory */
- val fetchPurgatoryPurgeIntervalRequests = props.getInt("fetch.purgatory.purge.interval.requests", 10000)
+ val fetchPurgatoryPurgeIntervalRequests = props.getInt("fetch.purgatory.purge.interval.requests", 1000)
/* the purge interval (in number of requests) of the producer request purgatory */
- val producerPurgatoryPurgeIntervalRequests = props.getInt("producer.purgatory.purge.interval.requests", 10000)
+ val producerPurgatoryPurgeIntervalRequests = props.getInt("producer.purgatory.purge.interval.requests", 1000)
/* Enables auto leader balancing. A background thread checks and triggers leader
* balance if required at regular intervals */
diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala
index c064c5c..3d0ff1e 100644
--- a/core/src/main/scala/kafka/server/RequestPurgatory.scala
+++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala
@@ -61,7 +61,7 @@ class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, de
* this function handles delayed requests that have hit their time limit without being satisfied.
*
*/
-abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purgeInterval: Int = 10000)
+abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purgeInterval: Int = 1000)
extends Logging with KafkaMetricsGroup {
/* a list of requests watching each key */
@@ -137,8 +137,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
*/
private class Watchers {
-
- private val requests = new util.ArrayList[T]
+ private val requests = new util.LinkedList[T]
def numRequests = requests.size
@@ -217,6 +216,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
}
}
if (requestCounter.get >= purgeInterval) { // see if we need to force a full purge
+ debug("Beginning purgatory purge")
requestCounter.set(0)
val purged = purgeSatisfied()
debug("Purged %d requests from delay queue.".format(purged))
diff --git a/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala b/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
index 37a9ec2..5f8f6bc 100644
--- a/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
+++ b/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
@@ -18,6 +18,7 @@
package kafka.tools
import java.util.Properties
+import java.util.Arrays
import kafka.consumer._
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord, KafkaProducer}
@@ -35,9 +36,10 @@ object TestEndToEndLatency {
val consumerProps = new Properties()
consumerProps.put("group.id", topic)
- consumerProps.put("auto.commit.enable", "true")
+ consumerProps.put("auto.commit.enable", "false")
consumerProps.put("auto.offset.reset", "largest")
consumerProps.put("zookeeper.connect", zkConnect)
+ consumerProps.put("fetch.wait.max.ms", "1")
consumerProps.put("socket.timeout.ms", 1201000.toString)
val config = new ConsumerConfig(consumerProps)
@@ -53,19 +55,26 @@ object TestEndToEndLatency {
val message = "hello there beautiful".getBytes
var totalTime = 0.0
+ val latencies = new Array[Long](numMessages)
for (i <- 0 until numMessages) {
var begin = System.nanoTime
- val response = producer.send(new ProducerRecord(topic, message))
- response.get()
+ producer.send(new ProducerRecord(topic, message))
val received = iter.next
val elapsed = System.nanoTime - begin
// poor man's progress bar
if (i % 1000 == 0)
println(i + "\t" + elapsed / 1000.0 / 1000.0)
totalTime += elapsed
+ latencies(i) = (elapsed / 1000 / 1000)
}
- println("Avg latency: " + (totalTime / numMessages / 1000.0 / 1000.0) + "ms")
+ println("Avg latency: %.4f ms\n".format(totalTime / numMessages / 1000.0 / 1000.0))
+ Arrays.sort(latencies)
+ val p50 = latencies((latencies.length * 0.5).toInt)
+ val p99 = latencies((latencies.length * 0.99).toInt)
+ val p999 = latencies((latencies.length * 0.999).toInt)
+ println("Percentiles: 50th = %d, 99th = %d, 99.9th = %d".format(p50, p99, p999))
producer.close()
+ connector.commitOffsets(true)
connector.shutdown()
System.exit(0)
}