From 17c49b46194494396dd917c61d9b0996a0413693 Mon Sep 17 00:00:00 2001
From: Sriram Subramanian <srsubram@srsubram-ld.linkedin.biz>
Date: Wed, 23 Jan 2013 19:24:41 -0800
Subject: [PATCH] 	modified:   core/src/main/scala/kafka/tools/KafkaMigrationTool.java

---
 .../main/scala/kafka/tools/KafkaMigrationTool.java |   34 ++++++++++++-------
 1 files changed, 21 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
index 1f5c7ba..3da3d79 100644
--- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
+++ b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
@@ -276,22 +276,30 @@ public class KafkaMigrationTool
         Iterator<Producer> producerCircularIterator = Utils.circularIterator(JavaConversions.asBuffer(producers));
 
         while (((Boolean) KafkaStreamHasNextMethod_07.invoke(iterator)).booleanValue()){
-          Object messageAndMetaData_07 = KafkaStreamNextMethod_07.invoke(iterator);
-          Object message_07 = KafkaGetMessageMethod_07.invoke(messageAndMetaData_07);
-          Object topic = KafkaGetTopicMethod_07.invoke(messageAndMetaData_07);
-          Object payload_07 = MessageGetPayloadMethod_07.invoke(message_07);
-          int size = ((ByteBuffer)payload_07).remaining();
-          byte[] bytes = new byte[size];
-          ((ByteBuffer)payload_07).get(bytes);
-          logger.debug(String.format("Send kafka 08 message of size %d to topic %s", bytes.length, topic));
-          KeyedMessage<String, byte[]> producerData = new KeyedMessage((String)topic, null, bytes);
-          Producer nextProducer = producerCircularIterator.next();
-          nextProducer.send(producerData);
+          try{
+            Object messageAndMetaData_07 = KafkaStreamNextMethod_07.invoke(iterator);
+            Object message_07 = KafkaGetMessageMethod_07.invoke(messageAndMetaData_07);
+            Object topic = KafkaGetTopicMethod_07.invoke(messageAndMetaData_07);
+            Object payload_07 = MessageGetPayloadMethod_07.invoke(message_07);
+            int size = ((ByteBuffer)payload_07).remaining();
+            byte[] bytes = new byte[size];
+            ((ByteBuffer)payload_07).get(bytes);
+            logger.debug(String.format("Send kafka 08 message of size %d to topic %s", bytes.length, topic));
+            KeyedMessage<String, byte[]> producerData = new KeyedMessage((String)topic, null, bytes);
+            Producer nextProducer = producerCircularIterator.next();
+            nextProducer.send(producerData);
+          }
+          catch (Exception e){
+            logger.error("Migration thread recoverable failure during consumption " + t);
+          }
+          catch (Throwable t){
+            logger.fatal("Migration thread irrecoverable failure during consumption " + t);
+            break;
+          }
         }
         logger.info(String.format("Migration thread %s finishes running", threadName));
       } catch (Throwable t){
-        System.out.println("Migration thread failure due to " + t);
-        t.printStackTrace(System.out);
+        logger.fatal("Migration thread failure during initialization " + t);
       }
     }
   }
-- 
1.7.1

