diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index d15562a..f1def50 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -131,7 +131,6 @@ public class KafkaProducer implements Producer {
                                  config.getInt(ProducerConfig.TIMEOUT_CONFIG),
                                  config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                                  config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
-                                 config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
                                  this.metrics,
                                  new SystemTime());
         this.ioThread = new KafkaThread("kafka-producer-network-thread", this.sender, true);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index f9de4af..bc4074e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -167,10 +167,6 @@ public class ProducerConfig extends AbstractConfig {
     public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
     private static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the MetricReporter interface allows " +
             "plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.";
-    /** max.in.flight.requests.per.connection */
-    public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection";
-    private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking.";
-
     static {
         config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, BOOSTRAP_SERVERS_DOC)
                                 .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
@@ -206,13 +202,7 @@ public class ProducerConfig extends AbstractConfig {
                                         atLeast(0),
                                         Importance.LOW,
                                         METRICS_SAMPLE_WINDOW_MS_DOC)
-                                .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC)
-                                .define(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
-                                        Type.INT,
-                                        5,
-                                        atLeast(1),
-                                        Importance.LOW,
-                                        MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC);
+                                .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC);
     }
 
     ProducerConfig(Map<? extends Object, ? extends Object> props) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 9b1f565..3e83ae0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -120,7 +120,6 @@ public class Sender implements Runnable {
                   int requestTimeout,
                   int socketSendBuffer,
                   int socketReceiveBuffer,
-                  int maxInFlightRequestsPerConnection,
                   Metrics metrics,
                   Time time) {
         this.nodeStates = new NodeStates(reconnectBackoffMs);
@@ -135,7 +134,7 @@ public class Sender implements Runnable {
         this.retries = retries;
         this.socketSendBuffer = socketSendBuffer;
         this.socketReceiveBuffer = socketReceiveBuffer;
-        this.inFlightRequests = new InFlightRequests(maxInFlightRequestsPerConnection);
+        this.inFlightRequests = new InFlightRequests();
         this.correlation = 0;
         this.metadataFetchInProgress = false;
         this.time = time;
@@ -679,13 +678,7 @@ public class Sender implements Runnable {
      * A set of outstanding request queues for each node that have not yet received responses
      */
     private static final class InFlightRequests {
-        private final int maxInFlightRequestsPerConnection;
-        private final Map<Integer, Deque<InFlightRequest>> requests;
-
-        public InFlightRequests(int maxInFlightRequestsPerConnection) {
-            this.requests = new HashMap<Integer, Deque<InFlightRequest>>();
-            this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection;
-        }
+        private final Map<Integer, Deque<InFlightRequest>> requests = new HashMap<Integer, Deque<InFlightRequest>>();
 
         /**
          * Add the given request to the queue for the node it was directed to
@@ -721,8 +714,7 @@ public class Sender implements Runnable {
          */
         public boolean canSendMore(int node) {
             Deque<InFlightRequest> queue = requests.get(node);
-            return queue == null || queue.isEmpty() ||
-                   (queue.peekFirst().request.complete() && queue.size() < this.maxInFlightRequestsPerConnection);
+            return queue == null || queue.isEmpty() || queue.peekFirst().request.complete();
         }
 
         /**
@@ -770,11 +762,11 @@ public class Sender implements Runnable {
 
             this.queueTimeSensor = metrics.sensor("queue-time");
             this.queueTimeSensor.add("record-queue-time-avg",
-                    "The average time in ms record batches spent in the record accumulator.",
-                    new Avg());
+                                     "The average time in ms record batches spent in the record accumulator.",
+                                     new Avg());
             this.queueTimeSensor.add("record-queue-time-max",
-                    "The maximum time in ms record batches spent in the record accumulator.",
-                    new Max());
+                                     "The maximum time in ms record batches spent in the record accumulator.",
+                                     new Max());
 
             this.requestTimeSensor = metrics.sensor("request-time");
             this.requestTimeSensor.add("request-latency-avg", "The average request latency in ms", new Avg());
@@ -867,8 +859,7 @@ public class Sender implements Runnable {
             this.retrySensor.record(count, nowMs);
             String topicRetryName = "topic." + topic + ".record-retries";
             Sensor topicRetrySensor = this.metrics.getSensor(topicRetryName);
-            if (topicRetrySensor != null)
-                topicRetrySensor.record(count, nowMs);
+            if (topicRetrySensor != null) topicRetrySensor.record(count, nowMs);
         }
 
         public void recordErrors(String topic, int count) {
@@ -876,8 +867,7 @@ public class Sender implements Runnable {
             this.errorSensor.record(count, nowMs);
             String topicErrorName = "topic." + topic + ".record-errors";
             Sensor topicErrorSensor = this.metrics.getSensor(topicErrorName);
-            if (topicErrorSensor != null)
-                topicErrorSensor.record(count, nowMs);
+            if (topicErrorSensor != null) topicErrorSensor.record(count, nowMs);
         }
 
         public void recordLatency(int node, long latency) {
@@ -886,8 +876,7 @@ public class Sender implements Runnable {
             if (node >= 0) {
                 String nodeTimeName = "node-" + node + ".latency";
                 Sensor nodeRequestTime = this.metrics.getSensor(nodeTimeName);
-                if (nodeRequestTime != null)
-                    nodeRequestTime.record(latency, nowMs);
+                if (nodeRequestTime != null) nodeRequestTime.record(latency, nowMs);
             }
         }
     }
diff --git a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
index ac86150..eb18739 100644
--- a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
+++ b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
@@ -17,200 +17,68 @@ import java.util.Properties;
 
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.record.Records;
 
 public class ProducerPerformance {
 
-    private static final long NS_PER_MS = 1000000L;
-    private static final long NS_PER_SEC = 1000 * NS_PER_MS;
-    private static final long MIN_SLEEP_NS = 2 * NS_PER_MS;
-
     public static void main(String[] args) throws Exception {
-        if (args.length < 4) {
-            System.err.println("USAGE: java " + ProducerPerformance.class.getName() +
-                               " topic_name num_records record_size target_records_sec [prop_name=prop_value]*");
+        if (args.length < 5) {
+            System.err.println("USAGE: java " + ProducerPerformance.class.getName() +
+                               " url topic_name num_records record_size acks [compression_type]");
             System.exit(1);
         }
-
-        /* parse args */
-        String topicName = args[0];
-        long numRecords = Long.parseLong(args[1]);
-        int recordSize = Integer.parseInt(args[2]);
-        int throughput = Integer.parseInt(args[3]);
-
+        String url = args[0];
+        String topicName = args[1];
+        int numRecords = Integer.parseInt(args[2]);
+        int recordSize = Integer.parseInt(args[3]);
+        int acks = Integer.parseInt(args[4]);
         Properties props = new Properties();
-        for (int i = 4; i < args.length; i++) {
-            String[] pieces = args[i].split("=");
-            if (pieces.length != 2)
-                throw new IllegalArgumentException("Invalid property: " + args[i]);
-            props.put(pieces[0], pieces[1]);
-        }
-        KafkaProducer producer = new KafkaProducer(props);
+        props.setProperty(ProducerConfig.ACKS_CONFIG, Integer.toString(acks));
+        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, url);
+        props.setProperty(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, Integer.toString(5 * 1000));
+        props.setProperty(ProducerConfig.TIMEOUT_CONFIG, Integer.toString(Integer.MAX_VALUE));
+        props.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, Integer.toString(64 * 1024 * 1024));
+        props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(64 * 1024));
+        if (args.length == 6)
+            props.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, args[5]);
 
