diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 5ccecd1..d8078bd 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -41,7 +41,6 @@ class Partition(val topic: String,
                 val replicaManager: ReplicaManager) extends Logging with KafkaMetricsGroup {
   private val localBrokerId = replicaManager.config.brokerId
   private val logManager = replicaManager.logManager
-  private val replicaFetcherManager = replicaManager.replicaFetcherManager
   private val zkClient = replicaManager.zkClient
   var leaderReplicaIdOpt: Option[Int] = None
   var inSyncReplicas: Set[Replica] = Set.empty[Replica]
@@ -132,30 +131,23 @@ class Partition(val topic: String,
     assignedReplicaMap.values.toSet
   }
 
+  def getLeaderEpoch(): Int = {
+    leaderIsrUpdateLock synchronized {
+      return this.leaderEpoch
+    }
+  }
+
   /**
-   * If the leaderEpoch of the incoming request is higher than locally cached epoch, make the local replica the leader in the following steps.
-   * 1. stop the existing replica fetcher
-   * 2. create replicas in ISR if needed (the ISR expand/shrink logic needs replicas in ISR to be available)
-   * 3. reset LogEndOffset for remote replicas (there could be old LogEndOffset from the time when this broker was the leader last time)
-   * 4. set the new leader and ISR
+   * Make the local replica the leader by resetting LogEndOffset for remote replicas (there could be old LogEndOffset from the time when this broker was the leader last time)
+   * and setting the new leader and ISR
    */
-  def makeLeader(controllerId: Int, topic: String, partitionId: Int,
-                 leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, correlationId: Int): Boolean = {
+  def makeLeader(controllerId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, correlationId: Int): Boolean = {
     leaderIsrUpdateLock synchronized {
       val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
-      if (leaderEpoch >= leaderAndIsr.leaderEpoch){
-        stateChangeLogger.trace(("Broker %d discarded the become-leader request with correlation id %d from " +
-          "controller %d epoch %d for partition [%s,%d] since current leader epoch %d is >= the request's leader epoch %d")
-          .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, topic,
-                  partitionId, leaderEpoch, leaderAndIsr.leaderEpoch))
-        return false
-      }
       // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
       // to maintain the decision maker controller's epoch in the zookeeper path
       controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
-      // stop replica fetcher thread, if any
-      replicaFetcherManager.removeFetcher(topic, partitionId)
       val newInSyncReplicas = leaderAndIsr.isr.map(r => getOrCreateReplica(r)).toSet
       // reset LogEndOffset for remote replicas
@@ -171,52 +163,22 @@ class Partition(val topic: String,
   }
 
   /**
-   * If the leaderEpoch of the incoming request is higher than locally cached epoch, make the local replica the follower in the following steps.
-   * 1. stop any existing fetcher on this partition from the local replica
-   * 2. make sure local replica exists and truncate the log to high watermark
-   * 3. set the leader and set ISR to empty
-   * 4. start a fetcher to the new leader
+   * Make the local replica the follower by setting the new leader and ISR to empty
    */
-  def makeFollower(controllerId: Int, topic: String, partitionId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
-                   leaders: Set[Broker], correlationId: Int): Boolean = {
+  def makeFollower(controllerId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, leaders: Set[Broker], correlationId: Int): Boolean = {
     leaderIsrUpdateLock synchronized {
       val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
-      if (leaderEpoch >= leaderAndIsr.leaderEpoch) {
-        stateChangeLogger.trace(("Broker %d discarded the become-follower request with correlation id %d from " +
-          "controller %d epoch %d for partition [%s,%d] since current leader epoch %d is >= the request's leader epoch %d")
-          .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, topic,
-                  partitionId, leaderEpoch, leaderAndIsr.leaderEpoch))
-        return false
-      }
-      // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
-      // to maintain the decision maker controller's epoch in the zookeeper path
-      controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
-      // make sure local replica exists. This reads the last check pointed high watermark from disk. On startup, it is
-      // important to ensure that this operation happens for every single partition in a leader and isr request, else
-      // some high watermark values could be overwritten with 0. This leads to replicas fetching from the earliest offset
-      // on the leader
-      val localReplica = getOrCreateReplica()
       val newLeaderBrokerId: Int = leaderAndIsr.leader
       // TODO: Delete leaders from LeaderAndIsrRequest in 0.8.1
       leaders.find(_.id == newLeaderBrokerId) match {
         case Some(leaderBroker) =>
-          // stop fetcher thread to previous leader
-          replicaFetcherManager.removeFetcher(topic, partitionId)
-          localReplica.log.get.truncateTo(localReplica.highWatermark)
-          logManager.checkpointRecoveryPointOffsets()
+          // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
+          // to maintain the decision maker controller's epoch in the zookeeper path
+          controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
           inSyncReplicas = Set.empty[Replica]
           leaderEpoch = leaderAndIsr.leaderEpoch
           zkVersion = leaderAndIsr.zkVersion
           leaderReplicaIdOpt = Some(newLeaderBrokerId)
-          if (!replicaManager.isShuttingDown.get()) {
-            // start fetcher thread to current leader if we are not shutting down
-            replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker)
-          }
-          else {
-            stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d from " +
-              "controller %d epoch %d since it is shutting down")
-              .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch))
-          }
         case None => // we should not come here
           stateChangeLogger.error(("Broker %d aborted the become-follower state change with correlation id %d from " +
             "controller %d epoch %d for partition [%s,%d] new leader %d")
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index 153bc0b..b0b5dce 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -42,6 +42,7 @@ object ErrorMapping {
   val MessageSizeTooLargeCode: Short = 10
   val StaleControllerEpochCode: Short = 11
   val OffsetMetadataTooLargeCode: Short = 12
+  val StaleLeaderEpochCode: Short = 13
 
   private val exceptionToCode =
     Map[Class[Throwable], Short](
diff --git a/core/src/main/scala/kafka/common/TopicAndPartition.scala b/core/src/main/scala/kafka/common/TopicAndPartition.scala
index 63596b7..df3db91 100644
--- a/core/src/main/scala/kafka/common/TopicAndPartition.scala
+++ b/core/src/main/scala/kafka/common/TopicAndPartition.scala
@@ -1,5 +1,7 @@
 package kafka.common
 
+import kafka.cluster.{Replica, Partition}
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements. See the NOTICE file distributed with
@@ -24,6 +26,10 @@ case class TopicAndPartition(topic: String, partition: Int) {
 
   def this(tuple: (String, Int)) = this(tuple._1, tuple._2)
 
+  def this(partition: Partition) = this(partition.topic, partition.partitionId)
+
+  def this(replica: Replica) = this(replica.topic, replica.partitionId)
+
   def asTuple = (topic, partition)
 
   override def toString = "[%s,%d]".format(topic, partition)
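For illustration only (this snippet is not part of the patch): with the epoch comparison removed from Partition.makeLeader/makeFollower, callers are expected to consult the new getLeaderEpoch() accessor before applying a leadership decision, which is what ReplicaManager does further down. A minimal sketch of that caller-side guard:

    // Hypothetical check mirroring the one that moves into ReplicaManager below:
    // only apply a LeaderAndIsr decision whose leader epoch is strictly newer than the cached one.
    def isNewerEpoch(partition: Partition, requestedLeaderEpoch: Int): Boolean =
      partition.getLeaderEpoch() < requestedLeaderEpoch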
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index 566ca46..e4451bb 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -18,9 +18,10 @@ package kafka.consumer
 
 import org.I0Itec.zkclient.ZkClient
-import kafka.server.{AbstractFetcherThread, AbstractFetcherManager}
+import kafka.server.{BrokerAndInitialOffset, AbstractFetcherThread, AbstractFetcherManager}
 import kafka.cluster.{Cluster, Broker}
 import scala.collection.immutable
+import scala.collection.Map
 import collection.mutable.HashMap
 import scala.collection.mutable
 import java.util.concurrent.locks.ReentrantLock
@@ -90,23 +91,22 @@ class ConsumerFetcherManager(private val consumerIdString: String,
         lock.unlock()
       }
 
-      leaderForPartitionsMap.foreach {
-        case(topicAndPartition, leaderBroker) =>
-          val pti = partitionMap(topicAndPartition)
-          try {
-            addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker)
-          } catch {
-            case t: Throwable => {
-              if (!isRunning.get())
-                throw t /* If this thread is stopped, propagate this exception to kill the thread. */
-              else {
-                warn("Failed to add leader for partition %s; will retry".format(topicAndPartition), t)
-                lock.lock()
-                noLeaderPartitionSet += topicAndPartition
-                lock.unlock()
-              }
-            }
+      try {
+        addFetcherForPartitions(leaderForPartitionsMap.map{
+          case (topicAndPartition, broker) =>
+            topicAndPartition -> BrokerAndInitialOffset(broker, partitionMap(topicAndPartition).getFetchOffset())}
+        )
+      } catch {
+        case t: Throwable => {
+          if (!isRunning.get())
+            throw t /* If this thread is stopped, propagate this exception to kill the thread. */
+          else {
+            warn("Failed to add leader for partitions %s; will retry".format(leaderForPartitionsMap.keySet.mkString(",")), t)
+            lock.lock()
+            noLeaderPartitionSet ++= leaderForPartitionsMap.keySet
+            lock.unlock()
          }
+        }
      }
 
      shutdownIdleFetcherThreads()
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index dda0a8f..f8c1b4e 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -66,7 +66,7 @@ class ConsumerFetcherThread(name: String,
 
   // any logic for partitions whose leader has changed
   def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) {
-    partitions.foreach(tap => removePartition(tap.topic, tap.partition))
+    removePartitions(partitions.toSet)
     consumerFetcherManager.addPartitionsWithError(partitions)
   }
 }
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 4719715..d489e08 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -177,6 +177,22 @@ class LogManager(val logDirs: Array[File],
     }
     debug("Shutdown complete.")
   }
+
+  /**
+   * Truncate the partition logs to the specified offsets and checkpoint the recovery point to this offset
+   *
+   * @param partitionAndOffsets Partition logs that need to be truncated
+   */
+  def truncateTo(partitionAndOffsets: Map[TopicAndPartition, Long]) {
+    for ((topicAndPartition, truncateOffset) <- partitionAndOffsets) {
+      val log = logs.get(topicAndPartition)
+      // If the log does not exist, skip it
+      if (log != null) {
+        log.truncateTo(truncateOffset)
+      }
+    }
+    checkpointRecoveryPointOffsets()
+  }
 
   /**
    * Write out the current recovery point for all logs to a text file in the log directory
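For illustration only (topic names and offsets are made up): the new LogManager.truncateTo takes a map of offsets, truncates each existing log, and then checkpoints the recovery points once; the become-follower path further down uses it with high watermarks in exactly this way.

    // Hypothetical usage: truncate two partitions and checkpoint recovery points in a single call.
    logManager.truncateTo(Map(
      TopicAndPartition("orders", 0) -> 4200L,
      TopicAndPartition("orders", 1) -> 3100L))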
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index 15b7bd3..394e981 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -18,14 +18,17 @@ package kafka.server
 
 import scala.collection.mutable
+import scala.collection.Set
+import scala.collection.Map
 import kafka.utils.Logging
 import kafka.cluster.Broker
 import kafka.metrics.KafkaMetricsGroup
+import kafka.common.TopicAndPartition
 import com.yammer.metrics.core.Gauge
 
 abstract class AbstractFetcherManager(protected val name: String, metricPrefix: String, numFetchers: Int = 1) extends Logging with KafkaMetricsGroup {
-  // map of (source brokerid, fetcher Id per source broker) => fetcher
+  // map of (source broker_id, fetcher_id per source broker) => fetcher
   private val fetcherThreadMap = new mutable.HashMap[BrokerAndFetcherId, AbstractFetcherThread]
   private val mapLock = new Object
   this.logIdent = "[" + name + "] "
@@ -60,36 +63,43 @@ abstract class AbstractFetcherManager(protected val name: String, metricPrefix:
   )
 
   private def getFetcherId(topic: String, partitionId: Int) : Int = {
-    (topic.hashCode() + 31 * partitionId) % numFetchers
+    (31 * topic.hashCode() + partitionId) % numFetchers
   }
 
   // to be defined in subclass to create a specific fetcher
   def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread
 
-  def addFetcher(topic: String, partitionId: Int, initialOffset: Long, sourceBroker: Broker) {
+  def addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset]) {
     mapLock synchronized {
-      var fetcherThread: AbstractFetcherThread = null
-      val key = new BrokerAndFetcherId(sourceBroker, getFetcherId(topic, partitionId))
-      fetcherThreadMap.get(key) match {
-        case Some(f) => fetcherThread = f
-        case None =>
-          fetcherThread = createFetcherThread(key.fetcherId, sourceBroker)
-          fetcherThreadMap.put(key, fetcherThread)
-          fetcherThread.start
+      val partitionsPerFetcher = partitionAndOffsets.groupBy{ case(topicAndPartition, brokerAndInitialOffset) =>
+        BrokerAndFetcherId(brokerAndInitialOffset.broker, getFetcherId(topicAndPartition.topic, topicAndPartition.partition))}
+      for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) {
+        var fetcherThread: AbstractFetcherThread = null
+        fetcherThreadMap.get(brokerAndFetcherId) match {
+          case Some(f) => fetcherThread = f
+          case None =>
+            fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
+            fetcherThreadMap.put(brokerAndFetcherId, fetcherThread)
+            fetcherThread.start
+        }
+
+        fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map { case (topicAndPartition, brokerAndInitOffset) =>
+          topicAndPartition -> brokerAndInitOffset.initOffset
+        })
       }
-      fetcherThread.addPartition(topic, partitionId, initialOffset)
-      info("Adding fetcher for partition [%s,%d], initOffset %d to broker %d with fetcherId %d"
-        .format(topic, partitionId, initialOffset, sourceBroker.id, key.fetcherId))
     }
+
+    info("Added fetcher for partitions %s".format(partitionAndOffsets.map{ case (topicAndPartition, brokerAndInitialOffset) =>
+      "[" + topicAndPartition + ", initOffset " + brokerAndInitialOffset.initOffset + " to broker " + brokerAndInitialOffset.broker + "] "}))
   }
 
-  def removeFetcher(topic: String, partitionId: Int) {
-    info("Removing fetcher for partition [%s,%d]".format(topic, partitionId))
+  def removeFetcherForPartitions(partitions: Set[TopicAndPartition]) {
     mapLock synchronized {
       for ((key, fetcher) <- fetcherThreadMap) {
-        fetcher.removePartition(topic, partitionId)
+        fetcher.removePartitions(partitions)
       }
     }
+    info("Removed fetcher for partitions %s".format(partitions.mkString(",")))
   }
 
   def shutdownIdleFetcherThreads() {
@@ -115,4 +125,6 @@ abstract class AbstractFetcherManager(protected val name: String, metricPrefix:
   }
 }
 
-case class BrokerAndFetcherId(broker: Broker, fetcherId: Int)
\ No newline at end of file
+case class BrokerAndFetcherId(broker: Broker, fetcherId: Int)
+
+case class BrokerAndInitialOffset(broker: Broker, initOffset: Long)
\ No newline at end of file
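For illustration only (broker and offsets are made up): addFetcherForPartitions replaces per-partition addFetcher calls with one batched call; the manager groups the partitions by (source broker, fetcher id) and hands each group to a single fetcher thread.

    // Hypothetical call site: pass the whole map at once instead of looping over addFetcher().
    fetcherManager.addFetcherForPartitions(Map(
      TopicAndPartition("orders", 0) -> BrokerAndInitialOffset(leaderBroker, 0L),
      TopicAndPartition("orders", 1) -> BrokerAndInitialOffset(leaderBroker, 150L)))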
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index c64260f..bb2dd90 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -19,17 +19,19 @@ package kafka.server
 
 import kafka.cluster.Broker
 import collection.mutable
+import scala.collection.Set
+import scala.collection.Map
 import kafka.message.{InvalidMessageException, ByteBufferMessageSet, MessageAndOffset}
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
-import java.util.concurrent.atomic.AtomicLong
 import kafka.utils.{Pool, ShutdownableThread}
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.locks.ReentrantLock
 import kafka.consumer.{PartitionTopicInfo, SimpleConsumer}
 import kafka.api.{FetchRequest, FetchResponse, FetchResponsePartitionData, FetchRequestBuilder}
 import kafka.common.{KafkaException, ClientIdAndBroker, TopicAndPartition, ErrorMapping}
 import kafka.utils.Utils.inLock
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.ReentrantLock
+import java.util.concurrent.atomic.AtomicLong
 
 
 /**
@@ -166,23 +168,26 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
     }
   }
 
-  def addPartition(topic: String, partitionId: Int, initialOffset: Long) {
+  def addPartitions(partitionAndOffsets: Map[TopicAndPartition, Long]) {
     partitionMapLock.lockInterruptibly()
     try {
-      val topicPartition = TopicAndPartition(topic, partitionId)
-      partitionMap.put(
-        topicPartition,
-        if (PartitionTopicInfo.isOffsetInvalid(initialOffset)) handleOffsetOutOfRange(topicPartition) else initialOffset)
+      for ((topicAndPartition, offset) <- partitionAndOffsets) {
+        // If the partitionMap already has the topic/partition, then do not update the map with the old offset
+        if (!partitionMap.contains(topicAndPartition))
+          partitionMap.put(
+            topicAndPartition,
+            if (PartitionTopicInfo.isOffsetInvalid(offset)) handleOffsetOutOfRange(topicAndPartition) else offset)
+      }
       partitionMapCond.signalAll()
     } finally {
       partitionMapLock.unlock()
     }
   }
 
-  def removePartition(topic: String, partitionId: Int) {
+  def removePartitions(topicAndPartitions: Set[TopicAndPartition]) {
    partitionMapLock.lockInterruptibly()
    try {
-      partitionMap.remove(TopicAndPartition(topic, partitionId))
+      topicAndPartitions.foreach(tp => partitionMap.remove(tp))
    } finally {
      partitionMapLock.unlock()
    }
@@ -199,7 +204,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
 }
 
 class FetcherLagMetrics(metricId: ClientIdBrokerTopicPartition) extends KafkaMetricsGroup {
-  private[this] var lagVal = new AtomicLong(-1L)
+  private[this] val lagVal = new AtomicLong(-1L)
   newGauge(
     metricId + "-ConsumerLag",
     new Gauge[Long] {
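For illustration only (values are made up): addPartitions deliberately skips partitions the thread already tracks, so re-submitting a batch cannot rewind an offset, and any invalid offset is resolved through handleOffsetOutOfRange before it is stored.

    // Hypothetical: if partition 0 is already in partitionMap its existing offset is left untouched;
    // partition 1 is new and starts fetching from offset 300.
    fetcherThread.addPartitions(Map(
      TopicAndPartition("orders", 0) -> 120L,
      TopicAndPartition("orders", 1) -> 300L))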
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index ee1cc0c..7b8f89e 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -28,8 +28,8 @@ import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 import java.util.concurrent.TimeUnit
 import kafka.common._
-import kafka.api.{StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest}
-import kafka.controller.KafkaController
+import kafka.api.{StopReplicaRequest, LeaderAndIsrRequest}
+import kafka.controller.{LeaderIsrAndControllerEpoch, KafkaController}
 import org.apache.log4j.Logger
 
 
@@ -50,6 +50,7 @@ class ReplicaManager(val config: KafkaConfig,
   private val allPartitions = new Pool[(String, Int), Partition]
   private var leaderPartitions = new mutable.HashSet[Partition]()
   private val leaderPartitionsLock = new Object
+  private val replicaStateChangeLock = new Object
   val replicaFetcherManager = new ReplicaFetcherManager(config, this)
   private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
   val highWatermarkCheckpoints = config.logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap
@@ -116,7 +117,6 @@ class ReplicaManager(val config: KafkaConfig,
     val errorCode = ErrorMapping.NoError
     getReplica(topic, partitionId) match {
       case Some(replica) =>
-        replicaFetcherManager.removeFetcher(topic, partitionId)
         /* TODO: handle deleteLog in a better way */
         //if (deletePartition)
         //  logManager.deleteLog(topic, partition)
@@ -132,20 +132,26 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   def stopReplicas(stopReplicaRequest: StopReplicaRequest): (mutable.Map[(String, Int), Short], Short) = {
-    val responseMap = new collection.mutable.HashMap[(String, Int), Short]
-    if(stopReplicaRequest.controllerEpoch < controllerEpoch) {
-      stateChangeLogger.warn("Broker %d received stop replica request from an old controller epoch %d."
-        .format(localBrokerId, stopReplicaRequest.controllerEpoch) +
-        " Latest known controller epoch is %d " + controllerEpoch)
-      (responseMap, ErrorMapping.StaleControllerEpochCode)
-    } else {
-      controllerEpoch = stopReplicaRequest.controllerEpoch
-      val responseMap = new HashMap[(String, Int), Short]
-      for((topic, partitionId) <- stopReplicaRequest.partitions){
-        val errorCode = stopReplica(topic, partitionId, stopReplicaRequest.deletePartitions)
-        responseMap.put((topic, partitionId), errorCode)
+    replicaStateChangeLock synchronized {
+      val responseMap = new collection.mutable.HashMap[(String, Int), Short]
+      if(stopReplicaRequest.controllerEpoch < controllerEpoch) {
+        stateChangeLogger.warn("Broker %d received stop replica request from an old controller epoch %d."
+          .format(localBrokerId, stopReplicaRequest.controllerEpoch) +
+          " Latest known controller epoch is " + controllerEpoch)
+        (responseMap, ErrorMapping.StaleControllerEpochCode)
+      } else {
+        controllerEpoch = stopReplicaRequest.controllerEpoch
+        val responseMap = new HashMap[(String, Int), Short]
+        // First stop fetchers for all partitions, then stop the corresponding replicas
+        replicaFetcherManager.removeFetcherForPartitions(stopReplicaRequest.partitions.map{
+          case (topic, partition) => TopicAndPartition(topic, partition)
+        })
+        for((topic, partitionId) <- stopReplicaRequest.partitions){
+          val errorCode = stopReplica(topic, partitionId, stopReplicaRequest.deletePartitions)
+          responseMap.put((topic, partitionId), errorCode)
+        }
+        (responseMap, ErrorMapping.NoError)
       }
-      (responseMap, ErrorMapping.NoError)
     }
   }
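Both stopReplicas and becomeLeaderOrFollower (next hunk) now run under replicaStateChangeLock and apply the same guard: reject requests from an older controller epoch, otherwise record the new epoch before touching any replica state. A sketch of that shared pattern (hypothetical helper, not in the patch):

    // Hypothetical helper illustrating the guard both handlers apply while holding replicaStateChangeLock.
    private def checkControllerEpoch(requestEpoch: Int): Short =
      if (requestEpoch < controllerEpoch)
        ErrorMapping.StaleControllerEpochCode  // stale controller, reject the request
      else {
        controllerEpoch = requestEpoch         // remember the newest decision maker
        ErrorMapping.NoError
      }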
@@ -198,88 +204,176 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): (collection.Map[(String, Int), Short], Short) = {
-    leaderAndISRRequest.partitionStateInfos.foreach(p =>
+    leaderAndISRRequest.partitionStateInfos.foreach{ case ((topic, partition), stateInfo) =>
       stateChangeLogger.trace("Broker %d handling LeaderAndIsr request correlation id %d received from controller %d epoch %d for partition [%s,%d]"
         .format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId,
-                leaderAndISRRequest.controllerEpoch, p._1._1, p._1._2)))
+                leaderAndISRRequest.controllerEpoch, topic, partition))}
     info("Handling LeaderAndIsr request %s".format(leaderAndISRRequest))
-    val responseMap = new collection.mutable.HashMap[(String, Int), Short]
-    if(leaderAndISRRequest.controllerEpoch < controllerEpoch) {
-      stateChangeLogger.warn("Broker %d received LeaderAndIsr request correlation id %d with an old controller epoch %d. Latest known controller epoch is %d"
-        .format(localBrokerId, leaderAndISRRequest.controllerEpoch, leaderAndISRRequest.correlationId, controllerEpoch))
-      (responseMap, ErrorMapping.StaleControllerEpochCode)
-    }else {
-      val controllerId = leaderAndISRRequest.controllerId
-      controllerEpoch = leaderAndISRRequest.controllerEpoch
-      for((topicAndPartition, partitionStateInfo) <- leaderAndISRRequest.partitionStateInfos) {
-        var errorCode = ErrorMapping.NoError
-        val topic = topicAndPartition._1
-        val partitionId = topicAndPartition._2
-
-        val requestedLeaderId = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
-        try {
-          if(requestedLeaderId == config.brokerId)
-            makeLeader(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.correlationId)
-          else
-            makeFollower(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.leaders,
-                         leaderAndISRRequest.correlationId)
-        } catch {
-          case e: Throwable =>
-            val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d " +
-              "epoch %d for partition %s").format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId,
-                                                  leaderAndISRRequest.controllerEpoch, topicAndPartition)
-            stateChangeLogger.error(errorMsg, e)
-            errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
+    replicaStateChangeLock synchronized {
+      val responseMap = new collection.mutable.HashMap[(String, Int), Short]
+      if(leaderAndISRRequest.controllerEpoch < controllerEpoch) {
+        stateChangeLogger.warn("Broker %d received LeaderAndIsr request correlation id %d with an old controller epoch %d. Latest known controller epoch is %d"
+          .format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch))
+        (responseMap, ErrorMapping.StaleControllerEpochCode)
+      } else {
+        val controllerId = leaderAndISRRequest.controllerId
+        val correlationId = leaderAndISRRequest.correlationId
+        controllerEpoch = leaderAndISRRequest.controllerEpoch
+
+        // First check partition's leader epoch
+        val partitionleaderIsrAndControllerEpoch = new HashMap[Partition, LeaderIsrAndControllerEpoch]()
+        leaderAndISRRequest.partitionStateInfos.foreach{ case ((topic, partitionId), partitionStateInfo) =>
+          val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
+          val partitionLeaderEpoch = partition.getLeaderEpoch()
+          if (partitionLeaderEpoch < partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch) {
+            // If the leader epoch is valid, record the epoch of the controller that made the leadership decision.
+            // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
+            partitionleaderIsrAndControllerEpoch.put(partition, partitionStateInfo.leaderIsrAndControllerEpoch)
+          } else {
+            // Otherwise record the error code in response
+            stateChangeLogger.warn(("Broker %d received invalid LeaderAndIsr request with correlation id %d from " +
+              "controller %d epoch %d with an older leader epoch %d for partition [%s,%d], current leader epoch is %d")
+              .format(localBrokerId, correlationId, controllerId, partitionStateInfo.leaderIsrAndControllerEpoch.controllerEpoch,
+                      partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch, topic, partitionId, partitionLeaderEpoch))
+            responseMap.put((topic, partitionId), ErrorMapping.StaleLeaderEpochCode)
+          }
         }
-        responseMap.put(topicAndPartition, errorCode)
-        stateChangeLogger.trace("Broker %d handled LeaderAndIsr request correlationId %d received from controller %d epoch %d for partition [%s,%d]"
-          .format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch,
-                  topicAndPartition._1, topicAndPartition._2))
-      }
-      info("Handled leader and isr request %s".format(leaderAndISRRequest))
-      // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
-      // have been completely populated before starting the checkpointing there by avoiding weird race conditions
-      if (!hwThreadInitialized) {
-        startHighWaterMarksCheckPointThread()
-        hwThreadInitialized = true
+
+        val partitionsTobeLeader = partitionleaderIsrAndControllerEpoch
+          .filter{ case (partition, leaderIsrAndControllerEpoch) => leaderIsrAndControllerEpoch.leaderAndIsr.leader == config.brokerId}
+        val partitionsTobeFollower = (partitionleaderIsrAndControllerEpoch -- partitionsTobeLeader.keys)
+
+        if (!partitionsTobeLeader.isEmpty) makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, leaderAndISRRequest.correlationId, responseMap)
+        if (!partitionsTobeFollower.isEmpty) makeFollowers(controllerId, controllerEpoch, partitionsTobeFollower, leaderAndISRRequest.leaders, leaderAndISRRequest.correlationId, responseMap)
+
+        info("Handled leader and isr request %s".format(leaderAndISRRequest))
+        // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
+        // have been completely populated before starting the checkpointing thereby avoiding weird race conditions
+        if (!hwThreadInitialized) {
+          startHighWaterMarksCheckPointThread()
+          hwThreadInitialized = true
+        }
+        replicaFetcherManager.shutdownIdleFetcherThreads()
+        (responseMap, ErrorMapping.NoError)
       }
-      replicaFetcherManager.shutdownIdleFetcherThreads()
-      (responseMap, ErrorMapping.NoError)
     }
   }
-  private def makeLeader(controllerId: Int, epoch:Int, topic: String, partitionId: Int,
-                         partitionStateInfo: PartitionStateInfo, correlationId: Int) = {
-    val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
+  /*
+   * Make the current broker the leader for a given set of partitions by:
+   *
+   * 1. Stop fetchers for these partitions
+   * 2. Update the partition metadata in cache
+   * 3. Add these partitions to the leader partitions set
+   *
+   * If an unexpected error is thrown in this function, it will be propagated to KafkaApis where
+   * the error message will be set on each partition since we do not know which partition caused it
+   * TODO: the above may need to be fixed later
+   */
+  private def makeLeaders(controllerId: Int, epoch: Int, partitionLeaderISRAndControllerEpochs: Map[Partition, LeaderIsrAndControllerEpoch],
+                          correlationId: Int, responseMap: mutable.Map[(String, Int), Short]) = {
     stateChangeLogger.trace(("Broker %d received LeaderAndIsr request correlationId %d from controller %d epoch %d " +
-      "starting the become-leader transition for partition [%s,%d]")
-      .format(localBrokerId, correlationId, controllerId, epoch, topic, partitionId))
-    val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
-    if (partition.makeLeader(controllerId, topic, partitionId, leaderIsrAndControllerEpoch, correlationId)) {
-      // also add this partition to the list of partitions for which the leader is the current broker
+      "starting the become-leader transition for partitions %s")
+      .format(localBrokerId, correlationId, controllerId, epoch,
+        partitionLeaderISRAndControllerEpochs.keySet.mkString(",")))
+
+    for (partition <- partitionLeaderISRAndControllerEpochs.keys)
+      responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError)
+
+    try {
+      // First stop fetchers for all the partitions
+      replicaFetcherManager.removeFetcherForPartitions(partitionLeaderISRAndControllerEpochs.keySet.map(new TopicAndPartition(_)))
+      stateChangeLogger.trace("Broker %d stopped fetchers for partitions %s as per becoming-leader request from controller %d epoch %d"
+        .format(localBrokerId, partitionLeaderISRAndControllerEpochs.keySet.mkString(","), controllerId, epoch))
+
+      // Update the partition information to be the leader
+      partitionLeaderISRAndControllerEpochs.foreach{ case (partition, leaderIsrAndControllerEpoch) => partition.makeLeader(controllerId, leaderIsrAndControllerEpoch, correlationId)}
+
+      // Finally add these partitions to the list of partitions for which the leader is the current broker
       leaderPartitionsLock synchronized {
-        leaderPartitions += partition
-      }
+        leaderPartitions ++= partitionLeaderISRAndControllerEpochs.keySet
+      }
+    } catch {
+      case e: Throwable =>
+        val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d " +
+          "epoch %d").format(localBrokerId, correlationId, controllerId, epoch)
+        stateChangeLogger.error(errorMsg, e)
+        // Re-throw the exception for it to be caught in KafkaApis
+        throw e
     }
-    stateChangeLogger.trace("Broker %d completed become-leader transition for partition [%s,%d]".format(localBrokerId, topic, partitionId))
+
+    stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " +
+      "for the become-leader transition for partitions %s")
+      .format(localBrokerId, correlationId, controllerId, epoch, partitionLeaderISRAndControllerEpochs.keySet.mkString(",")))
   }
 
-  private def makeFollower(controllerId: Int, epoch: Int, topic: String, partitionId: Int,
-                           partitionStateInfo: PartitionStateInfo, leaders: Set[Broker], correlationId: Int) {
-    val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
+  /*
+   * Make the current broker a follower for a given set of partitions by:
+   *
+   * 1. Stop fetchers for these partitions
+   * 2. Truncate the log and checkpoint offsets for these partitions.
+   * 3. If the broker is not shutting down, add the fetcher to the new leaders
+   * 4. Update the partition metadata in cache
+   * 5. Remove these partitions from the leader partitions set
+   *
+   * The ordering of these steps makes sure that the replicas in transition will not take any more
+   * messages before checkpointing offsets, so that all messages before the checkpoint are guaranteed
+   * to be flushed to disk
+   *
+   * If an unexpected error is thrown in this function, it will be propagated to KafkaApis where
+   * the error message will be set on each partition since we do not know which partition caused it
+   * TODO: the above may need to be fixed later
+   */
+  private def makeFollowers(controllerId: Int, epoch: Int, partitionLeaderISRAndControllerEpochs: Map[Partition, LeaderIsrAndControllerEpoch],
+                            leaders: Set[Broker], correlationId: Int, responseMap: mutable.Map[(String, Int), Short]) {
     stateChangeLogger.trace(("Broker %d received LeaderAndIsr request correlationId %d from controller %d epoch %d " +
-      "starting the become-follower transition for partition [%s,%d]")
-      .format(localBrokerId, correlationId, controllerId, epoch, topic, partitionId))
+      "starting the become-follower transition for partitions %s")
+      .format(localBrokerId, correlationId, controllerId, epoch,
+        partitionLeaderISRAndControllerEpochs.keySet.mkString(",")))
+
+    for (partition <- partitionLeaderISRAndControllerEpochs.keys)
+      responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError)
+
+    try {
+      replicaFetcherManager.removeFetcherForPartitions(partitionLeaderISRAndControllerEpochs.keySet.map(new TopicAndPartition(_)))
+      stateChangeLogger.trace("Broker %d stopped fetchers for partitions %s as per becoming-follower request from controller %d epoch %d"
+        .format(localBrokerId, partitionLeaderISRAndControllerEpochs.keySet.mkString(","), controllerId, epoch))
+
+      logManager.truncateTo(partitionLeaderISRAndControllerEpochs.map{ case(partition, leaderISRAndControllerEpoch) =>
+        new TopicAndPartition(partition) -> partition.getOrCreateReplica().highWatermark
+      })
+      stateChangeLogger.trace("Broker %d truncated logs and checkpoint recovery boundaries for partitions %s as per becoming-follower request from controller %d epoch %d"
+        .format(localBrokerId, partitionLeaderISRAndControllerEpochs.keySet.mkString(","), controllerId, epoch))
+
+      if (!isShuttingDown.get()) {
+        replicaFetcherManager.addFetcherForPartitions(partitionLeaderISRAndControllerEpochs.map{ case(partition, leaderISRAndControllerEpoch) =>
+          new TopicAndPartition(partition) -> BrokerAndInitialOffset(leaders.find(_.id == leaderISRAndControllerEpoch.leaderAndIsr.leader).get, partition.getReplica().get.logEndOffset)}
+        )
+      }
+      else {
+        stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d from " +
+          "controller %d epoch %d since it is shutting down")
+          .format(localBrokerId, correlationId, controllerId, epoch))
+      }
+
+      partitionLeaderISRAndControllerEpochs.foreach{ case (partition, leaderIsrAndControllerEpoch) => partition.makeFollower(controllerId, leaderIsrAndControllerEpoch, leaders, correlationId)}
-    val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
-    if (partition.makeFollower(controllerId, topic, partitionId, leaderIsrAndControllerEpoch, leaders, correlationId)) {
-      // remove this replica's partition from the ISR expiration queue
       leaderPartitionsLock synchronized {
-        leaderPartitions -= partition
+        leaderPartitions --= partitionLeaderISRAndControllerEpochs.keySet
       }
+    } catch {
+      case e: Throwable =>
+        val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d " +
+          "epoch %d").format(localBrokerId, correlationId, controllerId, epoch)
+        stateChangeLogger.error(errorMsg, e)
+        // Re-throw the exception for it to be caught in KafkaApis
+        throw e
     }
-    stateChangeLogger.trace("Broker %d completed the become-follower transition for partition [%s,%d]".format(localBrokerId, topic, partitionId))
+
+    stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " +
+      "for the become-follower transition for partitions %s")
+      .format(localBrokerId, correlationId, controllerId, epoch, partitionLeaderISRAndControllerEpochs.keySet.mkString(",")))
   }
 
   private def maybeShrinkIsr(): Unit = {
@@ -307,7 +401,7 @@ class ReplicaManager(val config: KafkaConfig,
     val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect{case Some(replica) => replica}
     val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParent)
     for((dir, reps) <- replicasByDir) {
-      val hwms = reps.map(r => (TopicAndPartition(r.topic, r.partitionId) -> r.highWatermark)).toMap
+      val hwms = reps.map(r => (new TopicAndPartition(r) -> r.highWatermark)).toMap
       try {
         highWatermarkCheckpoints(dir).write(hwms)
       } catch {