diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala index a4227a4..5c05e33 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala @@ -35,76 +35,90 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk private val keyDecoder: Decoder[K], private val valueDecoder: Decoder[V], val clientId: String) - extends IteratorTemplate[MessageAndMetadata[K, V]] with Logging { + extends Iterator[MessageAndMetadata[K, V]] with java.util.Iterator[MessageAndMetadata[K, V]] with Logging { - private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null) + private val current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null) private var currentTopicInfo: PartitionTopicInfo = null private var consumedOffset: Long = -1L private val consumerTopicStats = ConsumerTopicStatsRegistry.getConsumerTopicStat(clientId) - override def next(): MessageAndMetadata[K, V] = { - val item = super.next() - if(consumedOffset < 0) - throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset)) - currentTopicInfo.resetConsumeOffset(consumedOffset) - val topic = currentTopicInfo.topic - trace("Setting %s consumed offset to %d".format(topic, consumedOffset)) - consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark() - consumerTopicStats.getConsumerAllTopicStats().messageRate.mark() - item - } + private val iter: Iterator[MessageAndOffset] = new IteratorTemplate[MessageAndOffset] { + def makeNext(): MessageAndOffset = { + var currentDataChunk: FetchedDataChunk = null + // if we don't have an iterator, get one + var localCurrent = current.get() + if(localCurrent == null || !localCurrent.hasNext) { + if (consumerTimeoutMs < 0) + currentDataChunk = channel.take + else { + currentDataChunk = channel.poll(consumerTimeoutMs, 
TimeUnit.MILLISECONDS) + if (currentDataChunk == null) { + // No chunk arrived before the poll timeout elapsed, reset state to make the iterator re-iterable + resetState() + throw new ConsumerTimeoutException + } + } + if(currentDataChunk eq ZookeeperConsumerConnector.shutdownCommand) { + debug("Received the shutdown command") + channel.offer(currentDataChunk) + return allDone + } else { + currentTopicInfo = currentDataChunk.topicInfo + val cdcFetchOffset = currentDataChunk.fetchOffset + val ctiConsumeOffset = currentTopicInfo.getConsumeOffset + if (ctiConsumeOffset < cdcFetchOffset) { + error("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data" + .format(ctiConsumeOffset, cdcFetchOffset, currentTopicInfo)) + currentTopicInfo.resetConsumeOffset(cdcFetchOffset) + } + localCurrent = currentDataChunk.messages.iterator - protected def makeNext(): MessageAndMetadata[K, V] = { - var currentDataChunk: FetchedDataChunk = null - // if we don't have an iterator, get one - var localCurrent = current.get() - if(localCurrent == null || !localCurrent.hasNext) { - if (consumerTimeoutMs < 0) - currentDataChunk = channel.take - else { - currentDataChunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS) - if (currentDataChunk == null) { - // reset state to make the iterator re-iterable - resetState() - throw new ConsumerTimeoutException + current.set(localCurrent) } + // if we just updated the current chunk and it is empty that means the fetch size is too small! + if(currentDataChunk.messages.validBytes == 0) + throw new MessageSizeTooLargeException("Found a message larger than the maximum fetch size of this consumer on topic " + + "%s partition %d at fetch offset %d. Increase the fetch size, or decrease the maximum message size the broker will allow." 
+ .format(currentDataChunk.topicInfo.topic, currentDataChunk.topicInfo.partitionId, currentDataChunk.fetchOffset)) } - if(currentDataChunk eq ZookeeperConsumerConnector.shutdownCommand) { - debug("Received the shutdown command") - channel.offer(currentDataChunk) - return allDone - } else { - currentTopicInfo = currentDataChunk.topicInfo - val cdcFetchOffset = currentDataChunk.fetchOffset - val ctiConsumeOffset = currentTopicInfo.getConsumeOffset - if (ctiConsumeOffset < cdcFetchOffset) { - error("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data" - .format(ctiConsumeOffset, cdcFetchOffset, currentTopicInfo)) - currentTopicInfo.resetConsumeOffset(cdcFetchOffset) - } - localCurrent = currentDataChunk.messages.iterator + var item = localCurrent.next() - current.set(localCurrent) + // reject the messages that have already been consumed + while (item.offset < currentTopicInfo.getConsumeOffset && localCurrent.hasNext) { + item = localCurrent.next() } - // if we just updated the current chunk and it is empty that means the fetch size is too small! - if(currentDataChunk.messages.validBytes == 0) - throw new MessageSizeTooLargeException("Found a message larger than the maximum fetch size of this consumer on topic " + - "%s partition %d at fetch offset %d. Increase the fetch size, or decrease the maximum message size the broker will allow." 
- .format(currentDataChunk.topicInfo.topic, currentDataChunk.topicInfo.partitionId, currentDataChunk.fetchOffset)) - } - var item = localCurrent.next() - // reject the messages that have already been consumed - while (item.offset < currentTopicInfo.getConsumeOffset && localCurrent.hasNext) { - item = localCurrent.next() + consumedOffset = item.nextOffset + + // validate checksum of message to ensure it is valid + item.message.ensureValid() + item } - consumedOffset = item.nextOffset - item.message.ensureValid() // validate checksum of message to ensure it is valid + } + + def hasNext(): Boolean = iter.hasNext + + // Currently the iterator template does not support remove, + // hence calling this function will throw an exception directly + def remove() = iter.asInstanceOf[IteratorTemplate[MessageAndOffset]].remove + + def next(): MessageAndMetadata[K, V] = { + val item = iter.next() + if(consumedOffset < 0) + throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset)) + // decode the message val keyBuffer = item.message.key val key = if(keyBuffer == null) null.asInstanceOf[K] else keyDecoder.fromBytes(Utils.readBytes(keyBuffer)) val value = if(item.message.isNull) null.asInstanceOf[V] else valueDecoder.fromBytes(Utils.readBytes(item.message.payload)) - new MessageAndMetadata(key, value, currentTopicInfo.topic, currentTopicInfo.partitionId, item.offset) + + // update the consumer offsets and metrics + currentTopicInfo.resetConsumeOffset(consumedOffset) + val topic = currentTopicInfo.topic + trace("Setting %s consumed offset to %d".format(topic, consumedOffset)) + consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark() + consumerTopicStats.getConsumerAllTopicStats().messageRate.mark() + MessageAndMetadata(key, value, currentTopicInfo.topic, currentTopicInfo.partitionId, item.offset) } def clearCurrentChunk() {