diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala index f582919..47aad81 100644 --- a/core/src/main/scala/kafka/producer/Producer.scala +++ b/core/src/main/scala/kafka/producer/Producer.scala @@ -18,7 +18,7 @@ package kafka.producer import async.{DefaultEventHandler, ProducerSendThread, EventHandler} import kafka.utils._ -import java.util.Random +import java.util.Properties import java.util.concurrent.{TimeUnit, LinkedBlockingQueue} import kafka.serializer.Encoder import java.util.concurrent.atomic.AtomicBoolean @@ -58,7 +58,7 @@ class Producer[K,V](val config: ProducerConfig, this(config, new DefaultEventHandler[K,V](config, Utils.createObject[Partitioner[K]](config.partitionerClass, config.props), - Utils.createObject[Encoder[V]](config.serializerClass, config.props), + Utils.createObject[Encoder[V]](config.serializerClass, config.props.asInstanceOf[Properties]), Utils.createObject[Encoder[K]](config.keySerializerClass, config.props), new ProducerPool(config))) diff --git a/core/src/main/scala/kafka/serializer/Encoder.scala b/core/src/main/scala/kafka/serializer/Encoder.scala index 020e73c..88fbfe6 100644 --- a/core/src/main/scala/kafka/serializer/Encoder.scala +++ b/core/src/main/scala/kafka/serializer/Encoder.scala @@ -17,6 +17,7 @@ package kafka.serializer +import java.util.Properties import kafka.utils.VerifiableProperties /** @@ -43,12 +44,12 @@ class NullEncoder[T](props: VerifiableProperties = null) extends Encoder[T] { * The string encoder takes an optional parameter serializer.encoding which controls * the character set used in encoding the string into bytes. */ -class StringEncoder(props: VerifiableProperties = null) extends Encoder[String] { +class StringEncoder(props: Properties = null) extends Encoder[String] { val encoding = if(props == null) "UTF8" else - props.getString("serializer.encoding", "UTF8") + props.getProperty("serializer.encoding", "UTF8") override def toBytes(s: String): Array[Byte] = if(s == null) diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala index e0a5a27..5174204 100644 --- a/core/src/main/scala/kafka/utils/Utils.scala +++ b/core/src/main/scala/kafka/utils/Utils.scala @@ -455,6 +455,7 @@ object Utils extends Logging { */ def createObject[T<:AnyRef](className: String, args: AnyRef*): T = { val klass = Class.forName(className).asInstanceOf[Class[T]] + val constructor = klass.getConstructor(args.map(_.getClass): _*) constructor.newInstance(args: _*).asInstanceOf[T] } diff --git a/core/src/main/scala/kafka/utils/VerifiableProperties.scala b/core/src/main/scala/kafka/utils/VerifiableProperties.scala index d694ba9..bbfc3c0 100644 --- a/core/src/main/scala/kafka/utils/VerifiableProperties.scala +++ b/core/src/main/scala/kafka/utils/VerifiableProperties.scala @@ -20,7 +20,7 @@ package kafka.utils import java.util.Properties import scala.collection._ -class VerifiableProperties(val props: Properties) extends Logging { +class VerifiableProperties(val props: Properties) extends Properties with Logging { private val referenceSet = mutable.HashSet[String]() def this() = this(new Properties) @@ -29,10 +29,16 @@ class VerifiableProperties(val props: Properties) extends Logging { props.containsKey(name) } - def getProperty(name: String): String = { + override def getProperty(name: String): String = { val value = props.getProperty(name) referenceSet.add(name) - return value + value + } + + override def getProperty(name: String, defaultValue: String): String = { + val value = props.getProperty(name, defaultValue) + referenceSet.add(name) + value } /** @@ -40,7 +46,7 @@ class VerifiableProperties(val props: Properties) extends Logging { */ def getInt(name: String): Int = { require(containsKey(name), "Missing required property '" + name + "'") - return getInt(name, -1) + getInt(name, -1) } def getIntInRange(name: String, range: (Int, Int)): Int = { @@ -94,7 +100,7 @@ class VerifiableProperties(val props: Properties) extends Logging { */ def getLong(name: String): Long = { require(containsKey(name), "Missing required property '" + name + "'") - return getLong(name, -1) + getLong(name, -1) } /**