From 9efdd8b5f832827c65349b2f2c386f4286635b03 Mon Sep 17 00:00:00 2001
From: Sriram Subramanian <srsubram@srsubram-ld.linkedin.biz>
Date: Mon, 15 Apr 2013 13:42:30 -0700
Subject: [PATCH 1/2] invalid message handling

---
 .../scala/kafka/message/ByteBufferMessageSet.scala |    2 +-
 .../scala/kafka/server/AbstractFetcherThread.scala |    5 +++--
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 03590ad..ef2cd1c 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -154,7 +154,7 @@ class ByteBufferMessageSet(@BeanProperty val buffer: ByteBuffer) extends Message
           return allDone()
         val offset = topIter.getLong()
         val size = topIter.getInt()
-        if(size < 0)
+        if(size <= 0)
           throw new InvalidMessageException("Message found with corrupt size (" + size + ")")
         
         // we have an incomplete message
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index b6845e4..dbdabdd 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -19,8 +19,7 @@ package kafka.server
 
 import kafka.cluster.Broker
 import collection.mutable
-import kafka.message.ByteBufferMessageSet
-import kafka.message.MessageAndOffset
+import kafka.message.{InvalidMessageException, ByteBufferMessageSet, MessageAndOffset}
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 import java.util.concurrent.atomic.AtomicLong
@@ -131,6 +130,8 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
                     // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
                     processPartitionData(topicAndPartition, currentOffset.get, partitionData)
                   } catch {
+                    case ime: InvalidMessageException =>
+                      logger.warn("Found invalid messages during fetch for topic " + topic + " partition " + partitionId + " offset " + currentOffset.get + " error " + ime.getMessage)
                     case e =>
                       throw new KafkaException("error processing data for topic %s partititon %d offset %d"
                                                .format(topic, partitionId, currentOffset.get), e)
-- 
1.7.1


From fe32299550269e6b9179fd072f891a2639c3eb59 Mon Sep 17 00:00:00 2001
From: Sriram Subramanian <srsubram@srsubram-ld.linkedin.biz>
Date: Mon, 15 Apr 2013 17:31:35 -0700
Subject: [PATCH 2/2] 1. add more logging

---
 .../scala/kafka/message/ByteBufferMessageSet.scala |    2 +-
 .../scala/kafka/server/AbstractFetcherThread.scala |    8 ++++++--
 2 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index ef2cd1c..88691b5 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -154,7 +154,7 @@ class ByteBufferMessageSet(@BeanProperty val buffer: ByteBuffer) extends Message
           return allDone()
         val offset = topIter.getLong()
         val size = topIter.getInt()
-        if(size <= 0)
+        if(size <= Message.MinHeaderSize)
           throw new InvalidMessageException("Message found with corrupt size (" + size + ")")
         
         // we have an incomplete message
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index dbdabdd..fed3b86 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -131,9 +131,13 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
                     processPartitionData(topicAndPartition, currentOffset.get, partitionData)
                   } catch {
                     case ime: InvalidMessageException =>
-                      logger.warn("Found invalid messages during fetch for topic " + topic + " partition " + partitionId + " offset " + currentOffset.get + " error " + ime.getMessage)
+                      // we log the error and continue. This ensures two things
+                      // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag
+                      // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
+                      //    should get fixed in the subsequent fetches
+                      logger.warn("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentOffset.get + " error " + ime.getMessage)
                     case e =>
-                      throw new KafkaException("error processing data for topic %s partititon %d offset %d"
+                      throw new KafkaException("error processing data for partition [%s,%d] offset %d"
                                                .format(topic, partitionId, currentOffset.get), e)
                   }
                 case ErrorMapping.OffsetOutOfRangeCode =>
-- 
1.7.1

