diff --git core/src/main/scala/kafka/consumer/ConsumerConfig.scala core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index c531cd1..71ed95d 100644
--- core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -94,5 +94,8 @@ class ConsumerConfig(props: Properties) extends ZKConfig(props) {
    * overhead of decompression.
    * */
   val enableShallowIterator = Utils.getBoolean(props, "shallowiterator.enable", false)
+
+  /** the user-defined character encoding for StringDecoder, or default to UTF-8 */
+  val characterEncoding = Utils.getString(props, "character.encoding", "UTF-8")
 }
diff --git core/src/main/scala/kafka/producer/ProducerConfig.scala core/src/main/scala/kafka/producer/ProducerConfig.scala
index 8a5b53c..1bc630f 100644
--- core/src/main/scala/kafka/producer/ProducerConfig.scala
+++ core/src/main/scala/kafka/producer/ProducerConfig.scala
@@ -86,4 +86,7 @@ class ProducerConfig(val props: Properties) extends ZKConfig(props)
    * This parameter specifies the number of times the producer attempts to refresh this ZK cache.
    */
   val zkReadRetries = Utils.getInt(props, "zk.read.num.retries", 3)
+
+  /** the user-defined character encoding for StringEncoder, or default to UTF-8 */
+  val characterEncoding = Utils.getString(props, "character.encoding", "UTF-8")
 }
diff --git core/src/main/scala/kafka/serializer/Decoder.scala core/src/main/scala/kafka/serializer/Decoder.scala
index 7d1c138..1922417 100644
--- core/src/main/scala/kafka/serializer/Decoder.scala
+++ core/src/main/scala/kafka/serializer/Decoder.scala
@@ -27,11 +27,11 @@ class DefaultDecoder extends Decoder[Message] {
   def toEvent(message: Message):Message = message
 }
 
-class StringDecoder extends Decoder[String] {
+class StringDecoder(val characterEncoding:String = "UTF-8") extends Decoder[String] {
   def toEvent(message: Message):String = {
     val buf = message.payload
     val arr = new Array[Byte](buf.remaining)
     buf.get(arr)
-    new String(arr)
+    new String(arr, characterEncoding)
   }
 }
diff --git core/src/main/scala/kafka/serializer/Encoder.scala core/src/main/scala/kafka/serializer/Encoder.scala
index 222e51b..d7da5f5 100644
--- core/src/main/scala/kafka/serializer/Encoder.scala
+++ core/src/main/scala/kafka/serializer/Encoder.scala
@@ -27,6 +27,13 @@ class DefaultEncoder extends Encoder[Message] {
   override def toMessage(event: Message):Message = event
 }
 
-class StringEncoder extends Encoder[String] {
-  override def toMessage(event: String):Message = new Message(event.getBytes)
+class StringEncoder() extends Encoder[String] {
+  private var characterEncoding:String = "UTF-8"
+
+  def this(charEnc:String) = {
+    this()
+    characterEncoding = charEnc
+  }
+
+  override def toMessage(event:String):Message = new Message(event.getBytes(characterEncoding))
 }
diff --git core/src/main/scala/kafka/tools/ConsumerShell.scala core/src/main/scala/kafka/tools/ConsumerShell.scala
index 5eb5269..8bf044d 100644
--- core/src/main/scala/kafka/tools/ConsumerShell.scala
+++ core/src/main/scala/kafka/tools/ConsumerShell.scala
@@ -62,7 +62,7 @@ object ConsumerShell {
     val consumerConfig = new ConsumerConfig(Utils.loadProps(propsFile))
     val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
 
-    val topicMessageStreams = consumerConnector.createMessageStreams(Predef.Map(topic -> partitions), new StringDecoder)
+    val topicMessageStreams = consumerConnector.createMessageStreams(Predef.Map(topic -> partitions), new StringDecoder(consumerConfig.characterEncoding))
     var threadList = List[ZKConsumerThread]()
     for ((topic, streamList) <- topicMessageStreams)
       for (stream <- streamList)
diff --git core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala
index e749597..3262dd1 100644
--- core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala
+++ core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala
@@ -396,10 +396,11 @@ class ProducerTest extends JUnitSuite {
     val props = new Properties()
     props.put("serializer.class", "kafka.serializer.StringEncoder")
     props.put("partitioner.class", "kafka.producer.StaticPartitioner")
+    props.put("character.encoding", "UTF-8")
     props.put("zk.connect", TestZKUtils.zookeeperConnect)
     val config = new ProducerConfig(props)
 
-    val serializer = new StringEncoder
+    val serializer = new StringEncoder(config.characterEncoding)
     val producer = new Producer[String, String](config)
     try {
@@ -425,11 +426,12 @@ class ProducerTest extends JUnitSuite {
   def testZKSendWithDeadBroker() {
     val props = new Properties()
     props.put("serializer.class", "kafka.serializer.StringEncoder")
+    props.put("character.encoding", "UTF-8")
     props.put("partitioner.class", "kafka.producer.StaticPartitioner")
     props.put("zk.connect", TestZKUtils.zookeeperConnect)
     val config = new ProducerConfig(props)
 
-    val serializer = new StringEncoder
+    val serializer = new StringEncoder(config.characterEncoding)
     val producer = new Producer[String, String](config)
     try {
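
Taken together, the hunks above wire the new "character.encoding" property from ConsumerConfig/ProducerConfig into the string codecs. A minimal usage sketch, adapted from the ConsumerShell and ProducerTest changes (not part of the patch; variable names such as propsFile, topic and partitions are assumed from that surrounding code, and ISO-8859-1 is just an example of a non-default charset):

    // consumer side: decode message payloads with the configured encoding (defaults to UTF-8)
    val consumerConfig = new ConsumerConfig(Utils.loadProps(propsFile))
    val decoder = new StringDecoder(consumerConfig.characterEncoding)
    val topicMessageStreams = Consumer.create(consumerConfig)
      .createMessageStreams(Predef.Map(topic -> partitions), decoder)

    // producer side: set the property and pass it through to the encoder
    val props = new Properties()
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    props.put("character.encoding", "ISO-8859-1")
    props.put("zk.connect", "localhost:2181")   // placeholder ZooKeeper connect string
    val producerConfig = new ProducerConfig(props)
    val encoder = new StringEncoder(producerConfig.characterEncoding)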