-        /* setup perf test */
+        KafkaProducer producer = new KafkaProducer(props);
+        Callback callback = new Callback() {
+            public void onCompletion(RecordMetadata metadata, Exception e) {
+                if (e != null)
+                    e.printStackTrace();
+            }
+        };
         byte[] payload = new byte[recordSize];
         Arrays.fill(payload, (byte) 1);
         ProducerRecord record = new ProducerRecord(topicName, payload);
-        long sleepTime = NS_PER_SEC / throughput;
-        long sleepDeficitNs = 0;
-        Stats stats = new Stats(numRecords, 5000);
+        long start = System.currentTimeMillis();
+        long maxLatency = -1L;
+        long totalLatency = 0;
+        int reportingInterval = 1000000;
         for (int i = 0; i < numRecords; i++) {
             long sendStart = System.currentTimeMillis();
-            Callback cb = stats.nextCompletion(sendStart, payload.length, stats);
-            producer.send(record, cb);
-
-            /*
-             * Maybe sleep a little to control throughput. Sleep time can be a bit inaccurate for times < 1 ms so
-             * instead of sleeping each time instead wait until a minimum sleep time accumulates (the "sleep deficit")
-             * and then make up the whole deficit in one longer sleep.
-             */
-            if (throughput > 0) {
-                sleepDeficitNs += sleepTime;
-                if (sleepDeficitNs >= MIN_SLEEP_NS) {
-                    long sleepMs = sleepDeficitNs / 1000000;
-                    long sleepNs = sleepDeficitNs - sleepMs * 1000000;
-                    Thread.sleep(sleepMs, (int) sleepNs);
-                    sleepDeficitNs = 0;
-                }
+            producer.send(record, callback);
+            long sendElapsed = System.currentTimeMillis() - sendStart;
+            maxLatency = Math.max(maxLatency, sendElapsed);
+            totalLatency += sendElapsed;
+            if (i % reportingInterval == 0) {
+                System.out.printf("%d  max latency = %d ms, avg latency = %.5f\n",
+                                  i,
+                                  maxLatency,
+                                  (totalLatency / (double) reportingInterval));
+                totalLatency = 0L;
+                maxLatency = -1L;
             }
         }
-
-        /* print final results */
+        long ellapsed = System.currentTimeMillis() - start;
+        double msgsSec = 1000.0 * numRecords / (double) ellapsed;
+        double mbSec = msgsSec * (recordSize + Records.LOG_OVERHEAD) / (1024.0 * 1024.0);
         producer.close();
-        stats.printTotal();
-    }
-
-    private static class Stats {
-        private long start;
-        private long windowStart;
-        private int[] latencies;
-        private int sampling;
-        private int iteration;
-        private int index;
-        private long count;
-        private long bytes;
-        private int maxLatency;
-        private long totalLatency;
-        private long windowCount;
-        private int windowMaxLatency;
-        private long windowTotalLatency;
-        private long windowBytes;
-        private long reportingInterval;
-
-        public Stats(long numRecords, int reportingInterval) {
-            this.start = System.currentTimeMillis();
-            this.windowStart = System.currentTimeMillis();
-            this.index = 0;
-            this.iteration = 0;
-            this.sampling = (int) (numRecords / Math.min(numRecords, 500000));
-            this.latencies = new int[(int) (numRecords / this.sampling) + 1];
-            this.index = 0;
-            this.maxLatency = 0;
-            this.totalLatency = 0;
-            this.windowCount = 0;
-            this.windowMaxLatency = 0;
-            this.windowTotalLatency = 0;
-            this.windowBytes = 0;
-            this.totalLatency = 0;
-            this.reportingInterval = reportingInterval;
-        }
-
-        public void record(int iter, int latency, int bytes, long time) {
-            this.count++;
-            this.bytes += bytes;
-            this.totalLatency += latency;
-            this.maxLatency = Math.max(this.maxLatency, latency);
-            this.windowCount++;
-            this.windowBytes += bytes;
-            this.windowTotalLatency += latency;
-            this.windowMaxLatency = Math.max(windowMaxLatency, latency);
-            if (iter % this.sampling == 0) {
-                this.latencies[index] = latency;
-                this.index++;
-            }
-            /* maybe report the recent perf */
-            if (time - windowStart >= reportingInterval) {
-                printWindow();
-                newWindow();
-            }
-        }
-
-        public Callback nextCompletion(long start, int bytes, Stats stats) {
-            Callback cb = new PerfCallback(this.iteration, start, bytes, stats);
-            this.iteration++;
-            return cb;
-        }
-
-        public void printWindow() {
-            long ellapsed = System.currentTimeMillis() - windowStart;
-            double recsPerSec = 1000.0 * windowCount / (double) ellapsed;
-            double mbPerSec = 1000.0 * this.windowBytes / (double) ellapsed / (1024.0 * 1024.0);
-            System.out.printf("%d records sent, %.1f records/sec (%.2f MB/sec), %.1f ms avg latency, %.1f max latency.\n",
-                              windowCount,
-                              recsPerSec,
-                              mbPerSec,
-                              windowTotalLatency / (double) windowCount,
-                              (double) windowMaxLatency);
-        }
-
-        public void newWindow() {
-            this.windowStart = System.currentTimeMillis();
-            this.windowCount = 0;
-            this.windowMaxLatency = 0;
-            this.windowTotalLatency = 0;
-            this.windowBytes = 0;
-        }
-
-        public void printTotal() {
-            long ellapsed = System.currentTimeMillis() - start;
-            double recsPerSec = 1000.0 * count / (double) ellapsed;
-            double mbPerSec = 1000.0 * this.bytes / (double) ellapsed / (1024.0 * 1024.0);
-            int[] percs = percentiles(this.latencies, index, 0.5, 0.95, 0.99, 0.999);
-            System.out.printf("%d records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.\n",
-                              count,
-                              recsPerSec,
-                              mbPerSec,
-                              totalLatency / (double) count,
-                              (double) maxLatency,
-                              percs[0],
-                              percs[1],
-                              percs[2],
-                              percs[3]);
-        }
-
-        private static int[] percentiles(int[] latencies, int count, double... percentiles) {
-            int size = Math.min(count, latencies.length);
-            Arrays.sort(latencies, 0, size);
-            int[] values = new int[percentiles.length];
-            for (int i = 0; i < percentiles.length; i++) {
-                int index = (int) (percentiles[i] * size);
-                values[i] = latencies[index];
-            }
-            return values;
-        }
-    }
-
-    private static final class PerfCallback implements Callback {
-        private final long start;
-        private final int iteration;
-        private final int bytes;
-        private final Stats stats;
-
-        public PerfCallback(int iter, long start, int bytes, Stats stats) {
-            this.start = start;
-            this.stats = stats;
-            this.iteration = iter;
-            this.bytes = bytes;
-        }
-
-        public void onCompletion(RecordMetadata metadata, Exception exception) {
-            long now = System.currentTimeMillis();
-            int latency = (int) (now - start);
-            this.stats.record(iteration, latency, bytes, now);
-            if (exception != null)
-                exception.printStackTrace();
-        }
+        System.out.printf("%d records sent in %d ms. %.2f records per second (%.2f mb/sec).\n", numRecords, ellapsed, msgsSec, mbSec);
     }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
