diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
index ac491b4..a4227a4 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -101,7 +101,10 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk
     item.message.ensureValid() // validate checksum of message to ensure it is valid
-    new MessageAndMetadata(currentTopicInfo.topic, currentTopicInfo.partitionId, item.message, item.offset, keyDecoder, valueDecoder)
+    val keyBuffer = item.message.key
+    val key = if(keyBuffer == null) null.asInstanceOf[K] else keyDecoder.fromBytes(Utils.readBytes(keyBuffer))
+    val value = if(item.message.isNull) null.asInstanceOf[V] else valueDecoder.fromBytes(Utils.readBytes(item.message.payload))
+    new MessageAndMetadata(key, value, currentTopicInfo.topic, currentTopicInfo.partitionId, item.offset)
   }
 
   def clearCurrentChunk() {
diff --git a/core/src/main/scala/kafka/message/MessageAndMetadata.scala b/core/src/main/scala/kafka/message/MessageAndMetadata.scala
index d693abc..20c0e70 100644
--- a/core/src/main/scala/kafka/message/MessageAndMetadata.scala
+++ b/core/src/main/scala/kafka/message/MessageAndMetadata.scala
@@ -17,18 +17,5 @@
 package kafka.message
 
-import kafka.serializer.Decoder
-import kafka.utils.Utils
-
-case class MessageAndMetadata[K, V](topic: String, partition: Int,
-                                    private val rawMessage: Message, offset: Long,
-                                    keyDecoder: Decoder[K], valueDecoder: Decoder[V]) {
-
-  /**
-   * Return the decoded message key and payload
-   */
-  def key(): K = if(rawMessage.key == null) null.asInstanceOf[K] else keyDecoder.fromBytes(Utils.readBytes(rawMessage.key))
-
-  def message(): V = if(rawMessage.isNull) null.asInstanceOf[V] else valueDecoder.fromBytes(Utils.readBytes(rawMessage.payload))
-}
+case class MessageAndMetadata[K, V](key: K, message: V, topic: String, partition: Int, offset: Long)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 161f581..e459f0a 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -351,11 +351,16 @@ class ReplicaManager(val config: KafkaConfig,
             .format(localBrokerId, partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(","), controllerId, correlationId))
 
         if (!isShuttingDown.get()) {
-          replicaFetcherManager.addFetcherForPartitions(partitionState.map{ case(partition, partitionStateInfo) =>
-            new TopicAndPartition(partition) ->
-              BrokerAndInitialOffset(leaders.find(_.id == partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader).get,
-                                     partition.getReplica().get.logEndOffset)}
-          )
+          val partitionAndOffsets = mutable.Map[TopicAndPartition, BrokerAndInitialOffset]()
+          partitionState.foreach{
+            case (partition, partitionStateInfo) =>
+              if (partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader != -1) (
+                partitionAndOffsets(new TopicAndPartition(partition)) =
+                  BrokerAndInitialOffset(leaders.find(_.id == partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader).get,
+                                         partition.getReplica().get.logEndOffset)
+              )
+          }
+          replicaFetcherManager.addFetcherForPartitions(partitionAndOffsets)
         } else {
           stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d from " +
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
index 9347ea6..ef1de83 100644
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -88,40 +88,4 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
     val unconsumed = messageSet.filter(_.offset >= consumedOffset).map(m => Utils.readString(m.message.payload))
     assertEquals(unconsumed, receivedMessages)
   }
-
-  @Test
-  def testConsumerIteratorDecodingFailure() {
-    val messageStrings = (0 until 10).map(_.toString).toList
-    val messages = messageStrings.map(s => new Message(s.getBytes))
-    val messageSet = new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(0), messages:_*)
-
-    topicInfos(0).enqueue(messageSet)
-    assertEquals(1, queue.size)
-
-    val iter = new ConsumerIterator[String, String](queue,
-                                                    ConsumerConfig.ConsumerTimeoutMs,
-                                                    new FailDecoder(),
-                                                    new FailDecoder(),
-                                                    clientId = "")
-
-    val receivedMessages = (0 until 5).map{ i =>
-      assertTrue(iter.hasNext)
-      val message = iter.next
-      assertEquals(message.offset, i + consumedOffset)
-
-      try {
-        message.message // should fail
-      }
-      catch {
-        case e: UnsupportedOperationException => // this is ok
-        case e2: Throwable => fail("Unexpected exception when iterating the message set. " + e2.getMessage)
-      }
-    }
-  }
-
-  class FailDecoder(props: VerifiableProperties = null) extends Decoder[String] {
-    def fromBytes(bytes: Array[Byte]): String = {
-      throw new UnsupportedOperationException("This decoder does not work at all..")
-    }
-  }
 }
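
Note (not part of the patch): with this change, decoding happens eagerly in ConsumerIterator.makeNext(), so MessageAndMetadata carries an already-decoded key and message as plain case-class fields instead of storing the decoders and decoding on each access. A minimal usage sketch, assuming the existing high-level consumer KafkaStream[String, String] (built elsewhere with StringDecoder for key and value); the printAll helper is hypothetical:

    import kafka.consumer.KafkaStream
    import kafka.message.MessageAndMetadata

    // Iterates a stream and reads the decoded key/message fields directly.
    def printAll(stream: KafkaStream[String, String]): Unit = {
      for (msgAndMeta: MessageAndMetadata[String, String] <- stream) {
        // key and message are fields populated by ConsumerIterator; the decoders
        // are no longer invoked lazily on each key()/message() call.
        println("topic=%s partition=%d offset=%d key=%s value=%s"
          .format(msgAndMeta.topic, msgAndMeta.partition, msgAndMeta.offset,
                  msgAndMeta.key, msgAndMeta.message))
      }
    }

One consequence of eager decoding is that a decoder failure now surfaces from iterator.next() itself rather than from a later message() call, which is why testConsumerIteratorDecodingFailure no longer applies in its original form and is removed above.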