From c294d2e7243fee62741ec697adf819617f7c2cd7 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Tue, 17 Mar 2015 16:00:42 -0700 Subject: [PATCH 1/4] KAFKA-1461. Replica fetcher thread does not implement any back-off behavior. --- .../scala/kafka/server/AbstractFetcherThread.scala | 131 +++++++++++++-------- .../scala/kafka/server/ReplicaFetcherThread.scala | 2 +- 2 files changed, 80 insertions(+), 53 deletions(-) diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 5d5cf58..f305418 100644 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -23,6 +23,7 @@ import kafka.consumer.{PartitionTopicInfo, SimpleConsumer} import kafka.api.{FetchRequest, FetchResponse, FetchResponsePartitionData, FetchRequestBuilder} import kafka.common.{KafkaException, ClientIdAndBroker, TopicAndPartition, ErrorMapping} import kafka.utils.Utils.inLock +import kafka.utils.DelayedItem import kafka.message.{InvalidMessageException, ByteBufferMessageSet, MessageAndOffset} import kafka.metrics.KafkaMetricsGroup @@ -37,10 +38,10 @@ import com.yammer.metrics.core.Gauge * Abstract class for fetching data from multiple partitions from the same broker. */ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int, - fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1, fetchBackOffMs: Int = 0, + fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1, fetchBackOffMs: Int, isInterruptible: Boolean = true) extends ShutdownableThread(name, isInterruptible) { - private val partitionMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition) -> offset map + private val partitionMap = new mutable.HashMap[TopicAndPartition, PartitionFetchState] // a (topic, partition) -> partitionFetchState map private val partitionMapLock = new ReentrantLock private val partitionMapCond = partitionMapLock.newCondition() val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, clientId) @@ -77,11 +78,12 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke override def doWork() { inLock(partitionMapLock) { if (partitionMap.isEmpty) - partitionMapCond.await(200L, TimeUnit.MILLISECONDS) + partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS) partitionMap.foreach { - case((topicAndPartition, offset)) => - fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition, - offset, fetchSize) + case((topicAndPartition, partitionFetchState)) => + if(partitionFetchState.isActive) + fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition, + partitionFetchState.offset, fetchSize) } } @@ -115,53 +117,53 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke response.data.foreach { case(topicAndPartition, partitionData) => val (topic, partitionId) = topicAndPartition.asTuple - val currentOffset = partitionMap.get(topicAndPartition) - // we append to the log if the current offset is defined and it is the same as the offset requested during fetch - if (currentOffset.isDefined && fetchRequest.requestInfo(topicAndPartition).offset == currentOffset.get) { - partitionData.error match { - case ErrorMapping.NoError => - try { - val messages = 
partitionData.messages.asInstanceOf[ByteBufferMessageSet] - val validBytes = messages.validBytes - val newOffset = messages.shallowIterator.toSeq.lastOption match { - case Some(m: MessageAndOffset) => m.nextOffset - case None => currentOffset.get + partitionMap.get(topicAndPartition).foreach(currentPartitionFetchState => + // we append to the log if the current offset is defined and it is the same as the offset requested during fetch + if (fetchRequest.requestInfo(topicAndPartition).offset == currentPartitionFetchState.offset) { + partitionData.error match { + case ErrorMapping.NoError => + try { + val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet] + val validBytes = messages.validBytes + val newOffset = messages.shallowIterator.toSeq.lastOption match { + case Some(m: MessageAndOffset) => m.nextOffset + case None => currentPartitionFetchState.offset + } + partitionMap.put(topicAndPartition, new PartitionFetchState(newOffset)) + fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset + fetcherStats.byteRate.mark(validBytes) + // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread + processPartitionData(topicAndPartition, currentPartitionFetchState.offset, partitionData) + } catch { + case ime: InvalidMessageException => + // we log the error and continue. This ensures two things + // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag + // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and + // should get fixed in the subsequent fetches + logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentPartitionFetchState.offset + " error " + ime.getMessage) + case e: Throwable => + throw new KafkaException("error processing data for partition [%s,%d] offset %d" + .format(topic, partitionId, currentPartitionFetchState.offset), e) } - partitionMap.put(topicAndPartition, newOffset) - fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset - fetcherStats.byteRate.mark(validBytes) - // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread - processPartitionData(topicAndPartition, currentOffset.get, partitionData) - } catch { - case ime: InvalidMessageException => - // we log the error and continue. This ensures two things - // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag - // 2. 
If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and - // should get fixed in the subsequent fetches - logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentOffset.get + " error " + ime.getMessage) - case e: Throwable => - throw new KafkaException("error processing data for partition [%s,%d] offset %d" - .format(topic, partitionId, currentOffset.get), e) - } - case ErrorMapping.OffsetOutOfRangeCode => - try { - val newOffset = handleOffsetOutOfRange(topicAndPartition) - partitionMap.put(topicAndPartition, newOffset) - error("Current offset %d for partition [%s,%d] out of range; reset offset to %d" - .format(currentOffset.get, topic, partitionId, newOffset)) - } catch { - case e: Throwable => - error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e) + case ErrorMapping.OffsetOutOfRangeCode => + try { + val newOffset = handleOffsetOutOfRange(topicAndPartition) + partitionMap.put(topicAndPartition, new PartitionFetchState(newOffset)) + error("Current offset %d for partition [%s,%d] out of range; reset offset to %d" + .format(currentPartitionFetchState.offset, topic, partitionId, newOffset)) + } catch { + case e: Throwable => + error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e) + partitionsWithError += topicAndPartition + } + case _ => + if (isRunning.get) { + error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id, + ErrorMapping.exceptionFor(partitionData.error).getClass)) partitionsWithError += topicAndPartition - } - case _ => - if (isRunning.get) { - error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id, - ErrorMapping.exceptionFor(partitionData.error).getClass)) - partitionsWithError += topicAndPartition - } - } - } + } + } + }) } } } @@ -180,7 +182,23 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke if (!partitionMap.contains(topicAndPartition)) partitionMap.put( topicAndPartition, - if (PartitionTopicInfo.isOffsetInvalid(offset)) handleOffsetOutOfRange(topicAndPartition) else offset) + if (PartitionTopicInfo.isOffsetInvalid(offset)) new PartitionFetchState(handleOffsetOutOfRange(topicAndPartition)) + else new PartitionFetchState(offset) + )} + partitionMapCond.signalAll() + } finally { + partitionMapLock.unlock() + } + } + + def delayPartitions(partitions: Iterable[TopicAndPartition], delay: Long) { + partitionMapLock.lockInterruptibly() + try { + for (partition <- partitions) { + partitionMap.get(partition).foreach (currentPartitionFetchState => + if(currentPartitionFetchState.isActive) + partitionMap.put(partition, new PartitionFetchState(currentPartitionFetchState.offset, new DelayedItem(delay))) + ) } partitionMapCond.signalAll() } finally { @@ -247,3 +265,12 @@ class FetcherStats(metricId: ClientIdAndBroker) extends KafkaMetricsGroup { case class ClientIdTopicPartition(clientId: String, topic: String, partitionId: Int) { override def toString = "%s-%s-%d".format(clientId, topic, partitionId) } + +case class PartitionFetchState(offset: Long, delay: DelayedItem) { + + def this(offset: Long) = this(offset, new DelayedItem(0)) + + def isActive: Boolean = { delay.getDelay(TimeUnit.MILLISECONDS) == 0 } + + override def toString = "%d-%b".format(offset, isActive) +} diff --git 
a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 96faa7b..daf21d9 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -120,6 +120,6 @@ class ReplicaFetcherThread(name:String, // any logic for partitions whose leader has changed def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) { - // no handler needed since the controller will make the changes accordingly + delayPartitions(partitions, brokerConfig.replicaFetchBackoffMs) } } -- 1.9.5 (Apple Git-50.3) From bebbf231d420f3360116b55b8c3aa8eb4e874a8a Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Fri, 27 Mar 2015 15:18:53 -0700 Subject: [PATCH 2/4] KAFKA-1461. Replica fetcher thread does not implement any back-off behavior. --- core/src/main/scala/kafka/server/AbstractFetcherThread.scala | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index f305418..de1e915 100644 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -46,6 +46,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke private val partitionMapCond = partitionMapLock.newCondition() val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, clientId) private val metricId = new ClientIdAndBroker(clientId, sourceBroker.host, sourceBroker.port) + private var allPartitionsInactive: Boolean = true val fetcherStats = new FetcherStats(metricId) val fetcherLagStats = new FetcherLagStats(metricId) val fetchRequestBuilder = new FetchRequestBuilder(). @@ -77,16 +78,20 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke override def doWork() { inLock(partitionMapLock) { - if (partitionMap.isEmpty) - partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS) + allPartitionsInactive = true partitionMap.foreach { case((topicAndPartition, partitionFetchState)) => - if(partitionFetchState.isActive) + if(partitionFetchState.isActive) { + allPartitionsInactive = false fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition, partitionFetchState.offset, fetchSize) + } } } + if (partitionMap.isEmpty || allPartitionsInactive) + partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS) + val fetchRequest = fetchRequestBuilder.build() if (!fetchRequest.requestInfo.isEmpty) processFetchRequest(fetchRequest) -- 1.9.5 (Apple Git-50.3) From ee001a6454c83e5dd8f76c26676c96d8ba8849c8 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Fri, 27 Mar 2015 17:01:09 -0700 Subject: [PATCH 3/4] KAFKA-1461. Replica fetcher thread does not implement any back-off behavior. 
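This revision keeps the all-partitions-inactive check local to doWork() instead of a mutable field on the thread, logs a trace message before backing off, and adds a short doc comment to PartitionFetchState. The sketch below is a minimal, self-contained illustration of the resulting fetch loop, not the real AbstractFetcherThread: names such as BackOffSketch, FetchState and doWorkOnce are made up, and the DelayedItem-backed state is replaced by a plain millisecond deadline. One detail to note: Condition.await must be called with the lock held, so the sketch keeps the back-off wait inside the locked section.

    import java.util.concurrent.TimeUnit
    import java.util.concurrent.locks.ReentrantLock
    import scala.collection.mutable

    object BackOffSketch {
      // stand-in for PartitionFetchState: active once its delay deadline has passed
      final case class FetchState(offset: Long, delayedUntilMs: Long = 0L) {
        def isActive: Boolean = System.currentTimeMillis() >= delayedUntilMs
      }

      private val lock = new ReentrantLock
      private val cond = lock.newCondition()
      private val partitionMap = mutable.HashMap("topic-0" -> FetchState(42L))

      // one iteration of the fetch loop: collect active partitions, back off if there are none
      def doWorkOnce(fetchBackOffMs: Long): Seq[(String, Long)] = {
        lock.lock()
        try {
          var allPartitionsInactive = true                     // local to the loop body, as in this patch
          val fetches = mutable.ArrayBuffer.empty[(String, Long)]
          partitionMap.foreach { case (tp, state) =>
            if (state.isActive) {
              allPartitionsInactive = false
              fetches += ((tp, state.offset))                  // would go into the FetchRequestBuilder
            }
          }
          if (partitionMap.isEmpty || allPartitionsInactive)
            cond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)  // back off before the next attempt
          fetches.toList
        } finally lock.unlock()
      }

      def main(args: Array[String]): Unit =
        println(doWorkOnce(fetchBackOffMs = 1000))             // List((topic-0,42))
    }

With an empty or fully delayed partition map the loop simply parks for fetchBackOffMs instead of spinning on empty fetch requests.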
--- core/src/main/scala/kafka/server/AbstractFetcherThread.scala | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index de1e915..a2008ff 100644 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -46,7 +46,6 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke private val partitionMapCond = partitionMapLock.newCondition() val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, clientId) private val metricId = new ClientIdAndBroker(clientId, sourceBroker.host, sourceBroker.port) - private var allPartitionsInactive: Boolean = true val fetcherStats = new FetcherStats(metricId) val fetcherLagStats = new FetcherLagStats(metricId) val fetchRequestBuilder = new FetchRequestBuilder(). @@ -77,8 +76,9 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke } override def doWork() { + // check if all topics are inactive + var allPartitionsInactive = true inLock(partitionMapLock) { - allPartitionsInactive = true partitionMap.foreach { case((topicAndPartition, partitionFetchState)) => if(partitionFetchState.isActive) { @@ -89,8 +89,10 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke } } - if (partitionMap.isEmpty || allPartitionsInactive) + if (partitionMap.isEmpty || allPartitionsInactive) { + trace("Back off for %d ms before sending a fetch request".format(fetchBackOffMs)) partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS) + } val fetchRequest = fetchRequestBuilder.build() if (!fetchRequest.requestInfo.isEmpty) @@ -271,6 +273,9 @@ case class ClientIdTopicPartition(clientId: String, topic: String, partitionId: override def toString = "%s-%s-%d".format(clientId, topic, partitionId) } +/** + * case class to keep partition offset and its state(active , inactive) + */ case class PartitionFetchState(offset: Long, delay: DelayedItem) { def this(offset: Long) = this(offset, new DelayedItem(0)) -- 1.9.5 (Apple Git-50.3) From 7f03299b7533a235a9e8e015c0a2c5889f504a45 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Fri, 3 Apr 2015 21:09:55 -0700 Subject: [PATCH 4/4] KAFKA-2082. Kafka Replication ends up in a bad state. 
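This patch hoists the leader-finder machinery out of ConsumerFetcherManager into AbstractFetcherManager so that ReplicaFetcherManager can reuse it, and it splits error handling in the fetcher threads: handlePartitionsWithErrors now receives the partitions together with their PartitionFetchState and backs them off, while a NotLeaderForPartitionCode response routes through handlePartitionsWithNoLeader, which removes the partitions from the thread and parks them on the manager until a new leader is found. The sketch below illustrates that hand-off in a self-contained form; FetcherManager, FetchState, the simplified addPartitionsWithNoLeader signature and resolveOnce are stand-ins, not the actual classes (the real leader-finder thread fetches topic metadata via ClientUtils.fetchTopicMetadata and then calls addFetcherForPartitions, waiting on a condition when nothing is parked).

    import java.util.concurrent.locks.ReentrantLock
    import scala.collection.mutable

    object LeaderHandOffSketch {
      type TopicPartition = (String, Int)
      final case class FetchState(offset: Long)

      class FetcherManager {
        private val lock = new ReentrantLock
        private val noLeader = mutable.HashMap.empty[TopicPartition, FetchState]

        // called by a fetcher thread when the broker answers "not leader for partition"
        def addPartitionsWithNoLeader(parts: Map[TopicPartition, FetchState]): Unit = {
          lock.lock()
          try noLeader ++= parts finally lock.unlock()
        }

        // one pass of a leader-finder loop: resolve a leader for whatever is parked and
        // hand back (new leader, saved offset) pairs that would feed addFetcherForPartitions
        def resolveOnce(lookupLeader: TopicPartition => Option[Int]): Map[TopicPartition, (Int, Long)] = {
          lock.lock()
          try {
            val resolved: Map[TopicPartition, (Int, Long)] =
              noLeader.toMap.flatMap { case (tp, state) =>
                lookupLeader(tp).map(leader => tp -> (leader, state.offset))
              }
            noLeader --= resolved.keys
            resolved
          } finally lock.unlock()
        }
      }

      def main(args: Array[String]): Unit = {
        val mgr = new FetcherManager
        mgr.addPartitionsWithNoLeader(Map(("topic", 0) -> FetchState(42L)))
        println(mgr.resolveOnce(_ => Some(1)))   // the saved offset 42 survives the leader change
      }
    }

Carrying the PartitionFetchState across the hand-off is what lets the fetch offset survive the leader change, so the new fetcher resumes from where the old one stopped.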
--- .../kafka/consumer/ConsumerFetcherManager.scala | 109 +++------------------ .../kafka/consumer/ConsumerFetcherThread.scala | 14 ++- .../kafka/server/AbstractFetcherManager.scala | 109 ++++++++++++++++++++- .../scala/kafka/server/AbstractFetcherThread.scala | 51 ++++++---- .../scala/kafka/server/ReplicaFetcherManager.scala | 14 +-- .../scala/kafka/server/ReplicaFetcherThread.scala | 16 ++- .../main/scala/kafka/server/ReplicaManager.scala | 3 +- 7 files changed, 185 insertions(+), 131 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala index b9e2bea..fb607bb 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala @@ -29,8 +29,9 @@ import kafka.utils.Utils.inLock import kafka.utils.ZkUtils._ import kafka.utils.{ShutdownableThread, SystemTime} import kafka.common.TopicAndPartition -import kafka.client.ClientUtils -import java.util.concurrent.atomic.AtomicInteger +import kafka.server.PartitionFetchState + + /** * Usage: @@ -41,78 +42,11 @@ class ConsumerFetcherManager(private val consumerIdString: String, private val config: ConsumerConfig, private val zkClient : ZkClient) extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(SystemTime.milliseconds), - config.clientId, config.numConsumerFetchers) { + config.clientId, config.numConsumerFetchers, config.socketTimeoutMs, config.refreshLeaderBackoffMs, zkClient) { private var partitionMap: immutable.Map[TopicAndPartition, PartitionTopicInfo] = null private var cluster: Cluster = null - private val noLeaderPartitionSet = new mutable.HashSet[TopicAndPartition] - private val lock = new ReentrantLock - private val cond = lock.newCondition() - private var leaderFinderThread: ShutdownableThread = null - private val correlationId = new AtomicInteger(0) - - private class LeaderFinderThread(name: String) extends ShutdownableThread(name) { - // thread responsible for adding the fetcher to the right broker when leader is available - override def doWork() { - val leaderForPartitionsMap = new HashMap[TopicAndPartition, Broker] - lock.lock() - try { - while (noLeaderPartitionSet.isEmpty) { - trace("No partition for leader election.") - cond.await() - } - - trace("Partitions without leader %s".format(noLeaderPartitionSet)) - val brokers = getAllBrokersInCluster(zkClient) - val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet, - brokers, - config.clientId, - config.socketTimeoutMs, - correlationId.getAndIncrement).topicsMetadata - if(logger.isDebugEnabled) topicsMetadata.foreach(topicMetadata => debug(topicMetadata.toString())) - topicsMetadata.foreach { tmd => - val topic = tmd.topic - tmd.partitionsMetadata.foreach { pmd => - val topicAndPartition = TopicAndPartition(topic, pmd.partitionId) - if(pmd.leader.isDefined && noLeaderPartitionSet.contains(topicAndPartition)) { - val leaderBroker = pmd.leader.get - leaderForPartitionsMap.put(topicAndPartition, leaderBroker) - noLeaderPartitionSet -= topicAndPartition - } - } - } - } catch { - case t: Throwable => { - if (!isRunning.get()) - throw t /* If this thread is stopped, propagate this exception to kill the thread. 
*/ - else - warn("Failed to find leader for %s".format(noLeaderPartitionSet), t) - } - } finally { - lock.unlock() - } - - try { - addFetcherForPartitions(leaderForPartitionsMap.map{ - case (topicAndPartition, broker) => - topicAndPartition -> BrokerAndInitialOffset(broker, partitionMap(topicAndPartition).getFetchOffset())} - ) - } catch { - case t: Throwable => { - if (!isRunning.get()) - throw t /* If this thread is stopped, propagate this exception to kill the thread. */ - else { - warn("Failed to add leader for partitions %s; will retry".format(leaderForPartitionsMap.keySet.mkString(",")), t) - lock.lock() - noLeaderPartitionSet ++= leaderForPartitionsMap.keySet - lock.unlock() - } - } - } - - shutdownIdleFetcherThreads() - Thread.sleep(config.refreshLeaderBackoffMs) - } - } + private val partitionMapLock = new ReentrantLock + private val partitionMapCond = partitionMapLock.newCondition() override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = { new ConsumerFetcherThread( @@ -121,14 +55,12 @@ class ConsumerFetcherManager(private val consumerIdString: String, } def startConnections(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster) { - leaderFinderThread = new LeaderFinderThread(consumerIdString + "-leader-finder-thread") - leaderFinderThread.start() - - inLock(lock) { + createLeaderFinderThread() + inLock(partitionMapLock) { partitionMap = topicInfos.map(tpi => (TopicAndPartition(tpi.topic, tpi.partitionId), tpi)).toMap this.cluster = cluster - noLeaderPartitionSet ++= topicInfos.map(tpi => TopicAndPartition(tpi.topic, tpi.partitionId)) - cond.signalAll() + addPartitionsWithNoLeader(topicInfos.map(tpi => (TopicAndPartition(tpi.topic, tpi.partitionId), new PartitionFetchState(tpi.getFetchOffset()))).toMap) + partitionMapCond.signalAll() } } @@ -138,29 +70,12 @@ class ConsumerFetcherManager(private val consumerIdString: String, * leader, then the leader finder thread will process these partitions (before shutting down) and add fetchers for * these partitions. 
*/ - info("Stopping leader finder thread") - if (leaderFinderThread != null) { - leaderFinderThread.shutdown() - leaderFinderThread = null - } - + stopLeaderFinderThread() info("Stopping all fetchers") closeAllFetchers() - // no need to hold the lock for the following since leaderFindThread and all fetchers have been stopped partitionMap = null - noLeaderPartitionSet.clear() - info("All connections stopped") } - def addPartitionsWithError(partitionList: Iterable[TopicAndPartition]) { - debug("adding partitions with error %s".format(partitionList)) - inLock(lock) { - if (partitionMap != null) { - noLeaderPartitionSet ++= partitionList - cond.signalAll() - } - } - } -} \ No newline at end of file +} diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala index 152fda5..b3fc3d1 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala @@ -22,7 +22,8 @@ import kafka.server.AbstractFetcherThread import kafka.message.ByteBufferMessageSet import kafka.api.{Request, OffsetRequest, FetchResponsePartitionData} import kafka.common.TopicAndPartition - +import kafka.server.PartitionFetchState +import scala.collection.Map class ConsumerFetcherThread(name: String, val config: ConsumerConfig, @@ -65,9 +66,14 @@ class ConsumerFetcherThread(name: String, newOffset } + // any logic for partitions with errors + def handlePartitionsWithErrors(partitionAndFetchState: Map[TopicAndPartition, PartitionFetchState]) { + handlePartitionsWithNoLeader(partitionAndFetchState) + } + // any logic for partitions whose leader has changed - def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) { - removePartitions(partitions.toSet) - consumerFetcherManager.addPartitionsWithError(partitions) + def handlePartitionsWithNoLeader(partitionAndFetchState: Map[TopicAndPartition, PartitionFetchState]) { + removePartitions(partitionAndFetchState.keys.toSet) + consumerFetcherManager.addPartitionsWithNoLeader(partitionAndFetchState) } } diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala index 20c00cb..e4ab36b 100644 --- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala @@ -24,13 +24,29 @@ import kafka.utils.{Utils, Logging} import kafka.cluster.Broker import kafka.metrics.KafkaMetricsGroup import kafka.common.TopicAndPartition +import kafka.utils.{ShutdownableThread, SystemTime} +import kafka.utils.ZkUtils._ +import kafka.utils.Utils.inLock +import kafka.client.ClientUtils +import org.I0Itec.zkclient.ZkClient import com.yammer.metrics.core.Gauge -abstract class AbstractFetcherManager(protected val name: String, clientId: String, numFetchers: Int = 1) +import java.util.concurrent.locks.ReentrantLock +import java.util.concurrent.atomic.AtomicInteger + + +abstract class AbstractFetcherManager(protected val name: String, clientId: String, numFetchers: Int = 1, + socketTimeoutMs: Int, refreshLeaderBackoffMs: Int, zkClient:ZkClient) extends Logging with KafkaMetricsGroup { // map of (source broker_id, fetcher_id per source broker) => fetcher private val fetcherThreadMap = new mutable.HashMap[BrokerAndFetcherId, AbstractFetcherThread] private val mapLock = new Object + private val noLeaderPartitionMap = new mutable.HashMap[TopicAndPartition, PartitionFetchState] + private val 
noLeaderPartitionMapLock = new ReentrantLock + private val noLeaderPartitionMapCond = noLeaderPartitionMapLock.newCondition() + private val correlationId = new AtomicInteger(0) + private var leaderFinderThread: ShutdownableThread = null + this.logIdent = "[" + name + "] " newGauge( @@ -124,8 +140,97 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri fetcherThreadMap.clear() } } + + def stopLeaderFinderThread() { + info("Stopping leader finder thread") + if (leaderFinderThread != null) { + leaderFinderThread.shutdown() + leaderFinderThread = null + } + noLeaderPartitionMap.clear() + } + + def createLeaderFinderThread() { + leaderFinderThread = new LeaderFinderThread(clientId + "-leader-finder-thread") + leaderFinderThread.start() + } + + def addPartitionsWithNoLeader(partitionAndFetchState: Map[TopicAndPartition, PartitionFetchState]) { + info("adding partitions with no leader %s".format(partitionAndFetchState.keySet)) + inLock(noLeaderPartitionMapLock) { + noLeaderPartitionMap ++= partitionAndFetchState + noLeaderPartitionMapCond.signalAll() + } + } + + private class LeaderFinderThread(name: String) extends ShutdownableThread(name) { + // thread responsible for adding the fetcher to the right broker when leader is available + override def doWork() { + val leaderForPartitionsMap = new mutable.HashMap[TopicAndPartition, BrokerAndInitialOffset] + noLeaderPartitionMapLock.lock() + try { + while (noLeaderPartitionMap.isEmpty) { + trace("No partition for leader election.") + noLeaderPartitionMapCond.await() + } + + info("Partitions without leader %s".format(noLeaderPartitionMap.keys.toSet)) + val brokers = getAllBrokersInCluster(zkClient) + val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionMap.keys.map(m => m.topic).toSet, + brokers, + clientId, + socketTimeoutMs, + correlationId.getAndIncrement).topicsMetadata + if(logger.isDebugEnabled) topicsMetadata.foreach(topicMetadata => debug(topicMetadata.toString())) + topicsMetadata.foreach { tmd => + val topic = tmd.topic + tmd.partitionsMetadata.foreach { pmd => + val topicAndPartition = TopicAndPartition(topic, pmd.partitionId) + if(pmd.leader.isDefined && noLeaderPartitionMap.contains(topicAndPartition)) { + val leaderBroker = pmd.leader.get + val currentPartitionFetchState = noLeaderPartitionMap.get(topicAndPartition).get + leaderForPartitionsMap.put(topicAndPartition, BrokerAndInitialOffset(leaderBroker, currentPartitionFetchState.offset)) + noLeaderPartitionMap.remove(topicAndPartition) + } + } + } + } catch { + case t: Throwable => { + if (!isRunning.get()) + throw t /* If this thread is stopped, propagate this exception to kill the thread. */ + else + warn("Failed to find leader for %s".format(noLeaderPartitionMap.keySet), t) + } + } finally { + noLeaderPartitionMapLock.unlock() + } + + try { + info("adding fetchers for partitions %s"+leaderForPartitionsMap.keySet) + addFetcherForPartitions(leaderForPartitionsMap) + } catch { + case t: Throwable => { + if (!isRunning.get()) + throw t /* If this thread is stopped, propagate this exception to kill the thread. 
*/ + else { + warn("Failed to add leader for partitions %s; will retry".format(leaderForPartitionsMap.keySet.mkString(",")), t) + noLeaderPartitionMapLock.lock() + for ((topicAndPartition, brokerAndInitOffset) <- leaderForPartitionsMap) { + noLeaderPartitionMap.put(topicAndPartition, new PartitionFetchState(brokerAndInitOffset.initOffset)) + } + noLeaderPartitionMapLock.unlock() + } + } + } + + shutdownIdleFetcherThreads() + Thread.sleep(refreshLeaderBackoffMs) + } + } } + + case class BrokerAndFetcherId(broker: Broker, fetcherId: Int) -case class BrokerAndInitialOffset(broker: Broker, initOffset: Long) \ No newline at end of file +case class BrokerAndInitialOffset(broker: Broker, initOffset: Long) diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index a2008ff..3ffb2ed 100644 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -63,8 +63,11 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke // handle a partition whose offset is out of range and return a new fetch offset def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long - // deal with partitions with errors, potentially due to leadership changes - def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) + // deal with partitions with errors + def handlePartitionsWithErrors(partitions: Map[TopicAndPartition, PartitionFetchState]) + + // deal with partitions with leadership changes + def handlePartitionsWithNoLeader(partitions: Map[TopicAndPartition, PartitionFetchState]) override def shutdown(){ initiateShutdown() @@ -87,12 +90,12 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke partitionFetchState.offset, fetchSize) } } + if (partitionMap.isEmpty || allPartitionsInactive) { + trace("Back off for %d ms before sending a fetch request".format(fetchBackOffMs)) + partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS) + } } - if (partitionMap.isEmpty || allPartitionsInactive) { - trace("Back off for %d ms before sending a fetch request".format(fetchBackOffMs)) - partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS) - } val fetchRequest = fetchRequestBuilder.build() if (!fetchRequest.requestInfo.isEmpty) @@ -100,7 +103,8 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke } private def processFetchRequest(fetchRequest: FetchRequest) { - val partitionsWithError = new mutable.HashSet[TopicAndPartition] + val partitionsWithError = new mutable.HashMap[TopicAndPartition, PartitionFetchState] + val partitionsWithNoLeader = new mutable.HashMap[TopicAndPartition, PartitionFetchState] var response: FetchResponse = null try { trace("Issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest)) @@ -110,7 +114,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke if (isRunning.get) { warn("Error in fetch %s. 
Possible cause: %s".format(fetchRequest, t.toString)) inLock(partitionMapLock) { - partitionsWithError ++= partitionMap.keys + partitionsWithError ++= partitionMap // there is an error occurred while fetching partitions, sleep a while partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS) } @@ -161,13 +165,19 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke } catch { case e: Throwable => error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e) - partitionsWithError += topicAndPartition + partitionsWithError.put(topicAndPartition, partitionMap.get(topicAndPartition).get) + } + case ErrorMapping.NotLeaderForPartitionCode => + if (isRunning.get) { + error("Not Leader for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id, + ErrorMapping.exceptionFor(partitionData.error).getClass)) + partitionsWithNoLeader.put(topicAndPartition, partitionMap.get(topicAndPartition).get) } case _ => if (isRunning.get) { error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id, ErrorMapping.exceptionFor(partitionData.error).getClass)) - partitionsWithError += topicAndPartition + partitionsWithError.put(topicAndPartition, partitionMap.get(topicAndPartition).get) } } }) @@ -175,10 +185,16 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke } } - if(partitionsWithError.size > 0) { - debug("handling partitions with error for %s".format(partitionsWithError)) + if (!partitionsWithNoLeader.isEmpty) { + info("handling partitions with no leader for %s".format(partitionsWithNoLeader.keys.toSet)) + handlePartitionsWithNoLeader(partitionsWithNoLeader) + } + + if(!partitionsWithError.isEmpty) { + info("handling partitions with error for %s".format(partitionsWithError.keys.toSet)) handlePartitionsWithErrors(partitionsWithError) } + } def addPartitions(partitionAndOffsets: Map[TopicAndPartition, Long]) { @@ -198,14 +214,13 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke } } - def delayPartitions(partitions: Iterable[TopicAndPartition], delay: Long) { + def delayPartitions(partitionAndFetchState: Map[TopicAndPartition, PartitionFetchState], delay: Long) { partitionMapLock.lockInterruptibly() try { - for (partition <- partitions) { - partitionMap.get(partition).foreach (currentPartitionFetchState => - if(currentPartitionFetchState.isActive) - partitionMap.put(partition, new PartitionFetchState(currentPartitionFetchState.offset, new DelayedItem(delay))) - ) + partitionAndFetchState.foreach { + case((topicAndPartition, partitionFetchState)) => + if(partitionFetchState.isActive) + partitionMap.put(topicAndPartition, new PartitionFetchState(partitionFetchState.offset, new DelayedItem(delay))) } partitionMapCond.signalAll() } finally { diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala index 351dbba..a5a9f89 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala @@ -17,19 +17,21 @@ package kafka.server +import org.I0Itec.zkclient.ZkClient import kafka.cluster.Broker -class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val replicaMgr: ReplicaManager) - extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId, - "Replica", brokerConfig.numReplicaFetchers) { +class 
ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val zkClient: ZkClient, private val replicaMgr: ReplicaManager) + extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId, "Replica", + brokerConfig.numReplicaFetchers, brokerConfig.replicaSocketTimeoutMs, 200, zkClient) { override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = { - new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d".format(fetcherId, sourceBroker.id), sourceBroker, brokerConfig, replicaMgr) + new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d".format(fetcherId, sourceBroker.id), sourceBroker, brokerConfig, replicaMgr, this) } def shutdown() { info("shutting down") + stopLeaderFinderThread() closeAllFetchers() info("shutdown completed") - } -} \ No newline at end of file + } +} diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index daf21d9..e5e93b8 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -23,11 +23,13 @@ import kafka.log.LogConfig import kafka.message.ByteBufferMessageSet import kafka.api.{OffsetRequest, FetchResponsePartitionData} import kafka.common.{KafkaStorageException, TopicAndPartition} +import scala.collection.{mutable, Set, Map} class ReplicaFetcherThread(name:String, sourceBroker: Broker, brokerConfig: KafkaConfig, - replicaMgr: ReplicaManager) + replicaMgr: ReplicaManager, + replicaFetcherMgr: ReplicaFetcherManager) extends AbstractFetcherThread(name = name, clientId = name, sourceBroker = sourceBroker, @@ -118,8 +120,16 @@ class ReplicaFetcherThread(name:String, } } + // any logic for partitions with errors + def handlePartitionsWithErrors(partitionAndFetchState: Map[TopicAndPartition, PartitionFetchState]) { + delayPartitions(partitionAndFetchState, brokerConfig.replicaFetchBackoffMs) + } + // any logic for partitions whose leader has changed - def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) { - delayPartitions(partitions, brokerConfig.replicaFetchBackoffMs) + def handlePartitionsWithNoLeader(partitionAndFetchState: Map[TopicAndPartition, PartitionFetchState]) { + info("in ReplicaFetcherThread handlePartitionsWithNoLeader") + removePartitions(partitionAndFetchState.keys.toSet) + replicaFetcherMgr.addPartitionsWithNoLeader(partitionAndFetchState) } + } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 6e43622..167db55 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -99,7 +99,8 @@ class ReplicaManager(val config: KafkaConfig, private val localBrokerId = config.brokerId private val allPartitions = new Pool[(String, Int), Partition] private val replicaStateChangeLock = new Object - val replicaFetcherManager = new ReplicaFetcherManager(config, this) + val replicaFetcherManager = new ReplicaFetcherManager(config, zkClient, this) + replicaFetcherManager.createLeaderFinderThread() private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false) val highWatermarkCheckpoints = config.logDirs.map(dir => (new File(dir).getAbsolutePath, new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap private var hwThreadInitialized = false -- 1.9.5 (Apple Git-50.3)
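Taken together, the series replaces the bare offset in the fetcher's partition map with a PartitionFetchState that pairs the offset with a DelayedItem, so a partition that hit an error can be parked for replicaFetchBackoffMs instead of being re-fetched in a tight loop. The snippet below is a stand-alone illustration of that state, with kafka.utils.DelayedItem replaced by a small deadline helper so it compiles on its own; it is not the code from the patches.

    import java.util.concurrent.TimeUnit

    object DelaySketch {
      // stand-in for kafka.utils.DelayedItem: remaining delay in the requested unit, clamped at zero
      final class Delayed(delayMs: Long) {
        private val dueMs = System.currentTimeMillis() + delayMs
        def getDelay(unit: TimeUnit): Long =
          unit.convert(math.max(0L, dueMs - System.currentTimeMillis()), TimeUnit.MILLISECONDS)
      }

      // mirrors the PartitionFetchState introduced by the series: a fetch offset plus a delay
      final case class PartitionFetchState(offset: Long, delay: Delayed) {
        def this(offset: Long) = this(offset, new Delayed(0))
        def isActive: Boolean = delay.getDelay(TimeUnit.MILLISECONDS) == 0
      }

      def main(args: Array[String]): Unit = {
        val fresh = new PartitionFetchState(100L)                       // no delay: fetched right away
        val backedOff = PartitionFetchState(100L, new Delayed(1000L))   // parked, e.g. after a fetch error
        println(fresh.isActive)      // true
        println(backedOff.isActive)  // false until roughly one second has passed
      }
    }

delayPartitions() swaps in a state built with a non-zero delay; once getDelay() reaches zero the partition reports isActive again and is included in the next fetch request.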