From f716449069ced90b8c0d0bf76124f746ae0168fd Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Thu, 9 Apr 2015 17:18:37 -0700 Subject: [PATCH 1/5] WIP: First patch for quotas. Changes are 1. Adding per-client throttle time and quota metrics in ClientQuotaMetrics.scala 2. Making changes in QuotaViolationException and Sensor to return delay time changes. 3. Added configuration needed so far for quotas in KafkaConfig. 4. Unit tests This is currently not being used anywhere in the code because I haven't yet figured out how to enforce delays i.e. purgatory vs delay queue. I'll have a better idea once I look at the new purgatory implementation. Hopefully, this smaller patch is easier to review. --- .../apache/kafka/common/metrics/MetricConfig.java | 20 +- .../org/apache/kafka/common/metrics/Metrics.java | 1 - .../org/apache/kafka/common/metrics/Quota.java | 24 +++ .../common/metrics/QuotaViolationException.java | 8 +- .../org/apache/kafka/common/metrics/Sensor.java | 26 ++- .../scala/kafka/server/ClientQuotaMetrics.scala | 204 +++++++++++++++++++++ core/src/main/scala/kafka/server/KafkaConfig.scala | 56 +++++- core/src/main/scala/kafka/server/KafkaServer.scala | 15 +- .../unit/kafka/server/ClientQuotaMetricsTest.scala | 106 +++++++++++ 9 files changed, 453 insertions(+), 7 deletions(-) create mode 100644 core/src/main/scala/kafka/server/ClientQuotaMetrics.scala create mode 100644 core/src/test/scala/unit/kafka/server/ClientQuotaMetricsTest.scala diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java b/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java index dfa1b0a..e45692b 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java @@ -28,6 +28,7 @@ public class MetricConfig { private long eventWindow; private long timeWindowMs; private TimeUnit unit; + private long quotaEnforcementDelayMs; public MetricConfig() { super(); @@ -36,6 +37,9 @@ public class MetricConfig { this.eventWindow = Long.MAX_VALUE; this.timeWindowMs = TimeUnit.MILLISECONDS.convert(30, TimeUnit.SECONDS); this.unit = TimeUnit.SECONDS; + // By default quota checking is disabled for the first window. + // This builds up of sufficient data initially before making quota enforcement decisions + this.quotaEnforcementDelayMs = timeWindowMs; } public Quota quota() { @@ -65,7 +69,21 @@ public class MetricConfig { return this; } - public int samples() { + /** + * Disables quota enforcement on this metric for this certain period of time + * after the metric is first created + */ + public MetricConfig quotaEnforcementDelay(long time, TimeUnit unit) { + this.quotaEnforcementDelayMs = TimeUnit.MILLISECONDS.convert(time, unit); + return this; + } + + public long quotaEnforcementDelayMs() { + return quotaEnforcementDelayMs; + } + + + public int samples() { return this.samples; } diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java index b3d3d7c..e5b2096 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java @@ -47,7 +47,6 @@ import org.apache.kafka.common.utils.Utils; * */ public class Metrics { - private final MetricConfig config; private final ConcurrentMap metrics; private final ConcurrentMap sensors; diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java b/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java index d82bb0c..007e4cf 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java @@ -49,4 +49,28 @@ public final class Quota { return (upper && value <= bound) || (!upper && value >= bound); } + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + (int) this.bound; + result = prime * result + (this.upper ? 1 : 0); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + Quota that = (Quota) obj; + if(that.bound != this.bound) + return false; + if(that.upper != this.upper) + return false; + return true; + } } diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java b/clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java index a451e53..2e33dcf 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java @@ -24,9 +24,15 @@ import org.apache.kafka.common.KafkaException; public class QuotaViolationException extends KafkaException { private static final long serialVersionUID = 1L; + private final int delayTimeMs; public QuotaViolationException(String m) { - super(m); + this(m, 0); } + public QuotaViolationException(String m, int delayTimeMs) { + this.delayTimeMs = delayTimeMs; + } + + public int getDelayTimeMs() {return delayTimeMs;} } diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java index ca823fd..3be3dce 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java @@ -37,6 +37,7 @@ public final class Sensor { private final List metrics; private final MetricConfig config; private final Time time; + private final long creationTime; Sensor(Metrics registry, String name, Sensor[] parents, MetricConfig config, Time time) { super(); @@ -47,6 +48,7 @@ public final class Sensor { this.stats = new ArrayList(); this.config = config; this.time = time; + this.creationTime = time.milliseconds(); checkForest(new HashSet()); } @@ -112,13 +114,33 @@ public final class Sensor { if (config != null) { Quota quota = config.quota(); if (quota != null) { - if (!quota.acceptable(metric.value(timeMs))) - throw new QuotaViolationException(metric.metricName() + " is in violation of its quota of " + quota.bound()); + double value = metric.value(timeMs); + if(timeMs - this.creationTime < config.quotaEnforcementDelayMs()) + continue; + if (!quota.acceptable(value)) + { + throw new QuotaViolationException(String.format( + "(%s) is violating its quota of (%f) with value (%f)", + metric.metricName(), + quota.bound(), + value), + delayTime(metric, timeMs, quota, value, config)); + } } } } } + /* + * This calculates the amount of time needed to bring the metric within quota + * assuming that no new metrics are recorded + */ + private int delayTime(KafkaMetric metric, long timeMs, Quota quota, double metricValue, MetricConfig config) { + double difference = metricValue - quota.bound(); + double time = difference/quota.bound()*config.samples()*config.timeWindowMs(); + return (int) time; + } + /** * Register a compound statistic with this sensor with no config override */ diff --git a/core/src/main/scala/kafka/server/ClientQuotaMetrics.scala b/core/src/main/scala/kafka/server/ClientQuotaMetrics.scala new file mode 100644 index 0000000..388549c --- /dev/null +++ b/core/src/main/scala/kafka/server/ClientQuotaMetrics.scala @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import java.util.concurrent.TimeUnit + +import kafka.message.{NoCompressionCodec, CompressionCodec} +import kafka.utils.Logging +import org.apache.kafka.common.MetricName +import org.apache.kafka.common.metrics._ +import org.apache.kafka.common.metrics.stats.{Avg, Max, Rate} + +import scala.collection.mutable + +/** + * Configuration settings for quota management + * @param quotaDelayFactor The quota delay factor modifies any delay by a fixed multiplier (default 1.2) + * @param quotaEnforcementDelaySeconds The Sensor does not report quota violations for this amount of time after + * the sensor is created. This is meant to prevent throttling all clients + * upon server startup + * @param numQuotaSamples The number of sample to retain in memory + * @param quotaWindowSizeSeconds The time span of each sample + * + */ +case class ClientQuotaMetricsConfig(defaultProducerQuotaBytesPerSecond : Long = + ClientQuotaMetricsConfig.DefaultProducerQuotaBytesPerSecond, + defaultConsumerQuotaBytesPerSecond : Long = + ClientQuotaMetricsConfig.DefaultConsumerQuotaBytesPerSecond, + producerQuotaOverrides : String = + ClientQuotaMetricsConfig.DefaultProducerQuotaOverrides, + consumerQuotaOverrides : String = + ClientQuotaMetricsConfig.DefaultConsumerQuotaOverrides, + quotaDelayFactor: Double = + ClientQuotaMetricsConfig.DefaultQuotaDelayFactor, + quotaEnforcementDelaySeconds : Int = + ClientQuotaMetricsConfig.DefaultQuotaEnforcementDelaySeconds, + numQuotaSamples : Int = + ClientQuotaMetricsConfig.DefaultNumQuotaSamples, + quotaWindowSizeSeconds : Int = + ClientQuotaMetricsConfig.DefaultQuotaWindowSizeSeconds) + +object ClientQuotaMetricsConfig { + val DefaultProducerQuotaBytesPerSecond = Long.MaxValue + val DefaultConsumerQuotaBytesPerSecond = Long.MaxValue + val DefaultProducerQuotaOverrides = "" + val DefaultConsumerQuotaOverrides = "" + val DefaultQuotaDelayFactor = 1.2; + val DefaultQuotaEnforcementDelaySeconds = 5; + // Always have 10 whole windows + 1 current window + val DefaultNumQuotaSamples = 11; + val DefaultQuotaWindowSizeSeconds = 1; +} + +object ClientQuotaMetrics { + private val ProducerSensorPrefix = "ProducerQuotaMetrics" + private val ConsumerSensorPrefix = "ConsumerQuotaMetrics" +} + +/** + * Helper class that records per-client metrics. It is also responsible for maintaining Quota usage statistics + * for all clients. + * @param config @KafkaConfig Configs for the Kafka Server + * @param metrics @Metrics Metrics instance + */ +class ClientQuotaMetrics(private val config : ClientQuotaMetricsConfig, + private val metrics : Metrics) extends Logging { + private val producerOverriddenQuota = initQuotaMap(config.producerQuotaOverrides) + private val defaultBytesProducedQuota = Quota.lessThan(config.defaultProducerQuotaBytesPerSecond) + private val consumerOverriddenQuota = initQuotaMap(config.consumerQuotaOverrides) + private val defaultBytesConsumedQuota = Quota.lessThan(config.defaultConsumerQuotaBytesPerSecond) + + /** + * Records that a fetch request consumed some data + * @param clientId clientId that fetched the data + * @param bytes amount of data consumed in bytes + * @return Number of milliseconds to delay the response in case of Quota violation. + * Zero otherwise + */ + def recordBytesConsumed(clientId: String, bytes: Int) : Int = { + recordSensor(ClientQuotaMetrics.ConsumerSensorPrefix, clientId, consumerQuota(clientId), bytes) + } + /** + * Records that a produce request wrote some data + * @param clientId clientId that produced the data + * @param bytes amount of data written in bytes + * @return Number of milliseconds to delay the response in case of Quota violation. + * Zero otherwise + */ + def recordBytesProduced(clientId: String, bytes: Int) : Int = { + recordSensor(ClientQuotaMetrics.ProducerSensorPrefix, clientId, producerQuota(clientId), bytes) + } + + /** + * Returns the producer quota for the specified clientId + * @return + */ + def producerQuota(clientId : String) : Quota = { + if(producerOverriddenQuota.contains(clientId)) + producerOverriddenQuota(clientId) + else + defaultBytesProducedQuota + } + + /** + * Returns the consumer quota for the specified clientId + * @return + */ + def consumerQuota(clientId : String) : Quota = { + if(consumerOverriddenQuota.contains(clientId)) + consumerOverriddenQuota(clientId) + else + defaultBytesConsumedQuota + } + + private def recordSensor(sensorPrefix : String, + clientId : String, + quota : Quota, + value : Int) : Int = { + val sensors = getOrCreateQuotaSensors(sensorPrefix, clientId, quota) + var delayTime = 0.0 + try { + sensors._1.record(value) + } catch { + case qve : QuotaViolationException => + delayTime = qve.getDelayTimeMs()*config.quotaDelayFactor + sensors._2.record(delayTime) + logger.warn("Quota violated for sensor (%s). Delay time: (%f)".format(sensors._1, delayTime), qve) + } + delayTime.toInt + } + + private def getOrCreateQuotaSensors(sensorPrefix : String, + clientId : String, + quota : Quota) : (Sensor, Sensor) = { + val quotaSensorName = sensorPrefix + "-" + clientId + val throttleTimeSensorName = sensorPrefix + "ThrottleTime-" + clientId + var sensorVal = metrics.getSensor(quotaSensorName) + if (sensorVal == null) { + this.synchronized { + sensorVal = metrics.getSensor(quotaSensorName) + if (sensorVal == null) { + sensorVal = metrics.sensor(quotaSensorName, getQuotaMetricConfig(quota)) + sensorVal.add(new MetricName("byte-rate", + sensorPrefix, + "Tracking byte-rate per client", + "client-id", clientId), + new Rate()) + // create the throttle time sensor also + val throttleTimeSensor = metrics.sensor(throttleTimeSensorName) + throttleTimeSensor.add(new MetricName("throttle-time-max", + sensorPrefix, + "Tracking throttle-time per client", + "client-id", clientId), + new Max(), + new MetricConfig()) + throttleTimeSensor.add(new MetricName("throttle-time-avg", + sensorPrefix, + "Tracking throttle-time per client", + "client-id", clientId), + new Avg(), + new MetricConfig()) + } + } + } + (sensorVal, metrics.sensor(throttleTimeSensorName)) + } + + private def getQuotaMetricConfig(quota : Quota) : MetricConfig = { + new MetricConfig() + .timeWindow(config.quotaWindowSizeSeconds, TimeUnit.SECONDS) + .samples(config.numQuotaSamples) + .quotaEnforcementDelay(config.quotaEnforcementDelaySeconds, TimeUnit.SECONDS) + .quota(quota) + } + + private def initQuotaMap(input : String) : mutable.Map[String, Quota] = { + val output = mutable.Map[String, Quota]() + for(entry <- input.split(",")) { + val trimmedEntry = entry.trim + if(!trimmedEntry.equals("")) { + val pair: Array[String] = trimmedEntry.split("=") + if (pair.length != 2) + throw new IllegalArgumentException("Incorrectly formatted override entry (%s). Format is k1=v1,k2=v2".format( + entry)) + output(pair(0)) = new Quota(pair(1).toDouble, true) + } + } + output + } +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 69b772c..f587cac 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -123,6 +123,16 @@ object Defaults { val OffsetCommitTimeoutMs = OffsetManagerConfig.DefaultOffsetCommitTimeoutMs val OffsetCommitRequiredAcks = OffsetManagerConfig.DefaultOffsetCommitRequiredAcks + /** ********* Quota Configuration ***********/ + val ProducerQuotaDefaultBytesPerSecond = ClientQuotaMetricsConfig.DefaultProducerQuotaBytesPerSecond + val ConsumerQuotaDefaultBytesPerSecond = ClientQuotaMetricsConfig.DefaultConsumerQuotaBytesPerSecond + val ProducerQuotaOverrides = ClientQuotaMetricsConfig.DefaultProducerQuotaOverrides + val ConsumerQuotaOverrides = ClientQuotaMetricsConfig.DefaultConsumerQuotaOverrides + val QuotaDelayFactor : Double = ClientQuotaMetricsConfig.DefaultQuotaDelayFactor + val QuotaEnforcementDelaySeconds : Int = ClientQuotaMetricsConfig.DefaultQuotaEnforcementDelaySeconds + val NumQuotaSamples : Int = ClientQuotaMetricsConfig.DefaultNumQuotaSamples + val QuotaWindowSizeSeconds : Int = ClientQuotaMetricsConfig.DefaultQuotaWindowSizeSeconds + val DeleteTopicEnable = false val CompressionType = "producer" @@ -230,11 +240,19 @@ object KafkaConfig { val OffsetsRetentionCheckIntervalMsProp = "offsets.retention.check.interval.ms" val OffsetCommitTimeoutMsProp = "offsets.commit.timeout.ms" val OffsetCommitRequiredAcksProp = "offsets.commit.required.acks" + /** ********* Quota Configuration ***********/ + val ProducerQuotaDefaultBytesPerSecondProp = "quota.producer.default" + val ConsumerQuotaDefaultBytesPerSecondProp = "quota.consumer.default" + val ProducerQuotaOverridesProp = "quota.producer.overrides" + val ConsumerQuotaOverridesProp = "quota.consumer.overrides" + val QuotaDelayFactorProp = "quota.delay.factor" + val QuotaEnforcementDelaySecondsProp = "quota.delay.enforcement.seconds" + val NumQuotaSamplesProp = "quota.window.num" + val QuotaWindowSizeSecondsProp = "quota.window.size.seconds" val DeleteTopicEnableProp = "delete.topic.enable" val CompressionTypeProp = "compression.type" - /* Documentation */ /** ********* Zookeeper Configuration ***********/ val ZkConnectDoc = "Zookeeper host string" @@ -360,6 +378,18 @@ object KafkaConfig { val OffsetCommitTimeoutMsDoc = "Offset commit will be delayed until all replicas for the offsets topic receive the commit " + "or this timeout is reached. This is similar to the producer request timeout." val OffsetCommitRequiredAcksDoc = "The required acks before the commit can be accepted. In general, the default (-1) should not be overridden" + /** ********* Quota Configuration ***********/ + val ProducerQuotaDefaultBytesPerSecondDoc = "Any producer distinguished by clientId will get throttled if it produces more bytes than this value per-second" + val ConsumerQuotaDefaultBytesPerSecondDoc = "Any consumer distinguished by clientId/consumer group will get throttled if it fetches more bytes than this value per-second" + val ProducerQuotaOverridesDoc = "Comma separated list of clientId:quotaBytesPerSecond to override the default producer quota. " + + "Example: clientIdX=10485760,clientIdY=10485760" + val ConsumerQuotaOverridesDoc = "Comma separated list of clientId:quotaBytesPerSecond to override the default consumer quota. " + + "Example: clientIdX=10485760,clientIdY=10485760" + val QuotaDelayFactorDoc = "The quota delay factor modifies any delay by a fixed multiplier (default 1.2)" + val QuotaEnforcementDelaySecondsDoc = "The Sensor does not report quota violations for this amount of time after" + + " the sensor is created. This is meant to prevent throttling all clients upon server startup" + val NumQuotaSamplesDoc = "The number of samples to retain in memory" + val QuotaWindowSizeSecondsDoc = "The time span of each sample" val DeleteTopicEnableDoc = "Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off" val CompressionTypeDoc = "Specify the final compression type for a given topic. This configuration accepts the standard compression codecs " + "('gzip', 'snappy', lz4). It additionally accepts 'uncompressed' which is equivalent to no compression; and " + @@ -481,6 +511,16 @@ object KafkaConfig { .define(OffsetCommitRequiredAcksProp, SHORT, Defaults.OffsetCommitRequiredAcks, HIGH, OffsetCommitRequiredAcksDoc) .define(DeleteTopicEnableProp, BOOLEAN, Defaults.DeleteTopicEnable, HIGH, DeleteTopicEnableDoc) .define(CompressionTypeProp, STRING, Defaults.CompressionType, HIGH, CompressionTypeDoc) + + /** ********* Quota configuration ***********/ + .define(ProducerQuotaDefaultBytesPerSecondProp, LONG, Defaults.ProducerQuotaDefaultBytesPerSecond, HIGH, ProducerQuotaDefaultBytesPerSecondDoc) + .define(ConsumerQuotaDefaultBytesPerSecondProp, LONG, Defaults.ConsumerQuotaDefaultBytesPerSecond, HIGH, ConsumerQuotaDefaultBytesPerSecondDoc) + .define(ProducerQuotaOverridesProp, STRING, Defaults.ProducerQuotaOverrides, HIGH, ProducerQuotaOverridesDoc) + .define(ConsumerQuotaOverridesProp, STRING, Defaults.ConsumerQuotaOverrides, HIGH, ConsumerQuotaOverridesDoc) + .define(QuotaDelayFactorProp, DOUBLE, Defaults.QuotaDelayFactor, LOW, QuotaDelayFactorDoc) + .define(QuotaEnforcementDelaySecondsProp, INT, Defaults.QuotaEnforcementDelaySeconds, LOW, QuotaEnforcementDelaySecondsProp) + .define(NumQuotaSamplesProp, INT, Defaults.NumQuotaSamples, LOW, NumQuotaSamplesDoc) + .define(QuotaWindowSizeSecondsProp, INT, Defaults.QuotaWindowSizeSeconds, LOW, QuotaWindowSizeSecondsDoc) } def configNames() = { @@ -600,6 +640,11 @@ object KafkaConfig { offsetsRetentionCheckIntervalMs = parsed.get(OffsetsRetentionCheckIntervalMsProp).asInstanceOf[Long], offsetCommitTimeoutMs = parsed.get(OffsetCommitTimeoutMsProp).asInstanceOf[Int], offsetCommitRequiredAcks = parsed.get(OffsetCommitRequiredAcksProp).asInstanceOf[Short], + /** ********* Quota configuration ***********/ + producerQuotaDefaultBytesPerSecond = parsed.get(ProducerQuotaDefaultBytesPerSecondProp).asInstanceOf[Long], + consumerQuotaDefaultBytesPerSecond = parsed.get(ConsumerQuotaDefaultBytesPerSecondProp).asInstanceOf[Long], + producerQuotaOverrides = parsed.get(ProducerQuotaOverridesProp).asInstanceOf[String], + consumerQuotaOverrides = parsed.get(ConsumerQuotaOverridesProp).asInstanceOf[String], deleteTopicEnable = parsed.get(DeleteTopicEnableProp).asInstanceOf[Boolean], compressionType = parsed.get(CompressionTypeProp).asInstanceOf[String] ) @@ -745,6 +790,11 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ val offsetCommitTimeoutMs: Int = Defaults.OffsetCommitTimeoutMs, val offsetCommitRequiredAcks: Short = Defaults.OffsetCommitRequiredAcks, + /** ********* Quota configuration ***********/ + val producerQuotaDefaultBytesPerSecond: Long = Defaults.ProducerQuotaDefaultBytesPerSecond, + val consumerQuotaDefaultBytesPerSecond: Long = Defaults.ConsumerQuotaDefaultBytesPerSecond, + val producerQuotaOverrides : String = Defaults.ProducerQuotaOverrides, + val consumerQuotaOverrides : String = Defaults.ConsumerQuotaOverrides, val deleteTopicEnable: Boolean = Defaults.DeleteTopicEnable, val compressionType: String = Defaults.CompressionType ) { @@ -961,6 +1011,10 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ props.put(OffsetsRetentionCheckIntervalMsProp, offsetsRetentionCheckIntervalMs.toString) props.put(OffsetCommitTimeoutMsProp, offsetCommitTimeoutMs.toString) props.put(OffsetCommitRequiredAcksProp, offsetCommitRequiredAcks.toString) + props.put(ProducerQuotaDefaultBytesPerSecondProp, producerQuotaDefaultBytesPerSecond.toString) + props.put(ConsumerQuotaDefaultBytesPerSecondProp, consumerQuotaDefaultBytesPerSecond.toString) + props.put(ProducerQuotaOverridesProp, producerQuotaOverrides.toString) + props.put(ConsumerQuotaOverridesProp, consumerQuotaOverrides.toString) props.put(DeleteTopicEnableProp, deleteTopicEnable.toString) props.put(CompressionTypeProp, compressionType.toString) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index c63f4ba..ab8b6a3 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -27,6 +27,9 @@ import atomic.{AtomicInteger, AtomicBoolean} import java.io.File import collection.mutable +import org.apache.kafka.common.metrics.{Metrics, JmxReporter, MetricsReporter, MetricConfig} + +import scala.collection.{JavaConversions, mutable} import org.I0Itec.zkclient.ZkClient import kafka.controller.{ControllerStats, KafkaController} import kafka.cluster.{EndPoint, Broker} @@ -72,7 +75,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg val metadataCache: MetadataCache = new MetadataCache(config.brokerId) - + val metrics: Metrics = initMetrics() var zkClient: ZkClient = null val correlationId: AtomicInteger = new AtomicInteger(0) val brokerMetaPropsFile = "meta.properties" @@ -184,6 +187,16 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg } } + private def initMetrics() : Metrics = { + val jmxPrefix: String = "kafka.server" + val reporters: Seq[MetricsReporter] = mutable.Seq(new JmxReporter(jmxPrefix)) + info("Initiated metrics") + + new org.apache.kafka.common.metrics.Metrics(new MetricConfig(), + JavaConversions.seqAsJavaList(reporters), + new org.apache.kafka.common.utils.SystemTime) + } + private def initZk(): ZkClient = { info("Connecting to zookeeper on " + config.zkConnect) diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaMetricsTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaMetricsTest.scala new file mode 100644 index 0000000..eea7a1e --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/ClientQuotaMetricsTest.scala @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import java.util.Collections + +import kafka.utils.TestUtils +import org.apache.kafka.common.metrics.{Quota, JmxReporter, Metrics, MetricConfig} +import org.apache.kafka.common.utils.MockTime +import org.scalatest.junit.JUnit3Suite +import org.junit.{Test, Assert} + +class ClientQuotaMetricsTest extends JUnit3Suite { + private val metrics = new org.apache.kafka.common.metrics.Metrics(new MetricConfig(), + Collections.emptyList(), + new MockTime) + private val config = ClientQuotaMetricsConfig(defaultProducerQuotaBytesPerSecond = 500, + defaultConsumerQuotaBytesPerSecond = 1000, + producerQuotaOverrides = "p1=2000,p2=4000", + consumerQuotaOverrides = "c1=2000,c2=4000") + + @Test + def testQuotaParsing() { + val props = TestUtils.createBrokerConfig(1) + props.put(KafkaConfig.ProducerQuotaDefaultBytesPerSecondProp, "500") + props.put(KafkaConfig.ProducerQuotaOverridesProp, "p1=2000,p2=4000") + props.put(KafkaConfig.ConsumerQuotaDefaultBytesPerSecondProp, "1000") + props.put(KafkaConfig.ConsumerQuotaOverridesProp, "c1=2000,c2=4000") + val clientMetrics = new ClientQuotaMetrics(config, metrics) + Assert.assertEquals("Default producer quota should be 500", + new Quota(500, true), clientMetrics.producerQuota("random-client-id")) + Assert.assertEquals("Default consumer quota should be 1000", + new Quota(1000, true), clientMetrics.consumerQuota("random-client-id")) + Assert.assertEquals("Should return the overridden value (2000)", + new Quota(2000, true), clientMetrics.producerQuota("p1")) + Assert.assertEquals("Should return the overridden value (4000)", + new Quota(4000, true), clientMetrics.producerQuota("p2")) + Assert.assertEquals("Should return the overridden value (2000)", + new Quota(2000, true), clientMetrics.consumerQuota("c1")) + Assert.assertEquals("Should return the overridden value (4000)", + new Quota(4000, true), clientMetrics.consumerQuota("c2")) + } + + @Test + def testProducerQuotaViolation() { + val time = new MockTime + val metrics = new org.apache.kafka.common.metrics.Metrics(new MetricConfig(), + Collections.emptyList(), + time) + val clientMetrics = new ClientQuotaMetrics(config, metrics) + /* We have 10 second windows. Make sure that there is no quota violation + * if we produce under the quota + */ + for(i <- 0 until 10) { + Assert.assertEquals(0, clientMetrics.recordBytesProduced("unknown", 400)) + time.sleep(1000) + } + + // Create a spike. + val sleepTime = clientMetrics.recordBytesProduced("unknown", 2000); + // 400*10 + 2000 = 6000/10 = 600 bytes per second. + // (600 - quota)/quota*window-size = (600-500)/500*11 seconds = 2200*1.2 = 2640 + Assert.assertEquals("Should be throttled", 2640, sleepTime) + time.sleep(sleepTime) + // At the end of sleep, the + Assert.assertEquals("Should be unthrottled again", 0, clientMetrics.recordBytesProduced("unknown", 0)) + } + + @Test + def testConsumerQuotaViolation() { + val time = new MockTime + val metrics = new org.apache.kafka.common.metrics.Metrics(new MetricConfig(), + Collections.emptyList(), + time) + + val clientMetrics = new ClientQuotaMetrics(config, metrics) + for(i <- 0 until 10) { + Assert.assertEquals(0, clientMetrics.recordBytesConsumed("c1", 1600)) + time.sleep(1000) + } + + // Create a spike. + val sleepTime = clientMetrics.recordBytesConsumed("c1", 8000); + // 1600*10 + 8000 = 24000/10 = 2400 bytes per second. + // (2400 - quota)/quota*window-size = (2400-2000)/2000*11 seconds = 2200*1.2 = 2640 + Assert.assertEquals("Should be throttled", 2640, sleepTime) + time.sleep(sleepTime) + // At the end of sleep, the + Assert.assertEquals("Should be unthrottled again", 0, clientMetrics.recordBytesConsumed("c1", 0)) + } + +} -- 1.7.12.4 From 73bc182a26e47f57763a64e66ba6e1d37e702978 Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Thu, 9 Apr 2015 18:10:21 -0700 Subject: [PATCH 2/5] Added more testcases --- .../scala/kafka/server/ClientQuotaMetrics.scala | 11 ++- .../unit/kafka/server/ClientQuotaMetricsTest.scala | 88 ++++++++++++++++++---- 2 files changed, 80 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/kafka/server/ClientQuotaMetrics.scala b/core/src/main/scala/kafka/server/ClientQuotaMetrics.scala index 388549c..5ec7ba7 100644 --- a/core/src/main/scala/kafka/server/ClientQuotaMetrics.scala +++ b/core/src/main/scala/kafka/server/ClientQuotaMetrics.scala @@ -18,7 +18,6 @@ package kafka.server import java.util.concurrent.TimeUnit -import kafka.message.{NoCompressionCodec, CompressionCodec} import kafka.utils.Logging import org.apache.kafka.common.MetricName import org.apache.kafka.common.metrics._ @@ -58,11 +57,11 @@ object ClientQuotaMetricsConfig { val DefaultConsumerQuotaBytesPerSecond = Long.MaxValue val DefaultProducerQuotaOverrides = "" val DefaultConsumerQuotaOverrides = "" - val DefaultQuotaDelayFactor = 1.2; - val DefaultQuotaEnforcementDelaySeconds = 5; + val DefaultQuotaDelayFactor = 1.2 + val DefaultQuotaEnforcementDelaySeconds = 5 // Always have 10 whole windows + 1 current window - val DefaultNumQuotaSamples = 11; - val DefaultQuotaWindowSizeSeconds = 1; + val DefaultNumQuotaSamples = 11 + val DefaultQuotaWindowSizeSeconds = 1 } object ClientQuotaMetrics { @@ -136,7 +135,7 @@ class ClientQuotaMetrics(private val config : ClientQuotaMetricsConfig, sensors._1.record(value) } catch { case qve : QuotaViolationException => - delayTime = qve.getDelayTimeMs()*config.quotaDelayFactor + delayTime = qve.getDelayTimeMs*config.quotaDelayFactor sensors._2.record(delayTime) logger.warn("Quota violated for sensor (%s). Delay time: (%f)".format(sensors._1, delayTime), qve) } diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaMetricsTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaMetricsTest.scala index eea7a1e..bcd86c1 100644 --- a/core/src/test/scala/unit/kafka/server/ClientQuotaMetricsTest.scala +++ b/core/src/test/scala/unit/kafka/server/ClientQuotaMetricsTest.scala @@ -18,8 +18,7 @@ package kafka.server import java.util.Collections -import kafka.utils.TestUtils -import org.apache.kafka.common.metrics.{Quota, JmxReporter, Metrics, MetricConfig} +import org.apache.kafka.common.metrics.{Quota, MetricConfig} import org.apache.kafka.common.utils.MockTime import org.scalatest.junit.JUnit3Suite import org.junit.{Test, Assert} @@ -28,18 +27,13 @@ class ClientQuotaMetricsTest extends JUnit3Suite { private val metrics = new org.apache.kafka.common.metrics.Metrics(new MetricConfig(), Collections.emptyList(), new MockTime) - private val config = ClientQuotaMetricsConfig(defaultProducerQuotaBytesPerSecond = 500, + private val config = ClientQuotaMetricsConfig(defaultProducerQuotaBytesPerSecond = 500, defaultConsumerQuotaBytesPerSecond = 1000, - producerQuotaOverrides = "p1=2000,p2=4000", + producerQuotaOverrides = "p1=2000,p2=4000", consumerQuotaOverrides = "c1=2000,c2=4000") @Test def testQuotaParsing() { - val props = TestUtils.createBrokerConfig(1) - props.put(KafkaConfig.ProducerQuotaDefaultBytesPerSecondProp, "500") - props.put(KafkaConfig.ProducerQuotaOverridesProp, "p1=2000,p2=4000") - props.put(KafkaConfig.ConsumerQuotaDefaultBytesPerSecondProp, "1000") - props.put(KafkaConfig.ConsumerQuotaOverridesProp, "c1=2000,c2=4000") val clientMetrics = new ClientQuotaMetrics(config, metrics) Assert.assertEquals("Default producer quota should be 500", new Quota(500, true), clientMetrics.producerQuota("random-client-id")) @@ -71,9 +65,9 @@ class ClientQuotaMetricsTest extends JUnit3Suite { } // Create a spike. - val sleepTime = clientMetrics.recordBytesProduced("unknown", 2000); + val sleepTime = clientMetrics.recordBytesProduced("unknown", 2000) // 400*10 + 2000 = 6000/10 = 600 bytes per second. - // (600 - quota)/quota*window-size = (600-500)/500*11 seconds = 2200*1.2 = 2640 + // (600 - quota)/quota*window-size = (600-500)/500*11 seconds = 2200*1.2 delayfactor = 2640 Assert.assertEquals("Should be throttled", 2640, sleepTime) time.sleep(sleepTime) // At the end of sleep, the @@ -82,6 +76,7 @@ class ClientQuotaMetricsTest extends JUnit3Suite { @Test def testConsumerQuotaViolation() { + val time = new MockTime val metrics = new org.apache.kafka.common.metrics.Metrics(new MetricConfig(), Collections.emptyList(), @@ -94,13 +89,80 @@ class ClientQuotaMetricsTest extends JUnit3Suite { } // Create a spike. - val sleepTime = clientMetrics.recordBytesConsumed("c1", 8000); + val sleepTime = clientMetrics.recordBytesConsumed("c1", 8000) // 1600*10 + 8000 = 24000/10 = 2400 bytes per second. - // (2400 - quota)/quota*window-size = (2400-2000)/2000*11 seconds = 2200*1.2 = 2640 + // (2400 - quota)/quota*window-size = (2400-2000)/2000*11 seconds = 2200*1.2 delayfactor = 2640 Assert.assertEquals("Should be throttled", 2640, sleepTime) time.sleep(sleepTime) // At the end of sleep, the Assert.assertEquals("Should be unthrottled again", 0, clientMetrics.recordBytesConsumed("c1", 0)) } + @Test + def testOverrideParse() { + val time = new MockTime + val metrics = new org.apache.kafka.common.metrics.Metrics(new MetricConfig(), + Collections.emptyList(), + time) + // Case 1 - Default config + var testConfig = ClientQuotaMetricsConfig() + var clientMetrics = new ClientQuotaMetrics(testConfig, metrics) + Assert.assertEquals(new Quota(ClientQuotaMetricsConfig.DefaultProducerQuotaBytesPerSecond, true), + clientMetrics.producerQuota("p1")) + Assert.assertEquals(new Quota(ClientQuotaMetricsConfig.DefaultConsumerQuotaBytesPerSecond, true), + clientMetrics.consumerQuota("p1")) + + // Case 2 - Empty override + testConfig = ClientQuotaMetricsConfig(defaultProducerQuotaBytesPerSecond = 500, + defaultConsumerQuotaBytesPerSecond = 1000, + producerQuotaOverrides = "p1=2000,p2=4000,,", + consumerQuotaOverrides = "c1=2000,c2=4000") + + clientMetrics = new ClientQuotaMetrics(testConfig, metrics) + Assert.assertEquals(new Quota(2000, true), clientMetrics.producerQuota("p1")) + Assert.assertEquals(new Quota(4000, true), clientMetrics.producerQuota("p2")) + + // Case 3 - NumberFormatException for producer override + testConfig = ClientQuotaMetricsConfig(defaultProducerQuotaBytesPerSecond = 500, + defaultConsumerQuotaBytesPerSecond = 1000, + producerQuotaOverrides = "p1=2000,p2=4000,p3=p4", + consumerQuotaOverrides = "c1=2000,c2=4000") + try { + clientMetrics = new ClientQuotaMetrics(testConfig, metrics) + Assert.fail("Should fail to parse invalid config " + testConfig.producerQuotaOverrides) + } + catch { + // Swallow. + case nfe : NumberFormatException => + } + + // Case 4 - NumberFormatException for consumer override + testConfig = ClientQuotaMetricsConfig(defaultProducerQuotaBytesPerSecond = 500, + defaultConsumerQuotaBytesPerSecond = 1000, + producerQuotaOverrides = "p1=2000,p2=4000", + consumerQuotaOverrides = "c1=2000,c2=4000,c3=c4") + try { + clientMetrics = new ClientQuotaMetrics(testConfig, metrics) + Assert.fail("Should fail to parse invalid config " + testConfig.consumerQuotaOverrides) + } + catch { + // Swallow. + case nfe : NumberFormatException => + } + + // Case 5 - IllegalArgumentException for producer override + testConfig = ClientQuotaMetricsConfig(defaultProducerQuotaBytesPerSecond = 500, + defaultConsumerQuotaBytesPerSecond = 1000, + producerQuotaOverrides = "p1=2000=3000", + consumerQuotaOverrides = "c1=2000,c2=4000") + try { + clientMetrics = new ClientQuotaMetrics(testConfig, metrics) + Assert.fail("Should fail to parse invalid config " + testConfig.producerQuotaOverrides) + } + catch { + // Swallow. + case nfe : IllegalArgumentException => + } + + } } -- 1.7.12.4 From 28322372686a665bb3898e453ad6b33eb31d508f Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Fri, 10 Apr 2015 17:24:06 -0700 Subject: [PATCH 3/5] Some locking changes for reading/creating the sensors --- .../scala/kafka/server/ClientQuotaMetrics.scala | 96 +++++++++++++++------- 1 file changed, 68 insertions(+), 28 deletions(-) diff --git a/core/src/main/scala/kafka/server/ClientQuotaMetrics.scala b/core/src/main/scala/kafka/server/ClientQuotaMetrics.scala index 5ec7ba7..c4ea832 100644 --- a/core/src/main/scala/kafka/server/ClientQuotaMetrics.scala +++ b/core/src/main/scala/kafka/server/ClientQuotaMetrics.scala @@ -22,6 +22,9 @@ import kafka.utils.Logging import org.apache.kafka.common.MetricName import org.apache.kafka.common.metrics._ import org.apache.kafka.common.metrics.stats.{Avg, Max, Rate} +import java.util.concurrent.locks.ReentrantReadWriteLock +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock import scala.collection.mutable @@ -36,21 +39,21 @@ import scala.collection.mutable * */ case class ClientQuotaMetricsConfig(defaultProducerQuotaBytesPerSecond : Long = - ClientQuotaMetricsConfig.DefaultProducerQuotaBytesPerSecond, + ClientQuotaMetricsConfig.DefaultProducerQuotaBytesPerSecond, defaultConsumerQuotaBytesPerSecond : Long = - ClientQuotaMetricsConfig.DefaultConsumerQuotaBytesPerSecond, + ClientQuotaMetricsConfig.DefaultConsumerQuotaBytesPerSecond, producerQuotaOverrides : String = - ClientQuotaMetricsConfig.DefaultProducerQuotaOverrides, + ClientQuotaMetricsConfig.DefaultProducerQuotaOverrides, consumerQuotaOverrides : String = - ClientQuotaMetricsConfig.DefaultConsumerQuotaOverrides, + ClientQuotaMetricsConfig.DefaultConsumerQuotaOverrides, quotaDelayFactor: Double = - ClientQuotaMetricsConfig.DefaultQuotaDelayFactor, + ClientQuotaMetricsConfig.DefaultQuotaDelayFactor, quotaEnforcementDelaySeconds : Int = - ClientQuotaMetricsConfig.DefaultQuotaEnforcementDelaySeconds, + ClientQuotaMetricsConfig.DefaultQuotaEnforcementDelaySeconds, numQuotaSamples : Int = - ClientQuotaMetricsConfig.DefaultNumQuotaSamples, + ClientQuotaMetricsConfig.DefaultNumQuotaSamples, quotaWindowSizeSeconds : Int = - ClientQuotaMetricsConfig.DefaultQuotaWindowSizeSeconds) + ClientQuotaMetricsConfig.DefaultQuotaWindowSizeSeconds) object ClientQuotaMetricsConfig { val DefaultProducerQuotaBytesPerSecond = Long.MaxValue @@ -81,6 +84,7 @@ class ClientQuotaMetrics(private val config : ClientQuotaMetricsConfig, private val defaultBytesProducedQuota = Quota.lessThan(config.defaultProducerQuotaBytesPerSecond) private val consumerOverriddenQuota = initQuotaMap(config.consumerQuotaOverrides) private val defaultBytesConsumedQuota = Quota.lessThan(config.defaultConsumerQuotaBytesPerSecond) + private val lock = new ReentrantReadWriteLock() /** * Records that a fetch request consumed some data @@ -100,7 +104,7 @@ class ClientQuotaMetrics(private val config : ClientQuotaMetricsConfig, * Zero otherwise */ def recordBytesProduced(clientId: String, bytes: Int) : Int = { - recordSensor(ClientQuotaMetrics.ProducerSensorPrefix, clientId, producerQuota(clientId), bytes) + recordSensor(ClientQuotaMetrics.ProducerSensorPrefix, clientId, producerQuota(clientId), bytes) } /** @@ -142,40 +146,76 @@ class ClientQuotaMetrics(private val config : ClientQuotaMetricsConfig, delayTime.toInt } + /* + * This function either returns the sensors for a given client id or creates them if they don't exist + * First sensor of the tuple is the quota enforcement sensor. Second one is the throttle time sensor + */ private def getOrCreateQuotaSensors(sensorPrefix : String, clientId : String, quota : Quota) : (Sensor, Sensor) = { + + // Names of the sensors to access val quotaSensorName = sensorPrefix + "-" + clientId val throttleTimeSensorName = sensorPrefix + "ThrottleTime-" + clientId - var sensorVal = metrics.getSensor(quotaSensorName) - if (sensorVal == null) { - this.synchronized { - sensorVal = metrics.getSensor(quotaSensorName) - if (sensorVal == null) { - sensorVal = metrics.sensor(quotaSensorName, getQuotaMetricConfig(quota)) - sensorVal.add(new MetricName("byte-rate", - sensorPrefix, - "Tracking byte-rate per client", - "client-id", clientId), - new Rate()) + var quotaSensor : Sensor = null + var throttleTimeSensor : Sensor = null + + /* Acquire the read lock to fetch the sensors. It is safe to call getSensor from multiple threads. + * The read lock allows a thread to create a sensor in isolation. The thread creating the sensor + * will acquire the write lock and prevent the sensors from being read while they are being created. + * It should be sufficient to simply check if the sensor is null without acquiring a read lock but the + * sensor being present doesn't mean that it is fully initialized i.e. all the Metrics may not have been added. + * This read lock waits until the writer thread has released it's lock i.e. fully initialized the sensor + * at which point it is safe to read + */ + lock.readLock().lock() + try { + quotaSensor = metrics.getSensor(quotaSensorName) + throttleTimeSensor = metrics.getSensor(throttleTimeSensorName) + } + finally { + lock.readLock().unlock() + } + + /* If the sensor is null, try to create it else return the created sensor + * Also if quota sensor is null, the throttle time sensor must be null + */ + if(quotaSensor == null) { + /* Acquire a write lock because the sensor may not have been created and we only want one thread to create it. + * Note that multiple threads may acquire the write lock if they all see a null sensor initially + * In this case, the writer checks the sensor after acquiring the lock again. + * This is safe from Double Checked Locking because the references are read + * after acquiring read locks and hence they cannot see a partially published reference + */ + lock.writeLock().lock() + try { + quotaSensor = metrics.getSensor(quotaSensorName) + if(quotaSensor == null) { // create the throttle time sensor also - val throttleTimeSensor = metrics.sensor(throttleTimeSensorName) + throttleTimeSensor = metrics.sensor(throttleTimeSensorName) throttleTimeSensor.add(new MetricName("throttle-time-max", sensorPrefix, "Tracking throttle-time per client", - "client-id", clientId), - new Max(), - new MetricConfig()) + "client-id", + clientId), new Max(), new MetricConfig()) throttleTimeSensor.add(new MetricName("throttle-time-avg", sensorPrefix, "Tracking throttle-time per client", - "client-id", clientId), - new Avg(), - new MetricConfig()) + "client-id", + clientId), new Avg(), new MetricConfig()) + quotaSensor = metrics.sensor(quotaSensorName, getQuotaMetricConfig(quota)) + quotaSensor.add(new MetricName("byte-rate", + sensorPrefix, + "Tracking byte-rate per client", + "client-id", + clientId), new Rate()) } + } finally { + lock.writeLock().unlock() } } - (sensorVal, metrics.sensor(throttleTimeSensorName)) + // return the read or created sensors + (quotaSensor, throttleTimeSensor) } private def getQuotaMetricConfig(quota : Quota) : MetricConfig = { -- 1.7.12.4 From bd2fb0d6e495b1774e5f2e79483d09922f379393 Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Tue, 21 Apr 2015 12:20:32 -0700 Subject: [PATCH 4/5] WIP patch --- .../apache/kafka/common/metrics/MetricConfig.java | 15 +++++----- .../common/metrics/QuotaViolationException.java | 4 ++- .../org/apache/kafka/common/metrics/Sensor.java | 2 +- .../scala/kafka/server/ClientQuotaMetrics.scala | 35 ++++++++++++---------- .../main/scala/kafka/server/ReplicaManager.scala | 1 - 5 files changed, 31 insertions(+), 26 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java b/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java index e45692b..db349e5 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java @@ -28,7 +28,7 @@ public class MetricConfig { private long eventWindow; private long timeWindowMs; private TimeUnit unit; - private long quotaEnforcementDelayMs; + private long quotaEnforcementBlackoutMs; public MetricConfig() { super(); @@ -39,7 +39,7 @@ public class MetricConfig { this.unit = TimeUnit.SECONDS; // By default quota checking is disabled for the first window. // This builds up of sufficient data initially before making quota enforcement decisions - this.quotaEnforcementDelayMs = timeWindowMs; + this.quotaEnforcementBlackoutMs = timeWindowMs; } public Quota quota() { @@ -73,17 +73,16 @@ public class MetricConfig { * Disables quota enforcement on this metric for this certain period of time * after the metric is first created */ - public MetricConfig quotaEnforcementDelay(long time, TimeUnit unit) { - this.quotaEnforcementDelayMs = TimeUnit.MILLISECONDS.convert(time, unit); + public MetricConfig quotaEnforcementBlackout(long time, TimeUnit unit) { + this.quotaEnforcementBlackoutMs = TimeUnit.MILLISECONDS.convert(time, unit); return this; } - public long quotaEnforcementDelayMs() { - return quotaEnforcementDelayMs; + public long quotaEnforcementBlackoutMs() { + return quotaEnforcementBlackoutMs; } - - public int samples() { + public int samples() { return this.samples; } diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java b/clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java index 2e33dcf..9c929ea 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java @@ -24,15 +24,17 @@ import org.apache.kafka.common.KafkaException; public class QuotaViolationException extends KafkaException { private static final long serialVersionUID = 1L; - private final int delayTimeMs; + private int delayTimeMs; public QuotaViolationException(String m) { this(m, 0); } public QuotaViolationException(String m, int delayTimeMs) { + super(m); this.delayTimeMs = delayTimeMs; } + public void setDelayTimeMs(int delayTimeMs) { this.delayTimeMs = delayTimeMs; } public int getDelayTimeMs() {return delayTimeMs;} } diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java index 3be3dce..565f101 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java @@ -115,7 +115,7 @@ public final class Sensor { Quota quota = config.quota(); if (quota != null) { double value = metric.value(timeMs); - if(timeMs - this.creationTime < config.quotaEnforcementDelayMs()) + if(timeMs - this.creationTime < config.quotaEnforcementBlackoutMs()) continue; if (!quota.acceptable(value)) { diff --git a/core/src/main/scala/kafka/server/ClientQuotaMetrics.scala b/core/src/main/scala/kafka/server/ClientQuotaMetrics.scala index c4ea832..aebe93f 100644 --- a/core/src/main/scala/kafka/server/ClientQuotaMetrics.scala +++ b/core/src/main/scala/kafka/server/ClientQuotaMetrics.scala @@ -39,21 +39,21 @@ import scala.collection.mutable * */ case class ClientQuotaMetricsConfig(defaultProducerQuotaBytesPerSecond : Long = - ClientQuotaMetricsConfig.DefaultProducerQuotaBytesPerSecond, + ClientQuotaMetricsConfig.DefaultProducerQuotaBytesPerSecond, defaultConsumerQuotaBytesPerSecond : Long = - ClientQuotaMetricsConfig.DefaultConsumerQuotaBytesPerSecond, + ClientQuotaMetricsConfig.DefaultConsumerQuotaBytesPerSecond, producerQuotaOverrides : String = - ClientQuotaMetricsConfig.DefaultProducerQuotaOverrides, + ClientQuotaMetricsConfig.DefaultProducerQuotaOverrides, consumerQuotaOverrides : String = - ClientQuotaMetricsConfig.DefaultConsumerQuotaOverrides, + ClientQuotaMetricsConfig.DefaultConsumerQuotaOverrides, quotaDelayFactor: Double = - ClientQuotaMetricsConfig.DefaultQuotaDelayFactor, + ClientQuotaMetricsConfig.DefaultQuotaDelayFactor, quotaEnforcementDelaySeconds : Int = - ClientQuotaMetricsConfig.DefaultQuotaEnforcementDelaySeconds, + ClientQuotaMetricsConfig.DefaultQuotaEnforcementDelaySeconds, numQuotaSamples : Int = - ClientQuotaMetricsConfig.DefaultNumQuotaSamples, + ClientQuotaMetricsConfig.DefaultNumQuotaSamples, quotaWindowSizeSeconds : Int = - ClientQuotaMetricsConfig.DefaultQuotaWindowSizeSeconds) + ClientQuotaMetricsConfig.DefaultQuotaWindowSizeSeconds) object ClientQuotaMetricsConfig { val DefaultProducerQuotaBytesPerSecond = Long.MaxValue @@ -86,6 +86,10 @@ class ClientQuotaMetrics(private val config : ClientQuotaMetricsConfig, private val defaultBytesConsumedQuota = Quota.lessThan(config.defaultConsumerQuotaBytesPerSecond) private val lock = new ReentrantReadWriteLock() + def record(key : String, clientId: String, value : Int) = { + recordSensor(ClientQuotaMetrics.ConsumerSensorPrefix, clientId, consumerQuota(clientId), value) + } + /** * Records that a fetch request consumed some data * @param clientId clientId that fetched the data @@ -93,7 +97,7 @@ class ClientQuotaMetrics(private val config : ClientQuotaMetricsConfig, * @return Number of milliseconds to delay the response in case of Quota violation. * Zero otherwise */ - def recordBytesConsumed(clientId: String, bytes: Int) : Int = { + def recordBytesConsumed(clientId: String, bytes: Int) = { recordSensor(ClientQuotaMetrics.ConsumerSensorPrefix, clientId, consumerQuota(clientId), bytes) } /** @@ -103,7 +107,7 @@ class ClientQuotaMetrics(private val config : ClientQuotaMetricsConfig, * @return Number of milliseconds to delay the response in case of Quota violation. * Zero otherwise */ - def recordBytesProduced(clientId: String, bytes: Int) : Int = { + def recordBytesProduced(clientId: String, bytes: Int) = { recordSensor(ClientQuotaMetrics.ProducerSensorPrefix, clientId, producerQuota(clientId), bytes) } @@ -111,7 +115,7 @@ class ClientQuotaMetrics(private val config : ClientQuotaMetricsConfig, * Returns the producer quota for the specified clientId * @return */ - def producerQuota(clientId : String) : Quota = { + private[server] def producerQuota(clientId : String) : Quota = { if(producerOverriddenQuota.contains(clientId)) producerOverriddenQuota(clientId) else @@ -122,7 +126,7 @@ class ClientQuotaMetrics(private val config : ClientQuotaMetricsConfig, * Returns the consumer quota for the specified clientId * @return */ - def consumerQuota(clientId : String) : Quota = { + private[server] def consumerQuota(clientId : String) : Quota = { if(consumerOverriddenQuota.contains(clientId)) consumerOverriddenQuota(clientId) else @@ -142,6 +146,7 @@ class ClientQuotaMetrics(private val config : ClientQuotaMetricsConfig, delayTime = qve.getDelayTimeMs*config.quotaDelayFactor sensors._2.record(delayTime) logger.warn("Quota violated for sensor (%s). Delay time: (%f)".format(sensors._1, delayTime), qve) + throw qve } delayTime.toInt } @@ -197,12 +202,12 @@ class ClientQuotaMetrics(private val config : ClientQuotaMetricsConfig, sensorPrefix, "Tracking throttle-time per client", "client-id", - clientId), new Max(), new MetricConfig()) + clientId), new Max()) throttleTimeSensor.add(new MetricName("throttle-time-avg", sensorPrefix, "Tracking throttle-time per client", "client-id", - clientId), new Avg(), new MetricConfig()) + clientId), new Avg()) quotaSensor = metrics.sensor(quotaSensorName, getQuotaMetricConfig(quota)) quotaSensor.add(new MetricName("byte-rate", sensorPrefix, @@ -222,7 +227,7 @@ class ClientQuotaMetrics(private val config : ClientQuotaMetricsConfig, new MetricConfig() .timeWindow(config.quotaWindowSizeSeconds, TimeUnit.SECONDS) .samples(config.numQuotaSamples) - .quotaEnforcementDelay(config.quotaEnforcementDelaySeconds, TimeUnit.SECONDS) + .quotaEnforcementBlackout(config.quotaEnforcementDelaySeconds, TimeUnit.SECONDS) .quota(quota) } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 8ddd325..473be21 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -279,7 +279,6 @@ class ReplicaManager(val config: KafkaConfig, internalTopicsAllowed: Boolean, messagesPerPartition: Map[TopicAndPartition, MessageSet], responseCallback: Map[TopicAndPartition, ProducerResponseStatus] => Unit) { - if (isValidRequiredAcks(requiredAcks)) { val sTime = SystemTime.milliseconds -- 1.7.12.4 From 4eb5c5147503caa9ac5821330dc57bbbff85ad2d Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Tue, 21 Apr 2015 12:27:52 -0700 Subject: [PATCH 5/5] Sample usage in ReplicaManager --- .../scala/kafka/server/ClientQuotaMetrics2.scala | 200 +++++++++++++++++++++ .../main/scala/kafka/server/ReplicaManager.scala | 21 +++ .../kafka/server/ClientQuotaMetricsTest2.scala | 121 +++++++++++++ 3 files changed, 342 insertions(+) create mode 100644 core/src/main/scala/kafka/server/ClientQuotaMetrics2.scala create mode 100644 core/src/test/scala/unit/kafka/server/ClientQuotaMetricsTest2.scala diff --git a/core/src/main/scala/kafka/server/ClientQuotaMetrics2.scala b/core/src/main/scala/kafka/server/ClientQuotaMetrics2.scala new file mode 100644 index 0000000..bfaa694 --- /dev/null +++ b/core/src/main/scala/kafka/server/ClientQuotaMetrics2.scala @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import java.util.concurrent.TimeUnit + +import kafka.utils.Logging +import org.apache.kafka.common.MetricName +import org.apache.kafka.common.metrics._ +import org.apache.kafka.common.metrics.stats.{Avg, Max, Rate} +import java.util.concurrent.locks.ReentrantReadWriteLock + +import scala.collection.mutable + +/** + * Configuration settings for quota management + * @param quotaDelayFactor The quota delay factor modifies any delay by a fixed multiplier (default 1.2) + * @param quotaEnforcementDelaySeconds The Sensor does not report quota violations for this amount of time after + * the sensor is created. This is meant to prevent throttling all clients + * upon server startup + * @param numQuotaSamples The number of sample to retain in memory + * @param quotaWindowSizeSeconds The time span of each sample + * + */ +case class ClientQuotaMetricsConfig2(defaultQuotaPerSecond : Long = + ClientQuotaMetricsConfig2.DefaultQuotaPerSecond, + quotaOverrides : String = + ClientQuotaMetricsConfig2.QuotaOverrides, + quotaDelayFactor: Double = + ClientQuotaMetricsConfig2.DefaultQuotaDelayFactor, + quotaEnforcementDelaySeconds : Int = + ClientQuotaMetricsConfig2.DefaultQuotaEnforcementDelaySeconds, + numQuotaSamples : Int = + ClientQuotaMetricsConfig2.DefaultNumQuotaSamples, + quotaWindowSizeSeconds : Int = + ClientQuotaMetricsConfig2.DefaultQuotaWindowSizeSeconds) + +object ClientQuotaMetricsConfig2 { + val DefaultQuotaPerSecond = Long.MaxValue + val QuotaOverrides = "" + val DefaultQuotaDelayFactor = 1.2 + val DefaultQuotaEnforcementDelaySeconds = 5 + // Always have 10 whole windows + 1 current window + val DefaultNumQuotaSamples = 11 + val DefaultQuotaWindowSizeSeconds = 1 +} + +/** + * Helper class that records per-client metrics. It is also responsible for maintaining Quota usage statistics + * for all clients. + * @param config @KafkaConfig Configs for the Kafka Server + * @param metrics @Metrics Metrics instance + */ +class ClientQuotaMetrics2(private val config : ClientQuotaMetricsConfig2, + private val metrics : Metrics, + private val apiKey : String) extends Logging { + private val overriddenQuota = initQuotaMap(config.quotaOverrides) + private val defaultQuota = Quota.lessThan(config.defaultQuotaPerSecond) + private val lock = new ReentrantReadWriteLock() + + /** + * Records that a produce request wrote some data + * @param clientId clientId that produced the data + * @param value amount of data written in bytes + * @return Number of milliseconds to delay the response in case of Quota violation. + * Zero otherwise + */ + def record(clientId: String, value : Int) = { + val sensors = getOrCreateQuotaSensors(clientId) + var delayTime = 0.0 + try { + sensors._1.record(value) + } catch { + case qve : QuotaViolationException => + delayTime = qve.getDelayTimeMs*config.quotaDelayFactor + qve.setDelayTimeMs(delayTime.toInt) + sensors._2.record(delayTime) + logger.warn("Quota violated for sensor (%s). Delay time: (%f)".format(sensors._1, delayTime), qve) + throw qve + } + } + + /** + * Returns the consumer quota for the specified clientId + * @return + */ + private[server] def quota(clientId : String) : Quota = { + if(overriddenQuota.contains(clientId)) + overriddenQuota(clientId) + else + defaultQuota + } + + /* + * This function either returns the sensors for a given client id or creates them if they don't exist + * First sensor of the tuple is the quota enforcement sensor. Second one is the throttle time sensor + */ + private def getOrCreateQuotaSensors(clientId : String) : (Sensor, Sensor) = { + + // Names of the sensors to access + val quotaSensorName = apiKey + "-" + clientId + val throttleTimeSensorName = apiKey + "ThrottleTime-" + clientId + var quotaSensor : Sensor = null + var throttleTimeSensor : Sensor = null + + /* Acquire the read lock to fetch the sensors. It is safe to call getSensor from multiple threads. + * The read lock allows a thread to create a sensor in isolation. The thread creating the sensor + * will acquire the write lock and prevent the sensors from being read while they are being created. + * It should be sufficient to simply check if the sensor is null without acquiring a read lock but the + * sensor being present doesn't mean that it is fully initialized i.e. all the Metrics may not have been added. + * This read lock waits until the writer thread has released it's lock i.e. fully initialized the sensor + * at which point it is safe to read + */ + lock.readLock().lock() + try { + quotaSensor = metrics.getSensor(quotaSensorName) + throttleTimeSensor = metrics.getSensor(throttleTimeSensorName) + } + finally { + lock.readLock().unlock() + } + + /* If the sensor is null, try to create it else return the created sensor + * Also if quota sensor is null, the throttle time sensor must be null + */ + if(quotaSensor == null) { + /* Acquire a write lock because the sensor may not have been created and we only want one thread to create it. + * Note that multiple threads may acquire the write lock if they all see a null sensor initially + * In this case, the writer checks the sensor after acquiring the lock again. + * This is safe from Double Checked Locking because the references are read + * after acquiring read locks and hence they cannot see a partially published reference + */ + lock.writeLock().lock() + try { + quotaSensor = metrics.getSensor(quotaSensorName) + if(quotaSensor == null) { + // create the throttle time sensor also + throttleTimeSensor = metrics.sensor(throttleTimeSensorName) + throttleTimeSensor.add(new MetricName("throttle-time-max", + apiKey, + "Tracking throttle-time per client", + "client-id", + clientId), new Max()) + throttleTimeSensor.add(new MetricName("throttle-time-avg", + apiKey, + "Tracking throttle-time per client", + "client-id", + clientId), new Avg()) + quotaSensor = metrics.sensor(quotaSensorName, getQuotaMetricConfig(quota(clientId))) + quotaSensor.add(new MetricName("byte-rate", + apiKey, + "Tracking byte-rate per client", + "client-id", + clientId), new Rate()) + } + } finally { + lock.writeLock().unlock() + } + } + // return the read or created sensors + (quotaSensor, throttleTimeSensor) + } + + private def getQuotaMetricConfig(quota : Quota) : MetricConfig = { + new MetricConfig() + .timeWindow(config.quotaWindowSizeSeconds, TimeUnit.SECONDS) + .samples(config.numQuotaSamples) + .quotaEnforcementBlackout(config.quotaEnforcementDelaySeconds, TimeUnit.SECONDS) + .quota(quota) + } + + private def initQuotaMap(input : String) : mutable.Map[String, Quota] = { + val output = mutable.Map[String, Quota]() + for(entry <- input.split(",")) { + val trimmedEntry = entry.trim + if(!trimmedEntry.equals("")) { + val pair: Array[String] = trimmedEntry.split("=") + if (pair.length != 2) + throw new IllegalArgumentException("Incorrectly formatted override entry (%s). Format is k1=v1,k2=v2".format( + entry)) + output(pair(0)) = new Quota(pair(1).toDouble, true) + } + } + output + } +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 473be21..d9d8862 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -111,6 +111,14 @@ class ReplicaManager(val config: KafkaConfig, val delayedFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch]( purgatoryName = "Fetch", config.brokerId, config.fetchPurgatoryPurgeIntervalRequests) + // Method 1 + val clientMetrics = new ClientQuotaMetrics(null, null) + + // Method 2 + val producerMetrics = new ClientQuotaMetrics2(null, null, "producer") + val consumerMetrics = new ClientQuotaMetrics2(null, null, "consumer") + + newGauge( "LeaderCount", new Gauge[Int] { @@ -279,6 +287,13 @@ class ReplicaManager(val config: KafkaConfig, internalTopicsAllowed: Boolean, messagesPerPartition: Map[TopicAndPartition, MessageSet], responseCallback: Map[TopicAndPartition, ProducerResponseStatus] => Unit) { + + // Example of method 1 + clientMetrics.recordBytesProduced("xx", 1) + + // Method 2 + producerMetrics.record("xx", 1) + if (isValidRequiredAcks(requiredAcks)) { val sTime = SystemTime.milliseconds @@ -418,6 +433,12 @@ class ReplicaManager(val config: KafkaConfig, fetchInfo: Map[TopicAndPartition, PartitionFetchInfo], responseCallback: Map[TopicAndPartition, FetchResponsePartitionData] => Unit) { + // Example of method 1 + clientMetrics.recordBytesConsumed("xx", 1) + + // Method 2 + consumerMetrics.record("xx", 1) + val isFromFollower = replicaId >= 0 val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId val fetchOnlyCommitted: Boolean = ! Request.isValidBrokerId(replicaId) diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaMetricsTest2.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaMetricsTest2.scala new file mode 100644 index 0000000..5f3c5e1 --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/ClientQuotaMetricsTest2.scala @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import java.util.Collections + +import org.apache.kafka.common.metrics.{QuotaViolationException, Quota, MetricConfig} +import org.apache.kafka.common.utils.MockTime +import org.scalatest.junit.JUnit3Suite +import org.junit.{Test, Assert} + +class ClientQuotaMetricsTest2 extends JUnit3Suite { + private val metrics = new org.apache.kafka.common.metrics.Metrics(new MetricConfig(), + Collections.emptyList(), + new MockTime) + private val config = ClientQuotaMetricsConfig2(defaultQuotaPerSecond = 500, + quotaOverrides = "p1=2000,p2=4000") + + @Test + def testQuotaParsing() { + val clientMetrics = new ClientQuotaMetrics2(config, metrics, "producer") + Assert.assertEquals("Default producer quota should be 500", + new Quota(500, true), clientMetrics.quota("random-client-id")) + Assert.assertEquals("Should return the overridden value (2000)", + new Quota(2000, true), clientMetrics.quota("p1")) + Assert.assertEquals("Should return the overridden value (4000)", + new Quota(4000, true), clientMetrics.quota("p2")) + } + + @Test + def testQuotaViolation() { + val time = new MockTime + val metrics = new org.apache.kafka.common.metrics.Metrics(new MetricConfig(), + Collections.emptyList(), + time) + val clientMetrics = new ClientQuotaMetrics2(config, metrics, "producer") + /* We have 10 second windows. Make sure that there is no quota violation + * if we produce under the quota + */ + for(i <- 0 until 10) { + clientMetrics.record("unknown", 400) + time.sleep(1000) + } + + // Create a spike. + var sleepTime = 0 + try { + clientMetrics.record("unknown", 2000) + } + catch { + case qve : QuotaViolationException => + sleepTime = qve.getDelayTimeMs + } + // 400*10 + 2000 = 6000/10 = 600 bytes per second. + // (600 - quota)/quota*window-size = (600-500)/500*11 seconds = 2200*1.2 delayfactor = 2640 + Assert.assertEquals("Should be throttled", 2640, sleepTime) + time.sleep(sleepTime) + // At the end of sleep, the + //Assert.assertEquals("Should be unthrottled again", 0, clientMetrics.record("unknown", 0)) + } + + @Test + def testOverrideParse() { + val time = new MockTime + val metrics = new org.apache.kafka.common.metrics.Metrics(new MetricConfig(), + Collections.emptyList(), + time) + // Case 1 - Default config + var testConfig = ClientQuotaMetricsConfig2() + var clientMetrics = new ClientQuotaMetrics2(testConfig, metrics, "consumer") + Assert.assertEquals(new Quota(ClientQuotaMetricsConfig2.DefaultQuotaPerSecond, true), + clientMetrics.quota("p1")) + + // Case 2 - Empty override + testConfig = ClientQuotaMetricsConfig2(defaultQuotaPerSecond = 500, + quotaOverrides = "p1=2000,p2=4000,,") + + clientMetrics = new ClientQuotaMetrics2(testConfig, metrics, "consumer") + Assert.assertEquals(new Quota(2000, true), clientMetrics.quota("p1")) + Assert.assertEquals(new Quota(4000, true), clientMetrics.quota("p2")) + + // Case 3 - NumberFormatException for override + testConfig = ClientQuotaMetricsConfig2(defaultQuotaPerSecond = 500, + quotaOverrides = "p1=2000,p2=4000,p3=p4") + try { + clientMetrics = new ClientQuotaMetrics2(testConfig, metrics, "consumer") + Assert.fail("Should fail to parse invalid config " + testConfig.quotaOverrides) + } + catch { + // Swallow. + case nfe : NumberFormatException => + } + + // Case 4 - IllegalArgumentException for override + testConfig = ClientQuotaMetricsConfig2(defaultQuotaPerSecond = 500, + quotaOverrides = "p1=2000=3000") + try { + clientMetrics = new ClientQuotaMetrics2(testConfig, metrics, "producer") + Assert.fail("Should fail to parse invalid config " + testConfig.quotaOverrides) + } + catch { + // Swallow. + case nfe : IllegalArgumentException => + } + + } +} -- 1.7.12.4