Index: core/src/main/scala/kafka/producer/ProducerPool.scala =================================================================== --- core/src/main/scala/kafka/producer/ProducerPool.scala (revision 1367339) +++ core/src/main/scala/kafka/producer/ProducerPool.scala (working copy) @@ -33,7 +33,7 @@ val props = new Properties() props.put("host", broker.host) props.put("port", broker.port.toString) - props.putAll(config.props) + props.putAll(config.props.props) val producer = new SyncProducer(new SyncProducerConfig(props)) info("Creating sync producer for broker id = " + broker.id + " at " + broker.host + ":" + broker.port) syncProducers.put(broker.id, producer) Index: core/src/main/scala/kafka/producer/SyncProducerConfig.scala =================================================================== --- core/src/main/scala/kafka/producer/SyncProducerConfig.scala (revision 1367339) +++ core/src/main/scala/kafka/producer/SyncProducerConfig.scala (working copy) @@ -17,10 +17,14 @@ package kafka.producer -import kafka.utils.Utils import java.util.Properties +import kafka.utils.{VerifiableProperties, Utils} -class SyncProducerConfig(val props: Properties) extends SyncProducerConfigShared { +class SyncProducerConfig private (val props: VerifiableProperties) extends SyncProducerConfigShared { + def this(originalProps: Properties) { + this(new VerifiableProperties(originalProps)) + } + /** the broker to which the producer sends events */ val host = Utils.getString(props, "host") @@ -29,7 +33,7 @@ } trait SyncProducerConfigShared { - val props: Properties + val props: VerifiableProperties val bufferSize = Utils.getInt(props, "buffer.size", 100*1024) Index: core/src/main/scala/kafka/producer/ProducerConfig.scala =================================================================== --- core/src/main/scala/kafka/producer/ProducerConfig.scala (revision 1367339) +++ core/src/main/scala/kafka/producer/ProducerConfig.scala (working copy) @@ -19,12 +19,17 @@ import async.AsyncProducerConfig import java.util.Properties -import kafka.utils.{ZKConfig, Utils} import kafka.common.InvalidConfigException +import kafka.utils.{VerifiableProperties, ZKConfig, Utils} -class ProducerConfig(val props: Properties) extends ZKConfig(props) +class ProducerConfig private (val props: VerifiableProperties) extends ZKConfig(props) with AsyncProducerConfig with SyncProducerConfigShared{ + def this(originalProps: Properties) { + this(new VerifiableProperties(originalProps)) + props.verify() + } + /** For bypassing zookeeper based auto partition discovery, use this config * * to pass in static broker and per-broker partition information. Format- * * brokerid1:host1:port1, brokerid2:host2:port2*/ Index: core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala =================================================================== --- core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala (revision 1367339) +++ core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala (working copy) @@ -16,11 +16,10 @@ */ package kafka.producer.async -import java.util.Properties -import kafka.utils.Utils +import kafka.utils.{VerifiableProperties, Utils} trait AsyncProducerConfig { - val props: Properties + val props: VerifiableProperties /* maximum time, in milliseconds, for buffering data on the producer queue */ val queueTime = Utils.getInt(props, "queue.time", 5000) Index: core/src/main/scala/kafka/consumer/ConsumerConfig.scala =================================================================== --- core/src/main/scala/kafka/consumer/ConsumerConfig.scala (revision 1367339) +++ core/src/main/scala/kafka/consumer/ConsumerConfig.scala (working copy) @@ -18,8 +18,9 @@ package kafka.consumer import java.util.Properties -import kafka.utils.{ZKConfig, Utils} import kafka.api.OffsetRequest +import kafka.utils.{VerifiableProperties, ZKConfig, Utils} + object ConsumerConfig { val SocketTimeout = 30 * 1000 val SocketBufferSize = 64*1024 @@ -43,9 +44,14 @@ val MirrorConsumerNumThreadsProp = "mirror.consumer.numthreads" } -class ConsumerConfig(props: Properties) extends ZKConfig(props) { +class ConsumerConfig private (props: VerifiableProperties) extends ZKConfig(props) { import ConsumerConfig._ + def this(originalProps: Properties) { + this(new VerifiableProperties(originalProps)) + props.verify() + } + /** a string that uniquely identifies a set of consumers within the same consumer group */ val groupId = Utils.getString(props, "groupid") Index: core/src/main/scala/kafka/utils/Utils.scala =================================================================== --- core/src/main/scala/kafka/utils/Utils.scala (revision 1367339) +++ core/src/main/scala/kafka/utils/Utils.scala (working copy) @@ -211,7 +211,7 @@ /** * Read a required integer property value or throw an exception if no such property is found */ - def getInt(props: Properties, name: String): Int = { + def getInt(props: VerifiableProperties, name: String): Int = { if(props.containsKey(name)) return getInt(props, name, -1) else @@ -225,10 +225,10 @@ * @param default The default value to use if the property is not found * @return the integer value */ - def getInt(props: Properties, name: String, default: Int): Int = + def getInt(props: VerifiableProperties, name: String, default: Int): Int = getIntInRange(props, name, default, (Int.MinValue, Int.MaxValue)) - def getShort(props: Properties, name: String, default: Short): Short = + def getShort(props: VerifiableProperties, name: String, default: Short): Short = getShortInRange(props, name, default, (Short.MinValue, Short.MaxValue)) /** @@ -241,7 +241,7 @@ * @throws KafkaException If the value is not in the given range * @return the integer value */ - def getIntInRange(props: Properties, name: String, default: Int, range: (Int, Int)): Int = { + def getIntInRange(props: VerifiableProperties, name: String, default: Int, range: (Int, Int)): Int = { val v = if(props.containsKey(name)) props.getProperty(name).toInt @@ -253,7 +253,7 @@ v } - def getShortInRange(props: Properties, name: String, default: Short, range: (Short, Short)): Short = { + def getShortInRange(props: VerifiableProperties, name: String, default: Short, range: (Short, Short)): Short = { val v = if(props.containsKey(name)) props.getProperty(name).toShort @@ -289,7 +289,7 @@ /** * Read a required long property value or throw an exception if no such property is found */ - def getLong(props: Properties, name: String): Long = { + def getLong(props: VerifiableProperties, name: String): Long = { if(props.containsKey(name)) return getLong(props, name, -1) else @@ -303,7 +303,7 @@ * @param default The default value to use if the property is not found * @return the long value */ - def getLong(props: Properties, name: String, default: Long): Long = + def getLong(props: VerifiableProperties, name: String, default: Long): Long = getLongInRange(props, name, default, (Long.MinValue, Long.MaxValue)) /** @@ -316,7 +316,7 @@ * @throws KafkaException If the value is not in the given range * @return the long value */ - def getLongInRange(props: Properties, name: String, default: Long, range: (Long, Long)): Long = { + def getLongInRange(props: VerifiableProperties, name: String, default: Long, range: (Long, Long)): Long = { val v = if(props.containsKey(name)) props.getProperty(name).toLong @@ -335,7 +335,7 @@ * @param default The default value to use if the property is not found * @return the boolean value */ - def getBoolean(props: Properties, name: String, default: Boolean): Boolean = { + def getBoolean(props: VerifiableProperties, name: String, default: Boolean): Boolean = { if(!props.containsKey(name)) default else if("true" == props.getProperty(name)) @@ -349,7 +349,7 @@ /** * Get a string property, or, if no such property is defined, return the given default value */ - def getString(props: Properties, name: String, default: String): String = { + def getString(props: VerifiableProperties, name: String, default: String): String = { if(props.containsKey(name)) props.getProperty(name) else @@ -359,7 +359,7 @@ /** * Get a string property or throw and exception if no such property is defined. */ - def getString(props: Properties, name: String): String = { + def getString(props: VerifiableProperties, name: String): String = { if(props.containsKey(name)) props.getProperty(name) else @@ -367,48 +367,6 @@ } /** - * Get a property of type java.util.Properties or throw and exception if no such property is defined. - */ - def getProps(props: Properties, name: String): Properties = { - if(props.containsKey(name)) { - val propString = props.getProperty(name) - val propValues = propString.split(",") - val properties = new Properties - for(i <- 0 until propValues.length) { - val prop = propValues(i).split("=") - if(prop.length != 2) - throw new KafkaException("Illegal format of specifying properties '" + propValues(i) + "'") - properties.put(prop(0), prop(1)) - } - properties - } - else - throw new KafkaException("Missing required property '" + name + "'") - } - - /** - * Get a property of type java.util.Properties or return the default if no such property is defined - */ - def getProps(props: Properties, name: String, default: Properties): Properties = { - if(props.containsKey(name)) { - val propString = props.getProperty(name) - val propValues = propString.split(",") - if(propValues.length < 1) - throw new KafkaException("Illegal format of specifying properties '" + propString + "'") - val properties = new Properties - for(i <- 0 until propValues.length) { - val prop = propValues(i).split("=") - if(prop.length != 2) - throw new KafkaException("Illegal format of specifying properties '" + propValues(i) + "'") - properties.put(prop(0), prop(1)) - } - properties - } - else - default - } - - /** * Open a channel for the given file */ def openChannel(file: File, mutable: Boolean): FileChannel = { @@ -741,7 +699,7 @@ else true } - def getCompressionCodec(props: Properties, codec: String): CompressionCodec = { + def getCompressionCodec(props: VerifiableProperties, codec: String): CompressionCodec = { val codecValueString = props.getProperty(codec) if(codecValueString == null) NoCompressionCodec @@ -905,4 +863,29 @@ def durationMs: Double = (end.get - start) / (1000.0 * 1000.0) } +} + +class VerifiableProperties(val props: Properties) extends Logging { + val referenceSet = mutable.HashSet[String]() + + def containsKey(name: String): Boolean = { + props.containsKey(name) + } + + def getProperty(name: String): String = { + val value = props.getProperty(name) + referenceSet.add(name) + return value + } + + def verify() { + val specifiedProperties = props.propertyNames() + while (specifiedProperties.hasMoreElements) { + val key = specifiedProperties.nextElement().asInstanceOf[String] + if (!referenceSet.contains(key)) + warn("property %s is not valid".format(key)) + else + info("property %s is overridden to %s".format(key, props.getProperty(key))) + } + } } \ No newline at end of file Index: core/src/main/scala/kafka/utils/ZkUtils.scala =================================================================== --- core/src/main/scala/kafka/utils/ZkUtils.scala (revision 1367339) +++ core/src/main/scala/kafka/utils/ZkUtils.scala (working copy) @@ -17,7 +17,6 @@ package kafka.utils -import java.util.Properties import kafka.cluster.{Broker, Cluster} import kafka.common.NoEpochForPartitionException import kafka.consumer.TopicCount @@ -534,7 +533,7 @@ } -class ZKConfig(props: Properties) { +class ZKConfig(props: VerifiableProperties) { /** ZK host string */ val zkConnect = Utils.getString(props, "zk.connect", null) Index: core/src/main/scala/kafka/utils/Mx4jLoader.scala =================================================================== --- core/src/main/scala/kafka/utils/Mx4jLoader.scala (revision 1367339) +++ core/src/main/scala/kafka/utils/Mx4jLoader.scala (working copy) @@ -33,10 +33,10 @@ object Mx4jLoader extends Logging { def maybeLoad(): Boolean = { - if (!Utils.getBoolean(System.getProperties(), "kafka_mx4jenable", false)) + if (!Utils.getBoolean(new VerifiableProperties(System.getProperties()), "kafka_mx4jenable", false)) false val address = System.getProperty("mx4jaddress", "0.0.0.0") - val port = Utils.getInt(System.getProperties(), "mx4jport", 8082) + val port = Utils.getInt(new VerifiableProperties(System.getProperties()), "mx4jport", 8082) try { debug("Will try to load MX4j now, if it's in the classpath"); Index: core/src/main/scala/kafka/server/KafkaConfig.scala =================================================================== --- core/src/main/scala/kafka/server/KafkaConfig.scala (revision 1367339) +++ core/src/main/scala/kafka/server/KafkaConfig.scala (working copy) @@ -18,14 +18,20 @@ package kafka.server import java.util.Properties -import kafka.utils.{Utils, ZKConfig} import kafka.message.Message import kafka.consumer.ConsumerConfig +import kafka.utils.{VerifiableProperties, Utils, ZKConfig} /** * Configuration settings for the kafka server */ -class KafkaConfig(props: Properties) extends ZKConfig(props) { +class KafkaConfig private (props: VerifiableProperties) extends ZKConfig(props) { + + def this(originalProps: Properties) { + this(new VerifiableProperties(originalProps)) + props.verify() + } + /* the port to listen and accept connections on */ val port: Int = Utils.getInt(props, "port", 6667)