index 3ef692c..a2b7722 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
@@ -50,7 +50,6 @@ public class SenderTest {
     private static final int REQUEST_TIMEOUT_MS = 10000;
     private static final int SEND_BUFFER_SIZE = 64 * 1024;
    private static final int RECEIVE_BUFFER_SIZE = 64 * 1024;
-    private static final int MAX_IN_FLIGHT_REQS = Integer.MAX_VALUE;
 
     private TopicPartition tp = new TopicPartition("test", 0);
     private MockTime time = new MockTime();
@@ -71,7 +70,6 @@ public class SenderTest {
                                       REQUEST_TIMEOUT_MS,
                                       SEND_BUFFER_SIZE,
                                       RECEIVE_BUFFER_SIZE,
-                                      MAX_IN_FLIGHT_REQS,
                                       metrics,
                                       time);
 
@@ -117,7 +115,6 @@ public class SenderTest {
                                    REQUEST_TIMEOUT_MS,
                                    SEND_BUFFER_SIZE,
                                    RECEIVE_BUFFER_SIZE,
-                                   MAX_IN_FLIGHT_REQS,
                                    new Metrics(),
                                    time);
         Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 518d2df..a9c0465 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -18,7 +18,7 @@ package kafka.cluster
 
 import kafka.common._
 import kafka.admin.AdminUtils
-import kafka.utils.{ZkUtils, Pool, Time, Logging}
+import kafka.utils.{ZkUtils, ReplicationUtils, Pool, Time, Logging}
 import kafka.utils.Utils.inLock
 import kafka.api.{PartitionStateInfo, LeaderAndIsr}
 import kafka.log.LogConfig
@@ -216,7 +216,7 @@ class Partition(val topic: String,
       inSyncReplicas = Set.empty[Replica]
       leaderEpoch = leaderAndIsr.leaderEpoch
       zkVersion = leaderAndIsr.zkVersion
-
+
       leaderReplicaIdOpt.foreach { leaderReplica =>
         if (topic == OffsetManager.OffsetsTopicName &&
            /* if we are making a leader->follower transition */
@@ -261,7 +261,15 @@ class Partition(val topic: String,
             info("Expanding ISR for partition [%s,%d] from %s to %s"
                          .format(topic, partitionId, inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(",")))
             // update ISR in ZK and cache
-            updateIsr(newInSyncReplicas)
+            val (updateSucceeded,newVersion) = ReplicationUtils.updateIsr(zkClient, topic, partitionId, localBrokerId,
+              leaderEpoch, controllerEpoch, zkVersion, newInSyncReplicas)
+            if(updateSucceeded) {
+              inSyncReplicas = newInSyncReplicas
+              zkVersion = newVersion
+              trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newInSyncReplicas.mkString(","), zkVersion))
+            } else {
+              info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))
+            }
             replicaManager.isrExpandRate.mark()
           }
           maybeIncrementLeaderHW(leaderReplica)
