diff --git core/src/main/scala/kafka/controller/ControllerChannelManager.scala core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 9f521fa..0f9eebe 100755 --- core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -30,7 +30,7 @@ import kafka.api.RequestOrResponse import collection.Set class ControllerChannelManager (private val controllerContext: ControllerContext, config: KafkaConfig) extends Logging { - private val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo] + protected val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo] private val brokerLock = new Object this.logIdent = "[Channel manager on controller " + config.brokerId + "]: " @@ -100,7 +100,7 @@ class ControllerChannelManager (private val controllerContext: ControllerContext } } - private def startRequestSendThread(brokerId: Int) { + protected def startRequestSendThread(brokerId: Int) { val requestThread = brokerStateInfo(brokerId).requestSendThread if(requestThread.getState == Thread.State.NEW) requestThread.start() @@ -280,49 +280,67 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging } def sendRequestsToBrokers(controllerEpoch: Int, correlationId: Int) { - leaderAndIsrRequestMap.foreach { m => - val broker = m._1 - val partitionStateInfos = m._2.toMap - val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet - val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map(b => b.getBrokerEndPoint(controller.config.interBrokerSecurityProtocol)) - val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId, controllerEpoch, correlationId, clientId) - for (p <- partitionStateInfos) { - val typeOfRequest = if (broker == p._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower" - stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s with correlationId %d to broker %d " + - "for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest, - p._2.leaderIsrAndControllerEpoch, correlationId, broker, - p._1._1, p._1._2)) + try { + leaderAndIsrRequestMap.foreach { m => + val broker = m._1 + val partitionStateInfos = m._2.toMap + val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet + val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map(b => b.getBrokerEndPoint(controller.config.interBrokerSecurityProtocol)) + val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId, controllerEpoch, correlationId, clientId) + for (p <- partitionStateInfos) { + val typeOfRequest = if (broker == p._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower" + stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s with correlationId %d to broker %d " + + "for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest, + p._2.leaderIsrAndControllerEpoch, correlationId, broker, + p._1._1, p._1._2)) + } + controller.sendRequest(broker, leaderAndIsrRequest, null) } - controller.sendRequest(broker, leaderAndIsrRequest, null) - } - leaderAndIsrRequestMap.clear() - updateMetadataRequestMap.foreach { m => - val broker = m._1 - val partitionStateInfos = m._2.toMap - - val versionId = if (controller.config.interBrokerProtocolVersion.onOrAfter(KAFKA_083)) 1 else 0 - val updateMetadataRequest = new UpdateMetadataRequest(versionId = versionId.toShort, controllerId = controllerId, controllerEpoch = controllerEpoch, - correlationId = correlationId, clientId = clientId, partitionStateInfos = partitionStateInfos, aliveBrokers = controllerContext.liveOrShuttingDownBrokers) - partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s with " + - "correlationId %d to broker %d for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch, - correlationId, broker, p._1))) - controller.sendRequest(broker, updateMetadataRequest, null) - } - updateMetadataRequestMap.clear() - stopReplicaRequestMap foreach { case(broker, replicaInfoList) => - val stopReplicaWithDelete = replicaInfoList.filter(_.deletePartition).map(_.replica).toSet - val stopReplicaWithoutDelete = replicaInfoList.filterNot(_.deletePartition).map(_.replica).toSet - debug("The stop replica request (delete = true) sent to broker %d is %s" - .format(broker, stopReplicaWithDelete.mkString(","))) - debug("The stop replica request (delete = false) sent to broker %d is %s" - .format(broker, stopReplicaWithoutDelete.mkString(","))) - replicaInfoList.foreach { r => - val stopReplicaRequest = new StopReplicaRequest(r.deletePartition, - Set(TopicAndPartition(r.replica.topic, r.replica.partition)), controllerId, controllerEpoch, correlationId) - controller.sendRequest(broker, stopReplicaRequest, r.callback) + leaderAndIsrRequestMap.clear() + updateMetadataRequestMap.foreach { m => + val broker = m._1 + val partitionStateInfos = m._2.toMap + + val versionId = if (controller.config.interBrokerProtocolVersion.onOrAfter(KAFKA_083)) 1 else 0 + val updateMetadataRequest = new UpdateMetadataRequest(versionId = versionId.toShort, controllerId = controllerId, controllerEpoch = controllerEpoch, + correlationId = correlationId, clientId = clientId, partitionStateInfos = partitionStateInfos, aliveBrokers = controllerContext.liveOrShuttingDownBrokers) + partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s with " + + "correlationId %d to broker %d for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch, + correlationId, broker, p._1))) + controller.sendRequest(broker, updateMetadataRequest, null) + } + updateMetadataRequestMap.clear() + stopReplicaRequestMap foreach { case(broker, replicaInfoList) => + val stopReplicaWithDelete = replicaInfoList.filter(_.deletePartition).map(_.replica).toSet + val stopReplicaWithoutDelete = replicaInfoList.filterNot(_.deletePartition).map(_.replica).toSet + debug("The stop replica request (delete = true) sent to broker %d is %s" + .format(broker, stopReplicaWithDelete.mkString(","))) + debug("The stop replica request (delete = false) sent to broker %d is %s" + .format(broker, stopReplicaWithoutDelete.mkString(","))) + replicaInfoList.foreach { r => + val stopReplicaRequest = new StopReplicaRequest(r.deletePartition, + Set(TopicAndPartition(r.replica.topic, r.replica.partition)), controllerId, controllerEpoch, correlationId) + controller.sendRequest(broker, stopReplicaRequest, r.callback) + } + } + stopReplicaRequestMap.clear() + } catch { + case e : Throwable => { + if(leaderAndIsrRequestMap.size > 0) { + error("Haven't been able to send leader and isr requests, current state of " + + "the map is %s".format(leaderAndIsrRequestMap.toString())) + } + if(updateMetadataRequestMap.size > 0) { + error("Haven't been able to send metadata update requests, current state of " + + "the map is %s".format(updateMetadataRequestMap.toString())) + } + if(stopReplicaRequestMap.size > 0) { + error("Haven't been able to send stop replica requests, current state of " + + "the map is %s".format(stopReplicaRequestMap.toString())) + } + throw new IllegalStateException(e) } } - stopReplicaRequestMap.clear() } } diff --git core/src/main/scala/kafka/controller/KafkaController.scala core/src/main/scala/kafka/controller/KafkaController.scala index b4fc755..365736b 100755 --- core/src/main/scala/kafka/controller/KafkaController.scala +++ core/src/main/scala/kafka/controller/KafkaController.scala @@ -341,6 +341,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt * required to clean up internal controller data structures */ def onControllerResignation() { + debug("Controller resigning, broker id %d".format(config.brokerId)) // de-register listeners deregisterIsrChangeNotificationListener() deregisterReassignedPartitionsListener() @@ -998,9 +999,19 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt * @param brokers The brokers that the update metadata request should be sent to */ def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicAndPartition] = Set.empty[TopicAndPartition]) { - brokerRequestBatch.newBatch() - brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions) - brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement) + try { + brokerRequestBatch.newBatch() + brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions) + brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement) + } catch { + case e : IllegalStateException => { + // Resign if the controller is in an illegal state + error("Forcing the controller to resign") + controllerElector.resign() + + throw e + } + } } /** diff --git core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala new file mode 100644 index 0000000..0fd07e3 --- /dev/null +++ core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package unit.kafka.controller + +import java.util.concurrent.LinkedBlockingQueue +import java.util.Properties + +import junit.framework.Assert._ +import org.scalatest.junit.JUnit3Suite + +import org.junit.{Test, After, Before} +import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient} +import org.I0Itec.zkclient.serialize.ZkSerializer +import org.apache.log4j.{Logger, Level} + +import kafka.api.RequestOrResponse +import kafka.common.TopicAndPartition +import kafka.controller.ControllerBrokerStateInfo +import kafka.controller.ControllerContext +import kafka.controller.ControllerChannelManager +import kafka.controller.KafkaController +import kafka.controller.OnlinePartition +import kafka.integration.KafkaServerTestHarness +import kafka.server.BrokerState +import kafka.server.KafkaConfig +import kafka.server.KafkaServer +import kafka.server.RunningAsController +import kafka.utils._ +import kafka.utils.TestUtils._ + +import scala.collection.Map +import scala.collection.mutable + + +class ControllerFailoverTest extends KafkaServerTestHarness with Logging { + val log = Logger.getLogger(classOf[ControllerFailoverTest]) + //val zkConnect = TestZKUtils.zookeeperConnect + val numNodes = 2 + val numParts = 1 + val msgQueueSize = 1 + val topic = "topic1" + val overridingProps = new Properties() + overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString) + + override def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect) + .map(KafkaConfig.fromProps(_, overridingProps)) + + override def setUp() { + super.setUp() + } + + override def tearDown() { + super.tearDown() + } + + /** + * See @link{https://issues.apache.org/jira/browse/KAFKA-2300} + * for the background of this test case + */ + def testMetadataUpdate() { + log.setLevel(Level.INFO) + var controller : KafkaServer = this.servers.head; + // Find the current controller + val epochMap : mutable.Map[Int, Int] = mutable.Map.empty + for(server <- this.servers) { + epochMap += (server.config.brokerId -> server.kafkaController.epoch) + if(server.kafkaController.isActive()) { + controller = server + } + } + // Create topic with one partition + kafka.admin.AdminUtils.createTopic(controller.zkClient, topic, 1, 1) + val topicPartition = TopicAndPartition("topic1", 0) + var partitions = controller.kafkaController.partitionStateMachine.partitionsInState(OnlinePartition) + while(!partitions.contains(topicPartition)) { + partitions = controller.kafkaController.partitionStateMachine.partitionsInState(OnlinePartition) + Thread.sleep(100) + } + // Replace channel manager with our mock manager + controller.kafkaController.controllerContext.controllerChannelManager.shutdown() + val channelManager = new MockChannelManager(controller.kafkaController.controllerContext, + controller.kafkaController.config) + channelManager.startup() + controller.kafkaController.controllerContext.controllerChannelManager = channelManager + channelManager.shrinkBlockingQueue(0) + channelManager.stopSendThread(0) + // Spawn a new thread to block on the outgoing channel + // queue + val thread = new Thread(new Runnable { + def run() { + try { + controller.kafkaController.sendUpdateMetadataRequest(Seq(0), Set(topicPartition)) + log.info("Queue state %d %d".format(channelManager.queueCapacity(0), channelManager.queueSize(0))) + controller.kafkaController.sendUpdateMetadataRequest(Seq(0), Set(topicPartition)) + log.info("Queue state %d %d".format(channelManager.queueCapacity(0), channelManager.queueSize(0))) + } catch { + case e : Exception => { + log.info("Thread interrupted") + } + } + } + }) + thread.setName("mythread") + thread.start() + while(thread.getState() != Thread.State.WAITING){ + Thread.sleep(100) + } + // Assume that the thread is WAITING because it is + // blocked on the queue, so interrupt and move forward + thread.interrupt() + thread.join() + channelManager.resumeSendThread(0) + // Wait and find current controller + var found = false + var counter = 0 + while(!found && counter < 10) { + for(server <- this.servers) { + val previousEpoch = (epochMap get server.config.brokerId) match { + case Some(epoch) => + epoch + case None => + val msg = String.format("Missing element in epoch map %s", epochMap.mkString(", ")) + throw new IllegalStateException(msg) + } + + if(server.kafkaController.isActive() + && (previousEpoch) < server.kafkaController.epoch) { + controller = server + found = true + } + } + if(!found){ + Thread.sleep(100) + counter += 1 + } + } + // Give it a shot to make sure that sending isn't blocking + try { + controller.kafkaController.sendUpdateMetadataRequest(Seq(0), Set(topicPartition)) + } catch { + case e : Throwable => { + fail(e) + } + } + } +} + +class MockChannelManager(private val controllerContext: ControllerContext, + config: KafkaConfig) + extends ControllerChannelManager(controllerContext, config) { + def stopSendThread(brokerId: Int) { + val requestThread = brokerStateInfo(brokerId).requestSendThread + requestThread.isRunning.set(false) + requestThread.interrupt() + requestThread.join() + } + + def shrinkBlockingQueue(brokerId: Int) { + val messageQueue = new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)](1) + val brokerInfo = this.brokerStateInfo(brokerId) + this.brokerStateInfo.put(brokerId, new ControllerBrokerStateInfo(brokerInfo.channel, + brokerInfo.broker, + messageQueue, + brokerInfo.requestSendThread)) + } + + def resumeSendThread (brokerId: Int) { + this.startRequestSendThread(0) + } + + def queueCapacity(broker : Int): Int = { + this.brokerStateInfo(broker).messageQueue.remainingCapacity() + } + + def queueSize(broker : Int): Int = { + this.brokerStateInfo(broker).messageQueue.size() + } +} \ No newline at end of file