Index: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/server/AbstractFetcherThread.scala	(revision 15bb3961d9171c1c54c4c840a554ce2c76168163)
+++ core/src/main/scala/kafka/server/AbstractFetcherThread.scala	(revision )
@@ -95,7 +95,7 @@
     } catch {
       case t: Throwable =>
         if (isRunning.get) {
-          warn("Error in fetch %s".format(fetchRequest), t)
+          error("Error in fetch %s".format(fetchRequest), t)
           partitionMapLock synchronized {
             partitionsWithError ++= partitionMap.keys
           }
@@ -132,7 +132,7 @@
                       // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag
                       // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
                       //    should get fixed in the subsequent fetches
-                      logger.warn("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentOffset.get + " error " + ime.getMessage)
+                      logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentOffset.get + " error " + ime.getMessage)
                     case e: Throwable =>
                       throw new KafkaException("error processing data for partition [%s,%d] offset %d"
                                                .format(topic, partitionId, currentOffset.get), e)
@@ -141,16 +141,16 @@
                   try {
                     val newOffset = handleOffsetOutOfRange(topicAndPartition)
                     partitionMap.put(topicAndPartition, newOffset)
-                    warn("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
+                    error("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
                       .format(currentOffset.get, topic, partitionId, newOffset))
                   } catch {
                     case e: Throwable =>
-                      warn("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
+                      error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
                       partitionsWithError += topicAndPartition
                   }
                 case _ =>
                   if (isRunning.get) {
-                    warn("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id,
+                    error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id,
                       ErrorMapping.exceptionFor(partitionData.error).getClass))
                     partitionsWithError += topicAndPartition
                   }
