From ff97b96fdf74815804355e2a5acfa87bfe250525 Mon Sep 17 00:00:00 2001
From: Colin Blower <cblower@barracuda.com>
Date: Tue, 7 May 2013 17:30:09 -0700
Subject: [PATCH] Fix NPE in ConsoleConsumer

---
 core/src/main/scala/kafka/consumer/ConsoleConsumer.scala  | 4 ++--
 core/src/main/scala/kafka/consumer/ConsumerIterator.scala | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
index e2b0041..576ff47 100644
--- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
@@ -282,10 +282,10 @@ class DefaultMessageFormatter extends MessageFormatter {
   
   def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
     if(printKey) {
-      output.write(key)
+      output.write(if (key == null) "null".getBytes() else key)
       output.write(keySeparator)
     }
-    output.write(value)
+    output.write(if (value == null) "null".getBytes() else value)
     output.write(lineSeparator)
   }
 }
diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
index b80c0b0..b198908 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -103,7 +103,7 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk
 
     val keyBuffer = item.message.key
     val key = if(keyBuffer == null) null.asInstanceOf[K] else keyDecoder.fromBytes(Utils.readBytes(keyBuffer))
-    val value = valueDecoder.fromBytes(Utils.readBytes(item.message.payload))
+    val value = if(item.message.payload == null) null.asInstanceOf[V] else valueDecoder.fromBytes(Utils.readBytes(item.message.payload))
     new MessageAndMetadata(key, value, currentTopicInfo.topic, currentTopicInfo.partitionId, item.offset)
   }
 
-- 
1.8.2.1

