Index: core/src/main/scala/kafka/consumer/ConsumerIterator.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ConsumerIterator.scala	(revision 1226599)
+++ core/src/main/scala/kafka/consumer/ConsumerIterator.scala	(working copy)
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License. You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -19,31 +19,35 @@
 import kafka.utils.{IteratorTemplate, Logging}
 import java.util.concurrent.{TimeUnit, BlockingQueue}
-import kafka.cluster.Partition
-import kafka.message.{MessageAndOffset, MessageSet, Message}
+import kafka.message.MessageAndOffset
 import kafka.serializer.Decoder
+import java.util.concurrent.locks.{Lock, ReentrantLock}
+import java.util.concurrent.atomic.AtomicReference
 
 /**
  * An iterator that blocks until a value can be read from the supplied queue.
  * The iterator takes a shutdownCommand object which can be added to the queue to trigger a shutdown
- * 
+ *
  */
 class ConsumerIterator[T](private val topic: String,
                           private val channel: BlockingQueue[FetchedDataChunk],
                           consumerTimeoutMs: Int,
                           private val decoder: Decoder[T])
-  extends IteratorTemplate[T] with Logging {
- 
-  private var current: Iterator[MessageAndOffset] = null
-  private var currentDataChunk: FetchedDataChunk = null
-  private var currentTopicInfo: PartitionTopicInfo = null
+  extends IteratorTemplate[T] with Logging {
+
+  // the following variables are also updated outside of the code block protected by clearCurrentChunkLock, so they
+  // need to be atomic references for correct visibility across threads
+  private val current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
+  private val currentDataChunk: AtomicReference[FetchedDataChunk] = new AtomicReference(null)
+  private val currentTopicInfo: AtomicReference[PartitionTopicInfo] = new AtomicReference(null)
   private var consumedOffset: Long = -1L
+  private val clearCurrentChunkLock: Lock = new ReentrantLock()
 
   override def next(): T = {
     val decodedMessage = super.next()
     if(consumedOffset < 0)
       throw new IllegalStateException("Offset returned by the message set is invalid %d".format(consumedOffset))
-    currentTopicInfo.resetConsumeOffset(consumedOffset)
+    currentTopicInfo.get().resetConsumeOffset(consumedOffset)
     trace("Setting consumed offset to %d".format(consumedOffset))
     ConsumerTopicStat.getConsumerTopicStat(topic).recordMessagesPerTopic(1)
     decodedMessage
   }
@@ -51,7 +55,8 @@
 
   protected def makeNext(): T = {
     // if we don't have an iterator, get one
-    if(current == null || !current.hasNext) {
+    var localCurrent = current.get()
+    if(localCurrent == null || !localCurrent.hasNext) {
       if (consumerTimeoutMs < 0)
-        currentDataChunk = channel.take
+        currentDataChunk.set(channel.take)
       else {
@@ -71,14 +76,39 @@
-          .format(currentTopicInfo.getConsumeOffset, currentDataChunk.fetchOffset, currentTopicInfo))
-        currentTopicInfo.resetConsumeOffset(currentDataChunk.fetchOffset)
+          .format(currentTopicInfo.get().getConsumeOffset, currentDataChunk.get().fetchOffset, currentTopicInfo.get()))
+        currentTopicInfo.get().resetConsumeOffset(currentDataChunk.get().fetchOffset)
         }
-        current = currentDataChunk.messages.iterator
+        localCurrent = currentDataChunk.get().messages.iterator
+        current.set(localCurrent)
       }
     }
-    val item = current.next()
+    val item = localCurrent.next()
     consumedOffset = item.offset
     decoder.toEvent(item.message)
   }
-
+
+  def clearCurrentChunk(commitOffsetsFn: () => Unit) = {
+    try {
+      debug("Waiting to acquire lock to clear the current data chunk")
+      clearCurrentChunkLock.lock()
+
+      // here, we need to commit offsets before stopping the consumer from returning any more messages
+      // from the current data chunk. Since partition ownership is not yet released, this commit offsets
+      // call will ensure that the offsets committed now will be used by the next consumer thread owning the partition
+      // for the current data chunk. Since the fetchers have already been shut down and this is the last chunk to be
+      // iterated by the consumer, there will be no more messages returned by this iterator until the rebalancing
+      // finishes successfully and the fetchers restart to fetch more data chunks
+      info("Commit offsets is in progress")
+      if(currentTopicInfo.get() != null)
+        commitOffsetsFn()
+      else
+        error("Trying to clear the currently iterated data chunk when one does not exist. Cannot commit offsets")
+
+      info("Commit offsets is complete. Clearing the current data chunk for this consumer iterator")
+      current.set(null)
+    }
+    finally {
+      clearCurrentChunkLock.unlock()
+    }
+  }
 }
 
 class ConsumerTimeoutException() extends RuntimeException()
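
For reviewers, the concurrency pattern this patch relies on may be easier to evaluate in isolation: the consuming thread copies the AtomicReference into a local variable before iterating, so a concurrent current.set(null) from clearCurrentChunk cannot null out the iterator between the hasNext check and next(), while the clearing thread commits offsets and drops the chunk under the lock. Below is a minimal, self-contained Scala sketch of that pattern; ChunkHolder, its next/clear methods, and the integer "chunks" are hypothetical stand-ins for illustration, not part of the Kafka code above.

import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.locks.ReentrantLock

// Hypothetical stand-in for the iterator/chunk state in ConsumerIterator.
class ChunkHolder[A] {
  // An AtomicReference makes a write by the clearing thread visible to the
  // consuming thread (and vice versa) without locking every read.
  private val current = new AtomicReference[Iterator[A]](null)
  private val clearLock = new ReentrantLock()

  // Consuming side: copy the reference into a local first, as makeNext() does
  // with localCurrent, so hasNext/next operate on one stable iterator even if
  // clear() nulls out `current` concurrently.
  def next(refill: () => Iterator[A]): A = {
    var localCurrent = current.get()
    if (localCurrent == null || !localCurrent.hasNext) {
      localCurrent = refill()
      current.set(localCurrent)
    }
    localCurrent.next()
  }

  // Clearing side: commit state first, then drop the chunk, all under the
  // lock, as clearCurrentChunk() does.
  def clear(commitFn: () => Unit): Unit = {
    clearLock.lock()
    try {
      commitFn()
      current.set(null)
    } finally {
      clearLock.unlock()
    }
  }
}

object ChunkHolderDemo {
  def main(args: Array[String]): Unit = {
    val holder = new ChunkHolder[Int]
    var chunkStart = 0
    // Each "chunk" is ten consecutive integers, standing in for the message
    // set of a FetchedDataChunk.
    val refill = () => {
      val it = (chunkStart until chunkStart + 10).iterator
      chunkStart += 10
      it
    }
    println(holder.next(refill)) // 0  -- first chunk fetched on demand
    println(holder.next(refill)) // 1  -- served from the same chunk
    holder.clear(() => println("committing offsets"))
    println(holder.next(refill)) // 10 -- chunk was cleared, so refill again
  }
}

Run single-threaded, the demo prints 0, 1, a commit message, then 10 after the clear forces a refetch; under contention, the same local-copy discipline is what keeps the patched makeNext() from dereferencing a cleared iterator.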