Index: core/src/main/scala/kafka/tools/KafkaMigrationTool.java
===================================================================
--- core/src/main/scala/kafka/tools/KafkaMigrationTool.java	(revision 1410624)
+++ core/src/main/scala/kafka/tools/KafkaMigrationTool.java	(working copy)
@@ -19,7 +19,6 @@
 
 import joptsimple.*;
 import kafka.javaapi.producer.Producer;
-import kafka.message.Message;
 import kafka.producer.KeyedMessage;
 import kafka.producer.ProducerConfig;
 import kafka.utils.Utils;
@@ -181,8 +180,12 @@
 
       Constructor ConsumerConfigConstructor_07 = ConsumerConfig_07.getConstructor(Properties.class);
       Properties kafkaConsumerProperties_07 = new Properties();
-
       kafkaConsumerProperties_07.load(new FileInputStream(consumerConfigFile_07));
+      /** Disable shallow iteration because the message format is different between 07 and 08, we have to get each individual message **/
+      if(kafkaConsumerProperties_07.getProperty("shallowiterator.enable", "").equals("true")){
+        logger.warn("Shallow iterator should not be used in the migration tool");
+        kafkaConsumerProperties_07.setProperty("shallowiterator.enable", "false");
+      }
       Object consumerConfig_07 = ConsumerConfigConstructor_07.newInstance(kafkaConsumerProperties_07);
 
       /** Construct the 07 consumer connector **/
