From 9efdd8b5f832827c65349b2f2c386f4286635b03 Mon Sep 17 00:00:00 2001
From: Sriram Subramanian <srsubram@srsubram-ld.linkedin.biz>
Date: Mon, 15 Apr 2013 13:42:30 -0700
Subject: [PATCH] invalid message handling

---
 .../scala/kafka/message/ByteBufferMessageSet.scala |    2 +-
 .../scala/kafka/server/AbstractFetcherThread.scala |    5 +++--
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 03590ad..ef2cd1c 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -154,7 +154,7 @@ class ByteBufferMessageSet(@BeanProperty val buffer: ByteBuffer) extends Message
           return allDone()
         val offset = topIter.getLong()
         val size = topIter.getInt()
-        if(size < 0)
+        if(size <= 0)
           throw new InvalidMessageException("Message found with corrupt size (" + size + ")")
         
         // we have an incomplete message
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index b6845e4..dbdabdd 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -19,8 +19,7 @@ package kafka.server
 
 import kafka.cluster.Broker
 import collection.mutable
-import kafka.message.ByteBufferMessageSet
-import kafka.message.MessageAndOffset
+import kafka.message.{InvalidMessageException, ByteBufferMessageSet, MessageAndOffset}
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 import java.util.concurrent.atomic.AtomicLong
@@ -131,6 +130,8 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
                     // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
                     processPartitionData(topicAndPartition, currentOffset.get, partitionData)
                   } catch {
+                    case ime: InvalidMessageException =>
+                      logger.warn("Found invalid messages during fetch for topic " + topic + " partition " + partitionId + " offset " + currentOffset.get + " error " + ime.getMessage)
                     case e =>
                       throw new KafkaException("error processing data for topic %s partititon %d offset %d"
                                                .format(topic, partitionId, currentOffset.get), e)
-- 
1.7.1

