From 588bc6258cf1e2ad380ec6a5c759cdfddf56fe2b Mon Sep 17 00:00:00 2001 From: asingh Date: Tue, 23 Jun 2015 21:22:56 -0700 Subject: [PATCH] KAFKA-1367: Broker topic metadata not kept in sync with ZooKeeper --- .../scala/kafka/common/TopicAndPartition.scala | 3 ++ .../scala/kafka/controller/KafkaController.scala | 60 ++++++++++++++++++++-- .../main/scala/kafka/utils/ReplicationUtils.scala | 16 ++++-- core/src/main/scala/kafka/utils/ZkUtils.scala | 1 + .../unit/kafka/utils/ReplicationUtilsTest.scala | 2 + 5 files changed, 75 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/common/TopicAndPartition.scala b/core/src/main/scala/kafka/common/TopicAndPartition.scala index df3db91..4f3a774 100644 --- a/core/src/main/scala/kafka/common/TopicAndPartition.scala +++ b/core/src/main/scala/kafka/common/TopicAndPartition.scala @@ -1,6 +1,7 @@ package kafka.common import kafka.cluster.{Replica, Partition} +import kafka.utils.Json /** * Licensed to the Apache Software Foundation (ASF) under one or more @@ -33,5 +34,7 @@ case class TopicAndPartition(topic: String, partition: Int) { def asTuple = (topic, partition) override def toString = "[%s,%d]".format(topic, partition) + + def toJson = Json.encode(Map("topic" -> topic, "partition" -> partition)) } diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 3635057..30eec8d 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -16,8 +16,9 @@ */ package kafka.controller -import collection._ -import collection.Set +import java.util + +import scala.collection._ import com.yammer.metrics.core.Gauge import java.util.concurrent.TimeUnit import kafka.admin.AdminUtils @@ -31,13 +32,15 @@ import kafka.utils.ZkUtils._ import kafka.utils._ import kafka.utils.CoreUtils._ import org.apache.zookeeper.Watcher.Event.KeeperState -import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient} +import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient} import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException} import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.locks.ReentrantLock import kafka.server._ import kafka.common.TopicAndPartition +import scala.collection.JavaConverters._ + class ControllerContext(val zkClient: ZkClient, val zkSessionTimeout: Int) { var controllerChannelManager: ControllerChannelManager = null @@ -169,6 +172,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt private val partitionReassignedListener = new PartitionsReassignedListener(this) private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this) + private val isrChangeNotificationListener = new IsrChangeNotificationListener(this) newGauge( "ActiveControllerCount", @@ -307,6 +311,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt incrementControllerEpoch(zkClient) // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks registerReassignedPartitionsListener() + registerIsrChangeNotificationListener() registerPreferredReplicaElectionListener() partitionStateMachine.registerListeners() replicaStateMachine.registerListeners() @@ -792,7 +797,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt controllerContext.controllerChannelManager.startup() } - private def updateLeaderAndIsrCache() { + def updateLeaderAndIsrCache() { val leaderAndIsrInfo = ZkUtils.getPartitionLeaderAndIsrForTopics(zkClient, controllerContext.partitionReplicaAssignment.keySet) for((topicPartition, leaderIsrAndControllerEpoch) <- leaderAndIsrInfo) controllerContext.partitionLeadershipInfo.put(topicPartition, leaderIsrAndControllerEpoch) @@ -892,6 +897,12 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener) } + private def registerIsrChangeNotificationListener() = { + debug("Registering IsrChangeNotificationListener") + ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.IsrChangeNotificationPath) + zkClient.subscribeChildChanges(ZkUtils.IsrChangeNotificationPath, isrChangeNotificationListener) + } + private def deregisterReassignedPartitionsListener() = { zkClient.unsubscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener) } @@ -1281,6 +1292,47 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic: } /** + * Called when leader intimates of isr change + * @param controller + */ +class IsrChangeNotificationListener(controller: KafkaController) extends IZkChildListener with Logging { + var topicAndPartitionSet: Set[TopicAndPartition] = Set() + + override def handleChildChange(parentPath: String, currentChilds: util.List[String]): Unit = { + debug("[IsrChangeNotificationListener] Fired!!!") + controller.updateLeaderAndIsrCache() + processUpdateNotifications(currentChilds.asScala) + } + + def processUpdateNotifications(children: mutable.Buffer[String]) { + val topicAndPartitions = children.map(x => getTopicAndPartition(x)).flatten.toSet + val liveBrokers: Seq[Int] = controller.controllerContext.liveOrShuttingDownBrokerIds.toSeq + controller.sendUpdateMetadataRequest(liveBrokers, topicAndPartitions) + debug("Sending MetadataRequest to Brokers:" + liveBrokers + " for TopicAndPartitions:" + topicAndPartitions) + children.map(x => ZkUtils.deletePath(controller.controllerContext.zkClient, ZkUtils.TopicConfigChangesPath + "/" + x)) + } + + def getTopicAndPartition(child: String): Option[TopicAndPartition] = { + val changeZnode: String = ZkUtils.IsrChangeNotificationPath + "/" + child + val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(controller.controllerContext.zkClient, changeZnode) + if (jsonOpt.isDefined) { + val json = Json.parseFull(jsonOpt.get) + + json match { + case Some(m) => + val topicAndPartition = m.asInstanceOf[Map[String, Any]] + val topic = topicAndPartition.get("topic").get.asInstanceOf[String] + val partition = topicAndPartition.get("partition").get.asInstanceOf[Int] + Some(TopicAndPartition(topic, partition)) + case None => throw new IllegalArgumentException("Invalid topic and partition JSON: " + json) + } + } else { + None + } + } +} + +/** * Starts the preferred replica leader election for the list of partitions specified under * /admin/preferred_replica_election - */ diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala b/core/src/main/scala/kafka/utils/ReplicationUtils.scala index 6068733..96430c5 100644 --- a/core/src/main/scala/kafka/utils/ReplicationUtils.scala +++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala @@ -18,22 +18,32 @@ package kafka.utils import kafka.api.LeaderAndIsr +import kafka.common.TopicAndPartition import kafka.controller.LeaderIsrAndControllerEpoch -import org.apache.zookeeper.data.Stat import org.I0Itec.zkclient.ZkClient +import org.apache.zookeeper.data.Stat -import scala.Some import scala.collection._ object ReplicationUtils extends Logging { + val IsrChangeNotificationPrefix = "isr_change_" + def updateLeaderAndIsr(zkClient: ZkClient, topic: String, partitionId: Int, newLeaderAndIsr: LeaderAndIsr, controllerEpoch: Int, zkVersion: Int): (Boolean,Int) = { debug("Updated ISR for partition [%s,%d] to %s".format(topic, partitionId, newLeaderAndIsr.isr.mkString(","))) val path = ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId) val newLeaderData = ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch) // use the epoch of the controller that made the leadership decision, instead of the current controller epoch - ZkUtils.conditionalUpdatePersistentPath(zkClient, path, newLeaderData, zkVersion, Some(checkLeaderAndIsrZkData)) + val updatePersistentPath: (Boolean, Int) = ZkUtils.conditionalUpdatePersistentPath(zkClient, path, newLeaderData, zkVersion, Some(checkLeaderAndIsrZkData)) + if (updatePersistentPath._1) { + val topicAndPartition: TopicAndPartition = TopicAndPartition(topic, partitionId) + val isrChangeNotificationPath: String = ZkUtils.createSequentialPersistentPath( + zkClient, ZkUtils.IsrChangeNotificationPath + "/" + IsrChangeNotificationPrefix, + topicAndPartition.toJson) + info("Added " + isrChangeNotificationPath + " for " + topicAndPartition) + } + updatePersistentPath } def checkLeaderAndIsrZkData(zkClient: ZkClient, path: String, expectedLeaderAndIsrInfo: String): (Boolean,Int) = { diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 78475e3..166814c 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -47,6 +47,7 @@ object ZkUtils extends Logging { val DeleteTopicsPath = "/admin/delete_topics" val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election" val BrokerSequenceIdPath = "/brokers/seqid" + val IsrChangeNotificationPath = "/isr_change_notification" def getTopicPath(topic: String): String = { BrokerTopicsPath + "/" + topic diff --git a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala index c96c0ff..b9de8d6 100644 --- a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala +++ b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala @@ -70,6 +70,8 @@ class ReplicationUtilsTest extends JUnit3Suite with ZooKeeperTestHarness { EasyMock.expect(replicaManager.zkClient).andReturn(zkClient) EasyMock.replay(replicaManager) + ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.IsrChangeNotificationPath) + val replicas = List(0,1) // regular update -- 2.3.2 (Apple Git-55)