diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
index f03120a..1c18832 100644
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
+++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
@@ -29,8 +29,7 @@ import kafka.common.ErrorMapping;
 import kafka.javaapi.MultiFetchResponse;
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.Message;
-import kafka.message.MessageAndMetadata;
+import kafka.message.MessageAndOffset;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
@@ -59,7 +58,7 @@ public class KafkaETLContext {
     protected long _count; /*current count*/
 
     protected MultiFetchResponse _response = null;  /*fetch response*/
-    protected Iterator<MessageAndMetadata<Message>> _messageIt = null; /*message iterator*/
+    protected Iterator<MessageAndOffset> _messageIt = null; /*message iterator*/
     protected Iterator<ByteBufferMessageSet> _respIterator = null;
     protected int _retry = 0;
     protected long _requestTime = 0; /*accumulative request time*/
@@ -138,7 +137,7 @@ public class KafkaETLContext {
             while ( !gotNext && _respIterator.hasNext()) {
                 ByteBufferMessageSet msgSet = _respIterator.next();
                 if ( hasError(msgSet)) return false;
-                _messageIt =  (Iterator<MessageAndMetadata<Message>>) msgSet.iterator();
+                _messageIt = msgSet.iterator();
                 gotNext = get(key, value);
             }
         }
@@ -189,17 +188,17 @@ public class KafkaETLContext {
     
     protected boolean get(KafkaETLKey key, BytesWritable value) throws IOException {
         if (_messageIt != null && _messageIt.hasNext()) {
-            MessageAndMetadata<Message> msgAndMetadata = _messageIt.next();
+            MessageAndOffset messageAndOffset = _messageIt.next();
             
-            ByteBuffer buf = msgAndMetadata.message().payload();
+            ByteBuffer buf = messageAndOffset.message().payload();
             int origSize = buf.remaining();
             byte[] bytes = new byte[origSize];
             buf.get(bytes, buf.position(), origSize);
             value.set(bytes, 0, origSize);
             
-            key.set(_index, _offset, msgAndMetadata.message().checksum());
+            key.set(_index, _offset, messageAndOffset.message().checksum());
             
-            _offset = msgAndMetadata.offset();  //increase offset
+            _offset = messageAndOffset.offset();  //increase offset
             _count ++;  //increase count
             
             return true;
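
With MessageAndOffset replacing MessageAndMetadata<Message> here, the value stored back into _offset is the offset of the *next* message, i.e. the next fetch position. A minimal Scala sketch of the same drain loop, where `messages` stands in for a fetched ByteBufferMessageSet and `collect` is a hypothetical sink in place of the Hadoop OutputCollector:

    import kafka.message.MessageAndOffset

    // Sketch only: drain a fetched message set the way KafkaETLContext.get does.
    def drain(messages: Iterable[MessageAndOffset], collect: Array[Byte] => Unit): Long = {
      var nextFetchOffset = -1L
      for (messageAndOffset <- messages) {
        val buf = messageAndOffset.message.payload   // java.nio.ByteBuffer
        val bytes = new Array[Byte](buf.remaining)
        buf.get(bytes)                               // copy the payload out
        collect(bytes)
        nextFetchOffset = messageAndOffset.offset    // offset of the *next* message
      }
      nextFetchOffset
    }
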
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLInputFormat.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLInputFormat.java
index 69aafed..ddd6b72 100644
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLInputFormat.java
+++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLInputFormat.java
@@ -16,6 +16,7 @@
  */
 package kafka.etl;
 
+
 import java.io.IOException;
 import java.net.URI;
 import java.util.Map;
@@ -23,13 +24,13 @@ import kafka.consumer.SimpleConsumer;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.lib.MultipleOutputs;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.lib.MultipleOutputs;
 
 
 @SuppressWarnings("deprecation")
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLJob.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLJob.java
index 5b8e77d..1a4bcba 100644
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLJob.java
+++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLJob.java
@@ -16,13 +16,13 @@
  */
 package kafka.etl;
 
+
 import java.net.URI;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.mapred.lib.MultipleOutputs;
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLKey.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLKey.java
index e448366..aafecea 100644
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLKey.java
+++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLKey.java
@@ -16,11 +16,11 @@
  */
 package kafka.etl;
 
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import org.apache.hadoop.io.WritableComparable;
-import kafka.etl.KafkaETLKey;
 
 public class KafkaETLKey implements WritableComparable<KafkaETLKey>{
 
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java
index 9691b94..02d79a1 100644
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java
+++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java
@@ -17,6 +17,7 @@
 
 package kafka.etl;
 
+
 import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -33,7 +34,6 @@ import java.util.Arrays;
 import java.util.Enumeration;
 import java.util.List;
 import java.util.Properties;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
index ba9646b..5166358 100644
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
+++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
@@ -17,32 +17,24 @@
 
 package kafka.etl.impl;
 
-import java.io.IOException;
+
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Random;
-import java.util.Map.Entry;
 import java.util.Properties;
-
-import kafka.message.NoCompressionCodec;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.mapred.JobConf;
-
+import java.util.Random;
 import kafka.etl.KafkaETLKey;
 import kafka.etl.KafkaETLRequest;
-import kafka.etl.KafkaETLUtils;
 import kafka.etl.Props;
 import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.Message;
 import kafka.javaapi.producer.SyncProducer;
+import kafka.message.Message;
 import kafka.producer.SyncProducerConfig;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.JobConf;
 
 /**
  * Use this class to produce test events to Kafka server. Each event contains a
diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java
index 815c13f..5acbcee 100644
--- a/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java
+++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java
@@ -16,8 +16,9 @@
  */
 package kafka.bridge.examples;
 
-import kafka.bridge.hadoop.KafkaOutputFormat;
 
+import java.io.IOException;
+import kafka.bridge.hadoop.KafkaOutputFormat;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.NullWritable;
@@ -27,8 +28,6 @@ import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 
-import java.io.IOException;
-
 public class TextPublisher
 {
   public static void main(String[] args) throws Exception
diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
index 5733267..4b9343f 100644
--- a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
+++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
@@ -16,24 +16,26 @@
  */
 package kafka.bridge.hadoop;
 
-import java.util.Properties;
 
+import java.io.IOException;
+import java.net.URI;
+import java.util.Properties;
 import kafka.javaapi.producer.Producer;
 import kafka.message.Message;
 import kafka.producer.ProducerConfig;
-
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.log4j.Logger;
 
-import java.io.IOException;
-import java.net.URI;
-
 public class KafkaOutputFormat<W extends BytesWritable> extends OutputFormat<NullWritable, W>
 {
   private Logger log = Logger.getLogger(KafkaOutputFormat.class);
diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
index efb2b5e..af9c650 100644
--- a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
+++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
@@ -16,19 +16,18 @@
  */
 package kafka.bridge.hadoop;
 
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
 import kafka.javaapi.producer.Producer;
 import kafka.javaapi.producer.ProducerData;
 import kafka.message.Message;
-
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-
 public class KafkaRecordWriter<W extends BytesWritable> extends RecordWriter<NullWritable, W>
 {
   protected Producer<Integer, Message> producer;
diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java
index cac634b..faa1950 100644
--- a/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java
+++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java
@@ -16,9 +16,12 @@
  */
 package kafka.bridge.pig;
 
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
 import kafka.bridge.hadoop.KafkaOutputFormat;
 import kafka.bridge.hadoop.KafkaRecordWriter;
-
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.Encoder;
 import org.apache.hadoop.fs.Path;
@@ -33,10 +36,6 @@ import org.apache.pig.data.Tuple;
 import org.apache.pig.piggybank.storage.avro.PigAvroDatumWriter;
 import org.apache.pig.piggybank.storage.avro.PigSchema2Avro;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-
 public class AvroKafkaStorage extends StoreFunc
 {
   protected KafkaRecordWriter writer;
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index 35320d2..1c7033f 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -20,7 +20,6 @@ package kafka.consumer
 import java.util.Properties
 import kafka.utils.{ZKConfig, Utils}
 import kafka.api.OffsetRequest
-import kafka.common.InvalidConfigException
 object ConsumerConfig {
   val SocketTimeout = 30 * 1000
   val SocketBufferSize = 64*1024
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
index ba48b56..94cb2f1 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
@@ -30,27 +30,27 @@ trait ConsumerConnector {
    *
    *  @param topicCountMap  a map of (topic, #streams) pair
    *  @param decoder Decoder to decode each Message to type T
-   *  @return a map of (topic, list of  KafkaMessageAndMetadataStream) pairs.
+   *  @return a map of (topic, list of KafkaStream) pairs.
    *          The number of items in the list is #streams. Each stream supports
    *          an iterator over message/metadata pairs.
    */
   def createMessageStreams[T](topicCountMap: Map[String,Int],
                               decoder: Decoder[T] = new DefaultDecoder)
-    : Map[String,List[KafkaMessageAndMetadataStream[T]]]
+    : Map[String,List[KafkaStream[T]]]
 
   /**
    *  Create a list of message streams for all topics that match a given filter.
    *
-   *  @param filterSpec Either a Whitelist or Blacklist TopicFilterSpec object.
+   *  @param topicFilter Either a Whitelist or Blacklist TopicFilter object.
    *  @param numStreams Number of streams to return
    *  @param decoder Decoder to decode each Message to type T
-   *  @return a list of KafkaMessageAndMetadataStream each of which provides an
+   *  @return a list of KafkaStream, each of which provides an
    *          iterator over message/metadata pairs over allowed topics.
    */
-  def createMessageStreamsByFilter[T](filterSpec: TopicFilterSpec,
+  def createMessageStreamsByFilter[T](topicFilter: TopicFilter,
                                       numStreams: Int = 1,
                                       decoder: Decoder[T] = new DefaultDecoder)
-    : Seq[KafkaMessageAndMetadataStream[T]]
+    : Seq[KafkaStream[T]]
 
   /**
    *  Commit the offsets of all broker partitions connected by this connector.
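
A hedged sketch of the renamed API end to end; the property names and the Consumer.create factory are assumed from the standard 0.7 consumer setup:

    import java.util.Properties
    import kafka.consumer.{Consumer, ConsumerConfig, KafkaStream, Whitelist}
    import kafka.message.Message

    val props = new Properties()
    props.put("groupid", "example-group")      // assumed standard consumer props
    props.put("zk.connect", "localhost:2181")
    val connector = Consumer.create(new ConsumerConfig(props))

    // Static subscription: (topic -> #streams), now yielding KafkaStream values.
    val streams: Map[String, List[KafkaStream[Message]]] =
      connector.createMessageStreams[Message](Map("test" -> 2))

    // Wildcard subscription now takes a Whitelist/Blacklist (a TopicFilter)
    // directly, instead of a TopicFilterSpec wrapped in a TopicFilter.
    val wildcardStreams: Seq[KafkaStream[Message]] =
      connector.createMessageStreamsByFilter[Message](new Whitelist("white.*"), 2)
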
diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
index f89c130..6007d36 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -21,7 +21,7 @@ import kafka.utils.{IteratorTemplate, Logging}
 import java.util.concurrent.{TimeUnit, BlockingQueue}
 import kafka.serializer.Decoder
 import java.util.concurrent.atomic.AtomicReference
-import kafka.message.{Message, MessageAndMetadata}
+import kafka.message.{MessageAndOffset, MessageAndMetadata}
 
 
 /**
@@ -34,7 +34,7 @@ class ConsumerIterator[T](private val channel: BlockingQueue[FetchedDataChunk],
                           private val decoder: Decoder[T])
   extends IteratorTemplate[MessageAndMetadata[T]] with Logging {
 
-  private var current: AtomicReference[Iterator[MessageAndMetadata[Message]]] = new AtomicReference(null)
+  private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
   private var currentTopicInfo:PartitionTopicInfo = null
   private var consumedOffset: Long = -1L
 
diff --git a/core/src/main/scala/kafka/consumer/Fetcher.scala b/core/src/main/scala/kafka/consumer/Fetcher.scala
index cff2ec4..5e65df9 100644
--- a/core/src/main/scala/kafka/consumer/Fetcher.scala
+++ b/core/src/main/scala/kafka/consumer/Fetcher.scala
@@ -44,7 +44,7 @@ private [consumer] class Fetcher(val config: ConsumerConfig, val zkClient : ZkCl
 
   def clearFetcherQueues(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster,
                             queuesTobeCleared: Iterable[BlockingQueue[FetchedDataChunk]],
-                            messageStreams: Map[String,List[KafkaMessageAndMetadataStream[_]]]) {
+                            messageStreams: Map[String,List[KafkaStream[_]]]) {
 
     // Clear all but the currently iterated upon chunk in the consumer thread's queue
     queuesTobeCleared.foreach(_.clear)
diff --git a/core/src/main/scala/kafka/consumer/KafkaMessageAndMetadataStream.scala b/core/src/main/scala/kafka/consumer/KafkaMessageAndMetadataStream.scala
deleted file mode 100644
index 677dc19..0000000
--- a/core/src/main/scala/kafka/consumer/KafkaMessageAndMetadataStream.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.consumer
-
-
-import java.util.concurrent.BlockingQueue
-import kafka.serializer.Decoder
-import kafka.message.MessageAndMetadata
-
-
-class KafkaMessageAndMetadataStream[T](private val queue: BlockingQueue[FetchedDataChunk],
-                                    consumerTimeoutMs: Int,
-                                    private val decoder: Decoder[T])
-   extends Iterable[MessageAndMetadata[T]] with java.lang.Iterable[MessageAndMetadata[T]] {
-
-  private val iter: ConsumerIterator[T] =
-    new ConsumerIterator[T](queue, consumerTimeoutMs, decoder)
-
-  /**
-   *  Create an iterator over messages in the stream.
-   */
-  def iterator(): ConsumerIterator[T] = iter
-
-  /**
-   * This method clears the queue being iterated during the consumer rebalancing. This is mainly
-   * to reduce the number of duplicates received by the consumer
-   */
-  def clear() {
-    iter.clearCurrentChunk()
-  }
-
-}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/consumer/KafkaStream.scala b/core/src/main/scala/kafka/consumer/KafkaStream.scala
new file mode 100644
index 0000000..a9a52c5
--- /dev/null
+++ b/core/src/main/scala/kafka/consumer/KafkaStream.scala
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+
+import java.util.concurrent.BlockingQueue
+import kafka.serializer.Decoder
+import kafka.message.MessageAndMetadata
+
+
+class KafkaStream[T](private val queue: BlockingQueue[FetchedDataChunk],
+                     consumerTimeoutMs: Int,
+                     private val decoder: Decoder[T])
+   extends Iterable[MessageAndMetadata[T]] with java.lang.Iterable[MessageAndMetadata[T]] {
+
+  private val iter: ConsumerIterator[T] =
+    new ConsumerIterator[T](queue, consumerTimeoutMs, decoder)
+
+  /**
+   *  Create an iterator over messages in the stream.
+   */
+  def iterator(): ConsumerIterator[T] = iter
+
+  /**
+   * This method clears the queue being iterated during the consumer rebalancing. This is mainly
+   * to reduce the number of duplicates received by the consumer
+   */
+  def clear() {
+    iter.clearCurrentChunk()
+  }
+
+}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala
index 95b161a..c60f5a3 100644
--- a/core/src/main/scala/kafka/consumer/TopicCount.scala
+++ b/core/src/main/scala/kafka/consumer/TopicCount.scala
@@ -48,6 +48,7 @@ private[kafka] object TopicCount extends Logging {
   /*
    * Example of whitelist topic count stored in ZooKeeper:
    * Topics with whitetopic as prefix, and four streams: *4*whitetopic.*
+   *
    * Example of blacklist topic count stored in ZooKeeper:
    * Topics with blacktopic as prefix, and four streams: !4!blacktopic.*
    */
@@ -55,9 +56,9 @@ private[kafka] object TopicCount extends Logging {
   val WHITELIST_MARKER = "*"
   val BLACKLIST_MARKER = "!"
   private val WHITELIST_PATTERN =
-    Pattern.compile("\\*(\\p{Digit}+)\\*(.*)")
+    Pattern.compile("""\*(\p{Digit}+)\*(.*)""")
   private val BLACKLIST_PATTERN =
-    Pattern.compile("!(\\p{Digit}+)!(.*)")
+    Pattern.compile("""!(\p{Digit}+)!(.*)""")
 
   val myConversionFunc = {input : String => input.toInt}
   JSON.globalNumberParser = myConversionFunc
@@ -84,9 +85,9 @@ private[kafka] object TopicCount extends Logging {
       val numStreams = matcher.group(1).toInt
       val regex = matcher.group(2)
       val filter = if (hasWhitelist)
-        new TopicFilter(new Whitelist(regex))
+        new Whitelist(regex)
       else
-        new TopicFilter(new Blacklist(regex))
+        new Blacklist(regex)
 
       new WildcardTopicCount(zkClient, consumerId, filter, numStreams)
     }
@@ -167,12 +168,12 @@ private[kafka] class WildcardTopicCount(zkClient: ZkClient,
   }
 
   def dbString = {
-    val marker = if (topicFilter.usesWhitelist)
-      TopicCount.WHITELIST_MARKER
-    else
-      TopicCount.BLACKLIST_MARKER
+    val marker = topicFilter match {
+      case wl: Whitelist => TopicCount.WHITELIST_MARKER
+      case bl: Blacklist => TopicCount.BLACKLIST_MARKER
+    }
 
-    "%s%d%s%s".format(marker, numStreams, marker, topicFilter.filterSpec.regex)
+    "%s%d%s%s".format(marker, numStreams, marker, topicFilter.regex)
   }
 
 }
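
Under the sealed hierarchy, dbString's marker falls out of a pattern match instead of the old usesWhitelist flag. A sketch of the encoding described in the comment above (*4*whitetopic.* for four streams over a whitelist); it assumes code living inside the kafka package, since TopicCount is private[kafka]:

    import kafka.consumer.{Blacklist, TopicCount, TopicFilter, Whitelist}

    val filter: TopicFilter = new Whitelist("whitetopic.*")
    val marker = filter match {
      case _: Whitelist => TopicCount.WHITELIST_MARKER   // "*"
      case _: Blacklist => TopicCount.BLACKLIST_MARKER   // "!"
    }
    // four streams over this whitelist serialize as *4*whitetopic.*
    val encoded = "%s%d%s%s".format(marker, 4, marker, filter.regex)
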
diff --git a/core/src/main/scala/kafka/consumer/TopicFilter.scala b/core/src/main/scala/kafka/consumer/TopicFilter.scala
index ecadc05..cf3853b 100644
--- a/core/src/main/scala/kafka/consumer/TopicFilter.scala
+++ b/core/src/main/scala/kafka/consumer/TopicFilter.scala
@@ -22,13 +22,14 @@ import kafka.utils.Logging
 import java.util.regex.{PatternSyntaxException, Pattern}
 
 
-sealed abstract class TopicFilterSpec(rawRegex: String) {
+sealed abstract class TopicFilter(rawRegex: String) extends Logging {
+
   val regex = rawRegex
           .trim
           .replace(',', '|')
           .replace(" ", "")
-          .replaceAll("^[\"\']+","")
-          .replaceAll("[\"\']+$","") // property files may bring quotes
+          .replaceAll("""^["']+""","")
+          .replaceAll("""["']+$""","") // property files may bring quotes
 
   try {
     Pattern.compile(regex)
@@ -38,42 +39,38 @@ sealed abstract class TopicFilterSpec(rawRegex: String) {
       throw new RuntimeException(regex + " is an invalid regex.")
   }
 
-}
+  override def toString = regex
+
+  def requiresTopicEventWatcher: Boolean
 
-case class Whitelist(rawRegex: String) extends TopicFilterSpec(rawRegex) {
-  def isTrivial = regex.matches("[\\p{Alnum}-|]+")
+  def isTopicAllowed(topic: String): Boolean
 }
 
-case class Blacklist(rawRegex: String) extends TopicFilterSpec(rawRegex)
+case class Whitelist(rawRegex: String) extends TopicFilter(rawRegex) {
+  override def requiresTopicEventWatcher = !regex.matches("""[\p{Alnum}-|]+""")
 
-class TopicFilter(val filterSpec: TopicFilterSpec) extends Logging {
+  override def isTopicAllowed(topic: String) = {
+    val allowed = topic.matches(regex)
+
+    debug("%s %s".format(
+      topic, if (allowed) "allowed" else "filtered"))
 
-  def usesWhitelist = filterSpec match {
-    case wl: Whitelist => true
-    case bl: Blacklist => false
+    allowed
   }
 
-  def usesBlacklist = !usesWhitelist
 
-  def requiresTopicEventWatcher = {
-    filterSpec match {
-      case wl: Whitelist => wl.isTrivial
-      case _ => false
-    }
-  }
+}
 
-  def isTopicAllowed(topic: String) = {
-    val allowed = if (usesWhitelist)
-      topic.matches(filterSpec.regex)
-    else
-      !topic.matches(filterSpec.regex)
+case class Blacklist(rawRegex: String) extends TopicFilter(rawRegex) {
+  override def requiresTopicEventWatcher = true
+
+  override def isTopicAllowed(topic: String) = {
+    val allowed = !topic.matches(regex)
 
     debug("%s %s".format(
       topic, if (allowed) "allowed" else "filtered"))
 
     allowed
   }
-
-  override def toString = filterSpec.regex
 }
 
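
The collapsed hierarchy in action; these are the same expectations TopicFilterTest encodes further down. A trivial whitelist is a plain alternation of literal topic names and needs no topic event watcher, while any real pattern, and every blacklist, requires one:

    import kafka.consumer.{Blacklist, Whitelist}

    val literal = new Whitelist("white1,white2")  // commas normalize to regex '|'
    literal.isTopicAllowed("white2")              // true
    literal.isTopicAllowed("black1")              // false
    literal.requiresTopicEventWatcher             // false: "white1|white2" is trivial

    val pattern = new Whitelist(".+")
    pattern.requiresTopicEventWatcher             // true: non-trivial regex

    val black = new Blacklist("black1")
    black.isTopicAllowed("white1")                // true: only matches are filtered out
    black.requiresTopicEventWatcher               // true: always, for blacklists
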
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 5133717..fd1971e 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -130,15 +130,15 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
   def createMessageStreams[T](topicCountMap: Map[String,Int],
                               decoder: Decoder[T])
-      : Map[String,List[KafkaMessageAndMetadataStream[T]]] = {
+      : Map[String,List[KafkaStream[T]]] = {
     if (messageStreamCreated.getAndSet(true))
       throw new RuntimeException(this.getClass.getSimpleName +
                                    " can create message streams at most once")
     consume(topicCountMap, decoder)
   }
 
-  def createMessageStreamsByFilter[T](filterSpec: TopicFilterSpec, numStreams: Int, decoder: Decoder[T]) = {
-    val wildcardStreamsHandler = new WildcardStreamsHandler[T](filterSpec, numStreams, decoder)
+  def createMessageStreamsByFilter[T](topicFilter: TopicFilter, numStreams: Int, decoder: Decoder[T]) = {
+    val wildcardStreamsHandler = new WildcardStreamsHandler[T](topicFilter, numStreams, decoder)
     wildcardStreamsHandler.streams
   }
 
@@ -183,7 +183,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
   def consume[T](topicCountMap: scala.collection.Map[String,Int],
                  decoder: Decoder[T])
-      : Map[String,List[KafkaMessageAndMetadataStream[T]]] = {
+      : Map[String,List[KafkaStream[T]]] = {
     debug("entering consume ")
     if (topicCountMap == null)
       throw new RuntimeException("topicCountMap is null")
@@ -196,15 +196,17 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     val queuesAndStreams = topicThreadIds.values.map(threadIdSet =>
       threadIdSet.map(_ => {
         val queue =  new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
-        val stream = new KafkaMessageAndMetadataStream[T](
+        val stream = new KafkaStream[T](
           queue, config.consumerTimeoutMs, decoder)
         (queue, stream)
       })
     ).flatten.toList
 
+    val dirs = new ZKGroupDirs(config.groupId)
+    registerConsumerInZK(dirs, consumerIdString, topicCount)
     reinitializeConsumer(topicCount, queuesAndStreams)
 
-    loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaMessageAndMetadataStream[T]]]]
+    loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[T]]]]
   }
 
   private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) {
@@ -375,7 +377,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   }
 
   class ZKRebalancerListener(val group: String, val consumerIdString: String,
-                             val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaMessageAndMetadataStream[_]]])
+                             val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_]]])
     extends IZkChildListener {
     private var isWatcherTriggered = false
     private val lock = new ReentrantLock
@@ -543,7 +545,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     }
 
     private def closeFetchersForQueues(cluster: Cluster,
-                                       messageStreams: Map[String,List[KafkaMessageAndMetadataStream[_]]],
+                                       messageStreams: Map[String,List[KafkaStream[_]]],
                                        queuesToBeCleared: Iterable[BlockingQueue[FetchedDataChunk]]) {
       var allPartitionInfos = topicRegistry.values.map(p => p.values).flatten
       fetcher match {
@@ -563,7 +565,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       }
     }
 
-    private def closeFetchers(cluster: Cluster, messageStreams: Map[String,List[KafkaMessageAndMetadataStream[_]]],
+    private def closeFetchers(cluster: Cluster, messageStreams: Map[String,List[KafkaStream[_]]],
                               relevantTopicThreadIdsMap: Map[String, Set[String]]) {
       // only clear the fetcher queues for certain topic partitions that *might* no longer be served by this consumer
       // after this rebalancing attempt
@@ -655,38 +657,42 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     }
   }
 
-  private def reinitializeConsumer[T](topicCount: TopicCount,
-                                      queuesAndStreams:
-                                        List[Tuple2[
-                                          LinkedBlockingQueue[FetchedDataChunk],
-                                          KafkaMessageAndMetadataStream[T]]]) {
+  private def reinitializeConsumer[T](
+      topicCount: TopicCount,
+      queuesAndStreams: List[(LinkedBlockingQueue[FetchedDataChunk],KafkaStream[T])]) {
+
     val dirs = new ZKGroupDirs(config.groupId)
 
     // listener to consumer and partition changes
     if (loadBalancerListener == null) {
-      val topicStreamsMap = new mutable.HashMap[String,List[KafkaMessageAndMetadataStream[T]]]
+      val topicStreamsMap = new mutable.HashMap[String,List[KafkaStream[T]]]
       loadBalancerListener = new ZKRebalancerListener(
-        config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaMessageAndMetadataStream[_]]]])
+        config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_]]]])
     }
 
-    registerConsumerInZK(dirs, consumerIdString, topicCount)
-
     // register listener for session expired event
     if (sessionExpirationListener == null)
       sessionExpirationListener = new ZKSessionExpireListener(
         dirs, consumerIdString, topicCount, loadBalancerListener)
 
     val topicStreamsMap = loadBalancerListener.kafkaMessageAndMetadataStreams
-    val consumerThreadIdsPerTopic = topicCount.getConsumerThreadIdsPerTopic
+
+    // map of {topic -> Set(thread-1, thread-2, ...)}
+    val consumerThreadIdsPerTopic: Map[String, Set[String]] =
+      topicCount.getConsumerThreadIdsPerTopic
 
     /*
      * This usage of map flatten breaks up consumerThreadIdsPerTopic into
-     * a list of (topic, thread-id) pairs that we then use to construct
+     * a set of (topic, thread-id) pairs that we then use to construct
      * the updated (topic, thread-id) -> queues map
      */
-    implicit def getTopicThreadIds(v: (String, Set[String])) = v._2.map((v._1, _))
-    val topicThreadIds = consumerThreadIdsPerTopic.flatten.toList
+    implicit def getTopicThreadIds(v: (String, Set[String])): Set[(String, String)] = v._2.map((v._1, _))
 
+    // iterator over (topic, thread-id) tuples
+    val topicThreadIds: Iterable[(String, String)] =
+      consumerThreadIdsPerTopic.flatten
+
+    // list of (pairs of pairs): e.g., ((topic, thread-id),(queue, stream))
     val threadQueueStreamPairs = topicCount match {
       case wildTopicCount: WildcardTopicCount =>
         for (tt <- topicThreadIds; qs <- queuesAndStreams) yield (tt -> qs)
@@ -727,7 +733,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     loadBalancerListener.syncedRebalance()
   }
 
-  class WildcardStreamsHandler[T](filterSpec: TopicFilterSpec,
+  class WildcardStreamsHandler[T](topicFilter: TopicFilter,
                                   numStreams: Int,
                                   decoder: Decoder[T])
                                 extends TopicEventHandler[String] {
@@ -739,12 +745,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     private val wildcardQueuesAndStreams = (1 to numStreams)
       .map(e => {
         val queue = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
-        val stream = new KafkaMessageAndMetadataStream[T](
+        val stream = new KafkaStream[T](
           queue, config.consumerTimeoutMs, decoder)
         (queue, stream)
     }).toList
 
-    private val topicFilter = new TopicFilter(filterSpec)
      // bootstrap with existing topics
     private var wildcardTopics =
       getChildrenParentMayNotExist(zkClient, BrokerTopicsPath)
@@ -753,13 +758,15 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     private val wildcardTopicCount = TopicCount.constructTopicCount(
       consumerIdString, topicFilter, numStreams, zkClient)
 
+    val dirs = new ZKGroupDirs(config.groupId)
+    registerConsumerInZK(dirs, consumerIdString, wildcardTopicCount)
     reinitializeConsumer(wildcardTopicCount, wildcardQueuesAndStreams)
 
-    if (topicFilter.requiresTopicEventWatcher) {
-      info("Not creating event watcher for trivial whitelist " + filterSpec)
+    if (!topicFilter.requiresTopicEventWatcher) {
+      info("Not creating event watcher for trivial whitelist " + topicFilter)
     }
     else {
-      info("Creating topic event watcher for whitelist " + filterSpec)
+      info("Creating topic event watcher for topic filter " + topicFilter)
       wildcardTopicWatcher = new ZookeeperTopicEventWatcher(config, this)
 
       /*
@@ -780,10 +787,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                              .format(addedTopics))
 
       /*
-       * Deleted topics are interesting (and will not be a concern until 0.8
-       * release): if topics have been deleted, we do not need to unsubscribe
-       * watching their child changes, since if they come back, we do want to
-       * get notified.
+       * TODO: Deleted topics are interesting (and will not be a concern until
+       * 0.8 release). We may need to remove these topics from the rebalance
+       * listener's map in reinitializeConsumer.
        */
       val deletedTopics = wildcardTopics filterNot (updatedTopics contains)
       if (deletedTopics.nonEmpty)
@@ -797,7 +803,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
         reinitializeConsumer(wildcardTopicCount, wildcardQueuesAndStreams)
     }
 
-    def streams: Seq[KafkaMessageAndMetadataStream[T]] =
+    def streams: Seq[KafkaStream[T]] =
       wildcardQueuesAndStreams.map(_._2)
   }
 }
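
The implicit flatten in reinitializeConsumer, isolated as a REPL-style sketch with hypothetical topic and thread-id values: with the conversion in scope, the {topic -> thread-id set} map flattens straight into (topic, thread-id) pairs:

    val consumerThreadIdsPerTopic: Map[String, Set[String]] =
      Map("t1" -> Set("group_host-0", "group_host-1"), "t2" -> Set("group_host-0"))

    implicit def getTopicThreadIds(v: (String, Set[String])): Set[(String, String)] =
      v._2.map((v._1, _))

    // yields (t1,group_host-0), (t1,group_host-1), (t2,group_host-0), order aside
    val topicThreadIds: Iterable[(String, String)] = consumerThreadIdsPerTopic.flatten
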
diff --git a/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java b/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
index 3e445e4..afb6b0a 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
+++ b/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
@@ -20,8 +20,8 @@ package kafka.javaapi.consumer;
 
 import java.util.List;
 import java.util.Map;
-import kafka.consumer.KafkaMessageAndMetadataStream;
-import kafka.consumer.TopicFilterSpec;
+import kafka.consumer.KafkaStream;
+import kafka.consumer.TopicFilter;
 import kafka.message.Message;
 import kafka.serializer.Decoder;
 
@@ -31,31 +31,31 @@ public interface ConsumerConnector {
    *
    *  @param topicCountMap  a map of (topic, #streams) pair
    *  @param decoder a decoder that converts from Message to T
-   *  @return a map of (topic, list of  KafkaMessageAndMetadataStream) pairs.
+   *  @return a map of (topic, list of KafkaStream) pairs.
    *          The number of items in the list is #streams. Each stream supports
    *          an iterator over message/metadata pairs.
    */
-  public <T> Map<String, List<KafkaMessageAndMetadataStream<T>>> createMessageStreams(
+  public <T> Map<String, List<KafkaStream<T>>> createMessageStreams(
       Map<String, Integer> topicCountMap, Decoder<T> decoder);
-  public Map<String, List<KafkaMessageAndMetadataStream<Message>>> createMessageStreams(
+  public Map<String, List<KafkaStream<Message>>> createMessageStreams(
       Map<String, Integer> topicCountMap);
 
   /**
    *  Create a list of MessageAndTopicStreams containing messages of type T.
    *
-   *  @param filterSpec a TopicFilterSpec that specifies which topics to
+   *  @param topicFilter a TopicFilter that specifies which topics to
    *                    subscribe to (encapsulates a whitelist or a blacklist).
    *  @param numStreams the number of message streams to return.
    *  @param decoder a decoder that converts from Message to T
-   *  @return a list of KafkaMessageAndMetadataStream. Each stream supports an
+   *  @return a list of KafkaStream. Each stream supports an
    *          iterator over its MessageAndMetadata elements.
    */
-  public <T> List<KafkaMessageAndMetadataStream<T>> createMessageStreamsByFilter(
-      TopicFilterSpec filterSpec, int numStreams, Decoder<T> decoder);
-  public List<KafkaMessageAndMetadataStream<Message>> createMessageStreamsByFilter(
-      TopicFilterSpec filterSpec, int numStreams);
-  public List<KafkaMessageAndMetadataStream<Message>> createMessageStreamsByFilter(
-      TopicFilterSpec filterSpec);
+  public <T> List<KafkaStream<T>> createMessageStreamsByFilter(
+      TopicFilter topicFilter, int numStreams, Decoder<T> decoder);
+  public List<KafkaStream<Message>> createMessageStreamsByFilter(
+      TopicFilter topicFilter, int numStreams);
+  public List<KafkaStream<Message>> createMessageStreamsByFilter(
+      TopicFilter topicFilter);
 
   /**
    *  Commit the offsets of all broker partitions connected by this connector.
diff --git a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
index f9e9414..f1a469b 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
@@ -70,14 +70,14 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   def createMessageStreams[T](
         topicCountMap: java.util.Map[String,java.lang.Integer],
         decoder: Decoder[T])
-      : java.util.Map[String,java.util.List[KafkaMessageAndMetadataStream[T]]] = {
+      : java.util.Map[String,java.util.List[KafkaStream[T]]] = {
     import scala.collection.JavaConversions._
 
     val scalaTopicCountMap: Map[String, Int] = Map.empty[String, Int] ++ asMap(topicCountMap.asInstanceOf[java.util.Map[String, Int]])
     val scalaReturn = underlying.consume(scalaTopicCountMap, decoder)
-    val ret = new java.util.HashMap[String,java.util.List[KafkaMessageAndMetadataStream[T]]]
+    val ret = new java.util.HashMap[String,java.util.List[KafkaStream[T]]]
     for ((topic, streams) <- scalaReturn) {
-      var javaStreamList = new java.util.ArrayList[KafkaMessageAndMetadataStream[T]]
+      var javaStreamList = new java.util.ArrayList[KafkaStream[T]]
       for (stream <- streams)
         javaStreamList.add(stream)
       ret.put(topic, javaStreamList)
@@ -87,17 +87,17 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
   def createMessageStreams(
         topicCountMap: java.util.Map[String,java.lang.Integer])
-      : java.util.Map[String,java.util.List[KafkaMessageAndMetadataStream[Message]]] =
+      : java.util.Map[String,java.util.List[KafkaStream[Message]]] =
     createMessageStreams(topicCountMap, new DefaultDecoder)
 
-  def createMessageStreamsByFilter[T](filterSpec: TopicFilterSpec, numStreams: Int, decoder: Decoder[T]) =
-    asList(underlying.createMessageStreamsByFilter(filterSpec, numStreams, decoder))
+  def createMessageStreamsByFilter[T](topicFilter: TopicFilter, numStreams: Int, decoder: Decoder[T]) =
+    asList(underlying.createMessageStreamsByFilter(topicFilter, numStreams, decoder))
 
-  def createMessageStreamsByFilter(filterSpec: TopicFilterSpec, numStreams: Int) =
-    createMessageStreamsByFilter(filterSpec, numStreams, new DefaultDecoder)
+  def createMessageStreamsByFilter(topicFilter: TopicFilter, numStreams: Int) =
+    createMessageStreamsByFilter(topicFilter, numStreams, new DefaultDecoder)
 
-  def createMessageStreamsByFilter(filterSpec: TopicFilterSpec) =
-    createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder)
+  def createMessageStreamsByFilter(topicFilter: TopicFilter) =
+    createMessageStreamsByFilter(topicFilter, 1, new DefaultDecoder)
 
   def commitOffsets() {
     underlying.commitOffsets
diff --git a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
index 2d1b5f1..7ebeb9c 100644
--- a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
@@ -47,13 +47,13 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer,
 
   def getErrorCode = errorCode
 
-  override def iterator: java.util.Iterator[MessageAndMetadata[Message]] = new java.util.Iterator[MessageAndMetadata[Message]] {
+  override def iterator: java.util.Iterator[MessageAndOffset] = new java.util.Iterator[MessageAndOffset] {
     val underlyingIterator = underlying.iterator
     override def hasNext(): Boolean = {
       underlyingIterator.hasNext
     }
 
-    override def next(): MessageAndMetadata[Message] = {
+    override def next(): MessageAndOffset = {
       underlyingIterator.next
     }
 
diff --git a/core/src/main/scala/kafka/javaapi/message/MessageSet.scala b/core/src/main/scala/kafka/javaapi/message/MessageSet.scala
index cd4b5a9..9c9c72f 100644
--- a/core/src/main/scala/kafka/javaapi/message/MessageSet.scala
+++ b/core/src/main/scala/kafka/javaapi/message/MessageSet.scala
@@ -18,7 +18,7 @@
 package kafka.javaapi.message
 
 
-import kafka.message.{MessageAndMetadata, Message, InvalidMessageException}
+import kafka.message.{MessageAndOffset, InvalidMessageException}
 
 
 /**
@@ -28,12 +28,12 @@ import kafka.message.{MessageAndMetadata, Message, InvalidMessageException}
  * 4 byte size containing an integer N
  * N message bytes as described in the message class
  */
-abstract class MessageSet extends java.lang.Iterable[MessageAndMetadata[Message]] {
+abstract class MessageSet extends java.lang.Iterable[MessageAndOffset] {
 
   /**
    * Provides an iterator over the messages in this set
    */
-  def iterator: java.util.Iterator[MessageAndMetadata[Message]]
+  def iterator: java.util.Iterator[MessageAndOffset]
 
   /**
    * Gives the total size of this message set in bytes
@@ -47,8 +47,8 @@ abstract class MessageSet extends java.lang.Iterable[MessageAndMetadata[Message]
   def validate(): Unit = {
     val thisIterator = this.iterator
     while(thisIterator.hasNext) {
-      val messageAndMetadata = thisIterator.next
-      if(!messageAndMetadata.message.isValid)
+      val messageAndOffset = thisIterator.next
+      if(!messageAndOffset.message.isValid)
         throw new InvalidMessageException
     }
   }
diff --git a/core/src/main/scala/kafka/javaapi/producer/async/CallbackHandler.java b/core/src/main/scala/kafka/javaapi/producer/async/CallbackHandler.java
index 0e0df3d..2b93974 100644
--- a/core/src/main/scala/kafka/javaapi/producer/async/CallbackHandler.java
+++ b/core/src/main/scala/kafka/javaapi/producer/async/CallbackHandler.java
@@ -16,9 +16,9 @@
 */
 package kafka.javaapi.producer.async;
 
-import kafka.producer.async.QueueItem;
 
 import java.util.Properties;
+import kafka.producer.async.QueueItem;
 
 /**
  * Callback handler APIs for use in the async producer. The purpose is to
diff --git a/core/src/main/scala/kafka/javaapi/producer/async/EventHandler.java b/core/src/main/scala/kafka/javaapi/producer/async/EventHandler.java
index 381f8e2..842799d 100644
--- a/core/src/main/scala/kafka/javaapi/producer/async/EventHandler.java
+++ b/core/src/main/scala/kafka/javaapi/producer/async/EventHandler.java
@@ -16,12 +16,12 @@
 */
 package kafka.javaapi.producer.async;
 
-import kafka.javaapi.producer.SyncProducer;
-import kafka.producer.async.QueueItem;
-import kafka.serializer.Encoder;
 
 import java.util.List;
 import java.util.Properties;
+import kafka.javaapi.producer.SyncProducer;
+import kafka.producer.async.QueueItem;
+import kafka.serializer.Encoder;
 
 /**
  * Handler that dispatches the batched data from the queue of the
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 1ed4f00..2f8738b 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -62,8 +62,8 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer,
     if(shallowValidByteCount < 0) {
       val iter = this.internalIterator(true)
       while(iter.hasNext) {
-        val messageAndMetadata = iter.next
-        shallowValidByteCount = messageAndMetadata.offset
+        val messageAndOffset = iter.next
+        shallowValidByteCount = messageAndOffset.offset
       }
     }
     if(shallowValidByteCount < initialOffset) 0
@@ -78,31 +78,31 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer,
     written
   }
   
-  override def iterator: Iterator[MessageAndMetadata[Message]] = internalIterator()
+  override def iterator: Iterator[MessageAndOffset] = internalIterator()
 
 
   def verifyMessageSize(maxMessageSize: Int){
     var shallowIter = internalIterator(true)
     while(shallowIter.hasNext){
-      var messageAndMetadata = shallowIter.next
-      val payloadSize = messageAndMetadata.message.payloadSize
+      val messageAndOffset = shallowIter.next
+      val payloadSize = messageAndOffset.message.payloadSize
       if ( payloadSize > maxMessageSize)
         throw new MessageSizeTooLargeException("payload size of " + payloadSize + " larger than " + maxMessageSize)
     }
   }
 
   /** When flag isShallow is set to be true, we do a shallow iteration: just traverse the first level of messages. This is used in verifyMessageSize() function **/
-  private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndMetadata[Message]] = {
+  private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset] = {
     ErrorMapping.maybeThrowException(errorCode)
-    new IteratorTemplate[MessageAndMetadata[Message]] {
+    new IteratorTemplate[MessageAndOffset] {
       var topIter = buffer.slice()
       var currValidBytes = initialOffset
-      var innerIter:Iterator[MessageAndMetadata[Message]] = null
+      var innerIter: Iterator[MessageAndOffset] = null
       var lastMessageSize = 0L
 
       def innerDone():Boolean = (innerIter==null || !innerIter.hasNext)
 
-      def makeNextOuter: MessageAndMetadata[Message] = {
+      def makeNextOuter: MessageAndOffset = {
         if (topIter.remaining < 4) {
           return allDone()
         }
@@ -127,7 +127,7 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer,
         if(isShallow){
           currValidBytes += 4 + size
           trace("shallow iterator currValidBytes = " + currValidBytes)
-          new MessageAndMetadata[Message](newMessage, offset = currValidBytes)
+          new MessageAndOffset(newMessage, currValidBytes)
         }
         else{
           newMessage.compressionCodec match {
@@ -138,7 +138,7 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer,
               innerIter = null
               currValidBytes += 4 + size
               trace("currValidBytes = " + currValidBytes)
-              new MessageAndMetadata[Message](newMessage, offset = currValidBytes)
+              new MessageAndOffset(newMessage, currValidBytes)
             case _ =>
               if(!newMessage.isValid)
                 throw new InvalidMessageException("Compressed message is invalid")
@@ -153,7 +153,7 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer,
         }
       }
 
-      override def makeNext(): MessageAndMetadata[Message] = {
+      override def makeNext(): MessageAndOffset = {
         if(isShallow){
           makeNextOuter
         }
@@ -163,11 +163,10 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer,
           isInnerDone match {
             case true => makeNextOuter
             case false => {
-              val messageAndMetadata = innerIter.next
+              val messageAndOffset = innerIter.next
               if (!innerIter.hasNext)
                 currValidBytes += 4 + lastMessageSize
-              new MessageAndMetadata[Message](
-                messageAndMetadata.message, offset = currValidBytes)
+              new MessageAndOffset(messageAndOffset.message, currValidBytes)
             }
           }
         }
diff --git a/core/src/main/scala/kafka/message/FileMessageSet.scala b/core/src/main/scala/kafka/message/FileMessageSet.scala
index 5602759..7c9b4f8 100644
--- a/core/src/main/scala/kafka/message/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/message/FileMessageSet.scala
@@ -104,11 +104,11 @@ class FileMessageSet private[kafka](private[message] val channel: FileChannel,
   /**
    * Get an iterator over the messages in the set
    */
-  override def iterator: Iterator[MessageAndMetadata[Message]] = {
-    new IteratorTemplate[MessageAndMetadata[Message]] {
+  override def iterator: Iterator[MessageAndOffset] = {
+    new IteratorTemplate[MessageAndOffset] {
       var location = offset
       
-      override def makeNext(): MessageAndMetadata[Message] = {
+      override def makeNext(): MessageAndOffset = {
         // read the size of the item
         val sizeBuffer = ByteBuffer.allocate(4)
         channel.read(sizeBuffer, location)
@@ -129,7 +129,7 @@ class FileMessageSet private[kafka](private[message] val channel: FileChannel,
         
         // increment the location and return the item
         location += size + 4
-        new MessageAndMetadata[Message](new Message(buffer), offset = location)
+        new MessageAndOffset(new Message(buffer), location)
       }
     }
   }
diff --git a/core/src/main/scala/kafka/message/MessageAndMetadata.scala b/core/src/main/scala/kafka/message/MessageAndMetadata.scala
index e5a7a1a..710308e 100644
--- a/core/src/main/scala/kafka/message/MessageAndMetadata.scala
+++ b/core/src/main/scala/kafka/message/MessageAndMetadata.scala
@@ -17,5 +17,5 @@
 
 package kafka.message
 
-case class MessageAndMetadata[T](message: T, topic: String = "", offset: Long = -1L)
+case class MessageAndMetadata[T](message: T, topic: String = "")
 
diff --git a/core/src/main/scala/kafka/message/MessageAndOffset.scala b/core/src/main/scala/kafka/message/MessageAndOffset.scala
new file mode 100644
index 0000000..d769fc6
--- /dev/null
+++ b/core/src/main/scala/kafka/message/MessageAndOffset.scala
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.message
+
+
+case class MessageAndOffset(message: Message, offset: Long)
+
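
The two carrier types now split cleanly: MessageAndOffset belongs to the fetch/log layer, while MessageAndMetadata (above) keeps only consumer-facing metadata. A small sketch with a placeholder message:

    import kafka.message.{Message, MessageAndMetadata, MessageAndOffset}

    val msg = new Message("hello".getBytes)
    val fetched  = MessageAndOffset(msg, 42L)            // offset of the *next* message
    val consumed = MessageAndMetadata(msg, "some-topic") // topic only, no offset now
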
diff --git a/core/src/main/scala/kafka/message/MessageSet.scala b/core/src/main/scala/kafka/message/MessageSet.scala
index ba602d1..bf45d91 100644
--- a/core/src/main/scala/kafka/message/MessageSet.scala
+++ b/core/src/main/scala/kafka/message/MessageSet.scala
@@ -84,7 +84,7 @@ object MessageSet {
  * 4 byte size containing an integer N
  * N message bytes as described in the message class
  */
-abstract class MessageSet extends Iterable[MessageAndMetadata[Message]] {
+abstract class MessageSet extends Iterable[MessageAndOffset] {
 
   /** Write the messages in this set to the given channel starting at the given offset byte. 
     * Less than the complete amount may be written, but no more than maxSize can be. The number
@@ -94,7 +94,7 @@ abstract class MessageSet extends Iterable[MessageAndMetadata[Message]] {
   /**
    * Provides an iterator over the messages in this set
    */
-  def iterator: Iterator[MessageAndMetadata[Message]]
+  def iterator: Iterator[MessageAndOffset]
   
   /**
    * Gives the total size of this message set in bytes
@@ -106,8 +106,8 @@ abstract class MessageSet extends Iterable[MessageAndMetadata[Message]] {
    * match the payload for any message.
    */
   def validate(): Unit = {
-    for(messageAndMetadata <- this)
-      if(!messageAndMetadata.message.isValid)
+    for(messageAndOffset <- this)
+      if(!messageAndOffset.message.isValid)
         throw new InvalidMessageException
   }
   
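
A sketch of walking the size-prefixed layout the class comment describes (a 4-byte size N followed by N message bytes), mirroring how FileMessageSet.iterator advances; it assumes `buffer` holds only complete entries. The recorded offset is the byte position after each message, i.e. the next fetch offset:

    import java.nio.ByteBuffer
    import kafka.message.{Message, MessageAndOffset}

    def readAll(buffer: ByteBuffer): List[MessageAndOffset] = {
      var location = 0L
      var acc = List.empty[MessageAndOffset]
      while (buffer.remaining >= 4) {
        val size = buffer.getInt()                 // N
        val messageBytes = buffer.slice()
        messageBytes.limit(size)
        buffer.position(buffer.position + size)    // skip the N message bytes
        location += 4 + size
        acc ::= MessageAndOffset(new Message(messageBytes), location)
      }
      acc.reverse
    }
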
diff --git a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
index 5279284..417da27 100644
--- a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
+++ b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
@@ -22,9 +22,7 @@ import org.apache.log4j.spi.LoggingEvent
 import org.apache.log4j.AppenderSkeleton
 import org.apache.log4j.helpers.LogLog
 import kafka.utils.Logging
-import kafka.serializer.Encoder
 import java.util.{Properties, Date}
-import kafka.message.Message
 import scala.collection._
 
 class KafkaLog4jAppender extends AppenderSkeleton with Logging {
diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
index 7e0a9f5..dafa6d2 100644
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -22,7 +22,7 @@ import kafka.utils._
 import java.util.Properties
 import kafka.cluster.{Partition, Broker}
 import java.util.concurrent.atomic.AtomicBoolean
-import kafka.common.{NoBrokersForPartitionException, InvalidConfigException, InvalidPartitionException}
+import kafka.common.{NoBrokersForPartitionException, InvalidPartitionException}
 import kafka.api.ProducerRequest
 
 class Producer[K,V](config: ProducerConfig,
diff --git a/core/src/main/scala/kafka/tools/ConsumerShell.scala b/core/src/main/scala/kafka/tools/ConsumerShell.scala
index e8a3888..5eb5269 100644
--- a/core/src/main/scala/kafka/tools/ConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerShell.scala
@@ -82,7 +82,7 @@ object ConsumerShell {
   }
 }
 
-class ZKConsumerThread(stream: KafkaMessageAndMetadataStream[String]) extends Thread with Logging {
+class ZKConsumerThread(stream: KafkaStream[String]) extends Thread with Logging {
   val shutdownLatch = new CountDownLatch(1)
 
   override def run() {
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index b60b249..98dd65d 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -97,6 +97,7 @@ object MirrorMaker extends Logging {
       Runtime.getRuntime.addShutdownHook(new Thread() {
         override def run() {
           connectors.foreach(_.shutdown())
+          producer.close()
         }
       })
 
@@ -118,7 +119,7 @@ object MirrorMaker extends Logging {
     threads.foreach(_.awaitShutdown())
   }
 
-  class MirrorMakerThread(stream: KafkaMessageAndMetadataStream[Message],
+  class MirrorMakerThread(stream: KafkaStream[Message],
                           producer: Producer[Null, Message],
                           threadId: Int)
           extends Thread with Logging {
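The added producer.close() is the substantive change in this file: the hook shuts the consumer connectors down first and then closes the producer, so messages still queued in the producer can be sent to the target cluster before the JVM exits. Reassembled from the hunk above, the hook now reads:

    Runtime.getRuntime.addShutdownHook(new Thread() {
      override def run() {
        connectors.foreach(_.shutdown()) // stop pulling from the source cluster
        producer.close()                 // drain what is already buffered
      }
    })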
diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
index 40df889..1300cf6 100644
--- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
+++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
@@ -151,7 +151,7 @@ object ReplayLogProducer extends Logging {
     }
   }
 
-  class ZKConsumerThread(config: Config, stream: KafkaMessageAndMetadataStream[Message]) extends Thread with Logging {
+  class ZKConsumerThread(config: Config, stream: KafkaStream[Message]) extends Thread with Logging {
     val shutdownLatch = new CountDownLatch(1)
     val props = new Properties()
     val brokerInfoList = config.brokerInfo.split("=")
diff --git a/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala b/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
index 14beb33..fa709de 100644
--- a/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
+++ b/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
@@ -56,7 +56,7 @@ object TestZKConsumerOffsets {
   }
 }
 
-private class ConsumerThread(stream: KafkaMessageAndMetadataStream[Message]) extends Thread {
+private class ConsumerThread(stream: KafkaStream[Message]) extends Thread {
   val shutdownLatch = new CountDownLatch(1)
 
   override def run() {
diff --git a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
index 77a98b7..40a2bf7 100644
--- a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
@@ -28,28 +28,24 @@ class TopicFilterTest extends JUnitSuite {
   @Test
   def testWhitelists() {
 
-    val whitelist1 = new Whitelist("white1,white2")
-    val topicFilter1 = new TopicFilter(whitelist1)
-    assertTrue(topicFilter1.requiresTopicEventWatcher)
+    val topicFilter1 = new Whitelist("white1,white2")
+    assertFalse(topicFilter1.requiresTopicEventWatcher)
     assertTrue(topicFilter1.isTopicAllowed("white2"))
     assertFalse(topicFilter1.isTopicAllowed("black1"))
 
-    val whitelist2 = new Whitelist(".+")
-    val topicFilter2 = new TopicFilter(whitelist2)
-    assertFalse(topicFilter2.requiresTopicEventWatcher)
+    val topicFilter2 = new Whitelist(".+")
+    assertTrue(topicFilter2.requiresTopicEventWatcher)
     assertTrue(topicFilter2.isTopicAllowed("alltopics"))
     
-    val whitelist3 = new Whitelist("white_listed-topic.+")
-    val topicFilter3 = new TopicFilter(whitelist3)
-    assertFalse(topicFilter3.requiresTopicEventWatcher)
+    val topicFilter3 = new Whitelist("white_listed-topic.+")
+    assertTrue(topicFilter3.requiresTopicEventWatcher)
     assertTrue(topicFilter3.isTopicAllowed("white_listed-topic1"))
     assertFalse(topicFilter3.isTopicAllowed("black1"))
   }
 
   @Test
   def testBlacklists() {
-    val blacklist1 = new Blacklist("black1")
-    val topicFilter1 = new TopicFilter(blacklist1)
-    assertFalse(topicFilter1.requiresTopicEventWatcher)
+    val topicFilter1 = new Blacklist("black1")
+    assertTrue(topicFilter1.requiresTopicEventWatcher)
   }
 }
\ No newline at end of file
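The flipped assertions encode the reworked contract: Whitelist and Blacklist are now used directly as filters (the TopicFilter wrapper is gone), and requiresTopicEventWatcher is true whenever the allowed topics cannot be enumerated statically, i.e. for a regex whitelist or for any blacklist. A quick illustration of the new semantics, mirroring the test:

    val staticList = new Whitelist("white1,white2")     // plain topic list: no watcher needed
    assert(!staticList.requiresTopicEventWatcher)
    val pattern = new Whitelist("white_listed-topic.+") // regex: must watch for new topics
    assert(pattern.requiresTopicEventWatcher)
    assert(new Blacklist("black1").requiresTopicEventWatcher)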
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index bf759ba..0df05d3 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -270,7 +270,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     messages.sortWith((s,t) => s.checksum < t.checksum)
   }
 
-  def getMessages(nMessagesPerThread: Int, topicMessageStreams: Map[String,List[KafkaMessageAndMetadataStream[Message]]]): List[Message]= {
+  def getMessages(nMessagesPerThread: Int, topicMessageStreams: Map[String,List[KafkaStream[Message]]]): List[Message]= {
     var messages: List[Message] = Nil
     for ((topic, messageStreams) <- topicMessageStreams) {
       for (messageStream <- messageStreams) {
diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
index f86e5cf..f7a4b15 100644
--- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
@@ -28,7 +28,7 @@ import scala.collection.JavaConversions._
 import kafka.javaapi.message.ByteBufferMessageSet
 import org.apache.log4j.{Level, Logger}
 import kafka.message.{NoCompressionCodec, CompressionCodec, Message}
-import kafka.consumer.{KafkaMessageAndMetadataStream, ConsumerConfig}
+import kafka.consumer.{KafkaStream, ConsumerConfig}
 
 
 class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness with Logging {
@@ -92,7 +92,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     messages.sortWith((s,t) => s.checksum < t.checksum)
   }
 
-  def getMessages(nMessagesPerThread: Int, jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaMessageAndMetadataStream[Message]]])
+  def getMessages(nMessagesPerThread: Int, jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaStream[Message]]])
   : List[Message]= {
     var messages: List[Message] = Nil
     val topicMessageStreams = asMap(jTopicMessageStreams)
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index a536e40..25f6b49 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -290,7 +290,7 @@ object TestUtils {
 
   }
 
-  def getMessageIterator(iter: Iterator[MessageAndMetadata[Message]]): Iterator[Message] = {
+  def getMessageIterator(iter: Iterator[MessageAndOffset]): Iterator[Message] = {
     new IteratorTemplate[Message] {
       override def makeNext(): Message = {
         if (iter.hasNext)
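The hunk is cut off by the context window; as a sketch (assuming IteratorTemplate's usual allDone() termination used elsewhere in the codebase), the complete adapter under the new element type would read:

    def getMessageIterator(iter: Iterator[MessageAndOffset]): Iterator[Message] =
      new IteratorTemplate[Message] {
        override def makeNext(): Message =
          if (iter.hasNext) iter.next.message // unwrap the Message from its offset pair
          else allDone()                      // signal end of iteration
      }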
diff --git a/examples/src/main/java/kafka/examples/Consumer.java b/examples/src/main/java/kafka/examples/Consumer.java
index 509d802..cb01577 100644
--- a/examples/src/main/java/kafka/examples/Consumer.java
+++ b/examples/src/main/java/kafka/examples/Consumer.java
@@ -23,7 +23,7 @@ import java.util.Map;
 import java.util.Properties;
 import kafka.consumer.ConsumerConfig;
 import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaMessageAndMetadataStream;
+import kafka.consumer.KafkaStream;
 import kafka.javaapi.consumer.ConsumerConnector;
 import kafka.message.Message;
 
@@ -56,8 +56,8 @@ public class Consumer extends Thread
   public void run() {
     Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
     topicCountMap.put(topic, new Integer(1));
-    Map<String, List<KafkaMessageAndMetadataStream<Message>>> consumerMap = consumer.createMessageStreams(topicCountMap);
-    KafkaMessageAndMetadataStream<Message> stream =  consumerMap.get(topic).get(0);
+    Map<String, List<KafkaStream<Message>>> consumerMap = consumer.createMessageStreams(topicCountMap);
+    KafkaStream<Message> stream =  consumerMap.get(topic).get(0);
     ConsumerIterator<Message> it = stream.iterator();
     while(it.hasNext())
       System.out.println(ExampleUtils.getMessage(it.next().message()));
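For comparison with the Java loop above, a minimal sketch of the same consumption path against the renamed Scala type (assuming an existing `consumer` connector; the map type follows the test changes in this patch):

    val streams: Map[String, List[KafkaStream[Message]]] =
      consumer.createMessageStreams(Map(topic -> 1))
    val it = streams(topic).head.iterator
    while (it.hasNext)
      println(ExampleUtils.getMessage(it.next.message)) // same helper as the Java demo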
diff --git a/examples/src/main/java/kafka/examples/ExampleUtils.java b/examples/src/main/java/kafka/examples/ExampleUtils.java
index c301a52..34fd1c0 100644
--- a/examples/src/main/java/kafka/examples/ExampleUtils.java
+++ b/examples/src/main/java/kafka/examples/ExampleUtils.java
@@ -16,8 +16,7 @@
  */
 package kafka.examples;
 
 import java.nio.ByteBuffer;
-
 import kafka.message.Message;
 
 public class ExampleUtils
diff --git a/examples/src/main/java/kafka/examples/Producer.java b/examples/src/main/java/kafka/examples/Producer.java
index c663a54..353a7eb 100644
--- a/examples/src/main/java/kafka/examples/Producer.java
+++ b/examples/src/main/java/kafka/examples/Producer.java
@@ -16,9 +16,9 @@
  */
 package kafka.examples;
 
+import java.util.Properties;
 import kafka.javaapi.producer.ProducerData;
 import kafka.producer.ProducerConfig;
-import java.util.Properties;
 
 public class Producer extends Thread
 {
diff --git a/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java b/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
index 3921197..c2b88da 100644
--- a/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
+++ b/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
@@ -23,16 +23,15 @@ import kafka.api.FetchRequest;
 import kafka.javaapi.MultiFetchResponse;
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.Message;
-import kafka.message.MessageAndMetadata;
+import kafka.message.MessageAndOffset;
 
 
 public class SimpleConsumerDemo
 {
   private static void printMessages(ByteBufferMessageSet messageSet)
   {
-    for (MessageAndMetadata<Message> messageAndMetadata : messageSet) {
-      System.out.println(ExampleUtils.getMessage(messageAndMetadata.message()));
+    for (MessageAndOffset messageAndOffset : messageSet) {
+      System.out.println(ExampleUtils.getMessage(messageAndOffset.message()));
     }
   }
 
diff --git a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
index ee2fcb1..414c965 100644
--- a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
@@ -136,7 +136,7 @@ object ConsumerPerformance {
     val hideHeader = options.has(hideHeaderOpt)
   }
 
-  class ConsumerPerfThread(threadId: Int, name: String, stream: KafkaMessageAndMetadataStream[Message],
+  class ConsumerPerfThread(threadId: Int, name: String, stream: KafkaStream[Message],
                            config:ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong)
     extends Thread(name) {
     private val shutdownLatch = new CountDownLatch(1)
diff --git a/perf/src/main/scala/kafka/perf/PerfConfig.scala b/perf/src/main/scala/kafka/perf/PerfConfig.scala
index 265caa8..db2c1a1 100644
--- a/perf/src/main/scala/kafka/perf/PerfConfig.scala
+++ b/perf/src/main/scala/kafka/perf/PerfConfig.scala
@@ -18,7 +18,6 @@
 package kafka.perf
 
 import joptsimple.OptionParser
-import java.text.SimpleDateFormat
 
 class PerfConfig(args: Array[String]) {
   val parser = new OptionParser
diff --git a/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala b/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala
index 02c3008..ca8df59 100644
--- a/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala
@@ -18,9 +18,7 @@
 package kafka.perf
 
 import java.net.URI
-import joptsimple._
 import kafka.utils._
-import kafka.server._
 import kafka.consumer.SimpleConsumer
 import org.apache.log4j.Logger
 import kafka.api.{OffsetRequest, FetchRequest}
