diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 60f3ed4..f1ce7dc 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -126,6 +126,9 @@ class Partition(val topic: String,
     assignedReplicaMap.values.toSet
   }
 
+  def removeReplica(replicaId: Int) {
+    assignedReplicaMap.remove(replicaId)
+  }
 
   /**
    * If the leaderEpoch of the incoming request is higher than locally cached epoch, make the local replica the leader in the following steps.
@@ -134,7 +137,7 @@ class Partition(val topic: String,
    * 3. reset LogEndOffset for remote replicas (there could be old LogEndOffset from the time when this broker was the leader last time)
    * 4. set the new leader and ISR
    */
-  def makeLeader(controllerId: Int, topic: String, partitionId: Int,
+  def makeLeader(controllerId: Int, topic: String, partitionId: Int, allReplicas: Set[Int],
                  leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, correlationId: Int): Boolean = {
     leaderIsrUpdateLock synchronized {
       val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
@@ -152,6 +155,9 @@ class Partition(val topic: String,
       replicaFetcherManager.removeFetcher(topic, partitionId)
 
       val newInSyncReplicas = leaderAndIsr.isr.map(r => getOrCreateReplica(r)).toSet
+      // remove assigned replicas that have been removed by the controller
+      val assignedReplicasToBeDeleted = assignedReplicas().map(_.brokerId) -- allReplicas
+      assignedReplicasToBeDeleted.foreach(removeReplica(_))
       // reset LogEndOffset for remote replicas
       assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = ReplicaManager.UnknownLogEndOffset)
       inSyncReplicas = newInSyncReplicas
@@ -171,7 +177,8 @@ class Partition(val topic: String,
    * 3. set the leader and set ISR to empty
    * 4. start a fetcher to the new leader
    */
-  def makeFollower(controllerId: Int, topic: String, partitionId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
+  def makeFollower(controllerId: Int, topic: String, partitionId: Int, allReplicas: Set[Int],
+                   leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
                    leaders: Set[Broker], correlationId: Int): Boolean = {
     leaderIsrUpdateLock synchronized {
       val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
@@ -197,6 +204,11 @@ class Partition(val topic: String,
       // stop fetcher thread to previous leader
       replicaFetcherManager.removeFetcher(topic, partitionId)
       localReplica.log.get.truncateTo(localReplica.highWatermark)
+      // add replicas that are new
+      allReplicas.foreach(r => getOrCreateReplica(r))
+      // remove assigned replicas that have been removed by the controller
+      val assignedReplicasToBeDeleted = assignedReplicas().map(_.brokerId) -- allReplicas
+      assignedReplicasToBeDeleted.foreach(removeReplica(_))
       inSyncReplicas = Set.empty[Replica]
       leaderEpoch = leaderAndIsr.leaderEpoch
       zkVersion = leaderAndIsr.zkVersion
@@ -231,7 +243,11 @@ class Partition(val topic: String,
       case Some(leaderReplica) =>
         val replica = getReplica(replicaId).get
         val leaderHW = leaderReplica.highWatermark
-        if (!inSyncReplicas.contains(replica) && replica.logEndOffset >= leaderHW) {
+        // For a replica to get added back to the ISR, it has to satisfy 3 conditions:
+        // 1. It is not already in the ISR
+        // 2. It is part of the assigned replica list. See KAFKA-1097
+        // 3. Its log end offset >= leader's high watermark
+        if (!inSyncReplicas.contains(replica) && assignedReplicas.map(_.brokerId).contains(replicaId) && replica.logEndOffset >= leaderHW) {
           // expand ISR
           val newInSyncReplicas = inSyncReplicas + replica
           info("Expanding ISR for partition [%s,%d] from %s to %s"
@@ -360,7 +376,7 @@ class Partition(val topic: String,
       val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
         ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId),
         ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch), zkVersion)
-      if (updateSucceeded){
+      if (updateSucceeded) {
         inSyncReplicas = newIsr
         zkVersion = newVersion
         trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newIsr.mkString(","), zkVersion))
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 88d130f..c9c75d3 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -626,7 +626,6 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     val reassignedReplicas = reassignedPartitionContext.newReplicas
     val topic = topicAndPartition.topic
     val partition = topicAndPartition.partition
-    // send stop replica state change request to the old replicas
     val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
     // first move the replica to offline state (the controller removes it from the ISR)
     oldReplicas.foreach { replica =>
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 212c05d..7509461 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -134,7 +134,19 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
             brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true)
             // remove this replica from the assigned replicas list for its partition
             val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
-            controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId))
+            val newAssignedReplicas = currentAssignedReplicas.filterNot(_ == replicaId)
+            controllerContext.partitionReplicaAssignment.put(topicAndPartition, newAssignedReplicas)
+            // Remove the replica from the ISR. This fixes the ISR in zookeeper if replicaId sent a fetch request to the leader after the
+            // controller had removed it from the ISR in zookeeper as part of the offline state change, but before it received the
+            // stop replica request from the controller. This can only happen when no new data is coming in for the topic,
+            // since the leader cannot shrink the ISR for a replica that is completely caught up but no longer fetching (KAFKA-1097)
+            controller.removeReplicaFromIsr(topic, partition, replicaId) match {
+              case Some(updatedLeaderIsrAndControllerEpoch) =>
+                // send the shrunk ISR state change request only to the leader
+                brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader),
+                  topic, partition, updatedLeaderIsrAndControllerEpoch, newAssignedReplicas)
+              case None =>
+            }
             replicaState.remove((topic, partition, replicaId))
             stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to NonExistentReplica"
                                       .format(controllerId, controller.epoch, replicaId, topicAndPartition))
@@ -165,6 +177,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
            replicaState.put((topic, partition, replicaId), OnlineReplica)
        case OfflineReplica =>
          assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica), targetState)
+          // send stop replica command to the replica so that it stops fetching from the leader
+          brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false)
          // As an optimization, the controller removes dead replicas from the ISR
          val leaderAndIsrIsEmpty: Boolean =
            controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 03ba60e..1343217 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -254,7 +254,7 @@ class ReplicaManager(val config: KafkaConfig,
           "starting the become-leader transition for partition [%s,%d]")
           .format(localBrokerId, correlationId, controllerId, epoch, topic, partitionId))
     val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
-    if (partition.makeLeader(controllerId, topic, partitionId, leaderIsrAndControllerEpoch, correlationId)) {
+    if (partition.makeLeader(controllerId, topic, partitionId, partitionStateInfo.allReplicas, leaderIsrAndControllerEpoch, correlationId)) {
       // also add this partition to the list of partitions for which the leader is the current broker
       leaderPartitionsLock synchronized {
         leaderPartitions += partition
@@ -271,7 +271,8 @@ class ReplicaManager(val config: KafkaConfig,
           .format(localBrokerId, correlationId, controllerId, epoch, topic, partitionId))
 
     val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
-    if (partition.makeFollower(controllerId, topic, partitionId, leaderIsrAndControllerEpoch, leaders, correlationId)) {
+    if (partition.makeFollower(controllerId, topic, partitionId, partitionStateInfo.allReplicas,
+                               leaderIsrAndControllerEpoch, leaders, correlationId)) {
       // remove this replica's partition from the ISR expiration queue
       leaderPartitionsLock synchronized {
         leaderPartitions -= partition
diff --git a/kafka-patch-review.py b/kafka-patch-review.py
index 82ea9a8..50f0cf7 100644
--- a/kafka-patch-review.py
+++ b/kafka-patch-review.py
@@ -97,7 +97,7 @@ def main():
   if not opt.reviewboard:
     print 'Created a new reviewboard ',rb_url,' against branch: ',opt.branch
   else:
-    print 'Updated reviewboard',opt.reviewboard
+    print 'Updated reviewboard',opt.reviewboard,' against branch ',opt.branch
 
   comment="Updated reviewboard "
   comment = comment + rb_url
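
For reference, the guard that the Partition.scala hunks above introduce can be looked at in isolation. What follows is an illustrative, self-contained Scala sketch, not code from the patch: the Replica case class, canRejoinIsr, and the sample values are hypothetical stand-ins for the broker-side types, but the condition mirrors the three requirements documented in the patch comment (the replica is not already in the ISR, it is still in the assigned replica list per KAFKA-1097, and its log end offset has reached the leader's high watermark).

// Illustrative sketch only (not part of the patch). Replica and canRejoinIsr are
// hypothetical stand-ins for the broker-side types used in Partition.scala.
object IsrExpansionSketch {

  case class Replica(brokerId: Int, logEndOffset: Long)

  // A follower may be added back to the ISR only if it is not already in the ISR,
  // it is still part of the assigned replica list (KAFKA-1097), and its log end
  // offset has caught up to the leader's high watermark.
  def canRejoinIsr(replica: Replica,
                   inSyncReplicas: Set[Replica],
                   assignedReplicas: Set[Replica],
                   leaderHW: Long): Boolean =
    !inSyncReplicas.contains(replica) &&
      assignedReplicas.map(_.brokerId).contains(replica.brokerId) &&
      replica.logEndOffset >= leaderHW

  def main(args: Array[String]): Unit = {
    val follower = Replica(brokerId = 3, logEndOffset = 120L)
    val isr = Set(Replica(brokerId = 1, logEndOffset = 130L))
    val stillAssigned = Set(Replica(1, 130L), Replica(2, 125L), follower)
    val reassignedAway = Set(Replica(1, 130L), Replica(2, 125L))

    // Caught up but no longer in the assigned replica list: must not rejoin the ISR.
    println(canRejoinIsr(follower, isr, reassignedAway, leaderHW = 110L)) // false
    // Caught up and still assigned: the leader may expand the ISR.
    println(canRejoinIsr(follower, isr, stillAssigned, leaderHW = 110L))  // true
  }
}

Before this change, the guard only checked ISR membership and the high watermark, so a fully caught-up replica that the controller had already reassigned away could slip back into the ISR; the extra assigned-replica check closes that gap.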