diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index f79c1dc..e297240 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -16,19 +16,21 @@
  */
 package kafka.controller
 
+import java.util
+import java.util.concurrent.locks.ReentrantLock
+
 import kafka.network.{Receive, BlockingChannel}
 import kafka.utils.{Utils, Logging, ShutdownableThread}
 import collection.mutable.HashMap
 import kafka.cluster.Broker
-import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}
+import java.util.concurrent.{TimeUnit, Semaphore, LinkedBlockingQueue, BlockingQueue}
 import kafka.server.KafkaConfig
-import collection.mutable
+import scala.collection.{JavaConversions, mutable, Set}
 import kafka.api._
 import org.apache.log4j.Logger
 import scala.Some
 import kafka.common.TopicAndPartition
 import kafka.api.RequestOrResponse
-import collection.Set
 
 class ControllerChannelManager (private val controllerContext: ControllerContext, config: KafkaConfig) extends Logging {
   private val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
@@ -49,12 +51,33 @@ class ControllerChannelManager (private val controllerContext: ControllerContext
     }
   }
 
+  // ODKL Patch: Execute a block of code while tracking the completion of the broker requests it sends.
+  private val tracker = new ThreadLocal[RequestTracker]
+  def tracked[T]()(fun: => T): T = {
+    if (tracker.get() != null) {
+      throw new IllegalStateException("Tracker already initialized.")
+    }
+    val registeredTracker: RequestTracker = new RequestTracker()
+    tracker.set(registeredTracker)
+    try {
+      return fun
+    } finally {
+      tracker.remove()
+      registeredTracker.await()
+    }
+  }
+
   def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) {
     brokerLock synchronized {
       val stateInfoOpt = brokerStateInfo.get(brokerId)
       stateInfoOpt match {
         case Some(stateInfo) =>
-          stateInfo.messageQueue.put((request, callback))
+          // ODKL Patch: pass the tracker along so the request can be acknowledged on completion
+          val requestsTracker: RequestTracker = tracker.get()
+          stateInfo.messageQueue.put((request, callback, requestsTracker))
+          if(requestsTracker != null) {
+            requestsTracker.addRequest()
+          }
         case None =>
           warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
       }
     }
@@ -78,7 +101,7 @@ class ControllerChannelManager (private val controllerContext: ControllerContext
     }
   }
 
   private def addNewBroker(broker: Broker) {
-    val messageQueue = new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)](config.controllerMessageQueueSize)
+    val messageQueue = new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit, RequestTracker)](config.controllerMessageQueueSize)
     debug("Controller %d trying to connect to broker %d".format(config.brokerId,broker.id))
     val channel = new BlockingChannel(broker.host, broker.port,
       BlockingChannel.UseDefaultBufferSize,
@@ -91,10 +114,13 @@ class ControllerChannelManager (private val controllerContext: ControllerContext
 
   private def removeExistingBroker(brokerId: Int) {
     try {
-      brokerStateInfo(brokerId).channel.disconnect()
-      brokerStateInfo(brokerId).messageQueue.clear()
-      brokerStateInfo(brokerId).requestSendThread.shutdown()
-      brokerStateInfo.remove(brokerId)
+      // ODKL Patch: When removing a broker, make sure to notify trackers that its pending requests will never complete.
+      val info: ControllerBrokerStateInfo = brokerStateInfo.remove(brokerId).get
+      info.channel.disconnect()
+      info.requestSendThread.shutdown()
+      val remaining = new util.ArrayList[(RequestOrResponse, (RequestOrResponse) => Unit, RequestTracker)](info.messageQueue.remainingCapacity())
+      info.messageQueue.drainTo(remaining)
+      JavaConversions.collectionAsScalaIterable(remaining).foreach(x => if (x._3 != null) x._3.releaseRequest())
     }catch {
       case e: Throwable => error("Error while removing broker by the controller", e)
     }
@@ -110,7 +136,7 @@ class ControllerChannelManager (private val controllerContext: ControllerContext
 class RequestSendThread(val controllerId: Int,
                         val controllerContext: ControllerContext,
                         val toBroker: Broker,
-                        val queue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)],
+                        val queue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit, RequestTracker)],
                         val channel: BlockingChannel)
   extends ShutdownableThread("Controller-%d-to-broker-%d-send-thread".format(controllerId, toBroker.id)) {
   private val lock = new Object()
@@ -121,6 +147,7 @@ class RequestSendThread(val controllerId: Int,
     val queueItem = queue.take()
     val request = queueItem._1
     val callback = queueItem._2
+    val tracker = queueItem._3
     var receive: Receive = null
     try {
       lock synchronized {
@@ -165,6 +192,10 @@ class RequestSendThread(val controllerId: Int,
         warn("Controller %d fails to send a request to broker %s".format(controllerId, toBroker.toString()), e)
         // If there is any socket error (eg, socket timeout), the channel is no longer usable and needs to be recreated.
         channel.disconnect()
+    } finally {
+      if (tracker != null) {
+        tracker.releaseRequest()
+      }
     }
   }
 
@@ -316,7 +347,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
 
 case class ControllerBrokerStateInfo(channel: BlockingChannel,
                                      broker: Broker,
-                                     messageQueue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)],
+                                     messageQueue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit, RequestTracker)],
                                      requestSendThread: RequestSendThread)
 
 case class StopReplicaRequestInfo(replica: PartitionAndReplica, deletePartition: Boolean, callback: (RequestOrResponse) => Unit = null)
@@ -351,3 +382,23 @@ object Callbacks {
     }
   }
 }
+
+// ODKL Patch: Used to track the completion of requests sent from the controller to brokers.
+class RequestTracker() {
+  val MAX_REQUESTS = 100000
+  val requests : Semaphore = new Semaphore(MAX_REQUESTS)
+
+  def addRequest(): Unit = {
+    if(!requests.tryAcquire()) {
+      throw new IllegalStateException("Maximum amount of tracked requests " + MAX_REQUESTS + " exceeded")
+    }
+  }
+
+  def releaseRequest(): Unit = {
+    requests.release()
+  }
+
+  def await(): Unit = {
+    requests.acquire(MAX_REQUESTS)
+  }
+}
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 21fb715..d69e93b 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -33,8 +33,8 @@ import kafka.utils.Utils._
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
-import java.util.concurrent.atomic.AtomicInteger
-import java.util.concurrent.locks.ReentrantLock
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
+import java.util.concurrent.locks.{Lock, ReentrantLock}
 import scala.Some
 import kafka.common.TopicAndPartition
@@ -154,7 +154,8 @@ object KafkaController extends Logging {
 
 class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup with KafkaControllerMBean {
   this.logIdent = "[Controller " + config.brokerId + "]: "
-  private var isRunning = true
+  // ODKL Patch: Volatile to prevent deadlocks in shutdown.
+  @volatile private var isRunning = true
   private val stateChangeLogger = KafkaController.stateChangeLogger
   val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs)
   val partitionStateMachine = new PartitionStateMachine(this)
@@ -248,29 +249,32 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
 
     allPartitionsAndReplicationFactorOnBroker.foreach {
       case(topicAndPartition, replicationFactor) =>
-        // Move leadership serially to relinquish lock.
-        inLock(controllerContext.controllerLock) {
-          controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch =>
-            if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) {
-              // If the broker leads the topic partition, transition the leader and update isr. Updates zk and
-              // notifies all affected brokers
-              partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition,
-                controlledShutdownPartitionLeaderSelector)
-            }
-            else {
-              // Stop the replica first. The state change below initiates ZK changes which should take some time
-              // before which the stop replica request should be completed (in most cases)
-              brokerRequestBatch.newBatch()
-              brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic,
-                topicAndPartition.partition, deletePartition = false)
-              brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement)
-
-              // If the broker is a follower, updates the isr in ZK and notifies the current leader
-              replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic,
-                topicAndPartition.partition, id)), OfflineReplica)
+        // ODKL Patch: Make sure the leadership move completes before switching to the next partition.
+        controllerContext.controllerChannelManager.tracked() {
+          // Move leadership serially to relinquish lock.
+          inLock(controllerContext.controllerLock) {
+            controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch =>
+              if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) {
+                // If the broker leads the topic partition, transition the leader and update isr. Updates zk and
+                // notifies all affected brokers
+                partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition,
+                  controlledShutdownPartitionLeaderSelector)
+              }
+              else {
+                // Stop the replica first. The state change below initiates ZK changes which should take some time
+                // before which the stop replica request should be completed (in most cases)
+                brokerRequestBatch.newBatch()
+                brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic,
+                  topicAndPartition.partition, deletePartition = false)
+                brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement)
+
+                // If the broker is a follower, updates the isr in ZK and notifies the current leader
+                replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic,
+                  topicAndPartition.partition, id)), OfflineReplica)
+              }
+            }
           }
         }
-      }
     }
 
   def replicatedPartitionsBrokerLeads() = inLock(controllerContext.controllerLock) {
@@ -359,7 +363,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
    * Returns true if this broker is the current controller.
    */
   def isActive(): Boolean = {
-    inLock(controllerContext.controllerLock) {
+    isRunning && inLockIfRunning(controllerContext.controllerLock) {
       controllerContext.controllerChannelManager != null
     }
   }
@@ -1075,7 +1079,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     trace("checking need to trigger partition rebalance")
     // get all the active brokers
     var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] = null
-    inLock(controllerContext.controllerLock) {
+    // ODKL Patch: Prevent deadlocks with auto leader rebalance.
+    inLockIfRunning(controllerContext.controllerLock) {
       preferredReplicasForTopicsByBrokers =
         controllerContext.partitionReplicaAssignment.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p._1.topic)).groupBy {
           case(topicAndPartition, assignedReplicas) => assignedReplicas.head
@@ -1087,7 +1092,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       case(leaderBroker, topicAndPartitionsForBroker) => {
         var imbalanceRatio: Double = 0
         var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null
-        inLock(controllerContext.controllerLock) {
+        inLockIfRunning(controllerContext.controllerLock) {
           topicsNotInPreferredReplica =
             topicAndPartitionsForBroker.filter {
               case(topicPartition, replicas) => {
@@ -1104,27 +1109,51 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
         // check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions
         // that need to be on this broker
         if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
+          info("Balancing broker " + leaderBroker + " with imbalance rate " + imbalanceRatio)
           topicsNotInPreferredReplica.foreach {
-            case(topicPartition, replicas) => {
-              inLock(controllerContext.controllerLock) {
-                // do this check only if the broker is live and there are no partitions being reassigned currently
-                // and preferred replica election is not in progress
-                if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
+            case (topicPartition, replicas) => {
+              // ODKL Patch: Make sure the leadership move completes before switching to the next partition.
+              controllerContext.controllerChannelManager.tracked() {
+                inLockIfRunning(controllerContext.controllerLock) {
+                  // do this check only if the broker is live and there are no partitions being reassigned currently
+                  // and preferred replica election is not in progress
+                  if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
                     controllerContext.partitionsBeingReassigned.size == 0 &&
                     controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0 &&
                     !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
                     !deleteTopicManager.isTopicDeletionInProgress(topicPartition.topic) &&
                     controllerContext.allTopics.contains(topicPartition.topic)) {
-                  onPreferredReplicaElection(Set(topicPartition), true)
+                    onPreferredReplicaElection(Set(topicPartition), true)
+                  }
                 }
               }
             }
           }
+          info("Balancing broker " + leaderBroker + " done")
         }
       }
     }
   }
 }
+
+  // ODKL Patch to prevent deadlocks in shutdown.
+  /**
+   * Execute the given function inside the lock, failing fast if the controller is no longer running.
+   */
+  def inLockIfRunning[T](lock: ReentrantLock)(fun: => T): T = {
+    if (isRunning || lock.isHeldByCurrentThread) {
+      if (!lock.tryLock(10, TimeUnit.SECONDS)) {
+        throw new IllegalStateException("Failed to acquire controller lock in 10 seconds.")
+      }
+      try {
+        return fun
+      } finally {
+        lock.unlock()
+      }
+    } else {
+      throw new IllegalStateException("Controller is not running, not allowed to lock.")
+    }
+  }
 }
 
 /**
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index fa29bbe..1a5fdf8 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -69,7 +69,11 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten
                 new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1)
             }
           case false =>
-            val newLeader = liveBrokersInIsr.head
+            // ODKL Patch: Try to select the replica from the ISR following the assignment order rather than the ISR
+            // join order. The preferred replica comes first, so it is chosen whenever possible; however, it is most
+            // probably the dead one, in which case we fall back to the second preferred replica. We do not worry
+            // about overloading the second preferred replica here, since we do not expect rolling crashes.
+            val newLeader = liveBrokersInIsr.sortBy(x => assignedReplicas.indexOf(x)).head
             debug("Some broker in ISR is alive for %s. Select %d from ISR %s to be the leader."
               .format(topicAndPartition, newLeader, liveBrokersInIsr.mkString(",")))
             new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)
@@ -170,7 +174,10 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
     val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r))
 
     val newIsr = currentLeaderAndIsr.isr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId))
-    val newLeaderOpt = newIsr.headOption
+    // ODKL Patch: Try to select the new leader from the ISR without depending on the ISR join order.
+    // If the preferred replica is in the ISR, choose it; otherwise choose the last replica from the ISR - it is
+    // expected to be the youngest one (most probably it has already survived the rolling update).
+    val newLeaderOpt = if (newIsr.contains(assignedReplicas.head)) assignedReplicas.headOption else newIsr.lastOption
     newLeaderOpt match {
       case Some(newLeader) =>
         debug("Partition %s : current leader = %d, new leader = %d"
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index ad4ffe0..68ed842 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -16,21 +16,22 @@
  */
 package kafka.server
 
-import kafka.cluster.{Broker, Partition, Replica}
-import collection._
-import mutable.HashMap
-import org.I0Itec.zkclient.ZkClient
 import java.io.{File, IOException}
+import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicBoolean
-import kafka.utils._
-import kafka.log.LogManager
-import kafka.metrics.KafkaMetricsGroup
+
 import com.yammer.metrics.core.Gauge
-import java.util.concurrent.TimeUnit
+import kafka.api.{LeaderAndIsrRequest, PartitionStateInfo, StopReplicaRequest}
+import kafka.cluster.{Broker, Partition, Replica}
 import kafka.common._
-import kafka.api.{StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest}
 import kafka.controller.KafkaController
-import org.apache.log4j.Logger
+import kafka.log.LogManager
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils._
+import org.I0Itec.zkclient.ZkClient
+
+import scala.collection._
+import scala.collection.mutable.HashMap
 
 
 object ReplicaManager {
@@ -442,8 +443,14 @@ class ReplicaManager(val config: KafkaConfig,
     val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect{case Some(replica) => replica}
     val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParentFile.getAbsolutePath)
     for((dir, reps) <- replicasByDir) {
-      val hwms = reps.map(r => (new TopicAndPartition(r) -> r.highWatermark)).toMap
+      var hwms = reps.map(r => (new TopicAndPartition(r) -> r.highWatermark)).toMap
       try {
+        // ODKL Patch: If some partitions are not yet initialized, restore their entries from the old checkpoint.
+        // This prevents the broker from erasing checkpoints in case of a race during broker initialization.
+        // TODO: Make this configurable.
+        if (java.lang.management.ManagementFactory.getRuntimeMXBean.getUptime < TimeUnit.HOURS.toMillis(1)) {
+          highWatermarkCheckpoints(dir).read().foreach(entry => if (!hwms.contains(entry._1)) hwms += entry)
+        }
        highWatermarkCheckpoints(dir).write(hwms)
       } catch {
         case e: IOException =>
diff --git a/kafka-patch-review.py b/kafka-patch-review.py
old mode 100644
new mode 100755
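
For reference, a minimal standalone sketch of the request-tracking idea behind the ControllerChannelManager changes above: tracked() installs a per-thread RequestTracker, every sendRequest() issued inside the block takes a semaphore permit, and the send path releases that permit once the request completes, so tracked() only returns after all of its requests have been acknowledged. The names mirror the patch, but the per-broker queue and send-thread wiring is simplified here and sendRequest() is a hypothetical stand-in rather than the real controller code.

import java.util.concurrent.Semaphore

object RequestTrackerSketch {

  // Mirrors the patch's RequestTracker: one semaphore permit per in-flight request.
  class RequestTracker {
    private val MaxRequests = 100000
    private val requests = new Semaphore(MaxRequests)

    def addRequest(): Unit = {
      if (!requests.tryAcquire())
        throw new IllegalStateException("Maximum amount of tracked requests exceeded")
    }

    def releaseRequest(): Unit = requests.release()

    // Blocks until every added request has been released.
    def await(): Unit = requests.acquire(MaxRequests)
  }

  private val tracker = new ThreadLocal[RequestTracker]

  // Runs `fun` and then blocks until every request sent from inside it has completed.
  def tracked[T]()(fun: => T): T = {
    if (tracker.get() != null) throw new IllegalStateException("Tracker already initialized.")
    val registered = new RequestTracker
    tracker.set(registered)
    try {
      fun
    } finally {
      tracker.remove()
      registered.await()
    }
  }

  // Hypothetical stand-in for ControllerChannelManager.sendRequest: the real code enqueues the
  // request for a per-broker send thread; a plain thread simulates the asynchronous send here.
  def sendRequest(request: String): Unit = {
    val t = tracker.get()
    if (t != null) t.addRequest()
    new Thread(new Runnable {
      override def run(): Unit = {
        Thread.sleep(100) // pretend this is the broker round trip
        if (t != null) t.releaseRequest()
      }
    }).start()
  }

  def main(args: Array[String]): Unit = {
    tracked() {
      sendRequest("StopReplica")
      sendRequest("LeaderAndIsr")
    }
    // tracked() has returned, so both simulated requests were acknowledged.
    println("all tracked requests completed")
  }
}

In this sketch, tracked() { ... } returns only once both simulated sends have called releaseRequest(), which is the property the controlled-shutdown and rebalance loops in the patch rely on to move partition leadership one partition at a time without holding the controller lock while waiting.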