diff --git core/src/main/scala/kafka/controller/ControllerChannelManager.scala core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index eb492f0..a9a9728 100644
--- core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -182,6 +182,9 @@ class RequestSendThread(val controllerId: Int,
 }
 
 class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging {
+  // KAFKA-2300 (TEST-ONLY fault injection, do not merge): arm a one-shot
+  // failure the first time a partition-being-deleted request is batched.
+  // NOTE(review): plain vars, no synchronization — assumes single-threaded
+  // controller access; confirm before reusing elsewhere.
+  var deleteEnabled: Boolean = false
+  var once: Boolean = true
   val controllerContext = controller.controllerContext
   val controllerId: Int = controller.config.brokerId
   val clientId: String = controller.clientId
@@ -243,6 +246,11 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
       case Some(leaderIsrAndControllerEpoch) =>
         val replicas = controllerContext.partitionReplicaAssignment(partition).toSet
         val partitionStateInfo = if (beingDeleted) {
+          // KAFKA-2300 fault injection: on the FIRST delete-flagged partition
+          // only, arm deleteEnabled so the next send throws (see below).
+          if (once) {
+            once = false
+            deleteEnabled = true
+          }
           val leaderAndIsr = new LeaderAndIsr(LeaderAndIsr.LeaderDuringDelete, leaderIsrAndControllerEpoch.leaderAndIsr.isr)
           PartitionStateInfo(LeaderIsrAndControllerEpoch(leaderAndIsr, leaderIsrAndControllerEpoch.controllerEpoch), replicas)
         } else {
@@ -288,6 +296,11 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
       controller.sendRequest(broker, leaderAndIsrRequest, null)
     }
     leaderAndIsrRequestMap.clear()
+    // KAFKA-2300 fault injection: simulate an interruption after the
+    // leaderAndIsr requests are sent but BEFORE the updateMetadata requests
+    // go out. Disarmed immediately so the failure fires exactly once.
+    if (deleteEnabled) {
+      deleteEnabled = false
+      throw new InterruptedException()
+    }
     updateMetadataRequestMap.foreach { m =>
       val broker = m._1
       val partitionStateInfos = m._2.toMap