Index: core/src/test/scala/unit/kafka/integration/FetcherTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/FetcherTest.scala (revision 1358897)
+++ core/src/test/scala/unit/kafka/integration/FetcherTest.scala (working copy)
@@ -51,18 +51,18 @@
                                  new AtomicLong(0),
                                  new AtomicInteger(0)))
 
-  var fetcher: Fetcher = null
+  var fetcher: ConsumerFetcherManager = null
 
   override def setUp() {
     super.setUp
     CreateTopicCommand.createTopic(zkClient, topic, 1, 1, configs.head.brokerId.toString)
-    fetcher = new Fetcher(new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), null)
-    fetcher.stopConnectionsToAllBrokers
+    fetcher = new ConsumerFetcherManager("consumer1", new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), zkClient)
+    fetcher.stopAllConnections()
     fetcher.startConnections(topicInfos, cluster)
   }
 
   override def tearDown() {
-    fetcher.stopConnectionsToAllBrokers
+    fetcher.shutdown()
     super.tearDown
   }
@@ -103,5 +103,5 @@
       return
     }
   }
-  
+
 }
Index: core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala (revision 1358897)
+++ core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala (working copy)
@@ -42,10 +42,8 @@
   def testMessageSizeTooLarge() {
     val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler])
-    val fetcherLogger = Logger.getLogger(classOf[kafka.consumer.FetcherRunnable])
 
     requestHandlerLogger.setLevel(Level.FATAL)
-    fetcherLogger.setLevel(Level.FATAL)
 
     // send some messages
     val producerData = new ProducerData[String, Message](topic, topic, List(new Message("hello".getBytes())))
@@ -100,6 +98,5 @@
     zkConsumerConnector1.shutdown
 
     requestHandlerLogger.setLevel(Level.ERROR)
-    fetcherLogger.setLevel(Level.ERROR)
   }
 }
Index: core/src/main/scala/kafka/consumer/FetcherRunnable.scala
===================================================================
--- core/src/main/scala/kafka/consumer/FetcherRunnable.scala (revision 1358897)
+++ core/src/main/scala/kafka/consumer/FetcherRunnable.scala (working copy)
@@ -1,149 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.consumer
-
-import java.io.IOException
-import java.nio.channels.ClosedByInterruptException
-import java.util.concurrent.CountDownLatch
-import kafka.api.{FetchRequestBuilder, OffsetRequest}
-import kafka.cluster.Broker
-import kafka.common.ErrorMapping
-import kafka.message.ByteBufferMessageSet
-import kafka.utils._
-import org.I0Itec.zkclient.ZkClient
-
-class FetcherRunnable(val name: String,
-                      val zkClient : ZkClient,
-                      val config: ConsumerConfig,
-                      val broker: Broker,
-                      val partitionTopicInfos: List[PartitionTopicInfo])
-  extends Thread(name) with Logging {
-  private val shutdownLatch = new CountDownLatch(1)
-  private val simpleConsumer = new SimpleConsumer(broker.host, broker.port, config.maxFetchWaitMs + config.socketTimeoutMs,
-    config.socketBufferSize)
-  @volatile
-  private var stopped = false
-
-  def shutdown(): Unit = {
-    stopped = true
-    interrupt
-    debug("awaiting shutdown on fetcher " + name)
-    shutdownLatch.await
-    debug("shutdown of fetcher " + name + " thread complete")
-  }
-
-  override def run() {
-    for (infopti <- partitionTopicInfos)
-      info(name + " start fetching topic: " + infopti.topic + " part: " + infopti.partitionId + " offset: "
-        + infopti.getFetchOffset + " from " + broker.host + ":" + broker.port)
-
-    var reqId = 0
-    try {
-      while (!stopped) {
-        val builder = new FetchRequestBuilder().
-          correlationId(reqId).
-          clientId(config.consumerId.getOrElse(name)).
-          maxWait(config.maxFetchWaitMs).
-          minBytes(config.minFetchBytes)
-        partitionTopicInfos.foreach(pti =>
-          builder.addFetch(pti.topic, pti.partitionId, pti.getFetchOffset(), config.fetchSize)
-        )
-
-        val fetchRequest = builder.build()
-        val start = System.currentTimeMillis
-        val response = simpleConsumer.fetch(fetchRequest)
-        trace("Fetch request %s completed in %d ms with max wait of %d".format(fetchRequest,
-          (System.currentTimeMillis - start), config.maxFetchWaitMs))
-
-        var read = 0L
-        for(infopti <- partitionTopicInfos) {
-          val messages = response.messageSet(infopti.topic, infopti.partitionId).asInstanceOf[ByteBufferMessageSet]
-          try {
-            var done = false
-            if(messages.getErrorCode == ErrorMapping.OffsetOutOfRangeCode) {
-              info("Offset for " + infopti + " out of range")
-              // see if we can fix this error
-              val resetOffset = resetConsumerOffsets(infopti.topic, infopti.partitionId)
-              if(resetOffset >= 0) {
-                infopti.resetFetchOffset(resetOffset)
-                infopti.resetConsumeOffset(resetOffset)
-                done = true
-              }
-            }
-            if (!done)
-              read += infopti.enqueue(messages, infopti.getFetchOffset)
-          } catch {
-            case e: IOException =>
-              // something is wrong with the socket, re-throw the exception to stop the fetcher
-              throw e
-            case e =>
-              if (!stopped) {
-                // this is likely a repeatable error, log it and trigger an exception in the consumer
-                error("Error in fetcher for " + infopti, e)
-                infopti.enqueueError(e, infopti.getFetchOffset)
-              }
-              // re-throw the exception to stop the fetcher
-              throw e
-          }
-        }
-        reqId = if(reqId == Int.MaxValue) 0 else reqId + 1
-
-        trace("fetched bytes: " + read)
-      }
-    } catch {
-      case e: ClosedByInterruptException =>
-        // we interrupted ourselves, close quietly
-        debug("Fetch request interrupted, exiting...")
-      case e =>
-        if(stopped)
-          info("Fetcher stopped...")
-        else
-          error("Error in fetcher ", e)
-    }
-
-    info("Stopping fetcher " + name + " to host " + broker.host)
-    Utils.swallow(logger.info, simpleConsumer.close)
-    shutdownComplete()
-  }
-
-  /**
-   * Record that the thread shutdown is complete
-   */
-  private def shutdownComplete() = shutdownLatch.countDown
-
-  private def resetConsumerOffsets(topic : String,
-                                   partitionId: Int) : Long = {
-    var offset : Long = 0
-    config.autoOffsetReset match {
-      case OffsetRequest.SmallestTimeString => offset = OffsetRequest.EarliestTime
-      case OffsetRequest.LargestTimeString => offset = OffsetRequest.LatestTime
-      case _ => return -1
-    }
-
-    // get mentioned offset from the broker
-    val offsets = simpleConsumer.getOffsetsBefore(topic, partitionId, offset, 1)
-    val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
-
-    // reset manually in zookeeper
-    info("Updating partition " + partitionId + " for topic " + topic + " with " +
-      (if(offset == OffsetRequest.EarliestTime) "earliest " else " latest ") + "offset " + offsets(0))
-    ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partitionId, offsets(0).toString)
-
-    offsets(0)
-  }
-}
Index: core/src/main/scala/kafka/consumer/Fetcher.scala
===================================================================
--- core/src/main/scala/kafka/consumer/Fetcher.scala (revision 1358897)
+++ core/src/main/scala/kafka/consumer/Fetcher.scala (working copy)
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.consumer
-
-import scala.collection._
-import kafka.cluster._
-import org.I0Itec.zkclient.ZkClient
-import java.util.concurrent.BlockingQueue
-import kafka.utils._
-import java.lang.IllegalStateException
-
-/**
- * The fetcher is a background thread that fetches data from a set of servers
- */
-private [consumer] class Fetcher(val config: ConsumerConfig, val zkClient : ZkClient) extends Logging {
-  private val EMPTY_FETCHER_THREADS = new Array[FetcherRunnable](0)
-  @volatile
-  private var fetcherThreads : Array[FetcherRunnable] = EMPTY_FETCHER_THREADS
-
-  /**
-   *  shutdown all fetcher threads
-   */
-  def stopConnectionsToAllBrokers = {
-    // shutdown the old fetcher threads, if any
-    for (fetcherThread <- fetcherThreads)
-      fetcherThread.shutdown
-    fetcherThreads = EMPTY_FETCHER_THREADS
-  }
-
-  def clearFetcherQueues(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster,
-                         queuesTobeCleared: Iterable[BlockingQueue[FetchedDataChunk]],
-                         messageStreams: Map[String,List[KafkaStream[_]]]) {
-
-    // Clear all but the currently iterated upon chunk in the consumer thread's queue
-    queuesTobeCleared.foreach(_.clear)
-    info("Cleared all relevant queues for this fetcher")
-
-    // Also clear the currently iterated upon chunk in the consumer threads
-    if(messageStreams != null)
-      messageStreams.foreach(_._2.foreach(s => s.clear()))
-
-    info("Cleared the data chunks in all the consumer message iterators")
-
-  }
-
-  def startConnections(topicInfos: Iterable[PartitionTopicInfo],
-                       cluster: Cluster) {
-    if (topicInfos == null)
-      return
-
-    // re-arrange by broker id
-    val m = new mutable.HashMap[Int, List[PartitionTopicInfo]]
-    for(info <- topicInfos) {
-      m.get(info.brokerId) match {
-        case None => m.put(info.brokerId, List(info))
-        case Some(lst) => m.put(info.brokerId, info :: lst)
-      }
-    }
-
-    // open a new fetcher thread for each broker
-    val ids = Set() ++ topicInfos.map(_.brokerId)
-    val brokers = ids.map { id =>
-      cluster.getBroker(id) match {
-        case Some(broker) => broker
-        case None => throw new IllegalStateException("Broker " + id + " is unavailable, fetchers could not be started")
-      }
-    }
-
-    fetcherThreads = new Array[FetcherRunnable](brokers.size)
-    var i = 0
-    for(broker <- brokers) {
-      val fetcherThread = new FetcherRunnable("FetchRunnable-" + i, zkClient, config, broker, m.get(broker.id).get)
-      fetcherThreads(i) = fetcherThread
-      fetcherThread.start
-      i +=1
-    }
-  }
-}
-
-
Index: core/src/main/scala/kafka/consumer/ConsumerConfig.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ConsumerConfig.scala (revision 1358897)
+++ core/src/main/scala/kafka/consumer/ConsumerConfig.scala (working copy)
@@ -41,6 +41,7 @@
   val MirrorTopicsWhitelistProp = "mirror.topics.whitelist"
   val MirrorTopicsBlacklistProp = "mirror.topics.blacklist"
   val MirrorConsumerNumThreadsProp = "mirror.consumer.numthreads"
+  val NoneReplicaFetcherId = -1
 }
 
 class ConsumerConfig(props: Properties) extends ZKConfig(props) {
@@ -83,6 +84,9 @@
   /** backoff time between retries during rebalance */
   val rebalanceBackoffMs = Utils.getInt(props, "rebalance.backoff.ms", zkSyncTimeMs)
 
+  /** backoff time to refresh the leader of a partition after it loses the current leader */
+  val refreshLeaderBackoffMs = Utils.getInt(props, "refresh.leader.backoff.ms", 200)
+
   /* what to do if an offset is out of range.
      smallest : automatically reset the offset to the smallest offset
      largest : automatically reset the offset to the largest offset
@@ -97,5 +101,11 @@
    * overhead of decompression.
    * */
   val enableShallowIterator = Utils.getBoolean(props, "shallowiterator.enable", false)
+
+  /** max wait time for each fetcher request */
+  val fetcherMaxWaitTimeMs = Utils.getInt(props, "fetch.wait.time.ms", 100)
+
+  /** minimum bytes expected for each fetch response. If not enough bytes, wait up to fetcherMaxWaitTimeMs */
+  val fetcherMinBytes = Utils.getInt(props, "fetch.min.bytes", 4086)
 }
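For reference, the three consumer-side settings introduced above are read through the same Properties-based ConsumerConfig. A minimal sketch follows (not part of the patch): only the property names "fetch.wait.time.ms", "fetch.min.bytes" and "refresh.leader.backoff.ms" and their defaults come from this change; the group id and ZooKeeper address are placeholder values assumed to exist in the caller's setup.

    import java.util.Properties
    import kafka.consumer.ConsumerConfig

    // Sketch only: the last three properties are the ones added in this patch
    // (defaults 100, 4086 and 200); the first two are placeholder connection settings.
    val props = new Properties()
    props.put("groupid", "test-group")              // placeholder consumer group
    props.put("zk.connect", "localhost:2181")       // placeholder ZooKeeper address
    props.put("fetch.wait.time.ms", "100")          // max wait per fetch request
    props.put("fetch.min.bytes", "4086")            // min bytes before the broker responds
    props.put("refresh.leader.backoff.ms", "200")   // backoff before re-finding a partition leader
    val consumerConfig = new ConsumerConfig(props)
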
Index: core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
===================================================================
--- core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (revision 1358897)
+++ core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (working copy)
@@ -50,20 +50,20 @@
 
   /**
    * Enqueue a message set for processing
-   * @return the number of valid bytes
+   * @return the next fetch offset
    */
-  def enqueue(messages: ByteBufferMessageSet, fetchOffset: Long): Long = {
+  def enqueue(messages: ByteBufferMessageSet): Long = {
     val size = messages.validBytes
     if(size > 0) {
       // update fetched offset to the compressed data chunk size, not the decompressed message set size
       trace("Updating fetch offset = " + fetchedOffset.get + " with size = " + size)
-      chunkQueue.put(new FetchedDataChunk(messages, this, fetchOffset))
+      chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get))
       val newOffset = fetchedOffset.addAndGet(size)
       debug("updated fetch offset of ( %s ) to %d".format(this, newOffset))
       ConsumerTopicStat.getConsumerTopicStat(topic).recordBytesPerTopic(size)
       ConsumerTopicStat.getConsumerAllTopicStat().recordBytesPerTopic(size)
     }
-    size
+    fetchedOffset.get
   }
 
   /**
Index: core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala (revision 0)
+++ core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala (revision 0)
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import kafka.cluster.Broker
+import kafka.server.AbstractFetcherThread
+import kafka.api.{OffsetRequest, PartitionData}
+import kafka.message.ByteBufferMessageSet
+
+class ConsumerFetcherThread(name: String,
+                            val config: ConsumerConfig,
+                            sourceBroker: Broker,
+                            val consumerFetcherManager: ConsumerFetcherManager)
+  extends AbstractFetcherThread(name = name, sourceBroker = sourceBroker, socketTimeout = config.socketTimeoutMs,
+    socketBufferSize = config.socketBufferSize, fetchSize = config.fetchSize,
+    fetcherBrokerId = ConsumerConfig.NoneReplicaFetcherId, maxWait = config.fetcherMaxWaitTimeMs,
+    minBytes = config.fetcherMinBytes) {
+
+  // process fetched data and return the new fetch offset
+  def processPartitionData(topic: String, fetchOffset: Long, partitionData: PartitionData) {
+    val pti = consumerFetcherManager.getPartitionTopicInfo((topic, partitionData.partition))
+    if (pti.getFetchOffset != fetchOffset)
+      throw new RuntimeException("offset doesn't match for topic %s partition: %d pti offset: %d fetch offset: %d"
+        .format(topic, partitionData.partition, pti.getFetchOffset, fetchOffset))
+    pti.enqueue(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
+  }
+
+  // handle a partition whose offset is out of range and return a new fetch offset
+  def handleOffsetOutOfRange(topic: String, partitionId: Int): Long = {
+    var startTimestamp : Long = 0
+    config.autoOffsetReset match {
+      case OffsetRequest.SmallestTimeString => startTimestamp = OffsetRequest.EarliestTime
+      case OffsetRequest.LargestTimeString => startTimestamp = OffsetRequest.LatestTime
+      case _ => startTimestamp = OffsetRequest.LatestTime
+    }
+    val newOffset = simpleConsumer.getOffsetsBefore(topic, partitionId, startTimestamp, 1)(0)
+
+    val pti = consumerFetcherManager.getPartitionTopicInfo((topic, partitionId))
+    pti.resetFetchOffset(newOffset)
+    pti.resetConsumeOffset(newOffset)
+    return newOffset
+  }
+
+  // any logic for partitions whose leader has changed
+  def handlePartitionsWithErrors(partitions: Iterable[(String, Int)]) {
+    consumerFetcherManager.addPartitionsWithError(partitions)
+  }
+}
Index: core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala (revision 0)
+++ core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala (revision 0)
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import org.I0Itec.zkclient.ZkClient
+import kafka.server.{AbstractFetcherThread, AbstractFetcherManager}
+import kafka.cluster.{Cluster, Broker}
+import scala.collection.immutable
+import scala.collection.mutable
+import java.util.concurrent.locks.ReentrantLock
+import java.util.concurrent.atomic.AtomicBoolean
+import kafka.utils.ZkUtils._
+import kafka.utils.SystemTime
+import java.util.concurrent.CountDownLatch
+
+/**
+ *  Usage:
+ *  Once ConsumerFetcherManager is created, startConnections() and stopAllConnections() can be called repeatedly
+ *  until shutdown() is called.
+ */
+class ConsumerFetcherManager(private val consumerIdString: String,
+                             private val config: ConsumerConfig,
+                             private val zkClient : ZkClient)
+  extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(SystemTime.milliseconds), 1) {
+  private var partitionMap: immutable.Map[(String, Int), PartitionTopicInfo] = null
+  private var cluster: Cluster = null
+  private val noLeaderPartitionSet = new mutable.HashSet[(String, Int)]
+  private val lock = new ReentrantLock
+  private val cond = lock.newCondition()
+  private val isShuttingDown = new AtomicBoolean(false)
+  private val leaderFinderThreadShutdownLatch = new CountDownLatch(1)
+  private val leaderFinderThread = new Thread(consumerIdString + "_leader_finder_thread") {
+    // thread responsible for adding the fetcher to the right broker when leader is available
+    override def run() {
+      info("starting %s".format(getName))
+      while (!isShuttingDown.get) {
+        try {
+          lock.lock()
+          try {
+            if (noLeaderPartitionSet.isEmpty)
+              cond.await()
+            for ((topic, partitionId) <- noLeaderPartitionSet) {
+              // find the leader for this partition
+              getLeaderForPartition(zkClient, topic, partitionId) match {
+                case Some(leaderId) =>
+                  cluster.getBroker(leaderId) match {
+                    case Some(broker) =>
+                      val pti = partitionMap((topic, partitionId))
+                      addFetcher(topic, partitionId, pti.getFetchOffset(), broker)
+                      noLeaderPartitionSet.remove((topic, partitionId))
+                    case None =>
+                      error("Broker %d is unavailable, fetcher for topic %s partition %d could not be started"
+                        .format(leaderId, topic, partitionId))
+                  }
+                case None => // let it go since we will keep retrying
+              }
+            }
+          } finally {
+            lock.unlock()
+          }
+          Thread.sleep(config.refreshLeaderBackoffMs)
+        } catch {
+          case t =>
+            if (!isShuttingDown.get())
+              error("error in %s".format(getName), t)
+        }
+      }
+      leaderFinderThreadShutdownLatch.countDown()
+      info("stopping %s".format(getName))
+    }
+  }
+  leaderFinderThread.start()
+
+
+  override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
+    new ConsumerFetcherThread("ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id),
+      config, sourceBroker, this)
+  }
+
+  def startConnections(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster) {
+    if (isShuttingDown.get)
+      throw new RuntimeException("%s already shutdown".format(name))
+    lock.lock()
+    try {
+      partitionMap = topicInfos.map(tpi => ((tpi.topic, tpi.partitionId), tpi)).toMap
+      this.cluster = cluster
+      noLeaderPartitionSet ++= topicInfos.map(tpi => (tpi.topic, tpi.partitionId))
+      cond.signalAll()
+    } finally {
+      lock.unlock()
+    }
+  }
+
+  def stopAllConnections() {
+    lock.lock()
+    try {
+      partitionMap = null
+      noLeaderPartitionSet.clear()
+    } finally {
+      lock.unlock()
+    }
+    closeAllFetchers()
+  }
+
+  def getPartitionTopicInfo(key: (String, Int)) : PartitionTopicInfo = {
+    var pti: PartitionTopicInfo = null
+    lock.lock()
+    try {
+      pti = partitionMap(key)
+    } finally {
+      lock.unlock()
+    }
+    pti
+  }
+
+  def addPartitionsWithError(partitionList: Iterable[(String, Int)]) {
+    debug("adding partitions with error %s".format(partitionList))
+    lock.lock()
+    try {
+      if (partitionMap != null) {
+        noLeaderPartitionSet ++= partitionList
+        cond.signalAll()
+      }
+    } finally {
+      lock.unlock()
+    }
+  }
+
+  def shutdown() {
+    info("shutting down")
+    isShuttingDown.set(true)
+    leaderFinderThread.interrupt()
+    leaderFinderThreadShutdownLatch.await()
+    stopAllConnections()
+    info("shutdown completed")
+  }
+}
\ No newline at end of file
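As the Usage note in ConsumerFetcherManager states, one manager instance outlives many rebalances: startConnections() and stopAllConnections() may alternate until shutdown() retires it. A minimal lifecycle sketch (illustrative only; consumerConfig, zkClient, topicInfos and cluster are assumed to be the values the connector already tracks, as in ZookeeperConsumerConnector below):

    // Sketch of the intended call sequence, not part of the patch.
    val fetcherManager = new ConsumerFetcherManager("consumer1", consumerConfig, zkClient)

    // on each rebalance: drop the old fetchers, then fetch for the new assignment
    fetcherManager.stopAllConnections()
    fetcherManager.startConnections(topicInfos, cluster)

    // on consumer shutdown: stops the leader-finder thread and closes all fetchers for good
    fetcherManager.shutdown()
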
Index: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (revision 1358897)
+++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (working copy)
@@ -91,7 +91,7 @@
   with Logging {
   private val isShuttingDown = new AtomicBoolean(false)
   private val rebalanceLock = new Object
-  private var fetcher: Option[Fetcher] = None
+  private var fetcher: Option[ConsumerFetcherManager] = None
   private var zkClient: ZkClient = null
   private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
   // topicThreadIdAndQueues : (topic,consumerThreadId) -> queue
@@ -143,7 +143,7 @@
 
   private def createFetcher() {
     if (enableFetcher)
-      fetcher = Some(new Fetcher(config, zkClient))
+      fetcher = Some(new ConsumerFetcherManager(consumerIdString, config, zkClient))
   }
 
   private def connectZk() {
@@ -161,7 +161,7 @@
       try {
         scheduler.shutdownNow()
         fetcher match {
-          case Some(f) => f.stopConnectionsToAllBrokers
+          case Some(f) => f.shutdown
          case None =>
         }
         sendShutdownToAllQueues()
@@ -376,8 +376,6 @@
   class ZKRebalancerListener(val group: String, val consumerIdString: String,
                              val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_]]])
     extends IZkChildListener {
-    private var oldPartitionsPerTopicMap: mutable.Map[String, Seq[String]] = new mutable.HashMap[String, Seq[String]]()
-    private var oldConsumersPerTopicMap: mutable.Map[String,List[String]] = new mutable.HashMap[String,List[String]]()
     private var isWatcherTriggered = false
     private val lock = new ReentrantLock
     private val cond = lock.newCondition()
@@ -547,22 +545,39 @@
                                       queuesToBeCleared: Iterable[BlockingQueue[FetchedDataChunk]]) {
       var allPartitionInfos = topicRegistry.values.map(p => p.values).flatten
       fetcher match {
-        case Some(f) => f.stopConnectionsToAllBrokers
-          f.clearFetcherQueues(allPartitionInfos, cluster, queuesToBeCleared, messageStreams)
-          info("Committing all offsets after clearing the fetcher queues")
-          /**
-           * here, we need to commit offsets before stopping the consumer from returning any more messages
-           * from the current data chunk. Since partition ownership is not yet released, this commit offsets
-           * call will ensure that the offsets committed now will be used by the next consumer thread owning the partition
-           * for the current data chunk. Since the fetchers are already shutdown and this is the last chunk to be iterated
-           * by the consumer, there will be no more messages returned by this iterator until the rebalancing finishes
-           * successfully and the fetchers restart to fetch more data chunks
-           **/
-          commitOffsets
+        case Some(f) =>
+          f.stopAllConnections
+          clearFetcherQueues(allPartitionInfos, cluster, queuesToBeCleared, messageStreams)
+          info("Committing all offsets after clearing the fetcher queues")
+          /**
+           * here, we need to commit offsets before stopping the consumer from returning any more messages
+           * from the current data chunk. Since partition ownership is not yet released, this commit offsets
+           * call will ensure that the offsets committed now will be used by the next consumer thread owning the partition
+           * for the current data chunk. Since the fetchers are already shutdown and this is the last chunk to be iterated
+           * by the consumer, there will be no more messages returned by this iterator until the rebalancing finishes
+           * successfully and the fetchers restart to fetch more data chunks
+           **/
+          commitOffsets
         case None =>
       }
     }
 
+    private def clearFetcherQueues(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster,
+                                   queuesTobeCleared: Iterable[BlockingQueue[FetchedDataChunk]],
+                                   messageStreams: Map[String,List[KafkaStream[_]]]) {
+
+      // Clear all but the currently iterated upon chunk in the consumer thread's queue
+      queuesTobeCleared.foreach(_.clear)
+      info("Cleared all relevant queues for this fetcher")
+
+      // Also clear the currently iterated upon chunk in the consumer threads
+      if(messageStreams != null)
+        messageStreams.foreach(_._2.foreach(s => s.clear()))
+
+      info("Cleared the data chunks in all the consumer message iterators")
+
+    }
+
     private def closeFetchers(cluster: Cluster, messageStreams: Map[String,List[KafkaStream[_]]],
                               relevantTopicThreadIdsMap: Map[String, Set[String]]) {
       // only clear the fetcher queues for certain topic partitions that *might* no longer be served by this consumer
Index: core/src/main/scala/kafka/server/AbstractFetcherManager.scala
===================================================================
--- core/src/main/scala/kafka/server/AbstractFetcherManager.scala (revision 1358897)
+++ core/src/main/scala/kafka/server/AbstractFetcherManager.scala (working copy)
@@ -21,7 +21,7 @@
 import kafka.utils.Logging
 import kafka.cluster.Broker
 
-abstract class AbstractFetcherManager(name: String, numReplicaFetchers: Int = 1) extends Logging {
+abstract class AbstractFetcherManager(protected val name: String, numReplicaFetchers: Int = 1) extends Logging {
   // map of (source brokerid, fetcher Id per source broker) => fetcher
   private val fetcherThreadMap = new mutable.HashMap[Tuple2[Int, Int], AbstractFetcherThread]
   private val mapLock = new Object
@@ -73,13 +73,12 @@
     None
   }
 
-  def shutdown() = {
-    info("shutting down")
+  def closeAllFetchers() {
     mapLock synchronized {
       for ( (_, fetcher) <- fetcherThreadMap) {
         fetcher.shutdown
       }
+      fetcherThreadMap.clear()
     }
-    info("shutdown completes")
   }
 }
\ No newline at end of file
Index: core/src/main/scala/kafka/server/KafkaConfig.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaConfig.scala (revision 1358897)
+++ core/src/main/scala/kafka/server/KafkaConfig.scala (working copy)
@@ -132,8 +132,10 @@
   /** the number of byes of messages to attempt to fetch */
   val replicaFetchSize = Utils.getInt(props, "replica.fetch.size", ConsumerConfig.FetchSize)
 
+  /** max wait time for each fetcher request issued by follower replicas*/
   val replicaMaxWaitTimeMs = Utils.getInt(props, "replica.fetch.wait.time.ms", 500)
 
+  /** minimum bytes expected for each fetch response. If not enough bytes, wait up to replicaMaxWaitTimeMs */
   val replicaMinBytes = Utils.getInt(props, "replica.fetch.min.bytes", 4086)
 
   /* number of fetcher threads used to replicate messages from a source broker.
Index: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
===================================================================
--- core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (revision 1358897)
+++ core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (working copy)
@@ -28,7 +28,7 @@
         minBytes = brokerConfig.replicaMinBytes) {
 
   // process fetched data and return the new fetch offset
-  def processPartitionData(topic: String, fetchOffset: Long, partitionData: PartitionData) = {
+  def processPartitionData(topic: String, fetchOffset: Long, partitionData: PartitionData) {
    val partitionId = partitionData.partition
    val replica = replicaMgr.getReplica(topic, partitionId).get
    val messageSet = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
@@ -51,7 +51,7 @@
   }
 
   // any logic for partitions whose leader has changed
-  def handlePartitionsWithNewLeader(partitions: List[Tuple2[String, Int]]): Unit = {
+  def handlePartitionsWithErrors(partitions: Iterable[(String, Int)]) {
     // no handler needed since the controller will make the changes accordingly
   }
 }
Index: core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
===================================================================
--- core/src/main/scala/kafka/server/ReplicaFetcherManager.scala (revision 1358897)
+++ core/src/main/scala/kafka/server/ReplicaFetcherManager.scala (working copy)
@@ -22,8 +22,13 @@
 class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val replicaMgr: ReplicaManager)
   extends AbstractFetcherManager("ReplicaFetcherManager", brokerConfig.numReplicaFetchers) {
 
-  def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
+  override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
     new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d".format(sourceBroker.id, fetcherId), sourceBroker, brokerConfig, replicaMgr)
   }
+
+  def shutdown() {
+    info("shutting down")
+    closeAllFetchers()
+    info("shutdown completed")
+  }
 }
\ No newline at end of file
Index: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
===================================================================
--- core/src/main/scala/kafka/server/AbstractFetcherThread.scala (revision 1358897)
+++ core/src/main/scala/kafka/server/AbstractFetcherThread.scala (working copy)
@@ -23,9 +23,9 @@
 import java.util.concurrent.atomic.AtomicBoolean
 import kafka.utils.Logging
 import kafka.common.ErrorMapping
-import kafka.api.{PartitionData, FetchRequestBuilder}
-import scala.collection.mutable
+import collection.mutable
 import kafka.message.ByteBufferMessageSet
+import kafka.api.{FetchResponse, PartitionData, FetchRequestBuilder}
 
 /**
  *  Abstract class for fetching data from multiple partitions from the same broker.
@@ -48,12 +48,12 @@
   // handle a partition whose offset is out of range and return a new fetch offset
   def handleOffsetOutOfRange(topic: String, partitionId: Int): Long
 
-  // any logic for partitions whose leader has changed
-  def handlePartitionsWithNewLeader(partitions: List[Tuple2[String, Int]]): Unit
+  // deal with partitions with errors, potentially due to leadership changes
+  def handlePartitionsWithErrors(partitions: Iterable[(String, Int)])
 
   override def run() {
     try {
-      while(isRunning.get()) {
+      while(isRunning.get) {
         val builder = new FetchRequestBuilder().
           clientId(name).
           replicaId(fetcherBrokerId).
@@ -66,47 +66,61 @@
         }
 
         val fetchRequest = builder.build()
-        val response = simpleConsumer.fetch(fetchRequest)
-        trace("issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
+        val partitionsWithError = new mutable.HashSet[(String, Int)]
+        var response: FetchResponse = null
+        try {
+          trace("issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
+          response = simpleConsumer.fetch(fetchRequest)
+        } catch {
+          case t =>
+            debug("error in fetch %s".format(fetchRequest), t)
+            if (isRunning.get) {
+              fetchMapLock synchronized {
+                partitionsWithError ++= fetchMap.keys
+                fetchMap.clear()
+              }
+            }
+        }
 
-        var partitionsWithNewLeader : List[Tuple2[String, Int]] = Nil
-        // process fetched data
-        fetchMapLock synchronized {
-          for ( topicData <- response.data ) {
-            for ( partitionData <- topicData.partitionDataArray) {
-              val topic = topicData.topic
-              val partitionId = partitionData.partition
-              val key = (topic, partitionId)
-              val currentOffset = fetchMap.get(key)
-              if (currentOffset.isDefined) {
-                partitionData.error match {
-                  case ErrorMapping.NoError =>
-                    processPartitionData(topic, currentOffset.get, partitionData)
-                    val newOffset = currentOffset.get + partitionData.messages.asInstanceOf[ByteBufferMessageSet].validBytes
-                    fetchMap.put(key, newOffset)
-                  case ErrorMapping.OffsetOutOfRangeCode =>
-                    val newOffset = handleOffsetOutOfRange(topic, partitionId)
-                    fetchMap.put(key, newOffset)
-                    warn("current offset %d for topic %s partition %d out of range; reset offset to %d"
-                      .format(currentOffset.get, topic, partitionId, newOffset))
-                  case ErrorMapping.NotLeaderForPartitionCode =>
-                    partitionsWithNewLeader ::= key
-                  case _ =>
-                    error("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.host),
-                      ErrorMapping.exceptionFor(partitionData.error))
+        if (response != null) {
+          // process fetched data
+          fetchMapLock synchronized {
+            for ( topicData <- response.data ) {
+              for ( partitionData <- topicData.partitionDataArray) {
+                val topic = topicData.topic
+                val partitionId = partitionData.partition
+                val key = (topic, partitionId)
+                val currentOffset = fetchMap.get(key)
+                if (currentOffset.isDefined) {
+                  partitionData.error match {
+                    case ErrorMapping.NoError =>
+                      processPartitionData(topic, currentOffset.get, partitionData)
+                      val newOffset = currentOffset.get + partitionData.messages.asInstanceOf[ByteBufferMessageSet].validBytes
+                      fetchMap.put(key, newOffset)
+                    case ErrorMapping.OffsetOutOfRangeCode =>
+                      val newOffset = handleOffsetOutOfRange(topic, partitionId)
+                      fetchMap.put(key, newOffset)
+                      warn("current offset %d for topic %s partition %d out of range; reset offset to %d"
                        .format(currentOffset.get, topic, partitionId, newOffset))
+                    case _ =>
+                      error("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.host),
+                        ErrorMapping.exceptionFor(partitionData.error))
+                      partitionsWithError += key
+                      fetchMap.remove(key)
+                  }
                 }
              }
            }
          }
        }
 
-        if (partitionsWithNewLeader.size > 0) {
-          debug("changing leaders for %s".format(partitionsWithNewLeader))
-          handlePartitionsWithNewLeader(partitionsWithNewLeader)
+        if (partitionsWithError.size > 0) {
+          debug("handling partitions with error for %s".format(partitionsWithError))
+          handlePartitionsWithErrors(partitionsWithError)
         }
       }
     } catch {
-      case e: InterruptedException => info("replica fetcher runnable interrupted. Shutting down")
-      case e1 => error("error in replica fetcher runnable", e1)
+      case e: InterruptedException => info("interrupted. Shutting down")
+      case e1 => error("error in fetching", e1)
     }
     shutdownComplete()
   }
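With these changes, AbstractFetcherThread asks a subclass for exactly three callbacks; ReplicaFetcherThread and ConsumerFetcherThread above are the two real implementations. A minimal illustrative skeleton (not part of the patch; the class name and the literal constructor values are placeholders, with the parameter names taken from the extends clause used by ConsumerFetcherThread):

    // Sketch only: a do-nothing subclass showing the callback contract.
    class NoOpFetcherThread(name: String, sourceBroker: kafka.cluster.Broker)
      extends kafka.server.AbstractFetcherThread(name = name, sourceBroker = sourceBroker,
        socketTimeout = 30000, socketBufferSize = 65536, fetchSize = 1048576,
        fetcherBrokerId = -1, maxWait = 100, minBytes = 1) {

      // invoked for each partition fetched without error
      def processPartitionData(topic: String, fetchOffset: Long, partitionData: kafka.api.PartitionData) {}

      // invoked when the broker reports the requested offset is out of range; returns the offset to resume from
      def handleOffsetOutOfRange(topic: String, partitionId: Int): Long = 0L

      // invoked with partitions that hit a fetch error, e.g. after a leader change
      def handlePartitionsWithErrors(partitions: Iterable[(String, Int)]) {}
    }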