diff --git core/src/main/scala/kafka/controller/ControllerChannelManager.scala core/src/main/scala/kafka/controller/ControllerChannelManager.scala index eb492f0..5830931 100644 --- core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -272,47 +272,68 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging } def sendRequestsToBrokers(controllerEpoch: Int, correlationId: Int) { - leaderAndIsrRequestMap.foreach { m => - val broker = m._1 - val partitionStateInfos = m._2.toMap - val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet - val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)) - val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId, controllerEpoch, correlationId, clientId) - for (p <- partitionStateInfos) { - val typeOfRequest = if (broker == p._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower" - stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s with correlationId %d to broker %d " + - "for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest, - p._2.leaderIsrAndControllerEpoch, correlationId, broker, - p._1._1, p._1._2)) + try { + leaderAndIsrRequestMap.foreach { m => + val broker = m._1 + val partitionStateInfos = m._2.toMap + val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet + val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)) + val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId, controllerEpoch, correlationId, clientId) + for (p <- partitionStateInfos) { + val typeOfRequest = if (broker == p._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower" + stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s with correlationId %d to broker %d " + + "for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest, + p._2.leaderIsrAndControllerEpoch, correlationId, broker, + p._1._1, p._1._2)) + } + controller.sendRequest(broker, leaderAndIsrRequest, null) } - controller.sendRequest(broker, leaderAndIsrRequest, null) - } - leaderAndIsrRequestMap.clear() - updateMetadataRequestMap.foreach { m => - val broker = m._1 - val partitionStateInfos = m._2.toMap - val updateMetadataRequest = new UpdateMetadataRequest(controllerId, controllerEpoch, correlationId, clientId, - partitionStateInfos, controllerContext.liveOrShuttingDownBrokers) - partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s with " + - "correlationId %d to broker %d for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch, - correlationId, broker, p._1))) - controller.sendRequest(broker, updateMetadataRequest, null) - } - updateMetadataRequestMap.clear() - stopReplicaRequestMap foreach { case(broker, replicaInfoList) => - val stopReplicaWithDelete = replicaInfoList.filter(p => p.deletePartition == true).map(i => i.replica).toSet - val stopReplicaWithoutDelete = replicaInfoList.filter(p => p.deletePartition == false).map(i => i.replica).toSet - debug("The stop replica request (delete = true) sent to broker %d is %s" - .format(broker, stopReplicaWithDelete.mkString(","))) - debug("The stop replica request (delete = false) sent to broker %d is %s" - .format(broker, stopReplicaWithoutDelete.mkString(","))) - replicaInfoList.foreach { r => - val stopReplicaRequest = new StopReplicaRequest(r.deletePartition, - Set(TopicAndPartition(r.replica.topic, r.replica.partition)), controllerId, controllerEpoch, correlationId) - controller.sendRequest(broker, stopReplicaRequest, r.callback) + leaderAndIsrRequestMap.clear() + updateMetadataRequestMap.foreach { m => + val broker = m._1 + val partitionStateInfos = m._2.toMap + val updateMetadataRequest = new UpdateMetadataRequest(controllerId, controllerEpoch, correlationId, clientId, + partitionStateInfos, controllerContext.liveOrShuttingDownBrokers) + partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s with " + + "correlationId %d to broker %d for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch, + correlationId, broker, p._1))) + controller.sendRequest(broker, updateMetadataRequest, null) + } + updateMetadataRequestMap.clear() + stopReplicaRequestMap foreach { case(broker, replicaInfoList) => + val stopReplicaWithDelete = replicaInfoList.filter(p => p.deletePartition == true).map(i => i.replica).toSet + val stopReplicaWithoutDelete = replicaInfoList.filter(p => p.deletePartition == false).map(i => i.replica).toSet + debug("The stop replica request (delete = true) sent to broker %d is %s" + .format(broker, stopReplicaWithDelete.mkString(","))) + debug("The stop replica request (delete = false) sent to broker %d is %s" + .format(broker, stopReplicaWithoutDelete.mkString(","))) + replicaInfoList.foreach { r => + val stopReplicaRequest = new StopReplicaRequest(r.deletePartition, + Set(TopicAndPartition(r.replica.topic, r.replica.partition)), controllerId, controllerEpoch, correlationId) + controller.sendRequest(broker, stopReplicaRequest, r.callback) + } + } + stopReplicaRequestMap.clear() + } catch { + case e : Exception => { + if(leaderAndIsrRequestMap.size > 0) { + error("Haven't been able to send leader and isr requests, current state of " + + "the map is %s".format(leaderAndIsrRequestMap.toString())) + leaderAndIsrRequestMap.clear() + } + if(updateMetadataRequestMap.size > 0) { + error("Haven't been able to send metadata update requests, current state of " + + "the map is %s".format(updateMetadataRequestMap.toString())) + updateMetadataRequestMap.clear() + } + if(stopReplicaRequestMap.size > 0) { + error("Haven't been able to send stop replica requests, current state of " + + "the map is %s".format(stopReplicaRequestMap.toString())) + stopReplicaRequestMap.clear() + } + throw new IllegalStateException(e) } } - stopReplicaRequestMap.clear() } } diff --git core/src/main/scala/kafka/controller/KafkaController.scala core/src/main/scala/kafka/controller/KafkaController.scala index 66df6d2..9bf073d 100644 --- core/src/main/scala/kafka/controller/KafkaController.scala +++ core/src/main/scala/kafka/controller/KafkaController.scala @@ -971,9 +971,16 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt * @param brokers The brokers that the update metadata request should be sent to */ def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicAndPartition] = Set.empty[TopicAndPartition]) { - brokerRequestBatch.newBatch() - brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions) - brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement) + try { + brokerRequestBatch.newBatch() + brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions) + brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement) + } catch { + case e : IllegalStateException => { + // Resign if the controller is in an illegal state + controllerElector.resign() + } + } } /**