diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala index 3e40817..0311737 100644 --- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala +++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala @@ -32,8 +32,6 @@ import collection.Set object LeaderAndIsr { val initialLeaderEpoch: Int = 0 val initialZKVersion: Int = 0 - val NoLeader = -1 - val LeaderDuringDelete = -2 } case class LeaderAndIsr(var leader: Int, var leaderEpoch: Int, var isr: List[Int], var zkVersion: Int) { diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index f79c1dc..f17d976 100644 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -211,8 +211,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging leaderAndIsrRequestMap(brokerId).put((topic, partition), PartitionStateInfo(leaderIsrAndControllerEpoch, replicas.toSet)) } - addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq, - Set(TopicAndPartition(topic, partition))) + addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq) } def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, deletePartition: Boolean, @@ -233,40 +232,34 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging * */ def addUpdateMetadataRequestForBrokers(brokerIds: Seq[Int], - partitions: collection.Set[TopicAndPartition] = Set.empty[TopicAndPartition], callback: (RequestOrResponse) => Unit = null) { - def updateMetadataRequestMapFor(partition: TopicAndPartition, beingDeleted: Boolean) { - val leaderIsrAndControllerEpochOpt = controllerContext.partitionLeadershipInfo.get(partition) - leaderIsrAndControllerEpochOpt match { - case 
Some(leaderIsrAndControllerEpoch) => - val replicas = controllerContext.partitionReplicaAssignment(partition).toSet - val partitionStateInfo = if (beingDeleted) { - val leaderAndIsr = new LeaderAndIsr(LeaderAndIsr.LeaderDuringDelete, leaderIsrAndControllerEpoch.leaderAndIsr.isr) - PartitionStateInfo(LeaderIsrAndControllerEpoch(leaderAndIsr, leaderIsrAndControllerEpoch.controllerEpoch), replicas) - } else { - PartitionStateInfo(leaderIsrAndControllerEpoch, replicas) - } + val partitionList = controllerContext.partitionLeadershipInfo.keySet.dropWhile( + p => controller.deleteTopicManager.isTopicQueuedUpForDeletion(p.topic)) + if(partitionList.size > 0) { + partitionList.foreach { partition => + val leaderIsrAndControllerEpochOpt = controllerContext.partitionLeadershipInfo.get(partition) + leaderIsrAndControllerEpochOpt match { + case Some(leaderIsrAndControllerEpoch) => + val replicas = controllerContext.partitionReplicaAssignment(partition).toSet + val partitionStateInfo = PartitionStateInfo(leaderIsrAndControllerEpoch, replicas) + brokerIds.filter(b => b >= 0).foreach { brokerId => + updateMetadataRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[TopicAndPartition, PartitionStateInfo]) + updateMetadataRequestMap(brokerId).put(partition, partitionStateInfo) + } + case None => + info("Leader not assigned yet for partition %s. 
Skip sending update metadata request".format(partition)) + } + } + } else { + if(controllerContext.partitionLeadershipInfo.keySet.size > 0) { + // last set of topics are being deleted + controllerContext.partitionLeadershipInfo.foreach { case(partition, leaderIsrAndControllerEpoch) => brokerIds.filter(b => b >= 0).foreach { brokerId => - updateMetadataRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[TopicAndPartition, PartitionStateInfo]) - updateMetadataRequestMap(brokerId).put(partition, partitionStateInfo) + updateMetadataRequestMap.put(brokerId, new mutable.HashMap[TopicAndPartition, PartitionStateInfo]) } - case None => - info("Leader not yet assigned for partition %s. Skip sending UpdateMetadataRequest.".format(partition)) + } } } - - val filteredPartitions = { - val givenPartitions = if (partitions.isEmpty) - controllerContext.partitionLeadershipInfo.keySet - else - partitions - if (controller.deleteTopicManager.partitionsToBeDeleted.isEmpty) - givenPartitions - else - givenPartitions -- controller.deleteTopicManager.partitionsToBeDeleted - } - filteredPartitions.foreach(partition => updateMetadataRequestMapFor(partition, beingDeleted = false)) - controller.deleteTopicManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestMapFor(partition, beingDeleted = true)) } def sendRequestsToBrokers(controllerEpoch: Int, correlationId: Int) { diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index c8c02ce..fcabd0d 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -552,7 +552,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition)) controllerContext.partitionsBeingReassigned.remove(topicAndPartition) //12.
After electing leader, the replicas and isr information changes, so resend the update metadata request to every broker - sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition)) + sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq) // signal delete topic thread if reassignment for some partitions belonging to topics being deleted just completed deleteTopicManager.resumeDeletionForTopics(Set(topicAndPartition.topic)) } @@ -933,9 +933,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg * metadata requests * @param brokers The brokers that the update metadata request should be sent to */ - def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicAndPartition] = Set.empty[TopicAndPartition]) { + def sendUpdateMetadataRequest(brokers: Seq[Int]) { brokerRequestBatch.newBatch() - brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions) + brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers) brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement) } @@ -967,7 +967,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg "controller was elected with epoch %d. 
Aborting state change by this controller".format(controllerEpoch)) if (leaderAndIsr.isr.contains(replicaId)) { // if the replica to be removed from the ISR is also the leader, set the new leader value to -1 - val newLeader = if (replicaId == leaderAndIsr.leader) LeaderAndIsr.NoLeader else leaderAndIsr.leader + val newLeader = if(replicaId == leaderAndIsr.leader) -1 else leaderAndIsr.leader var newIsr = leaderAndIsr.isr.filter(b => b != replicaId) // if the replica to be removed from the ISR is the last surviving member of the ISR and unclean leader election diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala index d29e556..09f54ac 100644 --- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala +++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala @@ -72,13 +72,12 @@ class TopicDeletionManager(controller: KafkaController, val controllerContext = controller.controllerContext val partitionStateMachine = controller.partitionStateMachine val replicaStateMachine = controller.replicaStateMachine - val topicsToBeDeleted: mutable.Set[String] = mutable.Set.empty[String] ++ initialTopicsToBeDeleted - val partitionsToBeDeleted: mutable.Set[TopicAndPartition] = topicsToBeDeleted.flatMap(controllerContext.partitionsForTopic) + var topicsToBeDeleted: mutable.Set[String] = mutable.Set.empty[String] ++ initialTopicsToBeDeleted val deleteLock = new ReentrantLock() - val topicsIneligibleForDeletion: mutable.Set[String] = mutable.Set.empty[String] ++ + var topicsIneligibleForDeletion: mutable.Set[String] = mutable.Set.empty[String] ++ (initialTopicsIneligibleForDeletion & initialTopicsToBeDeleted) val deleteTopicsCond = deleteLock.newCondition() - val deleteTopicStateChanged: AtomicBoolean = new AtomicBoolean(false) + var deleteTopicStateChanged: AtomicBoolean = new AtomicBoolean(false) var deleteTopicsThread: DeleteTopicsThread = null val isDeleteTopicEnabled = 
controller.config.deleteTopicEnable @@ -100,7 +99,6 @@ class TopicDeletionManager(controller: KafkaController, if(isDeleteTopicEnabled) { deleteTopicsThread.shutdown() topicsToBeDeleted.clear() - partitionsToBeDeleted.clear() topicsIneligibleForDeletion.clear() } } @@ -114,7 +112,6 @@ class TopicDeletionManager(controller: KafkaController, def enqueueTopicsForDeletion(topics: Set[String]) { if(isDeleteTopicEnabled) { topicsToBeDeleted ++= topics - partitionsToBeDeleted ++= topics.flatMap(controllerContext.partitionsForTopic) resumeTopicDeletionThread() } } @@ -267,7 +264,6 @@ class TopicDeletionManager(controller: KafkaController, partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, OfflinePartition) partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, NonExistentPartition) topicsToBeDeleted -= topic - partitionsToBeDeleted.retain(_.topic != topic) controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicPath(topic)) controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicConfigPath(topic)) controllerContext.zkClient.delete(ZkUtils.getDeleteTopicPath(topic)) @@ -281,8 +277,7 @@ class TopicDeletionManager(controller: KafkaController, private def onTopicDeletion(topics: Set[String]) { info("Topic deletion callback for %s".format(topics.mkString(","))) // send update metadata so that brokers stop serving data for topics to be deleted - val partitions = topics.flatMap(controllerContext.partitionsForTopic) - controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, partitions) + controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq) val partitionReplicaAssignmentByTopic = controllerContext.partitionReplicaAssignment.groupBy(p => p._1.topic) topics.foreach { topic => onPartitionDeletion(partitionReplicaAssignmentByTopic(topic).map(_._1).toSet) @@ -327,8 +322,8 @@ class TopicDeletionManager(controller: KafkaController, /** * This callback is invoked by the delete topic 
callback with the list of partitions for topics to be deleted * It does the following - - * 1. Send UpdateMetadataRequest to all live brokers (that are not shutting down) for partitions that are being - * deleted. The brokers start rejecting all client requests with UnknownTopicOrPartitionException + * 1. Send UpdateMetadataRequest to all live brokers (that are not shutting down) with all partitions except those for + * which the topics are being deleted. The brokers start rejecting all client requests with UnknownTopicOrPartitionException * 2. Move all replicas for the partitions to OfflineReplica state. This will send StopReplicaRequest to the replicas * and LeaderAndIsrRequest to the leader with the shrunk ISR. When the leader replica itself is moved to OfflineReplica state, * it will skip sending the LeaderAndIsrRequest since the leader will be updated to -1 diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 705d87e..c068ef6 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -137,20 +137,26 @@ class KafkaApis(val requestChannel: RequestChannel, // cache the list of alive brokers in the cluster updateMetadataRequest.aliveBrokers.foreach(b => aliveBrokers.put(b.id, b)) updateMetadataRequest.partitionStateInfos.foreach { partitionState => - if (partitionState._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader == LeaderAndIsr.LeaderDuringDelete) { - val partition = partitionState._1 - metadataCache.remove(partition) - stateChangeLogger.trace(("Broker %d deleted partition %s from metadata cache in response to UpdateMetadata request " + - "sent by controller %d epoch %d with correlation id %d") - .format(brokerId, partition, updateMetadataRequest.controllerId, - updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId)) - } else { - metadataCache.put(partitionState._1, partitionState._2) - 
stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response to UpdateMetadata request " + - "sent by controller %d epoch %d with correlation id %d") - .format(brokerId, partitionState._2, partitionState._1, updateMetadataRequest.controllerId, - updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId)) - } + metadataCache.put(partitionState._1, partitionState._2) + stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response to UpdateMetadata request " + + "sent by controller %d epoch %d with correlation id %d").format(brokerId, partitionState._2, partitionState._1, + updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId)) + } + // remove the topics that don't exist in the UpdateMetadata request since those are the topics that are + // currently being deleted by the controller + val topicsKnownToThisBroker = metadataCache.map { + case(topicAndPartition, partitionStateInfo) => topicAndPartition.topic }.toSet + val topicsKnownToTheController = updateMetadataRequest.partitionStateInfos.map { + case(topicAndPartition, partitionStateInfo) => topicAndPartition.topic }.toSet + val deletedTopics = topicsKnownToThisBroker -- topicsKnownToTheController + val partitionsToBeDeleted = metadataCache.filter { + case(topicAndPartition, partitionStateInfo) => deletedTopics.contains(topicAndPartition.topic) + }.keySet + partitionsToBeDeleted.foreach { partition => + metadataCache.remove(partition) + stateChangeLogger.trace(("Broker %d deleted partition %s from metadata cache in response to UpdateMetadata request " + + "sent by controller %d epoch %d with correlation id %d").format(brokerId, partition, + updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId)) } } val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId) diff --git 
a/system_test/mirror_maker_testsuite/config/mirror_producer.properties b/system_test/mirror_maker_testsuite/config/mirror_producer.properties index 7f48b07..f94bebd 100644 --- a/system_test/mirror_maker_testsuite/config/mirror_producer.properties +++ b/system_test/mirror_maker_testsuite/config/mirror_producer.properties @@ -1,3 +1,10 @@ +# old producer +metadata.broker.list=localhost:9094 +compression.codec=0 +request.retries=3 +request.required.acks=1 + +# new producer block.on.buffer.full=true bootstrap.servers=localhost:9094 compression.type=none