diff --git core/src/main/scala/kafka/consumer/ConsoleConsumer.scala core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
index 21aaccc..fd77653 100644
--- core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+++ core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
@@ -108,7 +108,7 @@ object ConsoleConsumer {
     
     val maxMessages = if(options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1
 
-    val connector = Consumer.create(config)
+    val connector = Consumer.create[Message](config)
     
     Runtime.getRuntime.addShutdownHook(new Thread() {
       override def run() {
@@ -119,7 +119,8 @@ object ConsoleConsumer {
       }
     })
     
-    var stream: KafkaMessageStream = connector.createMessageStreams(Map(topic -> 1)).get(topic).get.get(0)
+    var stream: KafkaMessageStream[Message] =
+      connector.createMessageStreams(Map(topic -> 1)).get(topic).get.get(0)
     val iter =
       if(maxMessages >= 0)
         stream.slice(0, maxMessages)
diff --git core/src/main/scala/kafka/consumer/ConsumerConfig.scala core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index 1210f3f..c7aacee 100644
--- core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -101,5 +101,8 @@ class ConsumerConfig(props: Properties) extends ZKConfig(props) {
 
   val mirrorConsumerNumThreads = Utils.getInt(
     props, MirrorConsumerNumThreadsProp, MirrorConsumerNumThreads)
+
+  val deserializerClass = Utils.getString(
+    props, "deserializer.class", "kafka.serializer.DefaultDecoder")
 }
 
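
A minimal sketch (not part of the patch) of wiring the new property into a consumer configuration; the zk.connect and groupid values are placeholders:

  import java.util.Properties
  import kafka.consumer.ConsumerConfig

  val props = new Properties()
  props.put("zk.connect", "localhost:2181")   // placeholder ZooKeeper connection string
  props.put("groupid", "example-group")       // placeholder consumer group
  props.put("deserializer.class", "kafka.serializer.StringDecoder")
  val config = new ConsumerConfig(props)      // deserializerClass now resolves to StringDecoder
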
diff --git core/src/main/scala/kafka/consumer/ConsumerConnector.scala core/src/main/scala/kafka/consumer/ConsumerConnector.scala
index d807b25..da2fb0a 100644
--- core/src/main/scala/kafka/consumer/ConsumerConnector.scala
+++ core/src/main/scala/kafka/consumer/ConsumerConnector.scala
@@ -24,7 +24,7 @@ import org.apache.log4j.Logger
 /**
  *  Main interface for consumer
  */
-trait ConsumerConnector {
+trait ConsumerConnector[V] {
   /**
    *  Create a list of MessageStreams for each topic.
    *
@@ -32,7 +32,8 @@ trait ConsumerConnector {
    *  @return a map of (topic, list of  KafkaMessageStream) pair. The number of items in the
    *          list is #streams. Each KafkaMessageStream supports an iterator of messages.
    */
-  def createMessageStreams(topicCountMap: Map[String,Int]) : Map[String,List[KafkaMessageStream]]
+  def createMessageStreams(
+      topicCountMap: Map[String,Int]) : Map[String,List[KafkaMessageStream[V]]]
 
   /**
    *  Commit the offsets of all broker partitions connected by this connector.
@@ -55,8 +56,8 @@ object Consumer {
    *  @param config  at the minimum, need to specify the groupid of the consumer and the zookeeper
    *                 connection string zk.connect.
    */
-  def create(config: ConsumerConfig): ConsumerConnector = {
-    val consumerConnect = new ZookeeperConsumerConnector(config)
+  def create[V](config: ConsumerConfig): ConsumerConnector[V] = {
+    val consumerConnect = new ZookeeperConsumerConnector[V](config)
     Utils.swallow(logger.warn, Utils.registerMBean(consumerConnect, consumerStatsMBeanName))
     consumerConnect
   }
@@ -67,9 +68,12 @@ object Consumer {
    *  @param config  at the minimum, need to specify the groupid of the consumer and the zookeeper
    *                 connection string zk.connect.
    */
-  def createJavaConsumerConnector(config: ConsumerConfig): kafka.javaapi.consumer.ConsumerConnector = {
-    val consumerConnect = new kafka.javaapi.consumer.ZookeeperConsumerConnector(config)
+  def createJavaConsumerConnector[V](
+      config: ConsumerConfig): kafka.javaapi.consumer.ConsumerConnector[V] = {
+    val consumerConnect =
+      new kafka.javaapi.consumer.ZookeeperConsumerConnector[V](config)
     Utils.swallow(logger.warn, Utils.registerMBean(consumerConnect.underlying, consumerStatsMBeanName))
     consumerConnect
   }
 }
+
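
With the connector parameterized, callers choose the stream element type when creating it; a sketch, reusing a config like the one above:

  val connector = Consumer.create[String](config)
  // Map[String, List[KafkaMessageStream[String]]]
  val streams = connector.createMessageStreams(Map("example-topic" -> 1))

Note that the type parameter is not checked against deserializer.class at compile time, so a mismatch between the two only surfaces at runtime.
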
diff --git core/src/main/scala/kafka/consumer/ConsumerIterator.scala core/src/main/scala/kafka/consumer/ConsumerIterator.scala
index 9b24781..b52aa78 100644
--- core/src/main/scala/kafka/consumer/ConsumerIterator.scala
+++ core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -20,34 +20,35 @@ package kafka.consumer
 import kafka.utils.IteratorTemplate
 import org.apache.log4j.Logger
 import java.util.concurrent.{TimeUnit, BlockingQueue}
-import kafka.cluster.Partition
-import kafka.message.{MessageAndOffset, MessageSet, Message}
+import kafka.message.{MessageAndOffset, Message}
+import kafka.serializer.Decoder
+
 
 /**
  * An iterator that blocks until a value can be read from the supplied queue.
  * The iterator takes a shutdownCommand object which can be added to the queue to trigger a shutdown
  * 
  */
-class ConsumerIterator(private val channel: BlockingQueue[FetchedDataChunk], consumerTimeoutMs: Int)
-        extends IteratorTemplate[Message] {
-  
-  private val logger = Logger.getLogger(classOf[ConsumerIterator])
+class ConsumerIterator[V](private val channel: BlockingQueue[FetchedDataChunk],
+      consumerTimeoutMs: Int, decoder: Decoder[V]) extends IteratorTemplate[V] {
+
+  private val logger = Logger.getLogger(classOf[ConsumerIterator[V]])
   private var current: Iterator[MessageAndOffset] = null
   private var currentDataChunk: FetchedDataChunk = null
   private var currentTopicInfo: PartitionTopicInfo = null
   private var consumedOffset: Long = -1L
 
-  override def next(): Message = {
-    val message = super.next
+  override def next(): V = {
+    val decodedMessage = super.next()
     if(consumedOffset < 0)
       throw new IllegalStateException("Offset returned by the message set is invalid %d".format(consumedOffset))
     currentTopicInfo.resetConsumeOffset(consumedOffset)
     if(logger.isTraceEnabled)
       logger.trace("Setting consumed offset to %d".format(consumedOffset))
-    message
+    decodedMessage
   }
 
-  protected def makeNext(): Message = {
+  protected def makeNext(): V = {
     // if we don't have an iterator, get one
     if(current == null || !current.hasNext) {
       if (consumerTimeoutMs < 0)
@@ -62,7 +63,7 @@ class ConsumerIterator(private val channel: BlockingQueue[FetchedDataChunk], con
         if(logger.isDebugEnabled)
           logger.debug("Received the shutdown command")
     	  channel.offer(currentDataChunk)
-        return allDone
+        return allDone()
       } else {
         currentTopicInfo = currentDataChunk.topicInfo
         if (currentTopicInfo.getConsumeOffset != currentDataChunk.fetchOffset) {
@@ -73,11 +74,11 @@ class ConsumerIterator(private val channel: BlockingQueue[FetchedDataChunk], con
         current = currentDataChunk.messages.iterator
       }
     }
-    val item = current.next
+    val item = current.next()
     consumedOffset = item.offset
-    item.message
+    decoder.toEvent(item.message)
   }
-  
+
 }
 
 class ConsumerTimeoutException() extends RuntimeException()
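
A sketch of pulling from the typed iterator directly, assuming consumer.timeout.ms is set; iter is stream.iterator for some KafkaMessageStream[V], and handleRecord is a hypothetical callback:

  try {
    while (iter.hasNext())
      handleRecord(iter.next())   // next() now returns the decoded V, not a raw Message
  } catch {
    case e: ConsumerTimeoutException =>
      // no message arrived within consumer.timeout.ms; stop consuming
  }
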
diff --git core/src/main/scala/kafka/consumer/KafkaMessageStream.scala core/src/main/scala/kafka/consumer/KafkaMessageStream.scala
index f6074a8..694748e 100644
--- core/src/main/scala/kafka/consumer/KafkaMessageStream.scala
+++ core/src/main/scala/kafka/consumer/KafkaMessageStream.scala
@@ -20,20 +20,24 @@ package kafka.consumer
 import java.util.concurrent.BlockingQueue
 import org.apache.log4j.Logger
 import kafka.message.Message
+import kafka.serializer.{DefaultDecoder, Decoder}
 
 
 /**
 * All calls to elements should produce the same thread-safe iterator? Should have a separate thread
  * that feeds messages into a blocking queue for processing.
  */
-class KafkaMessageStream(private val queue: BlockingQueue[FetchedDataChunk], consumerTimeoutMs: Int)
-   extends Iterable[Message] with java.lang.Iterable[Message]{
+class KafkaMessageStream[V](private val queue: BlockingQueue[FetchedDataChunk],
+                            consumerTimeoutMs: Int,
+                            private val decoder: Decoder[V])
+        extends Iterable[V] with java.lang.Iterable[V] {
 
   private val logger = Logger.getLogger(getClass())
-  private val iter: ConsumerIterator = new ConsumerIterator(queue, consumerTimeoutMs)
+  private val iter: ConsumerIterator[V] =
+    new ConsumerIterator[V](queue, consumerTimeoutMs, decoder)
     
   /**
    *  Create an iterator over messages in the stream.
    */
-  def iterator(): ConsumerIterator = iter
+  def iterator(): ConsumerIterator[V] = iter
 }
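
Since the stream is an Iterable[V], ordinary collection syntax yields decoded values; a sketch assuming a StringDecoder was configured and streams is the map returned by createMessageStreams:

  val stream: KafkaMessageStream[String] = streams("example-topic").head
  for (line <- stream)   // blocks until the next message is available
    println(line)
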
diff --git core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
index 3aee550..10a3d3a 100644
--- core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
+++ core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
@@ -17,7 +17,6 @@
 
 package kafka.consumer
 
-import java.nio.channels._
 import java.util.concurrent._
 import java.util.concurrent.atomic._
 import kafka.message._
@@ -32,23 +31,23 @@ private[consumer] class PartitionTopicInfo(val topic: String,
                                            private val consumedOffset: AtomicLong,
                                            private val fetchedOffset: AtomicLong,
                                            private val fetchSize: AtomicInteger) {
-  private val logger = Logger.getLogger(getClass())
+  private val logger = Logger.getLogger(getClass)
   if (logger.isDebugEnabled) {
     logger.debug("initial consumer offset of " + this + " is " + consumedOffset.get)
     logger.debug("initial fetch offset of " + this + " is " + fetchedOffset.get)
   }
 
-  def getConsumeOffset() = consumedOffset.get
+  def getConsumeOffset = consumedOffset.get
 
-  def getFetchOffset() = fetchedOffset.get
+  def getFetchOffset = fetchedOffset.get
 
-  def resetConsumeOffset(newConsumeOffset: Long) = {
+  def resetConsumeOffset(newConsumeOffset: Long) {
     consumedOffset.set(newConsumeOffset)
     if (logger.isDebugEnabled)
       logger.debug("reset consume offset of " + this + " to " + newConsumeOffset)
   }
 
-  def resetFetchOffset(newFetchOffset: Long) = {
+  def resetFetchOffset(newFetchOffset: Long) {
     fetchedOffset.set(newFetchOffset)
     if (logger.isDebugEnabled)
       logger.debug("reset fetch offset of ( %s ) to %d".format(this, newFetchOffset))
@@ -75,11 +74,11 @@ private[consumer] class PartitionTopicInfo(val topic: String,
   /**
    *  add an empty message with the exception to the queue so that client can see the error
    */
-  def enqueueError(e: Throwable, fetchOffset: Long) = {
+  def enqueueError(e: Throwable, fetchOffset: Long) {
     val messages = new ByteBufferMessageSet(ErrorMapping.EmptyByteBuffer, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
     chunkQueue.put(new FetchedDataChunk(messages, this, fetchOffset))
   }
 
-  override def toString(): String = topic + ":" + partition.toString + ": fetched offset = " + fetchedOffset.get +
+  override def toString: String = topic + ":" + partition.toString + ": fetched offset = " + fetchedOffset.get +
     ": consumed offset = " + consumedOffset.get
 }
diff --git core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 429d499..cc23b89 100644
--- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -29,6 +29,8 @@ import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import kafka.api.OffsetRequest
 import java.util.UUID
+import kafka.serializer.Decoder
+
 
 /**
  * This class handles the consumers interaction with zookeeper
@@ -81,9 +83,9 @@ trait ZookeeperConsumerConnectorMBean {
   def getLatestOffset(topic: String, brokerId: Int, partitionId: Int): Long
 }
 
-private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
+private[kafka] class ZookeeperConsumerConnector[V](val config: ConsumerConfig,
                                                 val enableFetcher: Boolean) // for testing only
-  extends ConsumerConnector with ZookeeperConsumerConnectorMBean {
+  extends ConsumerConnector[V] with ZookeeperConsumerConnectorMBean {
 
   private val logger = Logger.getLogger(getClass())
   private val isShuttingDown = new AtomicBoolean(false)
@@ -94,6 +96,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   // queues : (topic,consumerThreadId) -> queue
   private val queues = new Pool[Tuple2[String,String], BlockingQueue[FetchedDataChunk]]
   private val scheduler = new KafkaScheduler(1, "Kafka-consumer-autocommit-", false)
+  private val decoder: Decoder[V] = Utils.getObject(config.deserializerClass)
   connectZk()
   createFetcher()
   if (config.autoCommit) {
@@ -103,7 +106,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
   def this(config: ConsumerConfig) = this(config, true)
 
-  def createMessageStreams(topicCountMap: Map[String,Int]) : Map[String,List[KafkaMessageStream]] = {
+  def createMessageStreams(
+      topicCountMap: Map[String,Int]) : Map[String,List[KafkaMessageStream[V]]] = {
     consume(topicCountMap)
   }
 
@@ -143,13 +147,13 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     }
   }
 
-  def consume(topicCountMap: scala.collection.Map[String,Int]): Map[String,List[KafkaMessageStream]] = {
+  def consume(topicCountMap: scala.collection.Map[String,Int]): Map[String,List[KafkaMessageStream[V]]] = {
     logger.debug("entering consume ")
     if (topicCountMap == null)
       throw new RuntimeException("topicCountMap is null")
 
     val dirs = new ZKGroupDirs(config.groupId)
-    var ret = new mutable.HashMap[String,List[KafkaMessageStream]]
+    var ret = new mutable.HashMap[String,List[KafkaMessageStream[V]]]
 
     var consumerUuid : String = null
     config.consumerId match {
@@ -177,11 +181,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     // create a queue per topic per consumer thread
     val consumerThreadIdsPerTopic = topicCount.getConsumerThreadIdsPerTopic
     for ((topic, threadIdSet) <- consumerThreadIdsPerTopic) {
-      var streamList: List[KafkaMessageStream] = Nil
+      var streamList: List[KafkaMessageStream[V]] = Nil
       for (threadId <- threadIdSet) {
         val stream = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
         queues.put((topic, threadId), stream)
-        streamList ::= new KafkaMessageStream(stream, config.consumerTimeoutMs)
+        streamList ::= new KafkaMessageStream[V](stream, config.consumerTimeoutMs, decoder)
       }
       ret += (topic -> streamList)
       logger.debug("adding topic " + topic + " and stream to map..")
diff --git core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
index 2f3355a..07d6382 100644
--- core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
+++ core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
@@ -22,7 +22,7 @@ import kafka.consumer.KafkaMessageStream;
 import java.util.List;
 import java.util.Map;
 
-public interface ConsumerConnector {
+public interface ConsumerConnector<T> {
     /**
      *  Create a list of MessageStreams for each topic.
      *
@@ -30,7 +30,7 @@ public interface ConsumerConnector {
      *  @return a map of (topic, list of  KafkaMessageStream) pair. The number of items in the
      *          list is #streams. Each KafkaMessageStream supports an iterator of messages.
      */
-    public Map<String, List<KafkaMessageStream>> createMessageStreams(Map<String, Integer> topicCountMap);
+    public Map<String, List<KafkaMessageStream<T>>> createMessageStreams(Map<String, Integer> topicCountMap);
 
     /**
      *  Commit the offsets of all broker partitions connected by this connector.
diff --git core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
index 856e6e7..cfe1586 100644
--- core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
+++ core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
@@ -54,24 +54,24 @@ import kafka.consumer.{KafkaMessageStream, ConsumerConfig}
  *
 */
 
-private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
+private[kafka] class ZookeeperConsumerConnector[V](val config: ConsumerConfig,
                                  val enableFetcher: Boolean) // for testing only
-    extends ConsumerConnector {
+    extends ConsumerConnector[V] {
 
-  val underlying = new kafka.consumer.ZookeeperConsumerConnector(config, enableFetcher)
+  val underlying = new kafka.consumer.ZookeeperConsumerConnector[V](config, enableFetcher)
 
   def this(config: ConsumerConfig) = this(config, true)
 
  // for java client
   def createMessageStreams(topicCountMap: java.util.Map[String,java.lang.Integer]):
-    java.util.Map[String,java.util.List[KafkaMessageStream]] = {
+    java.util.Map[String,java.util.List[KafkaMessageStream[V]]] = {
     import scala.collection.JavaConversions._
 
     val scalaTopicCountMap: Map[String, Int] = Map.empty[String, Int] ++ asMap(topicCountMap.asInstanceOf[java.util.Map[String, Int]])
     val scalaReturn = underlying.consume(scalaTopicCountMap)
-    val ret = new java.util.HashMap[String,java.util.List[KafkaMessageStream]]
+    val ret = new java.util.HashMap[String,java.util.List[KafkaMessageStream[V]]]
     for ((topic, streams) <- scalaReturn) {
-      var javaStreamList = new java.util.ArrayList[KafkaMessageStream]
+      var javaStreamList = new java.util.ArrayList[KafkaMessageStream[V]]
       for (stream <- streams)
         javaStreamList.add(stream)
       ret.put(topic, javaStreamList)
diff --git core/src/main/scala/kafka/serializer/Decoder.scala core/src/main/scala/kafka/serializer/Decoder.scala
index 7d1c138..d02ea7d 100644
--- core/src/main/scala/kafka/serializer/Decoder.scala
+++ core/src/main/scala/kafka/serializer/Decoder.scala
@@ -35,3 +35,4 @@ class StringDecoder extends Decoder[String] {
     new String(arr)
   }
 }
+
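
A hypothetical custom decoder, to illustrate the Decoder[T] contract that ConsumerIterator invokes through toEvent:

  import kafka.message.Message
  import kafka.serializer.Decoder

  class Utf8UpperCaseDecoder extends Decoder[String] {
    def toEvent(message: Message): String = {
      val buf = message.payload                  // message payload as a ByteBuffer
      val bytes = new Array[Byte](buf.remaining)
      buf.get(bytes)
      new String(bytes, "UTF-8").toUpperCase
    }
  }

Set deserializer.class to this class name and create the streams with a matching type parameter to have them yield upper-cased strings.
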
diff --git core/src/main/scala/kafka/server/KafkaServerStartable.scala core/src/main/scala/kafka/server/KafkaServerStartable.scala
index 6ffa827..f15459f 100644
--- core/src/main/scala/kafka/server/KafkaServerStartable.scala
+++ core/src/main/scala/kafka/server/KafkaServerStartable.scala
@@ -24,6 +24,8 @@ import kafka.message.Message
 import org.apache.log4j.Logger
 
 import scala.collection.Map
+import kafka.serializer.Decoder
+
 
 class KafkaServerStartable(val serverConfig: KafkaConfig,
                            val consumerConfig: ConsumerConfig,
@@ -74,7 +76,7 @@ class EmbeddedConsumer(private val consumerConfig: ConsumerConfig,
   // mirrorTopics should be accessed by handleTopicEvent only
   private var mirrorTopics:Seq[String] = List()
 
-  private var consumerConnector: ConsumerConnector = null
+  private var consumerConnector: ConsumerConnector[Message] = null
   private var topicEventWatcher:ZookeeperTopicEventWatcher = null
 
   private val producer = new Producer[Null, Message](producerConfig)
diff --git core/src/main/scala/kafka/tools/ConsumerPerformance.scala core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index f28b03f..255a4d0 100644
--- core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -25,6 +25,8 @@ import joptsimple._
 import org.apache.log4j.Logger
 import kafka.utils.Utils
 import kafka.consumer.{ConsumerConfig, ConsumerConnector, Consumer}
+import kafka.message.Message
+
 
 abstract class ShutdownableThread(name: String) extends Thread(name) {
   def shutdown(): Unit  
@@ -85,7 +87,7 @@ object ConsumerPerformance {
     var totalNumBytes = new AtomicLong(0)
     
     val consumerConfig = new ConsumerConfig(Utils.loadProps(propsFile))
-    val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
+    val consumerConnector: ConsumerConnector[Message] = Consumer.create(consumerConfig)
 
     val topicMessageStreams = consumerConnector.createMessageStreams(Predef.Map(topic -> numThreads))
     var threadList = List[ShutdownableThread]()
diff --git core/src/main/scala/kafka/tools/ConsumerShell.scala core/src/main/scala/kafka/tools/ConsumerShell.scala
index a083fa1..c7b1bdd 100644
--- core/src/main/scala/kafka/tools/ConsumerShell.scala
+++ core/src/main/scala/kafka/tools/ConsumerShell.scala
@@ -22,6 +22,8 @@ import kafka.utils.Utils
 import java.util.concurrent.CountDownLatch
 import org.apache.log4j.Logger
 import kafka.consumer._
+import kafka.message.Message
+
 
 /**
  * Program to read using the rich consumer and dump the results to standard out
@@ -62,7 +64,7 @@ object ConsumerShell {
     println("Starting consumer...")
 
     val consumerConfig = new ConsumerConfig(Utils.loadProps(propsFile))
-    val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
+    val consumerConnector: ConsumerConnector[Message] = Consumer.create(consumerConfig)
     val topicMessageStreams = consumerConnector.createMessageStreams(Predef.Map(topic -> partitions))
     var threadList = List[ZKConsumerThread]()
     for ((topic, streamList) <- topicMessageStreams)
@@ -83,7 +85,7 @@ object ConsumerShell {
   }
 }
 
-class ZKConsumerThread(stream: KafkaMessageStream) extends Thread {
+class ZKConsumerThread(stream: KafkaMessageStream[Message]) extends Thread {
   val shutdownLatch = new CountDownLatch(1)
   val logger = Logger.getLogger(getClass)
 
diff --git core/src/main/scala/kafka/tools/ReplayLogProducer.scala core/src/main/scala/kafka/tools/ReplayLogProducer.scala
index eb96574..1e56870 100644
--- core/src/main/scala/kafka/tools/ReplayLogProducer.scala
+++ core/src/main/scala/kafka/tools/ReplayLogProducer.scala
@@ -40,7 +40,7 @@ object ReplayLogProducer {
     consumerProps.put("fetch.size", (1024*1024).toString)
     consumerProps.put("socket.buffer.size", (2 * 1024 * 1024).toString)
     val consumerConfig = new ConsumerConfig(consumerProps)
-    val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
+    val consumerConnector: ConsumerConnector[Message] = Consumer.create(consumerConfig)
     val topicMessageStreams = consumerConnector.createMessageStreams(Predef.Map(config.inputTopic -> config.numThreads))
     var threadList = List[ZKConsumerThread]()
     for ((topic, streamList) <- topicMessageStreams)
@@ -139,7 +139,7 @@ object ReplayLogProducer {
     }
   }
 
-  class ZKConsumerThread(config: Config, stream: KafkaMessageStream) extends Thread {
+  class ZKConsumerThread(config: Config, stream: KafkaMessageStream[Message]) extends Thread {
     val shutdownLatch = new CountDownLatch(1)
     val logger = Logger.getLogger(getClass)
     val props = new Properties()
diff --git core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
index 1c5f542..0b67a45 100644
--- core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
+++ core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
@@ -18,6 +18,7 @@
 package kafka
 
 import consumer._
+import message.Message
 import utils.Utils
 import java.util.concurrent.CountDownLatch
 
@@ -34,7 +35,7 @@ object TestZKConsumerOffsets {
     props.put("autooffset.reset", "largest")
     
     val config = new ConsumerConfig(props)
-    val consumerConnector: ConsumerConnector = Consumer.create(config)
+    val consumerConnector: ConsumerConnector[Message] = Consumer.create(config)
     val topicMessageStreams = consumerConnector.createMessageStreams(Predef.Map(topic -> 1))
     var threadList = List[ConsumerThread]()
     for ((topic, streamList) <- topicMessageStreams)
@@ -55,7 +56,7 @@ object TestZKConsumerOffsets {
   }
 }
 
-private class ConsumerThread(stream: KafkaMessageStream) extends Thread {
+private class ConsumerThread(stream: KafkaMessageStream[Message]) extends Thread {
   val shutdownLatch = new CountDownLatch(1)
 
   override def run() {
diff --git core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index 4002ccf..44f32fd 100644
--- core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -27,6 +27,8 @@ import kafka.utils.{TestZKUtils, TestUtils}
 import org.scalatest.junit.JUnit3Suite
 import org.apache.log4j.{Level, Logger}
 import kafka.message._
+import java.nio.ByteBuffer
+import kafka.serializer.StringDecoder
 
 class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness {
   private val logger = Logger.getLogger(getClass())
@@ -61,7 +63,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
       TestUtils.createConsumerProperties(zkConnect, group, consumer0)) {
       override val consumerTimeoutMs = 200
     }
-    val zkConsumerConnector0 = new ZookeeperConsumerConnector(consumerConfig0, true)
+    val zkConsumerConnector0 = new ZookeeperConsumerConnector[Message](consumerConfig0, true)
     val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(Predef.Map(topic -> numNodes*numParts/2))
     try {
       getMessages(nMessages*2, topicMessageStreams0)
@@ -79,7 +81,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(
       TestUtils.createConsumerProperties(zkConnect, group, consumer1))
-    val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
+    val zkConsumerConnector1 = new ZookeeperConsumerConnector[Message](consumerConfig1, true)
     val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> numNodes*numParts/2))
     val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
     assertEquals(sentMessages1, receivedMessages1)
@@ -89,7 +91,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     // create a consumer
     val consumerConfig2 = new ConsumerConfig(
       TestUtils.createConsumerProperties(zkConnect, group, consumer2))
-    val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
+    val zkConsumerConnector2 = new ZookeeperConsumerConnector[Message](consumerConfig2, true)
     val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Predef.Map(topic -> numNodes*numParts/2))
     // send some messages to each broker
     val sentMessages2 = sendMessages(nMessages, "batch2")
@@ -102,7 +104,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     // create a consumer with empty map
     val consumerConfig3 = new ConsumerConfig(
       TestUtils.createConsumerProperties(zkConnect, group, consumer3))
-    val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
+    val zkConsumerConnector3 = new ZookeeperConsumerConnector[Message](consumerConfig3, true)
     val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
     // send some messages to each broker
     Thread.sleep(200)
@@ -124,33 +126,13 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
     requestHandlerLogger.setLevel(Level.FATAL)
 
-    var actualMessages: List[Message] = Nil
-
-    // test consumer timeout logic
-    val consumerConfig0 = new ConsumerConfig(
-      TestUtils.createConsumerProperties(zkConnect, group, consumer0)) {
-      override val consumerTimeoutMs = 200
-    }
-    val zkConsumerConnector0 = new ZookeeperConsumerConnector(consumerConfig0, true)
-    val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(Predef.Map(topic -> numNodes*numParts/2))
-    try {
-      getMessages(nMessages*2, topicMessageStreams0)
-      fail("should get an exception")
-    }
-    catch {
-      case e: ConsumerTimeoutException => // this is ok
-        println("This is ok")
-      case e => throw e
-    }
-    zkConsumerConnector0.shutdown
-
     println("Sending messages for 1st consumer")
     // send some messages to each broker
     val sentMessages1 = sendMessages(nMessages, "batch1", DefaultCompressionCodec)
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(
       TestUtils.createConsumerProperties(zkConnect, group, consumer1))
-    val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
+    val zkConsumerConnector1 = new ZookeeperConsumerConnector[Message](consumerConfig1, true)
     val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> numNodes*numParts/2))
     val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
     assertEquals(sentMessages1, receivedMessages1)
@@ -161,7 +143,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     // create a consumer
     val consumerConfig2 = new ConsumerConfig(
       TestUtils.createConsumerProperties(zkConnect, group, consumer2))
-    val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
+    val zkConsumerConnector2 = new ZookeeperConsumerConnector[Message](consumerConfig2, true)
     val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Predef.Map(topic -> numNodes*numParts/2))
     // send some messages to each broker
     val sentMessages2 = sendMessages(nMessages, "batch2", DefaultCompressionCodec)
@@ -175,7 +157,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     println("Sending more messages for 3rd consumer")
     val consumerConfig3 = new ConsumerConfig(
       TestUtils.createConsumerProperties(zkConnect, group, consumer3))
-    val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
+    val zkConsumerConnector3 = new ZookeeperConsumerConnector[Message](consumerConfig3, true)
     val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
     // send some messages to each broker
     Thread.sleep(200)
@@ -210,13 +192,13 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
       TestUtils.createConsumerProperties(zkConnect, group, consumer0)) {
       override val consumerTimeoutMs = 5000
     }
-    val zkConsumerConnector0 = new ZookeeperConsumerConnector(consumerConfig0, true)
+    val zkConsumerConnector0 = new ZookeeperConsumerConnector[Message](consumerConfig0, true)
     val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(Predef.Map(topic -> 1))
     getMessages(100, topicMessageStreams0)
     zkConsumerConnector0.shutdown
     // at this point, only some part of the message set was consumed. So consumed offset should still be 0
     // also fetched offset should be 0
-    val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig0, true)
+    val zkConsumerConnector1 = new ZookeeperConsumerConnector[Message](consumerConfig0, true)
     val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> 1))
     val receivedMessages = getMessages(400, topicMessageStreams1)
     val sortedReceivedMessages = receivedMessages.sortWith((s,t) => s.checksum < t.checksum)
@@ -227,6 +209,43 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     requestHandlerLogger.setLevel(Level.ERROR)
   }
 
+  def testConsumerDecoder() {
+    val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
+    requestHandlerLogger.setLevel(Level.FATAL)
+
+    val sentMessages = sendMessages(nMessages, "batch1", NoCompressionCodec).
+            map(m => Utils.toString(m.payload, "UTF-8")).
+            sortWith((s, t) => s.compare(t) == -1)
+    val consumerConfig = new ConsumerConfig(
+      TestUtils.createConsumerProperties(zkConnect, group, consumer1)) {
+      override val deserializerClass = new StringDecoder().getClass.getName
+    }
+
+    val zkConsumerConnector =
+      new ZookeeperConsumerConnector[String](consumerConfig, true)
+    val topicMessageStreams =
+      zkConsumerConnector.createMessageStreams(
+        Predef.Map(topic -> numNodes*numParts/2))
+
+    var receivedMessages: List[String] = Nil
+    for ((topic, messageStreams) <- topicMessageStreams) {
+      for (messageStream <- messageStreams) {
+        val iterator = messageStream.iterator
+        for (i <- 0 until nMessages * 2) {
+          assertTrue(iterator.hasNext())
+          val message = iterator.next()
+          receivedMessages ::= message
+          logger.debug("received message: " + message)
+        }
+      }
+    }
+    receivedMessages = receivedMessages.sortWith((s, t) => s.compare(t) == -1)
+    assertEquals(sentMessages, receivedMessages)
+
+    zkConsumerConnector.shutdown()
+    requestHandlerLogger.setLevel(Level.ERROR)
+  }
+
   def sendMessages(conf: KafkaConfig, messagesPerNode: Int, header: String, compression: CompressionCodec): List[Message]= {
     var messages: List[Message] = Nil
     val producer = TestUtils.createProducer("localhost", conf.port)
@@ -250,7 +269,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     messages.sortWith((s,t) => s.checksum < t.checksum)
   }
 
-  def getMessages(nMessagesPerThread: Int, topicMessageStreams: Map[String,List[KafkaMessageStream]]): List[Message]= {
+  def getMessages(nMessagesPerThread: Int, topicMessageStreams: Map[String,List[KafkaMessageStream[Message]]]): List[Message]= {
     var messages: List[Message] = Nil
     for ((topic, messageStreams) <- topicMessageStreams) {
       for (messageStream <- messageStreams) {
diff --git core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
index 9923665..a07d9c7 100644
--- core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
+++ core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
@@ -27,6 +27,8 @@ import kafka.server.{KafkaRequestHandlers, KafkaServer, KafkaConfig}
 import org.apache.log4j.{Level, Logger}
 import org.scalatest.junit.JUnit3Suite
 import kafka.utils.{TestUtils, TestZKUtils}
+import kafka.message.Message
+
 
 class AutoOffsetResetTest extends JUnit3Suite with ZooKeeperTestHarness {
 
@@ -77,7 +79,7 @@ class AutoOffsetResetTest extends JUnit3Suite with ZooKeeperTestHarness {
     logger.info("Updated consumer offset to " + largeOffset)
 
     Thread.sleep(500)
-    val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
+    val consumerConnector: ConsumerConnector[Message] = Consumer.create(consumerConfig)
     val messageStreams = consumerConnector.createMessageStreams(Map(topic -> 1))
 
     var threadList = List[Thread]()
@@ -131,7 +133,7 @@ class AutoOffsetResetTest extends JUnit3Suite with ZooKeeperTestHarness {
     logger.info("Updated consumer offset to " + smallOffset)
 
 
-    val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
+    val consumerConnector: ConsumerConnector[Message] = Consumer.create(consumerConfig)
     val messageStreams = consumerConnector.createMessageStreams(Map(topic -> 1))
 
     var threadList = List[Thread]()
@@ -184,7 +186,7 @@ class AutoOffsetResetTest extends JUnit3Suite with ZooKeeperTestHarness {
     logger.info("Updated consumer offset to " + largeOffset)
 
 
-    val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
+    val consumerConnector: ConsumerConnector[Message] = Consumer.create(consumerConfig)
     val messageStreams = consumerConnector.createMessageStreams(Map(topic -> 1))
 
     var threadList = List[Thread]()
diff --git core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
index 066e8f7..f96728e 100644
--- core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
+++ core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
@@ -31,6 +31,8 @@ import kafka.consumer.{Consumer, ConsumerConfig, KafkaMessageStream, ConsumerTim
 import javax.management.NotCompliantMBeanException
 import org.apache.log4j.{Level, Logger}
 import kafka.message.{NoCompressionCodec, DefaultCompressionCodec, CompressionCodec, Message}
+import kafka.serializer.StringDecoder
+
 
 class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness {
   private val logger = Logger.getLogger(getClass())
@@ -64,7 +66,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
       TestUtils.createConsumerProperties(zkConnect, group, consumer0)) {
       override val consumerTimeoutMs = 200
     }
-    val zkConsumerConnector0 = new ZookeeperConsumerConnector(consumerConfig0, true)
+    val zkConsumerConnector0 = new ZookeeperConsumerConnector[Message](consumerConfig0, true)
     val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(toJavaMap(Predef.Map(topic -> numNodes*numParts/2)))
     try {
       getMessages(nMessages*2, topicMessageStreams0)
@@ -81,7 +83,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(
       TestUtils.createConsumerProperties(zkConnect, group, consumer1))
-    val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
+    val zkConsumerConnector1 = new ZookeeperConsumerConnector[Message](consumerConfig1, true)
     val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(toJavaMap(Predef.Map(topic -> numNodes*numParts/2)))
     val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
     assertEquals(sentMessages1, receivedMessages1)
@@ -91,7 +93,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     // create a consumer
     val consumerConfig2 = new ConsumerConfig(
       TestUtils.createConsumerProperties(zkConnect, group, consumer2))
-    val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
+    val zkConsumerConnector2 = new ZookeeperConsumerConnector[Message](consumerConfig2, true)
     val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(toJavaMap(Predef.Map(topic -> numNodes*numParts/2)))
     // send some messages to each broker
     val sentMessages2 = sendMessages(nMessages, "batch2")
@@ -104,7 +106,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     // create a consumer with empty map
     val consumerConfig3 = new ConsumerConfig(
       TestUtils.createConsumerProperties(zkConnect, group, consumer3))
-    val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
+    val zkConsumerConnector3 = new ZookeeperConsumerConnector[Message](consumerConfig3, true)
     val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(toJavaMap(new mutable.HashMap[String, Int]()))
     // send some messages to each broker
     Thread.sleep(200)
@@ -127,29 +129,12 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     requestHandlerLogger.setLevel(Level.FATAL)
     var actualMessages: List[Message] = Nil
 
-    // test consumer timeout logic
-    val consumerConfig0 = new ConsumerConfig(
-      TestUtils.createConsumerProperties(zkConnect, group, consumer0)) {
-      override val consumerTimeoutMs = 200
-    }
-    val zkConsumerConnector0 = new ZookeeperConsumerConnector(consumerConfig0, true)
-    val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(toJavaMap(Predef.Map(topic -> numNodes*numParts/2)))
-    try {
-      getMessages(nMessages*2, topicMessageStreams0)
-      fail("should get an exception")
-    }
-    catch {
-      case e: ConsumerTimeoutException => // this is ok
-      case e => throw e
-    }
-    zkConsumerConnector0.shutdown
-
     // send some messages to each broker
     val sentMessages1 = sendMessages(nMessages, "batch1", DefaultCompressionCodec)
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(
       TestUtils.createConsumerProperties(zkConnect, group, consumer1))
-    val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
+    val zkConsumerConnector1 = new ZookeeperConsumerConnector[Message](consumerConfig1, true)
     val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(toJavaMap(Predef.Map(topic -> numNodes*numParts/2)))
     val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
     assertEquals(sentMessages1, receivedMessages1)
@@ -159,7 +144,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     // create a consumer
     val consumerConfig2 = new ConsumerConfig(
       TestUtils.createConsumerProperties(zkConnect, group, consumer2))
-    val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
+    val zkConsumerConnector2 = new ZookeeperConsumerConnector[Message](consumerConfig2, true)
     val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(toJavaMap(Predef.Map(topic -> numNodes*numParts/2)))
     // send some messages to each broker
     val sentMessages2 = sendMessages(nMessages, "batch2", DefaultCompressionCodec)
@@ -172,7 +157,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     // create a consumer with empty map
     val consumerConfig3 = new ConsumerConfig(
       TestUtils.createConsumerProperties(zkConnect, group, consumer3))
-    val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
+    val zkConsumerConnector3 = new ZookeeperConsumerConnector[Message](consumerConfig3, true)
     val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(toJavaMap(new mutable.HashMap[String, Int]()))
     // send some messages to each broker
     Thread.sleep(200)
@@ -207,13 +192,13 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
       TestUtils.createConsumerProperties(zkConnect, group, consumer0)) {
       override val consumerTimeoutMs = 5000
     }
-    val zkConsumerConnector0 = new ZookeeperConsumerConnector(consumerConfig0, true)
+    val zkConsumerConnector0 = new ZookeeperConsumerConnector[Message](consumerConfig0, true)
     val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(toJavaMap(Predef.Map(topic -> 1)))
     getMessages(100, topicMessageStreams0)
     zkConsumerConnector0.shutdown
     // at this point, only some part of the message set was consumed. So consumed offset should still be 0
     // also fetched offset should be 0
-    val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig0, true)
+    val zkConsumerConnector1 = new ZookeeperConsumerConnector[Message](consumerConfig0, true)
     val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(toJavaMap(Predef.Map(topic -> 1)))
     val receivedMessages = getMessages(400, topicMessageStreams1)
     val sortedReceivedMessages = receivedMessages.sortWith((s,t) => s.checksum < t.checksum)
@@ -224,6 +209,43 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     requestHandlerLogger.setLevel(Level.ERROR)
   }
 
+  def testConsumerDecoder() {
+    val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
+    requestHandlerLogger.setLevel(Level.FATAL)
+
+    val sentMessages = sendMessages(nMessages, "batch1", NoCompressionCodec).
+            map(m => Utils.toString(m.payload, "UTF-8")).
+            sortWith((s, t) => s.compare(t) == -1)
+    val consumerConfig = new ConsumerConfig(
+      TestUtils.createConsumerProperties(zkConnect, group, consumer1)) {
+      override val deserializerClass = new StringDecoder().getClass.getName
+    }
+
+    val zkConsumerConnector =
+      new ZookeeperConsumerConnector[String](consumerConfig, true)
+    val topicMessageStreams =
+      zkConsumerConnector.createMessageStreams(
+        toJavaMap(Predef.Map(topic -> numNodes*numParts/2)))
+
+    var receivedMessages: List[String] = Nil
+    for ((topic, messageStreams) <- topicMessageStreams) {
+      for (messageStream <- messageStreams) {
+        val iterator = messageStream.iterator
+        for (i <- 0 until nMessages * 2) {
+          assertTrue(iterator.hasNext())
+          val message = iterator.next()
+          receivedMessages ::= message
+          logger.debug("received message: " + message)
+        }
+      }
+    }
+    receivedMessages = receivedMessages.sortWith((s, t) => s.compare(t) == -1)
+    assertEquals(sentMessages, receivedMessages)
+
+    zkConsumerConnector.shutdown()
+    requestHandlerLogger.setLevel(Level.ERROR)
+  }
+
   def sendMessages(conf: KafkaConfig, messagesPerNode: Int, header: String, compressed: CompressionCodec): List[Message]= {
     var messages: List[Message] = Nil
     val producer = kafka.javaapi.Implicits.toJavaSyncProducer(TestUtils.createProducer("localhost", conf.port))
@@ -247,7 +269,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     messages.sortWith((s,t) => s.checksum < t.checksum)
   }
 
-  def getMessages(nMessagesPerThread: Int, jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaMessageStream]])
+  def getMessages(nMessagesPerThread: Int, jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaMessageStream[Message]]])
   : List[Message]= {
     var messages: List[Message] = Nil
     val topicMessageStreams = asMap(jTopicMessageStreams)
diff --git examples/src/main/java/kafka/examples/Consumer.java examples/src/main/java/kafka/examples/Consumer.java
index f63b0df..8deda4f 100644
--- examples/src/main/java/kafka/examples/Consumer.java
+++ examples/src/main/java/kafka/examples/Consumer.java
@@ -25,12 +25,14 @@ import kafka.consumer.ConsumerConfig;
 import kafka.consumer.ConsumerIterator;
 import kafka.consumer.KafkaMessageStream;
 import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.Message;
+
 
 public class Consumer extends Thread
 {
-  private final ConsumerConnector consumer;
+  private final ConsumerConnector<Message> consumer;
   private final String topic;
-  
+
   public Consumer(String topic)
   {
     consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
@@ -53,9 +55,9 @@ public class Consumer extends Thread
   public void run() {
     Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
     topicCountMap.put(topic, new Integer(1));
-    Map<String, List<KafkaMessageStream>> consumerMap = consumer.createMessageStreams(topicCountMap);
-    KafkaMessageStream stream =  consumerMap.get(topic).get(0);
-    ConsumerIterator it = stream.iterator();
+    Map<String, List<KafkaMessageStream<Message>>> consumerMap = consumer.createMessageStreams(topicCountMap);
+    KafkaMessageStream<Message> stream =  consumerMap.get(topic).get(0);
+    ConsumerIterator<Message> it = stream.iterator();
     while(it.hasNext())
       System.out.println(ExampleUtils.getMessage(it.next()));
   }
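
For comparison, a Scala sketch of the same end-to-end flow against the new generic API; the topic, ZooKeeper address, and group id are placeholders, and the default deserializer.class (DefaultDecoder) keeps the element type as Message:

  import java.util.Properties
  import kafka.consumer.{Consumer, ConsumerConfig}
  import kafka.message.Message

  object ConsumerExample {
    def main(args: Array[String]) {
      val props = new Properties()
      props.put("zk.connect", "localhost:2181")
      props.put("groupid", "example-group")
      val connector = Consumer.create[Message](new ConsumerConfig(props))
      val stream = connector.createMessageStreams(Map("example-topic" -> 1))("example-topic").head
      for (message <- stream)   // blocks waiting for new messages
        println("received a message of " + message.payload.remaining + " payload bytes")
    }
  }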
