diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index d15562a..f1def50 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -131,7 +131,6 @@ public class KafkaProducer implements Producer {
config.getInt(ProducerConfig.TIMEOUT_CONFIG),
config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
- config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
this.metrics,
new SystemTime());
this.ioThread = new KafkaThread("kafka-producer-network-thread", this.sender, true);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index f9de4af..bc4074e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -167,10 +167,6 @@ public class ProducerConfig extends AbstractConfig {
public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
private static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the MetricReporter interface allows " + "plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.";
- /** max.in.flight.requests.per.connection */
- public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection";
- private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking.";
-
static {
config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, BOOSTRAP_SERVERS_DOC)
.define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
@@ -206,13 +202,7 @@ public class ProducerConfig extends AbstractConfig {
atLeast(0),
Importance.LOW,
METRICS_SAMPLE_WINDOW_MS_DOC)
- .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC)
- .define(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
- Type.INT,
- 5,
- atLeast(1),
- Importance.LOW,
- MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC);
+ .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC);
}
ProducerConfig(Map<? extends Object, ? extends Object> props) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 9b1f565..3e83ae0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -120,7 +120,6 @@ public class Sender implements Runnable {
int requestTimeout,
int socketSendBuffer,
int socketReceiveBuffer,
- int maxInFlightRequestsPerConnection,
Metrics metrics,
Time time) {
this.nodeStates = new NodeStates(reconnectBackoffMs);
@@ -135,7 +134,7 @@ public class Sender implements Runnable {
this.retries = retries;
this.socketSendBuffer = socketSendBuffer;
this.socketReceiveBuffer = socketReceiveBuffer;
- this.inFlightRequests = new InFlightRequests(maxInFlightRequestsPerConnection);
+ this.inFlightRequests = new InFlightRequests();
this.correlation = 0;
this.metadataFetchInProgress = false;
this.time = time;
@@ -679,13 +678,7 @@ public class Sender implements Runnable {
* A set of outstanding request queues for each node that have not yet received responses
*/
private static final class InFlightRequests {
- private final int maxInFlightRequestsPerConnection;
- private final Map<Integer, Deque<InFlightRequest>> requests;
-
- public InFlightRequests(int maxInFlightRequestsPerConnection) {
- this.requests = new HashMap<Integer, Deque<InFlightRequest>>();
- this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection;
- }
+ private final Map<Integer, Deque<InFlightRequest>> requests = new HashMap<Integer, Deque<InFlightRequest>>();
/**
* Add the given request to the queue for the node it was directed to
@@ -721,8 +714,7 @@ public class Sender implements Runnable {
*/
public boolean canSendMore(int node) {
Deque<InFlightRequest> queue = requests.get(node);
- return queue == null || queue.isEmpty() ||
- (queue.peekFirst().request.complete() && queue.size() < this.maxInFlightRequestsPerConnection);
+ return queue == null || queue.isEmpty() || queue.peekFirst().request.complete();
}
/**
@@ -770,11 +762,11 @@ public class Sender implements Runnable {
this.queueTimeSensor = metrics.sensor("queue-time");
this.queueTimeSensor.add("record-queue-time-avg",
- "The average time in ms record batches spent in the record accumulator.",
- new Avg());
+ "The average time in ms record batches spent in the record accumulator.",
+ new Avg());
this.queueTimeSensor.add("record-queue-time-max",
- "The maximum time in ms record batches spent in the record accumulator.",
- new Max());
+ "The maximum time in ms record batches spent in the record accumulator.",
+ new Max());
this.requestTimeSensor = metrics.sensor("request-time");
this.requestTimeSensor.add("request-latency-avg", "The average request latency in ms", new Avg());
@@ -867,8 +859,7 @@ public class Sender implements Runnable {
this.retrySensor.record(count, nowMs);
String topicRetryName = "topic." + topic + ".record-retries";
Sensor topicRetrySensor = this.metrics.getSensor(topicRetryName);
- if (topicRetrySensor != null)
- topicRetrySensor.record(count, nowMs);
+ if (topicRetrySensor != null) topicRetrySensor.record(count, nowMs);
}
public void recordErrors(String topic, int count) {
@@ -876,8 +867,7 @@ public class Sender implements Runnable {
this.errorSensor.record(count, nowMs);
String topicErrorName = "topic." + topic + ".record-errors";
Sensor topicErrorSensor = this.metrics.getSensor(topicErrorName);
- if (topicErrorSensor != null)
- topicErrorSensor.record(count, nowMs);
+ if (topicErrorSensor != null) topicErrorSensor.record(count, nowMs);
}
public void recordLatency(int node, long latency) {
@@ -886,8 +876,7 @@ public class Sender implements Runnable {
if (node >= 0) {
String nodeTimeName = "node-" + node + ".latency";
Sensor nodeRequestTime = this.metrics.getSensor(nodeTimeName);
- if (nodeRequestTime != null)
- nodeRequestTime.record(latency, nowMs);
+ if (nodeRequestTime != null) nodeRequestTime.record(latency, nowMs);
}
}
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
index ac86150..eb18739 100644
--- a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
+++ b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
@@ -17,200 +17,68 @@ import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.record.Records;
public class ProducerPerformance {
- private static final long NS_PER_MS = 1000000L;
- private static final long NS_PER_SEC = 1000 * NS_PER_MS;
- private static final long MIN_SLEEP_NS = 2 * NS_PER_MS;
-
public static void main(String[] args) throws Exception {
- if (args.length < 4) {
- System.err.println("USAGE: java " + ProducerPerformance.class.getName() +
- " topic_name num_records record_size target_records_sec [prop_name=prop_value]*");
+ if (args.length < 5) {
+ System.err.println("USAGE: java " + ProducerPerformance.class.getName()
+ + " url topic_name num_records record_size acks [compression_type]");
System.exit(1);
}
-
- /* parse args */
- String topicName = args[0];
- long numRecords = Long.parseLong(args[1]);
- int recordSize = Integer.parseInt(args[2]);
- int throughput = Integer.parseInt(args[3]);
-
+ String url = args[0];
+ String topicName = args[1];
+ int numRecords = Integer.parseInt(args[2]);
+ int recordSize = Integer.parseInt(args[3]);
+ int acks = Integer.parseInt(args[4]);
Properties props = new Properties();
- for (int i = 4; i < args.length; i++) {
- String[] pieces = args[i].split("=");
- if (pieces.length != 2)
- throw new IllegalArgumentException("Invalid property: " + args[i]);
- props.put(pieces[0], pieces[1]);
- }
- KafkaProducer producer = new KafkaProducer(props);
+ props.setProperty(ProducerConfig.ACKS_CONFIG, Integer.toString(acks));
+ props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, url);
+ props.setProperty(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, Integer.toString(5 * 1000));
+ props.setProperty(ProducerConfig.TIMEOUT_CONFIG, Integer.toString(Integer.MAX_VALUE));
+ props.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, Integer.toString(64 * 1024 * 1024));
+ props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(64 * 1024));
+ if (args.length == 6)
+ props.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, args[5]);
- /* setup perf test */
+ KafkaProducer producer = new KafkaProducer(props);
+ Callback callback = new Callback() {
+ public void onCompletion(RecordMetadata metadata, Exception e) {
+ if (e != null)
+ e.printStackTrace();
+ }
+ };
byte[] payload = new byte[recordSize];
Arrays.fill(payload, (byte) 1);
ProducerRecord record = new ProducerRecord(topicName, payload);
- long sleepTime = NS_PER_SEC / throughput;
- long sleepDeficitNs = 0;
- Stats stats = new Stats(numRecords, 5000);
+ long start = System.currentTimeMillis();
+ long maxLatency = -1L;
+ long totalLatency = 0;
+ int reportingInterval = 1000000;
for (int i = 0; i < numRecords; i++) {
long sendStart = System.currentTimeMillis();
- Callback cb = stats.nextCompletion(sendStart, payload.length, stats);
- producer.send(record, cb);
-
- /*
- * Maybe sleep a little to control throughput. Sleep time can be a bit inaccurate for times < 1 ms so
- * instead of sleeping each time instead wait until a minimum sleep time accumulates (the "sleep deficit")
- * and then make up the whole deficit in one longer sleep.
- */
- if (throughput > 0) {
- sleepDeficitNs += sleepTime;
- if (sleepDeficitNs >= MIN_SLEEP_NS) {
- long sleepMs = sleepDeficitNs / 1000000;
- long sleepNs = sleepDeficitNs - sleepMs * 1000000;
- Thread.sleep(sleepMs, (int) sleepNs);
- sleepDeficitNs = 0;
- }
+ producer.send(record, callback);
+ long sendElapsed = System.currentTimeMillis() - sendStart;
+ maxLatency = Math.max(maxLatency, sendElapsed);
+ totalLatency += sendElapsed;
+ if (i % reportingInterval == 0) {
+ System.out.printf("%d max latency = %d ms, avg latency = %.5f\n",
+ i,
+ maxLatency,
+ (totalLatency / (double) reportingInterval));
+ totalLatency = 0L;
+ maxLatency = -1L;
}
}
-
- /* print final results */
+ long ellapsed = System.currentTimeMillis() - start;
+ double msgsSec = 1000.0 * numRecords / (double) ellapsed;
+ double mbSec = msgsSec * (recordSize + Records.LOG_OVERHEAD) / (1024.0 * 1024.0);
producer.close();
- stats.printTotal();
- }
-
- private static class Stats {
- private long start;
- private long windowStart;
- private int[] latencies;
- private int sampling;
- private int iteration;
- private int index;
- private long count;
- private long bytes;
- private int maxLatency;
- private long totalLatency;
- private long windowCount;
- private int windowMaxLatency;
- private long windowTotalLatency;
- private long windowBytes;
- private long reportingInterval;
-
- public Stats(long numRecords, int reportingInterval) {
- this.start = System.currentTimeMillis();
- this.windowStart = System.currentTimeMillis();
- this.index = 0;
- this.iteration = 0;
- this.sampling = (int) (numRecords / Math.min(numRecords, 500000));
- this.latencies = new int[(int) (numRecords / this.sampling) + 1];
- this.index = 0;
- this.maxLatency = 0;
- this.totalLatency = 0;
- this.windowCount = 0;
- this.windowMaxLatency = 0;
- this.windowTotalLatency = 0;
- this.windowBytes = 0;
- this.totalLatency = 0;
- this.reportingInterval = reportingInterval;
- }
-
- public void record(int iter, int latency, int bytes, long time) {
- this.count++;
- this.bytes += bytes;
- this.totalLatency += latency;
- this.maxLatency = Math.max(this.maxLatency, latency);
- this.windowCount++;
- this.windowBytes += bytes;
- this.windowTotalLatency += latency;
- this.windowMaxLatency = Math.max(windowMaxLatency, latency);
- if (iter % this.sampling == 0) {
- this.latencies[index] = latency;
- this.index++;
- }
- /* maybe report the recent perf */
- if (time - windowStart >= reportingInterval) {
- printWindow();
- newWindow();
- }
- }
-
- public Callback nextCompletion(long start, int bytes, Stats stats) {
- Callback cb = new PerfCallback(this.iteration, start, bytes, stats);
- this.iteration++;
- return cb;
- }
-
- public void printWindow() {
- long ellapsed = System.currentTimeMillis() - windowStart;
- double recsPerSec = 1000.0 * windowCount / (double) ellapsed;
- double mbPerSec = 1000.0 * this.windowBytes / (double) ellapsed / (1024.0 * 1024.0);
- System.out.printf("%d records sent, %.1f records/sec (%.2f MB/sec), %.1f ms avg latency, %.1f max latency.\n",
- windowCount,
- recsPerSec,
- mbPerSec,
- windowTotalLatency / (double) windowCount,
- (double) windowMaxLatency);
- }
-
- public void newWindow() {
- this.windowStart = System.currentTimeMillis();
- this.windowCount = 0;
- this.windowMaxLatency = 0;
- this.windowTotalLatency = 0;
- this.windowBytes = 0;
- }
-
- public void printTotal() {
- long ellapsed = System.currentTimeMillis() - start;
- double recsPerSec = 1000.0 * count / (double) ellapsed;
- double mbPerSec = 1000.0 * this.bytes / (double) ellapsed / (1024.0 * 1024.0);
- int[] percs = percentiles(this.latencies, index, 0.5, 0.95, 0.99, 0.999);
- System.out.printf("%d records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.\n",
- count,
- recsPerSec,
- mbPerSec,
- totalLatency / (double) count,
- (double) maxLatency,
- percs[0],
- percs[1],
- percs[2],
- percs[3]);
- }
-
- private static int[] percentiles(int[] latencies, int count, double... percentiles) {
- int size = Math.min(count, latencies.length);
- Arrays.sort(latencies, 0, size);
- int[] values = new int[percentiles.length];
- for (int i = 0; i < percentiles.length; i++) {
- int index = (int) (percentiles[i] * size);
- values[i] = latencies[index];
- }
- return values;
- }
- }
-
- private static final class PerfCallback implements Callback {
- private final long start;
- private final int iteration;
- private final int bytes;
- private final Stats stats;
-
- public PerfCallback(int iter, long start, int bytes, Stats stats) {
- this.start = start;
- this.stats = stats;
- this.iteration = iter;
- this.bytes = bytes;
- }
-
- public void onCompletion(RecordMetadata metadata, Exception exception) {
- long now = System.currentTimeMillis();
- int latency = (int) (now - start);
- this.stats.record(iteration, latency, bytes, now);
- if (exception != null)
- exception.printStackTrace();
- }
+ System.out.printf("%d records sent in %d ms. %.2f records per second (%.2f mb/sec).\n", numRecords, ellapsed, msgsSec, mbSec);
}
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
index 3ef692c..a2b7722 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
@@ -50,7 +50,6 @@ public class SenderTest {
private static final int REQUEST_TIMEOUT_MS = 10000;
private static final int SEND_BUFFER_SIZE = 64 * 1024;
private static final int RECEIVE_BUFFER_SIZE = 64 * 1024;
- private static final int MAX_IN_FLIGHT_REQS = Integer.MAX_VALUE;
private TopicPartition tp = new TopicPartition("test", 0);
private MockTime time = new MockTime();
@@ -71,7 +70,6 @@ public class SenderTest {
REQUEST_TIMEOUT_MS,
SEND_BUFFER_SIZE,
RECEIVE_BUFFER_SIZE,
- MAX_IN_FLIGHT_REQS,
metrics,
time);
@@ -117,7 +115,6 @@ public class SenderTest {
REQUEST_TIMEOUT_MS,
SEND_BUFFER_SIZE,
RECEIVE_BUFFER_SIZE,
- MAX_IN_FLIGHT_REQS,
new Metrics(),
time);
Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 518d2df..a9c0465 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -18,7 +18,7 @@ package kafka.cluster
import kafka.common._
import kafka.admin.AdminUtils
-import kafka.utils.{ZkUtils, Pool, Time, Logging}
+import kafka.utils.{ZkUtils, ReplicationUtils, Pool, Time, Logging}
import kafka.utils.Utils.inLock
import kafka.api.{PartitionStateInfo, LeaderAndIsr}
import kafka.log.LogConfig
@@ -216,7 +216,7 @@ class Partition(val topic: String,
inSyncReplicas = Set.empty[Replica]
leaderEpoch = leaderAndIsr.leaderEpoch
zkVersion = leaderAndIsr.zkVersion
-
+
leaderReplicaIdOpt.foreach { leaderReplica =>
if (topic == OffsetManager.OffsetsTopicName &&
/* if we are making a leader->follower transition */
@@ -261,7 +261,15 @@ class Partition(val topic: String,
info("Expanding ISR for partition [%s,%d] from %s to %s"
.format(topic, partitionId, inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(",")))
// update ISR in ZK and cache
- updateIsr(newInSyncReplicas)
+ val (updateSucceeded,newVersion) = ReplicationUtils.updateIsr(zkClient, topic, partitionId, localBrokerId,
+ leaderEpoch, controllerEpoch, zkVersion, newInSyncReplicas)
+ if(updateSucceeded) {
+ inSyncReplicas = newInSyncReplicas
+ zkVersion = newVersion
+ trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newInSyncReplicas.mkString(","), zkVersion))
+ } else {
+ info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))
+ }
replicaManager.isrExpandRate.mark()
}
maybeIncrementLeaderHW(leaderReplica)
@@ -325,7 +333,15 @@ class Partition(val topic: String,
info("Shrinking ISR for partition [%s,%d] from %s to %s".format(topic, partitionId,
inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(",")))
// update ISR in zk and in cache
- updateIsr(newInSyncReplicas)
+ val (updateSucceeded,newVersion) = ReplicationUtils.updateIsr(zkClient, topic, partitionId, localBrokerId,
+ leaderEpoch, controllerEpoch, zkVersion, newInSyncReplicas)
+ if(updateSucceeded) {
+ inSyncReplicas = newInSyncReplicas
+ zkVersion = newVersion
+ trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newInSyncReplicas.mkString(","), zkVersion))
+ } else {
+ info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))
+ }
// we may need to increment high watermark since ISR could be down to 1
maybeIncrementLeaderHW(leaderReplica)
replicaManager.isrShrinkRate.mark()
@@ -373,22 +389,6 @@ class Partition(val topic: String,
}
}
- private def updateIsr(newIsr: Set[Replica]) {
- debug("Updated ISR for partition [%s,%d] to %s".format(topic, partitionId, newIsr.mkString(",")))
- val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
- // use the epoch of the controller that made the leadership decision, instead of the current controller epoch
- val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
- ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId),
- ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch), zkVersion)
- if (updateSucceeded){
- inSyncReplicas = newIsr
- zkVersion = newVersion
- trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newIsr.mkString(","), zkVersion))
- } else {
- info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))
- }
- }
-
override def equals(that: Any): Boolean = {
if(!(that.isInstanceOf[Partition]))
return false
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index e776423..39524f1 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1333,6 +1333,16 @@ case class LeaderIsrAndControllerEpoch(val leaderAndIsr: LeaderAndIsr, controlle
leaderAndIsrInfo.append(",ControllerEpoch:" + controllerEpoch + ")")
leaderAndIsrInfo.toString()
}
+
+ override def equals(obj: Any): Boolean = {
+ obj match {
+ case null => false
+ case n: LeaderIsrAndControllerEpoch =>
+ leaderAndIsr.leader == n.leaderAndIsr.leader && leaderAndIsr.isr.sorted == n.leaderAndIsr.isr.sorted &&
+ leaderAndIsr.leaderEpoch == n.leaderAndIsr.leaderEpoch && controllerEpoch == n.controllerEpoch
+ case _ => false
+ }
+ }
}
object ControllerStats extends KafkaMetricsGroup {
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index ef75b67..c7508d5 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -215,10 +215,10 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
val replicaHighWatermarkCheckpointIntervalMs = props.getLong("replica.high.watermark.checkpoint.interval.ms", 5000L)
/* the purge interval (in number of requests) of the fetch request purgatory */
- val fetchPurgatoryPurgeIntervalRequests = props.getInt("fetch.purgatory.purge.interval.requests", 1000)
+ val fetchPurgatoryPurgeIntervalRequests = props.getInt("fetch.purgatory.purge.interval.requests", 10000)
/* the purge interval (in number of requests) of the producer request purgatory */
- val producerPurgatoryPurgeIntervalRequests = props.getInt("producer.purgatory.purge.interval.requests", 1000)
+ val producerPurgatoryPurgeIntervalRequests = props.getInt("producer.purgatory.purge.interval.requests", 10000)
/* Enables auto leader balancing. A background thread checks and triggers leader
* balance if required at regular intervals */
diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala
index 3d0ff1e..c064c5c 100644
--- a/core/src/main/scala/kafka/server/RequestPurgatory.scala
+++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala
@@ -61,7 +61,7 @@ class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, de
* this function handles delayed requests that have hit their time limit without being satisfied.
*
*/
-abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purgeInterval: Int = 1000)
+abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purgeInterval: Int = 10000)
extends Logging with KafkaMetricsGroup {
/* a list of requests watching each key */
@@ -137,7 +137,8 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
*/
private class Watchers {
- private val requests = new util.LinkedList[T]
+
+ private val requests = new util.ArrayList[T]
def numRequests = requests.size
@@ -216,7 +217,6 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
}
}
if (requestCounter.get >= purgeInterval) { // see if we need to force a full purge
- debug("Beginning purgatory purge")
requestCounter.set(0)
val purged = purgeSatisfied()
debug("Purged %d requests from delay queue.".format(purged))
diff --git a/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala b/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
index 5f8f6bc..37a9ec2 100644
--- a/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
+++ b/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
@@ -18,7 +18,6 @@
package kafka.tools
import java.util.Properties
-import java.util.Arrays
import kafka.consumer._
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord, KafkaProducer}
@@ -36,10 +35,9 @@ object TestEndToEndLatency {
val consumerProps = new Properties()
consumerProps.put("group.id", topic)
- consumerProps.put("auto.commit.enable", "false")
+ consumerProps.put("auto.commit.enable", "true")
consumerProps.put("auto.offset.reset", "largest")
consumerProps.put("zookeeper.connect", zkConnect)
- consumerProps.put("fetch.wait.max.ms", "1")
consumerProps.put("socket.timeout.ms", 1201000.toString)
val config = new ConsumerConfig(consumerProps)
@@ -55,26 +53,19 @@ object TestEndToEndLatency {
val message = "hello there beautiful".getBytes
var totalTime = 0.0
- val latencies = new Array[Long](numMessages)
for (i <- 0 until numMessages) {
var begin = System.nanoTime
- producer.send(new ProducerRecord(topic, message))
+ val response = producer.send(new ProducerRecord(topic, message))
+ response.get()
val received = iter.next
val elapsed = System.nanoTime - begin
// poor man's progress bar
if (i % 1000 == 0)
println(i + "\t" + elapsed / 1000.0 / 1000.0)
totalTime += elapsed
- latencies(i) = (elapsed / 1000 / 1000)
}
- println("Avg latency: %.4f ms\n".format(totalTime / numMessages / 1000.0 / 1000.0))
- Arrays.sort(latencies)
- val p50 = latencies((latencies.length * 0.5).toInt)
- val p99 = latencies((latencies.length * 0.99).toInt)
- val p999 = latencies((latencies.length * 0.999).toInt)
- println("Percentiles: 50th = %d, 99th = %d, 99.9th = %d".format(p50, p99, p999))
+ println("Avg latency: " + (totalTime / numMessages / 1000.0 / 1000.0) + "ms")
producer.close()
- connector.commitOffsets(true)
connector.shutdown()
System.exit(0)
}
diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
new file mode 100644
index 0000000..09c8b89
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+import kafka.cluster.Replica
+import kafka.api.LeaderAndIsr
+import kafka.controller.LeaderIsrAndControllerEpoch
+import org.apache.zookeeper.data.Stat
+import org.I0Itec.zkclient.ZkClient
+
+import scala.Some
+import scala.collection._
+
+object ReplicationUtils extends Logging {
+
+ def updateIsr(zkClient: ZkClient, topic: String, partitionId: Int, brokerId: Int, leaderEpoch: Int,
+ controllerEpoch: Int, zkVersion: Int, newIsr: Set[Replica]): (Boolean,Int) = {
+ debug("Updated ISR for partition [%s,%d] to %s".format(topic, partitionId, newIsr.mkString(",")))
+ val newLeaderAndIsr = new LeaderAndIsr(brokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
+ // use the epoch of the controller that made the leadership decision, instead of the current controller epoch
+ val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
+ ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId),
+ ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch), zkVersion)
+ if (!updateSucceeded) {
+ return checkLeaderAndIsrZkData(zkClient, topic, partitionId,
+ ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId),
+ ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch), zkVersion)
+ }
+ (updateSucceeded, newVersion)
+ }
+
+ def checkLeaderAndIsrZkData(zkClient: ZkClient, topic: String, partitionId: Int, path: String,
+ newLeaderData: String, zkVersion: Int): (Boolean,Int) = {
+ try {
+ val newLeaderStat: Stat = new Stat()
+ newLeaderStat.setVersion(zkVersion)
+ val newLeader = ZkUtils.parseLeaderAndIsr(newLeaderData,topic,partitionId,newLeaderStat)
+ val writtenLeaderAndIsrInfo = ZkUtils.readDataMaybeNull(zkClient,path)
+ val writtenLeaderOpt = writtenLeaderAndIsrInfo._1
+ val writtenStat = writtenLeaderAndIsrInfo._2
+ writtenLeaderOpt match {
+ case Some(writtenData) =>
+ val writtenLeader = ZkUtils.parseLeaderAndIsr(writtenData,topic,partitionId,writtenStat)
+ (newLeader,writtenLeader) match {
+ case (Some(newLeader),Some(writtenLeader)) =>
+ if(newLeader.equals(writtenLeader))
+ return (true,writtenStat.getVersion())
+ case _ =>
+ }
+ case None =>
+ }
+ (false,-1)
+ } catch {
+ case e1: Exception =>
+ error("checking broker data at path %s with data %s failed due to %s".format(path, newLeaderData,
+ e1.getMessage))
+ (false,-1)
+ }
+ }
+}