@@ -325,7 +333,15 @@ class Partition(val topic: String,
           info("Shrinking ISR for partition [%s,%d] from %s to %s".format(topic, partitionId,
             inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(",")))
           // update ISR in zk and in cache
-          updateIsr(newInSyncReplicas)
+          val (updateSucceeded,newVersion) = ReplicationUtils.updateIsr(zkClient, topic, partitionId, localBrokerId,
+            leaderEpoch, controllerEpoch, zkVersion, newInSyncReplicas)
+          if(updateSucceeded) {
+            inSyncReplicas = newInSyncReplicas
+            zkVersion = newVersion
+            trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newInSyncReplicas.mkString(","), zkVersion))
+          } else {
+            info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))
+          }
           // we may need to increment high watermark since ISR could be down to 1
           maybeIncrementLeaderHW(leaderReplica)
           replicaManager.isrShrinkRate.mark()
@@ -373,22 +389,6 @@ class Partition(val topic: String,
     }
   }
 
-  private def updateIsr(newIsr: Set[Replica]) {
-    debug("Updated ISR for partition [%s,%d] to %s".format(topic, partitionId, newIsr.mkString(",")))
-    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
-    // use the epoch of the controller that made the leadership decision, instead of the current controller epoch
-    val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
-      ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId),
-      ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch), zkVersion)
-    if (updateSucceeded){
-      inSyncReplicas = newIsr
-      zkVersion = newVersion
-      trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newIsr.mkString(","), zkVersion))
-    } else {
-      info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))
-    }
-  }
-
   override def equals(that: Any): Boolean = {
     if(!(that.isInstanceOf[Partition]))
       return false
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index e776423..39524f1 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1333,6 +1333,16 @@ case class LeaderIsrAndControllerEpoch(val leaderAndIsr: LeaderAndIsr, controlle
     leaderAndIsrInfo.append(",ControllerEpoch:" + controllerEpoch + ")")
     leaderAndIsrInfo.toString()
   }
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case null => false
+      case n: LeaderIsrAndControllerEpoch =>
+        leaderAndIsr.leader == n.leaderAndIsr.leader && leaderAndIsr.isr.sorted == n.leaderAndIsr.isr.sorted &&
+        leaderAndIsr.leaderEpoch == n.leaderAndIsr.leaderEpoch && controllerEpoch == n.controllerEpoch
+      case _ => false
+    }
+  }
 }
 
 object ControllerStats extends KafkaMetricsGroup {
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index ef75b67..c7508d5 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -215,10 +215,10 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   val replicaHighWatermarkCheckpointIntervalMs = props.getLong("replica.high.watermark.checkpoint.interval.ms", 5000L)
 
   /* the purge interval (in number of requests) of the fetch request purgatory */
-  val fetchPurgatoryPurgeIntervalRequests = props.getInt("fetch.purgatory.purge.interval.requests", 1000)
+  val fetchPurgatoryPurgeIntervalRequests = props.getInt("fetch.purgatory.purge.interval.requests", 10000)
 
   /* the purge interval (in number of requests) of the producer request purgatory */
-  val producerPurgatoryPurgeIntervalRequests = props.getInt("producer.purgatory.purge.interval.requests", 1000)
+  val producerPurgatoryPurgeIntervalRequests = props.getInt("producer.purgatory.purge.interval.requests", 10000)
 
   /* Enables auto leader balancing. A background thread checks and triggers leader
    * balance if required at regular intervals */
diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala
index 3d0ff1e..c064c5c 100644
--- a/core/src/main/scala/kafka/server/RequestPurgatory.scala
+++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala
@@ -61,7 +61,7 @@ class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, de
  * this function handles delayed requests that have hit their time limit without being satisfied.
  *
  */
-abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purgeInterval: Int = 1000)
+abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purgeInterval: Int = 10000)
         extends Logging with KafkaMetricsGroup {
 
   /* a list of requests watching each key */
@@ -137,7 +137,8 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
    */
   private class Watchers {
-    private val requests = new util.LinkedList[T]
+
+    private val requests = new util.ArrayList[T]
 
     def numRequests = requests.size
@@ -216,7 +217,6 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
       }
     }
     if (requestCounter.get >= purgeInterval) { // see if we need to force a full purge
-      debug("Beginning purgatory purge")
       requestCounter.set(0)
       val purged = purgeSatisfied()
       debug("Purged %d requests from delay queue.".format(purged))
diff --git a/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala b/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
index 5f8f6bc..37a9ec2 100644
--- a/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
+++ b/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
@@ -18,7 +18,6 @@ package kafka.tools
 
 import java.util.Properties
-import java.util.Arrays
 
 import kafka.consumer._
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord, KafkaProducer}
@@ -36,10 +35,9 @@ object TestEndToEndLatency {
 
     val consumerProps = new Properties()
     consumerProps.put("group.id", topic)
-    consumerProps.put("auto.commit.enable", "false")
+    consumerProps.put("auto.commit.enable", "true")
     consumerProps.put("auto.offset.reset", "largest")
     consumerProps.put("zookeeper.connect", zkConnect)
-    consumerProps.put("fetch.wait.max.ms", "1")
     consumerProps.put("socket.timeout.ms", 1201000.toString)
     val config = new ConsumerConfig(consumerProps)
@@ -55,26 +53,19 @@ object TestEndToEndLatency {
     val message = "hello there beautiful".getBytes
     var totalTime = 0.0
-    val latencies = new Array[Long](numMessages)
 
     for (i <- 0 until numMessages) {
       var begin = System.nanoTime
-      producer.send(new ProducerRecord(topic, message))
+      val response = producer.send(new ProducerRecord(topic, message))
+      response.get()
       val received = iter.next
       val elapsed = System.nanoTime - begin
       // poor man's progress bar
       if (i % 1000 == 0)
         println(i + "\t" + elapsed / 1000.0 / 1000.0)
       totalTime += elapsed
-      latencies(i) = (elapsed / 1000 / 1000)
     }
-
-    println("Avg latency: %.4f ms\n".format(totalTime / numMessages / 1000.0 / 1000.0))
-    Arrays.sort(latencies)
-    val p50 = latencies((latencies.length * 0.5).toInt)
-    val p99 = latencies((latencies.length * 0.99).toInt)
-    val p999 = latencies((latencies.length * 0.999).toInt)
-    println("Percentiles: 50th = %d, 99th = %d, 99.9th = %d".format(p50, p99, p999))
+    println("Avg latency: " + (totalTime / numMessages / 1000.0 / 1000.0) + "ms")
     producer.close()
-    connector.commitOffsets(true)
     connector.shutdown()
     System.exit(0)
   }
diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
new file mode 100644
index 0000000..09c8b89
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+import kafka.cluster.Replica
+import kafka.api.LeaderAndIsr
+import kafka.controller.LeaderIsrAndControllerEpoch
+import org.apache.zookeeper.data.Stat
+import org.I0Itec.zkclient.ZkClient
+
+import scala.Some
+import scala.collection._
+
+object ReplicationUtils extends Logging {
+
+  def updateIsr(zkClient: ZkClient, topic: String, partitionId: Int, brokerId: Int, leaderEpoch: Int,
+    controllerEpoch: Int, zkVersion: Int, newIsr: Set[Replica]): (Boolean,Int) = {
+    debug("Updated ISR for partition [%s,%d] to %s".format(topic, partitionId, newIsr.mkString(",")))
+    val newLeaderAndIsr = new LeaderAndIsr(brokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
+    // use the epoch of the controller that made the leadership decision, instead of the current controller epoch
+    val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
+      ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId),
+      ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch), zkVersion)
+    if (!updateSucceeded) {
+      return checkLeaderAndIsrZkData(zkClient, topic, partitionId,
+        ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId),
+        ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch), zkVersion)
+    }
+    (updateSucceeded, newVersion)
+  }
+
+  def checkLeaderAndIsrZkData(zkClient: ZkClient, topic: String, partitionId: Int, path: String,
+    newLeaderData: String, zkVersion: Int): (Boolean,Int) = {
+    try {
+      val newLeaderStat: Stat = new Stat()
+      newLeaderStat.setVersion(zkVersion)
+      val newLeader = ZkUtils.parseLeaderAndIsr(newLeaderData,topic,partitionId,newLeaderStat)
+      val writtenLeaderAndIsrInfo = ZkUtils.readDataMaybeNull(zkClient,path)
+      val writtenLeaderOpt = writtenLeaderAndIsrInfo._1
+      val writtenStat = writtenLeaderAndIsrInfo._2
+      writtenLeaderOpt match {
+        case Some(writtenData) =>
+          val writtenLeader = ZkUtils.parseLeaderAndIsr(writtenData,topic,partitionId,writtenStat)
+          (newLeader,writtenLeader) match {
+            case (Some(newLeader),Some(writtenLeader)) =>
+              if(newLeader.equals(writtenLeader))
+                return (true,writtenStat.getVersion())
+            case _ =>
+          }
+        case None =>
+      }
+      (false,-1)
+    } catch {
+      case e1: Exception =>
+        error("checking broker data at path %s with data %s failed due to %s".format(path, newLeaderData,
+          e1.getMessage))
+        (false,-1)
+    }
+  }
+}