From 6ae3098a8fe88a30ba0d0ce4df80b442f4bb69dc Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Mon, 6 Oct 2014 10:05:23 -0700 Subject: [PATCH 1/2] Fix for Kafka-1647. --- .../main/scala/kafka/server/ReplicaManager.scala | 26 ++++++++++++++-------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 02fa382..d392bb1 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -621,10 +621,9 @@ class ReplicaManager(val config: KafkaConfig, val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch val newLeaderBrokerId = leaderIsrAndControllerEpoch.leaderAndIsr.leader leaders.find(_.id == newLeaderBrokerId) match { + //Only change partition state when the leader is available case Some(leaderBroker) => - if (partition.makeFollower(controllerId, partitionStateInfo, correlationId, offsetManager)) - partitionsToMakeFollower += partition - else + if (partition.makeFollower(controllerId, partitionStateInfo, correlationId, offsetManager) == false) stateChangeLogger.info(("Broker %d skipped the become-follower state change after marking its partition as follower with correlation id %d from " + "controller %d epoch %d for partition [%s,%d] since the new leader %d is the same as the old leader") .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, @@ -632,11 +631,15 @@ class ReplicaManager(val config: KafkaConfig, case None => // The leader broker should always be present in the leaderAndIsrRequest. // If not, we should record the error message and abort the transition process for this partition - stateChangeLogger.error(("Broker %d aborted the become-follower state change with correlation id %d from " + - "controller %d epoch %d for partition [%s,%d] since new leader %d is not currently available") + stateChangeLogger.error(("Broker %d got error during become-follower state change with correlation id %d" + + "from controller %d epoch %d for partition [%s,%d] since new leader %d is not currently available") .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, partition.topic, partition.partitionId, newLeaderBrokerId)) } + // Partition needs to be added to the partitionsToMakeFollower even if leader is not up yet. Thus the + // highWatermark can be read before it's overwritten by new checkpoints. Otherwise we could have a + // data loss issue. See KAFKA-1647. + partitionsToMakeFollower += partition } replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(new TopicAndPartition(_))) @@ -662,11 +665,16 @@ class ReplicaManager(val config: KafkaConfig, } } else { - // we do not need to check if the leader exists again since this has been done at the beginning of this process + // we need to check if the leader exists val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition => - new TopicAndPartition(partition) -> BrokerAndInitialOffset( - leaders.find(_.id == partition.leaderReplicaIdOpt.get).get, - partition.getReplica().get.logEndOffset.messageOffset)).toMap + leaders.find(_.id == partition.leaderReplicaIdOpt.get) match { + case Some(leader) => + Some(new TopicAndPartition(partition) -> BrokerAndInitialOffset(leader, + partition.getReplica().get.logEndOffset.messageOffset)) + case None => + None + }).flatten.toMap + replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset) partitionsToMakeFollower.foreach { partition => -- 1.8.3.4 (Apple Git-47) From fd7f548c853438644568c550e43d73a3f3bba423 Mon Sep 17 00:00:00 2001 From: jqin Date: Thu, 30 Oct 2014 15:02:08 -0700 Subject: [PATCH 2/2] rebased for KAFKA-1647 --- core/src/main/scala/kafka/server/ReplicaManager.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index d392bb1..6b1047c 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -65,7 +65,7 @@ object ReplicaManager { class ReplicaManager(val config: KafkaConfig, time: Time, - val zkClient: ZkClient, + val zkClient: ZkClient, scheduler: Scheduler, val logManager: LogManager, val isShuttingDown: AtomicBoolean ) extends Logging with KafkaMetricsGroup { @@ -631,15 +631,15 @@ class ReplicaManager(val config: KafkaConfig, case None => // The leader broker should always be present in the leaderAndIsrRequest. // If not, we should record the error message and abort the transition process for this partition - stateChangeLogger.error(("Broker %d got error during become-follower state change with correlation id %d" + - "from controller %d epoch %d for partition [%s,%d] since new leader %d is not currently available") + stateChangeLogger.error(("Broker %d received LeaderAndIsrRequest with correlation id %d from controller" + + " %d epoch %d for partition [%s,%d] but cannot become follower since the new leader %d is unavailable.)") .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, partition.topic, partition.partitionId, newLeaderBrokerId)) + // Partition needs to be added to the partitionsToMakeFollower even if leader is not up yet. Thus the + // highWatermark can be read before it's overwritten by new checkpoints. Otherwise we could have a + // data loss issue. See KAFKA-1647. + partitionsToMakeFollower += partition } - // Partition needs to be added to the partitionsToMakeFollower even if leader is not up yet. Thus the - // highWatermark can be read before it's overwritten by new checkpoints. Otherwise we could have a - // data loss issue. See KAFKA-1647. - partitionsToMakeFollower += partition } replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(new TopicAndPartition(_))) -- 1.8.3.4 (Apple Git-47)