diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala index a4227a4..8bf1fee 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala @@ -35,76 +35,91 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk private val keyDecoder: Decoder[K], private val valueDecoder: Decoder[V], val clientId: String) - extends IteratorTemplate[MessageAndMetadata[K, V]] with Logging { + extends Iterator[MessageAndMetadata[K, V]] with java.util.Iterator[MessageAndMetadata[K, V]] with Logging { - private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null) + private val current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null) private var currentTopicInfo: PartitionTopicInfo = null private var consumedOffset: Long = -1L private val consumerTopicStats = ConsumerTopicStatsRegistry.getConsumerTopicStat(clientId) - override def next(): MessageAndMetadata[K, V] = { - val item = super.next() - if(consumedOffset < 0) - throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset)) - currentTopicInfo.resetConsumeOffset(consumedOffset) - val topic = currentTopicInfo.topic - trace("Setting %s consumed offset to %d".format(topic, consumedOffset)) - consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark() - consumerTopicStats.getConsumerAllTopicStats().messageRate.mark() - item - } + private val iter: IteratorTemplate[MessageAndOffset] = new IteratorTemplate[MessageAndOffset] { + def makeNext(): MessageAndOffset = { + var currentDataChunk: FetchedDataChunk = null + // if we don't have an iterator, get one + var localCurrent = current.get() + if(localCurrent == null || !localCurrent.hasNext) { + if (consumerTimeoutMs < 0) + currentDataChunk = channel.take + else { + currentDataChunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS) + if (currentDataChunk == null) { + // There is no more data in the chunk, reset state to make the iterator re-iterable + resetState() + throw new ConsumerTimeoutException + } + } + if(currentDataChunk eq ZookeeperConsumerConnector.shutdownCommand) { + debug("Received the shutdown command") + channel.offer(currentDataChunk) + return allDone + } else { + currentTopicInfo = currentDataChunk.topicInfo + val cdcFetchOffset = currentDataChunk.fetchOffset + val ctiConsumeOffset = currentTopicInfo.getConsumeOffset + if (ctiConsumeOffset < cdcFetchOffset) { + error("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data" + .format(ctiConsumeOffset, cdcFetchOffset, currentTopicInfo)) + currentTopicInfo.resetConsumeOffset(cdcFetchOffset) + } + localCurrent = currentDataChunk.messages.iterator - protected def makeNext(): MessageAndMetadata[K, V] = { - var currentDataChunk: FetchedDataChunk = null - // if we don't have an iterator, get one - var localCurrent = current.get() - if(localCurrent == null || !localCurrent.hasNext) { - if (consumerTimeoutMs < 0) - currentDataChunk = channel.take - else { - currentDataChunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS) - if (currentDataChunk == null) { - // reset state to make the iterator re-iterable - resetState() - throw new ConsumerTimeoutException + current.set(localCurrent) } + // if we just updated the current chunk and it is empty that means the fetch size is too small! + if(currentDataChunk.messages.validBytes == 0) + throw new MessageSizeTooLargeException("Found a message larger than the maximum fetch size of this consumer on topic " + + "%s partition %d at fetch offset %d. Increase the fetch size, or decrease the maximum message size the broker will allow." + .format(currentDataChunk.topicInfo.topic, currentDataChunk.topicInfo.partitionId, currentDataChunk.fetchOffset)) } - if(currentDataChunk eq ZookeeperConsumerConnector.shutdownCommand) { - debug("Received the shutdown command") - channel.offer(currentDataChunk) - return allDone - } else { - currentTopicInfo = currentDataChunk.topicInfo - val cdcFetchOffset = currentDataChunk.fetchOffset - val ctiConsumeOffset = currentTopicInfo.getConsumeOffset - if (ctiConsumeOffset < cdcFetchOffset) { - error("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data" - .format(ctiConsumeOffset, cdcFetchOffset, currentTopicInfo)) - currentTopicInfo.resetConsumeOffset(cdcFetchOffset) - } - localCurrent = currentDataChunk.messages.iterator + var item = localCurrent.next() - current.set(localCurrent) + // reject the messages that have already been consumed + while (item.offset < currentTopicInfo.getConsumeOffset && localCurrent.hasNext) { + item = localCurrent.next() } - // if we just updated the current chunk and it is empty that means the fetch size is too small! - if(currentDataChunk.messages.validBytes == 0) - throw new MessageSizeTooLargeException("Found a message larger than the maximum fetch size of this consumer on topic " + - "%s partition %d at fetch offset %d. Increase the fetch size, or decrease the maximum message size the broker will allow." - .format(currentDataChunk.topicInfo.topic, currentDataChunk.topicInfo.partitionId, currentDataChunk.fetchOffset)) - } - var item = localCurrent.next() - // reject the messages that have already been consumed - while (item.offset < currentTopicInfo.getConsumeOffset && localCurrent.hasNext) { - item = localCurrent.next() + consumedOffset = item.nextOffset + + // validate checksum of message to ensure it is valid + item.message.ensureValid() + item } - consumedOffset = item.nextOffset - item.message.ensureValid() // validate checksum of message to ensure it is valid + } + + def hasNext(): Boolean = iter.hasNext + + // Currently the iterator template does not support remove, + // hence calling this function will throw an exception directly + def remove() = iter.remove + + def next(): MessageAndMetadata[K, V] = { + val item = iter.next() + if(consumedOffset < 0) + throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset)) + // decode the message val keyBuffer = item.message.key val key = if(keyBuffer == null) null.asInstanceOf[K] else keyDecoder.fromBytes(Utils.readBytes(keyBuffer)) val value = if(item.message.isNull) null.asInstanceOf[V] else valueDecoder.fromBytes(Utils.readBytes(item.message.payload)) - new MessageAndMetadata(key, value, currentTopicInfo.topic, currentTopicInfo.partitionId, item.offset) + + // update the consumer offsets and metrics + currentTopicInfo.resetConsumeOffset(consumedOffset) + val topic = currentTopicInfo.topic + val partitionId = currentTopicInfo.partitionId + trace("Setting [%s,%d] consumed offset to %d".format(topic, partitionId, consumedOffset)) + consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark() + consumerTopicStats.getConsumerAllTopicStats().messageRate.mark() + MessageAndMetadata(key, value, topic, partitionId, item.offset) } def clearCurrentChunk() { diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala index ef1de83..d69ec8d 100644 --- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala @@ -88,4 +88,40 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness { val unconsumed = messageSet.filter(_.offset >= consumedOffset).map(m => Utils.readString(m.message.payload)) assertEquals(unconsumed, receivedMessages) } + + @Test + def testConsumerIteratorDecodingFailure() { + val messageStrings = (0 until 10).map(_.toString).toList + val messages = messageStrings.map(s => new Message(s.getBytes)) + val messageSet = new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(0), messages:_*) + + topicInfos(0).enqueue(messageSet) + assertEquals(1, queue.size) + + val iter = new ConsumerIterator[String, String](queue, + consumerConfig.consumerTimeoutMs, + new FailDecoder(), + new FailDecoder(), + clientId = "") + + var needBreak = false + while (iter.hasNext() && !needBreak) { + try { + iter.next + } + catch { + case e: UnsupportedOperationException => needBreak = true // this is ok + case e2: Throwable => throw e2 + } + } + + // the offset should not have changed + assertEquals(5, consumedOffset) + } + + class FailDecoder(props: VerifiableProperties = null) extends Decoder[String] { + def fromBytes(bytes: Array[Byte]): String = { + throw new UnsupportedOperationException("This decoder does not work at all..") + } + } } diff --git a/project/Build.scala b/project/Build.scala index 098e874..500040b 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -71,9 +71,7 @@ object KafkaBuild extends Build { - , - mappings in packageBin in Compile += file("LICENSE") -> "LICENSE", - mappings in packageBin in Compile += file("NOTICE") -> "NOTICE" + ) val hadoopSettings = Seq(