Index: src/test/scala/unit/kafka/utils/TestUtils.scala
===================================================================
--- src/test/scala/unit/kafka/utils/TestUtils.scala (revision 1355197)
+++ src/test/scala/unit/kafka/utils/TestUtils.scala (working copy)
@@ -380,8 +380,7 @@
     leader match {
       case Some(l) => info("Leader %d exists for topic %s partition %d".format(l, topic, partition))
         leader
-      case None => zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderPath(topic, partition.toString),
-        new LeaderExists(topic, partition, leaderExists))
+      case None => zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndISRPath(topic, partition.toString), new LeaderExistsListener(topic, partition, leaderExists))
         leaderExists.await(timeoutMs, TimeUnit.MILLISECONDS)
         // check if leader is elected
         val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
@@ -396,7 +395,6 @@
       leaderLock.unlock()
     }
   }
-
   def waitUntilTrue(condition: () => Boolean, waitTime: Long): Boolean = {
     val startTime = System.currentTimeMillis()
     while (true) {
@@ -416,24 +414,24 @@
     val topic1 = "test1"
     val topic2 = "test2"
-    val leader1 = 1;
-    val ISR1 = List(1, 2, 3)
+    val leader1 = 0;
+    val ISR1 = List(0, 1, 2)
-    val leader2 = 2;
-    val ISR2 = List(2, 3, 4)
+    val leader2 = 1;
+    val ISR2 = List(1, 2, 3)
     val leaderAndISR1 = new LeaderAndISR(leader1, 1, ISR1, 1)
     val leaderAndISR2 = new LeaderAndISR(leader2, 1, ISR2, 2)
-    val map = Map(((topic1, 1), leaderAndISR1), ((topic1, 2), leaderAndISR1),
-                  ((topic2, 1), leaderAndISR2), ((topic2, 2), leaderAndISR2))
+    val map = Map(((topic1, 0), leaderAndISR1),// ((topic2, 1), leaderAndISR1),
+                  ((topic2, 0), leaderAndISR2))//, ((topic2, 1), leaderAndISR2))
     new LeaderAndISRRequest(1, "client 1", 1, 4, map)
   }

   def createSampleLeaderAndISRResponse() : LeaderAndISRResponse = {
     val topic1 = "test1"
     val topic2 = "test2"
-    val responseMap = Map(((topic1, 1), ErrorMapping.NoError), ((topic1, 2), ErrorMapping.NoError),
-                          ((topic2, 1), ErrorMapping.NoError), ((topic2, 2), ErrorMapping.NoError))
+    val responseMap = Map(((topic1, 0), ErrorMapping.NoError),// ((topic1, 1), ErrorMapping.NoError),
+                          ((topic2, 0), ErrorMapping.NoError))//, ((topic2, 1), ErrorMapping.NoError))
     new LeaderAndISRResponse(1, responseMap)
   }
@@ -448,8 +446,8 @@
   def createSampleStopReplicaResponse() : StopReplicaResponse = {
     val topic1 = "test1"
     val topic2 = "test2"
-    val responseMap = Map(((topic1, 1), ErrorMapping.NoError), ((topic1, 2), ErrorMapping.NoError),
-                          ((topic2, 1), ErrorMapping.NoError), ((topic2, 2), ErrorMapping.NoError))
+    val responseMap = Map(((topic1, 0), ErrorMapping.NoError),// ((topic1, 2), ErrorMapping.NoError),
+                          ((topic2, 0), ErrorMapping.NoError))//, ((topic2, 2), ErrorMapping.NoError))
     new StopReplicaResponse(1, responseMap)
   }
 }
Index: src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
===================================================================
--- src/test/scala/unit/kafka/integration/TopicMetadataTest.scala (revision 1355197)
+++ src/test/scala/unit/kafka/integration/TopicMetadataTest.scala (working copy)
@@ -96,7 +96,8 @@
     // create the kafka request handler
     val requestChannel = new RequestChannel(2, 5)
-    val apis = new KafkaApis(requestChannel, logManager, replicaManager, kafkaZookeeper)
+    val apis = new KafkaApis(requestChannel, logManager, replicaManager, kafkaZookeeper, null,
+      null, null, null, 1)
     // mock the receive API to return the request buffer as created above
     val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive])
Index:
src/test/scala/unit/kafka/controller/ControllerBasicTest.scala =================================================================== --- src/test/scala/unit/kafka/controller/ControllerBasicTest.scala (revision 1355197) +++ src/test/scala/unit/kafka/controller/ControllerBasicTest.scala (working copy) @@ -26,6 +26,7 @@ import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicInteger import kafka.utils.{ControllerTestUtils, ZkUtils, TestUtils} +import kafka.admin.CreateTopicCommand class ControllerBasicTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -49,18 +50,24 @@ brokers(3).shutdown() Thread.sleep(1000) - var curController = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath) + var curController = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)._1 + info("cur controller " + curController) assertEquals(curController, "2") + brokers(1).startup() brokers(2).shutdown() Thread.sleep(1000) - curController = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath) - assertEquals(curController, "1") + curController = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)._1 + info("cur controller " + curController) + assertEquals("Controller not right", curController, "1") } - def testControllerCommandSend(){ + /*def testControllerCommandSend(){ Thread.sleep(1000) + CreateTopicCommand.createTopic(zkClient, "test1", 1, 4, "0:1:2:3") + CreateTopicCommand.createTopic(zkClient, "test2", 1, 4, "0:1:2:3") + for(broker <- brokers){ if(broker.kafkaController.isActive){ val leaderAndISRRequest = ControllerTestUtils.createSampleLeaderAndISRRequest() @@ -96,5 +103,5 @@ assertEquals(successCount.get(), 8) } } - } + } */ } \ No newline at end of file Index: src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala =================================================================== --- src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala (revision 1355197) +++ src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala (working copy) @@ -40,7 +40,7 @@ var testData: String = null - testData = ZkUtils.readData(zkClient, "/tmp/zktest") + testData = ZkUtils.readData(zkClient, "/tmp/zktest")._1 Assert.assertNotNull(testData) zkClient.close Index: src/test/scala/unit/kafka/server/LeaderElectionTest.scala =================================================================== --- src/test/scala/unit/kafka/server/LeaderElectionTest.scala (revision 1355197) +++ src/test/scala/unit/kafka/server/LeaderElectionTest.scala (working copy) @@ -13,7 +13,7 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
-*/ + */ package kafka.server @@ -52,52 +52,53 @@ servers ++= List(server1, server2) try { - // start 2 brokers - val topic = "new-topic" - val partitionId = 0 + // start 2 brokers + val topic = "new-topic" + val partitionId = 0 - // create topic with 1 partition, 2 replicas, one on each broker - CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1") + // create topic with 1 partition, 2 replicas, one on each broker + CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1") - // wait until leader is elected - var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500) - assertTrue("Leader should get elected", leader.isDefined) - // NOTE: this is to avoid transient test failures - assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1)) + // wait until leader is elected + var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500) + assertTrue("Leader should get elected", leader.isDefined) + // NOTE: this is to avoid transient test failures + assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1)) + debug("Leader is elected to be: %s".format(leader.getOrElse(-1))) - // kill the server hosting the preferred replica - servers.head.shutdown() + // kill the server hosting the preferred replica + servers.last.shutdown() - // check if leader moves to the other server - leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 1500) - assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1)) + // check if leader moves to the other server + //leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 1500) + Thread.sleep(zookeeper.tickTime) - Thread.sleep(zookeeper.tickTime) + leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitionId) - // bring the preferred replica back - servers.head.startup() + assertEquals("Leader must move to broker 0", 0, leader.getOrElse(-1)) + debug("Leader is elected to be: %s".format(leader.getOrElse(-1))) - leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500) - assertEquals("Leader must remain on broker 1", 1, leader.getOrElse(-1)) - // shutdown current leader (broker 1) - servers.last.shutdown() - leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500) + // bring the preferred replica back + servers.last.startup() + servers.head.shutdown() - // test if the leader is the preferred replica - assertEquals("Leader must be preferred replica on broker 0", 0, leader.getOrElse(-1)) + Thread.sleep(zookeeper.tickTime) + + leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitionId) + assertEquals("Leader must return to 1", 1, leader.getOrElse(-1)) + debug("Leader is elected to be: %s".format(leader.getOrElse(-1))) }catch { case e => error("Error while running leader election test ", e) } finally { // shutdown the servers and delete data hosted on them servers.map(server => server.shutdown()) servers.map(server => Utils.rm(server.config.logDir)) - } } // Assuming leader election happens correctly, test if epoch changes as expected - def testEpoch() { + /*def testEpoch() { // keep switching leaders to see if epoch changes correctly val topic = "new-topic" val partitionId = 0 @@ -127,5 +128,5 @@ TestUtils.deleteBrokersInZk(zkClient, List(brokerId1, brokerId2)) } - } + }*/ } \ No newline at end of file Index: src/test/scala/unit/kafka/server/StateChangeTest.scala =================================================================== --- 
src/test/scala/unit/kafka/server/StateChangeTest.scala (revision 1355197) +++ src/test/scala/unit/kafka/server/StateChangeTest.scala (working copy) @@ -1,124 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package kafka.server - -import org.scalatest.junit.JUnit3Suite -import kafka.zk.ZooKeeperTestHarness -import kafka.common.QueueFullException -import junit.framework.Assert._ -import kafka.utils.{ZkQueue, TestUtils} - -class StateChangeTest extends JUnit3Suite with ZooKeeperTestHarness { - - val brokerId1 = 0 - val port1 = TestUtils.choosePort() - var stateChangeQ: ZkQueue = null - val config = new KafkaConfig(TestUtils.createBrokerConfig(brokerId1, port1)) - - override def setUp() { - super.setUp() - - // create a queue - val queuePath = "/brokers/state/" + config.brokerId - stateChangeQ = new ZkQueue(zkClient, queuePath, 10) - } - - override def tearDown() { - super.tearDown() - } - - def testZkQueueDrainAll() { - for(i <- 0 until 5) { - val itemPath = stateChangeQ.put("test:0:follower") - val item = itemPath.split("/").last.split("-").last.toInt - assertEquals(i, item) - } - - var numItems: Int = 0 - for(i <- 0 until 5) { - val item = stateChangeQ.take() - assertEquals("test:0:follower", item._2) - assertTrue(stateChangeQ.remove(item)) - numItems += 1 - } - assertEquals(5, numItems) - - for(i <- 5 until 10) { - val itemPath = stateChangeQ.put("test:1:follower") - val item = itemPath.split("/").last.split("-").last.toInt - assertEquals(i+5, item) - } - - numItems = 0 - for(i <- 0 until 5) { - val item = stateChangeQ.take() - assertTrue(stateChangeQ.remove(item)) - assertEquals("test:1:follower", item._2) - numItems += 1 - } - assertEquals(5, numItems) - } - - def testZkQueueFull() { - for(i <- 0 until 10) { - val itemPath = stateChangeQ.put("test:0:follower") - val item = itemPath.split("/").last.split("-").last.toInt - assertEquals(i, item) - } - - try { - stateChangeQ.put("test:0:follower") - fail("Queue should be full") - }catch { - case e:QueueFullException => // expected - } - } - - def testStateChangeCommandJson() { - // test start replica - val topic = "foo" - val partition = 0 - val epoch = 1 - - val startReplica = new StartReplica(topic, partition, epoch) - val startReplicaJson = startReplica.toJson() - val startReplicaFromJson = StateChangeCommand.getStateChangeRequest(startReplicaJson) - assertEquals(startReplica, startReplicaFromJson) - - // test close replica - val closeReplica = new StartReplica(topic, partition, epoch) - val closeReplicaJson = startReplica.toJson() - val closeReplicaFromJson = StateChangeCommand.getStateChangeRequest(closeReplicaJson) - assertEquals(closeReplica, closeReplicaFromJson) - } - - // TODO: Do this after patch for delete topic/delete partition is in - def 
testStateChangeRequestValidity() { - // mock out the StateChangeRequestHandler - - // setup 3 replicas for one topic partition - - // shutdown follower 1 - - // restart leader to trigger epoch change - - // start follower 1 - - // test follower 1 acted only on one become follower request - } -} \ No newline at end of file Index: src/test/resources/log4j.properties =================================================================== --- src/test/resources/log4j.properties (revision 1355197) +++ src/test/resources/log4j.properties (working copy) @@ -18,7 +18,7 @@ log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n -log4j.logger.kafka=ERROR +log4j.logger.kafka=DEBUG # zkclient can be verbose, during debugging it is common to adjust is separately log4j.logger.org.I0Itec.zkclient.ZkClient=WARN Index: src/main/scala/kafka/cluster/Replica.scala =================================================================== --- src/main/scala/kafka/cluster/Replica.scala (revision 1355197) +++ src/main/scala/kafka/cluster/Replica.scala (working copy) @@ -62,6 +62,11 @@ } } + def isLeader: Boolean = { + brokerId == partition.leaderId().get + } + + def highWatermark(highwaterMarkOpt: Option[Long] = None): Long = { highwaterMarkOpt match { case Some(highwaterMark) => Index: src/main/scala/kafka/cluster/Partition.scala =================================================================== --- src/main/scala/kafka/cluster/Partition.scala (revision 1355197) +++ src/main/scala/kafka/cluster/Partition.scala (working copy) @@ -18,10 +18,9 @@ import kafka.common.NoLeaderForPartitionException import kafka.utils.{SystemTime, Time, Logging} -import org.I0Itec.zkclient.ZkClient -import kafka.utils.ZkUtils._ import java.util.concurrent.locks.ReentrantLock import java.lang.IllegalStateException +import scala.collection._ /** * Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR @@ -118,45 +117,21 @@ stuckReplicas ++ slowReplicas } - def updateISR(newISR: Set[Int], zkClientOpt: Option[ZkClient] = None) { - try { - leaderISRUpdateLock.lock() - zkClientOpt match { - case Some(zkClient) => - // update ISR in ZK - updateISRInZk(newISR, zkClient) - case None => - } - // update partition's ISR in cache - inSyncReplicas = newISR.map {r => + def updateISR(newISR: Set[Int]) { + leaderISRUpdateLock.lock() + // update partition's ISR in cache + inSyncReplicas = newISR.map + { + r => getReplica(r) match { case Some(replica) => replica case None => throw new IllegalStateException("ISR update failed. No replica for id %d".format(r)) } - } - info("Updated ISR for for topic %s partition %d to %s in cache".format(topic, partitionId, newISR.mkString(","))) - }catch { - case e => throw new IllegalStateException("Failed to update ISR for topic %s ".format(topic) + - "partition %d to %s".format(partitionId, newISR.mkString(",")), e) - }finally { - leaderISRUpdateLock.unlock() } + info("Updated ISR for for topic %s partition %d to %s in cache".format(topic, partitionId, newISR.mkString(","))) + leaderISRUpdateLock.unlock() } - private def updateISRInZk(newISR: Set[Int], zkClient: ZkClient) = { - val replicaListAndEpochString = readDataMaybeNull(zkClient, getTopicPartitionInSyncPath(topic, partitionId.toString)) - if(replicaListAndEpochString == null) { - throw new NoLeaderForPartitionException(("Illegal partition state. 
ISR cannot be updated for topic " + - "%s partition %d since leader and ISR does not exist in ZK".format(topic, partitionId))) - } - else { - val replicasAndEpochInfo = replicaListAndEpochString.split(";") - val epoch = replicasAndEpochInfo.last - updatePersistentPath(zkClient, getTopicPartitionInSyncPath(topic, partitionId.toString), - "%s;%s".format(newISR.mkString(","), epoch)) - info("Updating ISR for for topic %s partition %d to %s in ZK".format(topic, partitionId, newISR.mkString(","))) - } - } override def equals(that: Any): Boolean = { if(!(that.isInstanceOf[Partition])) Index: src/main/scala/kafka/admin/AdminUtils.scala =================================================================== --- src/main/scala/kafka/admin/AdminUtils.scala (revision 1355197) +++ src/main/scala/kafka/admin/AdminUtils.scala (working copy) @@ -90,10 +90,10 @@ topics.map { topic => if (ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) { val topicPartitionAssignment = ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic).iterator).get(topic).get - val sortedPartitions = topicPartitionAssignment.toList.sortWith( (m1,m2) => m1._1.toInt < m2._1.toInt ) + val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1) val partitionMetadata = sortedPartitions.map { partitionMap => - val partition = partitionMap._1.toInt + val partition = partitionMap._1 val replicas = partitionMap._2 val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partition) val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition) @@ -101,7 +101,7 @@ new PartitionMetadata(partition, leader.map(l => getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head), - getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas.map(id => id.toInt)), + getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas), getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas), None /* Return log segment metadata when getOffsetsBefore will be replaced with this API */) } @@ -112,6 +112,9 @@ } } + scala.Seq + + private def getBrokerInfoFromCache(zkClient: ZkClient, cachedBrokerInfo: scala.collection.mutable.Map[Int, Broker], brokerIds: Seq[Int]): Seq[Broker] = { Index: src/main/scala/kafka/consumer/TopicCount.scala =================================================================== --- src/main/scala/kafka/consumer/TopicCount.scala (revision 1355197) +++ src/main/scala/kafka/consumer/TopicCount.scala (working copy) @@ -66,7 +66,7 @@ consumerId: String, zkClient: ZkClient) : TopicCount = { val dirs = new ZKGroupDirs(group) - val topicCountString = ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId) + val topicCountString = ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId)._1 val hasWhitelist = topicCountString.startsWith(WHITELIST_MARKER) val hasBlacklist = topicCountString.startsWith(BLACKLIST_MARKER) Index: src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala =================================================================== --- src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (revision 1355197) +++ src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (working copy) @@ -296,7 +296,7 @@ try { val topicDirs = new ZKGroupTopicDirs(config.groupId, topic) val znode = topicDirs.consumerOffsetDir + "/" + partitionId - val offsetString = readDataMaybeNull(zkClient, znode) + val offsetString = readDataMaybeNull(zkClient, znode)._1 if (offsetString != null) return offsetString.toLong else @@ -418,7 
+418,7 @@ } } - private def deletePartitionOwnershipFromZK(topic: String, partition: String) { + private def deletePartitionOwnershipFromZK(topic: String, partition: Int) { val topicDirs = new ZKGroupTopicDirs(group, topic) val znode = topicDirs.consumerOwnerDir + "/" + partition deletePath(zkClient, znode) @@ -429,7 +429,7 @@ info("Releasing partition ownership") for ((topic, infos) <- localTopicRegistry) { for(partition <- infos.keys) - deletePartitionOwnershipFromZK(topic, partition.toString) + deletePartitionOwnershipFromZK(topic, partition) localTopicRegistry.remove(topic) } } @@ -486,7 +486,7 @@ releasePartitionOwnership(topicRegistry) - var partitionOwnershipDecision = new collection.mutable.HashMap[(String, String), String]() + var partitionOwnershipDecision = new collection.mutable.HashMap[(String, Int), String]() var currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]] for ((topic, consumerThreadIdSet) <- myTopicThreadIdsMap) { @@ -494,7 +494,7 @@ val topicDirs = new ZKGroupTopicDirs(group, topic) val curConsumers = consumersPerTopicMap.get(topic).get - var curPartitions: Seq[String] = partitionsPerTopicMap.get(topic).get + var curPartitions: Seq[Int] = partitionsPerTopicMap.get(topic).get val nPartsPerConsumer = curPartitions.size / curConsumers.size val nConsumersWithExtraPart = curPartitions.size % curConsumers.size @@ -587,13 +587,13 @@ } } - private def reflectPartitionOwnershipDecision(partitionOwnershipDecision: Map[(String, String), String]): Boolean = { - var successfullyOwnedPartitions : List[(String, String)] = Nil + private def reflectPartitionOwnershipDecision(partitionOwnershipDecision: Map[(String, Int), String]): Boolean = { + var successfullyOwnedPartitions : List[(String, Int)] = Nil val partitionOwnershipSuccessful = partitionOwnershipDecision.map { partitionOwner => val topic = partitionOwner._1._1 val partition = partitionOwner._1._2 val consumerThreadId = partitionOwner._2 - val partitionOwnerPath = getConsumerPartitionOwnerPath(group, topic,partition) + val partitionOwnerPath = getConsumerPartitionOwnerPath(group, topic, partition) try { createEphemeralPathExpectConflict(zkClient, partitionOwnerPath, consumerThreadId) info(consumerThreadId + " successfully owned partition " + partition + " for topic " + topic) @@ -618,29 +618,29 @@ } private def addPartitionTopicInfo(currentTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]], - topicDirs: ZKGroupTopicDirs, partition: String, + topicDirs: ZKGroupTopicDirs, partition: Int, topic: String, consumerThreadId: String) { val partTopicInfoMap = currentTopicRegistry.get(topic) // find the leader for this partition - val leaderOpt = getLeaderForPartition(zkClient, topic, partition.toInt) + val leaderOpt = getLeaderForPartition(zkClient, topic, partition) leaderOpt match { - case None => throw new NoBrokersForPartitionException("No leader available for partition %s on topic %s". + case None => throw new NoBrokersForPartitionException("No leader available for partition %d on topic %s". 
format(partition, topic)) - case Some(l) => debug("Leader for partition %s for topic %s is %d".format(partition, topic, l)) + case Some(l) => debug("Leader for partition %d for topic %s is %d".format(partition, topic, l)) } val leader = leaderOpt.get val znode = topicDirs.consumerOffsetDir + "/" + partition - val offsetString = readDataMaybeNull(zkClient, znode) + val offsetString = readDataMaybeNull(zkClient, znode)._1 // If first time starting a consumer, set the initial offset based on the config var offset : Long = 0L if (offsetString == null) offset = config.autoOffsetReset match { case OffsetRequest.SmallestTimeString => - earliestOrLatestOffset(topic, leader, partition.toInt, OffsetRequest.EarliestTime) + earliestOrLatestOffset(topic, leader, partition, OffsetRequest.EarliestTime) case OffsetRequest.LargestTimeString => - earliestOrLatestOffset(topic, leader, partition.toInt, OffsetRequest.LatestTime) + earliestOrLatestOffset(topic, leader, partition, OffsetRequest.LatestTime) case _ => throw new InvalidConfigException("Wrong value in autoOffsetReset in ConsumerConfig") } @@ -651,12 +651,12 @@ val fetchedOffset = new AtomicLong(offset) val partTopicInfo = new PartitionTopicInfo(topic, leader, - partition.toInt, + partition, queue, consumedOffset, fetchedOffset, new AtomicInteger(config.fetchSize)) - partTopicInfoMap.put(partition.toInt, partTopicInfo) + partTopicInfoMap.put(partition, partTopicInfo) debug(partTopicInfo + " selected new offset " + offset) } } Index: src/main/scala/kafka/tools/VerifyConsumerRebalance.scala =================================================================== --- src/main/scala/kafka/tools/VerifyConsumerRebalance.scala (revision 1355197) +++ src/main/scala/kafka/tools/VerifyConsumerRebalance.scala (working copy) @@ -104,7 +104,7 @@ } // try reading the partition owner path for see if a valid consumer id exists there val partitionOwnerPath = topicDirs.consumerOwnerDir + "/" + partition - val partitionOwner = ZkUtils.readDataMaybeNull(zkClient, partitionOwnerPath) + val partitionOwner = ZkUtils.readDataMaybeNull(zkClient, partitionOwnerPath)._1 if(partitionOwner == null) { error("No owner for topic %s partition %s".format(topic, partition)) rebalanceSucceeded = false Index: src/main/scala/kafka/tools/ExportZkOffsets.scala =================================================================== --- src/main/scala/kafka/tools/ExportZkOffsets.scala (revision 1355197) +++ src/main/scala/kafka/tools/ExportZkOffsets.scala (working copy) @@ -100,7 +100,7 @@ for (bidPid <- bidPidList) { val zkGrpTpDir = new ZKGroupTopicDirs(consumerGrp,topic) val offsetPath = zkGrpTpDir.consumerOffsetDir + "/" + bidPid - val offsetVal = ZkUtils.readDataMaybeNull(zkClient, offsetPath) + val offsetVal = ZkUtils.readDataMaybeNull(zkClient, offsetPath)._1 fileWriter.write(offsetPath + ":" + offsetVal + "\n") debug(offsetPath + " => " + offsetVal) } Index: src/main/scala/kafka/tools/ConsumerOffsetChecker.scala =================================================================== --- src/main/scala/kafka/tools/ConsumerOffsetChecker.scala (revision 1355197) +++ src/main/scala/kafka/tools/ConsumerOffsetChecker.scala (working copy) @@ -33,7 +33,7 @@ // e.g., 127.0.0.1-1315436360737:127.0.0.1:9092 private def getConsumer(zkClient: ZkClient, bid: String): Option[SimpleConsumer] = { - val brokerInfo = ZkUtils.readDataMaybeNull(zkClient, "/brokers/ids/%s".format(bid)) + val brokerInfo = ZkUtils.readDataMaybeNull(zkClient, "/brokers/ids/%s".format(bid))._1 val consumer = brokerInfo match { case 
BrokerIpPattern(ip, port) => Some(new SimpleConsumer(ip, port.toInt, 10000, 100000)) @@ -47,9 +47,9 @@ private def processPartition(zkClient: ZkClient, group: String, topic: String, bidPid: String) { val offset = ZkUtils.readData(zkClient, "/consumers/%s/offsets/%s/%s". - format(group, topic, bidPid)).toLong + format(group, topic, bidPid))._1.toLong val owner = ZkUtils.readDataMaybeNull(zkClient, "/consumers/%s/owners/%s/%s". - format(group, topic, bidPid)) + format(group, topic, bidPid))._1 println("%s,%s,%s (Group,Topic,BrokerId-PartitionId)".format(group, topic, bidPid)) println("%20s%s".format("Owner = ", owner)) println("%20s%d".format("Consumer offset = ", offset)) Index: src/main/scala/kafka/utils/ZkUtils.scala =================================================================== --- src/main/scala/kafka/utils/ZkUtils.scala (revision 1355197) +++ src/main/scala/kafka/utils/ZkUtils.scala (working copy) @@ -27,12 +27,14 @@ import org.I0Itec.zkclient.serialize.ZkSerializer import scala.collection._ import util.parsing.json.JSON +import kafka.api.LeaderAndISR +import org.apache.zookeeper.data.Stat + object ZkUtils extends Logging { val ConsumersPath = "/consumers" val BrokerIdsPath = "/brokers/ids" val BrokerTopicsPath = "/brokers/topics" - val BrokerStatePath = "/brokers/state" val ControllerPath = "/controller" def getTopicPath(topic: String): String ={ @@ -44,7 +46,7 @@ } def getController(zkClient: ZkClient): Int= { - val controller = readDataMaybeNull(zkClient, ControllerPath) + val controller = readDataMaybeNull(zkClient, ControllerPath)._1 controller.toInt } @@ -52,45 +54,56 @@ getTopicPartitionsPath(topic) + "/" + partitionId } - def getTopicPartitionLeaderAndISR(topic: String, partitionId: String): String ={ + def getTopicPartitionLeaderAndISRPath(topic: String, partitionId: String): String ={ getTopicPartitionPath(topic, partitionId) + "/" + "leaderAndISR" } - def getTopicVersion(zkClient: ZkClient, topic: String): String ={ - readDataMaybeNull(zkClient, getTopicPath(topic)) + def getSortedBrokerList(zkClient: ZkClient): Seq[String] ={ + ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted } - def getTopicPartitionReplicasPath(topic: String, partitionId: String): String ={ - getTopicPartitionPath(topic, partitionId) + "/" + "replicas" + def getAllLiveBrokerIds(zkClient: ZkClient): Set[Int] = { + ZkUtils.getChildren(zkClient, BrokerIdsPath).map(_.toInt).toSet } - def getTopicPartitionInSyncPath(topic: String, partitionId: String): String ={ - getTopicPartitionPath(topic, partitionId) + "/" + "isr" + def getAllBrokersInCluster(zkClient: ZkClient): Seq[Broker] = { + val brokerIds = ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted + getBrokerInfoFromIds(zkClient, brokerIds.map(_.toInt)) } - def getTopicPartitionLeaderPath(topic: String, partitionId: String): String ={ - getTopicPartitionPath(topic, partitionId) + "/" + "leader" - } - def getBrokerStateChangePath(brokerId: Int): String = { - BrokerStatePath + "/" + brokerId - } + def getLeaderAndISRForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderAndISR] = { + val (leaderAndISRStr: String, stat: Stat) = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndISRPath(topic, partition.toString)) + debug("Check the leaderEpocAndISR raw string: %s ".format(leaderAndISRStr)) + if(leaderAndISRStr == null) None + else { + JSON.parseFull(leaderAndISRStr) match { + case Some(m) => + val leader = m.asInstanceOf[Map[String, String]].get("leader").get.toInt + val epoc = m.asInstanceOf[Map[String, 
String]].get("leaderEpoc").get.toInt + val ISRString = m.asInstanceOf[Map[String, String]].get("ISR").get + val ISR = Utils.getCSVList(ISRString).map(r => r.toInt) + val zkPathVersion = stat.getVersion + debug("The leader: %d, epoc: %d, ISR: %s, zkPathVersion: %d".format(leader, epoc, ISR.toString(), zkPathVersion)) + Some(LeaderAndISR(leader, epoc, ISR.toList, zkPathVersion)) + case None => None + } + } - def getSortedBrokerList(zkClient: ZkClient): Seq[String] ={ - ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted } - def getAllBrokersInCluster(zkClient: ZkClient): Seq[Broker] = { - val brokerIds = ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted - getBrokerInfoFromIds(zkClient, brokerIds.map(b => b.toInt)) - } def getLeaderForPartition(zkClient: ZkClient, topic: String, partition: Int): Option[Int] = { - val leaderAndEpoch = readDataMaybeNull(zkClient, getTopicPartitionLeaderPath(topic, partition.toString)) - if(leaderAndEpoch == null) None + val leaderAndISR = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndISRPath(topic, partition.toString))._1 + debug("Get leader, check the leaderEpocAndISR raw string: %s ".format(leaderAndISR)) + + if(leaderAndISR == null) None else { - val leaderAndEpochInfo = leaderAndEpoch.split(";") - Some(leaderAndEpochInfo.head.toInt) + JSON.parseFull(leaderAndISR) match { + case Some(m) => + Some(m.asInstanceOf[Map[String, String]].get("leader").get.toInt) + case None => None + } } } @@ -99,51 +112,53 @@ * leader fails after updating epoch in the leader path and before updating epoch in the ISR path, effectively some * other broker will retry becoming leader with the same new epoch value. */ - def getEpochForPartition(client: ZkClient, topic: String, partition: Int): Int = { - val lastKnownEpoch = try { - val isrAndEpoch = readData(client, getTopicPartitionInSyncPath(topic, partition.toString)) - if(isrAndEpoch != null) { - val isrAndEpochInfo = isrAndEpoch.split(";") - if(isrAndEpochInfo.last.isEmpty) - throw new NoEpochForPartitionException("No epoch in ISR path for topic %s partition %d is empty".format(topic, partition)) - else - isrAndEpochInfo.last.toInt - }else { - throw new NoEpochForPartitionException("ISR path for topic %s partition %d is empty".format(topic, partition)) + def getEpochForPartition(zkClient: ZkClient, topic: String, partition: Int): Int = { + val leaderAndISR = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndISRPath(topic, partition.toString))._1 + if(leaderAndISR != null) { + val epoch = JSON.parseFull(leaderAndISR) match { + case None => throw new NoEpochForPartitionException("No epoch, leaderAndISR data for topic %s partition %d is invalid".format(topic, partition)) + case Some(m) => + m.asInstanceOf[Map[String, String]].get("epoc").get.toInt } - } catch { - case e: ZkNoNodeException => - throw new NoEpochForPartitionException("No epoch since leader never existed for topic %s partition %d".format(topic, partition)) - case e1 => throw e1 + epoch } - lastKnownEpoch + else + throw new NoEpochForPartitionException("No epoch, ISR path for topic %s partition %d is empty".format(topic, partition)) } /** - * Gets the assigned replicas (AR) for a specific topic and partition + * Gets the in-sync replicas (ISR) for a specific topic and partition */ - def getReplicasForPartition(zkClient: ZkClient, topic: String, partition: Int): Seq[String] = { - val topicAndPartitionAssignment = getPartitionAssignmentForTopics(zkClient, List(topic).iterator) - topicAndPartitionAssignment.get(topic) match { - case 
Some(partitionAssignment) => partitionAssignment.get(partition.toString) match { - case Some(replicaList) => replicaList - case None => Seq.empty[String] + def getInSyncReplicasForPartition(zkClient: ZkClient, topic: String, partition: Int): Seq[Int] = { + val leaderAndISR = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndISRPath(topic, partition.toString))._1 + if(leaderAndISR == null) Seq.empty[Int] + else { + JSON.parseFull(leaderAndISR) match { + case Some(m) => + val ISRString = m.asInstanceOf[Map[String, String]].get("ISR").get + Utils.getCSVList(ISRString).map(r => r.toInt) + case None => Seq.empty[Int] } - case None => Seq.empty[String] } } /** - * Gets the in-sync replicas (ISR) for a specific topic and partition + * Gets the assigned replicas (AR) for a specific topic and partition */ - def getInSyncReplicasForPartition(client: ZkClient, topic: String, partition: Int): Seq[Int] = { - val replicaListAndEpochString = readDataMaybeNull(client, getTopicPartitionInSyncPath(topic, partition.toString)) - if(replicaListAndEpochString == null) - Seq.empty[Int] - else { - val replicasAndEpochInfo = replicaListAndEpochString.split(";") - Utils.getCSVList(replicasAndEpochInfo.head).map(r => r.toInt) - } + def getReplicasForPartition(zkClient: ZkClient, topic: String, partition: Int): Seq[Int] = { + val jsonPartitionMap = readDataMaybeNull(zkClient, getTopicPath(topic))._1 + val assignedReplicas = if (jsonPartitionMap == null) { + Seq.empty[Int] + } else { + JSON.parseFull(jsonPartitionMap) match { + case Some(m) => m.asInstanceOf[Map[String, List[String]]].get(partition.toString) match { + case None => Seq.empty[Int] + case Some(seq) => seq.map(_.toInt) + } + case None => Seq.empty[Int] + } + } + assignedReplicas } def isPartitionOnBroker(zkClient: ZkClient, topic: String, partition: Int, brokerId: Int): Boolean = { @@ -152,25 +167,7 @@ replicas.contains(brokerId.toString) } - def tryToBecomeLeaderForPartition(client: ZkClient, topic: String, partition: Int, brokerId: Int): Option[(Int, Seq[Int])] = { - try { - // NOTE: first increment epoch, then become leader - val newEpoch = incrementEpochForPartition(client, topic, partition, brokerId) - createEphemeralPathExpectConflict(client, getTopicPartitionLeaderPath(topic, partition.toString), - "%d;%d".format(brokerId, newEpoch)) - val currentISR = getInSyncReplicasForPartition(client, topic, partition) - val updatedISR = if(currentISR.size == 0) List(brokerId) else currentISR - updatePersistentPath(client, getTopicPartitionInSyncPath(topic, partition.toString), - "%s;%d".format(updatedISR.mkString(","), newEpoch)) - info("Elected broker %d with epoch %d to be leader for topic %s partition %d".format(brokerId, newEpoch, topic, partition)) - Some(newEpoch, updatedISR) - } catch { - case e: ZkNodeExistsException => error("Leader exists for topic %s partition %d".format(topic, partition)); None - case oe => error("Error while electing leader for topic %s partition %d".format(topic, partition), oe); None - } - } - - def incrementEpochForPartition(client: ZkClient, topic: String, partition: Int, leader: Int) = { + def incrementEpochForPartition(client: ZkClient, topic: String, partition: Int, leader: Int): Int = { // read previous epoch, increment it and write it to the leader path and the ISR path. 
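// Not part of the patch: a minimal, self-contained sketch of how the leaderAndISR JSON read in
// getLeaderAndISRForPartition / getEpochForPartition above can be parsed. The sample string and the
// field names ("leader", "leaderEpoc", "ISR") mirror the reads in the patch and are assumptions about
// the exact stored format; splitting on commas stands in for Utils.getCSVList.
object LeaderAndISRParseSketch {
  import scala.util.parsing.json.JSON
  def main(args: Array[String]) {
    val raw = """{"leader": "1", "leaderEpoc": "2", "ISR": "1,3"}"""
    JSON.parseFull(raw) match {
      case Some(m) =>
        val fields = m.asInstanceOf[Map[String, String]]
        val leader = fields("leader").toInt
        val epoch = fields("leaderEpoc").toInt
        val isr = fields("ISR").split(",").map(_.trim.toInt).toList
        println("leader=%d epoch=%d ISR=%s".format(leader, epoch, isr.mkString(",")))
      case None =>
        println("malformed leaderAndISR string: " + raw)
    }
  }
}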
val epoch = try { Some(getEpochForPartition(client, topic, partition)) @@ -206,7 +203,7 @@ info("Registering broker " + brokerIdPath + " succeeded with " + broker) } - def getConsumerPartitionOwnerPath(group: String, topic: String, partition: String): String = { + def getConsumerPartitionOwnerPath(group: String, topic: String, partition: Int): String = { val topicDirs = new ZKGroupTopicDirs(group, topic) topicDirs.consumerOwnerDir + "/" + partition } @@ -254,7 +251,7 @@ // this can happen when there is connection loss; make sure the data is what we intend to write var storedData: String = null try { - storedData = readData(client, path) + storedData = readData(client, path)._1 } catch { case e1: ZkNoNodeException => // the node disappeared; treat as if node existed and let caller handles this case e2 => throw e2 @@ -292,17 +289,24 @@ /** * Update the value of a persistent node with the given path and data. * create parrent directory if necessary. Never throw NodeExistException. + * Return the updated path zkVersion */ - def updatePersistentPath(client: ZkClient, path: String, data: String): Unit = { + def updatePersistentPath(client: ZkClient, path: String, data: String): Int = { + var stat: Stat = null try { - client.writeData(path, data) + stat = client.writeData(path, data) + stat.getVersion } catch { case e: ZkNoNodeException => { createParentPath(client, path) try { client.createPersistent(path, data) + // When the new path is created, its zkVersion always starts from 0 + 0 } catch { - case e: ZkNodeExistsException => client.writeData(path, data) + case e: ZkNodeExistsException => + stat = client.writeData(path, data) + stat.getVersion case e2 => throw e2 } } @@ -311,6 +315,22 @@ } /** + * Conditional update the persistent path data, return true if it succeeds, otherwise (the path doesn't + * exist, the current version is not the expected version, etc.) return false + */ + def conditionalUpdatePersistentPath(client: ZkClient, path: String, data: String, expectVersion: Int): Boolean = { + try { + client.writeData(path, data, expectVersion) + info("Conditional update the zkPath %s with expected version %d succeed".format(path, expectVersion)) + true + } catch { + case e: Exception => + info("Conditional update the zkPath %s with expected version %d failed".format(path, expectVersion)) + false + } + } + + /** * Update the value of a persistent node with the given path and data. * create parrent directory if necessary. Never throw NodeExistException. 
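// Not part of the patch: a sketch of the optimistic-concurrency pattern that the new
// conditionalUpdatePersistentPath enables. An in-memory versioned cell stands in for a znode so the
// example runs without a ZooKeeper ensemble; a real caller would pair the Stat.getVersion returned by
// ZkUtils.readData with the conditional write and retry on conflict.
object ConditionalUpdateSketch {
  class VersionedCell(var data: String, var version: Int = 0) {
    // succeeds only if the caller's expected version matches, mirroring a versioned ZK setData
    def conditionalUpdate(newData: String, expectedVersion: Int): Boolean = synchronized {
      if (expectedVersion != version) false
      else { data = newData; version += 1; true }
    }
  }
  def main(args: Array[String]) {
    val isrPath = new VersionedCell("0,1,2")
    val staleVersion = isrPath.version
    val first = isrPath.conditionalUpdate("0,1", isrPath.version)   // first writer wins
    val second = isrPath.conditionalUpdate("0,2", staleVersion)     // stale writer loses
    println("first=%b second=%b ISR is now %s".format(first, second, isrPath.data))
  }
}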
*/ @@ -349,12 +369,23 @@ } } - def readData(client: ZkClient, path: String): String = { - client.readData(path) + def readData(client: ZkClient, path: String): (String, Stat) = { + val stat: Stat = new Stat() + val dataStr: String = client.readData(path, stat) + (dataStr, stat) } - def readDataMaybeNull(client: ZkClient, path: String): String = { - client.readData(path, true) + def readDataMaybeNull(client: ZkClient, path: String): (String, Stat) = { + val stat: Stat = new Stat() + var dataStr: String = null + try{ + dataStr = client.readData(path, stat) + (dataStr, stat) + } catch { + case e: ZkNoNodeException => + (null, stat) + case e2 => throw e2 + } } def getChildren(client: ZkClient, path: String): Seq[String] = { @@ -366,7 +397,6 @@ def getChildrenParentMayNotExist(client: ZkClient, path: String): Seq[String] = { import scala.collection.JavaConversions._ // triggers implicit conversion from java list to scala Seq - try { client.getChildren(path) } catch { @@ -388,22 +418,56 @@ val cluster = new Cluster val nodes = getChildrenParentMayNotExist(zkClient, BrokerIdsPath) for (node <- nodes) { - val brokerZKString = readData(zkClient, BrokerIdsPath + "/" + node) + val brokerZKString = readData(zkClient, BrokerIdsPath + "/" + node)._1 cluster.add(Broker.createBroker(node.toInt, brokerZKString)) } cluster } - def getPartitionAssignmentForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[String, Map[String, List[String]]] = { - val ret = new mutable.HashMap[String, Map[String, List[String]]]() + def getReplicaAssignmentForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[(String, Int), collection.Set[Int]] = { + val ret = new mutable.HashMap[(String, Int), Set[Int]] topics.foreach{ topic => - val jsonPartitionMap = readDataMaybeNull(zkClient, getTopicPath(topic)) + val jsonPartitionMap = readDataMaybeNull(zkClient, getTopicPath(topic))._1 + if (jsonPartitionMap != null) { + JSON.parseFull(jsonPartitionMap) match { + case Some(m) => + val replicaMap = m.asInstanceOf[Map[String, List[String]]] + for((partition, replicas) <- replicaMap){ + ret.put((topic, partition.toInt), replicas.map(_.toInt).toSet) + debug("Replicas assigned to topic [%s], partition [%s] are [%s]".format(topic, partition, replicas.mkString(","))) + } + case None => + } + } + } + ret + } + + def getPartitionLeaderAndISRFroTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[(String, Int), LeaderAndISR] = { + val ret = new mutable.HashMap[(String, Int), LeaderAndISR] + val partitionsForTopics = getPartitionsForTopics(zkClient, topics) + for((topic, partitions) <- partitionsForTopics){ + for(partition <- partitions){ + val leaderAndISR = ZkUtils.getLeaderAndISRForPartition(zkClient, topic, partition.toInt) + if(leaderAndISR.isDefined) + ret.put((topic, partition.toInt), leaderAndISR.get) + } + } + ret + } + + def getPartitionAssignmentForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[String, collection.Map[Int, Seq[Int]]] = { + val ret = new mutable.HashMap[String, Map[Int, Seq[Int]]]() + topics.foreach{ topic => + val jsonPartitionMap = readDataMaybeNull(zkClient, getTopicPath(topic))._1 val partitionMap = if (jsonPartitionMap == null) { - Map[String, List[String]]() + Map[Int, Seq[Int]]() } else { JSON.parseFull(jsonPartitionMap) match { - case Some(m) => m.asInstanceOf[Map[String, List[String]]] - case None => Map[String, List[String]]() + case Some(m) => + val m1 = m.asInstanceOf[Map[String, Seq[String]]] + m1.map(p => (p._1.toInt, p._2.map(_.toInt))) + case 
None => Map[Int, Seq[Int]]() } } debug("partition map for /brokers/topics/%s is %s".format(topic, partitionMap)) @@ -412,7 +476,7 @@ ret } - def getPartitionsForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[String, Seq[String]] = { + def getPartitionsForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[String, Seq[Int]] = { getPartitionAssignmentForTopics(zkClient, topics).map{ topicAndPartitionMap => val topic = topicAndPartitionMap._1 val partitionMap = topicAndPartitionMap._2 @@ -421,14 +485,18 @@ } } - def getPartitionsAssignedToBroker(zkClient: ZkClient, topics: Seq[String], brokerId: Int): Map[String, Seq[Int]] = { + def getPartitionsAssignedToBroker(zkClient: ZkClient, topics: Seq[String], brokerId: Int): Map[(String, Int), Seq[Int]] = { + val ret = new mutable.HashMap[(String, Int), Seq[Int]] val topicsAndPartitions = getPartitionAssignmentForTopics(zkClient, topics.iterator) topicsAndPartitions.map{ topicAndPartitionMap => val topic = topicAndPartitionMap._1 val partitionMap = topicAndPartitionMap._2 - val relevantPartitions = partitionMap.filter( m => m._2.contains(brokerId.toString) ) - (topic -> relevantPartitions.keySet.map(_.toInt).toSeq) + val relevantPartitionsMap = partitionMap.filter( m => m._2.contains(brokerId) ) + for((relevantPartition, replicaAssignment) <- relevantPartitionsMap){ + ret.put((topic, relevantPartition), replicaAssignment) + } } + ret } def deletePartition(zkClient : ZkClient, brokerId: Int, topic: String) { @@ -447,7 +515,7 @@ val dirs = new ZKGroupDirs(group) val consumersInGroup = getConsumersInGroup(zkClient, group) val topicCountMaps = consumersInGroup.map(consumerId => TopicCount.constructTopicCount(consumerId, - ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId), zkClient)) + ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId)._1, zkClient)) consumersInGroup.zip(topicCountMaps).toMap } @@ -471,7 +539,7 @@ } def getBrokerInfoFromIds(zkClient: ZkClient, brokerIds: Seq[Int]): Seq[Broker] = - brokerIds.map( bid => Broker.createBroker(bid, ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + bid)) ) + brokerIds.map( bid => Broker.createBroker(bid, ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + bid)._1)) def getAllTopics(zkClient: ZkClient): Seq[String] = { val topics = ZkUtils.getChildrenParentMayNotExist(zkClient, BrokerTopicsPath) @@ -481,7 +549,7 @@ } -class LeaderExists(topic: String, partition: Int, leaderExists: Condition) extends IZkDataListener { +class LeaderExistsListener(topic: String, partition: Int, leaderExists: Condition) extends IZkDataListener { @throws(classOf[Exception]) def handleDataChange(dataPath: String, data: Object) { val t = dataPath.split("/").takeRight(3).head @@ -494,7 +562,6 @@ def handleDataDeleted(dataPath: String) { leaderExists.signal() } - } object ZKStringSerializer extends ZkSerializer { Index: src/main/scala/kafka/utils/UpdateOffsetsInZK.scala =================================================================== --- src/main/scala/kafka/utils/UpdateOffsetsInZK.scala (revision 1355197) +++ src/main/scala/kafka/utils/UpdateOffsetsInZK.scala (working copy) @@ -45,7 +45,7 @@ private def getAndSetOffsets(zkClient: ZkClient, offsetOption: Long, config: ConsumerConfig, topic: String): Unit = { val cluster = ZkUtils.getCluster(zkClient) val partitionsPerTopicMap = ZkUtils.getPartitionsForTopics(zkClient, List(topic).iterator) - var partitions: Seq[String] = Nil + var partitions: Seq[Int] = Nil 
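// Not part of the patch: the shape of the assignment data after the String-to-Int partition id
// change, and how a broker filters it along the lines of getPartitionsAssignedToBroker. The sample
// topics and replica lists are made up.
object PartitionAssignmentSketch {
  def main(args: Array[String]) {
    // topic -> (partition -> assigned replica broker ids), as parsed from /brokers/topics/<topic>
    val assignment: Map[String, Map[Int, Seq[Int]]] =
      Map("test1" -> Map(0 -> Seq(0, 1, 2), 1 -> Seq(1, 2, 3)),
          "test2" -> Map(0 -> Seq(2, 3, 0)))
    val brokerId = 1
    // keep only the partitions whose replica list contains this broker, keyed by (topic, partition)
    val onThisBroker: Map[(String, Int), Seq[Int]] =
      for {
        (topic, partitions) <- assignment
        (partition, replicas) <- partitions
        if replicas.contains(brokerId)
      } yield ((topic, partition), replicas)
    onThisBroker.foreach { case ((t, p), r) => println("%s-%d -> %s".format(t, p, r.mkString(","))) }
  }
}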
partitionsPerTopicMap.get(topic) match { case Some(l) => partitions = l.sortWith((s,t) => s < t) @@ -54,7 +54,7 @@ var numParts = 0 for (partition <- partitions) { - val brokerHostingPartition = ZkUtils.getLeaderForPartition(zkClient, topic, partition.toInt) + val brokerHostingPartition = ZkUtils.getLeaderForPartition(zkClient, topic, partition) val broker = brokerHostingPartition match { case Some(b) => b @@ -68,7 +68,7 @@ val brokerInfo = brokerInfos.head val consumer = new SimpleConsumer(brokerInfo.host, brokerInfo.port, 10000, 100 * 1024) - val offsets = consumer.getOffsetsBefore(topic, partition.toInt, offsetOption, 1) + val offsets = consumer.getOffsetsBefore(topic, partition, offsetOption, 1) val topicDirs = new ZKGroupTopicDirs(config.groupId, topic) println("updating partition " + partition + " with new offset: " + offsets(0)) Index: src/main/scala/kafka/utils/ZkQueue.scala =================================================================== --- src/main/scala/kafka/utils/ZkQueue.scala (revision 1355197) +++ src/main/scala/kafka/utils/ZkQueue.scala (working copy) @@ -1,127 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package kafka.utils - -import kafka.utils.ZkUtils._ -import kafka.common.QueueFullException -import org.I0Itec.zkclient.{IZkChildListener, ZkClient} -import java.util.concurrent.PriorityBlockingQueue -import java.util.Comparator - -class ZkQueue(zkClient: ZkClient, path: String, size: Int) { - // create the queue in ZK, if one does not exist - makeSurePersistentPathExists(zkClient, path) - val queueItems = new PriorityBlockingQueue[String](size, new ZkQueueComparator) - var latestQueueItemPriority: Int = -1 - zkClient.subscribeChildChanges(path, new ZkQueueListener) - - // TODO: This API will be used by the leader to enqueue state change requests to the followers - /** - * Inserts the specified element into this priority queue. This method will never block. If the queue is full, - * it will throw QueueFullException - * @param item Item to add to the zookeeper queue - * @returns The zookeeper location of item in the queue - */ - def put(item: String): String = { - // if queue is full, throw QueueFullException - if(isFull) - throw new QueueFullException("Queue is full. 
Item %s will be rejected".format(item)) - val queueLocation = createSequentialPersistentPath(zkClient, path + "/", item) - debug("Added item %s to queue at location %s".format(item, queueLocation)) - queueLocation - } - - /** - * Reads all the items and their queue locations in this queue - * @returns A list of (queue_location, item) pairs - */ - def readAll(): Seq[(String, String)] = { - val allItems = getChildren(zkClient, path).sorted - allItems.size match { - case 0 => Seq.empty[(String, String)] - case _ => allItems.map { item => - // read the data and delete the node - val queueLocation = path + "/" + item - val data = ZkUtils.readData(zkClient, queueLocation) - (item, data) - } - } - } - - /** - * Returns true if this zookeeper queue contains no elements. - */ - def isEmpty: Boolean = (readAll().size == 0) - - // TODO: Implement the queue shrink operation if the queue is full, as part of create/delete topic - /** - * Returns true if this zookeeper queue contains number of items equal to the size of the queue - */ - def isFull: Boolean = (readAll().size == size) - - /** - * Retrieves but does not remove the head of this queue, waiting if necessary until an element becomes available. - * @returns The location of the head and the head element in the zookeeper queue - */ - def take(): (String, String) = { - // take the element key - val item = queueItems.take() - val queueLocation = path + "/" + item - val data = ZkUtils.readData(zkClient, queueLocation) - (item, data) - } - - /** - * Removes a single instance of the specified element from this queue, if it is present. More formally, removes an - * element e such that o.equals(e), if this queue contains one or more such elements. Returns true if this queue - * contained the specified element (or equivalently, if this queue changed as a result of the call). 
- * @param queueItem A tuple where the first element is the location of the item as returned by the take() API and the - * second element is the queue item to be removed - */ - def remove(queueItem: (String, String)): Boolean = { - val queueLocation = path + "/" + queueItem._1 - // we do not want to remove items from the queue if they were not read - assert(!queueItems.contains(queueItem._1), "Attempt to remove unconsumed item %s from the queue".format(queueItem)) - ZkUtils.deletePath(zkClient, queueLocation) - } - - class ZkQueueListener extends IZkChildListener with Logging { - - @throws(classOf[Exception]) - def handleChildChange(parentPath : String, curChilds : java.util.List[String]) { - debug("ZkQueue listener fired for queue %s with children %s and latest queue item priority %d" - .format(path, curChilds.toString, latestQueueItemPriority)) - import scala.collection.JavaConversions._ - val outstandingRequests = asBuffer(curChilds).sortWith((req1, req2) => req1.toInt < req2.toInt) - outstandingRequests.foreach { req => - val queueItemPriority = req.toInt - if(queueItemPriority > latestQueueItemPriority) { - latestQueueItemPriority = queueItemPriority - queueItems.add(req) - debug("Added item %s to queue %s".format(req, path)) - } - } - } - } - - class ZkQueueComparator extends Comparator[String] { - def compare(element1: String, element2: String): Int = { - element1.toInt - element2.toInt - } - } -} \ No newline at end of file Index: src/main/scala/kafka/server/KafkaZooKeeper.scala =================================================================== --- src/main/scala/kafka/server/KafkaZooKeeper.scala (revision 1355197) +++ src/main/scala/kafka/server/KafkaZooKeeper.scala (working copy) @@ -18,44 +18,56 @@ package kafka.server import java.net.InetAddress -import kafka.cluster.Replica import kafka.utils._ import org.apache.zookeeper.Watcher.Event.KeeperState -import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener, IZkStateListener, ZkClient} -import kafka.admin.AdminUtils -import java.lang.{Thread, IllegalStateException} -import collection.mutable.HashSet -import kafka.common.{InvalidPartitionException, NoLeaderForPartitionException, NotLeaderForPartitionException, KafkaZookeeperClient} +import org.I0Itec.zkclient.{IZkStateListener, ZkClient} +import kafka.common._ +import kafka.cluster.Replica +import kafka.api.LeaderAndISR + /** * Handles the server's interaction with zookeeper. 
The server needs to register the following paths: * /topics/[topic]/[node_id-partition_num] * /brokers/[0...N] --> host:port - * */ class KafkaZooKeeper(config: KafkaConfig, addReplicaCbk: (String, Int, Set[Int]) => Replica, - getReplicaCbk: (String, Int) => Option[Replica], - becomeLeader: (Replica, Seq[Int]) => Unit, - becomeFollower: (Replica, Int, ZkClient) => Unit) extends Logging { + becomeLeader: (Replica, LeaderAndISR) => Short, + becomeFollower: (Replica, LeaderAndISR) => Short) extends Logging { val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + config.brokerId private var zkClient: ZkClient = null - private val leaderChangeListener = new LeaderChangeListener - private val topicPartitionsChangeListener = new TopicChangeListener - private var stateChangeHandler: StateChangeCommandHandler = null - private val topicListenerLock = new Object - private val leaderChangeLock = new Object + def startup() { + /* start client */ + info("connecting to ZK: " + config.zkConnect) + zkClient = KafkaZookeeperClient.getZookeeperClient(config) + zkClient.subscribeStateChanges(new SessionExpireListener) + registerBrokerInZk() + } - def startup() { - /* start client */ - info("connecting to ZK: " + config.zkConnect) - zkClient = KafkaZookeeperClient.getZookeeperClient(config) - startStateChangeCommandHandler() - zkClient.subscribeStateChanges(new SessionExpireListener) - registerBrokerInZk() - subscribeToTopicAndPartitionsChanges(true) + /** + * At broker startup, it reads the current setup in zookeeper to startup local replica, this function + * should be called after the replica manager is initialized, which depends on the zkClient which is + * initialized in the KafkaZooKeeper.startup() function + **/ + def initLocalReplicas() { + info("Broker %d startup and init local replicas".format(config.brokerId)) + val allTopics = ZkUtils.getAllTopics(zkClient) + val partitionAssignment = ZkUtils.getPartitionsAssignedToBroker(zkClient, allTopics, config.brokerId) + for((topicPartition, assignedReplicas) <- partitionAssignment){ + val replica = addReplicaCbk(topicPartition._1, topicPartition._2, assignedReplicas.toSet) + val leaderAndISR = ZkUtils.getLeaderAndISRForPartition(zkClient, topicPartition._1, topicPartition._2) + + /** assign this broker as leader or follower only when it's configured in the leaderAndISR path **/ + if(leaderAndISR.isDefined){ + if(leaderAndISR.get.leader == config.brokerId) + becomeLeader(replica, leaderAndISR.get) + else + becomeFollower(replica, leaderAndISR.get) + } + } } private def registerBrokerInZk() { @@ -65,13 +77,6 @@ ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, creatorId, config.port) } - private def startStateChangeCommandHandler() { - val stateChangeQ = new ZkQueue(zkClient, ZkUtils.getBrokerStateChangePath(config.brokerId), config.stateChangeQSize) - stateChangeHandler = new StateChangeCommandHandler("StateChangeCommandHandler", config, stateChangeQ, - ensureStateChangeCommandValidityOnThisBroker, ensureEpochValidity) - stateChangeHandler.start() - } - /** * When we get a SessionExpired event, we lost all ephemeral nodes and zkclient has reestablished a * connection for us. We need to re-register this broker in the broker registry. 
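// Not part of the patch: the per-partition decision initLocalReplicas makes at broker startup.
// LeaderAndISR is stubbed with a local case class so the sketch runs standalone; the real code calls
// the becomeLeader/becomeFollower callbacks wired in from the replica manager instead of printing.
object StartupRoleSketch {
  case class LeaderAndISR(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)
  def main(args: Array[String]) {
    val brokerId = 1
    val assigned = Map(("test1", 0) -> Some(LeaderAndISR(1, 1, List(1, 2), 0)),   // this broker leads
                       ("test1", 1) -> Some(LeaderAndISR(2, 1, List(2, 1), 0)),   // this broker follows
                       ("test2", 0) -> None)                                      // no leader elected yet
    for (((topic, partition), leaderAndISROpt) <- assigned) {
      leaderAndISROpt match {
        case Some(lisr) if lisr.leader == brokerId =>
          println("%s-%d: become leader (epoch %d)".format(topic, partition, lisr.leaderEpoch))
        case Some(lisr) =>
          println("%s-%d: become follower of broker %d".format(topic, partition, lisr.leader))
        case None =>
          println("%s-%d: leaderAndISR path missing, take no role yet".format(topic, partition))
      }
    }
  }
}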
@@ -95,17 +100,11 @@ registerBrokerInZk() info("done re-registering broker") info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath)) - zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicPartitionsChangeListener) - val topics = ZkUtils.getAllTopics(zkClient) - debug("Existing topics are %s".format(topics.mkString(","))) - topics.foreach(topic => zkClient.subscribeChildChanges(ZkUtils.getTopicPartitionsPath(topic), topicPartitionsChangeListener)) - handleNewTopics(topics) } } def close() { if (zkClient != null) { - stateChangeHandler.shutdown() info("Closing zookeeper client...") zkClient.close() } @@ -128,250 +127,10 @@ } } - def getZookeeperClient = zkClient - - def handleNewTopics(topics: Seq[String]) { - // get relevant partitions to this broker - val topicsAndPartitionsOnThisBroker = ZkUtils.getPartitionsAssignedToBroker(zkClient, topics, config.brokerId) - debug("Partitions assigned to broker %d are %s".format(config.brokerId, topicsAndPartitionsOnThisBroker.mkString(","))) - for( (topic, partitionsAssignedToThisBroker) <- topicsAndPartitionsOnThisBroker ) { - // subscribe to leader changes for these partitions - subscribeToLeaderForPartitions(topic, partitionsAssignedToThisBroker) - // start replicas for these partitions - startReplicasForPartitions(topic, partitionsAssignedToThisBroker) - } + def getZookeeperClient = { + //if (zkClient == null) + //zkClient = KafkaZookeeperClient.getZookeeperClient(config) + zkClient } - def subscribeToTopicAndPartitionsChanges(startReplicas: Boolean) { - info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath)) - zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicPartitionsChangeListener) - val topics = ZkUtils.getAllTopics(zkClient) - val topicsAndPartitionsOnThisBroker = ZkUtils.getPartitionsAssignedToBroker(zkClient, topics, config.brokerId) - debug("Partitions assigned to broker %d are %s".format(config.brokerId, topicsAndPartitionsOnThisBroker.mkString(","))) - for( (topic, partitionsAssignedToThisBroker) <- topicsAndPartitionsOnThisBroker ) { - // subscribe to leader changes for these partitions - subscribeToLeaderForPartitions(topic, partitionsAssignedToThisBroker) - - // start replicas for these partitions - if(startReplicas) - startReplicasForPartitions(topic, partitionsAssignedToThisBroker) - } - } - - private def subscribeToLeaderForPartitions(topic: String, partitions: Seq[Int]) { - partitions.foreach { partition => - info("Broker %d subscribing to leader changes for topic %s partition %d".format(config.brokerId, topic, partition)) - // register leader change listener - zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderPath(topic, partition.toString), leaderChangeListener) - } - } - - private def startReplicasForPartitions(topic: String, partitions: Seq[Int]) { - partitions.foreach { partition => - val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partition).map(r => r.toInt) - info("Assigned replicas list for topic %s partition %d is %s".format(topic, partition, assignedReplicas.mkString(","))) - if(assignedReplicas.contains(config.brokerId)) { - val replica = addReplicaCbk(topic, partition, assignedReplicas.toSet) - startReplica(replica) - } else - warn("Ignoring partition %d of topic %s since broker %d doesn't host any replicas for it" - .format(partition, topic, config.brokerId)) - } - } - - private def startReplica(replica: Replica) { - info("Starting replica for topic %s partition %d on broker %d" - 
.format(replica.topic, replica.partition.partitionId, replica.brokerId)) - ZkUtils.getLeaderForPartition(zkClient, replica.topic, replica.partition.partitionId) match { - case Some(leader) => - info("Topic %s partition %d has leader %d".format(replica.topic, replica.partition.partitionId,leader)) - // check if this broker is the leader, if not, then become follower - if(leader != config.brokerId) - becomeFollower(replica, leader, zkClient) - case None => // leader election - leaderElection(replica) - } - } - - def leaderElection(replica: Replica) { - info("Broker %d electing leader for topic %s partition %d".format(config.brokerId, replica.topic, replica.partition.partitionId)) - // read the AR list for replica.partition from ZK - val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, replica.topic, replica.partition.partitionId).map(_.toInt) - val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, replica.topic, replica.partition.partitionId) - val liveBrokers = ZkUtils.getSortedBrokerList(zkClient).map(_.toInt) - if(canBecomeLeader(config.brokerId, replica.topic, replica.partition.partitionId, assignedReplicas, inSyncReplicas, liveBrokers)) { - info("Broker %d will participate in leader election for topic %s partition %d" - .format(config.brokerId, replica.topic, replica.partition.partitionId)) - // wait for some time if it is not the preferred replica - try { - if(replica.brokerId != assignedReplicas.head) { - // sleep only if the preferred replica is alive - if(liveBrokers.contains(assignedReplicas.head)) { - info("Preferred replica %d for topic %s ".format(assignedReplicas.head, replica.topic) + - "partition %d is alive. Waiting for %d ms to allow it to become leader" - .format(replica.partition.partitionId, config.preferredReplicaWaitTime)) - Thread.sleep(config.preferredReplicaWaitTime) - } - } - } catch { - case e => // ignoring - } - val newLeaderEpochAndISR = ZkUtils.tryToBecomeLeaderForPartition(zkClient, replica.topic, - replica.partition.partitionId, replica.brokerId) - newLeaderEpochAndISR match { - case Some(epochAndISR) => - info("Broker %d is leader for topic %s partition %d".format(replica.brokerId, replica.topic, - replica.partition.partitionId)) - info("Current ISR for topic %s partition %d is %s".format(replica.topic, replica.partition.partitionId, - epochAndISR._2.mkString(","))) - becomeLeader(replica, epochAndISR._2) - case None => - ZkUtils.getLeaderForPartition(zkClient, replica.topic, replica.partition.partitionId) match { - case Some(leader) => - becomeFollower(replica, leader, zkClient) - case None => - error("Lost leader for topic %s partition %d right after leader election".format(replica.topic, - replica.partition.partitionId)) - } - } - } - } - - private def canBecomeLeader(brokerId: Int, topic: String, partition: Int, assignedReplicas: Seq[Int], - inSyncReplicas: Seq[Int], liveBrokers: Seq[Int]): Boolean = { - // TODO: raise alert, mark the partition offline if no broker in the assigned replicas list is alive - assert(assignedReplicas.size > 0, "There should be at least one replica in the assigned replicas list for topic " + - " %s partition %d".format(topic, partition)) - inSyncReplicas.size > 0 match { - case true => // check if this broker is in the ISR. 
If yes, return true - inSyncReplicas.contains(brokerId) match { - case true => - info("Broker %d can become leader since it is in the ISR %s".format(brokerId, inSyncReplicas.mkString(",")) + - " for topic %s partition %d".format(topic, partition)) - true - case false => - // check if any broker in the ISR is alive. If not, return true only if this broker is in the AR - val liveBrokersInISR = inSyncReplicas.filter(r => liveBrokers.contains(r)) - liveBrokersInISR.isEmpty match { - case true => - if(assignedReplicas.contains(brokerId)) { - info("No broker in the ISR %s for topic %s".format(inSyncReplicas.mkString(","), topic) + - " partition %d is alive. Broker %d can become leader since it is in the assigned replicas %s" - .format(partition, brokerId, assignedReplicas.mkString(","))) - true - } else { - info("No broker in the ISR %s for topic %s".format(inSyncReplicas.mkString(","), topic) + - " partition %d is alive. Broker %d can become leader since it is in the assigned replicas %s" - .format(partition, brokerId, assignedReplicas.mkString(","))) - false - } - case false => - info("ISR for topic %s partition %d is %s. Out of these %s brokers are alive. Broker %d " - .format(topic, partition, inSyncReplicas.mkString(",")) + "cannot become leader since it doesn't exist " + - "in the ISR") - false // let one of the live brokers in the ISR become the leader - } - } - case false => - if(assignedReplicas.contains(brokerId)) { - info("ISR for topic %s partition %d is empty. Broker %d can become leader since it " - .format(topic, partition, brokerId) + "is part of the assigned replicas list") - true - } else { - info("ISR for topic %s partition %d is empty. Broker %d cannot become leader since it " - .format(topic, partition, brokerId) + "is not part of the assigned replicas list") - false - } - } - } - - class TopicChangeListener extends IZkChildListener with Logging { - private val allTopics = new HashSet[String]() - - @throws(classOf[Exception]) - def handleChildChange(parentPath : String, curChilds : java.util.List[String]) { - import collection.JavaConversions - topicListenerLock.synchronized { - debug("Topic/partition change listener fired for path " + parentPath) - val currentChildren = JavaConversions.asBuffer(curChilds).toSet - val newTopics = currentChildren -- allTopics - val deletedTopics = allTopics -- currentChildren - allTopics.clear() - allTopics ++ currentChildren - - debug("New topics: [%s]. 
Deleted topics: [%s]".format(newTopics.mkString(","), deletedTopics.mkString(","))) - handleNewTopics(newTopics.toSeq) - // TODO: Handle topic deletions - //handleDeletedTopics(deletedTopics.toSeq) - } - } - - def doesTopicExistInCluster(topic: String): Boolean = { - allTopics.contains(topic) - } - } - - private def ensureStateChangeCommandValidityOnThisBroker(stateChangeCommand: StateChangeCommand): Boolean = { - // check if this broker hosts a replica for this topic and partition - ZkUtils.isPartitionOnBroker(zkClient, stateChangeCommand.topic, stateChangeCommand.partition, config.brokerId) - } - - private def ensureEpochValidity(stateChangeCommand: StateChangeCommand): Boolean = { - // get the topic and partition that this request is meant for - val topic = stateChangeCommand.topic - val partition = stateChangeCommand.partition - val epoch = stateChangeCommand.epoch - - val currentLeaderEpoch = ZkUtils.getEpochForPartition(zkClient, topic, partition) - // check if the request's epoch matches the current leader's epoch OR the admin command's epoch - val validEpoch = (currentLeaderEpoch == epoch) || (epoch == AdminUtils.AdminEpoch) - if(epoch > currentLeaderEpoch) - throw new IllegalStateException(("Illegal epoch state. Request's epoch %d larger than registered epoch %d for " + - "topic %s partition %d").format(epoch, currentLeaderEpoch, topic, partition)) - validEpoch - } - - class LeaderChangeListener extends IZkDataListener with Logging { - - @throws(classOf[Exception]) - def handleDataChange(dataPath: String, data: Object) { - // handle leader change event for path - val newLeaderAndEpochInfo: String = data.asInstanceOf[String] - val newLeader = newLeaderAndEpochInfo.split(";").head.toInt - val newEpoch = newLeaderAndEpochInfo.split(";").last.toInt - debug("Leader change listener fired for path %s. New leader is %d. 
New epoch is %d".format(dataPath, newLeader, newEpoch)) - val topicPartitionInfo = dataPath.split("/") - val topic = topicPartitionInfo.takeRight(4).head - val partition = topicPartitionInfo.takeRight(2).head.toInt - info("Updating leader change information in replica for topic %s partition %d".format(topic, partition)) - val replica = getReplicaCbk(topic, partition).getOrElse(null) - assert(replica != null, "Replica for topic %s partition %d should exist on broker %d" - .format(topic, partition, config.brokerId)) - replica.partition.leaderId(Some(newLeader)) - assert(getReplicaCbk(topic, partition).get.partition.leaderId().get == newLeader, "New leader should be set correctly") - } - - @throws(classOf[Exception]) - def handleDataDeleted(dataPath: String) { - leaderChangeLock.synchronized { - // leader is deleted for topic partition - val topic = dataPath.split("/").takeRight(4).head - val partitionId = dataPath.split("/").takeRight(2).head.toInt - debug("Leader deleted listener fired for topic %s partition %d on broker %d" - .format(topic, partitionId, config.brokerId)) - val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionId).map(r => r.toInt) - if(assignedReplicas.contains(config.brokerId)) { - val replica = getReplicaCbk(topic, partitionId) - replica match { - case Some(r) => leaderElection(r) - case None => error("No replica exists for topic %s partition %s on broker %d" - .format(topic, partitionId, config.brokerId)) - } - } - } - } - } } - - - Index: src/main/scala/kafka/server/KafkaServer.scala =================================================================== --- src/main/scala/kafka/server/KafkaServer.scala (revision 1355197) +++ src/main/scala/kafka/server/KafkaServer.scala (working copy) @@ -24,8 +24,10 @@ import java.util.concurrent._ import atomic.AtomicBoolean import kafka.cluster.Replica -import org.I0Itec.zkclient.ZkClient +import kafka.api.LeaderAndISR +import scala.collection._ + /** * Represents the lifecycle of a single Kafka broker. Handles all functionality required * to start up and shutdown a single Kafka node. @@ -42,7 +44,7 @@ var kafkaZookeeper: KafkaZooKeeper = null private var replicaManager: ReplicaManager = null private var apis: KafkaApis = null - var kafkaController: KafkaController = new KafkaController(config) + var kafkaController: KafkaController = null /** * Start up API for bringing up a single instance of the Kafka server. 
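As introduced in the KafkaZooKeeper constructor above and carried through KafkaServer and KafkaApis below, the become-leader/become-follower callbacks now return a Short error code per partition instead of Unit, which lets the request handler assemble a per-partition response map. The following is only a rough sketch of that folding step, with simplified types standing in for the patch's Replica, LeaderAndISR and ErrorMapping classes.

    // Simplified stand-ins; the real code keys on (topic, partition) and uses ErrorMapping codes.
    type TopicAndPartition = (String, Int)
    val NoError: Short = 0

    // Apply the requested leadership change for each partition and record the resulting code.
    def buildResponseMap(requestedLeaders: Map[TopicAndPartition, Int],
                         brokerId: Int,
                         becomeLeader: TopicAndPartition => Short,
                         becomeFollower: TopicAndPartition => Short): Map[TopicAndPartition, Short] =
      requestedLeaders.map { case (tp, requestedLeader) =>
        val code = if (requestedLeader == brokerId) becomeLeader(tp) else becomeFollower(tp)
        tp -> code
      }

    // Example: broker 1 is asked to lead ("test1", 0) and to follow for ("test1", 1).
    val responseMap = buildResponseMap(Map(("test1", 0) -> 1, ("test1", 1) -> 2), 1,
                                       _ => NoError, _ => NoError)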
@@ -69,16 +71,26 @@ config.monitoringPeriodSecs, config.numQueuedRequests, config.maxSocketRequestSize) + socketServer.startup + Utils.registerMBean(socketServer.stats, statsMBeanName) - kafkaZookeeper = new KafkaZooKeeper(config, addReplica, getReplica, makeLeader, makeFollower) + kafkaZookeeper = new KafkaZooKeeper(config, addReplica, makeLeader, makeFollower) + // starting relevant replicas and leader election for partitions assigned to this broker + kafkaZookeeper.startup + replicaManager = new ReplicaManager(config, time, kafkaZookeeper.getZookeeperClient) - apis = new KafkaApis(socketServer.requestChannel, logManager, replicaManager, kafkaZookeeper) + kafkaZookeeper.initLocalReplicas() + + kafkaController = new KafkaController(config, kafkaZookeeper.getZookeeperClient) + + apis = new KafkaApis(socketServer.requestChannel, logManager, replicaManager, kafkaZookeeper, + addReplica, stopReplica, makeLeader, makeFollower, config.brokerId) requestHandlerPool = new KafkaRequestHandlerPool(socketServer.requestChannel, apis, config.numIoThreads) - socketServer.startup + Mx4jLoader.maybeLoad /** @@ -87,14 +99,12 @@ */ logManager.startup - // starting relevant replicas and leader election for partitions assigned to this broker - kafkaZookeeper.startup - kafkaController.startup() info("Server started.") } - + + /** * Shutdown API for shutting down a single instance of the Kafka server. * Shuts down the LogManager, the SocketServer and the log cleaner scheduler thread @@ -113,11 +123,11 @@ Utils.unregisterMBean(statsMBeanName) if(logManager != null) logManager.close() + kafkaZookeeper.close + if(kafkaController != null) kafkaController.shutDown() - kafkaZookeeper.close - val cleanShutDownFile = new File(new File(config.logDir), CleanShutdownFile) debug("Creating clean shutdown file " + cleanShutDownFile.getAbsolutePath()) cleanShutDownFile.createNewFile @@ -132,23 +142,27 @@ def awaitShutdown(): Unit = shutdownLatch.await() def addReplica(topic: String, partition: Int, assignedReplicas: Set[Int]): Replica = { - info("Added local replica for topic %s partition %d on broker %d".format(topic, partition, config.brokerId)) + //info("To add local replica for topic %s partition %d on broker %d".format(topic, partition, config.brokerId)) // get local log val log = logManager.getOrCreateLog(topic, partition) replicaManager.addLocalReplica(topic, partition, log, assignedReplicas) } - def makeLeader(replica: Replica, currentISRInZk: Seq[Int]) { - replicaManager.makeLeader(replica, currentISRInZk) + def makeLeader(replica: Replica, leaderAndISR: LeaderAndISR): Short = { + replicaManager.makeLeader(replica, leaderAndISR) } - def makeFollower(replica: Replica, leaderBrokerId: Int, zkClient: ZkClient) { - replicaManager.makeFollower(replica, leaderBrokerId, zkClient) + def makeFollower(replica: Replica, leaderAndISR: LeaderAndISR): Short = { + replicaManager.makeFollower(replica, leaderAndISR) } def getReplica(topic: String, partition: Int): Option[Replica] = replicaManager.getReplica(topic, partition) + def stopReplica(topic: String, partition: Int): Short = { + replicaManager.stopReplica(topic, partition) + } + def getLogManager(): LogManager = logManager def getStats(): SocketServerStats = socketServer.stats Index: src/main/scala/kafka/server/ReplicaManager.scala =================================================================== --- src/main/scala/kafka/server/ReplicaManager.scala (revision 1355197) +++ src/main/scala/kafka/server/ReplicaManager.scala (working copy) @@ -13,22 +13,26 @@ * WITHOUT WARRANTIES 
OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. -*/ + */ package kafka.server import kafka.log.Log import kafka.cluster.{Partition, Replica} -import collection.mutable +import collection._ import java.lang.IllegalStateException import mutable.ListBuffer import org.I0Itec.zkclient.ZkClient import java.util.concurrent.locks.ReentrantLock import kafka.utils.{KafkaScheduler, ZkUtils, Time, Logging} -import kafka.common.InvalidPartitionException +import kafka.api.LeaderAndISR +import kafka.common.{ErrorMapping, InvalidPartitionException} +import java.io.File +import kafka.utils.Utils -class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient) extends Logging { - private var allReplicas = new mutable.HashMap[(String, Int), Partition]() +class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient) extends Logging { + + var allReplicas = new mutable.HashMap[(String, Int), Partition]() private var leaderReplicas = new ListBuffer[Partition]() private val leaderReplicaLock = new ReentrantLock() private var isrExpirationScheduler = new KafkaScheduler(1, "isr-expiration-thread-", true) @@ -58,10 +62,24 @@ partition.assignedReplicas(Some(assignedReplicas)) // get the replica objects for the assigned replicas for this partition info("Added local replica %d for topic %s partition %s on broker %d" - .format(localReplica.brokerId, localReplica.topic, localReplica.partition.partitionId, localReplica.brokerId)) + .format(localReplica.brokerId, localReplica.topic, localReplica.partition.partitionId, localReplica.brokerId)) localReplica } + def stopReplica(topic: String, partition: Int): Short = { + val errorCode = ErrorMapping.NoError + val replica = getReplica(topic, partition) + if(replica.isDefined){ + replicaFetcherManager.removeFetcher(topic, partition) + if(replica.get.log.isDefined){ + val parititionDir: File = replica.get.log.get.dir + Utils.rm(parititionDir) + } + } + errorCode + } + + def getOrCreatePartition(topic: String, partitionId: Int, assignedReplicaIds: Set[Int]): Partition = { val newPartition = allReplicas.contains((topic, partitionId)) newPartition match { @@ -76,13 +94,22 @@ } } + + def partitionExists(topic: String, partitionId: Int): Boolean = { + val partitionOpt = allReplicas.get((topic, partitionId)) + partitionOpt match { + case Some(partition) => true + case None => false + } + } + def ensurePartitionExists(topic: String, partitionId: Int): Partition = { val partitionOpt = allReplicas.get((topic, partitionId)) partitionOpt match { case Some(partition) => partition case None => throw new InvalidPartitionException("Partition for topic %s partition %d doesn't exist in replica manager on %d" - .format(topic, partitionId, config.brokerId)) + .format(topic, partitionId, config.brokerId)) } } @@ -92,7 +119,7 @@ val replicaAdded = partition.addReplica(remoteReplica) if(replicaAdded) info("Added remote replica %d for topic %s partition %s on broker %d" - .format(remoteReplica.brokerId, remoteReplica.topic, remoteReplica.partition.partitionId, config.brokerId)) + .format(remoteReplica.brokerId, remoteReplica.topic, remoteReplica.partition.partitionId, config.brokerId)) remoteReplica } @@ -113,7 +140,7 @@ Some(replicas.leaderReplica()) case None => throw new IllegalStateException("Getting leader replica failed. 
Partition replica metadata for topic " + - "%s partition %d doesn't exist in Replica manager on %d".format(topic, partitionId, config.brokerId)) + "%s partition %d doesn't exist in Replica manager on %d".format(topic, partitionId, config.brokerId)) } } @@ -139,12 +166,15 @@ } } - def makeLeader(replica: Replica, currentISRInZk: Seq[Int]) { + def makeLeader(replica: Replica, leaderAndISR: LeaderAndISR): Short = { + info("Broker %d becoming Leader for topic [%s] partition [%d]" + .format(config.brokerId, replica.topic, replica.partition.partitionId)) + // stop replica fetcher thread, if any replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId) // read and cache the ISR replica.partition.leaderId(Some(replica.brokerId)) - replica.partition.updateISR(currentISRInZk.toSet) + replica.partition.updateISR(leaderAndISR.ISR.toSet) // also add this partition to the list of partitions for which the leader is the current broker try { leaderReplicaLock.lock() @@ -152,11 +182,12 @@ }finally { leaderReplicaLock.unlock() } + ErrorMapping.NoError } - def makeFollower(replica: Replica, leaderBrokerId: Int, zkClient: ZkClient) { + def makeFollower(replica: Replica, leaderAndISR: LeaderAndISR): Short = { info("broker %d intending to follow leader %d for topic %s partition %d" - .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId)) + .format(config.brokerId, leaderAndISR.leader, replica.topic, replica.partition.partitionId)) // remove this replica's partition from the ISR expiration queue try { leaderReplicaLock.lock() @@ -170,37 +201,41 @@ case None => } // get leader for this replica - val leaderBroker = ZkUtils.getBrokerInfoFromIds(zkClient, List(leaderBrokerId)).head + val leaderBroker = ZkUtils.getBrokerInfoFromIds(zkClient, List(leaderAndISR.leader)).head + debug("For follower [%d], the leaderBroker is [%d]".format(config.brokerId, leaderBroker.id)) + val currentLeaderBroker = replicaFetcherManager.fetcherSourceBroker(replica.topic, replica.partition.partitionId) + // Become follower only if it is not already following the same leader if( currentLeaderBroker == None || currentLeaderBroker.get != leaderBroker.id) { info("broker %d becoming follower to leader %d for topic %s partition %d" - .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId)) + .format(config.brokerId, leaderBroker.id, replica.topic, replica.partition.partitionId)) // stop fetcher thread to previous leader replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId) // start fetcher thread to current leader replicaFetcherManager.addFetcher(replica.topic, replica.partition.partitionId, replica.logEndOffset(), leaderBroker) } + ErrorMapping.NoError } def maybeShrinkISR(): Unit = { try { info("Evaluating ISR list of partitions to see which replicas can be removed from the ISR" - .format(config.keepInSyncTimeMs)) + .format(config.keepInSyncTimeMs)) leaderReplicaLock.lock() leaderReplicas.foreach { partition => - // shrink ISR if a follower is slow or stuck + // shrink ISR if a follower is slow or stuck val outOfSyncReplicas = partition.getOutOfSyncReplicas(config.keepInSyncTimeMs, config.keepInSyncBytes) if(outOfSyncReplicas.size > 0) { val newInSyncReplicas = partition.inSyncReplicas -- outOfSyncReplicas assert(newInSyncReplicas.size > 0) info("Shrinking ISR for topic %s partition %d to %s".format(partition.topic, partition.partitionId, - newInSyncReplicas.map(_.brokerId).mkString(","))) + 
newInSyncReplicas.map(_.brokerId).mkString(","))) // update ISR in zk and in memory - partition.updateISR(newInSyncReplicas.map(_.brokerId), Some(zkClient)) + partition.updateISR(newInSyncReplicas.map(_.brokerId)) } - } + } }catch { case e1 => error("Error in ISR expiration thread. Shutting down due to ", e1) }finally { @@ -216,7 +251,7 @@ replica.logEndOffset() >= leaderHW } else throw new IllegalStateException("Replica %s is not in the assigned replicas list for ".format(replica.toString) + - " topic %s partition %d on broker %d".format(replica.topic, replica.partition.partitionId, config.brokerId)) + " topic %s partition %d on broker %d".format(replica.topic, replica.partition.partitionId, config.brokerId)) } def recordFollowerPosition(topic: String, partition: Int, replicaId: Int, offset: Long, zkClient: ZkClient) = { @@ -228,11 +263,11 @@ if(checkIfISRCanBeExpanded(replica)) { val newISR = replica.partition.inSyncReplicas + replica // update ISR in ZK and cache - replica.partition.updateISR(newISR.map(_.brokerId), Some(zkClient)) + replica.partition.updateISR(newISR.map(_.brokerId)) } maybeIncrementLeaderHW(replica) case None => - throw new IllegalStateException("No replica %d in replica manager on %d".format(replicaId, config.brokerId)) + throw new IllegalStateException("No replica [%d] in replica manager on broker [%d]".format(replicaId, config.brokerId)) } } @@ -242,7 +277,7 @@ case Some(replica) => replica.logEndOffsetUpdateTime(Some(time.milliseconds)) case None => - throw new IllegalStateException("No replica %d in replica manager on %d".format(config.brokerId, config.brokerId)) + throw new IllegalStateException("No replica [%d] in replica manager on [%d]".format(config.brokerId, config.brokerId)) } } Index: src/main/scala/kafka/server/KafkaController.scala =================================================================== --- src/main/scala/kafka/server/KafkaController.scala (revision 1355197) +++ src/main/scala/kafka/server/KafkaController.scala (working copy) @@ -16,8 +16,8 @@ */ package kafka.server -import kafka.common.KafkaZookeeperClient import collection.mutable.HashMap +import collection._ import collection.immutable.Set import kafka.cluster.Broker import kafka.api._ @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicBoolean import org.I0Itec.zkclient.{IZkStateListener, ZkClient, IZkDataListener, IZkChildListener} import org.apache.zookeeper.Watcher.Event.KeeperState +import collection.JavaConversions._ class RequestSendThread(val brokerId: Int, @@ -82,6 +83,7 @@ class ControllerChannelManager(allBrokers: Set[Broker], config : KafkaConfig) extends Logging{ private val brokers = new HashMap[Int, Broker] + allBrokers.map(b => brokers.put(b.id, b)) private val messageChannels = new HashMap[Int, BlockingChannel] private val messageQueues = new HashMap[Int, BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)]] private val messageThreads = new HashMap[Int, RequestSendThread] @@ -104,9 +106,11 @@ thread.start() messageThreads.put(broker.id, thread) } + info("Controller channel manager start up, see cached brokers: %s".format(brokers.keySet)) } def shutDown() = { + info("At controller shut down, see cached brokers: %s".format(brokers.keySet)) for((brokerId, broker) <- brokers){ removeBroker(brokerId) } @@ -117,9 +121,9 @@ } def addBroker(broker: Broker){ + debug("controller add cached broker " + broker.id) brokers.put(broker.id, broker) messageQueues.put(broker.id, new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => 
Unit)](config.controllerMessageQueueSize)) - info("channel for broker " + broker.id + " created" + " at host: " + broker.host + " and port: " + broker.port) val channel = new BlockingChannel(broker.host, broker.port, BlockingChannel.UseDefaultBufferSize, BlockingChannel.UseDefaultBufferSize, @@ -130,6 +134,7 @@ thread.setDaemon(false) thread.start() messageThreads.put(broker.id, thread) + debug("controller finishes adding cached broker " + broker.id) } def removeBroker(brokerId: Int){ @@ -142,57 +147,91 @@ } } -class KafkaController(config : KafkaConfig) extends Logging { +class KafkaController(config : KafkaConfig, zkClient: ZkClient) extends Logging { info("controller startup"); - private val lock = new Object + private val controllerLock = new Object - private var zkClient: ZkClient = null private var controllerChannelManager: ControllerChannelManager = null private var allBrokers : Set[Broker] = null private var allTopics: Set[String] = null + private var allTopicPartitionAssignment: mutable.Map[String, collection.Map[Int, Seq[Int]]] = null - private def tryToBecomeController() = { - lock synchronized { - val curController = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath) - if (curController == null){ - try { - ZkUtils.createEphemeralPathExpectConflict(zkClient, ZkUtils.ControllerPath, config.brokerId.toString()) + // Return true if this controller succeeds in the controller competition + private def tryToBecomeController(): Boolean = { + controllerLock synchronized { + try { + val curController = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)._1 + if (curController == null){ + ZkUtils.createEphemeralPathExpectConflict(zkClient, ZkUtils.ControllerPath, config.brokerId.toString) // Only the broker successfully registering as the controller can execute following code, otherwise // some exception will be thrown. 
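The election at this point hinges on a single ephemeral znode at ZkUtils.ControllerPath: the first broker to create it wins, and a broker that loses the creation race simply reports failure and keeps watching. A compact sketch of that pattern is shown below; it is written against a hypothetical ElectionZk interface rather than the project's ZkUtils/ZkClient helpers, and the exception type is only assumed to model ZooKeeper's "node exists" failure.

    // Hypothetical minimal interface; createEphemeralController is assumed to throw
    // (as ZooKeeper does for an already-existing node) when another broker won the race.
    trait ElectionZk {
      def readControllerId(): Option[String]
      def createEphemeralController(brokerId: String): Unit
    }

    // Returns true iff this broker became the controller.
    def tryToBecomeController(brokerId: Int, zk: ElectionZk): Boolean =
      zk.readControllerId() match {
        case Some(current) => false                                   // a controller already exists
        case None =>
          try { zk.createEphemeralController(brokerId.toString); true }
          catch { case _: RuntimeException => false }                 // lost the creation race
      }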
registerBrokerChangeListener() registerTopicChangeListener() allBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet + debug("Controller %d see all brokers %s".format(config.brokerId, allBrokers.map(_.id))) allTopics = ZkUtils.getAllTopics(zkClient).toSet + allTopicPartitionAssignment = ZkUtils.getPartitionAssignmentForTopics(zkClient, allTopics.iterator) controllerChannelManager = new ControllerChannelManager(allBrokers, config) controllerChannelManager.startUp() - } catch { - case e: ZkNodeExistsException => - registerControllerExistListener() - info("Broker " + config.brokerId + " didn't succeed registering as the controller since it's taken by someone else") - case e2 => throw e2 + true } + else{ + info("In controller election, broker " + config.brokerId + " see current controller not null: " + curController) + false + } + } catch { + case e: ZkNodeExistsException => + registerControllerExistListener() + info("Broker " + config.brokerId + " didn't succeed registering as the controller since it's taken by someone else") + false + case e2 => throw e2 } - else info("Broker " + config.brokerId + " see not null skip " + " current controller " + curController) } } + + private def controllerRegisterOrFailover(){ + info("Controller start or failover, broker " + config.brokerId + " try to become controller") + if(tryToBecomeController() == true){ + debug("Broker %d won the controller competition and work on leader and isr recovery".format(config.brokerId)) + leaderAndISRRecovery() + debug("Kafka controller %d work on broker changes".format(config.brokerId)) + onBrokerChange() + + // If there are some partition with leader not initialized, init the leader for them + val partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, allTopics.iterator) + for((topicPartition, replicas) <- partitionReplicaAssignment){ + val leader = ZkUtils.getLeaderForPartition(zkClient, topicPartition._1, topicPartition._2) + if(leader.isDefined) + partitionReplicaAssignment.remove(topicPartition) + } + debug("Kafka controller %d work on init leaders: %s".format(config.brokerId, partitionReplicaAssignment.toString())) + initLeaders(partitionReplicaAssignment) + } + } + + + def isActive(): Boolean = { controllerChannelManager != null } def startup() = { - zkClient = KafkaZookeeperClient.getZookeeperClient(config) registerSessionExpirationListener() registerControllerExistListener() - tryToBecomeController() + controllerRegisterOrFailover() } def shutDown() = { - if(controllerChannelManager != null){ - controllerChannelManager.shutDown() + controllerLock synchronized { + if(controllerChannelManager != null){ + info("Shutting down kafka controller with id %d".format(config.brokerId)) + controllerChannelManager.shutDown() + controllerChannelManager = null + info("Kafka controller with id %d shutted down completely".format(config.brokerId)) + } } - zkClient.close() } def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) = { @@ -234,15 +273,110 @@ controllerChannelManager.shutDown() controllerChannelManager = null info("Controller session expires, the channel manager shut downr: " + config.brokerId) - tryToBecomeController() + controllerRegisterOrFailover() } } + private def leaderAndISRRecovery(){ + val leaderAndISRInfos = ZkUtils.getPartitionLeaderAndISRFroTopics(zkClient, allTopics.iterator) + // Send the request only when there are some leader and ISR information to recovery + if (leaderAndISRInfos.size > 0){ + val leaderAndISRRequest = new 
LeaderAndISRRequest(LeaderAndISRRequest.IsInit, leaderAndISRInfos) + for(brokerId <- allBrokers.map(_.id)) + sendRequest(brokerId, leaderAndISRRequest) + } + } + + private def initLeaders(topicPartitionReplicaAssignment: collection.mutable.Map[(String, Int), collection.Set[Int]]) { + val leaderAndISRInfos = new collection.mutable.HashMap[(String, Int), LeaderAndISR] + val affectedBrokers = new collection.mutable.HashSet[Int] + + for((topicPartition, replicaAssignment) <- topicPartitionReplicaAssignment) { + val liveAssignedReplicas = allBrokers.map(_.id).filter(b => replicaAssignment.contains(b)) + debug("For topic [%s], partition [%d], live assigned replicas are: [%s]" + .format(topicPartition._1, + topicPartition._2, + liveAssignedReplicas.toString)) + if(!liveAssignedReplicas.isEmpty){ + val leader = liveAssignedReplicas.head + val leaderAndISR = new LeaderAndISR(leader, liveAssignedReplicas.toList) + debug("The leader and Replica for partition: (%s, %d) is %s".format(topicPartition._1, topicPartition._2, leaderAndISR.toString)) + leaderAndISRInfos.put(topicPartition, leaderAndISR) + affectedBrokers ++= liveAssignedReplicas + ZkUtils.createPersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(topicPartition._1, topicPartition._2.toString), leaderAndISR.toString) + } + } + debug("Affected brokers are: [%s]".format(affectedBrokers.toString)) + val leaderAndISRRequest = new LeaderAndISRRequest(LeaderAndISRRequest.IsInit, leaderAndISRInfos) + for(brokerId <- affectedBrokers){ + sendRequest(brokerId, leaderAndISRRequest) + } + } + + + private def onBrokerChange(){ + val liveBrokerIds: Set[Int] = allBrokers.map(_.id) + val leaderAndISRInfos = new collection.mutable.HashMap[(String, Int), LeaderAndISR] + val affectedBrokers = new collection.mutable.HashSet[Int] + // Find the set of affected partitions + val allPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, allTopics.iterator) + for((topicPartition, assignedReplicas) <- allPartitionReplicaAssignment){ + val topic = topicPartition._1 + val partition = topicPartition._2 + debug("The replicas assigned to [%s, %d] are [%s]".format(topic, partition, assignedReplicas.toString)) + var noNeedToChangeLeaderOrUpdateLeaderISRZKPathSucceeded: Boolean = false + while(!noNeedToChangeLeaderOrUpdateLeaderISRZKPathSucceeded){ + val curLeaderAndISR = ZkUtils.getLeaderAndISRForPartition(zkClient, topic, partition).get + val leader = curLeaderAndISR.leader + val leaderEpoc = curLeaderAndISR.leaderEpoc + val ISR = curLeaderAndISR.ISR + val curZkPathVersion = curLeaderAndISR.zkVersion + debug("Leader, epoc, ISR and zkPathVersion for partition (%s, %d) are: %d, %d, %s, %d".format(topic, partition, leader, leaderEpoc, ISR.toString, curZkPathVersion)) + + // The leader is no long alive, need reelection, we only care about the leader change here, the ISR change can be handled by the leader + if (!liveBrokerIds.contains(leader)){ + var leaderAndISR: LeaderAndISR = null + // The ISR contains at least 1 broker in the live broker list + val liveBrokersInISR = ISR.filter(r => liveBrokerIds.contains(r)) + + if(!liveBrokersInISR.isEmpty){ + val newLeader = liveBrokersInISR.head + leaderAndISR = new LeaderAndISR(newLeader, leaderEpoc +1, liveBrokersInISR.toList, curZkPathVersion + 1) + debug("Some broker in ISR is avlie, New leader and ISR is %s".format(leaderAndISR.toString())) + leaderAndISRInfos.put((topic, partition), leaderAndISR) + affectedBrokers.addAll(liveBrokersInISR) + } + else{ + val liveAssignedReplicas = 
assignedReplicas.intersect(liveBrokerIds) + debug("Live broker in ISR is empty, see live assigned replicas: %s".format(liveAssignedReplicas.mkString(","))) + if (!liveAssignedReplicas.isEmpty){ + val newLeader = liveAssignedReplicas.head + val newISR = List(newLeader) + leaderAndISR = new LeaderAndISR(newLeader, leaderEpoc + 1, newISR, curZkPathVersion + 1) + debug("No broker in ISR is alive, New leader and ISR is %s".format(leaderAndISR.toString())) + leaderAndISRInfos.put((topic, partition), leaderAndISR) + affectedBrokers.add(newLeader) + } + } + debug("The leader and ISR converted string: %s".format(leaderAndISR.toString)) + noNeedToChangeLeaderOrUpdateLeaderISRZKPathSucceeded = ZkUtils.conditionalUpdatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(topic, partition.toString), leaderAndISR.toString, curZkPathVersion) + } + else + noNeedToChangeLeaderOrUpdateLeaderISRZKPathSucceeded = true + } + val leaderAndISRRequest = new LeaderAndISRRequest(LeaderAndISRRequest.NotInit, leaderAndISRInfos) + for(brokerId <- affectedBrokers){ + sendRequest(brokerId, leaderAndISRRequest) + } + } + } + + class BrokerChangeListener() extends IZkChildListener with Logging { def handleChildChange(parentPath : String, javaCurChildren : java.util.List[String]) { import scala.collection.JavaConversions._ - lock synchronized { - info("Broker change listener at controller triggerred") + controllerLock synchronized { + info("Broker change listener at controller triggered") val allBrokerIds = allBrokers.map(_.id) val curChildrenSeq: Seq[String] = javaCurChildren val curBrokerIdsSeq = curChildrenSeq.map(_.toInt) @@ -254,25 +388,54 @@ info("Deleted brokers: " + deletedBrokerIds.toString()) allBrokers = ZkUtils.getBrokerInfoFromIds(zkClient, curBrokerIdsSeq).toSet + addedBrokersSeq.map(controllerChannelManager.addBroker(_)) + deletedBrokerIds.map(controllerChannelManager.removeBroker(_)) - for(broker <- addedBrokersSeq){ - controllerChannelManager.addBroker(broker) - } - for (brokerId <- deletedBrokerIds){ - controllerChannelManager.removeBroker(brokerId) - } + onBrokerChange() /** TODO: add other broker change handler logic**/ } } } + private def handleNewTopics(topics: Set[String]) { + // get relevant partitions to this broker + val topicPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient,topics.iterator) + initLeaders(topicPartitionReplicaAssignment) + } + + private def handleDeletedTopics(topics: Set[String]){ + val affectedBrokers = new collection.mutable.HashSet[Int] + val partitionsToStopReplica = new collection.mutable.HashSet[(String, Int)] + for(topic <- topics){ + val allPartitions = allTopicPartitionAssignment.get(topic).get + for((partition, replicas) <- allPartitions){ + partitionsToStopReplica.add((topic, partition)) + affectedBrokers.addAll(replicas) + } + } + val stopReplicaRequest = new StopReplicaRequest(partitionsToStopReplica) + affectedBrokers.map(b => sendRequest(b, stopReplicaRequest)) + } + class TopicChangeListener extends IZkChildListener with Logging { @throws(classOf[Exception]) def handleChildChange(parentPath : String, curChilds : java.util.List[String]) { - // TODO: Incomplete, do not need to review this time + import collection.JavaConversions + controllerLock synchronized { + info("Topic/partition change listener fired for path " + parentPath) + val currentChildren = JavaConversions.asBuffer(curChilds).toSet + val newTopics = currentChildren -- allTopics + val deletedTopics = allTopics -- currentChildren + allTopics = currentChildren + 
info("New topics: [%s]. Deleted topics: [%s]".format(newTopics, deletedTopics)) + handleNewTopics(newTopics) + handleDeletedTopics(deletedTopics) + allTopicPartitionAssignment = ZkUtils.getPartitionAssignmentForTopics(zkClient, allTopics.iterator) + } } } + class ControllerExistListener extends IZkDataListener with Logging { @throws(classOf[Exception]) def handleDataChange(dataPath: String, data: Object) { @@ -281,8 +444,10 @@ @throws(classOf[Exception]) def handleDataDeleted(dataPath: String) { - info("Controller fail over, broker " + config.brokerId + " try to become controller") - tryToBecomeController() + controllerLock synchronized { + info("The controller failed, the broker %d competes to be new controller".format(config.brokerId)) + controllerRegisterOrFailover() + } } } } \ No newline at end of file Index: src/main/scala/kafka/server/StateChangeCommandHandler.scala =================================================================== --- src/main/scala/kafka/server/StateChangeCommandHandler.scala (revision 1355197) +++ src/main/scala/kafka/server/StateChangeCommandHandler.scala (working copy) @@ -1,79 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package kafka.server - -import kafka.utils.{ZkQueue, Logging} -import java.util.concurrent.atomic.AtomicBoolean -import java.util.concurrent.CountDownLatch - -class StateChangeCommandHandler(name: String, config: KafkaConfig, stateChangeQ: ZkQueue, - ensureStateChangeCommandValidityOnThisBroker: (StateChangeCommand) => Boolean, - ensureEpochValidity: (StateChangeCommand) => Boolean) extends Thread(name) with Logging { - val isRunning: AtomicBoolean = new AtomicBoolean(true) - private val shutdownLatch = new CountDownLatch(1) - - override def run() { - try { - while(isRunning.get()) { - // get outstanding state change requests for this broker - val command = stateChangeQ.take() - val stateChangeCommand = StateChangeCommand.getStateChangeRequest(command._2) - ensureStateChangeCommandValidityOnThisBroker(stateChangeCommand) - - stateChangeCommand match { - case StartReplica(topic, partition, epoch) => - if(ensureEpochValidity(stateChangeCommand)) - handleStartReplica(topic, partition) - case CloseReplica(topic, partition, epoch) => - /** - * close replica requests are sent as part of delete topic or partition reassignment process - * To ensure that a topic will be deleted even if the broker is offline, this state change should not - * be protected with the epoch validity check - */ - handleCloseReplica(topic, partition) - } - stateChangeQ.remove(command) - } - }catch { - case e: InterruptedException => info("State change command handler interrupted. Shutting down") - case e1 => error("Error in state change command handler. 
Shutting down due to ", e1) - } - shutdownComplete() - } - - private def shutdownComplete() = shutdownLatch.countDown - - def shutdown() { - isRunning.set(false) - interrupt() - shutdownLatch.await() - info("State change command handler shutdown completed") - } - - def handleStartReplica(topic: String, partition: Int) { - info("Received start replica state change command for topic %s partition %d on broker %d" - .format(topic, partition, config.brokerId)) - // TODO: implement this as part of create topic support or partition reassignment support. Until then, it is unused - } - - def handleCloseReplica(topic: String, partition: Int) { - info("Received close replica state change command for topic %s partition %d on broker %d" - .format(topic, partition, config.brokerId)) - // TODO: implement this as part of delete topic support. Until then, it is unused - } -} \ No newline at end of file Index: src/main/scala/kafka/server/KafkaRequestHandler.scala =================================================================== --- src/main/scala/kafka/server/KafkaRequestHandler.scala (revision 1355197) +++ src/main/scala/kafka/server/KafkaRequestHandler.scala (working copy) @@ -21,10 +21,11 @@ import kafka.utils._ import java.util.concurrent.atomic.AtomicLong + /** * A thread that answers kafka requests. */ -class KafkaRequestHandler(val requestChannel: RequestChannel, apis: KafkaApis) extends Runnable with Logging { +class KafkaRequestHandler(val requestChannel: RequestChannel, apis: KafkaApis) extends Runnable with Logging { def run() { while(true) { @@ -40,10 +41,10 @@ } -class KafkaRequestHandlerPool(val requestChannel: RequestChannel, - val apis: KafkaApis, +class KafkaRequestHandlerPool(val requestChannel: RequestChannel, + val apis: KafkaApis, numThreads: Int) extends Logging { - + val threads = new Array[Thread](numThreads) val runnables = new Array[KafkaRequestHandler](numThreads) for(i <- 0 until numThreads) { Index: src/main/scala/kafka/server/StateChangeCommand.scala =================================================================== --- src/main/scala/kafka/server/StateChangeCommand.scala (revision 1355197) +++ src/main/scala/kafka/server/StateChangeCommand.scala (working copy) @@ -1,92 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
-*/ - -package kafka.server - -import util.parsing.json.JSON -import java.lang.IllegalStateException -import kafka.utils.{Utils, Logging} -import collection.mutable.HashMap - -object StateChangeCommand extends Logging { - val State = "state" - val Topic = "topic" - val Partition = "partition" - val Epoch = "epoch" - val StartReplica = "start-replica" - val CloseReplica = "close-replica" - - def getStateChangeRequest(requestJson: String): StateChangeCommand = { - var topMap : Map[String, String] = null - try { - JSON.parseFull(requestJson) match { - case Some(m) => - topMap = m.asInstanceOf[Map[String, String]] - val topic = topMap.get(StateChangeCommand.Topic).getOrElse(null) - val partition = topMap.get(StateChangeCommand.Partition).getOrElse("-1").toInt - val epoch = topMap.get(StateChangeCommand.Epoch).getOrElse("-1").toInt - val requestOpt = topMap.get(StateChangeCommand.State) - requestOpt match { - case Some(request) => - request match { - case StartReplica => new StartReplica(topic, partition, epoch) - case CloseReplica => new CloseReplica(topic, partition, epoch) - case _ => throw new IllegalStateException("Unknown state change request " + request) - } - case None => - throw new IllegalStateException("Illegal state change request JSON " + requestJson) - } - case None => throw new RuntimeException("Error parsing state change request : " + requestJson) - } - } catch { - case e => - error("Error parsing state change request JSON " + requestJson, e) - throw e - } - } -} - -sealed trait StateChangeCommand extends Logging { - def state: String - - def topic: String - - def partition: Int - - def epoch: Int - - def toJson(): String = { - val jsonMap = new HashMap[String, String] - jsonMap.put(StateChangeCommand.State, state) - jsonMap.put(StateChangeCommand.Topic, topic) - jsonMap.put(StateChangeCommand.Partition, partition.toString) - jsonMap.put(StateChangeCommand.Epoch, epoch.toString) - Utils.stringMapToJsonString(jsonMap) - } -} - -/* The elected leader sends the start replica state change request to all the new replicas that have been assigned -* a partition. Note that the followers must act on this request only if the request epoch == latest partition epoch or -1 */ -case class StartReplica(val topic: String, partition: Int, epoch: Int) extends StateChangeCommand { - val state: String = StateChangeCommand.StartReplica -} - -/* The elected leader sends the close replica state change request to all the replicas that have been un-assigned a partition -* OR if a topic has been deleted. 
Note that the followers must act on this request even if the epoch has changed */ -case class CloseReplica(topic: String, partition: Int, epoch: Int) extends StateChangeCommand { - val state: String = StateChangeCommand.CloseReplica -} Index: src/main/scala/kafka/server/KafkaApis.scala =================================================================== --- src/main/scala/kafka/server/KafkaApis.scala (revision 1355197) +++ src/main/scala/kafka/server/KafkaApis.scala (working copy) @@ -25,20 +25,26 @@ import kafka.log._ import kafka.message._ import kafka.network._ -import kafka.utils.{SystemTime, Logging} import org.apache.log4j.Logger import scala.collection._ import mutable.HashMap import scala.math._ import java.lang.IllegalStateException import kafka.network.RequestChannel.Response +import kafka.utils.{ZkUtils, SystemTime, Logging} +import kafka.cluster.Replica /** * Logic to handle the various Kafka requests */ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, - val replicaManager: ReplicaManager, val kafkaZookeeper: KafkaZooKeeper) extends Logging { + val replicaManager: ReplicaManager, val kafkaZookeeper: KafkaZooKeeper, + addReplicaCbk: (String, Int, Set[Int]) => Replica, + stopReplicaCbk: (String, Int) => Short, + becomeLeader: (Replica, LeaderAndISR) => Short, + becomeFollower: (Replica, LeaderAndISR) => Short, + brokerId: Int) extends Logging { private val fetchRequestPurgatory = new FetchRequestPurgatory(requestChannel) private val requestLogger = Logger.getLogger("kafka.request.logger") @@ -61,14 +67,44 @@ def handleLeaderAndISRRequest(request: RequestChannel.Request){ + val responseMap = new HashMap[(String, Int), Short] val leaderAndISRRequest = LeaderAndISRRequest.readFrom(request.request.buffer) - val responseMap = new HashMap[(String, Int), Short] + for((partitionInfo, leaderAndISR) <- leaderAndISRRequest.leaderAndISRInfos){ + var errorCode = ErrorMapping.NoError + // This partition is relevant to current broker + if(leaderAndISR.ISR.contains(brokerId)){ + val topic = partitionInfo._1 + val partition = partitionInfo._2 - // TODO: put in actual logic later - for((key, value) <- leaderAndISRRequest.leaderAndISRInfos){ - responseMap.put(key, ErrorMapping.NoError) + // If the partition does not exist locally, create it + if(!replicaManager.partitionExists(topic, partition)){ + val assignedReplicas = ZkUtils.getReplicasForPartition(kafkaZookeeper.getZookeeperClient, topic, partition) + info("Assigned replicas list for topic [%s] partition [%d] is [%s]".format(topic, partition, assignedReplicas.toString)) + if(assignedReplicas.contains(brokerId)) { + val replica = addReplicaCbk(topic, partition, assignedReplicas.toSet) + info("Starting replica for topic [%s] partition [%d] on broker [%d]" + .format(replica.topic, replica.partition.partitionId, replica.brokerId)) + } + } + val replica = replicaManager.getReplica(topic, partition).get + // The command ask this broker to be new leader for P and it isn't the leader yet + val requestedLeaderId = leaderAndISR.leader + // If the broker is requested to be the leader and it's not current the leader (the leader id is set and not equal to broker id) + if(requestedLeaderId == brokerId && + (!replica.partition.leaderId().isDefined || replica.partition.leaderId().get!= brokerId)) + errorCode = becomeLeader(replica, leaderAndISR) + else + errorCode = becomeFollower(replica, leaderAndISR) + } + responseMap.put(partitionInfo, errorCode) } + if(leaderAndISRRequest.isInit == 1){ + val partitionsToRemove = 
replicaManager.allReplicas.keySet.filter(p => !leaderAndISRRequest.leaderAndISRInfos.contains(p)) + info("Partitions to remove: %s".format(partitionsToRemove)) + partitionsToRemove.map(p => replicaManager.allReplicas.remove(p)) + } + val leaderAndISRResponse = new LeaderAndISRResponse(leaderAndISRRequest.versionId, responseMap) requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndISRResponse), -1)) } @@ -78,9 +114,9 @@ val stopReplicaRequest = StopReplicaRequest.readFrom(request.request.buffer) val responseMap = new HashMap[(String, Int), Short] - // TODO: put in actual logic later for((topic, partition) <- stopReplicaRequest.stopReplicaSet){ - responseMap.put((topic, partition), ErrorMapping.NoError) + var errorCode = stopReplicaCbk(topic, partition) + responseMap.put((topic, partition), errorCode) } val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.versionId, responseMap) requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse), -1)) @@ -265,7 +301,7 @@ assert(replicaOpt.isDefined, "No replica %d in replica manager on %d" .format(fetchRequest.replicaId, replicaManager.config.brokerId)) val replica = replicaOpt.get - debug("Leader %d for topic %s partition %d received fetch request from follower %d" + debug("Leader [%d] for topic [%s] partition [%d] received fetch request from follower [%d]" .format(logManager.config.brokerId, replica.topic, replica.partition.partitionId, fetchRequest.replicaId)) new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark(), messages) } Index: src/main/scala/kafka/api/StopReplicaRequest.scala =================================================================== --- src/main/scala/kafka/api/StopReplicaRequest.scala (revision 1355197) +++ src/main/scala/kafka/api/StopReplicaRequest.scala (working copy) @@ -24,8 +24,9 @@ import collection.mutable.Set object StopReplicaRequest { - val CurrentVersion = 1.shortValue() + val InitialVersion = 1.shortValue() val DefaultClientId = "" + val DefaultAckTimeout = 1000 def readFrom(buffer: ByteBuffer): StopReplicaRequest = { val versionId = buffer.getShort @@ -45,8 +46,8 @@ ackTimeout: Int, stopReplicaSet: Set[(String, Int)] ) extends RequestOrResponse(Some(RequestKeys.StopReplicaRequest)) { - def this(ackTimeout: Int, stopReplicaSet: Set[(String, Int)]) = { - this(StopReplicaRequest.CurrentVersion, StopReplicaRequest.DefaultClientId, ackTimeout, stopReplicaSet) + def this(stopReplicaSet: Set[(String, Int)]) = { + this(StopReplicaRequest.InitialVersion, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout, stopReplicaSet) } def writeTo(buffer: ByteBuffer) { Index: src/main/scala/kafka/api/LeaderAndISRRequest.scala =================================================================== --- src/main/scala/kafka/api/LeaderAndISRRequest.scala (revision 1355197) +++ src/main/scala/kafka/api/LeaderAndISRRequest.scala (working copy) @@ -25,34 +25,49 @@ object LeaderAndISR { + val initialLeaderEpoc: Int = 1 + val initialZKVersion: Int = 0 def readFrom(buffer: ByteBuffer): LeaderAndISR = { val leader = buffer.getInt val leaderGenId = buffer.getInt val ISRString = Utils.readShortString(buffer, "UTF-8") val ISR = ISRString.split(",").map(_.toInt).toList - val zkVersion = buffer.getLong + val zkVersion = buffer.getInt new LeaderAndISR(leader, leaderGenId, ISR, zkVersion) } } -case class LeaderAndISR(leader: Int, leaderEpoc: Int, ISR: List[Int], zkVersion: Long){ +case class LeaderAndISR(leader: Int, 
leaderEpoc: Int, ISR: List[Int], zkVersion: Int){ + def this(leader: Int, ISR: List[Int]) = this(leader, LeaderAndISR.initialLeaderEpoc, ISR, LeaderAndISR.initialZKVersion) + def writeTo(buffer: ByteBuffer) { buffer.putInt(leader) buffer.putInt(leaderEpoc) Utils.writeShortString(buffer, ISR.mkString(","), "UTF-8") - buffer.putLong(zkVersion) + buffer.putInt(zkVersion) } def sizeInBytes(): Int = { - val size = 4 + 4 + (2 + ISR.mkString(",").length) + 8 + val size = 4 + 4 + (2 + ISR.mkString(",").length) + 4 size } + + override def toString(): String = { + val jsonDataMap = new HashMap[String, String] + jsonDataMap.put("leader", leader.toString) + jsonDataMap.put("leaderEpoc", leaderEpoc.toString) + jsonDataMap.put("ISR", ISR.mkString(",")) + Utils.stringMapToJsonString(jsonDataMap) + } } object LeaderAndISRRequest { - val CurrentVersion = 1.shortValue() + val InitialLeaderAndISRRequestVersion = 1.shortValue() val DefaultClientId = "" + val IsInit: Byte = 1 + val NotInit: Byte = 1 + val DefaultAckTimeout: Int = 1000 def readFrom(buffer: ByteBuffer): LeaderAndISRRequest = { val versionId = buffer.getShort @@ -78,11 +93,10 @@ clientId: String, isInit: Byte, ackTimeout: Int, - leaderAndISRInfos: - Map[(String, Int), LeaderAndISR]) + leaderAndISRInfos: Map[(String, Int), LeaderAndISR]) extends RequestOrResponse(Some(RequestKeys.LeaderAndISRRequest)) { - def this(isInit: Byte, ackTimeout: Int, leaderAndISRInfos: Map[(String, Int), LeaderAndISR]) = { - this(LeaderAndISRRequest.CurrentVersion, LeaderAndISRRequest.DefaultClientId, isInit, ackTimeout, leaderAndISRInfos) + def this(isInit: Byte, leaderAndISRInfos: Map[(String, Int), LeaderAndISR]) = { + this(LeaderAndISRRequest.InitialLeaderAndISRRequestVersion, LeaderAndISRRequest.DefaultClientId, isInit, LeaderAndISRRequest.DefaultAckTimeout, leaderAndISRInfos) } def writeTo(buffer: ByteBuffer) {
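Since this hunk narrows zkVersion from int64 to int32, the on-the-wire layout of LeaderAndISR becomes: leader (int32), leaderEpoc (int32), the ISR as a short-length-prefixed string, then zkVersion (int32). The standalone round-trip sketch below mirrors that layout; the raw ByteBuffer string handling is only a stand-in for Utils.writeShortString/readShortString, and the assertion at the end is illustrative, not a test from the patch.

    import java.nio.ByteBuffer

    // Mirrors the patched layout: leader, leaderEpoc, ISR string, zkVersion.
    case class LeaderAndISR(leader: Int, leaderEpoc: Int, ISR: List[Int], zkVersion: Int) {
      def writeTo(buffer: ByteBuffer) {
        buffer.putInt(leader)
        buffer.putInt(leaderEpoc)
        val isrBytes = ISR.mkString(",").getBytes("UTF-8")
        buffer.putShort(isrBytes.length.toShort)   // stand-in for Utils.writeShortString
        buffer.put(isrBytes)
        buffer.putInt(zkVersion)                   // was putLong before this patch
      }
      def sizeInBytes: Int = 4 + 4 + (2 + ISR.mkString(",").length) + 4
    }

    object LeaderAndISR {
      def readFrom(buffer: ByteBuffer): LeaderAndISR = {
        val leader = buffer.getInt
        val epoch = buffer.getInt
        val isrBytes = new Array[Byte](buffer.getShort.toInt)
        buffer.get(isrBytes)
        val isr = new String(isrBytes, "UTF-8").split(",").map(_.toInt).toList
        new LeaderAndISR(leader, epoch, isr, buffer.getInt)
      }
    }

    // Round trip: 4 + 4 + (2 + 5) + 4 = 19 bytes for ISR "0,1,2".
    val original = LeaderAndISR(0, 1, List(0, 1, 2), 0)
    val buffer = ByteBuffer.allocate(original.sizeInBytes)
    original.writeTo(buffer)
    buffer.flip()
    assert(LeaderAndISR.readFrom(buffer) == original)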