From b3807c0203afd8bc4b09847aba3ffc76444aefa5 Mon Sep 17 00:00:00 2001
From: asingh
Date: Mon, 29 Jun 2015 17:45:36 -0700
Subject: [PATCH] KAFKA-972: MetadataRequest returns stale list of brokers

---
 .../scala/kafka/controller/KafkaController.scala   | 21 ++++--
 .../unit/kafka/integration/TopicMetadataTest.scala | 87 +++++++++++++++++++---
 2 files changed, 91 insertions(+), 17 deletions(-)
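
Reviewer note, kept above the first diff where `git am` ignores it: the patch
makes the controller broadcast an UpdateMetadataRequest to every live and
shutting-down broker on broker startup, and on broker failure when no leader
re-election is needed, so a MetadataRequest no longer returns a stale broker
list. Below is a minimal sketch of how a client observes that list, mirroring
the test helper further down; it assumes a broker reachable on localhost:9092,
and the object name and client id are illustrative only, not part of the patch.

    import kafka.client.ClientUtils
    import kafka.cluster.Broker
    import org.apache.kafka.common.protocol.SecurityProtocol

    object AliveBrokerListCheck {
      def main(args: Array[String]): Unit = {
        // Bootstrap endpoint of any one broker (hypothetical host/port).
        val bootstrap = Seq(new Broker(0, "localhost", 9092)
          .getBrokerEndPoint(SecurityProtocol.PLAINTEXT))
        // An empty topic set still returns cluster metadata. Before this patch the broker
        // list in the response could be stale: it missed newly started brokers and, when
        // no leader had to move, kept listing brokers that had already died.
        val metadata = ClientUtils.fetchTopicMetadata(
          Set.empty, bootstrap, "alive-broker-list-check", 2000, 0)
        println("Alive brokers reported: " + metadata.brokers.mkString(", "))
      }
    }
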
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 3635057..a06ea13 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -383,8 +383,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
   /**
    * This callback is invoked by the replica state machine's broker change listener, with the list of newly started
    * brokers as input. It does the following -
-   * 1. Triggers the OnlinePartition state change for all new/offline partitions
-   * 2. It checks whether there are reassigned replicas assigned to any newly started brokers. If
+   * 1. Sends update metadata request to all live and shutting down brokers
+   * 2. Triggers the OnlinePartition state change for all new/offline partitions
+   * 3. It checks whether there are reassigned replicas assigned to any newly started brokers. If
    * so, it performs the reassignment logic for each topic/partition.
    *
    * Note that we don't need to refresh the leader/isr cache for all topic/partitions at this point for two reasons:
@@ -396,10 +397,11 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
   def onBrokerStartup(newBrokers: Seq[Int]) {
     info("New broker startup callback for %s".format(newBrokers.mkString(",")))
     val newBrokersSet = newBrokers.toSet
-    // send update metadata request for all partitions to the newly restarted brokers. In cases of controlled shutdown
-    // leaders will not be elected when a new broker comes up. So at least in the common controlled shutdown case, the
-    // metadata will reach the new brokers faster
-    sendUpdateMetadataRequest(newBrokers)
+    // send update metadata request to all live and shutting down brokers. Old brokers will get to know of the new
+    // broker via this update.
+    // In cases of controlled shutdown leaders will not be elected when a new broker comes up. So at least in the
+    // common controlled shutdown case, the metadata will reach the new brokers faster
+    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
     // the very first thing to do when a new broker comes up is send it the entire list of partitions that it is
     // supposed to host. Based on that the broker starts the high watermark threads for the input list of partitions
     val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
@@ -429,6 +431,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
    * 1. Mark partitions with dead leaders as offline
    * 2. Triggers the OnlinePartition state change for all new/offline partitions
    * 3. Invokes the OfflineReplica state change on the input list of newly started brokers
+   * 4. If no partitions are affected then send UpdateMetadataRequest to live or shutting down brokers
    *
    * Note that we don't need to refresh the leader/isr cache for all topic/partitions at this point. This is because
    * the partition state machine will refresh our cache for us when performing leader election for all new/offline
@@ -460,6 +463,12 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
       // since topic deletion cannot be retried until at least one replica is in TopicDeletionStarted state
       deleteTopicManager.failReplicaDeletion(replicasForTopicsToBeDeleted)
     }
+
+    // If broker failure did not require leader re-election, inform brokers of the failed broker
+    // Note that during leader re-election, brokers update their metadata
+    if (partitionsWithoutLeader.isEmpty) {
+      sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
+    }
   }
 
   /**
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index 995b059..f41755a 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -17,28 +17,32 @@
 
 package kafka.integration
 
-import org.apache.kafka.common.protocol.SecurityProtocol
-import org.scalatest.junit.JUnit3Suite
-import kafka.zk.ZooKeeperTestHarness
-import kafka.admin.AdminUtils
 import java.nio.ByteBuffer
+
 import junit.framework.Assert._
-import kafka.cluster.{BrokerEndPoint, Broker}
-import kafka.utils.TestUtils
-import kafka.utils.TestUtils._
-import kafka.server.{KafkaServer, KafkaConfig}
+import kafka.admin.AdminUtils
 import kafka.api.TopicMetadataRequest
-import kafka.common.ErrorMapping
 import kafka.client.ClientUtils
+import kafka.cluster.{Broker, BrokerEndPoint}
+import kafka.common.ErrorMapping
+import kafka.server.{KafkaConfig, KafkaServer, NotRunning}
+import kafka.utils.TestUtils
+import kafka.utils.TestUtils._
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.protocol.SecurityProtocol
+import org.scalatest.junit.JUnit3Suite
 
 class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
   private var server1: KafkaServer = null
   var brokerEndPoints: Seq[BrokerEndPoint] = null
+  var adHocConfigs: Seq[KafkaConfig] = null
+  val numConfigs: Int = 4
 
   override def setUp() {
     super.setUp()
-    val props = createBrokerConfigs(1, zkConnect)
-    val configs = props.map(KafkaConfig.fromProps)
+    val props = createBrokerConfigs(numConfigs, zkConnect)
+    val configs: Seq[KafkaConfig] = props.map(KafkaConfig.fromProps)
+    adHocConfigs = configs.takeRight(configs.size - 1) // Started and stopped by individual test cases
     server1 = TestUtils.createServer(configs.head)
     brokerEndPoints = Seq(new Broker(server1.config.brokerId, server1.config.hostName, server1.boundPort()).getBrokerEndPoint(SecurityProtocol.PLAINTEXT))
   }
@@ -130,4 +134,65 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
     assertEquals(1, partitionMetadata.head.replicas.size)
     assertTrue(partitionMetadata.head.leader.isDefined)
   }
+
+
+  def checkMetadata(servers: Seq[KafkaServer], expectedBrokersCount: Int): Unit = {
+    // Wait for metadata to get updated by checking metadata from a new broker
+    waitUntilTrue(() =>
+      ClientUtils.fetchTopicMetadata(
+        Set.empty,
+        Seq(new Broker(servers.head.config.brokerId,
+                       servers.head.config.hostName,
+                       servers.head.boundPort()).getBrokerEndPoint(SecurityProtocol.PLAINTEXT)),
+        "TopicMetadataTest-testBasicTopicMetadata", 2000, 0).brokers.size == expectedBrokersCount,
+      "Alive brokers list is not correctly propagated by controller to brokers")
+
+    // Get topic metadata from old broker
+    val topicMetadata = ClientUtils.fetchTopicMetadata(
+      Set.empty, brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata", 2000, 0)
+
+    // Assert that topic metadata at new brokers is updated correctly
+    servers.filter(x => x.brokerState.currentState != NotRunning.state).map(x =>
+      assertEquals(topicMetadata,
+        ClientUtils.fetchTopicMetadata(
+          Set.empty,
+          Seq(new Broker(x.config.brokerId,
+                         x.config.hostName,
+                         x.boundPort()).getBrokerEndPoint(SecurityProtocol.PLAINTEXT)),
+          "TopicMetadataTest-testBasicTopicMetadata",
+          2000, 0)
+      ))
+  }
+
+
+  def testAliveBrokerListWithNoTopics {
+    checkMetadata(Seq(server1), 1)
+  }
+
+  def testAliveBrokersListWithNoTopicsAfterNewBrokerStartup {
+    var adHocServers = adHocConfigs.takeRight(adHocConfigs.size - 1).map(p => createServer(p))
+
+    checkMetadata(adHocServers, numConfigs - 1)
+
+    // Add a broker
+    adHocServers = adHocServers ++ Seq(createServer(adHocConfigs.head))
+
+    checkMetadata(adHocServers, numConfigs)
+    adHocServers.map(p => p.shutdown())
+  }
+
+
+  def testAliveBrokersListWithNoTopicsAfterABrokerShutdown {
+    val adHocServers = adHocConfigs.map(p => createServer(p))
+
+    checkMetadata(adHocServers, numConfigs)
+
+    // Shutdown a broker
+    adHocServers.last.shutdown()
+    adHocServers.last.awaitShutdown()
+
+    checkMetadata(adHocServers, numConfigs - 1)
+
+    adHocServers.map(p => p.shutdown())
+  }
 }
-- 
2.3.2 (Apple Git-55)
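
A compact, self-contained sketch of the broadcast rule the controller change
introduces, for readers outside the Kafka code base. The types below are
simplified stand-ins, not the real KafkaController or ControllerContext API;
only the decision itself (broadcast when no partition lost its leader) mirrors
the patch.

    object BrokerFailureModel {
      // Stand-in for the controller's view of the cluster.
      final case class ClusterState(liveOrShuttingDownBrokerIds: Set[Int],
                                    leaderByPartition: Map[String, Int])

      // Partitions whose leader sat on a failed broker need leader re-election,
      // and that election already pushes fresh metadata to every broker.
      def partitionsWithoutLeader(state: ClusterState, deadBrokers: Set[Int]): Set[String] =
        state.leaderByPartition.collect {
          case (partition, leader) if deadBrokers(leader) => partition
        }.toSet

      // The fix: when no leader moved, still push the shrunken broker list to every
      // live or shutting-down broker; otherwise they keep serving the dead broker
      // in their metadata responses.
      def onBrokerFailure(state: ClusterState,
                          deadBrokers: Set[Int],
                          sendUpdateMetadataRequest: Seq[Int] => Unit): Unit =
        if (partitionsWithoutLeader(state, deadBrokers).isEmpty)
          sendUpdateMetadataRequest(state.liveOrShuttingDownBrokerIds.toSeq)
    }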