Index: core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala =================================================================== --- core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (revision 1310924) +++ core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (working copy) @@ -24,7 +24,6 @@ import org.junit.Test import kafka.api._ import kafka.cluster.Broker -import kafka.common.{InvalidConfigException, NoBrokersForPartitionException, InvalidPartitionException} import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message} import kafka.producer.async._ import kafka.serializer.{StringEncoder, StringDecoder, Encoder} @@ -35,6 +34,7 @@ import collection.mutable.ListBuffer import org.scalatest.junit.JUnit3Suite import kafka.utils.{NegativePartitioner, TestZKUtils, TestUtils} +import kafka.common.{NoBrokersForPartitionException, InvalidPartitionException, InvalidConfigException, QueueFullException} class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness { val props = createBrokerConfigs(1) Index: core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala =================================================================== --- core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala (revision 1310924) +++ core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala (working copy) @@ -22,7 +22,8 @@ import kafka.admin.CreateTopicCommand import kafka.utils.TestUtils._ import junit.framework.Assert._ -import kafka.utils.{Utils, TestUtils} +import kafka.utils.{ZkUtils, ZKStringSerializer, Utils, TestUtils} +import org.I0Itec.zkclient.ZkClient class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -35,27 +36,23 @@ val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1) val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2) - var servers: Seq[KafkaServer] = Seq.empty[KafkaServer] override def setUp() { super.setUp() - - // start both servers - val server1 = TestUtils.createServer(new KafkaConfig(configProps1)) - val server2 = TestUtils.createServer(new KafkaConfig(configProps2)) - - servers ++= List(server1, server2) + zkClient = new ZkClient(zkConnect, 6000, 3000, ZKStringSerializer) } override def tearDown() { - // shutdown the servers and delete data hosted on them - servers.map(server => server.shutdown()) - servers.map(server => Utils.rm(server.config.logDir)) - super.tearDown() } def testLeaderElectionWithCreateTopic { + var servers: Seq[KafkaServer] = Seq.empty[KafkaServer] + // start both servers + val server1 = TestUtils.createServer(new KafkaConfig(configProps1)) + val server2 = TestUtils.createServer(new KafkaConfig(configProps2)) + + servers ++= List(server1, server2) // start 2 brokers val topic = "new-topic" val partitionId = 0 @@ -64,15 +61,16 @@ CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1") // wait until leader is elected - var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500) + var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 200) + assertTrue("Leader should get elected", leader.isDefined) + // NOTE: this is to avoid transient test failures + assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1)) - assertEquals("Leader must be preferred replica on broker 0", 0, leader.getOrElse(-1)) - // kill the server hosting the preferred replica servers.head.shutdown() // check if leader moves to the other server - leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 
5000) + leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500) assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1)) Thread.sleep(zookeeper.tickTime) @@ -81,7 +79,6 @@ servers.head.startup() leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500) - // TODO: Once the optimization for preferred replica re-election is in, this check should change to broker 0 assertEquals("Leader must remain on broker 1", 1, leader.getOrElse(-1)) // shutdown current leader (broker 1) @@ -90,5 +87,41 @@ // test if the leader is the preferred replica assertEquals("Leader must be preferred replica on broker 0", 0, leader.getOrElse(-1)) + // shutdown the servers and delete data hosted on them + servers.map(server => server.shutdown()) + servers.map(server => Utils.rm(server.config.logDir)) } + + // Assuming leader election happens correctly, test if epoch changes as expected + def testEpoch() { + // keep switching leaders to see if epoch changes correctly + val topic = "new-topic" + val partitionId = 0 + + // setup 2 brokers in ZK + val brokers = TestUtils.createBrokersInZk(zkClient, List(brokerId1, brokerId2)) + + try { + // create topic with 1 partition, 2 replicas, one on each broker + CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1") + + var newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 0) + assertTrue("Broker 0 should become leader", newLeaderEpoch.isDefined) + assertEquals("First epoch value should be 1", 1, newLeaderEpoch.get) + + ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString)) + newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 1) + assertTrue("Broker 1 should become leader", newLeaderEpoch.isDefined) + assertEquals("Second epoch value should be 2", 2, newLeaderEpoch.get) + + ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString)) + newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 0) + assertTrue("Broker 0 should become leader again", newLeaderEpoch.isDefined) + assertEquals("Third epoch value should be 3", 3, newLeaderEpoch.get) + + }finally { + TestUtils.deleteBrokersInZk(zkClient, List(brokerId1, brokerId2)) + } + + } } \ No newline at end of file Index: core/src/test/scala/unit/kafka/server/StateChangeTest.scala =================================================================== --- core/src/test/scala/unit/kafka/server/StateChangeTest.scala (revision 0) +++ core/src/test/scala/unit/kafka/server/StateChangeTest.scala (revision 0) @@ -0,0 +1,112 @@ +package kafka.server + +import org.scalatest.junit.JUnit3Suite +import junit.framework.Assert._ +import kafka.zk.ZooKeeperTestHarness +import org.I0Itec.zkclient.ZkClient +import kafka.utils.{ZKQueue, TestUtils, ZKStringSerializer} +import kafka.common.QueueFullException + +class StateChangeTest extends JUnit3Suite with ZooKeeperTestHarness { + + val brokerId1 = 0 + val port1 = TestUtils.choosePort() + var stateChangeQ: ZKQueue = null + val config = new KafkaConfig(TestUtils.createBrokerConfig(brokerId1, port1)) + + override def setUp() { + super.setUp() + + // create a queue + val queuePath = "/brokers/state/" + config.brokerId + stateChangeQ = new ZKQueue(zkClient, queuePath, 10) + } + + override def tearDown() { + super.tearDown() + } + + def testZkQueuePoll() { + // test put API + val itemPath = stateChangeQ.put("test:0:follower") + val item = itemPath.split("/").last.split("-").last.toInt + assertEquals(0, 
item) + + // dequeue the above item + val dequeuedItem = stateChangeQ.poll() + assertTrue("Queue should have one item", dequeuedItem.isDefined) + assertEquals("test:0:follower", dequeuedItem.get._2) + assertEquals("item-0000000000", dequeuedItem.get._1) + } + + def testZkQueuePeek() { + val itemPath = stateChangeQ.put("test:0:follower") + val item = itemPath.split("/").last.split("-").last.toInt + assertEquals(0, item) + + val peekedItem = stateChangeQ.peek + assertTrue("Queue should have one item", peekedItem.isDefined) + assertEquals("test:0:follower", peekedItem.get._2) + + val peekedItem2 = stateChangeQ.peek + assertTrue("Queue should have one item", peekedItem2.isDefined) + assertEquals("test:0:follower", peekedItem2.get._2) + } + + def testZkQueueDrainAll() { + for(i <- 0 until 5) { + val itemPath = stateChangeQ.put("test:0:follower") + val item = itemPath.split("/").last.split("-").last.toInt + assertEquals(i, item) + } + + val items = stateChangeQ.drainAll() + assertEquals(5, items.size) + for(i <- 0 until 5) + assertEquals("test:0:follower", items(i)._2) + + for(i <- 5 until 10) { + val itemPath = stateChangeQ.put("test:1:follower") + val item = itemPath.split("/").last.split("-").last.toInt + assertEquals(i+5, item) + } + + val moreItems = stateChangeQ.drainAll() + assertEquals(5, moreItems.size) + for(i <- 0 until 5) + assertEquals("test:1:follower", moreItems(i)._2) + + val peekItem = stateChangeQ.peek + assertFalse("Queue should have no more items after drainAll", peekItem.isDefined) + } + + def testZkQueueFull() { + for(i <- 0 until 10) { + val itemPath = stateChangeQ.put("test:0:follower") + val item = itemPath.split("/").last.split("-").last.toInt + assertEquals(i, item) + } + + try { + stateChangeQ.put("test:0:follower") + fail("Queue should be full") + }catch { + case e:QueueFullException => // expected + } + } + + // TODO: Do this after patch for delete topic/delete partition is in + def testStateChangeRequestValidity() { + // mock out the StateChangeRequestHandler + + // setup 3 replicas for one topic partition + + // shutdown follower 1 + + // restart leader to trigger epoch change + + // start follower 1 + + // test follower 1 acted only on one become follower request + } +} \ No newline at end of file Index: core/src/main/scala/kafka/producer/Producer.scala =================================================================== --- core/src/main/scala/kafka/producer/Producer.scala (revision 1310924) +++ core/src/main/scala/kafka/producer/Producer.scala (working copy) @@ -18,11 +18,11 @@ import async._ import kafka.utils._ -import kafka.common.InvalidConfigException import java.util.concurrent.{TimeUnit, LinkedBlockingQueue} import kafka.serializer.Encoder import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean} import org.I0Itec.zkclient.ZkClient +import kafka.common.{QueueFullException, InvalidConfigException} class Producer[K,V](config: ProducerConfig, private val eventHandler: EventHandler[K,V]) // for testing only Index: core/src/main/scala/kafka/producer/async/QueueFullException.scala =================================================================== --- core/src/main/scala/kafka/producer/async/QueueFullException.scala (revision 1310924) +++ core/src/main/scala/kafka/producer/async/QueueFullException.scala (working copy) @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.producer.async - -/* Indicates the queue for sending messages is full of unsent messages */ -class QueueFullException(message: String) extends RuntimeException(message) { - def this() = this(null) -} Index: core/src/main/scala/kafka/admin/AdminUtils.scala =================================================================== --- core/src/main/scala/kafka/admin/AdminUtils.scala (revision 1310924) +++ core/src/main/scala/kafka/admin/AdminUtils.scala (working copy) @@ -69,7 +69,6 @@ replicaList ::= brokerList(getWrappedIndex(firstReplicaIndex, secondReplicaShift, j, brokerList.size)) ret(i) = replicaList.reverse } - ret } @@ -102,14 +101,14 @@ for (i <-0 until partitionMetadata.size) { val replicas = ZkUtils.readData(zkClient, ZkUtils.getTopicPartitionReplicasPath(topic, partitions(i).toString)) - val inSyncReplicas = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.getTopicPartitionInSyncPath(topic, partitions(i).toString)) + val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitions(i)) val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitions(i)) debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader) partitionMetadata(i) = new PartitionMetadata(partitions(i), leader match { case None => None case Some(l) => Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l.toInt)).head) }, getBrokerInfoFromCache(zkClient, cachedBrokerInfo, Utils.getCSVList(replicas).map(id => id.toInt)), - getBrokerInfoFromCache(zkClient, cachedBrokerInfo, Utils.getCSVList(inSyncReplicas).map(id => id.toInt)), + getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas), None /* Return log segment metadata when getOffsetsBefore will be replaced with this API */) } Some(new TopicMetadata(topic, partitionMetadata)) @@ -117,7 +116,6 @@ None } } - metadataList.toList } Index: core/src/main/scala/kafka/common/NoEpochForPartitionException.scala =================================================================== --- core/src/main/scala/kafka/common/NoEpochForPartitionException.scala (revision 0) +++ core/src/main/scala/kafka/common/NoEpochForPartitionException.scala (revision 0) @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.common + +/** + * Thrown when a get epoch request is made for partition, but no epoch exists for that partition + */ +class NoEpochForPartitionException(message: String) extends RuntimeException(message) { + def this() = this(null) +} \ No newline at end of file Index: core/src/main/scala/kafka/common/QueueFullException.scala =================================================================== --- core/src/main/scala/kafka/common/QueueFullException.scala (revision 1304473) +++ core/src/main/scala/kafka/common/QueueFullException.scala (working copy) @@ -1,3 +1,5 @@ +package kafka.common + /** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -15,8 +17,6 @@ * limitations under the License. */ -package kafka.producer.async - /* Indicates the queue for sending messages is full of unsent messages */ class QueueFullException(message: String) extends RuntimeException(message) { def this() = this(null) Index: core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala =================================================================== --- core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala (revision 1310924) +++ core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala (working copy) @@ -59,7 +59,7 @@ if(validateRebalancingOperation(zkClient, group)) info("Rebalance operation successful !") else - error("Rebalance operation failed !") + error("Rebalance operation failed !") } catch { case e2: Throwable => error("Error while verifying current rebalancing operation", e2) } Index: core/src/main/scala/kafka/utils/ZkUtils.scala =================================================================== --- core/src/main/scala/kafka/utils/ZkUtils.scala (revision 1310924) +++ core/src/main/scala/kafka/utils/ZkUtils.scala (working copy) @@ -25,11 +25,13 @@ import kafka.consumer.TopicCount import org.I0Itec.zkclient.{IZkDataListener, ZkClient} import java.util.concurrent.locks.Condition +import kafka.common.NoEpochForPartitionException object ZkUtils extends Logging { val ConsumersPath = "/consumers" val BrokerIdsPath = "/brokers/ids" val BrokerTopicsPath = "/brokers/topics" + val BrokerStatePath = "/brokers/state" def getTopicPath(topic: String): String ={ BrokerTopicsPath + "/" + topic @@ -59,6 +61,10 @@ getTopicPartitionPath(topic, partitionId) + "/" + "leader" } + def getBrokerStateChangePath(brokerId: Int): String = { + BrokerStatePath + "/" + brokerId + } + def getSortedBrokerList(zkClient: ZkClient): Seq[String] ={ ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted } @@ -69,11 +75,39 @@ } def getLeaderForPartition(zkClient: ZkClient, topic: String, partition: Int): Option[Int] = { - val leader = readDataMaybeNull(zkClient, getTopicPartitionLeaderPath(topic, partition.toString)) - if(leader == null) None - else Some(leader.toInt) + val leaderAndEpoch = readDataMaybeNull(zkClient, getTopicPartitionLeaderPath(topic, partition.toString)) + if(leaderAndEpoch == null) None + else { + val leaderAndEpochInfo = leaderAndEpoch.split(";") + Some(leaderAndEpochInfo.head.toInt) + } } + /** + * This API should read the epoch in the ISR path. 
+   * It is sufficient to read the epoch in the ISR path, since if the
+   * leader fails after updating epoch in the leader path and before updating epoch in the ISR path, effectively some
+   * other broker will retry becoming leader with the same new epoch value.
+   */
+  def getEpochForPartition(client: ZkClient, topic: String, partition: Int): Int = {
+    val lastKnownEpoch = try {
+      val isrAndEpoch = readData(client, getTopicPartitionInSyncPath(topic, partition.toString))
+      if(isrAndEpoch != null) {
+        val isrAndEpochInfo = isrAndEpoch.split(";")
+        if(isrAndEpochInfo.last.isEmpty)
+          throw new NoEpochForPartitionException("Epoch in ISR path for topic %s partition %d is empty".format(topic, partition))
+        else
+          isrAndEpochInfo.last.toInt
+      }else {
+        throw new NoEpochForPartitionException("ISR path for topic %s partition %d is empty".format(topic, partition))
+      }
+    }catch {
+      case e: ZkNoNodeException =>
+        throw new NoEpochForPartitionException("No epoch since leader never existed for topic %s partition %d".format(topic, partition))
+      case e1 => throw e1
+    }
+    lastKnownEpoch
+  }
+
   def getReplicasForPartition(zkClient: ZkClient, topic: String, partition: Int): Seq[String] = {
     val replicaListString = readDataMaybeNull(zkClient, getTopicPartitionReplicasPath(topic, partition.toString))
     if(replicaListString == null)
@@ -83,22 +117,61 @@
     }
   }
 
+  def getInSyncReplicasForPartition(client: ZkClient, topic: String, partition: Int): Seq[Int] = {
+    val replicaListAndEpochString = readDataMaybeNull(client, getTopicPartitionInSyncPath(topic, partition.toString))
+    if(replicaListAndEpochString == null)
+      Seq.empty[Int]
+    else {
+      val replicasAndEpochInfo = replicaListAndEpochString.split(";")
+      Utils.getCSVList(replicasAndEpochInfo.head).map(r => r.toInt)
+    }
+  }
+
   def isPartitionOnBroker(zkClient: ZkClient, topic: String, partition: Int, brokerId: Int): Boolean = {
     val replicas = getReplicasForPartition(zkClient, topic, partition)
     debug("The list of replicas for topic %s, partition %d is %s".format(topic, partition, replicas))
     replicas.contains(brokerId.toString)
   }
 
-  def tryToBecomeLeaderForPartition(zkClient: ZkClient, topic: String, partition: Int, brokerId: Int): Boolean = {
+  def tryToBecomeLeaderForPartition(client: ZkClient, topic: String, partition: Int, brokerId: Int): Option[Int] = {
     try {
-      createEphemeralPathExpectConflict(zkClient, getTopicPartitionLeaderPath(topic, partition.toString), brokerId.toString)
-      true
+      // NOTE: first increment epoch, then become leader
+      val newEpoch = incrementEpochForPartition(client, topic, partition, brokerId)
+      // TODO: Check the order of updating epoch in leader and ISR path, for failure cases
+      createEphemeralPathExpectConflict(client, getTopicPartitionLeaderPath(topic, partition.toString),
+        "%d;%d".format(brokerId, newEpoch))
+      val currentISR = getInSyncReplicasForPartition(client, topic, partition)
+      updatePersistentPath(client, getTopicPartitionInSyncPath(topic, partition.toString),
+        "%s;%d".format(currentISR.mkString(","), newEpoch))
+      info("Elected broker %d with epoch %d to be leader for topic %s partition %d".format(brokerId, newEpoch, topic, partition))
+      Some(newEpoch)
     } catch {
-      case e: ZkNodeExistsException => error("Leader exists for topic %s partition %d".format(topic, partition)); false
-      case oe => false
+      case e: ZkNodeExistsException => error("Leader exists for topic %s partition %d".format(topic, partition)); None
+      case oe => None
     }
   }
 
+  def incrementEpochForPartition(client: ZkClient, topic: String, partition: Int, leader: Int) = {
+    // read previous epoch, increment it and write it to the leader path and the ISR path.
+    val epoch = try {
+      Some(getEpochForPartition(client, topic, partition))
+    }catch {
+      case e: NoEpochForPartitionException => None
+      case e1 => throw e1
+    }
+
+    val newEpoch = epoch match {
+      case Some(partitionEpoch) =>
+        debug("Existing epoch for topic %s partition %d is %d".format(topic, partition, partitionEpoch))
+        partitionEpoch + 1
+      case None =>
+        // this is the first time leader is elected for this partition. So set epoch to 1
+        debug("First epoch is 1 for topic %s partition %d".format(topic, partition))
+        1
+    }
+    newEpoch
+  }
+
   def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, creator: String, port: Int) {
     val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
     val broker = new Broker(id, creator, host, port)
@@ -186,7 +259,7 @@
   /**
    * Create an persistent node with the given path and data. Create parents if necessary.
    */
-  def createPersistentPath(client: ZkClient, path: String, data: String): Unit = {
+  def createPersistentPath(client: ZkClient, path: String, data: String = ""): Unit = {
     try {
       client.createPersistent(path, data)
     }
@@ -198,6 +271,10 @@
     }
   }
 
+  def createSequentialPersistentPath(client: ZkClient, path: String, data: String = ""): String = {
+    client.createPersistentSequential(path, data)
+  }
+
   /**
    * Update the value of a persistent node with the given path and data.
    * create parrent directory if necessary. Never throw NodeExistException.
Index: core/src/main/scala/kafka/utils/ZKQueue.scala
===================================================================
--- core/src/main/scala/kafka/utils/ZKQueue.scala (revision 0)
+++ core/src/main/scala/kafka/utils/ZKQueue.scala (revision 0)
@@ -0,0 +1,78 @@
+package kafka.utils
+
+import kafka.utils.ZkUtils._
+import org.I0Itec.zkclient.ZkClient
+import kafka.common.QueueFullException
+
+class ZKQueue(zkClient: ZkClient, path: String, size: Int) extends Logging {
+  // create the queue in ZK, if one does not exist
+  makeSurePersistentPathExists(zkClient, path)
+
+  // TODO: This API will be used by the leader to enqueue state change requests to the followers
+  def put(data: String): String = {
+    // if queue is full, throw QueueFullException
+    if(isFull)
+      throw new QueueFullException("Queue is full. Item %s will be rejected".format(data))
+    val queueLocation = createSequentialPersistentPath(zkClient, path + "/item-", data)
+    debug("Added item %s to queue at location %s".format(data, queueLocation))
+    queueLocation
+  }
+
+  def poll(): Option[(String, String)] = {
+    val allItems = getChildren(zkClient, path).sorted
+    allItems.size match {
+      case 0 => None
+      case _ =>
+        val item = allItems.head
+        val stateChangePath = path + "/" + item
+        val request = ZkUtils.readData(zkClient, stateChangePath)
+        deletePath(zkClient, stateChangePath)
+        Some(item, request)
+    }
+  }
+
+  def peek: Option[(String, String)] = {
+    val allItems = getChildren(zkClient, path).sorted
+    allItems.size match {
+      case 0 => None
+      case _ =>
+        val item = allItems.head
+        val stateChangePath = path + "/" + item
+        Some(item, ZkUtils.readData(zkClient, stateChangePath))
+    }
+  }
+
+  def remove() = poll()
+
+  def drainAll(): Seq[(String, String)] = {
+    val allItems = getChildren(zkClient, path).sorted
+    allItems.size match {
+      case 0 => Seq.empty[(String, String)]
+      case _ => allItems.map { item =>
+        // read the data and delete the node
+        val stateChangePath = path + "/" + item
+        val data = ZkUtils.readData(zkClient, stateChangePath)
+        ZkUtils.deletePath(zkClient, stateChangePath)
+        (item, data)
+      }
+    }
+  }
+
+  def readAll(): Seq[(String, String)] = {
+    val allItems = getChildren(zkClient, path).sorted
+    allItems.size match {
+      case 0 => Seq.empty[(String, String)]
+      case _ => allItems.map { item =>
+        // read the data without deleting the node
+        val stateChangePath = path + "/" + item
+        val data = ZkUtils.readData(zkClient, stateChangePath)
+        (item, data)
+      }
+    }
+  }
+
+  def isEmpty: Boolean = (readAll().size == 0)
+
+  // TODO: Implement the queue shrink operation if the queue is full, as part of create/delete topic
+  def isFull: Boolean = (readAll().size == size)
+}
\ No newline at end of file
Index: core/src/main/scala/kafka/server/StateChangeRequestHandler.scala
===================================================================
--- core/src/main/scala/kafka/server/StateChangeRequestHandler.scala (revision 0)
+++ core/src/main/scala/kafka/server/StateChangeRequestHandler.scala (revision 0)
@@ -0,0 +1,47 @@
+package kafka.server
+
+import kafka.utils.{ZKQueue, Logging}
+
+
+class StateChangeRequestHandler(config: KafkaConfig, stateChangeQ: ZKQueue) extends Logging {
+
+  def handleStateChanges(requestValidityCheck: (Int, String) => Boolean,
+                         epochValidityCheck: (Int, String) => Boolean) {
+    // get outstanding state change requests for this broker
+    val queuedStateChanges = stateChangeQ.drainAll()
+
+    queuedStateChanges.filter(req => requestValidityCheck(req._1.toInt, req._2)).foreach {
+      request =>
+        val stateChangeRequestInfo = request._2.split(":")
+        val topic = stateChangeRequestInfo.head
+        val partition = stateChangeRequestInfo.take(2).last.toInt
+        val stateChange = stateChangeRequestInfo.last
+        val stateChangeRequest = StateChangeRequest.getStateChangeRequest(stateChange)
+
+        stateChangeRequest match {
+          case StartReplica =>
+            if(epochValidityCheck(request._1.toInt, request._2))
+              handleStartReplica(topic, partition)
+          case CloseReplica =>
+            /**
+             * close replica requests are sent as part of the delete topic or partition reassignment process.
+             * To ensure that a topic will be deleted even if the broker is offline, this state change should not
+             * be protected with the epoch validity check
+             */
+            handleCloseReplica(topic, partition)
+        }
+    }
+  }
+
+  def handleStartReplica(topic: String, partition: Int) {
+    info("Received start replica state change request for topic %s partition %d on broker %d"
+      .format(topic, partition, config.brokerId))
+    // TODO: implement this as part of create topic support or partition reassignment support. Until then, it is unused
+  }
+
+  def handleCloseReplica(topic: String, partition: Int) {
+    info("Received close replica state change request for topic %s partition %d on broker %d"
+      .format(topic, partition, config.brokerId))
+    // TODO: implement this as part of delete topic support. Until then, it is unused
+  }
+}
\ No newline at end of file
Index: core/src/main/scala/kafka/server/StateChangeRequest.scala
===================================================================
--- core/src/main/scala/kafka/server/StateChangeRequest.scala (revision 0)
+++ core/src/main/scala/kafka/server/StateChangeRequest.scala (revision 0)
@@ -0,0 +1,21 @@
+package kafka.server
+
+object StateChangeRequest {
+  def getStateChangeRequest(request: String): StateChangeRequest = {
+    request match {
+      case StartReplica.request => StartReplica
+      case CloseReplica.request => CloseReplica
+      case _ => throw new kafka.common.UnknownCodecException("%s is an unknown state change request".format(request))
+    }
+  }
+}
+
+sealed trait StateChangeRequest { def request: String }
+
+/* The elected leader sends the start replica state change request to all the new replicas that have been assigned
+* a partition. Note that the followers must act on this request only if the request epoch == latest partition epoch or -1 */
+case object StartReplica extends StateChangeRequest { val request = "start-replica" }
+
+/* The elected leader sends the close replica state change request to all the replicas that have been un-assigned a partition
+* OR if a topic has been deleted. Note that the followers must act on this request even if the epoch has changed */
+case object CloseReplica extends StateChangeRequest { val request = "close-replica" }
Index: core/src/main/scala/kafka/server/KafkaZooKeeper.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaZooKeeper.scala (revision 1310924)
+++ core/src/main/scala/kafka/server/KafkaZooKeeper.scala (working copy)
@@ -23,6 +23,7 @@
 import kafka.common.{InvalidPartitionException, KafkaZookeeperClient}
 import kafka.cluster.Replica
 import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener, IZkStateListener, ZkClient}
+import java.lang.IllegalStateException
 
 /**
  * Handles the server's interaction with zookeeper.
The server needs to register the following paths: @@ -37,17 +38,27 @@ val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + config.brokerId private var zkClient: ZkClient = null var topics: List[String] = Nil - val lock = new Object() var existingTopics: Set[String] = Set.empty[String] val leaderChangeListener = new LeaderChangeListener val topicPartitionsChangeListener = new TopicChangeListener + val stateChangeListener = new StateChangeListener + private var stateChangeHandler: StateChangeRequestHandler = null + private var stateChangeQ: ZKQueue = null + private val topicListenerLock = new Object private val leaderChangeLock = new Object + private val stateChangeLock = new Object def startup() { /* start client */ info("connecting to ZK: " + config.zkConnect) zkClient = KafkaZookeeperClient.getZookeeperClient(config) + val stateChangePath = ZkUtils.getBrokerStateChangePath(config.brokerId) + stateChangeQ = new ZKQueue(zkClient, stateChangePath, config.stateChangeQSize) + stateChangeHandler = new StateChangeRequestHandler(config, stateChangeQ) + zkClient.subscribeChildChanges(stateChangePath, stateChangeListener) + // create state change path if one doesn't exist. This is needed here in the absence of a create cluster admin command + ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.getBrokerStateChangePath(config.brokerId)) zkClient.subscribeStateChanges(new SessionExpireListener) registerBrokerInZk() subscribeToTopicAndPartitionsChanges(true) @@ -184,7 +195,6 @@ case Some(leader) => info("Topic %s partition %d has leader %d".format(replica.topic, replica.partition.partId, leader)) case None => // leader election leaderElection(replica) - } } @@ -201,9 +211,12 @@ }catch { case e => // ignoring } - if(ZkUtils.tryToBecomeLeaderForPartition(zkClient, replica.topic, replica.partition.partId, replica.brokerId)) { - info("Broker %d is leader for topic %s partition %d".format(replica.brokerId, replica.topic, replica.partition.partId)) - // TODO: Become leader as part of KAFKA-302 + val newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, replica.topic, replica.partition.partId, replica.brokerId) + newLeaderEpoch match { + case Some(epoch) => + info("Broker %d is leader for topic %s partition %d".format(replica.brokerId, replica.topic, replica.partition.partId)) + // TODO: Become leader as part of KAFKA-302 + case None => } } } @@ -233,6 +246,46 @@ } } + class StateChangeListener extends IZkChildListener with Logging { + + @throws(classOf[Exception]) + def handleChildChange(parentPath : String, curChilds : java.util.List[String]) { + stateChangeLock.synchronized { + debug("State change listener fired for path %s in broker %d".format(parentPath, config.brokerId)) + import scala.collection.JavaConversions._ + val outstandingRequests = asBuffer(curChilds).sorted + debug("Sorted list of state change requests %s to broker id %d".format(outstandingRequests.mkString(","), config.brokerId)) + + stateChangeHandler.handleStateChanges(ensureStateChangeRequestIsValidOnThisBroker, ensureEpochValidity) + } + } + + private def ensureStateChangeRequestIsValidOnThisBroker(epoch: Int, request: String): Boolean = { + // get the topic and partition that this request is meant for + val requestInfo = request.split(":") + val topic = requestInfo.head + val partition = requestInfo.take(2).last.toInt + + // check if this broker hosts a replica for this topic and partition + ZkUtils.isPartitionOnBroker(zkClient, topic, partition, config.brokerId) + } + + private def ensureEpochValidity(epoch: Int, request: 
String): Boolean = { + // get the topic and partition that this request is meant for + val requestInfo = request.split(":") + val topic = requestInfo.head + val partition = requestInfo.take(2).last.toInt + + // check if the request's epoch matches the current leader's epoch + val currentLeaderEpoch = ZkUtils.getEpochForPartition(zkClient, topic, partition) + val validEpoch = currentLeaderEpoch == epoch + if(epoch > currentLeaderEpoch) + throw new IllegalStateException(("Illegal epoch state. Request's epoch %d larger than registered epoch %d for " + + "topic %s partition %d").format(epoch, currentLeaderEpoch, topic, partition)) + validEpoch + } + } + class LeaderChangeListener extends IZkDataListener with Logging { @throws(classOf[Exception]) Index: core/src/main/scala/kafka/server/KafkaConfig.scala =================================================================== --- core/src/main/scala/kafka/server/KafkaConfig.scala (revision 1310924) +++ core/src/main/scala/kafka/server/KafkaConfig.scala (working copy) @@ -105,4 +105,7 @@ * leader election on all replicas minus the preferred replica */ val preferredReplicaWaitTime = Utils.getLong(props, "preferred.replica.wait.time", 300) + /* size of the state change request queue in Zookeeper */ + val stateChangeQSize = Utils.getInt(props, "state.change.queue.size", 1000) + }
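
Reviewer note, not part of the patch: a minimal sketch of how the new pieces are meant to fit together. The elected leader enqueues a state change request on a follower's per-broker ZK queue, and the follower later drains the queue and checks the partition epoch before acting. The ZooKeeper connect string, broker ids, and topic below are illustrative assumptions; the request format "topic:partition:state" and the APIs used (ZKQueue, ZkUtils.tryToBecomeLeaderForPartition, ZkUtils.getEpochForPartition, ZkUtils.getBrokerStateChangePath) are the ones introduced by this patch.

import org.I0Itec.zkclient.ZkClient
import kafka.utils.{ZKQueue, ZKStringSerializer, ZkUtils}

object StateChangeQueueSketch {
  def main(args: Array[String]) {
    // assumed local ZooKeeper; topic and partition values are illustrative only
    val zkClient = new ZkClient("localhost:2181", 6000, 3000, ZKStringSerializer)
    val topic = "new-topic"
    val partition = 0

    // broker 0 wins the election; the new epoch is written to the leader and ISR paths as "value;epoch"
    val newEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partition, 0)
    println("broker 0 became leader with epoch " + newEpoch.getOrElse(-1))

    // the leader enqueues a start-replica request on follower broker 1's state change queue
    val followerQueue = new ZKQueue(zkClient, ZkUtils.getBrokerStateChangePath(1), 1000)
    followerQueue.put("%s:%d:start-replica".format(topic, partition))

    // on broker 1, the StateChangeListener drains the queue and validates each request
    // against the current partition epoch before handing it to StateChangeRequestHandler
    followerQueue.drainAll().foreach { case (item, data) =>
      println("dequeued %s -> %s (current epoch %d)"
        .format(item, data, ZkUtils.getEpochForPartition(zkClient, topic, partition)))
    }
  }
}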