diff --git core/src/main/scala/kafka/serializer/Decoder.scala core/src/main/scala/kafka/serializer/Decoder.scala index 7d1c138..f5be58e 100644 --- core/src/main/scala/kafka/serializer/Decoder.scala +++ core/src/main/scala/kafka/serializer/Decoder.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -18,20 +18,23 @@ package kafka.serializer import kafka.message.Message +import java.util.Properties trait Decoder[T] { def toEvent(message: Message):T } -class DefaultDecoder extends Decoder[Message] { +class DefaultDecoder(val properties: Properties) extends Decoder[Message] { + def this() = this(new Properties()) def toEvent(message: Message):Message = message } -class StringDecoder extends Decoder[String] { +class StringDecoder(val properties: Properties) extends Decoder[String] { + def this() = this(new Properties()) def toEvent(message: Message):String = { val buf = message.payload val arr = new Array[Byte](buf.remaining) buf.get(arr) - new String(arr) + new String(arr, properties.getProperty("kafka.character.encoding", "UTF-8")) } } diff --git core/src/main/scala/kafka/serializer/Encoder.scala core/src/main/scala/kafka/serializer/Encoder.scala index 222e51b..a54eb2c 100644 --- core/src/main/scala/kafka/serializer/Encoder.scala +++ core/src/main/scala/kafka/serializer/Encoder.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -18,15 +18,21 @@ package kafka.serializer import kafka.message.Message +import java.util.Properties trait Encoder[T] { def toMessage(event: T):Message } -class DefaultEncoder extends Encoder[Message] { +class DefaultEncoder(val properties: Properties) extends Encoder[Message] { + def this() = this(new Properties()) override def toMessage(event: Message):Message = event } -class StringEncoder extends Encoder[String] { - override def toMessage(event: String):Message = new Message(event.getBytes) +class StringEncoder(val properties: Properties) extends Encoder[String] { + def this() = this(new Properties()) + override def toMessage(event: String):Message = { + val charEncoding = properties.getProperty("kafka.character.encoding", "UTF-8") + new Message(event.getBytes(charEncoding)) + } }