From 85c88d3e13e18821bcbecfa0dd42ad21bd0b7b86 Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Tue, 2 Jun 2015 17:24:06 -0700 Subject: [PATCH] Added 4 metrics to KafkaConfig: - kafka.metrics.reporters - kafka.metrics.polling.interval.secs - kafka.csv.metrics.reporter.enabled - kafka.csv.metrics.dir --- core/src/main/scala/kafka/server/KafkaConfig.scala | 45 +++++++++++++++++++++- .../kafka/server/KafkaConfigConfigDefTest.scala | 28 ++++++++++++++ 2 files changed, 72 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 9efa15c..e01d6b8 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -123,8 +123,13 @@ object Defaults { val OffsetCommitTimeoutMs = OffsetManagerConfig.DefaultOffsetCommitTimeoutMs val OffsetCommitRequiredAcks = OffsetManagerConfig.DefaultOffsetCommitRequiredAcks - val DeleteTopicEnable = false + /** ********* Metrics Configuration ***********/ + val MetricsReporters = "" + val MetricsPollingIntervalSecs = 10 + val CsvMetricsReporterEnabled = false + val CsvMetricsDirectory = "kafka_metrics" + val DeleteTopicEnable = false val CompressionType = "producer" } @@ -230,6 +235,12 @@ object KafkaConfig { val OffsetCommitTimeoutMsProp = "offsets.commit.timeout.ms" val OffsetCommitRequiredAcksProp = "offsets.commit.required.acks" + /** ********* Metrics Configuration ***********/ + val MetricsReportersProp = "kafka.metrics.reporters" + val MetricsPollingIntervalSecsProp = "kafka.metrics.polling.interval.secs" + val CsvMetricsReporterEnabledProp = "kafka.csv.metrics.reporter.enabled" + val CsvMetricsDirectoryProp = "kafka.csv.metrics.dir" + val DeleteTopicEnableProp = "delete.topic.enable" val CompressionTypeProp = "compression.type" @@ -359,6 +370,11 @@ object KafkaConfig { val OffsetCommitTimeoutMsDoc = "Offset commit will be delayed until all replicas for the offsets topic receive the commit " + "or this timeout is reached. This is similar to the producer request timeout." val OffsetCommitRequiredAcksDoc = "The required acks before the commit can be accepted. In general, the default (-1) should not be overridden" + /** ********* Metrics Configuration ***********/ + val MetricsReportersDoc = "Comma-separated list of reporter types. These classes should be on the classpath and will be instantiated at run-time." + val MetricsPollingIntervalSecsDoc = "The metrics polling interval (in seconds)." + val CsvMetricsReporterEnabledDoc = "Indicates whether the Csv Metrics Reporter is enabled or not. Disabled by default" + val CsvMetricsDirectoryDoc = "Directory to store the Csv Metric reports (if kafka.csv.metrics.reporter.enabled is true" val DeleteTopicEnableDoc = "Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off" val CompressionTypeDoc = "Specify the final compression type for a given topic. This configuration accepts the standard compression codecs " + "('gzip', 'snappy', lz4). It additionally accepts 'uncompressed' which is equivalent to no compression; and " + @@ -477,6 +493,13 @@ object KafkaConfig { .define(OffsetsRetentionCheckIntervalMsProp, LONG, Defaults.OffsetsRetentionCheckIntervalMs, atLeast(1), HIGH, OffsetsRetentionCheckIntervalMsDoc) .define(OffsetCommitTimeoutMsProp, INT, Defaults.OffsetCommitTimeoutMs, atLeast(1), HIGH, OffsetCommitTimeoutMsDoc) .define(OffsetCommitRequiredAcksProp, SHORT, Defaults.OffsetCommitRequiredAcks, HIGH, OffsetCommitRequiredAcksDoc) + + /** ********* Metrics Configuration ***********/ + .define(MetricsReportersProp, STRING, Defaults.MetricsReporters, LOW, MetricsReportersDoc) + .define(MetricsPollingIntervalSecsProp, INT, Defaults.MetricsPollingIntervalSecs, atLeast(1), LOW, MetricsPollingIntervalSecsDoc) + .define(CsvMetricsReporterEnabledProp, BOOLEAN, Defaults.CsvMetricsReporterEnabled, LOW, CsvMetricsReporterEnabledDoc) + .define(CsvMetricsDirectoryProp, STRING, Defaults.CsvMetricsDirectory, LOW, CsvMetricsDirectoryDoc) + .define(DeleteTopicEnableProp, BOOLEAN, Defaults.DeleteTopicEnable, HIGH, DeleteTopicEnableDoc) .define(CompressionTypeProp, STRING, Defaults.CompressionType, HIGH, CompressionTypeDoc) } @@ -597,6 +620,13 @@ object KafkaConfig { offsetsRetentionCheckIntervalMs = parsed.get(OffsetsRetentionCheckIntervalMsProp).asInstanceOf[Long], offsetCommitTimeoutMs = parsed.get(OffsetCommitTimeoutMsProp).asInstanceOf[Int], offsetCommitRequiredAcks = parsed.get(OffsetCommitRequiredAcksProp).asInstanceOf[Short], + + /** ********* Metrics Configuration ***********/ + metricsReporters = parsed.get(MetricsReportersProp).asInstanceOf[String], + metricsPollingIntervalSecs = parsed.get(MetricsPollingIntervalSecsProp).asInstanceOf[Int], + csvMetricsReporterEnabled = parsed.get(CsvMetricsReporterEnabledProp).asInstanceOf[Boolean], + csvMetricsDirectory = parsed.get(CsvMetricsDirectoryProp).asInstanceOf[String], + deleteTopicEnable = parsed.get(DeleteTopicEnableProp).asInstanceOf[Boolean], compressionType = parsed.get(CompressionTypeProp).asInstanceOf[String] ) @@ -741,6 +771,12 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ val offsetCommitTimeoutMs: Int = Defaults.OffsetCommitTimeoutMs, val offsetCommitRequiredAcks: Short = Defaults.OffsetCommitRequiredAcks, + /** ********* Metrics Configuration ***********/ + val metricsReporters: String = Defaults.MetricsReporters, + val metricsPollingIntervalSecs: Int = Defaults.MetricsPollingIntervalSecs, + val csvMetricsReporterEnabled: Boolean = Defaults.CsvMetricsReporterEnabled, + val csvMetricsDirectory: String = Defaults.CsvMetricsDirectory, + val deleteTopicEnable: Boolean = Defaults.DeleteTopicEnable, val compressionType: String = Defaults.CompressionType ) { @@ -962,6 +998,13 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ props.put(OffsetsRetentionCheckIntervalMsProp, offsetsRetentionCheckIntervalMs.toString) props.put(OffsetCommitTimeoutMsProp, offsetCommitTimeoutMs.toString) props.put(OffsetCommitRequiredAcksProp, offsetCommitRequiredAcks.toString) + + /** ********* Metrics Configuration ***********/ + props.put(MetricsReportersProp, metricsReporters) + props.put(MetricsPollingIntervalSecsProp, metricsPollingIntervalSecs.toString) + props.put(CsvMetricsReporterEnabledProp, csvMetricsReporterEnabled.toString) + props.put(CsvMetricsDirectoryProp, csvMetricsDirectory) + props.put(DeleteTopicEnableProp, deleteTopicEnable.toString) props.put(CompressionTypeProp, compressionType.toString) diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala index 8014a5a..64dc5f5 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala @@ -20,7 +20,9 @@ import java.util.Properties import kafka.api.ApiVersion import kafka.message._ +import kafka.metrics.KafkaMetricsConfig import kafka.server.{Defaults, KafkaConfig} +import kafka.utils.VerifiableProperties import org.apache.kafka.common.protocol.SecurityProtocol import org.junit.{Assert, Test} import org.scalatest.junit.JUnit3Suite @@ -144,6 +146,11 @@ class KafkaConfigConfigDefTest extends JUnit3Suite { Assert.assertEquals(expectedConfig.offsetCommitTimeoutMs, actualConfig.offsetCommitTimeoutMs) Assert.assertEquals(expectedConfig.offsetCommitRequiredAcks, actualConfig.offsetCommitRequiredAcks) + Assert.assertEquals(expectedConfig.metricsReporters, actualConfig.metricsReporters) + Assert.assertEquals(expectedConfig.metricsPollingIntervalSecs, actualConfig.metricsPollingIntervalSecs) + Assert.assertEquals(expectedConfig.csvMetricsReporterEnabled, actualConfig.csvMetricsReporterEnabled) + Assert.assertEquals(expectedConfig.csvMetricsDirectory, actualConfig.csvMetricsDirectory) + Assert.assertEquals(expectedConfig.deleteTopicEnable, actualConfig.deleteTopicEnable) Assert.assertEquals(expectedConfig.compressionType, actualConfig.compressionType) } @@ -240,6 +247,10 @@ class KafkaConfigConfigDefTest extends JUnit3Suite { case KafkaConfig.OffsetCommitRequiredAcksProp => expected.setProperty(name, "-1") case KafkaConfig.OffsetsTopicReplicationFactorProp => expected.setProperty(name, inRangeIntProp(-1, Short.MaxValue)) //BrokerCompressionCodec.isValid(compressionType) + case KafkaConfig.MetricsReportersProp => expected.setProperty(name, "NewMetricsReporter") + case KafkaConfig.MetricsPollingIntervalSecsProp => expected.setProperty(name, atLeastOneIntProp) + case KafkaConfig.CsvMetricsReporterEnabledProp => expected.setProperty(name, randFrom("true", "false")) + case KafkaConfig.CsvMetricsDirectoryProp => expected.setProperty(name, "XXX") case KafkaConfig.CompressionTypeProp => expected.setProperty(name, randFrom(BrokerCompressionCodec.brokerCompressionOptions)) case nonNegativeIntProperty => expected.setProperty(name, nextInt(Int.MaxValue).toString) @@ -340,6 +351,10 @@ class KafkaConfigConfigDefTest extends JUnit3Suite { case KafkaConfig.OffsetsRetentionCheckIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") case KafkaConfig.OffsetCommitTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") case KafkaConfig.OffsetCommitRequiredAcksProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-2") + case KafkaConfig.MetricsReportersProp => // ignore string + case KafkaConfig.MetricsPollingIntervalSecsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") + case KafkaConfig.CsvMetricsReporterEnabledProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0") + case KafkaConfig.CsvMetricsDirectoryProp => // ignore string case KafkaConfig.DeleteTopicEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0") @@ -384,6 +399,19 @@ class KafkaConfigConfigDefTest extends JUnit3Suite { Assert.assertEquals(SnappyCompressionCodec, config.offsetsTopicCompressionCodec) } + @Test + def testMetricProperties() { + val defaults = new Properties() + defaults.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181") + defaults.put(KafkaConfig.MetricsReportersProp, "A,B") + defaults.put(KafkaConfig.MetricsPollingIntervalSecsProp, "30") + val config = KafkaConfig.fromProps(defaults) + + val metricsCfg = new KafkaMetricsConfig(new VerifiableProperties(config.toProps)) + Assert.assertEquals(30, metricsCfg.pollingIntervalSecs) + Assert.assertEquals(Seq("A", "B"), metricsCfg.reporters) + } + private def assertPropertyInvalid(validRequiredProps: => Properties, name: String, values: Any*) { values.foreach((value) => { val props = validRequiredProps -- 1.7.12.4