Index: core/src/test/scala/unit/kafka/integration/FetcherTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/FetcherTest.scala (revision 1215522)
+++ core/src/test/scala/unit/kafka/integration/FetcherTest.scala (working copy)
@@ -55,7 +55,8 @@
   override def setUp() {
     super.setUp
     fetcher = new Fetcher(new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), null)
-    fetcher.initConnections(topicInfos, cluster, Set(queue))
+    fetcher.stopConnectionsToAllBrokers
+    fetcher.startConnections(topicInfos, cluster, null)
   }
 
   override def tearDown() {
Index: core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
===================================================================
--- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (revision 1215522)
+++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (working copy)
@@ -48,7 +48,8 @@
     if(messagesPerTopic.size > 0) {
       val requests = messagesPerTopic.map(f => new ProducerRequest(f._1._1, f._1._2, f._2)).toArray
       syncProducer.multiSend(requests)
-      trace("kafka producer sent messages for topics " + messagesPerTopic)
+      trace("kafka producer sent messages for topics %s to broker %s:%d"
+        .format(messagesPerTopic, syncProducer.config.host, syncProducer.config.port))
     }
   }
 
Index: core/src/main/scala/kafka/consumer/KafkaMessageStream.scala
===================================================================
--- core/src/main/scala/kafka/consumer/KafkaMessageStream.scala (revision 1215522)
+++ core/src/main/scala/kafka/consumer/KafkaMessageStream.scala (working copy)
@@ -18,11 +18,10 @@
 package kafka.consumer
 
 import java.util.concurrent.BlockingQueue
-import kafka.message.Message
-import kafka.serializer.{DefaultDecoder, Decoder}
+import kafka.serializer.Decoder
 
 /**
- * All calls to elements should produce the same thread-safe iterator? Should have a seperate thread
+ * All calls to elements should produce the same thread-safe iterator? Should have a separate thread
  * that feeds messages into a blocking queue for processing.
  */
 class KafkaMessageStream[T](val topic: String,
@@ -38,4 +37,13 @@
    * Create an iterator over messages in the stream.
    */
   def iterator(): ConsumerIterator[T] = iter
+
+  /**
+   * This method clears the queue being iterated during the consumer rebalancing. This is mainly
+   * to reduce the number of duplicates received by the consumer
+   */
+  def clear(commitOffsetsFn: () => Unit) {
+    iter.clearCurrentChunk(commitOffsetsFn)
+  }
+
 }
Index: core/src/main/scala/kafka/consumer/SimpleConsumer.scala
===================================================================
--- core/src/main/scala/kafka/consumer/SimpleConsumer.scala (revision 1215522)
+++ core/src/main/scala/kafka/consumer/SimpleConsumer.scala (working copy)
@@ -18,11 +18,8 @@
 package kafka.consumer
 
 import java.net._
-import java.nio._
 import java.nio.channels._
-import java.util.concurrent.atomic._
 import kafka.api._
-import kafka.common._
 import kafka.message._
 import kafka.network._
 import kafka.utils._
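For orientation, the sketch below shows where KafkaMessageStream sits in application code. It is not part of the patch: it assumes the 0.7-era high-level consumer API, and handleMessage, the property values, and the topic name are invented. Note that the new clear(commitOffsetsFn) method is invoked internally by the fetcher during a rebalance, never by application code.

    // Hypothetical application code, for illustration only (not part of this patch).
    import java.util.Properties
    import kafka.consumer.{Consumer, ConsumerConfig, KafkaMessageStream}
    import kafka.message.Message

    object ConsumerSketch extends App {
      val props = new Properties()
      props.put("zk.connect", "localhost:2181")    // invented values
      props.put("groupid", "example-group")

      val connector = Consumer.create(new ConsumerConfig(props))
      val streams: Map[String, List[KafkaMessageStream[Message]]] =
        connector.createMessageStreams(Map("example-topic" -> 1))

      // invented handler; the application just iterates the stream, clear() is internal
      def handleMessage(m: Message): Unit = println("received message with checksum " + m.checksum)

      for (stream <- streams("example-topic"); message <- stream)
        handleMessage(message)
    }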
Index: core/src/main/scala/kafka/consumer/ConsumerIterator.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ConsumerIterator.scala (revision 1215522)
+++ core/src/main/scala/kafka/consumer/ConsumerIterator.scala (working copy)
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License. You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -19,66 +19,129 @@
 import kafka.utils.{IteratorTemplate, Logging}
 import java.util.concurrent.{TimeUnit, BlockingQueue}
-import kafka.cluster.Partition
-import kafka.message.{MessageAndOffset, MessageSet, Message}
+import kafka.message.MessageAndOffset
 import kafka.serializer.Decoder
+import java.util.concurrent.locks.{Lock, ReentrantLock}
+import java.util.concurrent.atomic.AtomicReference
 
 /**
  * An iterator that blocks until a value can be read from the supplied queue.
  * The iterator takes a shutdownCommand object which can be added to the queue to trigger a shutdown
- * 
+ *
  */
 class ConsumerIterator[T](private val topic: String,
                           private val channel: BlockingQueue[FetchedDataChunk],
                           consumerTimeoutMs: Int,
                           private val decoder: Decoder[T])
-  extends IteratorTemplate[T] with Logging {
-
-  private var current: Iterator[MessageAndOffset] = null
-  private var currentDataChunk: FetchedDataChunk = null
-  private var currentTopicInfo: PartitionTopicInfo = null
+  extends IteratorTemplate[T] with Logging {
+
+  // the following variables are also updated outside of the code block protected by clearCurrentChunkLock. So they
+  // need to be atomic references for correct visibility across threads
+  private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
+  private var currentDataChunk: AtomicReference[FetchedDataChunk] = new AtomicReference(null)
+  private var currentTopicInfo: AtomicReference[PartitionTopicInfo] = new AtomicReference(null)
   private var consumedOffset: Long = -1L
+  private val clearCurrentChunkLock: Lock = new ReentrantLock()
 
   override def next(): T = {
     val decodedMessage = super.next()
     if(consumedOffset < 0)
       throw new IllegalStateException("Offset returned by the message set is invalid %d".format(consumedOffset))
-    currentTopicInfo.resetConsumeOffset(consumedOffset)
+    currentTopicInfo.get().resetConsumeOffset(consumedOffset)
     trace("Setting consumed offset to %d".format(consumedOffset))
     ConsumerTopicStat.getConsumerTopicStat(topic).recordMessagesPerTopic(1)
     decodedMessage
   }
 
   protected def makeNext(): T = {
-    // if we don't have an iterator, get one
-    if(current == null || !current.hasNext) {
-      if (consumerTimeoutMs < 0)
-        currentDataChunk = channel.take
-      else {
-        currentDataChunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS)
-        if (currentDataChunk == null) {
-          throw new ConsumerTimeoutException
+    debug("Waiting to acquire lock to consume next message")
+    try {
+      clearCurrentChunkLock.lock()
+      while(current.get() == null || !current.get().hasNext) {
+        val shutdown = getNextDataChunk()
+        if(shutdown) {
+          return allDone
         }
+        if(current.get() == null)
+          info("Data chunk cleared by rebalancing operation. Fetching the next data chunk again.")
+        // at this point, the current iterator could have been cleared by the last rebalancing operation.
+        // re-fetch the next data chunk
       }
-      if(currentDataChunk eq ZookeeperConsumerConnector.shutdownCommand) {
-        debug("Received the shutdown command")
-        channel.offer(currentDataChunk)
-        return allDone
-      } else {
-        currentTopicInfo = currentDataChunk.topicInfo
-        if (currentTopicInfo.getConsumeOffset != currentDataChunk.fetchOffset) {
-          error("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data"
-            .format(currentTopicInfo.getConsumeOffset, currentDataChunk.fetchOffset, currentTopicInfo))
-          currentTopicInfo.resetConsumeOffset(currentDataChunk.fetchOffset)
+
+      val item = current.get().next()
+      consumedOffset = item.offset
+      trace("Returning message " + item.message.checksum + " from consumer iterator")
+      decoder.toEvent(item.message)
+    }finally {
+      clearCurrentChunkLock.unlock()
+    }
+  }
+
+  private def getNextDataChunk(): Boolean = {
+    var shutdown = false
+    try {
+      // following could be a blocking operation. So release the lock
+      clearCurrentChunkLock.unlock()
+      // if we don't have an iterator, get one
+      if(current.get() == null || !current.get().hasNext) {
+        currentDataChunk.set(null)
+        if (consumerTimeoutMs < 0) {
+          trace("Fetching the next data chunk for " + topic)
+          currentDataChunk.set(channel.take)
+          debug("Fetched the next data chunk for " + topic)
         }
-      current = currentDataChunk.messages.iterator
+        else {
+          currentDataChunk.set(channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS))
+          if (currentDataChunk.get() == null) {
+            throw new ConsumerTimeoutException
+          }
+        }
+        if(currentDataChunk.get() eq ZookeeperConsumerConnector.shutdownCommand) {
+          debug("Received the shutdown command")
+          channel.offer(currentDataChunk.get())
+          shutdown = true
+        } else {
+          currentTopicInfo.set(currentDataChunk.get().topicInfo)
+          if (currentTopicInfo.get().getConsumeOffset != currentDataChunk.get().fetchOffset) {
+            error("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data"
+              .format(currentTopicInfo.get().getConsumeOffset, currentDataChunk.get().fetchOffset, currentTopicInfo.get()))
+            currentTopicInfo.get().resetConsumeOffset(currentDataChunk.get().fetchOffset)
+          }
+          current.set(currentDataChunk.get().messages.iterator)
+        }
       }
+    }catch {
+      case e => throw(e)
+    }finally {
+      clearCurrentChunkLock.lock()
     }
-    val item = current.next()
-    consumedOffset = item.offset
-    decoder.toEvent(item.message)
+    shutdown
   }
-
+
+  def clearCurrentChunk(commitOffsetsFn: () => Unit) = {
+    try {
+      debug("Waiting to acquire lock to clear the current data chunk")
+      clearCurrentChunkLock.lock()
+
+      // here, we need to commit offsets before stopping the consumer from returning any more messages
+      // from the current data chunk. Since partition ownership is not yet released, this commit offsets
+      // call will ensure that the offsets committed now will be used by the next consumer thread owning the partition
+      // for the current data chunk. Since the fetchers are already shutdown and this is the last chunk to be iterated
+      // by the consumer, there will be no more messages returned by this iterator until the rebalancing finishes
+      // successfully and the fetchers restart to fetch more data chunks
+      info("Commit offsets is in progress")
+      if(currentTopicInfo.get() != null)
+        commitOffsetsFn()
+      else
+        error("Trying to clear the currently iterated data chunk, when one does not exist. Cannot commit offsets")
+
+      info("Commit offsets is complete. Clearing the current data chunk for this consumer iterator")
+      current.set(null)
+    }
+    finally {
+      clearCurrentChunkLock.unlock()
+    }
+  }
 }
 
 class ConsumerTimeoutException() extends RuntimeException()
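The concurrency scheme above can be hard to read inside a diff, so here is a stand-alone sketch of the pattern (all names invented, not Kafka code): the consumer thread holds a ReentrantLock while it hands out messages but releases it around the blocking queue take, and the shared "current chunk" lives in an AtomicReference so a rebalance thread can commit offsets and drop the in-flight chunk in between.

    // Minimal sketch of the lock + AtomicReference pattern, under the assumptions above.
    import java.util.concurrent.LinkedBlockingQueue
    import java.util.concurrent.atomic.AtomicReference
    import java.util.concurrent.locks.ReentrantLock

    object ClearableIteratorSketch {
      private val queue = new LinkedBlockingQueue[List[String]]()
      private val current = new AtomicReference[Iterator[String]](null)
      private val lock = new ReentrantLock()

      // consumer thread: holds the lock while returning messages, but not while blocked on take()
      def nextMessage(): String = {
        lock.lock()
        try {
          while (current.get() == null || !current.get().hasNext) {
            lock.unlock()                                       // release before the blocking call
            val chunk = try { queue.take() } finally { lock.lock() }
            current.set(chunk.iterator)
          }
          current.get().next()
        } finally {
          lock.unlock()
        }
      }

      // rebalance thread: can only run while the consumer is outside the locked region
      def clearCurrentChunk(commitOffsets: () => Unit): Unit = {
        lock.lock()
        try {
          commitOffsets()      // persist progress before dropping buffered, undelivered messages
          current.set(null)    // the consumer thread will fetch a fresh chunk afterwards
        } finally {
          lock.unlock()
        }
      }
    }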
Index: core/src/main/scala/kafka/consumer/FetcherRunnable.scala
===================================================================
--- core/src/main/scala/kafka/consumer/FetcherRunnable.scala (revision 1215522)
+++ core/src/main/scala/kafka/consumer/FetcherRunnable.scala (working copy)
@@ -18,10 +18,9 @@
 package kafka.consumer
 
 import java.util.concurrent.CountDownLatch
-import java.nio.channels.{ClosedChannelException, ClosedByInterruptException}
-import kafka.common.{OffsetOutOfRangeException, ErrorMapping}
+import kafka.common.ErrorMapping
 import kafka.cluster.{Partition, Broker}
-import kafka.api.{MultiFetchResponse, OffsetRequest, FetchRequest}
+import kafka.api.{OffsetRequest, FetchRequest}
 import org.I0Itec.zkclient.ZkClient
 import kafka.utils._
 import java.io.IOException
Index: core/src/main/scala/kafka/consumer/Fetcher.scala
===================================================================
--- core/src/main/scala/kafka/consumer/Fetcher.scala (revision 1215522)
+++ core/src/main/scala/kafka/consumer/Fetcher.scala (working copy)
@@ -21,11 +21,12 @@
 import kafka.cluster._
 import org.I0Itec.zkclient.ZkClient
 import java.util.concurrent.BlockingQueue
+import kafka.utils._
 
 /**
  * The fetcher is a background thread that fetches data from a set of servers
  */
-private[consumer] class Fetcher(val config: ConsumerConfig, val zkClient : ZkClient) {
+private [consumer] class Fetcher(val config: ConsumerConfig, val zkClient : ZkClient) extends Logging {
   private val EMPTY_FETCHER_THREADS = new Array[FetcherRunnable](0)
   @volatile
   private var fetcherThreads : Array[FetcherRunnable] = EMPTY_FETCHER_THREADS
@@ -40,18 +41,30 @@
     fetcherThreads = EMPTY_FETCHER_THREADS
   }
 
-  /**
-   * Open connections.
-   */
-  def initConnections(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster,
-                      queuesTobeCleared: Iterable[BlockingQueue[FetchedDataChunk]]) {
-    shutdown
+  def stopConnectionsToAllBrokers = shutdown()
+  def clearFetcherQueues[T](topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster,
                            queuesTobeCleared: Iterable[BlockingQueue[FetchedDataChunk]],
+                            kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]],
+                            commitOffsetsFn: () => Unit) {
+
+    // Clear all but the currently iterated upon chunk in the consumer thread's queue
+    queuesTobeCleared.foreach(_.clear)
+    info("Cleared all relevant queues for this fetcher")
+
+    // Also clear the currently iterated upon chunk in the consumer threads
+    if(kafkaMessageStreams != null)
+      kafkaMessageStreams.foreach(_._2.foreach(s => s.clear(commitOffsetsFn)))
+
+    info("Cleared the data chunks in all the consumer message iterators")
+
+  }
+
+  def startConnections[T](topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster,
+                          kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]]) {
     if (topicInfos == null)
       return
 
-    queuesTobeCleared.foreach(_.clear)
-
     // re-arrange by broker id
     val m = new mutable.HashMap[Int, List[PartitionTopicInfo]]
     for(info <- topicInfos) {
Index: core/src/main/scala/kafka/consumer/storage/OffsetStorage.scala
===================================================================
--- core/src/main/scala/kafka/consumer/storage/OffsetStorage.scala (revision 1215522)
+++ core/src/main/scala/kafka/consumer/storage/OffsetStorage.scala (working copy)
@@ -17,7 +17,6 @@
 
 package kafka.consumer.storage
 
-import kafka.utils.Range
 
 /**
  * A method for storing offsets for the consumer.
Index: core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
===================================================================
--- core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (revision 1215522)
+++ core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (working copy)
@@ -17,7 +17,6 @@
 
 package kafka.consumer
 
-import java.nio.channels._
 import java.util.concurrent._
 import java.util.concurrent.atomic._
 import kafka.message._
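The new clearFetcherQueues and KafkaMessageStream.clear take the offset-commit step as a plain function value (commitOffsetsFn: () => Unit), and the connector simply passes its commitOffsets method at the call site. A minimal stand-alone sketch of why that works (names invented, not Kafka code):

    // Scala eta-expands a no-argument method into a () => Unit value when the expected
    // parameter type is a function type; that is what lets `commitOffsets` be passed directly.
    object CommitCallbackSketch extends App {
      def commitOffsets(): Unit = println("offsets committed to ZooKeeper")

      // stand-in for the commitOffsetsFn parameter of clearFetcherQueues / clear
      def clearCurrentChunk(commitOffsetsFn: () => Unit): Unit = {
        commitOffsetsFn()                        // commit first ...
        println("current data chunk dropped")    // ... then discard the buffered messages
      }

      clearCurrentChunk(commitOffsets)           // the method is eta-expanded to a () => Unit
      clearCurrentChunk(() => commitOffsets())   // equivalent, written out explicitly
    }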
Index: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (revision 1215522)
+++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (working copy)
@@ -93,6 +93,7 @@
   // queues : (topic,consumerThreadId) -> queue
   private val queues = new Pool[Tuple2[String,String], BlockingQueue[FetchedDataChunk]]
   private val scheduler = new KafkaScheduler(1, "Kafka-consumer-autocommit-", false)
+
   connectZk()
   createFetcher()
   if (config.autoCommit) {
@@ -128,7 +129,7 @@
       case Some(f) => f.shutdown()
       case None =>
     }
-    sendShudownToAllQueues()
+    sendShutdownToAllQueues()
     if (config.autoCommit)
       commitOffsets()
     if (zkClient != null) {
@@ -168,12 +169,12 @@
     val topicCount = new TopicCount(consumerIdString, topicCountMap)
 
     // listener to consumer and partition changes
-    val loadBalancerListener = new ZKRebalancerListener(config.groupId, consumerIdString)
+    val loadBalancerListener = new ZKRebalancerListener[T](config.groupId, consumerIdString)
     registerConsumerInZK(dirs, consumerIdString, topicCount)
 
     // register listener for session expired event
     zkClient.subscribeStateChanges(
-      new ZKSessionExpireListenner(dirs, consumerIdString, topicCount, loadBalancerListener))
+      new ZKSessionExpireListener[T](dirs, consumerIdString, topicCount, loadBalancerListener))
 
     zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
 
@@ -194,6 +195,8 @@
       zkClient.subscribeChildChanges(partitionPath, loadBalancerListener)
     }
 
+    loadBalancerListener.setKafkaMessageStreams(ret)
+
     // explicitly trigger load balancing for this consumer
     loadBalancerListener.syncedRebalance()
     ret
@@ -205,7 +208,7 @@
     info("end registering consumer " + consumerIdString + " in ZK")
   }
 
-  private def sendShudownToAllQueues() = {
+  private def sendShutdownToAllQueues() = {
     for (queue <- queues.values) {
       debug("Clearing up queue")
       queue.clear()
@@ -227,8 +230,11 @@
   }
 
   def commitOffsets() {
-    if (zkClient == null)
+    if (zkClient == null) {
+      error("zk client is null. Cannot commit offsets")
       return
+    }
+    info("Committing all offsets")
     for ((topic, infos) <- topicRegistry) {
       val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
       for (info <- infos.values) {
@@ -322,10 +328,10 @@
     producedOffset
   }
 
-  class ZKSessionExpireListenner(val dirs: ZKGroupDirs,
+  class ZKSessionExpireListener[T](val dirs: ZKGroupDirs,
                                  val consumerIdString: String,
                                  val topicCount: TopicCount,
-                                 val loadBalancerListener: ZKRebalancerListener)
+                                 val loadBalancerListener: ZKRebalancerListener[T])
     extends IZkStateListener {
     @throws(classOf[Exception])
     def handleStateChanged(state: KeeperState) {
@@ -358,18 +364,25 @@
 
   }
 
-  class ZKRebalancerListener(val group: String, val consumerIdString: String)
+  class ZKRebalancerListener[T](val group: String, val consumerIdString: String)
     extends IZkChildListener {
     private val dirs = new ZKGroupDirs(group)
     private var oldPartitionsPerTopicMap: mutable.Map[String,List[String]] = new mutable.HashMap[String,List[String]]()
     private var oldConsumersPerTopicMap: mutable.Map[String,List[String]] = new mutable.HashMap[String,List[String]]()
+    private var kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]] = new mutable.HashMap[String,List[KafkaMessageStream[T]]]
 
     @throws(classOf[Exception])
     def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
       syncedRebalance
     }
 
-    private def releasePartitionOwnership() {
+    def setKafkaMessageStreams(messageStreams: Map[String,List[KafkaMessageStream[T]]]) {
+      kafkaMessageStreams = messageStreams
+    }
+
+    def getKafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]] = kafkaMessageStreams
+
+    private def releasePartitionOwnership()= {
       for ((topic, infos) <- topicRegistry) {
         val topicDirs = new ZKGroupTopicDirs(group, topic)
         for(partition <- infos.keys) {
@@ -427,7 +440,7 @@
         info("begin rebalancing consumer " + consumerIdString + " try #" + i)
         var done = false
         try {
-          done = rebalance(getKafkaMessageStreams)
+          done = rebalance(getKafkaMessageStreams)
        }
         catch {
           case e =>
@@ -441,7 +454,10 @@
           return
         // release all partitions, reset state and retry
         releasePartitionOwnership()
-        resetState()
+        // rebalancing with large number of topics is actually slow and involves lot of computation
+        // In such cases, to avoid a lot of churn, it might be worthwhile to increase the time between
+        // two rebalancing attempts.
+        // TODO: Probably the time between 2 rebalancing attempts should be configurable and not set to zk sync time
         Thread.sleep(config.zkSyncTimeMs)
       }
     }
 
@@ -449,7 +465,7 @@
       throw new ConsumerRebalanceFailedException(consumerIdString + " can't rebalance after " + config.maxRebalanceRetries +" retries")
     }
 
-    private def rebalance(): Boolean = {
+    private def rebalance(kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]]): Boolean = {
       val myTopicThreadIdsMap = getTopicCount(consumerIdString).getConsumerThreadIdsPerTopic
       val cluster = ZkUtils.getCluster(zkClient)
       val consumersPerTopicMap = getConsumersPerTopic(group)
@@ -460,13 +476,15 @@
         return true
       }
 
-      info("Committing all offsets")
-      commitOffsets
+      // fetchers must be stopped to avoid data duplication, since if the current
+      // rebalancing attempt fails, the partitions that are released could be owned by another consumer.
+      // But if we don't stop the fetchers first, this consumer would continue returning data for released
+      // partitions in parallel. So, not stopping the fetchers leads to duplicate data.
+      closeFetchers(cluster, kafkaMessageStreams)
 
       info("Releasing partition ownership")
       releasePartitionOwnership()
 
-      val queuesToBeCleared = new mutable.HashSet[BlockingQueue[FetchedDataChunk]]
       for ((topic, consumerThreadIdSet) <- relevantTopicThreadIdsMap) {
         topicRegistry.remove(topic)
         topicRegistry.put(topic, new Pool[Partition, PartitionTopicInfo])
@@ -503,17 +521,27 @@
             else
               return false
           }
-          queuesToBeCleared += queues.get((topic, consumerThreadId))
         }
       }
 
-      updateFetcher(cluster, queuesToBeCleared)
+      updateFetcher(cluster, kafkaMessageStreams)
       oldPartitionsPerTopicMap = partitionsPerTopicMap
       oldConsumersPerTopicMap = consumersPerTopicMap
       true
     }
 
-    private def updateFetcher(cluster: Cluster, queuesTobeCleared: Iterable[BlockingQueue[FetchedDataChunk]]) {
+    private def closeFetchers(cluster: Cluster, kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]]) {
+      val queuesTobeCleared = queues.map(q => q._2)
+      var allPartitionInfos = topicRegistry.values.map(p => p.values).flatten
+      fetcher match {
+        case Some(f) => f.stopConnectionsToAllBrokers
+          f.clearFetcherQueues(allPartitionInfos, cluster, queuesTobeCleared, kafkaMessageStreams, commitOffsets)
+        case None =>
+      }
+    }
+
+    private def updateFetcher[T](cluster: Cluster,
+                                 kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]]) {
       // update partitions for fetcher
       var allPartitionInfos : List[PartitionTopicInfo] = Nil
       for (partitionInfos <- topicRegistry.values)
@@ -523,7 +551,8 @@
         allPartitionInfos.sortWith((s,t) => s.partition < t.partition).map(_.toString).mkString(","))
 
       fetcher match {
-        case Some(f) => f.initConnections(allPartitionInfos, cluster, queuesTobeCleared)
+        case Some(f) =>
+          f.startConnections(allPartitionInfos, cluster, kafkaMessageStreams)
        case None =>
       }
     }
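To summarize the ordering the rebalance now follows, here is a self-contained toy model (all names and values invented, not Kafka code): fetchers are stopped and buffered data is discarded, after a final offset commit, before partition ownership is released, so a partition handed to another consumer is never still being drained by this consumer in parallel.

    // Runnable sketch of the new rebalance ordering, under the assumptions above.
    import java.util.concurrent.LinkedBlockingQueue

    object RebalanceOrderSketch extends App {
      val fetchedChunks = new LinkedBlockingQueue[String]()
      var committedOffset = 0L
      var consumedOffset = 5L

      def closeFetchers(): Unit = {
        // stop fetcher threads, commit what has actually been consumed, then drop
        // everything that was fetched but never handed to the application
        committedOffset = consumedOffset
        fetchedChunks.clear()
        println("fetchers closed, committed offset " + committedOffset + ", queues drained")
      }
      def releasePartitionOwnership(): Unit = println("released partition ownership in ZooKeeper")
      def claimNewPartitions(): Unit = println("claimed the new partition assignment")
      def startConnections(): Unit = println("restarted fetchers for the newly owned partitions")

      fetchedChunks.put("chunk-at-offset-6")   // fetched but not yet consumed
      closeFetchers()                          // before this patch, queues were cleared only after
      releasePartitionOwnership()              // ownership had been released, which let two consumers
      claimNewPartitions()                     // serve the same offsets and produce duplicates
      startConnections()
    }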