diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 7c03a24..215f2ed 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -17,6 +17,7 @@ package kafka.controller import java.util +import java.util.concurrent.atomic.AtomicBoolean import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.common.requests.{AbstractRequest, AbstractRequestResponse} @@ -156,6 +157,7 @@ object KafkaController extends Logging { class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerState: BrokerState, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup { this.logIdent = "[Controller " + config.brokerId + "]: " private var isRunning = true + val isRebalanceInProgress = new AtomicBoolean(false) private val stateChangeLogger = KafkaController.stateChangeLogger val controllerContext = new ControllerContext(zkUtils, config.zkSessionTimeoutMs) val partitionStateMachine = new PartitionStateMachine(this) @@ -1167,6 +1169,11 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat @throws(classOf[Exception]) def handleNewSession() { info("ZK expired; shut down all controller components and try to re-elect") + while (!isRebalanceInProgress.compareAndSet(false, true)) { + //wait for any in-progress auto-rebalance to finish before taking controllerContext.controllerLock, to avoid deadlocking with the auto-rebalance scheduler + Thread.sleep(100) + } + inLock(controllerContext.controllerLock) { onControllerResignation() controllerElector.elect @@ -1179,7 +1186,8 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat } private def checkAndTriggerPartitionRebalance(): Unit = { - if (isActive()) { + val canRebalance = isRebalanceInProgress.compareAndSet(false, true) + if (canRebalance && isActive()) { trace("checking need to trigger partition rebalance") // get all 
the active brokers var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] = null @@ -1230,6 +1238,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat } } } + isRebalanceInProgress.set(false) } } }