diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 5ccecd1..d3212bd 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -132,30 +132,23 @@ class Partition(val topic: String, assignedReplicaMap.values.toSet } + def getLeaderEpoch(): Int = { + leaderIsrUpdateLock synchronized { + return this.leaderEpoch + } + } + /** - * If the leaderEpoch of the incoming request is higher than locally cached epoch, make the local replica the leader in the following steps. - * 1. stop the existing replica fetcher - * 2. create replicas in ISR if needed (the ISR expand/shrink logic needs replicas in ISR to be available) - * 3. reset LogEndOffset for remote replicas (there could be old LogEndOffset from the time when this broker was the leader last time) - * 4. set the new leader and ISR + * Make the local replica the leader by resetting LogEndOffset for remote replicas (there could be old LogEndOffset from the time when this broker was the leader last time) + * and setting the new leader and ISR */ - def makeLeader(controllerId: Int, topic: String, partitionId: Int, - leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, correlationId: Int): Boolean = { + def makeLeader(controllerId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, correlationId: Int): Boolean = { leaderIsrUpdateLock synchronized { val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr - if (leaderEpoch >= leaderAndIsr.leaderEpoch){ - stateChangeLogger.trace(("Broker %d discarded the become-leader request with correlation id %d from " + - "controller %d epoch %d for partition [%s,%d] since current leader epoch %d is >= the request's leader epoch %d") - .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, topic, - partitionId, leaderEpoch, leaderAndIsr.leaderEpoch)) - return false - } // record the epoch of the controller that made the leadership decision. This is useful while updating the isr // to maintain the decision maker controller's epoch in the zookeeper path controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch - // stop replica fetcher thread, if any - replicaFetcherManager.removeFetcher(topic, partitionId) val newInSyncReplicas = leaderAndIsr.isr.map(r => getOrCreateReplica(r)).toSet // reset LogEndOffset for remote replicas @@ -171,52 +164,23 @@ class Partition(val topic: String, } /** - * If the leaderEpoch of the incoming request is higher than locally cached epoch, make the local replica the follower in the following steps. - * 1. stop any existing fetcher on this partition from the local replica - * 2. make sure local replica exists and truncate the log to high watermark - * 3. set the leader and set ISR to empty - * 4. 
start a fetcher to the new leader + * Make the local replica the follower by setting the new leader and ISR to empty */ - def makeFollower(controllerId: Int, topic: String, partitionId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, + def makeFollower(controllerId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, leaders: Set[Broker], correlationId: Int): Boolean = { leaderIsrUpdateLock synchronized { val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr - if (leaderEpoch >= leaderAndIsr.leaderEpoch) { - stateChangeLogger.trace(("Broker %d discarded the become-follower request with correlation id %d from " + - "controller %d epoch %d for partition [%s,%d] since current leader epoch %d is >= the request's leader epoch %d") - .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, topic, - partitionId, leaderEpoch, leaderAndIsr.leaderEpoch)) - return false - } - // record the epoch of the controller that made the leadership decision. This is useful while updating the isr - // to maintain the decision maker controller's epoch in the zookeeper path - controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch - // make sure local replica exists. This reads the last check pointed high watermark from disk. On startup, it is - // important to ensure that this operation happens for every single partition in a leader and isr request, else - // some high watermark values could be overwritten with 0. This leads to replicas fetching from the earliest offset - // on the leader - val localReplica = getOrCreateReplica() val newLeaderBrokerId: Int = leaderAndIsr.leader // TODO: Delete leaders from LeaderAndIsrRequest in 0.8.1 leaders.find(_.id == newLeaderBrokerId) match { case Some(leaderBroker) => - // stop fetcher thread to previous leader - replicaFetcherManager.removeFetcher(topic, partitionId) - localReplica.log.get.truncateTo(localReplica.highWatermark) - logManager.checkpointRecoveryPointOffsets() + // record the epoch of the controller that made the leadership decision. 
This is useful while updating the isr + // to maintain the decision maker controller's epoch in the zookeeper path + controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch inSyncReplicas = Set.empty[Replica] leaderEpoch = leaderAndIsr.leaderEpoch zkVersion = leaderAndIsr.zkVersion leaderReplicaIdOpt = Some(newLeaderBrokerId) - if (!replicaManager.isShuttingDown.get()) { - // start fetcher thread to current leader if we are not shutting down - replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker) - } - else { - stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d from " + - "controller %d epoch %d since it is shutting down") - .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch)) - } case None => // we should not come here stateChangeLogger.error(("Broker %d aborted the become-follower state change with correlation id %d from " + "controller %d epoch %d for partition [%s,%d] new leader %d") diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala index 153bc0b..b0b5dce 100644 --- a/core/src/main/scala/kafka/common/ErrorMapping.scala +++ b/core/src/main/scala/kafka/common/ErrorMapping.scala @@ -42,6 +42,7 @@ object ErrorMapping { val MessageSizeTooLargeCode: Short = 10 val StaleControllerEpochCode: Short = 11 val OffsetMetadataTooLargeCode: Short = 12 + val StaleLeaderEpochCode: Short = 13 private val exceptionToCode = Map[Class[Throwable], Short]( diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala index 566ca46..0fa32ac 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala @@ -18,9 +18,10 @@ package kafka.consumer import org.I0Itec.zkclient.ZkClient -import kafka.server.{AbstractFetcherThread, AbstractFetcherManager} +import kafka.server.{BrokerAndInitialOffset, AbstractFetcherThread, AbstractFetcherManager} import kafka.cluster.{Cluster, Broker} import scala.collection.immutable +import scala.collection.Map import collection.mutable.HashMap import scala.collection.mutable import java.util.concurrent.locks.ReentrantLock @@ -90,23 +91,23 @@ class ConsumerFetcherManager(private val consumerIdString: String, lock.unlock() } - leaderForPartitionsMap.foreach { - case(topicAndPartition, leaderBroker) => - val pti = partitionMap(topicAndPartition) - try { - addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker) - } catch { - case t: Throwable => { - if (!isRunning.get()) - throw t /* If this thread is stopped, propagate this exception to kill the thread. */ - else { - warn("Failed to add leader for partition %s; will retry".format(topicAndPartition), t) - lock.lock() - noLeaderPartitionSet += topicAndPartition - lock.unlock() - } - } + try { + val partitionAndLeadersMap = Map(leaderForPartitionsMap.toList: _*) + addFetcherForPartitions(partitionAndLeadersMap.map{ + case (topicAndPartition, broker) => + topicAndPartition -> BrokerAndInitialOffset(broker, partitionMap(topicAndPartition).getFetchOffset())} + ) + } catch { + case t: Throwable => { + if (!isRunning.get()) + throw t /* If this thread is stopped, propagate this exception to kill the thread. 
*/ + else { + warn("Failed to add leader for partitions %s; will retry".format(leaderForPartitionsMap.keySet.mkString(",")), t) + lock.lock() + noLeaderPartitionSet ++= leaderForPartitionsMap.keySet + lock.unlock() } + } } shutdownIdleFetcherThreads() diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala index dda0a8f..f8c1b4e 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala @@ -66,7 +66,7 @@ class ConsumerFetcherThread(name: String, // any logic for partitions whose leader has changed def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) { - partitions.foreach(tap => removePartition(tap.topic, tap.partition)) + removePartitions(partitions.toSet) consumerFetcherManager.addPartitionsWithError(partitions) } } diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 4719715..d489e08 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -177,6 +177,22 @@ class LogManager(val logDirs: Array[File], } debug("Shutdown complete.") } + + /** + * Truncate the partition logs to the specified offsets and checkpoint the recovery point to this offset + * + * @param partitionAndOffsets Partition logs that need to be truncated + */ + def truncateTo(partitionAndOffsets: Map[TopicAndPartition, Long]) { + for ((topicAndPartition, truncateOffset) <- partitionAndOffsets) { + val log = logs.get(topicAndPartition) + // If the log does not exist, skip it + if (log != null) { + log.truncateTo(truncateOffset) + } + } + checkpointRecoveryPointOffsets() + } /** * Write out the current recovery point for all logs to a text file in the log directory diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala index 15b7bd3..7a32a46 100644 --- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala @@ -18,14 +18,17 @@ package kafka.server import scala.collection.mutable +import scala.collection.Set +import scala.collection.Map import kafka.utils.Logging import kafka.cluster.Broker import kafka.metrics.KafkaMetricsGroup +import kafka.common.TopicAndPartition import com.yammer.metrics.core.Gauge abstract class AbstractFetcherManager(protected val name: String, metricPrefix: String, numFetchers: Int = 1) extends Logging with KafkaMetricsGroup { - // map of (source brokerid, fetcher Id per source broker) => fetcher + // map of (source broker_id, fetcher_id per source broker) => fetcher private val fetcherThreadMap = new mutable.HashMap[BrokerAndFetcherId, AbstractFetcherThread] private val mapLock = new Object this.logIdent = "[" + name + "] " @@ -60,36 +63,53 @@ abstract class AbstractFetcherManager(protected val name: String, metricPrefix: ) private def getFetcherId(topic: String, partitionId: Int) : Int = { - (topic.hashCode() + 31 * partitionId) % numFetchers + (31 * topic.hashCode() + partitionId) % numFetchers } // to be defined in subclass to create a specific fetcher def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread - def addFetcher(topic: String, partitionId: Int, initialOffset: Long, sourceBroker: Broker) { - mapLock synchronized { + def addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, 
BrokerAndInitialOffset]) { + val partitionsPerFetcher = new mutable.HashMap[BrokerAndFetcherId, mutable.Set[(TopicAndPartition, Long)]] + for ((topicAndPartition, brokerAndInitialOffset) <- partitionAndOffsets) { var fetcherThread: AbstractFetcherThread = null - val key = new BrokerAndFetcherId(sourceBroker, getFetcherId(topic, partitionId)) - fetcherThreadMap.get(key) match { - case Some(f) => fetcherThread = f + val key = new BrokerAndFetcherId(brokerAndInitialOffset.broker, getFetcherId(topicAndPartition.topic, topicAndPartition.partition)) + + mapLock synchronized { + fetcherThreadMap.get(key) match { + case Some(f) => fetcherThread = f + case None => + fetcherThread = createFetcherThread(key.fetcherId,brokerAndInitialOffset.broker) + fetcherThreadMap.put(key, fetcherThread) + fetcherThread.start + } + } + + // prepare this partition to be added to the fetcher + partitionsPerFetcher.get(key) match { + case Some(tl) => tl += ((topicAndPartition, brokerAndInitialOffset.initOffset)) case None => - fetcherThread = createFetcherThread(key.fetcherId, sourceBroker) - fetcherThreadMap.put(key, fetcherThread) - fetcherThread.start + val brokerAndOffsets = new mutable.HashSet[(TopicAndPartition, Long)]() + brokerAndOffsets += ((topicAndPartition, brokerAndInitialOffset.initOffset)) + partitionsPerFetcher.put(key, brokerAndOffsets) } - fetcherThread.addPartition(topic, partitionId, initialOffset) - info("Adding fetcher for partition [%s,%d], initOffset %d to broker %d with fetcherId %d" - .format(topic, partitionId, initialOffset, sourceBroker.id, key.fetcherId)) } + + partitionsPerFetcher.foreach {case (brokerAndFetcherId, topicPartitionAndInitOffsets) => + fetcherThreadMap.get(brokerAndFetcherId).get.addPartitions(topicPartitionAndInitOffsets) + } + + info("Added fetcher for partitions %s".format(partitionAndOffsets.map{ case (topicAndPartition, brokerAndInitialOffset) => + "[" + topicAndPartition + ", initOffset " + brokerAndInitialOffset.initOffset + " to broker " + brokerAndInitialOffset.broker + "] "})) } - def removeFetcher(topic: String, partitionId: Int) { - info("Removing fetcher for partition [%s,%d]".format(topic, partitionId)) + def removeFetcherForPartitions(partitions: Set[TopicAndPartition]) { mapLock synchronized { for ((key, fetcher) <- fetcherThreadMap) { - fetcher.removePartition(topic, partitionId) + fetcher.removePartitions(partitions) } } + info("Removed fetcher for partitions %s".format(partitions.mkString(","))) } def shutdownIdleFetcherThreads() { @@ -115,4 +135,6 @@ abstract class AbstractFetcherManager(protected val name: String, metricPrefix: } } -case class BrokerAndFetcherId(broker: Broker, fetcherId: Int) \ No newline at end of file +case class BrokerAndFetcherId(broker: Broker, fetcherId: Int) + +case class BrokerAndInitialOffset(broker: Broker, initOffset: Long) \ No newline at end of file diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index c64260f..4bf9c80 100644 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -19,17 +19,18 @@ package kafka.server import kafka.cluster.Broker import collection.mutable +import scala.collection.Set import kafka.message.{InvalidMessageException, ByteBufferMessageSet, MessageAndOffset} import kafka.metrics.KafkaMetricsGroup import com.yammer.metrics.core.Gauge -import java.util.concurrent.atomic.AtomicLong import kafka.utils.{Pool, ShutdownableThread} 
-import java.util.concurrent.TimeUnit -import java.util.concurrent.locks.ReentrantLock import kafka.consumer.{PartitionTopicInfo, SimpleConsumer} import kafka.api.{FetchRequest, FetchResponse, FetchResponsePartitionData, FetchRequestBuilder} import kafka.common.{KafkaException, ClientIdAndBroker, TopicAndPartition, ErrorMapping} import kafka.utils.Utils.inLock +import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.ReentrantLock +import java.util.concurrent.atomic.AtomicLong /** @@ -166,25 +167,34 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke } } - def addPartition(topic: String, partitionId: Int, initialOffset: Long) { + def addPartitions(partitionAndOffsets: Set[(TopicAndPartition, Long)]) { partitionMapLock.lockInterruptibly() try { - val topicPartition = TopicAndPartition(topic, partitionId) - partitionMap.put( - topicPartition, - if (PartitionTopicInfo.isOffsetInvalid(initialOffset)) handleOffsetOutOfRange(topicPartition) else initialOffset) + for ((topicAndPartition, offset) <- partitionAndOffsets) { + // If the partitionMap already has the topic/partition, then do not update the map with the old offset + if (!partitionMap.contains(topicAndPartition)) + partitionMap.put( + topicAndPartition, + if (PartitionTopicInfo.isOffsetInvalid(offset)) handleOffsetOutOfRange(topicAndPartition) else offset) + } partitionMapCond.signalAll() } finally { partitionMapLock.unlock() } } - def removePartition(topic: String, partitionId: Int) { - partitionMapLock.lockInterruptibly() - try { - partitionMap.remove(TopicAndPartition(topic, partitionId)) - } finally { - partitionMapLock.unlock() + def removePartitions(topicAndPartitions: Set[TopicAndPartition]) { + // First check which topic partitions are assigned to this fetcher, + // This step does not require to be covered by partitionMapLock since + // both add/remove partitions calls are covered by the mapLock of the fetcher manager + val topicAndPartitionsForFetcher = topicAndPartitions.filter(tp => partitionMap.keySet.contains(tp)) + if (topicAndPartitionsForFetcher.size > 0) { + partitionMapLock.lockInterruptibly() + try { + topicAndPartitionsForFetcher.foreach(tp => partitionMap.remove(tp)) + } finally { + partitionMapLock.unlock() + } } } @@ -199,7 +209,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke } class FetcherLagMetrics(metricId: ClientIdBrokerTopicPartition) extends KafkaMetricsGroup { - private[this] var lagVal = new AtomicLong(-1L) + private[this] val lagVal = new AtomicLong(-1L) newGauge( metricId + "-ConsumerLag", new Gauge[Long] { diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index ee1cc0c..caeb105 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -29,7 +29,7 @@ import com.yammer.metrics.core.Gauge import java.util.concurrent.TimeUnit import kafka.common._ import kafka.api.{StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest} -import kafka.controller.KafkaController +import kafka.controller.{LeaderIsrAndControllerEpoch, KafkaController} import org.apache.log4j.Logger @@ -50,6 +50,7 @@ class ReplicaManager(val config: KafkaConfig, private val allPartitions = new Pool[(String, Int), Partition] private var leaderPartitions = new mutable.HashSet[Partition]() private val leaderPartitionsLock = new Object + private val replicaStateChangeLock = new Object val replicaFetcherManager = new 
ReplicaFetcherManager(config, this) private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false) val highWatermarkCheckpoints = config.logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap @@ -116,7 +117,6 @@ class ReplicaManager(val config: KafkaConfig, val errorCode = ErrorMapping.NoError getReplica(topic, partitionId) match { case Some(replica) => - replicaFetcherManager.removeFetcher(topic, partitionId) /* TODO: handle deleteLog in a better way */ //if (deletePartition) // logManager.deleteLog(topic, partition) @@ -132,20 +132,26 @@ class ReplicaManager(val config: KafkaConfig, } def stopReplicas(stopReplicaRequest: StopReplicaRequest): (mutable.Map[(String, Int), Short], Short) = { - val responseMap = new collection.mutable.HashMap[(String, Int), Short] - if(stopReplicaRequest.controllerEpoch < controllerEpoch) { - stateChangeLogger.warn("Broker %d received stop replica request from an old controller epoch %d." - .format(localBrokerId, stopReplicaRequest.controllerEpoch) + - " Latest known controller epoch is %d " + controllerEpoch) - (responseMap, ErrorMapping.StaleControllerEpochCode) - } else { - controllerEpoch = stopReplicaRequest.controllerEpoch - val responseMap = new HashMap[(String, Int), Short] - for((topic, partitionId) <- stopReplicaRequest.partitions){ - val errorCode = stopReplica(topic, partitionId, stopReplicaRequest.deletePartitions) - responseMap.put((topic, partitionId), errorCode) + replicaStateChangeLock synchronized { + val responseMap = new collection.mutable.HashMap[(String, Int), Short] + if(stopReplicaRequest.controllerEpoch < controllerEpoch) { + stateChangeLogger.warn("Broker %d received stop replica request from an old controller epoch %d." + .format(localBrokerId, stopReplicaRequest.controllerEpoch) + + " Latest known controller epoch is %d " + controllerEpoch) + (responseMap, ErrorMapping.StaleControllerEpochCode) + } else { + controllerEpoch = stopReplicaRequest.controllerEpoch + val responseMap = new HashMap[(String, Int), Short] + // First stop fetchers for all partitions, then mark them as stopped + replicaFetcherManager.removeFetcherForPartitions(stopReplicaRequest.partitions.map{ + case (topic, partition) => TopicAndPartition(topic, partition) + }) + for((topic, partitionId) <- stopReplicaRequest.partitions){ + val errorCode = stopReplica(topic, partitionId, stopReplicaRequest.deletePartitions) + responseMap.put((topic, partitionId), errorCode) + } + (responseMap, ErrorMapping.NoError) } - (responseMap, ErrorMapping.NoError) } } @@ -198,10 +204,10 @@ class ReplicaManager(val config: KafkaConfig, } def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): (collection.Map[(String, Int), Short], Short) = { - leaderAndISRRequest.partitionStateInfos.foreach(p => + leaderAndISRRequest.partitionStateInfos.foreach{ case ((topic, partition), stateInfo) => stateChangeLogger.trace("Broker %d handling LeaderAndIsr request correlation id %d received from controller %d epoch %d for partition [%s,%d]" .format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId, - leaderAndISRRequest.controllerEpoch, p._1._1, p._1._2))) + leaderAndISRRequest.controllerEpoch, topic, partition))} info("Handling LeaderAndIsr request %s".format(leaderAndISRRequest)) val responseMap = new collection.mutable.HashMap[(String, Int), Short] @@ -209,34 +215,20 @@ class ReplicaManager(val config: KafkaConfig, stateChangeLogger.warn("Broker %d received LeaderAndIsr request 
correlation id %d with an old controller epoch %d. Latest known controller epoch is %d" .format(localBrokerId, leaderAndISRRequest.controllerEpoch, leaderAndISRRequest.correlationId, controllerEpoch)) (responseMap, ErrorMapping.StaleControllerEpochCode) - }else { + } else { val controllerId = leaderAndISRRequest.controllerId controllerEpoch = leaderAndISRRequest.controllerEpoch - for((topicAndPartition, partitionStateInfo) <- leaderAndISRRequest.partitionStateInfos) { - var errorCode = ErrorMapping.NoError - val topic = topicAndPartition._1 - val partitionId = topicAndPartition._2 - - val requestedLeaderId = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader - try { - if(requestedLeaderId == config.brokerId) - makeLeader(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.correlationId) - else - makeFollower(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.leaders, - leaderAndISRRequest.correlationId) - } catch { - case e: Throwable => - val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d " + - "epoch %d for partition %s").format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId, - leaderAndISRRequest.controllerEpoch, topicAndPartition) - stateChangeLogger.error(errorMsg, e) - errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]) - } - responseMap.put(topicAndPartition, errorCode) - stateChangeLogger.trace("Broker %d handled LeaderAndIsr request correlationId %d received from controller %d epoch %d for partition [%s,%d]" - .format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch, - topicAndPartition._1, topicAndPartition._2)) - } + + val makeLeaderPartitionInfos = leaderAndISRRequest.partitionStateInfos + .filter(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader == config.brokerId) + val partitionsTobeLeader = makeLeaderPartitionInfos + .map{ case ((topic, partition), partitionStateInfo) => TopicAndPartition(topic, partition) -> partitionStateInfo} + val partitionsTobeFollower = (leaderAndISRRequest.partitionStateInfos -- makeLeaderPartitionInfos.keys) + .map{ case ((topic, partition), partitionStateInfo) => TopicAndPartition(topic, partition) -> partitionStateInfo} + + if (!partitionsTobeLeader.isEmpty) makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, leaderAndISRRequest.correlationId, responseMap) + if (!partitionsTobeFollower.isEmpty) makeFollowers(controllerId, controllerEpoch, partitionsTobeFollower, leaderAndISRRequest.leaders, leaderAndISRRequest.correlationId, responseMap) + info("Handled leader and isr request %s".format(leaderAndISRRequest)) // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions // have been completely populated before starting the checkpointing there by avoiding weird race conditions @@ -249,37 +241,149 @@ class ReplicaManager(val config: KafkaConfig, } } - private def makeLeader(controllerId: Int, epoch:Int, topic: String, partitionId: Int, - partitionStateInfo: PartitionStateInfo, correlationId: Int) = { - val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch + /* + * Make the current broker the leader for a given set of partitions by: + * + * 1. Stop fetchers for these partitions + * 2. Update the partition metadata in cache + * 3. 
Add these partitions to the leader partitions set + * + */ + private def makeLeaders(controllerId: Int, epoch: Int, partitionStates: Map[TopicAndPartition, PartitionStateInfo], + correlationId: Int, responseMap: mutable.Map[(String, Int), Short]) = { stateChangeLogger.trace(("Broker %d received LeaderAndIsr request correlationId %d from controller %d epoch %d " + - "starting the become-leader transition for partition [%s,%d]") - .format(localBrokerId, correlationId, controllerId, epoch, topic, partitionId)) - val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor) - if (partition.makeLeader(controllerId, topic, partitionId, leaderIsrAndControllerEpoch, correlationId)) { - // also add this partition to the list of partitions for which the leader is the current broker - leaderPartitionsLock synchronized { - leaderPartitions += partition - } + "starting the become-leader transition for partitions %s") + .format(localBrokerId, correlationId, controllerId, epoch, partitionStates.keySet.mkString(","))) + + replicaStateChangeLock synchronized { + for (tp <- partitionStates.keys) responseMap.put((tp.topic, tp.partition), ErrorMapping.NoError) + + // First check partition's leader epoch + val partitionAndLeaderISRs = new HashMap[Partition, LeaderIsrAndControllerEpoch]() + partitionStates.foreach{ case (topicAndPartition, partitionStateInfo) => { + val partition = getOrCreatePartition(topicAndPartition.topic, topicAndPartition.partition, partitionStateInfo.replicationFactor) + val partitionLeaderEpoch = partition.getLeaderEpoch() + if (partitionLeaderEpoch < partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch) { + // If the leader epoch is valid record the epoch of the controller that made the leadership decision. 
+ // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path + partitionAndLeaderISRs.put(partition, partitionStateInfo.leaderIsrAndControllerEpoch) + } else { + // Otherwise record the error code in response + stateChangeLogger.trace(("Broker %d discarded the become-leader request with correlation id %d from " + + "controller %d epoch %d for partition [%s,%d] since current leader epoch %d is >= the request's leader epoch %d") + .format(localBrokerId, correlationId, controllerId, partitionStateInfo.leaderIsrAndControllerEpoch.controllerEpoch, topicAndPartition.topic, + topicAndPartition.partition, partitionLeaderEpoch, partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch)) + responseMap.put((topicAndPartition.topic, topicAndPartition.partition), ErrorMapping.StaleLeaderEpochCode) + } + }} + + try { + // For the valid partitions, first stop fetchers for all the partitions + replicaFetcherManager.removeFetcherForPartitions(partitionAndLeaderISRs.keySet.map(p => TopicAndPartition(p.topic, p.partitionId))) + stateChangeLogger.trace("Broker %d stopped fetchers for partitions %s as per becoming-leader request from controller %d epoch %d" + .format(localBrokerId, partitionAndLeaderISRs.keySet.mkString(","), controllerId, epoch)) + + // Update the partition information to be the leader + partitionAndLeaderISRs.foreach{ case (partition, leaderIsrAndControllerEpoch) => partition.makeLeader(controllerId, leaderIsrAndControllerEpoch, correlationId)} + + // Finally add these partitions to the list of partitions for which the leader is the current broker + leaderPartitionsLock synchronized { + leaderPartitions ++= partitionAndLeaderISRs.keySet + } + } catch { + case e: Throwable => + val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d " + + "epoch %d").format(localBrokerId, correlationId, controllerId, epoch) + stateChangeLogger.error(errorMsg, e) + // TODO: Only use one error code per request since we are doing state transitions in batch now + } } - stateChangeLogger.trace("Broker %d completed become-leader transition for partition [%s,%d]".format(localBrokerId, topic, partitionId)) + + stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " + + "for the become-leader transition for partitions %s") + .format(localBrokerId, correlationId, controllerId, epoch, partitionStates.keySet.mkString(","))) } - private def makeFollower(controllerId: Int, epoch: Int, topic: String, partitionId: Int, - partitionStateInfo: PartitionStateInfo, leaders: Set[Broker], correlationId: Int) { - val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch + /* + * Make the current broker a follower for a given set of partitions by: + * + * 1. Stop fetchers for these partitions + * 2. Truncate the log and checkpoint offsets for these partitions. + * 3. If the broker is not shutting down, add the fetcher to the new leaders + * 4. Update the partition metadata in cache + * 5. 
Remove these partitions from the leader partitions set + * + * The ordering of these steps makes sure that the replicas in transition will not + take any more messages before checkpointing offsets so that all messages before the checkpoint + are guaranteed to be flushed to disk + */ + private def makeFollowers(controllerId: Int, epoch: Int, partitionStates: Map[TopicAndPartition, PartitionStateInfo], + leaders: Set[Broker], correlationId: Int, responseMap: mutable.Map[(String, Int), Short]) { stateChangeLogger.trace(("Broker %d received LeaderAndIsr request correlationId %d from controller %d epoch %d " + - "starting the become-follower transition for partition [%s,%d]") - .format(localBrokerId, correlationId, controllerId, epoch, topic, partitionId)) - - val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor) - if (partition.makeFollower(controllerId, topic, partitionId, leaderIsrAndControllerEpoch, leaders, correlationId)) { - // remove this replica's partition from the ISR expiration queue - leaderPartitionsLock synchronized { - leaderPartitions -= partition + "starting the become-follower transition for partitions %s") + .format(localBrokerId, correlationId, controllerId, epoch, partitionStates.keySet.mkString(","))) + + replicaStateChangeLock synchronized { + for (tp <- partitionStates.keys) responseMap.put((tp.topic, tp.partition), ErrorMapping.NoError) + + // First check partition's leader epoch + val partitionAndLeaderISRs = new HashMap[Partition, LeaderIsrAndControllerEpoch]() + partitionStates.foreach(tpi => { + val partition = getOrCreatePartition(tpi._1.topic, tpi._1.partition, tpi._2.replicationFactor) + val partitionLeaderEpoch = partition.getLeaderEpoch() + if (partitionLeaderEpoch < tpi._2.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch) { + // If the leader epoch is valid record the epoch of the controller that made the leadership decision. 
+ // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path + partitionAndLeaderISRs.put(partition, tpi._2.leaderIsrAndControllerEpoch) + } else { + // Otherwise record the error code in response + stateChangeLogger.trace(("Broker %d discarded the become-follower request with correlation id %d from " + + "controller %d epoch %d for partition [%s,%d] since current leader epoch %d is >= the request's leader epoch %d") + .format(localBrokerId, correlationId, controllerId, tpi._2.leaderIsrAndControllerEpoch.controllerEpoch, tpi._1.topic, + tpi._1.partition, partitionLeaderEpoch, tpi._2.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch)) + responseMap.put((tpi._1.topic, tpi._1.partition), ErrorMapping.StaleLeaderEpochCode) + } + }) + + try { + replicaFetcherManager.removeFetcherForPartitions(partitionAndLeaderISRs.keySet.map(p => TopicAndPartition(p.topic, p.partitionId))) + stateChangeLogger.trace("Broker %d stopped fetchers for partitions %s as per becoming-follower request from controller %d epoch %d" + .format(localBrokerId, partitionAndLeaderISRs.keySet.mkString(","), controllerId, epoch)) + + logManager.truncateTo(partitionAndLeaderISRs.map{ case(partition, leaderISRAndControllerEpoch) => + TopicAndPartition(partition.topic, partition.partitionId) -> partition.getOrCreateReplica().highWatermark + }) + stateChangeLogger.trace("Broker %d truncated logs and checkpoint recovery boundaries for partitions %s as per becoming-follower request from controller %d epoch %d" + .format(localBrokerId, partitionAndLeaderISRs.keySet.mkString(","), controllerId, epoch)) + + if (!isShuttingDown.get()) { + replicaFetcherManager.addFetcherForPartitions(partitionAndLeaderISRs.map{ case(partition, leaderISRAndControllerEpoch) => + TopicAndPartition(partition.topic, partition.partitionId) -> BrokerAndInitialOffset(leaders.find(_.id == leaderISRAndControllerEpoch.leaderAndIsr.leader).get, partition.getReplica().get.logEndOffset)} + ) + } + else { + stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d from " + + "controller %d epoch %d since it is shutting down") + .format(localBrokerId, correlationId, controllerId, epoch)) + } + + partitionAndLeaderISRs.foreach(f => f._1.makeFollower(controllerId, f._2, leaders, correlationId)) + + leaderPartitionsLock synchronized { + leaderPartitions --= partitionAndLeaderISRs.keySet + } + } catch { + case e: Throwable => + val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d " + + "epoch %d").format(localBrokerId, correlationId, controllerId, epoch) + stateChangeLogger.error(errorMsg, e) + // TODO: Only use one error code per request since we are doing state transitions in batch now + } } - stateChangeLogger.trace("Broker %d completed the become-follower transition for partition [%s,%d]".format(localBrokerId, topic, partitionId)) + + stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " + + "for the become-follower transition for partitions %s") + .format(localBrokerId, correlationId, controllerId, epoch, partitionStates.keySet.mkString(","))) } private def maybeShrinkIsr(): Unit = {
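
Illustrative note (not part of the patch): the new addFetcherForPartitions API batches the requested partitions per fetcher thread by grouping them on (source broker, fetcher id), so each AbstractFetcherThread receives a single addPartitions call instead of one call per partition. The standalone sketch below shows only that grouping step; all types are simplified stand-ins rather than the real Kafka classes, and it adds math.abs to keep the fetcher index non-negative, which the patch's getFetcherId does not do.

object FetcherBatchingSketch {
  // Simplified stand-ins for the classes referenced by the patch
  case class Broker(id: Int, host: String, port: Int)
  case class TopicAndPartition(topic: String, partition: Int)
  case class BrokerAndInitialOffset(broker: Broker, initOffset: Long)
  case class BrokerAndFetcherId(broker: Broker, fetcherId: Int)

  val numFetchers = 3

  // Hash a partition onto one of numFetchers threads (math.abs is added here for
  // illustration so the index stays non-negative)
  def getFetcherId(topic: String, partition: Int): Int =
    math.abs(31 * topic.hashCode + partition) % numFetchers

  // Group partitions by (source broker, fetcher id) so that each fetcher thread
  // can be handed its whole batch in one addPartitions-style call
  def groupByFetcher(partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset])
      : Map[BrokerAndFetcherId, Map[TopicAndPartition, Long]] =
    partitionAndOffsets
      .groupBy { case (tp, bio) => BrokerAndFetcherId(bio.broker, getFetcherId(tp.topic, tp.partition)) }
      .map { case (key, group) => key -> group.map { case (tp, bio) => tp -> bio.initOffset } }

  def main(args: Array[String]): Unit = {
    val broker = Broker(1, "localhost", 9092)
    val offsets = Map(
      TopicAndPartition("foo", 0) -> BrokerAndInitialOffset(broker, 42L),
      TopicAndPartition("foo", 1) -> BrokerAndInitialOffset(broker, 7L),
      TopicAndPartition("bar", 0) -> BrokerAndInitialOffset(broker, 0L))
    groupByFetcher(offsets).foreach { case (key, batch) =>
      println("fetcher %d on broker %d -> %s".format(key.fetcherId, key.broker.id, batch))
    }
  }
}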