From 51355c39c69cd5551c9bf6e5dc4df74485257622 Mon Sep 17 00:00:00 2001
From: Dmitry Bugaychenko
Date: Mon, 6 Apr 2015 20:25:39 +0300
Subject: [PATCH] Controlled shutdown patch

---
 .../controller/ControllerChannelManager.scala      | 73 ++++++++++++++---
 .../scala/kafka/controller/KafkaController.scala   | 93 ++++++++++++++--------
 .../kafka/controller/PartitionLeaderSelector.scala | 11 ++-
 .../main/scala/kafka/server/ReplicaManager.scala   |  8 +-
 4 files changed, 140 insertions(+), 45 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 97acdb2..586aba2 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -16,17 +16,19 @@
  */
 package kafka.controller
 
+import java.util
+import java.util.concurrent.locks.ReentrantLock
+
 import kafka.network.{Receive, BlockingChannel}
 import kafka.utils.{CoreUtils, Logging, ShutdownableThread}
 import collection.mutable.HashMap
 import kafka.cluster.Broker
-import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}
+import java.util.concurrent.{TimeUnit, Semaphore, LinkedBlockingQueue, BlockingQueue}
 import kafka.server.KafkaConfig
-import collection.mutable
+import scala.collection.{JavaConversions, mutable, Set}
 import kafka.api._
 import kafka.common.TopicAndPartition
 import kafka.api.RequestOrResponse
-import collection.Set
 
 class ControllerChannelManager (private val controllerContext: ControllerContext, config: KafkaConfig) extends Logging {
   private val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
@@ -47,12 +49,33 @@ class ControllerChannelManager (private val controllerContext: ControllerContext
     }
   }
 
+  // ODKL Patch: Execute a block of code while tracking completion of the broker requests it sends.
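+  // A sketch of the intended usage (the caller shown here is illustrative, not part of this patch):
+  //   channelManager.tracked() {
+  //     channelManager.sendRequest(brokerId, request)  // requests sent here are registered with the tracker
+  //   }  // returns only once every registered request has been delivered to its broker or dropped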
+  private val tracker = new ThreadLocal[RequestTracker]
+  def tracked[T]()(fun: => T): T = {
+    if (tracker.get() != null) {
+      throw new IllegalStateException("Tracker already initialized.")
+    }
+    val registeredTracker: RequestTracker = new RequestTracker()
+    tracker.set(registeredTracker)
+    try {
+      return fun
+    } finally {
+      tracker.remove()
+      registeredTracker.await()
+    }
+  }
+
   def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) {
     brokerLock synchronized {
       val stateInfoOpt = brokerStateInfo.get(brokerId)
       stateInfoOpt match {
         case Some(stateInfo) =>
-          stateInfo.messageQueue.put((request, callback))
+          // ODKL Patch: pass the tracker along so the request can be acknowledged on completion
+          val requestsTracker: RequestTracker = tracker.get()
+          stateInfo.messageQueue.put((request, callback, requestsTracker))
+          if(requestsTracker != null) {
+            requestsTracker.addRequest()
+          }
         case None =>
           warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
       }
@@ -76,7 +99,7 @@ class ControllerChannelManager (private val controllerContext: ControllerContext
   }
 
   private def addNewBroker(broker: Broker) {
-    val messageQueue = new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)](config.controllerMessageQueueSize)
+    val messageQueue = new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit, RequestTracker)](config.controllerMessageQueueSize)
     debug("Controller %d trying to connect to broker %d".format(config.brokerId,broker.id))
     val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerSecurityProtocol)
     val channel = new BlockingChannel(brokerEndPoint.host, brokerEndPoint.port,
@@ -90,10 +113,13 @@ class ControllerChannelManager (private val controllerContext: ControllerContext
   }
 
   private def removeExistingBroker(brokerId: Int) {
     try {
-      brokerStateInfo(brokerId).channel.disconnect()
-      brokerStateInfo(brokerId).messageQueue.clear()
-      brokerStateInfo(brokerId).requestSendThread.shutdown()
-      brokerStateInfo.remove(brokerId)
+      // ODKL Patch: When removing a broker, make sure to notify trackers that its queued requests will never complete.
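+      // Without draining the queue and releasing each tracker below, a tracked() caller could block
+      // in await() forever, waiting for requests that were dropped together with the queue.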
+      val info: ControllerBrokerStateInfo = brokerStateInfo.remove(brokerId).get
+      info.channel.disconnect()
+      info.requestSendThread.shutdown()
+      val remaining = new util.ArrayList[(RequestOrResponse, (RequestOrResponse) => Unit, RequestTracker)](info.messageQueue.remainingCapacity())
+      info.messageQueue.drainTo(remaining)
+      JavaConversions.collectionAsScalaIterable(remaining).foreach(x => if (x._3 != null) x._3.releaseRequest())
     }catch {
       case e: Throwable => error("Error while removing broker by the controller", e)
     }
   }
@@ -109,7 +135,7 @@ class ControllerChannelManager (private val controllerContext: ControllerContext
 }
 
 class RequestSendThread(val controllerId: Int,
                         val controllerContext: ControllerContext,
                         val toBroker: Broker,
-                        val queue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)],
+                        val queue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit, RequestTracker)],
                         val channel: BlockingChannel)
   extends ShutdownableThread("Controller-%d-to-broker-%d-send-thread".format(controllerId, toBroker.id)) {
   private val lock = new Object()
@@ -120,6 +146,7 @@ class RequestSendThread(val controllerId: Int,
       val queueItem = queue.take()
       val request = queueItem._1
       val callback = queueItem._2
+      val tracker = queueItem._3
       var receive: Receive = null
       try {
         lock synchronized {
@@ -166,6 +193,10 @@ class RequestSendThread(val controllerId: Int,
        error("Controller %d fails to send a request to broker %s".format(controllerId, toBroker.toString()), e)
        // If there is any socket error (eg, socket timeout), the channel is no longer usable and needs to be recreated.
        channel.disconnect()
+      } finally {
+        if (tracker != null) {
+          tracker.releaseRequest()
+        }
       }
     }
 
@@ -327,7 +358,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
 
 case class ControllerBrokerStateInfo(channel: BlockingChannel,
                                      broker: Broker,
-                                     messageQueue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)],
+                                     messageQueue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit, RequestTracker)],
                                      requestSendThread: RequestSendThread)
 
 case class StopReplicaRequestInfo(replica: PartitionAndReplica, deletePartition: Boolean, callback: (RequestOrResponse) => Unit = null)
@@ -362,3 +393,23 @@ object Callbacks {
     }
   }
 }
+
+// ODKL Patch: Used to track completion of requests sent to brokers.
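+// A counting semaphore is used: addRequest() takes a permit for each request put on a broker queue,
+// releaseRequest() returns it once the request has been sent (or dropped), and await() blocks until
+// all MAX_REQUESTS permits are back, i.e. until no tracked request is outstanding.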
+class RequestTracker() { + val MAX_REQUESTS = 100000 + val requests : Semaphore = new Semaphore(MAX_REQUESTS) + + def addRequest(): Unit = { + if(!requests.tryAcquire()) { + throw new IllegalStateException("Maximum amount of tracked requests " + MAX_REQUESTS + " exceeded") + } + } + + def releaseRequest(): Unit = { + requests.release() + } + + def await(): Unit = { + requests.acquire(MAX_REQUESTS) + } +} diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 3a09377..1e149c1 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -33,9 +33,10 @@ import kafka.utils.CoreUtils._ import org.apache.zookeeper.Watcher.Event.KeeperState import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient} import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException} -import java.util.concurrent.atomic.AtomicInteger -import java.util.concurrent.locks.ReentrantLock import kafka.server._ +import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} +import java.util.concurrent.locks.{Lock, ReentrantLock} +import scala.Some import kafka.common.TopicAndPartition class ControllerContext(val zkClient: ZkClient, @@ -150,7 +151,8 @@ object KafkaController extends Logging { class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerState: BrokerState) extends Logging with KafkaMetricsGroup { this.logIdent = "[Controller " + config.brokerId + "]: " - private var isRunning = true + // ODKL Patch: Volatile to prevent deadlocks in shutdown. + @volatile private var isRunning = true private val stateChangeLogger = KafkaController.stateChangeLogger val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs) val partitionStateMachine = new PartitionStateMachine(this) @@ -249,26 +251,30 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt allPartitionsAndReplicationFactorOnBroker.foreach { case(topicAndPartition, replicationFactor) => + // Move leadership serially to relinquish lock. - inLock(controllerContext.controllerLock) { - controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch => - if (replicationFactor > 1) { - if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) { - // If the broker leads the topic partition, transition the leader and update isr. Updates zk and - // notifies all affected brokers - partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition, - controlledShutdownPartitionLeaderSelector) - } else { - // Stop the replica first. The state change below initiates ZK changes which should take some time - // before which the stop replica request should be completed (in most cases) - brokerRequestBatch.newBatch() - brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic, - topicAndPartition.partition, deletePartition = false) - brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement) - - // If the broker is a follower, updates the isr in ZK and notifies the current leader - replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic, - topicAndPartition.partition, id)), OfflineReplica) + // ODKL Patch: Make sure leadership moved before switching to the next one. 
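+        // The controller lock is taken per partition inside tracked(), so it is already released while
+        // tracked() waits for the queued LeaderAndIsr/StopReplica requests to be delivered or dropped.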
+ controllerContext.controllerChannelManager.tracked() { + inLock(controllerContext.controllerLock) { + controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch => + if (replicationFactor > 1) { + if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) { + // If the broker leads the topic partition, transition the leader and update isr. Updates zk and + // notifies all affected brokers + partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition, + controlledShutdownPartitionLeaderSelector) + } else { + // Stop the replica first. The state change below initiates ZK changes which should take some time + // before which the stop replica request should be completed (in most cases) + brokerRequestBatch.newBatch() + brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic, + topicAndPartition.partition, deletePartition = false) + brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement) + + // If the broker is a follower, updates the isr in ZK and notifies the current leader + replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic, + topicAndPartition.partition, id)), OfflineReplica) + } } } } @@ -375,7 +381,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt * Returns true if this broker is the current controller. */ def isActive(): Boolean = { - inLock(controllerContext.controllerLock) { + isRunning && inLockIfRunning(controllerContext.controllerLock) { controllerContext.controllerChannelManager != null } } @@ -1119,7 +1125,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt trace("checking need to trigger partition rebalance") // get all the active brokers var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] = null - inLock(controllerContext.controllerLock) { + // ODKL Patch: Prevent deadlocks with auto leader rebalance. 
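+    // inLockIfRunning (defined at the end of this class) fails fast instead of blocking forever: it refuses
+    // to take the lock once the controller has stopped running and gives up after a 10 second timeout.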
+ inLockIfRunning(controllerContext.controllerLock) { preferredReplicasForTopicsByBrokers = controllerContext.partitionReplicaAssignment.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p._1.topic)).groupBy { case(topicAndPartition, assignedReplicas) => assignedReplicas.head @@ -1131,7 +1138,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt case(leaderBroker, topicAndPartitionsForBroker) => { var imbalanceRatio: Double = 0 var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null - inLock(controllerContext.controllerLock) { + inLockIfRunning(controllerContext.controllerLock) { topicsNotInPreferredReplica = topicAndPartitionsForBroker.filter { case(topicPartition, replicas) => { @@ -1148,26 +1155,50 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt // check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions // that need to be on this broker if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) { + info("Balancing broker " + leaderBroker + " with imbalance rate " + imbalanceRatio) topicsNotInPreferredReplica.foreach { - case(topicPartition, replicas) => { - inLock(controllerContext.controllerLock) { - // do this check only if the broker is live and there are no partitions being reassigned currently - // and preferred replica election is not in progress - if (controllerContext.liveBrokerIds.contains(leaderBroker) && + case (topicPartition, replicas) => { + // ODKL Patch: Make sure leadership moved before switching to the next one. + controllerContext.controllerChannelManager.tracked() { + inLockIfRunning(controllerContext.controllerLock) { + // do this check only if the broker is live and there are no partitions being reassigned currently + // and preferred replica election is not in progress + if (controllerContext.liveBrokerIds.contains(leaderBroker) && controllerContext.partitionsBeingReassigned.size == 0 && controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0 && !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) && controllerContext.allTopics.contains(topicPartition.topic)) { - onPreferredReplicaElection(Set(topicPartition), true) + onPreferredReplicaElection(Set(topicPartition), true) + } } } } } + info("Balancing broker " + leaderBroker + " done") } } } } } + + // ODKL Patch to prevent deadlocks in shutdown. 
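+  // Used by isActive() and the automatic rebalance check above instead of inLock, so that a controller
+  // which is shutting down cannot leave those callers blocked on the controller lock.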
+  /**
+   * Execute the given function inside the lock
+   */
+  def inLockIfRunning[T](lock: ReentrantLock)(fun: => T): T = {
+    if (isRunning || lock.isHeldByCurrentThread) {
+      if (!lock.tryLock(10, TimeUnit.SECONDS)) {
+        throw new IllegalStateException("Failed to acquire controller lock in 10 seconds.");
+      }
+      try {
+        return fun
+      } finally {
+        lock.unlock()
+      }
+    } else {
+      throw new IllegalStateException("Controller is not running, not allowed to lock.")
+    }
+  }
 }
 
 /**
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index 3b15ab4..78047a0 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -84,7 +84,11 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi
           }
         case false =>
           val liveReplicasInIsr = liveAssignedReplicas.filter(r => liveBrokersInIsr.contains(r))
-          val newLeader = liveReplicasInIsr.head
+          // ODKL Patch: Try to select the replica from the ISR following the assignment order rather than the
+          // ISR join order. The preferred replica comes first, so it is chosen whenever possible, but it is most
+          // probably the dead one, so we fall back to the second preferred replica. We do not worry about
+          // overloading the second preferred replica here, since we do not expect rolling crashes.
+          val newLeader = liveReplicasInIsr.sortBy(x => assignedReplicas.indexOf(x)).head
           debug("Some broker in ISR is alive for %s. Select %d from ISR %s to be the leader."
             .format(topicAndPartition, newLeader, liveBrokersInIsr.mkString(",")))
           new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)
@@ -185,7 +189,10 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
     val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r))
 
     val newIsr = currentLeaderAndIsr.isr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId))
-    val newLeaderOpt = newIsr.headOption
+    // ODKL Patch: Try to select the replica from the ISR without depending on the ISR join order. If the
+    // preferred replica is in the new ISR, choose it; otherwise choose the last replica from the ISR - it is
+    // expected to be the youngest (and has most probably already survived the rolling update).
+    val newLeaderOpt = if (newIsr.contains(assignedReplicas.head)) assignedReplicas.headOption else newIsr.lastOption
     newLeaderOpt match {
       case Some(newLeader) =>
         debug("Partition %s : current leader = %d, new leader = %d"
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 144a15e..b3bd07c 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -808,8 +808,14 @@ class ReplicaManager(val config: KafkaConfig,
     val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect{case Some(replica) => replica}
     val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParentFile.getAbsolutePath)
     for((dir, reps) <- replicasByDir) {
-      val hwms = reps.map(r => (new TopicAndPartition(r) -> r.highWatermark.messageOffset)).toMap
+      var hwms = reps.map(r => (new TopicAndPartition(r) -> r.highWatermark.messageOffset)).toMap
       try {
+        // ODKL Patch: If some partitions are not yet initialized, restore them from the old checkpoints.
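+        // Entries present in the on-disk checkpoint file but missing from the in-memory map are merged
+        // back into hwms before the file is rewritten.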
+        // This prevents the broker from erasing checkpoint entries when a race occurs during broker initialization.
+        // TODO: Make this configurable.
+        if (java.lang.management.ManagementFactory.getRuntimeMXBean.getUptime < TimeUnit.HOURS.toMillis(1)) {
+          highWatermarkCheckpoints(dir).read().foreach(entry => if (!hwms.contains(entry._1)) hwms += entry)
+        }
         highWatermarkCheckpoints(dir).write(hwms)
       } catch {
         case e: IOException =>
-- 
1.9.3 (Apple Git-50)