diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala index a4227a4..ac491b4 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala @@ -101,10 +101,7 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk item.message.ensureValid() // validate checksum of message to ensure it is valid - val keyBuffer = item.message.key - val key = if(keyBuffer == null) null.asInstanceOf[K] else keyDecoder.fromBytes(Utils.readBytes(keyBuffer)) - val value = if(item.message.isNull) null.asInstanceOf[V] else valueDecoder.fromBytes(Utils.readBytes(item.message.payload)) - new MessageAndMetadata(key, value, currentTopicInfo.topic, currentTopicInfo.partitionId, item.offset) + new MessageAndMetadata(currentTopicInfo.topic, currentTopicInfo.partitionId, item.message, item.offset, keyDecoder, valueDecoder) } def clearCurrentChunk() { diff --git a/core/src/main/scala/kafka/message/MessageAndMetadata.scala b/core/src/main/scala/kafka/message/MessageAndMetadata.scala index 20c0e70..d693abc 100644 --- a/core/src/main/scala/kafka/message/MessageAndMetadata.scala +++ b/core/src/main/scala/kafka/message/MessageAndMetadata.scala @@ -17,5 +17,18 @@ package kafka.message -case class MessageAndMetadata[K, V](key: K, message: V, topic: String, partition: Int, offset: Long) +import kafka.serializer.Decoder +import kafka.utils.Utils + +case class MessageAndMetadata[K, V](topic: String, partition: Int, + private val rawMessage: Message, offset: Long, + keyDecoder: Decoder[K], valueDecoder: Decoder[V]) { + + /** + * Return the decoded message key and payload + */ + def key(): K = if(rawMessage.key == null) null.asInstanceOf[K] else keyDecoder.fromBytes(Utils.readBytes(rawMessage.key)) + + def message(): V = if(rawMessage.isNull) null.asInstanceOf[V] else valueDecoder.fromBytes(Utils.readBytes(rawMessage.payload)) +} diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala index ef1de83..9347ea6 100644 --- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala @@ -88,4 +88,40 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness { val unconsumed = messageSet.filter(_.offset >= consumedOffset).map(m => Utils.readString(m.message.payload)) assertEquals(unconsumed, receivedMessages) } + + @Test + def testConsumerIteratorDecodingFailure() { + val messageStrings = (0 until 10).map(_.toString).toList + val messages = messageStrings.map(s => new Message(s.getBytes)) + val messageSet = new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(0), messages:_*) + + topicInfos(0).enqueue(messageSet) + assertEquals(1, queue.size) + + val iter = new ConsumerIterator[String, String](queue, + ConsumerConfig.ConsumerTimeoutMs, + new FailDecoder(), + new FailDecoder(), + clientId = "") + + val receivedMessages = (0 until 5).map{ i => + assertTrue(iter.hasNext) + val message = iter.next + assertEquals(message.offset, i + consumedOffset) + + try { + message.message // should fail + } + catch { + case e: UnsupportedOperationException => // this is ok + case e2: Throwable => fail("Unexpected exception when iterating the message set. " + e2.getMessage) + } + } + } + + class FailDecoder(props: VerifiableProperties = null) extends Decoder[String] { + def fromBytes(bytes: Array[Byte]): String = { + throw new UnsupportedOperationException("This decoder does not work at all..") + } + } }