From 3556f1f8a9a73dfdec61c554881e10f3c13252a1 Mon Sep 17 00:00:00 2001
From: asingh
Date: Mon, 29 Jun 2015 17:45:36 -0700
Subject: [PATCH] KAFKA-972: MetadataRequest returns stale list of brokers

---
 .../scala/kafka/controller/KafkaController.scala   | 21 +++++---
 .../unit/kafka/integration/TopicMetadataTest.scala | 60 +++++++++++++++++++++-
 2 files changed, 73 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 3635057..a06ea13 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -383,8 +383,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
   /**
    * This callback is invoked by the replica state machine's broker change listener, with the list of newly started
    * brokers as input. It does the following -
-   * 1. Triggers the OnlinePartition state change for all new/offline partitions
-   * 2. It checks whether there are reassigned replicas assigned to any newly started brokers. If
+   * 1. Sends update metadata request to all live and shutting down brokers
+   * 2. Triggers the OnlinePartition state change for all new/offline partitions
+   * 3. It checks whether there are reassigned replicas assigned to any newly started brokers. If
    *    so, it performs the reassignment logic for each topic/partition.
    *
    * Note that we don't need to refresh the leader/isr cache for all topic/partitions at this point for two reasons:
@@ -396,10 +397,11 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
   def onBrokerStartup(newBrokers: Seq[Int]) {
     info("New broker startup callback for %s".format(newBrokers.mkString(",")))
     val newBrokersSet = newBrokers.toSet
-    // send update metadata request for all partitions to the newly restarted brokers. In cases of controlled shutdown
-    // leaders will not be elected when a new broker comes up. So at least in the common controlled shutdown case, the
-    // metadata will reach the new brokers faster
-    sendUpdateMetadataRequest(newBrokers)
+    // send update metadata request to all live and shutting down brokers. Old brokers will get to know of the new
+    // broker via this update.
+    // In cases of controlled shutdown leaders will not be elected when a new broker comes up. So at least in the
+    // common controlled shutdown case, the metadata will reach the new brokers faster
+    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
     // the very first thing to do when a new broker comes up is send it the entire list of partitions that it is
     // supposed to host. Based on that the broker starts the high watermark threads for the input list of partitions
     val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
@@ -429,6 +431,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
    * 1. Mark partitions with dead leaders as offline
    * 2. Triggers the OnlinePartition state change for all new/offline partitions
    * 3. Invokes the OfflineReplica state change on the input list of newly started brokers
+   * 4. If no partitions are affected then send UpdateMetadataRequest to live or shutting down brokers
    *
    * Note that we don't need to refresh the leader/isr cache for all topic/partitions at this point. This is because
    * the partition state machine will refresh our cache for us when performing leader election for all new/offline
@@ -460,6 +463,12 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
       // since topic deletion cannot be retried until at least one replica is in TopicDeletionStarted state
       deleteTopicManager.failReplicaDeletion(replicasForTopicsToBeDeleted)
     }
+
+    // If broker failure did not require leader re-election, inform brokers of failed broker
+    // Note that during leader re-election, brokers update their metadata
+    if (partitionsWithoutLeader.isEmpty) {
+      sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
+    }
   }
 
   /**
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index 995b059..6a038c1 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -34,11 +34,14 @@ import kafka.client.ClientUtils
 class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
   private var server1: KafkaServer = null
   var brokerEndPoints: Seq[BrokerEndPoint] = null
+  var adHocConfigs: Seq[KafkaConfig] = null
+  val numConfigs: Int = 4
 
   override def setUp() {
     super.setUp()
-    val props = createBrokerConfigs(1, zkConnect)
-    val configs = props.map(KafkaConfig.fromProps)
+    val props = createBrokerConfigs(numConfigs, zkConnect)
+    val configs: Seq[KafkaConfig] = props.map(KafkaConfig.fromProps)
+    adHocConfigs = configs.takeRight(configs.size - 1) // Started and stopped by individual test cases
     server1 = TestUtils.createServer(configs.head)
     brokerEndPoints = Seq(new Broker(server1.config.brokerId, server1.config.hostName, server1.boundPort()).getBrokerEndPoint(SecurityProtocol.PLAINTEXT))
   }
@@ -62,6 +65,8 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
     val deserializedMetadataRequest = TopicMetadataRequest.readFrom(serializedMetadataRequest)
 
     assertEquals(topicMetadataRequest, deserializedMetadataRequest)
+
+    AdminUtils.deleteTopic(zkClient, topic)
   }
 
   def testBasicTopicMetadata {
@@ -79,6 +84,8 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
     assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
     assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
     assertEquals(1, partitionMetadata.head.replicas.size)
+
+    AdminUtils.deleteTopic(zkClient, topic)
   }
 
   def testGetAllTopicMetadata {
@@ -103,6 +110,9 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
     assertEquals("Expecting metadata for 1 partition", 1, partitionMetadataTopic2.size)
     assertEquals("Expecting partition id to be 0", 0, partitionMetadataTopic2.head.partitionId)
     assertEquals(1, partitionMetadataTopic2.head.replicas.size)
+
+    AdminUtils.deleteTopic(zkClient, topic1)
+    AdminUtils.deleteTopic(zkClient, topic2)
   }
 
   def testAutoCreateTopic {
@@ -129,5 +139,51 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
     assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
     assertEquals(1, partitionMetadata.head.replicas.size)
     assertTrue(partitionMetadata.head.leader.isDefined)
+
+    AdminUtils.deleteTopic(zkClient, topic)
+  }
+
+
+  def testAliveBrokerListWithNoTopics {
+    // issue metadata request with empty list of topics
+    waitUntilTrue(() => ClientUtils.fetchTopicMetadata(Set.empty, brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata",
+      2000, 0).brokers.size == 1, "Alive brokers list is not correctly propagated by coordinator to brokers")
+  }
+
+
+  def testAliveBrokersListWithNoTopicsAfterNewBrokerStartup {
+    var adHocServers = adHocConfigs.takeRight(adHocConfigs.size - 1).map(p => createServer(p))
+    // issue metadata request with empty list of topics
+    waitUntilTrue(() => ClientUtils.fetchTopicMetadata(
+      Set.empty,
+      Seq(new Broker(adHocServers.last.config.brokerId, adHocServers.last.config.hostName, adHocServers.last.boundPort()).getBrokerEndPoint(SecurityProtocol.PLAINTEXT)),
+      "TopicMetadataTest-testBasicTopicMetadata",
+      2000, 0).brokers.size == numConfigs - 1, "Alive brokers list is not correctly propagated by coordinator to brokers")
+
+    adHocServers = adHocServers ++ Seq(createServer(adHocConfigs.head))
+
+    waitUntilTrue(() => ClientUtils.fetchTopicMetadata(Set.empty, brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata",
+      2000, 0).brokers.size == numConfigs, "Alive brokers list is not correctly propagated by coordinator to brokers after new broker startup")
+
+    adHocServers.map(p => p.shutdown())
+  }
+
+
+  def testAliveBrokersListWithNoTopicsAfterABrokerShutdown {
+    val adHocServers = adHocConfigs.map(p => createServer(p))
+    // issue metadata request with empty list of topics
+    waitUntilTrue(() => ClientUtils.fetchTopicMetadata(
+      Set.empty,
+      Seq(new Broker(adHocServers.last.config.brokerId, adHocServers.last.config.hostName, adHocServers.last.boundPort()).getBrokerEndPoint(SecurityProtocol.PLAINTEXT)),
+      "TopicMetadataTest-testBasicTopicMetadata",
+      2000, 0).brokers.size == numConfigs, "Alive brokers list is not correctly propagated by coordinator to brokers")
+
+    adHocServers.last.shutdown()
+    adHocServers.last.awaitShutdown()
+
+    waitUntilTrue(() => ClientUtils.fetchTopicMetadata(Set.empty, brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata",
+      2000, 0).brokers.size == numConfigs - 1, "Alive brokers list is not correctly propagated by coordinator to brokers after one broker shutdown")
+
+    adHocServers.map(p => p.shutdown())
+  }
 }
--
2.3.2 (Apple Git-55)