From 4b22c1cce1db63c3817303a19aed5307ff4b3b71 Mon Sep 17 00:00:00 2001
From: Jiangjie Qin
Date: Mon, 6 Oct 2014 10:05:23 -0700
Subject: [PATCH] Fix for Kafka-1647.

During the become-follower state change, always add the partition to
partitionsToMakeFollower, even when the new leader is not among the leaders
of the LeaderAndIsr request. This lets the partition's highWatermark be read
before it is overwritten by new checkpoints; otherwise data can be lost.

Because a partition in that set may now have no reachable leader, a fetcher
is only added for partitions whose leader is found in the request's leaders.
A partition whose leaderReplicaIdOpt is empty is skipped rather than failing
on Option.get.

The error logged when the new leader is unavailable no longer claims the
transition was aborted, since the partition is still processed.
---
 .../main/scala/kafka/server/ReplicaManager.scala   | 26 ++++++++++++++--------
 1 file changed, 17 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 78b7514..30f34f5 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -482,10 +482,9 @@ class ReplicaManager(val config: KafkaConfig,
         val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
         val newLeaderBrokerId = leaderIsrAndControllerEpoch.leaderAndIsr.leader
         leaders.find(_.id == newLeaderBrokerId) match {
+          // Only change partition state when the leader is available
           case Some(leaderBroker) =>
-            if (partition.makeFollower(controllerId, partitionStateInfo, correlationId, offsetManager))
-              partitionsToMakeFollower += partition
-            else
+            if (!partition.makeFollower(controllerId, partitionStateInfo, correlationId, offsetManager))
               stateChangeLogger.info(("Broker %d skipped the become-follower state change after marking its partition as follower with correlation id %d from " +
                 "controller %d epoch %d for partition [%s,%d] since the new leader %d is the same as the old leader")
                 .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch,
@@ -493,11 +492,15 @@ class ReplicaManager(val config: KafkaConfig,
           case None =>
             // The leader broker should always be present in the leaderAndIsrRequest.
             // If not, we should record the error message and abort the transition process for this partition
-            stateChangeLogger.error(("Broker %d aborted the become-follower state change with correlation id %d from " +
-              "controller %d epoch %d for partition [%s,%d] since new leader %d is not currently available")
+            stateChangeLogger.error(("Broker %d got error during become-follower state change with correlation id %d " +
+              "from controller %d epoch %d for partition [%s,%d] since new leader %d is not currently available")
               .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch,
               partition.topic, partition.partitionId, newLeaderBrokerId))
         }
+        // Partition needs to be added to the partitionsToMakeFollower even if leader is not up yet. Thus the
+        // highWatermark can be read before it's overwritten by new checkpoints. Otherwise we could have a
+        // data loss issue. See KAFKA-1647.
+        partitionsToMakeFollower += partition
       }
 
       replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(new TopicAndPartition(_)))
@@ -523,11 +526,16 @@ class ReplicaManager(val config: KafkaConfig,
         }
       }
       else {
-        // we do not need to check if the leader exists again since this has been done at the beginning of this process
+        // we need to check if the leader exists
         val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
-          new TopicAndPartition(partition) -> BrokerAndInitialOffset(
-            leaders.find(_.id == partition.leaderReplicaIdOpt.get).get,
-            partition.getReplica().get.logEndOffset.messageOffset)).toMap
+          leaders.find(broker => partition.leaderReplicaIdOpt.exists(_ == broker.id)) match {
+            case Some(leader) =>
+              Some(new TopicAndPartition(partition) -> BrokerAndInitialOffset(leader,
+                partition.getReplica().get.logEndOffset.messageOffset))
+            case None =>
+              None
+          }).flatten.toMap
+
         replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
 
         partitionsToMakeFollower.foreach { partition =>
-- 
1.8.3.4 (Apple Git-47)