From 84fdfcce8253c393e1ba10b8651f041fdbb769e6 Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Mon, 6 Oct 2014 10:05:23 -0700 Subject: [PATCH 1/3] Fix for Kafka-1647. --- .../main/scala/kafka/server/ReplicaManager.scala | 26 ++++++++++++++-------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 78b7514..30f34f5 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -482,10 +482,9 @@ class ReplicaManager(val config: KafkaConfig, val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch val newLeaderBrokerId = leaderIsrAndControllerEpoch.leaderAndIsr.leader leaders.find(_.id == newLeaderBrokerId) match { + //Only change partition state when the leader is available case Some(leaderBroker) => - if (partition.makeFollower(controllerId, partitionStateInfo, correlationId, offsetManager)) - partitionsToMakeFollower += partition - else + if (partition.makeFollower(controllerId, partitionStateInfo, correlationId, offsetManager) == false) stateChangeLogger.info(("Broker %d skipped the become-follower state change after marking its partition as follower with correlation id %d from " + "controller %d epoch %d for partition [%s,%d] since the new leader %d is the same as the old leader") .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, @@ -493,11 +492,15 @@ class ReplicaManager(val config: KafkaConfig, case None => // The leader broker should always be present in the leaderAndIsrRequest. 
// If not, we should record the error message and abort the transition process for this partition - stateChangeLogger.error(("Broker %d aborted the become-follower state change with correlation id %d from " + - "controller %d epoch %d for partition [%s,%d] since new leader %d is not currently available") + stateChangeLogger.error(("Broker %d got error during become-follower state change with correlation id %d" + + "from controller %d epoch %d for partition [%s,%d] since new leader %d is not currently available") .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, partition.topic, partition.partitionId, newLeaderBrokerId)) } + // Partition needs to be added to the partitionsToMakeFollower even if leader is not up yet. Thus the + // highWatermark can be read before it's overwritten by new checkpoints. Otherwise we could have a + // data loss issue. See KAFKA-1647. + partitionsToMakeFollower += partition } replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(new TopicAndPartition(_))) @@ -523,11 +526,16 @@ class ReplicaManager(val config: KafkaConfig, } } else { - // we do not need to check if the leader exists again since this has been done at the beginning of this process + // we need to check if the leader exists val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition => - new TopicAndPartition(partition) -> BrokerAndInitialOffset( - leaders.find(_.id == partition.leaderReplicaIdOpt.get).get, - partition.getReplica().get.logEndOffset.messageOffset)).toMap + leaders.find(_.id == partition.leaderReplicaIdOpt.get) match { + case Some(leader) => + Some(new TopicAndPartition(partition) -> BrokerAndInitialOffset(leader, + partition.getReplica().get.logEndOffset.messageOffset)) + case None => + None + }).flatten.toMap + replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset) partitionsToMakeFollower.foreach { partition => -- 1.8.3.4 
(Apple Git-47) From f06ad4855cda4341fd9f0759cb335ec7ef9146f0 Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Mon, 13 Oct 2014 16:31:13 -0700 Subject: [PATCH 2/3] Addressed Joel's comments. --- core/src/main/scala/kafka/server/ReplicaManager.scala | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 30f34f5..942f7c9 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -46,9 +46,9 @@ object ReplicaManager { case class PartitionDataAndOffset(data: FetchResponsePartitionData, offset: LogOffsetMetadata) -class ReplicaManager(val config: KafkaConfig, - time: Time, - val zkClient: ZkClient, +class ReplicaManager(val config: KafkaConfig, + time: Time, + val zkClient: ZkClient, scheduler: Scheduler, val logManager: LogManager, val isShuttingDown: AtomicBoolean ) extends Logging with KafkaMetricsGroup { @@ -492,15 +492,15 @@ class ReplicaManager(val config: KafkaConfig, case None => // The leader broker should always be present in the leaderAndIsrRequest. // If not, we should record the error message and abort the transition process for this partition - stateChangeLogger.error(("Broker %d got error during become-follower state change with correlation id %d" + - "from controller %d epoch %d for partition [%s,%d] since new leader %d is not currently available") + stateChangeLogger.error(("Broker %d received LeaderAndIsrRequest with correlation id %d from controller" + + " %d epoch %d for partition [%s,%d] but cannot become follower since the new leader %d is unavailable.") .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, partition.topic, partition.partitionId, newLeaderBrokerId)) + // Partition needs to be added to the partitionsToMakeFollower even if leader is not up yet.
Thus the + // highWatermark can be read before it's overwritten by new checkpoints. Otherwise we could have a + // data loss issue. See KAFKA-1647. + partitionsToMakeFollower += partition } - // Partition needs to be added to the partitionsToMakeFollower even if leader is not up yet. Thus the - // highWatermark can be read before it's overwritten by new checkpoints. Otherwise we could have a - // data loss issue. See KAFKA-1647. - partitionsToMakeFollower += partition } replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(new TopicAndPartition(_))) -- 1.8.3.4 (Apple Git-47) From 0529840c9f0bdf3e6e97103a74fcb42fe8c89fa4 Mon Sep 17 00:00:00 2001 From: jqin Date: Sat, 18 Oct 2014 00:20:47 -0700 Subject: [PATCH 3/3] the version 2 code seems to be submitted by mistake... This should be the code for review that addressed Joel's comments. --- core/src/main/scala/kafka/server/ReplicaManager.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 942f7c9..33a79c9 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -484,7 +484,9 @@ class ReplicaManager(val config: KafkaConfig, leaders.find(_.id == newLeaderBrokerId) match { //Only change partition state when the leader is available case Some(leaderBroker) => - if (partition.makeFollower(controllerId, partitionStateInfo, correlationId, offsetManager) == false) + if (partition.makeFollower(controllerId, partitionStateInfo, correlationId, offsetManager)) + partitionsToMakeFollower += partition + else stateChangeLogger.info(("Broker %d skipped the become-follower state change after marking its partition as follower with correlation id %d from " + "controller %d epoch %d for partition [%s,%d] since the new leader %d is the same as the old leader") .format(localBrokerId, correlationId, 
controllerId, leaderIsrAndControllerEpoch.controllerEpoch, -- 1.8.3.4 (Apple Git-47)