From e44ab4705a23132e67b3bbe3c4a2a47a2184a5fb Mon Sep 17 00:00:00 2001
From: Aditya Auradkar
Date: Thu, 9 Apr 2015 17:18:37 -0700
Subject: [PATCH 1/2] Updated patch for quotas.

This patch does the following:
1. Add per-client metrics for both producers and consumers
2. Add configuration for quotas
3. Compute delay times in the metrics package and return the delay times in QuotaViolationException
4. Add a DelayQueue in KafkaApis that can be used to throttle any type of request. Implement request throttling for produce and fetch requests.
5. Add unit and integration test cases.
6. This doesn't include a system test; there is a separate ticket for that.
7. Fix KAFKA-2191 (includes the fix from https://reviews.apache.org/r/34418/)

Signed-off-by: Aditya Auradkar
---
 .../org/apache/kafka/common/metrics/Quota.java     |  24 +++
 .../common/metrics/QuotaViolationException.java    |   8 +
 .../org/apache/kafka/common/metrics/Sensor.java    |  23 ++-
 .../apache/kafka/common/metrics/stats/Rate.java    |  22 +-
 .../org/apache/kafka/common/utils/MockTime.java    |  43 ++++
 .../apache/kafka/common/metrics/MetricsTest.java   |  28 ++-
 .../org/apache/kafka/common/utils/MockTime.java    |  43 ----
 .../scala/kafka/server/ClientQuotaMetrics.scala    | 222 +++++++++++++++++++++
 core/src/main/scala/kafka/server/KafkaApis.scala   | 104 ++++++++--
 core/src/main/scala/kafka/server/KafkaConfig.scala |  85 +++++++-
 core/src/main/scala/kafka/server/KafkaServer.scala |  20 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |   4 +-
 .../main/scala/kafka/server/ThrottledRequest.scala |  46 +++++
 .../scala/kafka/utils/ShutdownableThread.scala     |   3 +
 .../scala/integration/kafka/api/QuotasTest.scala   | 138 +++++++++++++
 .../unit/kafka/server/ClientQuotaMetricsTest.scala | 150 ++++++++++++++
 .../kafka/server/KafkaConfigConfigDefTest.scala    |  19 +-
 .../server/ThrottledRequestExpirationTest.scala    |  90 +++++++++
 18 files changed, 981 insertions(+), 91 deletions(-)
 create mode 100644 clients/src/main/java/org/apache/kafka/common/utils/MockTime.java
 delete mode 100644 clients/src/test/java/org/apache/kafka/common/utils/MockTime.java
 create mode 100644 core/src/main/scala/kafka/server/ClientQuotaMetrics.scala
 create mode 100644 core/src/main/scala/kafka/server/ThrottledRequest.scala
 create mode 100644 core/src/test/scala/integration/kafka/api/QuotasTest.scala
 create mode 100644 core/src/test/scala/unit/kafka/server/ClientQuotaMetricsTest.scala
 create mode 100644 core/src/test/scala/unit/kafka/server/ThrottledRequestExpirationTest.scala

diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java b/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java
index d82bb0c..e87edb5 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java
@@ -49,4 +49,28 @@ public final class Quota {
         return (upper && value <= bound) || (!upper && value >= bound);
     }
 
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + (int) this.bound;
+        result = prime * result + (this.upper ?
1 : 0); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + Quota that = (Quota) obj; + if (that.bound != this.bound) + return false; + if (that.upper != this.upper) + return false; + return true; + } } diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java b/clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java index a451e53..dcc415c 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java @@ -24,9 +24,17 @@ import org.apache.kafka.common.KafkaException; public class QuotaViolationException extends KafkaException { private static final long serialVersionUID = 1L; + private int delayTimeMs; public QuotaViolationException(String m) { + this(m, 0); + } + + public QuotaViolationException(String m, int delayTimeMs) { super(m); + this.delayTimeMs = delayTimeMs; } + public void setDelayTimeMs(int delayTimeMs) { this.delayTimeMs = delayTimeMs; } + public int getDelayTimeMs() { return delayTimeMs; } } diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java index ca823fd..fb70391 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java @@ -37,6 +37,7 @@ public final class Sensor { private final List metrics; private final MetricConfig config; private final Time time; + private final long creationTime; Sensor(Metrics registry, String name, Sensor[] parents, MetricConfig config, Time time) { super(); @@ -47,6 +48,7 @@ public final class Sensor { this.stats = new ArrayList(); this.config = config; this.time = time; + this.creationTime = time.milliseconds(); checkForest(new HashSet()); } @@ -112,13 +114,30 @@ public final class Sensor { if (config != null) { Quota quota = config.quota(); if (quota != null) { - if (!quota.acceptable(metric.value(timeMs))) - throw new QuotaViolationException(metric.metricName() + " is in violation of its quota of " + quota.bound()); + double value = metric.value(timeMs); + if (!quota.acceptable(value)) { + throw new QuotaViolationException(String.format( + "(%s) is violating its quota of (%f) with value (%f)", + metric.metricName(), + quota.bound(), + value), + delayTime(metric, timeMs, quota, value, config)); + } } } } } + /* + * This calculates the amount of time needed to bring the metric within quota + * assuming that no new metrics are recorded + */ + private int delayTime(KafkaMetric metric, long timeMs, Quota quota, double metricValue, MetricConfig config) { + double difference = metricValue - quota.bound(); + double time = difference / quota.bound() * config.samples() * config.timeWindowMs(); + return (int) time; + } + /** * Register a compound statistic with this sensor with no config override */ diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java index 98429da..3ae9f7c 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java @@ -58,26 +58,28 @@ public class Rate implements MeasurableStat { @Override public double measure(MetricConfig config, 
long now) { double value = stat.measure(config, now); - double elapsed = convert(now - stat.oldest(now).lastWindowMs); - return value / elapsed; + // the elapsed time is always N-1 complete windows plus whatever fraction of the final window is complete + long elapsedCurrentWindowMs = now - stat.current(now).lastWindowMs; + long elapsedPriorWindowsMs = config.timeWindowMs() * (config.samples() - 1); + return value / convert(elapsedCurrentWindowMs + elapsedPriorWindowsMs); } - private double convert(long time) { + private double convert(long timeMs) { switch (unit) { case NANOSECONDS: - return time * 1000.0 * 1000.0; + return timeMs * 1000.0 * 1000.0; case MICROSECONDS: - return time * 1000.0; + return timeMs * 1000.0; case MILLISECONDS: - return time; + return timeMs; case SECONDS: - return time / 1000.0; + return timeMs / 1000.0; case MINUTES: - return time / (60.0 * 1000.0); + return timeMs / (60.0 * 1000.0); case HOURS: - return time / (60.0 * 60.0 * 1000.0); + return timeMs / (60.0 * 60.0 * 1000.0); case DAYS: - return time / (24.0 * 60.0 * 60.0 * 1000.0); + return timeMs / (24.0 * 60.0 * 60.0 * 1000.0); default: throw new IllegalStateException("Unknown unit: " + unit); } diff --git a/clients/src/main/java/org/apache/kafka/common/utils/MockTime.java b/clients/src/main/java/org/apache/kafka/common/utils/MockTime.java new file mode 100644 index 0000000..eb7fcf0 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/utils/MockTime.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ +package org.apache.kafka.common.utils; + +import java.util.concurrent.TimeUnit; + +/** + * A clock that you can manually advance by calling sleep + */ +public class MockTime implements Time { + + private long nanos = 0; + + public MockTime() { + this.nanos = System.nanoTime(); + } + + @Override + public long milliseconds() { + return TimeUnit.MILLISECONDS.convert(this.nanos, TimeUnit.NANOSECONDS); + } + + @Override + public long nanoseconds() { + return nanos; + } + + @Override + public void sleep(long ms) { + this.nanos += TimeUnit.NANOSECONDS.convert(ms, TimeUnit.MILLISECONDS); + } + +} diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java index 544e120..8bddbb6 100644 --- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java @@ -37,9 +37,9 @@ import org.junit.Test; public class MetricsTest { private static final double EPS = 0.000001; - - MockTime time = new MockTime(); - Metrics metrics = new Metrics(new MetricConfig(), Arrays.asList((MetricsReporter) new JmxReporter()), time); + private MockTime time = new MockTime(); + private MetricConfig config = new MetricConfig(); + private Metrics metrics = new Metrics(config, Arrays.asList((MetricsReporter) new JmxReporter()), time); @Test public void testMetricName() { @@ -77,19 +77,29 @@ public class MetricsTest { s2.add(new MetricName("s2.total", "grp1"), new Total()); s2.record(5.0); - for (int i = 0; i < 10; i++) + int sum = 0; + int count = 10; + for (int i = 0; i < count; i++) { s.record(i); + sum += i; + } + // prior to any time passing + double elapsedSecs = (config.timeWindowMs() * (config.samples() - 1)) / 1000.0; + assertEquals("Occurences(0...9) = 5", count / elapsedSecs, + metrics.metrics().get(new MetricName("test.occurences", "grp1")).value(), EPS); // pretend 2 seconds passed... 
- time.sleep(2000); + long sleepTime = 2; + time.sleep(sleepTime * 1000); + elapsedSecs += sleepTime; assertEquals("s2 reflects the constant value", 5.0, metrics.metrics().get(new MetricName("s2.total", "grp1")).value(), EPS); assertEquals("Avg(0...9) = 4.5", 4.5, metrics.metrics().get(new MetricName("test.avg", "grp1")).value(), EPS); - assertEquals("Max(0...9) = 9", 9.0, metrics.metrics().get(new MetricName("test.max", "grp1")).value(), EPS); + assertEquals("Max(0...9) = 9", count - 1, metrics.metrics().get(new MetricName("test.max", "grp1")).value(), EPS); assertEquals("Min(0...9) = 0", 0.0, metrics.metrics().get(new MetricName("test.min", "grp1")).value(), EPS); - assertEquals("Rate(0...9) = 22.5", 22.5, metrics.metrics().get(new MetricName("test.rate", "grp1")).value(), EPS); - assertEquals("Occurences(0...9) = 5", 5.0, metrics.metrics().get(new MetricName("test.occurences", "grp1")).value(), EPS); - assertEquals("Count(0...9) = 10", 10.0, metrics.metrics().get(new MetricName("test.count", "grp1")).value(), EPS); + assertEquals("Rate(0...9) = 1.40625", sum / elapsedSecs, metrics.metrics().get(new MetricName("test.rate", "grp1")).value(), EPS); + assertEquals("Occurences(0...9) = 5", count / elapsedSecs, metrics.metrics().get(new MetricName("test.occurences", "grp1")).value(), EPS); + assertEquals("Count(0...9) = 10", (double) count, metrics.metrics().get(new MetricName("test.count", "grp1")).value(), EPS); } @Test diff --git a/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java b/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java deleted file mode 100644 index eb7fcf0..0000000 --- a/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package org.apache.kafka.common.utils; - -import java.util.concurrent.TimeUnit; - -/** - * A clock that you can manually advance by calling sleep - */ -public class MockTime implements Time { - - private long nanos = 0; - - public MockTime() { - this.nanos = System.nanoTime(); - } - - @Override - public long milliseconds() { - return TimeUnit.MILLISECONDS.convert(this.nanos, TimeUnit.NANOSECONDS); - } - - @Override - public long nanoseconds() { - return nanos; - } - - @Override - public void sleep(long ms) { - this.nanos += TimeUnit.NANOSECONDS.convert(ms, TimeUnit.MILLISECONDS); - } - -} diff --git a/core/src/main/scala/kafka/server/ClientQuotaMetrics.scala b/core/src/main/scala/kafka/server/ClientQuotaMetrics.scala new file mode 100644 index 0000000..9067066 --- /dev/null +++ b/core/src/main/scala/kafka/server/ClientQuotaMetrics.scala @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.util.concurrent.{DelayQueue, TimeUnit}
+
+import kafka.utils.{ShutdownableThread, Logging}
+import org.apache.kafka.common.MetricName
+import org.apache.kafka.common.metrics._
+import org.apache.kafka.common.metrics.stats.Rate
+import java.util.concurrent.locks.ReentrantReadWriteLock
+
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.mutable
+
+/**
+ * Configuration settings for quota management
+ * @param defaultQuotaBytesPerSecond The default bytes per second quota allocated to any client
+ * @param quotaBytesPerSecondOverrides The comma separated overrides per client. "c1=X,c2=Y"
+ * @param quotaDelayFactor The quota delay factor scales any computed delay by a fixed multiplier (default 1)
+ * @param numQuotaSamples The number of samples to retain in memory
+ * @param quotaWindowSizeSeconds The time span of each sample
+ */
+case class ClientQuotaMetricsConfig(defaultQuotaBytesPerSecond: Long = ClientQuotaMetricsConfig.DefaultQuotaBytesPerSecond,
+                                    quotaBytesPerSecondOverrides: String = ClientQuotaMetricsConfig.QuotaBytesPerSecondOverrides,
+                                    quotaDelayFactor: Double = ClientQuotaMetricsConfig.DefaultQuotaDelayFactor,
+                                    numQuotaSamples: Int = ClientQuotaMetricsConfig.DefaultNumQuotaSamples,
+                                    quotaWindowSizeSeconds: Int = ClientQuotaMetricsConfig.DefaultQuotaWindowSizeSeconds)
+
+object ClientQuotaMetricsConfig {
+  val DefaultQuotaBytesPerSecond = Long.MaxValue
+  val QuotaBytesPerSecondOverrides = ""
+  val DefaultQuotaDelayFactor = 1
+  // Always have 10 whole windows + 1 current window
+  val DefaultNumQuotaSamples = 11
+  val DefaultQuotaWindowSizeSeconds = 1
+  val MaxThrottleTimeSeconds = 30
+}
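Editor's aside for reviewers: the delay carried by QuotaViolationException (computed in Sensor.delayTime in this patch) is the overage fraction multiplied by the full sampled window span, which record() below then scales by quotaDelayFactor. A self-contained sketch of that arithmetic with the default window settings above (11 samples of 1 second); the names here are illustrative and not part of the patch:

```scala
object QuotaDelaySketch {
  // Mirrors Sensor.delayTime: the time needed for the observed rate to fall
  // back under the quota, assuming nothing further is recorded.
  def delayTimeMs(observedRate: Double, bound: Double, samples: Int, timeWindowMs: Long): Long =
    ((observedRate - bound) / bound * samples * timeWindowMs).toLong

  def main(args: Array[String]): Unit = {
    // 600 bytes/s observed against a 500 bytes/s quota:
    // (600 - 500) / 500 * 11 * 1000 ms = 2200 ms
    println(delayTimeMs(observedRate = 600, bound = 500, samples = 11, timeWindowMs = 1000))
  }
}
```

The same numbers reappear in ClientQuotaMetricsTest.testQuotaViolation further down.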
+/**
+ * Helper class that records per-client metrics. It is also responsible for maintaining Quota usage statistics
+ * for all clients.
+ * @param config @ClientQuotaMetricsConfig Quota configuration for this manager
+ * @param metrics @Metrics Metrics instance
+ * @param apiKey The request api key name (e.g. "Produce" or "Fetch") used to name the sensors
+ * @param time @Time instance used for quota bookkeeping
+ */
+class ClientQuotaMetrics(private val config: ClientQuotaMetricsConfig,
+                         private val metrics: Metrics,
+                         private val apiKey: String,
+                         private val time: Time) extends Logging {
+  private val overriddenQuota = initQuotaMap(config.quotaBytesPerSecondOverrides)
+  private val defaultQuota = Quota.lessThan(config.defaultQuotaBytesPerSecond)
+  private val lock = new ReentrantReadWriteLock()
+  private val delayQueue = new DelayQueue[ThrottledRequest]()
+  val throttledRequestReaper = new ThrottledRequestReaper(delayQueue)
+  throttledRequestReaper.start()
+
+  /**
+   * Reaper thread that triggers callbacks on all throttled requests
+   * @param delayQueue DelayQueue to dequeue from
+   */
+  class ThrottledRequestReaper(delayQueue: DelayQueue[ThrottledRequest]) extends ShutdownableThread(
+    "ThrottledRequestReaper-%s".format(apiKey), false) {
+
+    override def doWork(): Unit = {
+      val response: ThrottledRequest = delayQueue.poll(1, TimeUnit.SECONDS)
+      if (response != null) {
+        trace("Response throttled for: " + response.delayTimeMs + " ms")
+        response.execute()
+      }
+    }
+  }
+
+  /**
+   * Records that a client produced or fetched some data
+   * @param clientId clientId that produced or fetched the data
+   * @param value amount of data written or read, in bytes
+   * @param callback Callback function. This will be triggered immediately if the quota is not violated.
+   *                 If there is a quota violation, this callback will be triggered after a delay
+   * @return Number of milliseconds to delay the response in case of quota violation. Zero otherwise
+   */
+  def record(clientId: String, value: Int, callback: => Unit): Int = {
+    val sensors = getOrCreateQuotaSensors(clientId)
+    var delayTime = 0.0
+    try {
+      sensors._1.record(value)
+      // trigger the callback immediately if the quota is not violated
+      callback
+    } catch {
+      case qve: QuotaViolationException =>
+        // If the quota is violated, enqueue the callback on the delayQueue and record the throttle time
+        delayTime = qve.getDelayTimeMs * config.quotaDelayFactor
+        delayQueue.add(new ThrottledRequest(time, delayTime.toLong, callback))
+        sensors._2.record(delayTime)
+        logger.info("Quota violated for sensor (%s). Delay time: (%f)".format(sensors._1, delayTime), qve)
+    }
+    delayTime.toInt
+  }
+
+  /**
+   * Returns the quota for the specified clientId
+   */
+  private[server] def quota(clientId: String): Quota = {
+    if (overriddenQuota.contains(clientId))
+      overriddenQuota(clientId)
+    else
+      defaultQuota
+  }
+
+  /*
+   * This function either returns the sensors for a given client id or creates them if they don't exist.
+   * The first sensor of the tuple is the quota enforcement sensor; the second is the throttle time sensor.
+   */
+  private def getOrCreateQuotaSensors(clientId: String): (Sensor, Sensor) = {
+
+    // Names of the sensors to access
+    val quotaSensorName = apiKey + "-" + clientId
+    val throttleTimeSensorName = apiKey + "ThrottleTime-" + clientId
+    var quotaSensor: Sensor = null
+    var throttleTimeSensor: Sensor = null
+
+    /* Acquire the read lock to fetch the sensors. It is safe to call getSensor from multiple threads.
+     * The read lock allows a thread to create a sensor in isolation. The thread creating the sensor
+     * will acquire the write lock and prevent the sensors from being read while they are being created.
+     * Checking whether the sensor is null without acquiring the read lock would not be sufficient: a non-null
     * sensor may not be fully initialized yet, i.e. all of its metrics may not have been added.
+     * The read lock waits until the writer thread has released its lock, i.e. fully initialized the sensor,
+     * at which point it is safe to read.
+     */
+    lock.readLock().lock()
+    try {
+      quotaSensor = metrics.getSensor(quotaSensorName)
+      throttleTimeSensor = metrics.getSensor(throttleTimeSensorName)
+    }
+    finally {
+      lock.readLock().unlock()
+    }
+
+    /* If the sensor is null, try to create it, else return the already-created sensor.
+     * Note that if the quota sensor is null, the throttle time sensor must also be null.
+     */
+    if (quotaSensor == null) {
+      /* Acquire a write lock because the sensor may not have been created and we only want one thread to create it.
+       * Note that multiple threads may acquire the write lock if they all see a null sensor initially;
+       * in that case, each writer re-checks the sensor after acquiring the lock.
+       * This does not suffer from the double-checked locking problem because the references are only read
+       * after acquiring read locks, so a partially published reference can never be observed.
+       */
+      lock.writeLock().lock()
+      try {
+        quotaSensor = metrics.getSensor(quotaSensorName)
+        // Re-fetch the throttle time sensor as well: another writer may have created both sensors already
+        throttleTimeSensor = metrics.getSensor(throttleTimeSensorName)
+        if (quotaSensor == null) {
+          // create the throttle time sensor also
+          throttleTimeSensor = metrics.sensor(throttleTimeSensorName)
+          throttleTimeSensor.add(new MetricName("throttle-time",
+                                                apiKey,
+                                                "Tracking throttle-time per client",
+                                                "client-id",
+                                                clientId), new Rate())
+          quotaSensor = metrics.sensor(quotaSensorName, getQuotaMetricConfig(quota(clientId)))
+          quotaSensor.add(new MetricName("byte-rate",
+                                         apiKey,
+                                         "Tracking byte-rate per client",
+                                         "client-id",
+                                         clientId), new Rate())
+        }
+      } finally {
+        lock.writeLock().unlock()
+      }
+    }
+    // return the read or created sensors
+    (quotaSensor, throttleTimeSensor)
+  }
+
+  private def getQuotaMetricConfig(quota: Quota): MetricConfig = {
+    new MetricConfig()
+            .timeWindow(config.quotaWindowSizeSeconds, TimeUnit.SECONDS)
+            .samples(config.numQuotaSamples)
+            .quota(quota)
+  }
+
+  private def initQuotaMap(input: String): mutable.Map[String, Quota] = {
+    val output = mutable.Map[String, Quota]()
+    for (entry <- input.split(",")) {
+      val trimmedEntry = entry.trim
+      if (!trimmedEntry.equals("")) {
+        val pair: Array[String] = trimmedEntry.split("=")
+        if (pair.length != 2)
+          throw new IllegalArgumentException("Incorrectly formatted override entry (%s).
Format is k1=v1,k2=v2".format( + entry)) + output(pair(0)) = new Quota(pair(1).toDouble, true) + } + } + output + } + + def shutdown() = { + throttledRequestReaper.shutdown() + } +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 387e387..53fbee9 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -17,6 +17,7 @@ package kafka.server +import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.requests.{JoinGroupResponse, JoinGroupRequest, HeartbeatRequest, HeartbeatResponse, ResponseHeader} import org.apache.kafka.common.TopicPartition @@ -45,9 +46,12 @@ class KafkaApis(val requestChannel: RequestChannel, val zkClient: ZkClient, val brokerId: Int, val config: KafkaConfig, - val metadataCache: MetadataCache) extends Logging { + val metadataCache: MetadataCache, + val metrics: Metrics) extends Logging { this.logIdent = "[KafkaApi-%d] ".format(brokerId) + // Store all the quota managers for each type of request + private val quotaManagers = instantiateQuotaManagers(config) /** * Top-level method that handles all requests and multiplexes to the right api @@ -232,6 +236,7 @@ class KafkaApis(val requestChannel: RequestChannel, */ def handleProducerRequest(request: RequestChannel.Request) { val produceRequest = request.requestObj.asInstanceOf[ProducerRequest] + val numBytesAppended = produceRequest.sizeInBytes // the callback for sending a produce response def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) { @@ -247,20 +252,38 @@ class KafkaApis(val requestChannel: RequestChannel, } } - if (produceRequest.requiredAcks == 0) { - // no operation needed if producer request.required.acks = 0; however, if there is any error in handling - // the request, since no response is expected by the producer, the server will close socket server so that - // the producer client will know that some error has happened and will refresh its metadata - if (errorInResponse) { - info("Close connection due to error handling produce request with correlation id %d from client id %s with ack=0" - .format(produceRequest.correlationId, produceRequest.clientId)) - requestChannel.closeConnection(request.processor, request) - } else { - requestChannel.noOperation(request.processor, request) + def produceResponseCallback { + if (produceRequest.requiredAcks == 0) + { + // no operation needed if producer request.required.acks = 0; however, if there is any error in handling + // the request, since no response is expected by the producer, the server will close socket server so that + // the producer client will know that some error has happened and will refresh its metadata + if (errorInResponse) + { + info( + "Close connection due to error handling produce request with correlation id %d from client id %s with ack=0".format( + produceRequest.correlationId, + produceRequest.clientId)) + requestChannel.closeConnection(request.processor, request) + } + else + { + requestChannel.noOperation(request.processor, request) + } } - } else { - val response = ProducerResponse(produceRequest.correlationId, responseStatus) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + else + { + val response = ProducerResponse(produceRequest.correlationId, responseStatus) + requestChannel.sendResponse(new Response(request, new 
BoundedByteBufferSend(response))) + } + } + + val requestKey = RequestKeys.nameForKey(RequestKeys.ProduceKey) + quotaManagers.get(requestKey) match { + case Some(quotaManager) => + quotaManager.record(produceRequest.clientId, numBytesAppended, produceResponseCallback) + case None => + warn("Cannot throttle Api key %s".format(requestKey)) } } @@ -298,14 +321,28 @@ class KafkaApis(val requestChannel: RequestChannel, .format(fetchRequest.correlationId, fetchRequest.clientId, topicAndPartition, ErrorMapping.exceptionNameFor(data.error))) } - // record the bytes out metrics only when the response is being sent BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesOutRate.mark(data.messages.sizeInBytes) BrokerTopicStats.getBrokerAllTopicsStats().bytesOutRate.mark(data.messages.sizeInBytes) } val response = FetchResponse(fetchRequest.correlationId, responsePartitionData) - requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response))) + def fetchResponseCallback { + requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response))) + } + + // Do not throttle replication traffic + if (Request.isValidBrokerId(fetchRequest.replicaId)) { + fetchResponseCallback + } else { + val requestKey = RequestKeys.nameForKey(RequestKeys.FetchKey) + quotaManagers.get(requestKey) match { + case Some(quotaManager) => + quotaManager.record(fetchRequest.clientId, response.sizeInBytes, fetchResponseCallback) + case None => + warn("Cannot throttle Api key %s".format(requestKey)) + } + } } // call the replica manager to fetch messages from the local replica @@ -588,9 +625,38 @@ class KafkaApis(val requestChannel: RequestChannel, sendResponseCallback) } + /* + * Returns a Map of all quota managers configured. 
The request Api key is used as the key of the returned map.
+   */
+  private def instantiateQuotaManagers(cfg: KafkaConfig): Map[String, ClientQuotaMetrics] = {
+    val producerQuotaManagerCfg = ClientQuotaMetricsConfig(
+      defaultQuotaBytesPerSecond = cfg.producerQuotaDefaultBytesPerSecond,
+      quotaBytesPerSecondOverrides = cfg.producerQuotaBytesPerSecondOverrides,
+      quotaDelayFactor = cfg.quotaDelayFactor,
+      numQuotaSamples = cfg.numQuotaSamples,
+      quotaWindowSizeSeconds = cfg.quotaWindowSizeSeconds
+    )
+
+    val consumerQuotaManagerCfg = ClientQuotaMetricsConfig(
+      defaultQuotaBytesPerSecond = cfg.consumerQuotaDefaultBytesPerSecond,
+      quotaBytesPerSecondOverrides = cfg.consumerQuotaBytesPerSecondOverrides,
+      quotaDelayFactor = cfg.quotaDelayFactor,
+      numQuotaSamples = cfg.numQuotaSamples,
+      quotaWindowSizeSeconds = cfg.quotaWindowSizeSeconds
+    )
+
+    // Use SystemTime from org.apache.kafka.common.utils because the Metrics package expects the clients' Time interface
+    val quotaManagers = Map[String, ClientQuotaMetrics](
+      RequestKeys.nameForKey(RequestKeys.ProduceKey) ->
+              new ClientQuotaMetrics(producerQuotaManagerCfg, metrics, RequestKeys.nameForKey(RequestKeys.ProduceKey), new org.apache.kafka.common.utils.SystemTime),
+      RequestKeys.nameForKey(RequestKeys.FetchKey) ->
+              new ClientQuotaMetrics(consumerQuotaManagerCfg, metrics, RequestKeys.nameForKey(RequestKeys.FetchKey), new org.apache.kafka.common.utils.SystemTime)
+    )
+    quotaManagers
+  }
+
   def close() {
-    // TODO currently closing the API is an no-op since the API no longer maintain any modules
-    // maybe removing the closing call in the end when KafkaAPI becomes a pure stateless layer
-    debug("Shut down complete.")
+    quotaManagers.values.foreach(_.shutdown())
+    info("Shut down complete.")
   }
 }
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 9efa15c..fabbf46 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -123,6 +123,19 @@ object Defaults {
   val OffsetCommitTimeoutMs = OffsetManagerConfig.DefaultOffsetCommitTimeoutMs
   val OffsetCommitRequiredAcks = OffsetManagerConfig.DefaultOffsetCommitRequiredAcks
 
+  /** ********* Quota Configuration ***********/
+  val ProducerQuotaDefaultBytesPerSecond = ClientQuotaMetricsConfig.DefaultQuotaBytesPerSecond
+  val ConsumerQuotaDefaultBytesPerSecond = ClientQuotaMetricsConfig.DefaultQuotaBytesPerSecond
+  val ProducerQuotaBytesPerSecondOverrides = ClientQuotaMetricsConfig.QuotaBytesPerSecondOverrides
+  val ConsumerQuotaBytesPerSecondOverrides = ClientQuotaMetricsConfig.QuotaBytesPerSecondOverrides
+  val QuotaDelayFactor: Double = ClientQuotaMetricsConfig.DefaultQuotaDelayFactor
+  val NumQuotaSamples: Int = ClientQuotaMetricsConfig.DefaultNumQuotaSamples
+  val QuotaWindowSizeSeconds: Int = ClientQuotaMetricsConfig.DefaultQuotaWindowSizeSeconds
+
+  /** ********* Kafka Metrics Configuration ***********/
+  val MetricsNumSamples: Int = 2
+  val MetricsWindowSizeSeconds: Int = 30
+
   val DeleteTopicEnable = false
 
   val CompressionType = "producer"
@@ -229,11 +242,21 @@ object KafkaConfig {
   val OffsetsRetentionCheckIntervalMsProp = "offsets.retention.check.interval.ms"
   val OffsetCommitTimeoutMsProp = "offsets.commit.timeout.ms"
   val OffsetCommitRequiredAcksProp = "offsets.commit.required.acks"
+  /** ********* Quota Configuration ***********/
+  val ProducerQuotaDefaultBytesPerSecondProp = "quota.producer.default"
+  val ConsumerQuotaDefaultBytesPerSecondProp = "quota.consumer.default"
+  val ProducerQuotaBytesPerSecondOverridesProp = "quota.producer.bytes.per.second.overrides"
+  val ConsumerQuotaBytesPerSecondOverridesProp = "quota.consumer.bytes.per.second.overrides"
+  val QuotaDelayFactorProp = "quota.delay.factor"
+  val NumQuotaSamplesProp = "quota.window.num"
+  val QuotaWindowSizeSecondsProp = "quota.window.size.seconds"
+
+  /** ********* Kafka Metrics Configuration ***********/
+  val MetricsNumSamplesProp = "metrics.num.samples"
+  val MetricsWindowSizeSecondsProp = "metrics.sample.window.seconds"
 
   val DeleteTopicEnableProp = "delete.topic.enable"
   val CompressionTypeProp = "compression.type"
-
   /* Documentation */
   /** ********* Zookeeper Configuration ***********/
   val ZkConnectDoc = "Zookeeper host string"
@@ -359,6 +382,20 @@ object KafkaConfig {
   val OffsetCommitTimeoutMsDoc = "Offset commit will be delayed until all replicas for the offsets topic receive the commit " +
     "or this timeout is reached. This is similar to the producer request timeout."
   val OffsetCommitRequiredAcksDoc = "The required acks before the commit can be accepted. In general, the default (-1) should not be overridden"
+  /** ********* Quota Configuration ***********/
+  val ProducerQuotaDefaultBytesPerSecondDoc = "Any producer distinguished by clientId will get throttled if it produces more bytes than this value per-second"
+  val ConsumerQuotaDefaultBytesPerSecondDoc = "Any consumer distinguished by clientId/consumer group will get throttled if it fetches more bytes than this value per-second"
+  val ProducerQuotaBytesPerSecondOverridesDoc = "Comma separated list of clientId=quotaBytesPerSecond pairs overriding the default producer quota. " +
+    "Example: clientIdX=10485760,clientIdY=10485760"
+  val ConsumerQuotaBytesPerSecondOverridesDoc = "Comma separated list of clientId=quotaBytesPerSecond pairs overriding the default consumer quota. " +
+    "Example: clientIdX=10485760,clientIdY=10485760"
+  val QuotaDelayFactorDoc = "The quota delay factor scales any computed throttling delay by a fixed multiplier (default 1)"
+  val NumQuotaSamplesDoc = "The number of samples to retain in memory"
+  val QuotaWindowSizeSecondsDoc = "The time span of each sample"
+  /** ********* Kafka Metrics Configuration ***********/
+  val MetricsNumSamplesDoc = "Number of samples maintained to compute metrics. Only available for KafkaMetrics, not Codahale"
+  val MetricsWindowSizeSecondsDoc = "Size in seconds of each sample maintained to compute metrics. Only available for KafkaMetrics, not Codahale"
+
   val DeleteTopicEnableDoc = "Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off"
   val CompressionTypeDoc = "Specify the final compression type for a given topic. This configuration accepts the standard compression codecs " +
     "('gzip', 'snappy', 'lz4').
It additionally accepts 'uncompressed' which is equivalent to no compression; and " + @@ -479,6 +516,19 @@ object KafkaConfig { .define(OffsetCommitRequiredAcksProp, SHORT, Defaults.OffsetCommitRequiredAcks, HIGH, OffsetCommitRequiredAcksDoc) .define(DeleteTopicEnableProp, BOOLEAN, Defaults.DeleteTopicEnable, HIGH, DeleteTopicEnableDoc) .define(CompressionTypeProp, STRING, Defaults.CompressionType, HIGH, CompressionTypeDoc) + + /** ********* Quota configuration ***********/ + .define(ProducerQuotaDefaultBytesPerSecondProp, LONG, Defaults.ProducerQuotaDefaultBytesPerSecond, atLeast(1), HIGH, ProducerQuotaDefaultBytesPerSecondDoc) + .define(ConsumerQuotaDefaultBytesPerSecondProp, LONG, Defaults.ConsumerQuotaDefaultBytesPerSecond, atLeast(1), HIGH, ConsumerQuotaDefaultBytesPerSecondDoc) + .define(ProducerQuotaBytesPerSecondOverridesProp, STRING, Defaults.ProducerQuotaBytesPerSecondOverrides, HIGH, ProducerQuotaBytesPerSecondOverridesDoc) + .define(ConsumerQuotaBytesPerSecondOverridesProp, STRING, Defaults.ConsumerQuotaBytesPerSecondOverrides, HIGH, ConsumerQuotaBytesPerSecondOverridesDoc) + .define(QuotaDelayFactorProp, DOUBLE, Defaults.QuotaDelayFactor, atLeast(1), LOW, QuotaDelayFactorDoc) + .define(NumQuotaSamplesProp, INT, Defaults.NumQuotaSamples, atLeast(1), LOW, NumQuotaSamplesDoc) + .define(QuotaWindowSizeSecondsProp, INT, Defaults.QuotaWindowSizeSeconds, atLeast(1), LOW, QuotaWindowSizeSecondsDoc) + + /** ********* Kafka Metrics Configuration ***********/ + .define(MetricsNumSamplesProp, INT, Defaults.MetricsNumSamples, atLeast(1), LOW, MetricsNumSamplesDoc) + .define(MetricsWindowSizeSecondsProp, INT, Defaults.MetricsWindowSizeSeconds, atLeast(1), LOW, MetricsWindowSizeSecondsDoc) } def configNames() = { @@ -597,6 +647,17 @@ object KafkaConfig { offsetsRetentionCheckIntervalMs = parsed.get(OffsetsRetentionCheckIntervalMsProp).asInstanceOf[Long], offsetCommitTimeoutMs = parsed.get(OffsetCommitTimeoutMsProp).asInstanceOf[Int], offsetCommitRequiredAcks = parsed.get(OffsetCommitRequiredAcksProp).asInstanceOf[Short], + /** ********* Quota configuration ***********/ + producerQuotaDefaultBytesPerSecond = parsed.get(ProducerQuotaDefaultBytesPerSecondProp).asInstanceOf[Long], + consumerQuotaDefaultBytesPerSecond = parsed.get(ConsumerQuotaDefaultBytesPerSecondProp).asInstanceOf[Long], + producerQuotaBytesPerSecondOverrides = parsed.get(ProducerQuotaBytesPerSecondOverridesProp).asInstanceOf[String], + consumerQuotaBytesPerSecondOverrides = parsed.get(ConsumerQuotaBytesPerSecondOverridesProp).asInstanceOf[String], + quotaDelayFactor = parsed.get(QuotaDelayFactorProp).asInstanceOf[Double], + numQuotaSamples = parsed.get(NumQuotaSamplesProp).asInstanceOf[Int], + quotaWindowSizeSeconds = parsed.get(QuotaWindowSizeSecondsProp).asInstanceOf[Int], + /** ********* Kafka Metrics Configuration ***********/ + metricsNumSamples = parsed.get(MetricsNumSamplesProp).asInstanceOf[Int], + metricsWindowSizeSeconds = parsed.get(MetricsWindowSizeSecondsProp).asInstanceOf[Int], deleteTopicEnable = parsed.get(DeleteTopicEnableProp).asInstanceOf[Boolean], compressionType = parsed.get(CompressionTypeProp).asInstanceOf[String] ) @@ -741,9 +802,18 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ val offsetCommitTimeoutMs: Int = Defaults.OffsetCommitTimeoutMs, val offsetCommitRequiredAcks: Short = Defaults.OffsetCommitRequiredAcks, + /** ********* Quota configuration ***********/ + val producerQuotaDefaultBytesPerSecond: Long = Defaults.ProducerQuotaDefaultBytesPerSecond, + val 
consumerQuotaDefaultBytesPerSecond: Long = Defaults.ConsumerQuotaDefaultBytesPerSecond, + val producerQuotaBytesPerSecondOverrides : String = Defaults.ProducerQuotaBytesPerSecondOverrides, + val consumerQuotaBytesPerSecondOverrides : String = Defaults.ConsumerQuotaBytesPerSecondOverrides, + val quotaDelayFactor : Double = Defaults.QuotaDelayFactor, + val numQuotaSamples : Int = Defaults.NumQuotaSamples, + val quotaWindowSizeSeconds : Int = Defaults.QuotaWindowSizeSeconds, + val metricsNumSamples : Int = Defaults.MetricsNumSamples, + val metricsWindowSizeSeconds : Int = Defaults.MetricsWindowSizeSeconds, val deleteTopicEnable: Boolean = Defaults.DeleteTopicEnable, - val compressionType: String = Defaults.CompressionType - ) { + val compressionType: String = Defaults.CompressionType) { val zkConnectionTimeoutMs: Int = _zkConnectionTimeoutMs.getOrElse(zkSessionTimeoutMs) @@ -962,6 +1032,15 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ props.put(OffsetsRetentionCheckIntervalMsProp, offsetsRetentionCheckIntervalMs.toString) props.put(OffsetCommitTimeoutMsProp, offsetCommitTimeoutMs.toString) props.put(OffsetCommitRequiredAcksProp, offsetCommitRequiredAcks.toString) + props.put(ProducerQuotaDefaultBytesPerSecondProp, producerQuotaDefaultBytesPerSecond.toString) + props.put(ConsumerQuotaDefaultBytesPerSecondProp, consumerQuotaDefaultBytesPerSecond.toString) + props.put(ProducerQuotaBytesPerSecondOverridesProp, producerQuotaBytesPerSecondOverrides) + props.put(ConsumerQuotaBytesPerSecondOverridesProp, consumerQuotaBytesPerSecondOverrides) + props.put(QuotaDelayFactorProp, quotaDelayFactor.toString) + props.put(NumQuotaSamplesProp, numQuotaSamples.toString) + props.put(QuotaWindowSizeSecondsProp, quotaWindowSizeSeconds.toString) + props.put(MetricsNumSamplesProp, metricsNumSamples.toString) + props.put(MetricsWindowSizeSecondsProp, metricsWindowSizeSeconds.toString) props.put(DeleteTopicEnableProp, deleteTopicEnable.toString) props.put(CompressionTypeProp, compressionType.toString) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index e66710d..79a6fbc 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -26,7 +26,12 @@ import java.util.concurrent._ import atomic.{AtomicInteger, AtomicBoolean} import java.io.File +import org.apache.kafka.clients.producer.ProducerConfig + import collection.mutable +import org.apache.kafka.common.metrics.{Metrics, JmxReporter, MetricsReporter, MetricConfig} + +import scala.collection.{JavaConversions, mutable} import org.I0Itec.zkclient.ZkClient import kafka.controller.{ControllerStats, KafkaController} import kafka.cluster.{EndPoint, Broker} @@ -72,7 +77,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg val metadataCache: MetadataCache = new MetadataCache(config.brokerId) - + val metrics: Metrics = initMetrics() var zkClient: ZkClient = null val correlationId: AtomicInteger = new AtomicInteger(0) val brokerMetaPropsFile = "meta.properties" @@ -146,7 +151,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg /* start processing requests */ apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, consumerCoordinator, - kafkaController, zkClient, config.brokerId, config, metadataCache) + kafkaController, zkClient, config.brokerId, config, metadataCache, metrics) requestHandlerPool = new 
KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads) brokerState.newState(RunningAsBroker) @@ -184,6 +189,17 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg } } + private def initMetrics() : Metrics = { + val jmxPrefix: String = "kafka.server" + info("Initiating KafkaMetrics") + + val metricConfig: MetricConfig = new MetricConfig().samples(config.metricsNumSamples) + .timeWindow(config.metricsWindowSizeSeconds, TimeUnit.SECONDS) + new Metrics(metricConfig, + JavaConversions.seqAsJavaList(mutable.Seq(new JmxReporter(jmxPrefix))), + new org.apache.kafka.common.utils.SystemTime) + } + private def initZk(): ZkClient = { info("Connecting to zookeeper on " + config.zkConnect) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 59c9bc3..270e303 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -29,6 +29,7 @@ import kafka.message.{ByteBufferMessageSet, MessageSet} import java.util.concurrent.atomic.AtomicBoolean import java.io.{IOException, File} import java.util.concurrent.TimeUnit +import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.Errors import scala.Predef._ @@ -93,7 +94,7 @@ class ReplicaManager(val config: KafkaConfig, val zkClient: ZkClient, scheduler: Scheduler, val logManager: LogManager, - val isShuttingDown: AtomicBoolean ) extends Logging with KafkaMetricsGroup { + val isShuttingDown: AtomicBoolean) extends Logging with KafkaMetricsGroup { /* epoch of the controller that last changed the leader */ @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1 private val localBrokerId = config.brokerId @@ -418,7 +419,6 @@ class ReplicaManager(val config: KafkaConfig, fetchMinBytes: Int, fetchInfo: Map[TopicAndPartition, PartitionFetchInfo], responseCallback: Map[TopicAndPartition, FetchResponsePartitionData] => Unit) { - val isFromFollower = replicaId >= 0 val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId val fetchOnlyCommitted: Boolean = ! Request.isValidBrokerId(replicaId) diff --git a/core/src/main/scala/kafka/server/ThrottledRequest.scala b/core/src/main/scala/kafka/server/ThrottledRequest.scala new file mode 100644 index 0000000..dbbca36 --- /dev/null +++ b/core/src/main/scala/kafka/server/ThrottledRequest.scala @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import java.util.concurrent.{TimeUnit, Delayed} + +import org.apache.kafka.common.utils.Time + + +/** + * Represents a request whose response has been delayed. 
+ * @param time @Time instance to use
+ * @param delayTimeMs delay associated with this request
+ * @param callback Callback to trigger after delayTimeMs milliseconds have elapsed
+ */
+private[server] class ThrottledRequest(val time: Time, val delayTimeMs: Long, callback: => Unit) extends Delayed {
+  val endTime = time.milliseconds + delayTimeMs
+
+  def execute() = callback
+
+  def getDelay(unit: TimeUnit): Long = {
+    unit.convert(endTime - time.milliseconds, TimeUnit.MILLISECONDS)
+  }
+
+  def compareTo(d: Delayed): Int = {
+    val other = d.asInstanceOf[ThrottledRequest]
+    if (this.endTime < other.endTime) -1
+    else if (this.endTime > other.endTime) 1
+    else 0
+  }
+}
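To illustrate the contract above: getDelay shrinks as the clock advances, and a DelayQueue only hands back an element once its delay has reached zero, which is the moment the reaper fires the callback. A minimal sketch, assuming it is placed in package kafka.server (ThrottledRequest is private[server]) and uses the MockTime this patch moves into clients main; the object name is illustrative:

```scala
package kafka.server

import java.util.concurrent.{DelayQueue, TimeUnit}
import org.apache.kafka.common.utils.MockTime

object ThrottledRequestSketch {
  def main(args: Array[String]): Unit = {
    val time = new MockTime
    val queue = new DelayQueue[ThrottledRequest]()
    queue.add(new ThrottledRequest(time, 100, println("callback fired")))

    // Not yet expired: a non-blocking poll returns null
    assert(queue.poll() == null)

    // Advance the mock clock past the delay; the element becomes available
    time.sleep(100)
    Option(queue.poll(1, TimeUnit.SECONDS)).foreach(_.execute())
  }
}
```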
diff --git a/core/src/main/scala/kafka/utils/ShutdownableThread.scala b/core/src/main/scala/kafka/utils/ShutdownableThread.scala
index fc226c8..db21aae 100644
--- a/core/src/main/scala/kafka/utils/ShutdownableThread.scala
+++ b/core/src/main/scala/kafka/utils/ShutdownableThread.scala
@@ -51,6 +51,9 @@ abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean
     info("Shutdown completed")
   }
 
+  /**
+   * This method is called repeatedly in a loop until the thread shuts down or this method throws an exception
+   */
   def doWork(): Unit
 
   override def run(): Unit = {
diff --git a/core/src/test/scala/integration/kafka/api/QuotasTest.scala b/core/src/test/scala/integration/kafka/api/QuotasTest.scala
new file mode 100644
index 0000000..a1524e5
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/QuotasTest.scala
@@ -0,0 +1,138 @@
+/**
+ * Copyright 2015 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package kafka.api
+
+import java.util.Properties
+
+import junit.framework.Assert
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.clients.producer._
+import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
+import org.apache.kafka.common.MetricName
+import org.apache.kafka.common.metrics.KafkaMetric
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+class QuotasTest extends KafkaServerTestHarness {
+  private val producerBufferSize = 30000
+  private val producerId1 = "QuotasTestProducer-1"
+  private val producerId2 = "QuotasTestProducer-2"
+  val numServers = 1
+  val overridingProps = new Properties()
+
+  // Low enough quota that a producer sending a small payload in a tight loop should get throttled
+  overridingProps.put(KafkaConfig.ProducerQuotaDefaultBytesPerSecondProp, "8000")
+  // Producer-2 is overridden to an effectively unlimited quota
+  overridingProps.put(KafkaConfig.ProducerQuotaBytesPerSecondOverridesProp, producerId2 + "=" + Long.MaxValue)
+
+  override def generateConfigs() = {
+    FixedPortTestUtils.createBrokerConfigs(numServers,
+                                           zkConnect,
+                                           enableControlledShutdown = false)
+            .map(KafkaConfig.fromProps(_, overridingProps))
+  }
+
+  var producers = mutable.Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
+  var consumers = mutable.Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
+
+  private val topic1 = "topic-1"
+
+  override def setUp() {
+    super.setUp()
+    val producerProps = new Properties()
+    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    producerProps.put(ProducerConfig.ACKS_CONFIG, "0")
+    producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false")
+    producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, producerBufferSize.toString)
+    producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, producerId1)
+    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                      classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
+    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                      classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
+    producers += new KafkaProducer[Array[Byte], Array[Byte]](producerProps)
+
+    producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, producerId2)
+    producers += new KafkaProducer[Array[Byte], Array[Byte]](producerProps)
+
+    // Create consumers
+    val consumerProps = new Properties
+    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "QuotasTest")
+    consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                      classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
+    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                      classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
+    for (i <- 0 until 2)
+      consumers += new KafkaConsumer(consumerProps)
+
+    val numPartitions = 1
+    val leaders = TestUtils.createTopic(zkClient, topic1, numPartitions, numServers, servers)
+    assertTrue("Leader of all partitions of the topic should exist", leaders.values.forall(leader => leader.isDefined))
+  }
+
+  override def tearDown() {
+    producers.foreach(_.close())
+    consumers.foreach(_.close())
+    super.tearDown()
+  }
+
+  @Test
+  def testThrottledProducer() {
+    val allMetrics: mutable.Map[MetricName, KafkaMetric] =
produce(producers(0), 1000) + + var numAsserts = 0 + for ((name: MetricName, metric: KafkaMetric) <- allMetrics) { + if (name.tags().containsValue(producerId1) && name.name().startsWith("throttle-time")) { + Assert.assertTrue("Should have been throttled", metric.value() > 0) + numAsserts += 1 + } + } + // Should have matched 1 metric + Assert.assertEquals(1, numAsserts) + } + + @Test + def testProducerOverrideUnthrottled() { + val allMetrics: mutable.Map[MetricName, KafkaMetric] = produce(producers(1), 1000) + + var numAsserts = 0 + for ((name: MetricName, metric: KafkaMetric) <- allMetrics) { + if (name.tags().containsValue(producerId2) && name.name().startsWith("throttle-time")) { + Assert.assertFalse("Should not have been throttled", metric.value() > 0) + numAsserts += 1 + } + } + // Should have matched 1 metric + Assert.assertEquals(1, numAsserts) + } + + def produce(p: KafkaProducer[Array[Byte], Array[Byte]], count: Int): mutable.Map[MetricName, KafkaMetric] = { + for (i <- 0 to count) { + p.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, null, null, i.toString.getBytes), + new ErrorLoggingCallback(topic1, null, null, true)).get() + Thread.sleep(1) + } + import scala.collection.JavaConverters._ + servers.head.metrics.metrics().asScala + } +} diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaMetricsTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaMetricsTest.scala new file mode 100644 index 0000000..382b665 --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/ClientQuotaMetricsTest.scala @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package kafka.server
+
+import java.util.Collections
+
+import org.apache.kafka.common.metrics.{Quota, MetricConfig}
+import org.apache.kafka.common.utils.MockTime
+import org.scalatest.junit.JUnit3Suite
+import org.junit.{Before, Test, Assert}
+
+class ClientQuotaMetricsTest extends JUnit3Suite {
+  private val time = new MockTime
+  private val metrics = new org.apache.kafka.common.metrics.Metrics(new MetricConfig(),
+                                                                    Collections.emptyList(),
+                                                                    time)
+  private val config = ClientQuotaMetricsConfig(defaultQuotaBytesPerSecond = 500,
+                                                quotaBytesPerSecondOverrides = "p1=2000,p2=4000")
+
+  var numCallbacks: Int = 0
+  def callback {
+    numCallbacks += 1
+  }
+
+  @Before
+  def beforeMethod() {
+    numCallbacks = 0
+  }
+
+  @Test
+  def testQuotaParsing() {
+    val clientMetrics = new ClientQuotaMetrics(config, metrics, "producer", time)
+    try {
+      Assert.assertEquals("Default producer quota should be 500",
+                          new Quota(500, true), clientMetrics.quota("random-client-id"))
+      Assert.assertEquals("Should return the overridden value (2000)",
+                          new Quota(2000, true), clientMetrics.quota("p1"))
+      Assert.assertEquals("Should return the overridden value (4000)",
+                          new Quota(4000, true), clientMetrics.quota("p2"))
+    } finally {
+      clientMetrics.shutdown()
+    }
+  }
+
+  @Test
+  def testQuotaViolation() {
+    val clientMetrics = new ClientQuotaMetrics(config, metrics, "producer", time)
+    try {
+      /* We have ten whole 1-second windows plus the current window. Make sure that there
+       * is no quota violation if we produce under the quota.
+       */
+      for (i <- 0 until 10) {
+        clientMetrics.record("unknown", 400, callback)
+        time.sleep(1000)
+      }
+      Assert.assertEquals(10, numCallbacks)
+
+      // Create a spike:
+      // (400 bytes * 10 samples + 2000 bytes) / 10 seconds = 600 bytes per second.
+      // Delay = (600 - quota) / quota * window span = (600 - 500) / 500 * 11 seconds = 2.2 seconds = 2200 ms
+      val sleepTime = clientMetrics.record("unknown", 2000, callback)
+      Assert.assertEquals("Should be throttled", 2200, sleepTime)
+      // After a request is delayed, the callback cannot be triggered immediately
+      clientMetrics.throttledRequestReaper.doWork()
+      Assert.assertEquals(10, numCallbacks)
+      time.sleep(sleepTime)
+
+      // The callback can only be triggered after the delay time passes
+      clientMetrics.throttledRequestReaper.doWork()
+      Assert.assertEquals(11, numCallbacks)
+
+      // Could continue to see delays until the bursty sample disappears
+      for (i <- 0 until 10) {
+        clientMetrics.record("unknown", 400, callback)
+        time.sleep(1000)
+      }
+
+      Assert.assertEquals("Should be unthrottled since bursty sample has rolled over",
+                          0, clientMetrics.record("unknown", 0, callback))
+    } finally {
+      clientMetrics.shutdown()
+    }
+  }
+
+  @Test
+  def testOverrideParse() {
+    var testConfig = ClientQuotaMetricsConfig()
+    var clientMetrics = new ClientQuotaMetrics(testConfig, metrics, "consumer", time)
+
+    try {
+      // Case 1 - Default config
+      Assert.assertEquals(new Quota(ClientQuotaMetricsConfig.DefaultQuotaBytesPerSecond, true),
+                          clientMetrics.quota("p1"))
+    } finally {
+      clientMetrics.shutdown()
+    }
+
+    // Case 2 - Override containing empty entries
+    testConfig = ClientQuotaMetricsConfig(defaultQuotaBytesPerSecond = 500,
+                                          quotaBytesPerSecondOverrides = "p1=2000,p2=4000,,")
+
+    clientMetrics = new ClientQuotaMetrics(testConfig, metrics, "consumer", time)
+    try {
+      Assert.assertEquals(new Quota(2000, true), clientMetrics.quota("p1"))
+      Assert.assertEquals(new Quota(4000, true), clientMetrics.quota("p2"))
+    } finally {
+      clientMetrics.shutdown()
+    }
+
+    // Case 3 - NumberFormatException for override
+    testConfig = ClientQuotaMetricsConfig(defaultQuotaBytesPerSecond = 500,
+                                          quotaBytesPerSecondOverrides = "p1=2000,p2=4000,p3=p4")
+    try {
+      clientMetrics = new ClientQuotaMetrics(testConfig, metrics, "consumer", time)
+      Assert.fail("Should fail to parse invalid config " + testConfig.quotaBytesPerSecondOverrides)
+    }
+    catch {
+      // Swallow.
+      case nfe: NumberFormatException =>
+    }
+
+    // Case 4 - IllegalArgumentException for override
+    testConfig = ClientQuotaMetricsConfig(defaultQuotaBytesPerSecond = 500,
+                                          quotaBytesPerSecondOverrides = "p1=2000=3000")
+    try {
+      clientMetrics = new ClientQuotaMetrics(testConfig, metrics, "producer", time)
+      Assert.fail("Should fail to parse invalid config " + testConfig.quotaBytesPerSecondOverrides)
+    }
+    catch {
+      // Swallow.
+      case iae: IllegalArgumentException =>
+    }
+  }
+}
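For reference, the override grammar those cases pin down: comma-separated clientId=bytesPerSecond pairs; empty entries are skipped; a pair without exactly one '=' fails with IllegalArgumentException, and a non-numeric quota fails with NumberFormatException. A standalone restatement (illustrative names, not part of the patch):

```scala
object OverrideGrammarSketch {
  def parse(input: String): Map[String, Double] =
    input.split(",").map(_.trim).filter(_.nonEmpty).map { entry =>
      val pair = entry.split("=")
      // require throws IllegalArgumentException, just as initQuotaMap does
      require(pair.length == 2, s"Incorrectly formatted override entry ($entry). Format is k1=v1,k2=v2")
      pair(0) -> pair(1).toDouble // NumberFormatException on a non-numeric quota
    }.toMap

  def main(args: Array[String]): Unit = {
    println(parse("p1=2000,p2=4000,,")) // Map(p1 -> 2000.0, p2 -> 4000.0)
    // parse("p1=2000=3000")  -> IllegalArgumentException (Case 4)
    // parse("p3=p4")         -> NumberFormatException (Case 3)
  }
}
```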
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
index 8014a5a..92b2c9d 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
@@ -218,6 +218,15 @@ class KafkaConfigConfigDefTest extends JUnit3Suite {
       case KafkaConfig.OffsetsRetentionMinutesProp => expected.setProperty(name, atLeastOneIntProp)
       case KafkaConfig.OffsetsRetentionCheckIntervalMsProp => expected.setProperty(name, atLeastOneIntProp)
       case KafkaConfig.OffsetCommitTimeoutMsProp => expected.setProperty(name, atLeastOneIntProp)
+      case KafkaConfig.ProducerQuotaDefaultBytesPerSecondProp => expected.setProperty(name, atLeastOneIntProp)
+      case KafkaConfig.ConsumerQuotaDefaultBytesPerSecondProp => expected.setProperty(name, atLeastOneIntProp)
+      case KafkaConfig.ProducerQuotaBytesPerSecondOverridesProp => expected.setProperty(name, "P1=10,P2=20")
+      case KafkaConfig.ConsumerQuotaBytesPerSecondOverridesProp => expected.setProperty(name, "C1=10,C2=20")
+      case KafkaConfig.QuotaDelayFactorProp => expected.setProperty(name, "%.1f".format(nextDouble + 1))
+      case KafkaConfig.NumQuotaSamplesProp => expected.setProperty(name, atLeastOneIntProp)
+      case KafkaConfig.QuotaWindowSizeSecondsProp => expected.setProperty(name, atLeastOneIntProp)
+      case KafkaConfig.MetricsNumSamplesProp => expected.setProperty(name, atLeastOneIntProp)
+      case KafkaConfig.MetricsWindowSizeSecondsProp => expected.setProperty(name, atLeastOneIntProp)
       case KafkaConfig.DeleteTopicEnableProp => expected.setProperty(name, randFrom("true", "false"))
 
       // explicit, non trivial validations or with transient dependencies
@@ -340,7 +349,15 @@ class KafkaConfigConfigDefTest extends JUnit3Suite {
       case KafkaConfig.OffsetsRetentionCheckIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
       case KafkaConfig.OffsetCommitTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
       case KafkaConfig.OffsetCommitRequiredAcksProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-2")
-
+      case KafkaConfig.ProducerQuotaDefaultBytesPerSecondProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+      case KafkaConfig.ConsumerQuotaDefaultBytesPerSecondProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+      case KafkaConfig.ProducerQuotaBytesPerSecondOverridesProp => // ignore string
+      case KafkaConfig.ConsumerQuotaBytesPerSecondOverridesProp => // ignore string
+      case KafkaConfig.QuotaDelayFactorProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+      case KafkaConfig.NumQuotaSamplesProp =>
assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") + case KafkaConfig.QuotaWindowSizeSecondsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") + case KafkaConfig.MetricsNumSamplesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") + case KafkaConfig.MetricsWindowSizeSecondsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") case KafkaConfig.DeleteTopicEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0") case nonNegativeIntProperty => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1") diff --git a/core/src/test/scala/unit/kafka/server/ThrottledRequestExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ThrottledRequestExpirationTest.scala new file mode 100644 index 0000000..1c88997 --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/ThrottledRequestExpirationTest.scala @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + + +import java.util.Collections +import java.util.concurrent.{TimeUnit, DelayQueue} + +import org.apache.kafka.common.metrics.MetricConfig +import org.apache.kafka.common.utils.MockTime +import org.junit.{AfterClass, Before, Assert, Test} +import org.scalatest.junit.JUnit3Suite + +class ThrottledRequestExpirationTest extends JUnit3Suite { + private val time = new MockTime + private var numCallbacks : Int = 0 + private val metrics = new org.apache.kafka.common.metrics.Metrics(new MetricConfig(), + Collections.emptyList(), + time) + + def callback { + numCallbacks += 1 + } + + @Before + def beforeMethod() { + numCallbacks = 0 + } + + @Test + def testExpire() { + val clientMetrics = new ClientQuotaMetrics(ClientQuotaMetricsConfig(), metrics, "producer", time) + + val delayQueue = new DelayQueue[ThrottledRequest]() + val reaper = new clientMetrics.ThrottledRequestReaper(delayQueue) + try { + // Add 4 elements to the queue out of order. 
Add 2 elements with the same expire timestamp + delayQueue.add(new ThrottledRequest(time, 10, callback)) + delayQueue.add(new ThrottledRequest(time, 30, callback)) + delayQueue.add(new ThrottledRequest(time, 30, callback)) + delayQueue.add(new ThrottledRequest(time, 20, callback)) + + for(itr <- 1 to 3) { + time.sleep(10) + reaper.doWork() + Assert.assertEquals(itr, numCallbacks) + + } + reaper.doWork() + Assert.assertEquals(4, numCallbacks) + Assert.assertEquals(0, delayQueue.size()) + reaper.doWork() + Assert.assertEquals(4, numCallbacks) + } finally { + clientMetrics.shutdown() + } + } + + @Test + def testThrottledRequest() { + val t1 : ThrottledRequest = new ThrottledRequest(time, 10, callback) + val t2 : ThrottledRequest = new ThrottledRequest(time, 20, callback) + val t3 : ThrottledRequest = new ThrottledRequest(time, 20, callback) + Assert.assertEquals(10, t1.delayTimeMs) + Assert.assertEquals(20, t2.delayTimeMs) + Assert.assertEquals(20, t3.delayTimeMs) + + for(itr <- 0 to 2) { + Assert.assertEquals(10 - 10*itr, t1.getDelay(TimeUnit.MILLISECONDS)) + Assert.assertEquals(20 - 10*itr, t2.getDelay(TimeUnit.MILLISECONDS)) + Assert.assertEquals(20 - 10*itr, t3.getDelay(TimeUnit.MILLISECONDS)) + time.sleep(10) + } + } +} -- 1.7.12.4 From 179180cec67afedb31b7af3653c83c4954300ac3 Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Thu, 4 Jun 2015 16:31:03 -0700 Subject: [PATCH 2/2] Addressing Joel's comments --- .../scala/kafka/server/ClientQuotaMetrics.scala | 100 ++++++++++++--------- core/src/main/scala/kafka/server/KafkaApis.scala | 29 +++--- core/src/main/scala/kafka/server/KafkaConfig.scala | 30 +++---- core/src/main/scala/kafka/server/KafkaServer.scala | 4 +- .../main/scala/kafka/server/ThrottledRequest.scala | 4 +- .../scala/kafka/utils/ShutdownableThread.scala | 2 +- .../scala/integration/kafka/api/QuotasTest.scala | 76 +++++++++++++--- .../unit/kafka/server/ClientQuotaMetricsTest.scala | 45 ++++++---- .../kafka/server/KafkaConfigConfigDefTest.scala | 8 +- 9 files changed, 182 insertions(+), 116 deletions(-) diff --git a/core/src/main/scala/kafka/server/ClientQuotaMetrics.scala b/core/src/main/scala/kafka/server/ClientQuotaMetrics.scala index 9067066..1f7b532 100644 --- a/core/src/main/scala/kafka/server/ClientQuotaMetrics.scala +++ b/core/src/main/scala/kafka/server/ClientQuotaMetrics.scala @@ -21,35 +21,40 @@ import java.util.concurrent.{DelayQueue, TimeUnit} import kafka.utils.{ShutdownableThread, Logging} import org.apache.kafka.common.MetricName import org.apache.kafka.common.metrics._ -import org.apache.kafka.common.metrics.stats.Rate +import org.apache.kafka.common.metrics.stats.{Total, Rate} import java.util.concurrent.locks.ReentrantReadWriteLock import org.apache.kafka.common.utils.Time -import scala.collection.mutable +/** + * Represents the sensors aggregated per client + * @param quotaSensor @Sensor that tracks the quota + * @param throttleTimeSensor @Sensor that tracks the throttle time + */ +private case class ClientSensors(quotaSensor: Sensor, throttleTimeSensor: Sensor) /** * Configuration settings for quota management - * @param defaultQuotaBytesPerSecond The default bytes per second quota allocated to any client + * @param quotaBytesPerSecondDefault The default bytes per second quota allocated to any client * @param quotaBytesPerSecondOverrides The comma separated overrides per client. 
"c1=X,c2=Y" * @param quotaDelayFactor The quota delay factor modifies any delay by a fixed multiplier (default 1.2) - * @param numQuotaSamples The number of sample to retain in memory + * @param numQuotaSamples The number of samples to retain in memory * @param quotaWindowSizeSeconds The time span of each sample * */ -case class ClientQuotaMetricsConfig(defaultQuotaBytesPerSecond : Long = - ClientQuotaMetricsConfig.DefaultQuotaBytesPerSecond, - quotaBytesPerSecondOverrides : String = +case class ClientQuotaMetricsConfig(quotaBytesPerSecondDefault: Long = + ClientQuotaMetricsConfig.QuotaBytesPerSecondDefault, + quotaBytesPerSecondOverrides: String = ClientQuotaMetricsConfig.QuotaBytesPerSecondOverrides, quotaDelayFactor: Double = ClientQuotaMetricsConfig.DefaultQuotaDelayFactor, - numQuotaSamples : Int = + numQuotaSamples: Int = ClientQuotaMetricsConfig.DefaultNumQuotaSamples, - quotaWindowSizeSeconds : Int = + quotaWindowSizeSeconds: Int = ClientQuotaMetricsConfig.DefaultQuotaWindowSizeSeconds) object ClientQuotaMetricsConfig { - val DefaultQuotaBytesPerSecond = Long.MaxValue + val QuotaBytesPerSecondDefault = Long.MaxValue val QuotaBytesPerSecondOverrides = "" val DefaultQuotaDelayFactor = 1 // Always have 10 whole windows + 1 current window @@ -61,20 +66,27 @@ object ClientQuotaMetricsConfig { /** * Helper class that records per-client metrics. It is also responsible for maintaining Quota usage statistics * for all clients. - * @param config @KafkaConfig Configs for the Kafka Server + * @param config @ClientQuotaMetricsConfig quota configs * @param metrics @Metrics Metrics instance + * @param apiKey API Key for the request + * @param time @Time object to use */ -class ClientQuotaMetrics(private val config : ClientQuotaMetricsConfig, - private val metrics : Metrics, - private val apiKey : String, - private val time : Time) extends Logging { +class ClientQuotaMetrics(private val config: ClientQuotaMetricsConfig, + private val metrics: Metrics, + private val apiKey: String, + private val time: Time) extends Logging { private val overriddenQuota = initQuotaMap(config.quotaBytesPerSecondOverrides) - private val defaultQuota = Quota.lessThan(config.defaultQuotaBytesPerSecond) + private val defaultQuota = Quota.lessThan(config.quotaBytesPerSecondDefault) private val lock = new ReentrantReadWriteLock() private val delayQueue = new DelayQueue[ThrottledRequest]() val throttledRequestReaper = new ThrottledRequestReaper(delayQueue) throttledRequestReaper.start() + private val delayQueueSensor = metrics.sensor(apiKey + "-delayQueue") + delayQueueSensor.add(new MetricName("queue-size", + apiKey, + "Tracks the size of the delay queue"), new Total()) + /** * Reaper thread that triggers callbacks on all throttled requests * @param delayQueue DelayQueue to dequeue from @@ -85,6 +97,8 @@ class ClientQuotaMetrics(private val config : ClientQuotaMetricsConfig, override def doWork(): Unit = { val response : ThrottledRequest = delayQueue.poll(1, TimeUnit.SECONDS) if(response != null) { + // Decrement the size of the delay queue + delayQueueSensor.record(-1) trace("Response throttled for: " + response.delayTimeMs + " ms") response.execute() } @@ -92,7 +106,7 @@ class ClientQuotaMetrics(private val config : ClientQuotaMetricsConfig, } /** - * Records that a produce request wrote some data + * Records that a clientId changed some metric being throttled (produced/consumed bytes, QPS etc.) 
   * @param clientId clientId that sent the request
   * @param value amount of data produced or consumed, in bytes
   * @param callback Callback function. This will be triggered immediately if quota is not violated.
@@ -100,20 +114,21 @@ class ClientQuotaMetrics(private val config : ClientQuotaMetricsConfig,
   * @return Number of milliseconds to delay the response in case of Quota violation.
   *         Zero otherwise
   */
-  def record(clientId: String, value : Int, callback: => Unit) : Int = {
-    val sensors = getOrCreateQuotaSensors(clientId)
+  def recordAndMaybeThrottle(clientId: String, value: Int, callback: => Unit) : Int = {
+    val clientSensors = getOrCreateQuotaSensors(clientId)
     var delayTime = 0.0
     try {
-      sensors._1.record(value)
+      clientSensors.quotaSensor.record(value)
       // trigger the callback immediately if quota is not violated
       callback
     } catch {
-      case qve : QuotaViolationException =>
-        delayTime = qve.getDelayTimeMs*config.quotaDelayFactor
+      case qve: QuotaViolationException =>
+        delayTime = qve.getDelayTimeMs * config.quotaDelayFactor
         // If the quota is violated, delay the response by adding the request to the delayQueue
         delayQueue.add(new ThrottledRequest(time, delayTime.toLong, callback))
-        sensors._2.record(delayTime)
+        delayQueueSensor.record()
+        clientSensors.throttleTimeSensor.record(delayTime)
-        logger.info("Quota violated for sensor (%s). Delay time: (%f)".format(sensors._1, delayTime), qve)
+        logger.debug("Quota violated for sensor (%s). Delay time: (%f)".format(clientSensors.quotaSensor.name(), delayTime), qve)
     }
     delayTime.toInt
   }
@@ -122,18 +137,15 @@ class ClientQuotaMetrics(private val config : ClientQuotaMetricsConfig,
   /**
   * Returns the quota for the specified clientId
   * @return
   */
-  private[server] def quota(clientId : String) : Quota = {
-    if(overriddenQuota.contains(clientId))
-      overriddenQuota(clientId)
-    else
-      defaultQuota
+  private[server] def quota(clientId: String) : Quota = {
+    overriddenQuota.getOrElse(clientId, defaultQuota)
   }

  /*
   * This function either returns the sensors for a given client id, or creates them if they don't exist.
   * ClientSensors.quotaSensor enforces the quota; ClientSensors.throttleTimeSensor tracks throttle times.
   */
-  private def getOrCreateQuotaSensors(clientId : String) : (Sensor, Sensor) = {
+  private def getOrCreateQuotaSensors(clientId: String) : ClientSensors = {
     // Names of the sensors to access
     val quotaSensorName = apiKey + "-" + clientId
@@ -191,29 +203,31 @@ class ClientQuotaMetrics(private val config : ClientQuotaMetricsConfig,
       }
     }
     // return the read or created sensors
-    (quotaSensor, throttleTimeSensor)
+    ClientSensors(quotaSensor, throttleTimeSensor)
   }

-  private def getQuotaMetricConfig(quota : Quota) : MetricConfig = {
+  private def getQuotaMetricConfig(quota: Quota) : MetricConfig = {
     new MetricConfig()
             .timeWindow(config.quotaWindowSizeSeconds, TimeUnit.SECONDS)
             .samples(config.numQuotaSamples)
             .quota(quota)
   }

-  private def initQuotaMap(input : String) : mutable.Map[String, Quota] = {
-    val output = mutable.Map[String, Quota]()
-    for(entry <- input.split(",")) {
-      val trimmedEntry = entry.trim
-      if(!trimmedEntry.equals("")) {
+  /* Construct a Map of (clientId -> Quota).
+   * The input config is specified as comma-separated K=V pairs.
+   */
+  private def initQuotaMap(input: String) : Map[String, Quota] = {
+    // If empty input, return an empty map
+    if (input.trim.length == 0)
+      Map[String, Quota]()
+    else
+      input.split(",").map(entry => {
+        val trimmedEntry = entry.trim
         val pair: Array[String] = trimmedEntry.split("=")
         if (pair.length != 2)
-          throw new IllegalArgumentException("Incorrectly formatted override entry (%s). Format is k1=v1,k2=v2".format(
-            entry))
-        output(pair(0)) = new Quota(pair(1).toDouble, true)
-      }
-    }
-    output
+          throw new IllegalArgumentException("Incorrectly formatted override entry (%s). Format is k1=v1,k2=v2".format(entry))
+        pair(0) -> new Quota(pair(1).toDouble, true)
+      }).toMap
   }

   def shutdown() = {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 53fbee9..412f21e 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -253,26 +253,20 @@ class KafkaApis(val requestChannel: RequestChannel,
     }

     def produceResponseCallback {
-      if (produceRequest.requiredAcks == 0)
-      {
+      if (produceRequest.requiredAcks == 0) {
         // no operation needed if producer request.required.acks = 0; however, if there is any error in handling
         // the request, since no response is expected by the producer, the server will close socket server so that
         // the producer client will know that some error has happened and will refresh its metadata
-        if (errorInResponse)
-        {
+        if (errorInResponse) {
           info(
             "Close connection due to error handling produce request with correlation id %d from client id %s with ack=0".format(
               produceRequest.correlationId,
               produceRequest.clientId))
           requestChannel.closeConnection(request.processor, request)
-        }
-        else
-        {
+        } else {
           requestChannel.noOperation(request.processor, request)
         }
-      }
-      else
-      {
+      } else {
         val response = ProducerResponse(produceRequest.correlationId, responseStatus)
         requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(response)))
       }
@@ -281,9 +275,9 @@ class KafkaApis(val requestChannel: RequestChannel,
     val requestKey = RequestKeys.nameForKey(RequestKeys.ProduceKey)
     quotaManagers.get(requestKey) match {
       case Some(quotaManager) =>
-        quotaManager.record(produceRequest.clientId, numBytesAppended, produceResponseCallback)
+        quotaManager.recordAndMaybeThrottle(produceRequest.clientId, numBytesAppended, produceResponseCallback)
       case None =>
-        warn("Cannot throttle Api 
key %s".format(requestKey)) + warn("Cannot throttle producer request for clientId: %s".format(produceRequest.clientId)) } } @@ -332,13 +326,13 @@ class KafkaApis(val requestChannel: RequestChannel, } // Do not throttle replication traffic - if (Request.isValidBrokerId(fetchRequest.replicaId)) { + if (fetchRequest.isFromFollower) { fetchResponseCallback } else { val requestKey = RequestKeys.nameForKey(RequestKeys.FetchKey) quotaManagers.get(requestKey) match { case Some(quotaManager) => - quotaManager.record(fetchRequest.clientId, response.sizeInBytes, fetchResponseCallback) + quotaManager.recordAndMaybeThrottle(fetchRequest.clientId, response.sizeInBytes, fetchResponseCallback) case None => warn("Cannot throttle Api key %s".format(requestKey)) } @@ -628,9 +622,9 @@ class KafkaApis(val requestChannel: RequestChannel, /* * Returns a Map of all quota managers configured. The request Api key is the key for the Map */ - private def instantiateQuotaManagers(cfg : KafkaConfig): Map[String, ClientQuotaMetrics] = { + private def instantiateQuotaManagers(cfg: KafkaConfig): Map[String, ClientQuotaMetrics] = { val producerQuotaManagerCfg = ClientQuotaMetricsConfig( - defaultQuotaBytesPerSecond = cfg.producerQuotaDefaultBytesPerSecond, + quotaBytesPerSecondDefault = cfg.producerQuotaBytesPerSecondDefault, quotaBytesPerSecondOverrides = cfg.producerQuotaBytesPerSecondOverrides, quotaDelayFactor = cfg.quotaDelayFactor, numQuotaSamples = cfg.numQuotaSamples, @@ -638,14 +632,13 @@ class KafkaApis(val requestChannel: RequestChannel, ) val consumerQuotaManagerCfg = ClientQuotaMetricsConfig( - defaultQuotaBytesPerSecond = cfg.consumerQuotaDefaultBytesPerSecond, + quotaBytesPerSecondDefault = cfg.consumerQuotaBytesPerSecondDefault, quotaBytesPerSecondOverrides = cfg.consumerQuotaBytesPerSecondOverrides, quotaDelayFactor = cfg.quotaDelayFactor, numQuotaSamples = cfg.numQuotaSamples, quotaWindowSizeSeconds = cfg.quotaWindowSizeSeconds ) - // Using SystemTime from kafka.common because Metrics use the system time from clients val quotaManagers = Map[String, ClientQuotaMetrics]( RequestKeys.nameForKey(RequestKeys.ProduceKey) -> new ClientQuotaMetrics(producerQuotaManagerCfg, metrics, RequestKeys.nameForKey(RequestKeys.ProduceKey), new org.apache.kafka.common.utils.SystemTime), diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index fabbf46..8fbbddc 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -124,8 +124,8 @@ object Defaults { val OffsetCommitRequiredAcks = OffsetManagerConfig.DefaultOffsetCommitRequiredAcks /** ********* Quota Configuration ***********/ - val ProducerQuotaDefaultBytesPerSecond = ClientQuotaMetricsConfig.DefaultQuotaBytesPerSecond - val ConsumerQuotaDefaultBytesPerSecond = ClientQuotaMetricsConfig.DefaultQuotaBytesPerSecond + val ProducerQuotaBytesPerSecondDefault = ClientQuotaMetricsConfig.QuotaBytesPerSecondDefault + val ConsumerQuotaBytesPerSecondDefault = ClientQuotaMetricsConfig.QuotaBytesPerSecondDefault val ProducerQuotaBytesPerSecondOverrides = ClientQuotaMetricsConfig.QuotaBytesPerSecondOverrides val ConsumerQuotaBytesPerSecondOverrides = ClientQuotaMetricsConfig.QuotaBytesPerSecondOverrides val QuotaDelayFactor : Double = ClientQuotaMetricsConfig.DefaultQuotaDelayFactor @@ -243,8 +243,8 @@ object KafkaConfig { val OffsetCommitTimeoutMsProp = "offsets.commit.timeout.ms" val OffsetCommitRequiredAcksProp = "offsets.commit.required.acks" /** 
********* Quota Configuration ***********/ - val ProducerQuotaDefaultBytesPerSecondProp = "quota.producer.default" - val ConsumerQuotaDefaultBytesPerSecondProp = "quota.consumer.default" + val ProducerQuotaBytesPerSecondDefaultProp = "quota.producer.default" + val ConsumerQuotaBytesPerSecondDefaultProp = "quota.consumer.default" val ProducerQuotaBytesPerSecondOverridesProp = "quota.producer.bytes.per.second.overrides" val ConsumerQuotaBytesPerSecondOverridesProp = "quota.consumer.bytes.per.second.overrides" val QuotaDelayFactorProp = "quota.delay.factor" @@ -383,8 +383,8 @@ object KafkaConfig { "or this timeout is reached. This is similar to the producer request timeout." val OffsetCommitRequiredAcksDoc = "The required acks before the commit can be accepted. In general, the default (-1) should not be overridden" /** ********* Quota Configuration ***********/ - val ProducerQuotaDefaultBytesPerSecondDoc = "Any producer distinguished by clientId will get throttled if it produces more bytes than this value per-second" - val ConsumerQuotaDefaultBytesPerSecondDoc = "Any consumer distinguished by clientId/consumer group will get throttled if it fetches more bytes than this value per-second" + val ProducerQuotaBytesPerSecondDefaultDoc = "Any producer distinguished by clientId will get throttled if it produces more bytes than this value per-second" + val ConsumerQuotaBytesPerSecondDefaultDoc = "Any consumer distinguished by clientId/consumer group will get throttled if it fetches more bytes than this value per-second" val ProducerQuotaBytesPerSecondOverridesDoc = "Comma separated list of clientId:quotaBytesPerSecond to override the default producer quota. " + "Example: clientIdX=10485760,clientIdY=10485760" val ConsumerQuotaBytesPerSecondOverridesDoc = "Comma separated list of clientId:quotaBytesPerSecond to override the default consumer quota. 
" + @@ -518,15 +518,15 @@ object KafkaConfig { .define(CompressionTypeProp, STRING, Defaults.CompressionType, HIGH, CompressionTypeDoc) /** ********* Quota configuration ***********/ - .define(ProducerQuotaDefaultBytesPerSecondProp, LONG, Defaults.ProducerQuotaDefaultBytesPerSecond, atLeast(1), HIGH, ProducerQuotaDefaultBytesPerSecondDoc) - .define(ConsumerQuotaDefaultBytesPerSecondProp, LONG, Defaults.ConsumerQuotaDefaultBytesPerSecond, atLeast(1), HIGH, ConsumerQuotaDefaultBytesPerSecondDoc) + .define(ProducerQuotaBytesPerSecondDefaultProp, LONG, Defaults.ProducerQuotaBytesPerSecondDefault, atLeast(1), HIGH, ProducerQuotaBytesPerSecondDefaultDoc) + .define(ConsumerQuotaBytesPerSecondDefaultProp, LONG, Defaults.ConsumerQuotaBytesPerSecondDefault, atLeast(1), HIGH, ConsumerQuotaBytesPerSecondDefaultDoc) .define(ProducerQuotaBytesPerSecondOverridesProp, STRING, Defaults.ProducerQuotaBytesPerSecondOverrides, HIGH, ProducerQuotaBytesPerSecondOverridesDoc) .define(ConsumerQuotaBytesPerSecondOverridesProp, STRING, Defaults.ConsumerQuotaBytesPerSecondOverrides, HIGH, ConsumerQuotaBytesPerSecondOverridesDoc) .define(QuotaDelayFactorProp, DOUBLE, Defaults.QuotaDelayFactor, atLeast(1), LOW, QuotaDelayFactorDoc) .define(NumQuotaSamplesProp, INT, Defaults.NumQuotaSamples, atLeast(1), LOW, NumQuotaSamplesDoc) .define(QuotaWindowSizeSecondsProp, INT, Defaults.QuotaWindowSizeSeconds, atLeast(1), LOW, QuotaWindowSizeSecondsDoc) - /** ********* Kafka Metrics Configuration ***********/ + /** ********* Kafka Metrics Configuration ***********/ .define(MetricsNumSamplesProp, INT, Defaults.MetricsNumSamples, atLeast(1), LOW, MetricsNumSamplesDoc) .define(MetricsWindowSizeSecondsProp, INT, Defaults.MetricsWindowSizeSeconds, atLeast(1), LOW, MetricsWindowSizeSecondsDoc) } @@ -648,8 +648,8 @@ object KafkaConfig { offsetCommitTimeoutMs = parsed.get(OffsetCommitTimeoutMsProp).asInstanceOf[Int], offsetCommitRequiredAcks = parsed.get(OffsetCommitRequiredAcksProp).asInstanceOf[Short], /** ********* Quota configuration ***********/ - producerQuotaDefaultBytesPerSecond = parsed.get(ProducerQuotaDefaultBytesPerSecondProp).asInstanceOf[Long], - consumerQuotaDefaultBytesPerSecond = parsed.get(ConsumerQuotaDefaultBytesPerSecondProp).asInstanceOf[Long], + producerQuotaBytesPerSecondDefault = parsed.get(ProducerQuotaBytesPerSecondDefaultProp).asInstanceOf[Long], + consumerQuotaBytesPerSecondDefault = parsed.get(ConsumerQuotaBytesPerSecondDefaultProp).asInstanceOf[Long], producerQuotaBytesPerSecondOverrides = parsed.get(ProducerQuotaBytesPerSecondOverridesProp).asInstanceOf[String], consumerQuotaBytesPerSecondOverrides = parsed.get(ConsumerQuotaBytesPerSecondOverridesProp).asInstanceOf[String], quotaDelayFactor = parsed.get(QuotaDelayFactorProp).asInstanceOf[Double], @@ -803,8 +803,8 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ val offsetCommitRequiredAcks: Short = Defaults.OffsetCommitRequiredAcks, /** ********* Quota configuration ***********/ - val producerQuotaDefaultBytesPerSecond: Long = Defaults.ProducerQuotaDefaultBytesPerSecond, - val consumerQuotaDefaultBytesPerSecond: Long = Defaults.ConsumerQuotaDefaultBytesPerSecond, + val producerQuotaBytesPerSecondDefault: Long = Defaults.ProducerQuotaBytesPerSecondDefault, + val consumerQuotaBytesPerSecondDefault: Long = Defaults.ConsumerQuotaBytesPerSecondDefault, val producerQuotaBytesPerSecondOverrides : String = Defaults.ProducerQuotaBytesPerSecondOverrides, val consumerQuotaBytesPerSecondOverrides : String = 
Defaults.ConsumerQuotaBytesPerSecondOverrides, val quotaDelayFactor : Double = Defaults.QuotaDelayFactor, @@ -1032,8 +1032,8 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ props.put(OffsetsRetentionCheckIntervalMsProp, offsetsRetentionCheckIntervalMs.toString) props.put(OffsetCommitTimeoutMsProp, offsetCommitTimeoutMs.toString) props.put(OffsetCommitRequiredAcksProp, offsetCommitRequiredAcks.toString) - props.put(ProducerQuotaDefaultBytesPerSecondProp, producerQuotaDefaultBytesPerSecond.toString) - props.put(ConsumerQuotaDefaultBytesPerSecondProp, consumerQuotaDefaultBytesPerSecond.toString) + props.put(ProducerQuotaBytesPerSecondDefaultProp, producerQuotaBytesPerSecondDefault.toString) + props.put(ConsumerQuotaBytesPerSecondDefaultProp, consumerQuotaBytesPerSecondDefault.toString) props.put(ProducerQuotaBytesPerSecondOverridesProp, producerQuotaBytesPerSecondOverrides) props.put(ConsumerQuotaBytesPerSecondOverridesProp, consumerQuotaBytesPerSecondOverrides) props.put(QuotaDelayFactorProp, quotaDelayFactor.toString) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 79a6fbc..0d6f76c 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -190,10 +190,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg } private def initMetrics() : Metrics = { - val jmxPrefix: String = "kafka.server" + val jmxPrefix = "kafka.server" info("Initiating KafkaMetrics") - val metricConfig: MetricConfig = new MetricConfig().samples(config.metricsNumSamples) + val metricConfig = new MetricConfig().samples(config.metricsNumSamples) .timeWindow(config.metricsWindowSizeSeconds, TimeUnit.SECONDS) new Metrics(metricConfig, JavaConversions.seqAsJavaList(mutable.Seq(new JmxReporter(jmxPrefix))), diff --git a/core/src/main/scala/kafka/server/ThrottledRequest.scala b/core/src/main/scala/kafka/server/ThrottledRequest.scala index dbbca36..bf16bad 100644 --- a/core/src/main/scala/kafka/server/ThrottledRequest.scala +++ b/core/src/main/scala/kafka/server/ThrottledRequest.scala @@ -33,11 +33,11 @@ private[server] class ThrottledRequest(val time: Time, val delayTimeMs : Long, c def execute() = callback - def getDelay(unit: TimeUnit): Long = { + override def getDelay(unit: TimeUnit): Long = { unit.convert(endTime - time.milliseconds, TimeUnit.MILLISECONDS) } - def compareTo(d: Delayed): Int = { + override def compareTo(d: Delayed): Int = { val other = d.asInstanceOf[ThrottledRequest] if(this.endTime < other.endTime) -1 else if(this.endTime > other.endTime) 1 diff --git a/core/src/main/scala/kafka/utils/ShutdownableThread.scala b/core/src/main/scala/kafka/utils/ShutdownableThread.scala index db21aae..dc46797 100644 --- a/core/src/main/scala/kafka/utils/ShutdownableThread.scala +++ b/core/src/main/scala/kafka/utils/ShutdownableThread.scala @@ -52,7 +52,7 @@ abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean } /** - * This method is continually in a loop called until the thread shuts down or this method throws an exception + * This method is repeatedly invoked until the thread shuts down or this method throws an exception */ def doWork(): Unit diff --git a/core/src/test/scala/integration/kafka/api/QuotasTest.scala b/core/src/test/scala/integration/kafka/api/QuotasTest.scala index a1524e5..fb05d9b 100644 --- a/core/src/test/scala/integration/kafka/api/QuotasTest.scala +++ 
b/core/src/test/scala/integration/kafka/api/QuotasTest.scala
@@ -1,6 +1,4 @@
 /**
- * Copyright 2015 Confluent Inc.
- *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -22,13 +20,15 @@ import junit.framework.Assert
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.clients.consumer.{ConsumerRecords, ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 import org.apache.kafka.common.MetricName
 import org.apache.kafka.common.metrics.KafkaMetric
 import org.junit.Assert._
 import org.junit.Test
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._

 import scala.collection.mutable

@@ -36,13 +36,19 @@ class QuotasTest extends KafkaServerTestHarness {
   private val producerBufferSize = 30000
   private val producerId1 = "QuotasTestProducer-1"
   private val producerId2 = "QuotasTestProducer-2"
+  private val consumerId1 = "QuotasTestConsumer-1"
+  private val consumerId2 = "QuotasTestConsumer-2"
+
   val numServers = 1
   val overridingProps = new Properties()

   // Low enough quota that a producer sending a small payload in a tight loop should get throttled
-  overridingProps.put(KafkaConfig.ProducerQuotaDefaultBytesPerSecondProp, "8000")
+  overridingProps.put(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, "8000")
+  overridingProps.put(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, "2500")
+
+  // un-throttled
   overridingProps.put(KafkaConfig.ProducerQuotaBytesPerSecondOverridesProp, producerId2 + "=" + Long.MaxValue)
+  overridingProps.put(KafkaConfig.ConsumerQuotaBytesPerSecondOverridesProp, consumerId2 + "=" + Long.MaxValue)

   override def generateConfigs() = {
     FixedPortTestUtils.createBrokerConfigs(numServers,
@@ -77,13 +83,21 @@ class QuotasTest extends KafkaServerTestHarness {
     val consumerProps = new Properties
     consumerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
     consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "QuotasTest")
+    consumerProps.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
     consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapUrl)
     consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
     consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
-    for(i <- 0 until 2)
-      consumers += new KafkaConsumer(consumerProps)
+    consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "range")
+
+    consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerId1)
+    consumers += new KafkaConsumer(consumerProps)
+
+    consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerId2)
+    consumers += new KafkaConsumer(consumerProps)

     val numPartitions = 1
     val leaders = TestUtils.createTopic(zkClient, topic1, numPartitions, numServers, servers)
@@ -97,8 +111,11 @@ class QuotasTest extends KafkaServerTestHarness {
   }

   @Test
-  def testThrottledProducer() {
-    val allMetrics: mutable.Map[MetricName, KafkaMetric] = produce(producers(0), 1000)
+  def testThrottledProducerConsumer() {
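+    // Tight-loop produce should trip the 8000 bytes/sec default producer quota for
+    // producerId1, and the subsequent bursty consume should trip the 2500 bytes/sec
+    // consumer quota for consumerId1; both are verified via "throttle-time" metrics.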
+    val allMetrics: mutable.Map[MetricName, KafkaMetric] = servers.head.metrics.metrics().asScala
+
+    val numRecords = 1000
+    produce(producers(0), numRecords)

     var numAsserts = 0
     for ((name: MetricName, metric: KafkaMetric) <- allMetrics) {
@@ -109,30 +126,63 @@ class QuotasTest extends KafkaServerTestHarness {
       }
     }
     // Should have matched 1 metric
     Assert.assertEquals(1, numAsserts)
+
+    // Consumer should read in a bursty manner and get throttled immediately
+    consume(consumers(0), numRecords)
+
+    for ((name: MetricName, metric: KafkaMetric) <- allMetrics) {
+      if (name.tags().containsValue(consumerId1) && name.name().startsWith("throttle-time")) {
+        Assert.assertTrue("Should have been throttled", metric.value() > 0)
+        numAsserts += 1
+      }
+    }
+    // Should have matched 1 producer metric and 1 consumer metric
+    Assert.assertEquals(2, numAsserts)
   }

   @Test
   def testProducerOverrideUnthrottled() {
-    val allMetrics: mutable.Map[MetricName, KafkaMetric] = produce(producers(1), 1000)
+    val allMetrics: mutable.Map[MetricName, KafkaMetric] = servers.head.metrics.metrics().asScala
+    val numRecords = 1000
+
+    produce(producers(1), numRecords)

     var numAsserts = 0
     for ((name: MetricName, metric: KafkaMetric) <- allMetrics) {
       if (name.tags().containsValue(producerId2) && name.name().startsWith("throttle-time")) {
-        Assert.assertFalse("Should not have been throttled", metric.value() > 0)
+        Assert.assertEquals("Should not have been throttled", 0.0, metric.value())
         numAsserts += 1
       }
     }
     // Should have matched 1 metric
     Assert.assertEquals(1, numAsserts)
+
+    // consumers(1) uses clientId consumerId2, which is overridden to be un-throttled
+    consume(consumers(1), numRecords)
+
+    for ((name: MetricName, metric: KafkaMetric) <- allMetrics) {
+      if (name.tags().containsValue(consumerId2) && name.name().startsWith("throttle-time")) {
+        Assert.assertEquals("Should not have been throttled", 0.0, metric.value())
+        numAsserts += 1
+      }
+    }
+    // Should have matched the consumer metric as well
+    Assert.assertEquals(2, numAsserts)
   }

-  def produce(p: KafkaProducer[Array[Byte], Array[Byte]], count: Int): mutable.Map[MetricName, KafkaMetric] = {
+  def produce(p: KafkaProducer[Array[Byte], Array[Byte]], count: Int) {
     for (i <- 0 to count) {
       p.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, null, null, i.toString.getBytes),
              new ErrorLoggingCallback(topic1, null, null, true)).get()
       Thread.sleep(1)
     }
-    import scala.collection.JavaConverters._
-    servers.head.metrics.metrics().asScala
   }
+
+  def consume(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int) {
+    consumer.subscribe(topic1)
+    var numConsumed = 0
+    while (numConsumed < numRecords) {
+      for (cr <- consumer.poll(100)) {
+        numConsumed += 1
+      }
+    }
+  }
+
 }
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaMetricsTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaMetricsTest.scala
index 382b665..d44f682 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaMetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaMetricsTest.scala
@@ -18,17 +18,17 @@ package kafka.server

 import java.util.Collections

-import org.apache.kafka.common.metrics.{Quota, MetricConfig}
+import kafka.api.RequestKeys
+import org.apache.kafka.common.MetricName
+import org.apache.kafka.common.metrics.{Metrics, Quota, MetricConfig}
 import org.apache.kafka.common.utils.MockTime
 import org.scalatest.junit.JUnit3Suite
 import org.junit.{Before, Test, Assert}

 class ClientQuotaMetricsTest extends JUnit3Suite {
   private val time = new MockTime
-  private val metrics = new org.apache.kafka.common.metrics.Metrics(new MetricConfig(),
-                                                                    Collections.emptyList(),
-                                                                    time)
-  private val config = ClientQuotaMetricsConfig(defaultQuotaBytesPerSecond = 500,
+
+  private val config = ClientQuotaMetricsConfig(quotaBytesPerSecondDefault = 500,
                                                quotaBytesPerSecondOverrides = "p1=2000,p2=4000")

   var numCallbacks : Int = 0
@@ -43,7 +43,7 @@ class ClientQuotaMetricsTest extends JUnit3Suite {

   @Test
   def testQuotaParsing() {
-    val clientMetrics = new ClientQuotaMetrics(config, metrics, "producer", time)
+    val clientMetrics = new ClientQuotaMetrics(config, newMetrics, "producer", time)
     try {
       Assert.assertEquals("Default producer quota should be 500",
                           new Quota(500, true), clientMetrics.quota("random-client-id"))
@@ -58,22 +58,26 @@ class ClientQuotaMetricsTest extends JUnit3Suite {

   @Test
   def testQuotaViolation() {
+    val metrics = newMetrics
     val clientMetrics = new ClientQuotaMetrics(config, metrics, "producer", time)
+    val queueSizeMetric = metrics.metrics().get(new MetricName("queue-size", "producer", ""))
     try {
       /* We have 10 second windows. Make sure that there is no quota violation
        * if we produce under the quota
        */
       for(i <- 0 until 10) {
-        clientMetrics.record("unknown", 400, callback)
+        clientMetrics.recordAndMaybeThrottle("unknown", 400, callback)
         time.sleep(1000)
       }
       Assert.assertEquals(10, numCallbacks)
+      Assert.assertEquals(0, queueSizeMetric.value().toInt)

       // Create a spike.
       // (400*10 + 2000) bytes / 10 seconds = 600 bytes per second.
       // Delay = (600 - quota)/quota * window size = (600 - 500)/500 * 11 seconds = 2200 ms
-      val sleepTime = clientMetrics.record("unknown", 2000, callback)
+      val sleepTime = clientMetrics.recordAndMaybeThrottle("unknown", 2000, callback)
       Assert.assertEquals("Should be throttled", 2200, sleepTime)
+      Assert.assertEquals(1, queueSizeMetric.value().toInt)
       // After a request is delayed, the callback cannot be triggered immediately
       clientMetrics.throttledRequestReaper.doWork()
       Assert.assertEquals(10, numCallbacks)
@@ -81,16 +85,17 @@ class ClientQuotaMetricsTest extends JUnit3Suite {

       // Callback can only be triggered after the delay time passes
       clientMetrics.throttledRequestReaper.doWork()
+      Assert.assertEquals(0, queueSizeMetric.value().toInt)
       Assert.assertEquals(11, numCallbacks)

       // Could continue to see delays until the bursty sample disappears
       for(i <- 0 until 10) {
-        clientMetrics.record("unknown", 400, callback)
+        clientMetrics.recordAndMaybeThrottle("unknown", 400, callback)
         time.sleep(1000)
       }

       Assert.assertEquals("Should be unthrottled since bursty sample has rolled over",
-                          0, clientMetrics.record("unknown", 0, callback))
+                          0, clientMetrics.recordAndMaybeThrottle("unknown", 0, callback))
     } finally {
       clientMetrics.shutdown()
     }
@@ -99,11 +104,11 @@ class ClientQuotaMetricsTest extends JUnit3Suite {

   @Test
   def testOverrideParse() {
     var testConfig = ClientQuotaMetricsConfig()
-    var clientMetrics = new ClientQuotaMetrics(testConfig, metrics, "consumer", time)
+    var clientMetrics = new ClientQuotaMetrics(testConfig, newMetrics, "consumer", time)

     try {
       // Case 1 - Default config
-      Assert.assertEquals(new Quota(ClientQuotaMetricsConfig.DefaultQuotaBytesPerSecond, true),
+      Assert.assertEquals(new Quota(ClientQuotaMetricsConfig.QuotaBytesPerSecondDefault, true),
                           clientMetrics.quota("p1"))
     } finally {
       clientMetrics.shutdown()
@@ -111,10 +116,10 @@ class ClientQuotaMetricsTest extends JUnit3Suite {


     // Case 2 - Empty override
-    testConfig = ClientQuotaMetricsConfig(defaultQuotaBytesPerSecond = 500,
+    testConfig = ClientQuotaMetricsConfig(quotaBytesPerSecondDefault = 500,
                                           quotaBytesPerSecondOverrides = "p1=2000,p2=4000,,")

-    clientMetrics = new ClientQuotaMetrics(testConfig, metrics, "consumer", time)
+    clientMetrics = new ClientQuotaMetrics(testConfig, newMetrics, "consumer", time)
     try {
       Assert.assertEquals(new Quota(2000, true),
clientMetrics.quota("p1")) Assert.assertEquals(new Quota(4000, true), clientMetrics.quota("p2")) @@ -123,10 +128,10 @@ class ClientQuotaMetricsTest extends JUnit3Suite { } // Case 3 - NumberFormatException for override - testConfig = ClientQuotaMetricsConfig(defaultQuotaBytesPerSecond = 500, + testConfig = ClientQuotaMetricsConfig(quotaBytesPerSecondDefault = 500, quotaBytesPerSecondOverrides = "p1=2000,p2=4000,p3=p4") try { - clientMetrics = new ClientQuotaMetrics(testConfig, metrics, "consumer", time) + clientMetrics = new ClientQuotaMetrics(testConfig, newMetrics, "consumer", time) Assert.fail("Should fail to parse invalid config " + testConfig.quotaBytesPerSecondOverrides) } catch { @@ -135,10 +140,10 @@ class ClientQuotaMetricsTest extends JUnit3Suite { } // Case 4 - IllegalArgumentException for override - testConfig = ClientQuotaMetricsConfig(defaultQuotaBytesPerSecond = 500, + testConfig = ClientQuotaMetricsConfig(quotaBytesPerSecondDefault = 500, quotaBytesPerSecondOverrides = "p1=2000=3000") try { - clientMetrics = new ClientQuotaMetrics(testConfig, metrics, "producer", time) + clientMetrics = new ClientQuotaMetrics(testConfig, newMetrics, "producer", time) Assert.fail("Should fail to parse invalid config " + testConfig.quotaBytesPerSecondOverrides) } catch { @@ -147,4 +152,8 @@ class ClientQuotaMetricsTest extends JUnit3Suite { } } + + def newMetrics: Metrics = { + new Metrics(new MetricConfig(), Collections.emptyList(), time) + } } diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala index 92b2c9d..a7e5c86 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala @@ -218,8 +218,8 @@ class KafkaConfigConfigDefTest extends JUnit3Suite { case KafkaConfig.OffsetsRetentionMinutesProp => expected.setProperty(name, atLeastOneIntProp) case KafkaConfig.OffsetsRetentionCheckIntervalMsProp => expected.setProperty(name, atLeastOneIntProp) case KafkaConfig.OffsetCommitTimeoutMsProp => expected.setProperty(name, atLeastOneIntProp) - case KafkaConfig.ProducerQuotaDefaultBytesPerSecondProp => expected.setProperty(name, atLeastOneIntProp) - case KafkaConfig.ConsumerQuotaDefaultBytesPerSecondProp => expected.setProperty(name, atLeastOneIntProp) + case KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp => expected.setProperty(name, atLeastOneIntProp) + case KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp => expected.setProperty(name, atLeastOneIntProp) case KafkaConfig.ProducerQuotaBytesPerSecondOverridesProp => expected.setProperty(name, "P1=10,P2=20") case KafkaConfig.ConsumerQuotaBytesPerSecondOverridesProp => expected.setProperty(name, "C1=10,C2=20") case KafkaConfig.QuotaDelayFactorProp => expected.setProperty(name, "%.1f".format(nextDouble + 1)) @@ -349,8 +349,8 @@ class KafkaConfigConfigDefTest extends JUnit3Suite { case KafkaConfig.OffsetsRetentionCheckIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") case KafkaConfig.OffsetCommitTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") case KafkaConfig.OffsetCommitRequiredAcksProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-2") - case KafkaConfig.ProducerQuotaDefaultBytesPerSecondProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.ConsumerQuotaDefaultBytesPerSecondProp => 
assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") + case KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") + case KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") case KafkaConfig.ProducerQuotaBytesPerSecondOverridesProp => // ignore string case KafkaConfig.ConsumerQuotaBytesPerSecondOverridesProp => // ignore string case KafkaConfig.QuotaDelayFactorProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") -- 1.7.12.4
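
For reference, the 2200 ms delay asserted in testQuotaViolation can be reproduced standalone. The sketch below is illustrative only: the object and method names (QuotaDelayMath, delayMs) are invented for this note and are not part of the patch; the arithmetic mirrors the test's comment, delay = (rate - quota bound) / quota bound, scaled by the total metric window span (numQuotaSamples * quotaWindowSize).

    object QuotaDelayMath {
      // Time in ms for the measured rate to fall back under the quota bound,
      // assuming nothing new is recorded in the meantime.
      def delayMs(observedRate: Double, quotaBound: Double, numSamples: Int, windowSizeMs: Long): Long =
        ((observedRate - quotaBound) / quotaBound * numSamples * windowSizeMs).toLong

      def main(args: Array[String]): Unit = {
        // The spike from testQuotaViolation: ten 400-byte records over 10 seconds plus a 2000-byte burst
        val rate = (10 * 400 + 2000) / 10.0      // 600 bytes per second
        // 11 one-second samples (10 whole windows + 1 current window), quota bound of 500 bytes/sec
        println(delayMs(rate, 500.0, 11, 1000))  // prints 2200, the sleepTime the test expects
      }
    }

Because the delay scales with the relative overshoot of the bound, a larger spike over the same windows yields a proportionally longer throttle, which is why the tests only need a single recorded burst to produce a deterministic delay.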