From ee5ec622146628619ec3d9d361b2bd785331b97d Mon Sep 17 00:00:00 2001
From: asingh
Date: Mon, 29 Jun 2015 17:45:36 -0700
Subject: [PATCH] KAFKA-972: MetadataRequest returns stale list of brokers

---
 .../scala/kafka/controller/KafkaController.scala   | 21 +++++--
 .../unit/kafka/integration/TopicMetadataTest.scala | 66 ++++++++++++++++++++--
 2 files changed, 77 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 20f1499..b4fc755 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -387,8 +387,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
   /**
    * This callback is invoked by the replica state machine's broker change listener, with the list of newly started
    * brokers as input. It does the following -
-   * 1. Triggers the OnlinePartition state change for all new/offline partitions
-   * 2. It checks whether there are reassigned replicas assigned to any newly started brokers. If
+   * 1. Sends update metadata request to all live and shutting down brokers
+   * 2. Triggers the OnlinePartition state change for all new/offline partitions
+   * 3. It checks whether there are reassigned replicas assigned to any newly started brokers. If
    *    so, it performs the reassignment logic for each topic/partition.
    *
    * Note that we don't need to refresh the leader/isr cache for all topic/partitions at this point for two reasons:
@@ -400,10 +401,11 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
   def onBrokerStartup(newBrokers: Seq[Int]) {
     info("New broker startup callback for %s".format(newBrokers.mkString(",")))
     val newBrokersSet = newBrokers.toSet
-    // send update metadata request for all partitions to the newly restarted brokers. In cases of controlled shutdown
-    // leaders will not be elected when a new broker comes up. So at least in the common controlled shutdown case, the
-    // metadata will reach the new brokers faster
-    sendUpdateMetadataRequest(newBrokers)
+    // send update metadata request to all live and shutting down brokers. Old brokers will get to know of the new
+    // broker via this update.
+    // In cases of controlled shutdown leaders will not be elected when a new broker comes up. So at least in the
+    // common controlled shutdown case, the metadata will reach the new brokers faster
+    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
     // the very first thing to do when a new broker comes up is send it the entire list of partitions that it is
     // supposed to host. Based on that the broker starts the high watermark threads for the input list of partitions
     val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
@@ -433,6 +435,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
    * 1. Mark partitions with dead leaders as offline
    * 2. Triggers the OnlinePartition state change for all new/offline partitions
    * 3. Invokes the OfflineReplica state change on the input list of newly started brokers
+   * 4. If no partitions are affected then send UpdateMetadataRequest to live or shutting down brokers
    *
    * Note that we don't need to refresh the leader/isr cache for all topic/partitions at this point. This is because
    * the partition state machine will refresh our cache for us when performing leader election for all new/offline
@@ -464,6 +467,12 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
       // since topic deletion cannot be retried until at least one replica is in TopicDeletionStarted state
       deleteTopicManager.failReplicaDeletion(replicasForTopicsToBeDeleted)
     }
+
+    // If broker failure did not require leader re-election, inform brokers of failed broker
+    // Note that during leader re-election, brokers update their metadata
+    if (partitionsWithoutLeader.isEmpty) {
+      sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
+    }
   }
 
   /**
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index a95ee5e..5b6c9d6 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -25,7 +25,7 @@ import kafka.api.{TopicMetadataResponse, TopicMetadataRequest}
 import kafka.client.ClientUtils
 import kafka.cluster.{Broker, BrokerEndPoint}
 import kafka.common.ErrorMapping
-import kafka.server.{NotRunning, KafkaConfig, KafkaServer}
+import kafka.server.{KafkaConfig, KafkaServer, NotRunning}
 import kafka.utils.TestUtils
 import kafka.utils.TestUtils._
 import kafka.zk.ZooKeeperTestHarness
@@ -36,7 +36,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
   private var server1: KafkaServer = null
   var brokerEndPoints: Seq[BrokerEndPoint] = null
   var adHocConfigs: Seq[KafkaConfig] = null
-  val numConfigs: Int = 2
+  val numConfigs: Int = 4
 
   override def setUp() {
     super.setUp()
@@ -171,13 +171,15 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
 
 
   def testIsrAfterBrokerShutDownAndJoinsBack {
+    val numBrokers = 2 //just 2 brokers are enough for the test
+
     // start adHoc brokers
-    val adHocServers = adHocConfigs.map(p => createServer(p))
+    val adHocServers = adHocConfigs.take(numBrokers - 1).map(p => createServer(p))
     val allServers: Seq[KafkaServer] = Seq(server1) ++ adHocServers
 
     // create topic
     val topic: String = "test"
-    AdminUtils.createTopic(zkClient, topic, 1, numConfigs)
+    AdminUtils.createTopic(zkClient, topic, 1, numBrokers)
 
     // shutdown a broker
     adHocServers.last.shutdown()
@@ -192,4 +194,60 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
     // shutdown adHoc brokers
     adHocServers.map(p => p.shutdown())
   }
+
+  private def checkMetadata(servers: Seq[KafkaServer], expectedBrokersCount: Int): Unit = {
+    var topicMetadata: TopicMetadataResponse = new TopicMetadataResponse(Seq(), Seq(), -1)
+
+    // Get topic metadata from old broker
+    // Wait for metadata to get updated by checking metadata from a new broker
+    waitUntilTrue(() => {
+      topicMetadata = ClientUtils.fetchTopicMetadata(
+        Set.empty, brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata", 2000, 0)
+      topicMetadata.brokers.size == expectedBrokersCount},
+      "Alive brokers list is not correctly propagated by coordinator to brokers"
+    )
+
+    // Assert that topic metadata at new brokers is updated correctly
+    servers.filter(x => x.brokerState.currentState != NotRunning.state).foreach(x =>
+      waitUntilTrue(() =>
+        topicMetadata == ClientUtils.fetchTopicMetadata(
+          Set.empty,
+          Seq(new Broker(x.config.brokerId,
+                         x.config.hostName,
+                         x.boundPort()).getBrokerEndPoint(SecurityProtocol.PLAINTEXT)),
+          "TopicMetadataTest-testBasicTopicMetadata",
+          2000, 0),
"Topic metadata is not correctly updated")) + } + + + def testAliveBrokerListWithNoTopics { + checkMetadata(Seq(server1), 1) + } + + def testAliveBrokersListWithNoTopicsAfterNewBrokerStartup { + var adHocServers = adHocConfigs.takeRight(adHocConfigs.size - 1).map(p => createServer(p)) + + checkMetadata(adHocServers, numConfigs - 1) + + // Add a broker + adHocServers = adHocServers ++ Seq(createServer(adHocConfigs.head)) + + checkMetadata(adHocServers, numConfigs) + adHocServers.map(p => p.shutdown()) + } + + + def testAliveBrokersListWithNoTopicsAfterABrokerShutdown { + val adHocServers = adHocConfigs.map(p => createServer(p)) + + checkMetadata(adHocServers, numConfigs) + + // Shutdown a broker + adHocServers.last.shutdown() + adHocServers.last.awaitShutdown() + + checkMetadata(adHocServers, numConfigs - 1) + + adHocServers.map(p => p.shutdown()) + } } -- 2.3.2 (Apple Git-55)