From 4d49721dac92b19ac56387b0ee094ee72afacb06 Mon Sep 17 00:00:00 2001 From: Sriram Subramanian Date: Wed, 10 Jul 2013 18:43:17 -0700 Subject: [PATCH 1/6] Add Partitions implementation --- .../scala/kafka/admin/AddPartitionsCommand.scala | 123 +++++++++++++++++ core/src/main/scala/kafka/admin/AdminUtils.scala | 20 ++- .../scala/kafka/admin/CreateTopicCommand.scala | 2 +- .../scala/kafka/controller/KafkaController.scala | 4 +- .../kafka/controller/PartitionStateMachine.scala | 29 +++- 6 files changed, 242 insertions(+), 83 deletions(-) create mode 100644 core/src/main/scala/kafka/admin/AddPartitionsCommand.scala diff --git a/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala new file mode 100644 index 0000000..4a1fc3a --- /dev/null +++ b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.admin + +import joptsimple.OptionParser +import kafka.utils._ +import org.I0Itec.zkclient.ZkClient +import scala.collection.mutable +import kafka.common.{TopicAndPartition, KafkaException, Topic} +import org.I0Itec.zkclient.exception.ZkNoNodeException + +object AddPartitionsCommand extends Logging { + + def main(args: Array[String]): Unit = { + val parser = new OptionParser + val topicOpt = parser.accepts("topic", "REQUIRED: The topic for which partitions need to be added.") + .withRequiredArg + .describedAs("topic") + .ofType(classOf[String]) + val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. 
" + + "Multiple URLS can be given to allow fail-over.") + .withRequiredArg + .describedAs("urls") + .ofType(classOf[String]) + val nPartitionsOpt = parser.accepts("partition", "REQUIRED: Number of partitions to add to the topic") + .withRequiredArg + .describedAs("# of partitions") + .ofType(classOf[java.lang.Integer]) + val replicationFactorOpt = parser.accepts("replica", "Replication factor for each partitions in the topic") + .withRequiredArg + .describedAs("replication factor") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1) + val replicaAssignmentOpt = parser.accepts("replica-assignment-list", "For manually assigning replicas to brokers") + .withRequiredArg + .describedAs("broker_id_for_part1_replica1 : broker_id_for_part1_replica2, " + + "broker_id_for_part2_replica1 : broker_id_for_part2_replica2, ...") + .ofType(classOf[String]) + .defaultsTo("") + + val options = parser.parse(args : _*) + + for(arg <- List(topicOpt, zkConnectOpt, nPartitionsOpt)) { + if(!options.has(arg)) { + System.err.println("Missing required argument \"" + arg + "\"") + parser.printHelpOn(System.err) + System.exit(1) + } + } + + val topic = options.valueOf(topicOpt) + val zkConnect = options.valueOf(zkConnectOpt) + val nPartitions = options.valueOf(nPartitionsOpt).intValue + val replicationFactor = options.valueOf(replicationFactorOpt).intValue + val replicaAssignmentStr = options.valueOf(replicaAssignmentOpt) + var zkClient: ZkClient = null + try { + zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) + addPartitions(zkClient, topic, nPartitions, replicationFactor, replicaAssignmentStr) + println("adding partitions succeeded!") + } catch { + case e => + println("adding partitions failed because of " + e.getMessage) + println(Utils.stackTrace(e)) + } finally { + if (zkClient != null) + zkClient.close() + } + } + + def addPartitions(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicationFactor: Int = 1, replicaAssignmentStr: String = "") { + val existingPartitionsReplicaList = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic)) + if (existingPartitionsReplicaList.size == 0) + throw new AdministrationException("The topic %s does not exist".format(topic)) + + val partitionStartIndex = existingPartitionsReplicaList.get(TopicAndPartition(topic,0)).get + + val brokerList = ZkUtils.getSortedBrokerList(zkClient) + val newPartitionReplicaList = if (replicaAssignmentStr == "") + AdminUtils.assignReplicasToBrokers(brokerList, numPartitions, replicationFactor, partitionStartIndex.head, existingPartitionsReplicaList.size) + else + getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet) + info("Add partition list for %s is %s".format(topic, newPartitionReplicaList)) + val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition -> p._2) + // add the new list + partitionReplicaList ++= newPartitionReplicaList + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, partitionReplicaList, zkClient, true) + } + + def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int]): Map[Int, List[Int]] = { + val partitionList = replicaAssignmentList.split(",") + val ret = new mutable.HashMap[Int, List[Int]]() + for (i <- 0 until partitionList.size) { + val brokerList = partitionList(i).split(":").map(s => s.trim().toInt) + if (brokerList.size <= 0) + throw new AdministrationException("replication factor must be larger than 0") + if (brokerList.size != brokerList.toSet.size) + throw new 
AdministrationException("duplicate brokers in replica assignment: " + brokerList) + if (!brokerList.toSet.subsetOf(availableBrokerList)) + throw new AdministrationException("some specified brokers not available. specified brokers: " + brokerList.toString + + "available broker:" + availableBrokerList.toString) + ret.put(i, brokerList.toList) + if (ret(i).size != ret(0).size) + throw new AdministrationException("partition " + i + " has different replication factor: " + brokerList) + } + ret.toMap + } +} diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 41cb764..38d5bd9 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -48,7 +48,7 @@ object AdminUtils extends Logging { * p7 p8 p9 p5 p6 (3nd replica) */ def assignReplicasToBrokers(brokerList: Seq[Int], nPartitions: Int, replicationFactor: Int, - fixedStartIndex: Int = -1) // for testing only + fixedStartIndex: Int = -1, startPartitionId: Int = -1) // for testing only : Map[Int, Seq[Int]] = { if (nPartitions <= 0) throw new AdministrationException("number of partitions must be larger than 0") @@ -59,25 +59,33 @@ object AdminUtils extends Logging { " larger than available brokers: " + brokerList.size) val ret = new mutable.HashMap[Int, List[Int]]() val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size) + var currentPartitionId = if (startPartitionId >= 0) startPartitionId else 0 var secondReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size) for (i <- 0 until nPartitions) { - if (i > 0 && (i % brokerList.size == 0)) + if (currentPartitionId > 0 && (currentPartitionId % brokerList.size == 0)) secondReplicaShift += 1 - val firstReplicaIndex = (i + startIndex) % brokerList.size + val firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size var replicaList = List(brokerList(firstReplicaIndex)) for (j <- 0 until replicationFactor - 1) replicaList ::= brokerList(getWrappedIndex(firstReplicaIndex, secondReplicaShift, j, brokerList.size)) - ret.put(i, replicaList.reverse) + ret.put(currentPartitionId, replicaList.reverse) + currentPartitionId = currentPartitionId + 1 } ret.toMap } - def createTopicPartitionAssignmentPathInZK(topic: String, replicaAssignment: Map[Int, Seq[Int]], zkClient: ZkClient) { + def createOrUpdateTopicPartitionAssignmentPathInZK(topic: String, replicaAssignment: Map[Int, Seq[Int]], zkClient: ZkClient, update: Boolean = false) { try { val zkPath = ZkUtils.getTopicPath(topic) val jsonPartitionData = ZkUtils.replicaAssignmentZkdata(replicaAssignment.map(e => (e._1.toString -> e._2))) - ZkUtils.createPersistentPath(zkClient, zkPath, jsonPartitionData) + info("topic creation " + jsonPartitionData.toString) + if (!update) { + ZkUtils.createPersistentPath(zkClient, zkPath, jsonPartitionData) + } + else { + ZkUtils.updatePersistentPath(zkClient, zkPath, jsonPartitionData) + } debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionData)) } catch { case e: ZkNodeExistsException => throw new TopicExistsException("topic %s already exists".format(topic)) diff --git a/core/src/main/scala/kafka/admin/CreateTopicCommand.scala b/core/src/main/scala/kafka/admin/CreateTopicCommand.scala index e762115..21c1186 100644 --- a/core/src/main/scala/kafka/admin/CreateTopicCommand.scala +++ b/core/src/main/scala/kafka/admin/CreateTopicCommand.scala @@ -93,7 +93,7 @@ object CreateTopicCommand extends 
Logging { else getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet) debug("Replica assignment list for %s is %s".format(topic, partitionReplicaAssignment)) - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, partitionReplicaAssignment, zkClient) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, partitionReplicaAssignment, zkClient) } def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int]): Map[Int, List[Int]] = { diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 5ac38fd..a620bcf 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -210,6 +210,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks registerReassignedPartitionsListener() registerPreferredReplicaElectionListener() + // register the partition change listeners for all existing topics on failover + ZkUtils.getAllTopics(zkClient).foreach(p => registerPartitionChangeListener(p)) partitionStateMachine.registerListeners() replicaStateMachine.registerListeners() initializeControllerContext() @@ -307,9 +309,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg */ def onNewTopicCreation(topics: Set[String], newPartitions: Set[TopicAndPartition]) { info("New topic creation callback for %s".format(newPartitions.mkString(","))) + onNewPartitionCreation(newPartitions) // subscribe to partition changes topics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic)) - onNewPartitionCreation(newPartitions) } /** diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index deebed0..e43eb99 100644 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean import kafka.api.LeaderAndIsr import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException} import kafka.utils.{Logging, ZkUtils} -import org.I0Itec.zkclient.IZkChildListener +import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener} import org.I0Itec.zkclient.exception.ZkNodeExistsException import org.apache.log4j.Logger @@ -334,7 +334,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { } def registerPartitionChangeListener(topic: String) = { - zkClient.subscribeChildChanges(ZkUtils.getTopicPath(topic), new PartitionChangeListener(topic)) + zkClient.subscribeDataChanges(ZkUtils.getTopicPath(topic), new AddPartitionsListener(topic)) } private def getLeaderIsrAndEpochOrThrowException(topic: String, partition: Int): LeaderIsrAndControllerEpoch = { @@ -383,15 +383,32 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { } } - class PartitionChangeListener(topic: String) extends IZkChildListener with Logging { - this.logIdent = "[Controller " + controller.config.brokerId + "]: " + + class AddPartitionsListener(topic: String) extends IZkDataListener with Logging { + + this.logIdent = "[AddPartitionsListener on " + controller.config.brokerId + "]: " @throws(classOf[Exception]) - def handleChildChange(parentPath : 
String, children : java.util.List[String]) { + def handleDataChange(dataPath : String, data: Object) { controllerContext.controllerLock synchronized { - // TODO: To be completed as part of KAFKA-41 + try { + info("Add Partition triggered " + data.toString + " for path " + dataPath) + val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic)) + val partitionsRemainingToBeAdded = addedPartitionReplicaAssignment.filter(p => + !controllerContext.partitionLeadershipInfo.contains(p._1)) + controllerContext.partitionReplicaAssignment.++=(partitionsRemainingToBeAdded) + info("New partitions to be added [%s]".format(partitionsRemainingToBeAdded)) + controller.onNewPartitionCreation(partitionsRemainingToBeAdded.keySet.toSet) + } catch { + case e => error("Error while handling add partitions", e ) + } } } + + @throws(classOf[Exception]) + def handleDataDeleted(parentPath : String) { + // this is not implemented for partition change + } } } -- 1.7.12.4 (Apple Git-37) From 7e7bf68764fc613ce56531c87de5055cc7619671 Mon Sep 17 00:00:00 2001 From: Sriram Subramanian Date: Thu, 11 Jul 2013 10:00:53 -0700 Subject: [PATCH 2/6] add test for add partitions --- core/src/main/scala/kafka/admin/AdminUtils.scala | 5 +- .../scala/kafka/controller/KafkaController.scala | 4 +- .../scala/unit/kafka/admin/AddPartitionsTest.scala | 134 +++++++++++++++++++++ .../test/scala/unit/kafka/admin/AdminTest.scala | 16 +-- 4 files changed, 146 insertions(+), 13 deletions(-) create mode 100644 core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 38d5bd9..0d38eaa 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -82,9 +82,8 @@ object AdminUtils extends Logging { info("topic creation " + jsonPartitionData.toString) if (!update) { ZkUtils.createPersistentPath(zkClient, zkPath, jsonPartitionData) - } - else { - ZkUtils.updatePersistentPath(zkClient, zkPath, jsonPartitionData) + } else { + ZkUtils.updatePersistentPath(zkClient, zkPath, jsonPartitionData) } debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionData)) } catch { diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index a620bcf..f5a64ad 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -210,13 +210,13 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks registerReassignedPartitionsListener() registerPreferredReplicaElectionListener() - // register the partition change listeners for all existing topics on failover - ZkUtils.getAllTopics(zkClient).foreach(p => registerPartitionChangeListener(p)) partitionStateMachine.registerListeners() replicaStateMachine.registerListeners() initializeControllerContext() replicaStateMachine.startup() partitionStateMachine.startup() + // register the partition change listeners for all existing topics on failover + ZkUtils.getAllTopics(zkClient).foreach(p => partitionStateMachine.registerPartitionChangeListener(p)) Utils.registerMBean(this, KafkaController.MBeanName) info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch)) 
initializeAndMaybeTriggerPartitionReassignment() diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala new file mode 100644 index 0000000..e51ca92 --- /dev/null +++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.admin + +import org.scalatest.junit.JUnit3Suite +import kafka.zk.ZooKeeperTestHarness +import kafka.admin.{AdministrationException, AddPartitionsCommand, CreateTopicCommand} +import kafka.utils.TestUtils._ +import junit.framework.Assert._ +import kafka.utils.{ZkUtils, Utils, TestUtils} +import kafka.cluster.Broker +import org.I0Itec.zkclient.ZkClient +import kafka.client.ClientUtils +import kafka.server.{KafkaConfig, KafkaServer} + +class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { + val brokerId1 = 0 + val brokerId2 = 1 + val brokerId3 = 2 + val brokerId4 = 3 + + val port1 = TestUtils.choosePort() + val port2 = TestUtils.choosePort() + val port3 = TestUtils.choosePort() + val port4 = TestUtils.choosePort() + + val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1) + val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2) + val configProps3 = TestUtils.createBrokerConfig(brokerId3, port3) + val configProps4 = TestUtils.createBrokerConfig(brokerId4, port4) + + var servers: Seq[KafkaServer] = Seq.empty[KafkaServer] + var brokers: Seq[Broker] = Seq.empty[Broker] + + val partitionId = 0 + + val topic1 = "new-topic1" + val topic2 = "new-topic2" + val topic3 = "new-topic3" + val topic4 = "new-topic4" + + override def setUp() { + super.setUp() + // start all the servers + val server1 = TestUtils.createServer(new KafkaConfig(configProps1)) + val server2 = TestUtils.createServer(new KafkaConfig(configProps2)) + val server3 = TestUtils.createServer(new KafkaConfig(configProps3)) + val server4 = TestUtils.createServer(new KafkaConfig(configProps4)) + + servers ++= List(server1, server2, server3, server4) + brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, s.config.port)) + + // create topics with 1 partition, 2 replicas, one on each broker + CreateTopicCommand.createTopic(zkClient, topic1, 1, 2, "0:1") + CreateTopicCommand.createTopic(zkClient, topic2, 1, 2, "1:2") + CreateTopicCommand.createTopic(zkClient, topic3, 1, 2, "2:3") + CreateTopicCommand.createTopic(zkClient, topic4, 1, 2, "0:3") + + + // wait until leader is elected + var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, partitionId, 500) + var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, partitionId, 500) + var leader3 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, partitionId, 500) + var leader4 = 
waitUntilLeaderIsElectedOrChanged(zkClient, topic4, partitionId, 500)
+
+    debug("Leader for " + topic1 + " is elected to be: %s".format(leader1.getOrElse(-1)))
+    debug("Leader for " + topic2 + " is elected to be: %s".format(leader2.getOrElse(-1)))
+    debug("Leader for " + topic3 + " is elected to be: %s".format(leader3.getOrElse(-1)))
+    debug("Leader for " + topic4 + " is elected to be: %s".format(leader4.getOrElse(-1)))
+
+    assertTrue("Leader should get elected", leader1.isDefined)
+    assertTrue("Leader should get elected", leader2.isDefined)
+    assertTrue("Leader should get elected", leader3.isDefined)
+    assertTrue("Leader should get elected", leader4.isDefined)
+
+    assertTrue("Leader could be broker 0 or broker 1 for " + topic1, (leader1.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 1))
+    assertTrue("Leader could be broker 1 or broker 2 for " + topic2, (leader2.getOrElse(-1) == 1) || (leader2.getOrElse(-1) == 2))
+    assertTrue("Leader could be broker 2 or broker 3 for " + topic3, (leader3.getOrElse(-1) == 2) || (leader3.getOrElse(-1) == 3))
+    assertTrue("Leader could be broker 0 or broker 3 for " + topic4, (leader4.getOrElse(-1) == 0) || (leader4.getOrElse(-1) == 3))
+  }
+
+  override def tearDown() {
+    servers.map(server => server.shutdown())
+    servers.map(server => Utils.rm(server.config.logDirs))
+    super.tearDown()
+  }
+
+  def testTopicDoesNotExist {
+    try {
+      AddPartitionsCommand.addPartitions(zkClient, "Blah", 1, 1)
+      fail("Topic should not exist")
+    } catch {
+      case e: AdministrationException => //this is good
+      case e2 => throw e2
+    }
+  }
+
+  def testIncrementPartitions {
+    AddPartitionsCommand.addPartitions(zkClient, topic1, 2, 2)
+    // wait until leader is elected
+    var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 1, 500)
+    var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 2, 500)
+    val leader1FromZk = ZkUtils.getLeaderForPartition(zkClient, topic1, 1).get
+    val leader2FromZk = ZkUtils.getLeaderForPartition(zkClient, topic1, 2).get
+    assertEquals(leader1.get, leader1FromZk)
+    assertEquals(leader2.get, leader2FromZk)
+    // read metadata from a broker and verify the new topic partitions exist
+    TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic1, 1, 1000)
+    TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic1, 2, 1000)
+    val metadata = ClientUtils.fetchTopicMetadata(Set(topic1), brokers, "AddPartitionsTest-testIncrementPartitions",
+      2000,0).topicsMetadata
+    val metaDataForTopic1 = metadata.filter(p => p.topic.equals(topic1))
+    val partitionDataForTopic1 = metaDataForTopic1.head.partitionsMetadata
+    assertEquals(partitionDataForTopic1(1).partitionId, 1)
+    assertEquals(partitionDataForTopic1(2).partitionId, 2)
+    partitionDataForTopic1(1). 
+ assertEquals(partitionDataForTopic1(1).leader.get, ) + } +} \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index 0d8b70f..dc0013f 100644 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -157,7 +157,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val topic = "test" TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4)) // create the topic - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) // create leaders for all partitions TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1) val actualReplicaList = leaderForPartitionMap.keys.toArray.map(p => (p -> ZkUtils.getReplicasForPartition(zkClient, topic, p))).toMap @@ -166,7 +166,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { assertEquals(expectedReplicaAssignment.get(i).get, actualReplicaList(i)) try { - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) fail("shouldn't be able to create a topic already exists") } catch { case e: TopicExistsException => // this is good @@ -181,7 +181,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { // create brokers val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) // reassign partition 0 val newReplicas = Seq(0, 2, 3) val partitionToBeReassigned = 0 @@ -206,7 +206,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { // create brokers val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) // reassign partition 0 val newReplicas = Seq(1, 2, 3) val partitionToBeReassigned = 0 @@ -232,7 +232,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { // create brokers val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) // reassign partition 0 val newReplicas = Seq(2, 3) val partitionToBeReassigned = 0 @@ -273,7 +273,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val expectedReplicaAssignment = Map(0 -> List(0, 1)) val topic = "test" // create the topic - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) // put the partition in the reassigned path as well // reassign partition 0 val newReplicas = Seq(0, 1) @@ 
-312,7 +312,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { // create brokers val serverConfigs = TestUtils.createBrokerConfigs(3).map(new KafkaConfig(_)) // create the topic - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s)) // broker 2 should be the leader since it was started first val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, 1000, None).get @@ -333,7 +333,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val serverConfigs = TestUtils.createBrokerConfigs(3).map(new KafkaConfig(_)) val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s)) // create the topic - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) TestUtils.waitUntilMetadataIsPropagated(servers, topic, partition, 1000) -- 1.7.12.4 (Apple Git-37) From f07a9eb5f7de1f6c060ff40f4d2f004880f8a631 Mon Sep 17 00:00:00 2001 From: Sriram Subramanian Date: Fri, 12 Jul 2013 00:11:21 -0700 Subject: [PATCH 3/6] More unit tests --- .../scala/kafka/admin/AddPartitionsCommand.scala | 13 ++- .../scala/unit/kafka/admin/AddPartitionsTest.scala | 117 ++++++++++++++++++++- 2 files changed, 121 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala index 4a1fc3a..60dd6c9 100644 --- a/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala @@ -57,7 +57,8 @@ object AddPartitionsCommand extends Logging { for(arg <- List(topicOpt, zkConnectOpt, nPartitionsOpt)) { if(!options.has(arg)) { - System.err.println("Missing required argument \"" + arg + "\"") + System.err.println("***Please note that this tool can only be used to add partitions when data for a topic does not use a key.***\n" + + "Missing required argument. 
" + " \"" + arg + "\"") parser.printHelpOn(System.err) System.exit(1) } @@ -94,7 +95,7 @@ object AddPartitionsCommand extends Logging { val newPartitionReplicaList = if (replicaAssignmentStr == "") AdminUtils.assignReplicasToBrokers(brokerList, numPartitions, replicationFactor, partitionStartIndex.head, existingPartitionsReplicaList.size) else - getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet) + getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet, existingPartitionsReplicaList.size) info("Add partition list for %s is %s".format(topic, newPartitionReplicaList)) val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition -> p._2) // add the new list @@ -102,9 +103,10 @@ object AddPartitionsCommand extends Logging { AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, partitionReplicaList, zkClient, true) } - def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int]): Map[Int, List[Int]] = { + def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int], startPartitionId: Int): Map[Int, List[Int]] = { val partitionList = replicaAssignmentList.split(",") val ret = new mutable.HashMap[Int, List[Int]]() + var partitionId = startPartitionId for (i <- 0 until partitionList.size) { val brokerList = partitionList(i).split(":").map(s => s.trim().toInt) if (brokerList.size <= 0) @@ -114,9 +116,10 @@ object AddPartitionsCommand extends Logging { if (!brokerList.toSet.subsetOf(availableBrokerList)) throw new AdministrationException("some specified brokers not available. specified brokers: " + brokerList.toString + "available broker:" + availableBrokerList.toString) - ret.put(i, brokerList.toList) - if (ret(i).size != ret(0).size) + ret.put(partitionId, brokerList.toList) + if (ret(partitionId).size != ret(startPartitionId).size) throw new AdministrationException("partition " + i + " has different replication factor: " + brokerList) + partitionId = partitionId + 1 } ret.toMap } diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala index e51ca92..2593732 100644 --- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala @@ -119,16 +119,125 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { val leader2FromZk = ZkUtils.getLeaderForPartition(zkClient, topic1, 2).get assertEquals(leader1.get, leader1FromZk) assertEquals(leader2.get, leader2FromZk) + // read metadata from a broker and verify the new topic partitions exist - TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic1, 1, 1000) - TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic1, 2, 1000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 1, 1000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 2, 1000) val metadata = ClientUtils.fetchTopicMetadata(Set(topic1), brokers, "AddPartitionsTest-testIncrementPartitions", 2000,0).topicsMetadata val metaDataForTopic1 = metadata.filter(p => p.topic.equals(topic1)) val partitionDataForTopic1 = metaDataForTopic1.head.partitionsMetadata + assertEquals(partitionDataForTopic1.size, 3) assertEquals(partitionDataForTopic1(1).partitionId, 1) assertEquals(partitionDataForTopic1(2).partitionId, 2) - partitionDataForTopic1(1). 
- assertEquals(partitionDataForTopic1(1).leader.get, ) + val replicas = partitionDataForTopic1(1).replicas + assertEquals(replicas.size, 2) + assert(replicas.contains(partitionDataForTopic1(1).leader.get)) + } + + def testManualAssignmentOfReplicas { + AddPartitionsCommand.addPartitions(zkClient, topic2, 2, 2, "0:1,2:3") + // wait until leader is elected + var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 1, 500) + var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 2, 500) + val leader1FromZk = ZkUtils.getLeaderForPartition(zkClient, topic2, 1).get + val leader2FromZk = ZkUtils.getLeaderForPartition(zkClient, topic2, 2).get + assertEquals(leader1.get, leader1FromZk) + assertEquals(leader2.get, leader2FromZk) + + // read metadata from a broker and verify the new topic partitions exist + TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 1, 1000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 2, 1000) + val metadata = ClientUtils.fetchTopicMetadata(Set(topic2), brokers, "AddPartitionsTest-testManualAssignmentOfReplicas", + 2000,0).topicsMetadata + val metaDataForTopic2 = metadata.filter(p => p.topic.equals(topic2)) + val partitionDataForTopic2 = metaDataForTopic2.head.partitionsMetadata + assertEquals(partitionDataForTopic2.size, 3) + assertEquals(partitionDataForTopic2(1).partitionId, 1) + assertEquals(partitionDataForTopic2(2).partitionId, 2) + val replicas = partitionDataForTopic2(1).replicas + assertEquals(replicas.size, 2) + assert(replicas(0).id == 0 || replicas(0).id == 1) + assert(replicas(1).id == 0 || replicas(1).id == 1) + } + + def testReplicaPlacement { + AddPartitionsCommand.addPartitions(zkClient, topic3, 6, 4) + // wait until leader is elected + var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 1, 500) + var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 2, 500) + var leader3 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 3, 500) + var leader4 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 4, 500) + var leader5 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 5, 500) + var leader6 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 6, 500) + + val leader1FromZk = ZkUtils.getLeaderForPartition(zkClient, topic3, 1).get + val leader2FromZk = ZkUtils.getLeaderForPartition(zkClient, topic3, 2).get + val leader3FromZk = ZkUtils.getLeaderForPartition(zkClient, topic3, 3).get + val leader4FromZk = ZkUtils.getLeaderForPartition(zkClient, topic3, 4).get + val leader5FromZk = ZkUtils.getLeaderForPartition(zkClient, topic3, 5).get + val leader6FromZk = ZkUtils.getLeaderForPartition(zkClient, topic3, 6).get + + assertEquals(leader1.get, leader1FromZk) + assertEquals(leader2.get, leader2FromZk) + assertEquals(leader3.get, leader3FromZk) + assertEquals(leader4.get, leader4FromZk) + assertEquals(leader5.get, leader5FromZk) + assertEquals(leader6.get, leader6FromZk) + + // read metadata from a broker and verify the new topic partitions exist + TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 1, 1000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 2, 1000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 3, 1000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 4, 1000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 5, 1000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 6, 1000) + + val metadata = ClientUtils.fetchTopicMetadata(Set(topic3), brokers, "AddPartitionsTest-testReplicaPlacement", + 2000,0).topicsMetadata + + val 
metaDataForTopic3 = metadata.filter(p => p.topic.equals(topic3)).head + val partition1DataForTopic3 = metaDataForTopic3.partitionsMetadata(1) + val partition2DataForTopic3 = metaDataForTopic3.partitionsMetadata(2) + val partition3DataForTopic3 = metaDataForTopic3.partitionsMetadata(3) + val partition4DataForTopic3 = metaDataForTopic3.partitionsMetadata(4) + val partition5DataForTopic3 = metaDataForTopic3.partitionsMetadata(5) + val partition6DataForTopic3 = metaDataForTopic3.partitionsMetadata(6) + + assertEquals(partition1DataForTopic3.replicas.size, 4) + assertEquals(partition1DataForTopic3.replicas(0).id, 3) + assertEquals(partition1DataForTopic3.replicas(1).id, 2) + assertEquals(partition1DataForTopic3.replicas(2).id, 0) + assertEquals(partition1DataForTopic3.replicas(3).id, 1) + + assertEquals(partition2DataForTopic3.replicas.size, 4) + assertEquals(partition2DataForTopic3.replicas(0).id, 0) + assertEquals(partition2DataForTopic3.replicas(1).id, 3) + assertEquals(partition2DataForTopic3.replicas(2).id, 1) + assertEquals(partition2DataForTopic3.replicas(3).id, 2) + + assertEquals(partition3DataForTopic3.replicas.size, 4) + assertEquals(partition3DataForTopic3.replicas(0).id, 1) + assertEquals(partition3DataForTopic3.replicas(1).id, 0) + assertEquals(partition3DataForTopic3.replicas(2).id, 2) + assertEquals(partition3DataForTopic3.replicas(3).id, 3) + + assertEquals(partition4DataForTopic3.replicas.size, 4) + assertEquals(partition4DataForTopic3.replicas(0).id, 2) + assertEquals(partition4DataForTopic3.replicas(1).id, 3) + assertEquals(partition4DataForTopic3.replicas(2).id, 0) + assertEquals(partition4DataForTopic3.replicas(3).id, 1) + + assertEquals(partition5DataForTopic3.replicas.size, 4) + assertEquals(partition5DataForTopic3.replicas(0).id, 3) + assertEquals(partition5DataForTopic3.replicas(1).id, 0) + assertEquals(partition5DataForTopic3.replicas(2).id, 1) + assertEquals(partition5DataForTopic3.replicas(3).id, 2) + + assertEquals(partition6DataForTopic3.replicas.size, 4) + assertEquals(partition6DataForTopic3.replicas(0).id, 0) + assertEquals(partition6DataForTopic3.replicas(1).id, 1) + assertEquals(partition6DataForTopic3.replicas(2).id, 2) + assertEquals(partition6DataForTopic3.replicas(3).id, 3) } } \ No newline at end of file -- 1.7.12.4 (Apple Git-37) From c5c8ad2c35b0488a5b8ba56b4eb9ae91cba2eb98 Mon Sep 17 00:00:00 2001 From: Sriram Subramanian Date: Fri, 19 Jul 2013 11:11:17 -0700 Subject: [PATCH 4/6] Review changes --- core/src/main/scala/kafka/admin/AddPartitionsCommand.scala | 4 ++++ core/src/main/scala/kafka/admin/AdminUtils.scala | 10 +++++----- core/src/main/scala/kafka/controller/KafkaController.scala | 2 +- .../main/scala/kafka/controller/PartitionStateMachine.scala | 1 - core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala | 4 ++-- 5 files changed, 12 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala index 60dd6c9..714ca62 100644 --- a/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala @@ -91,6 +91,10 @@ object AddPartitionsCommand extends Logging { val partitionStartIndex = existingPartitionsReplicaList.get(TopicAndPartition(topic,0)).get + if (partitionStartIndex.size != replicationFactor) + throw new AdministrationException("The replication factor provided " + replicationFactor + + " is not equal to the existing replication factor for the topic " + 
partitionStartIndex.size) + val brokerList = ZkUtils.getSortedBrokerList(zkClient) val newPartitionReplicaList = if (replicaAssignmentStr == "") AdminUtils.assignReplicasToBrokers(brokerList, numPartitions, replicationFactor, partitionStartIndex.head, existingPartitionsReplicaList.size) diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 0d38eaa..d7835cf 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -48,7 +48,7 @@ object AdminUtils extends Logging { * p7 p8 p9 p5 p6 (3nd replica) */ def assignReplicasToBrokers(brokerList: Seq[Int], nPartitions: Int, replicationFactor: Int, - fixedStartIndex: Int = -1, startPartitionId: Int = -1) // for testing only + fixedStartIndex: Int = -1, startPartitionId: Int = -1) : Map[Int, Seq[Int]] = { if (nPartitions <= 0) throw new AdministrationException("number of partitions must be larger than 0") @@ -61,14 +61,14 @@ object AdminUtils extends Logging { val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size) var currentPartitionId = if (startPartitionId >= 0) startPartitionId else 0 - var secondReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size) + var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size) for (i <- 0 until nPartitions) { if (currentPartitionId > 0 && (currentPartitionId % brokerList.size == 0)) - secondReplicaShift += 1 + nextReplicaShift += 1 val firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size var replicaList = List(brokerList(firstReplicaIndex)) for (j <- 0 until replicationFactor - 1) - replicaList ::= brokerList(getWrappedIndex(firstReplicaIndex, secondReplicaShift, j, brokerList.size)) + replicaList ::= brokerList(getWrappedIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size)) ret.put(currentPartitionId, replicaList.reverse) currentPartitionId = currentPartitionId + 1 } @@ -79,7 +79,7 @@ object AdminUtils extends Logging { try { val zkPath = ZkUtils.getTopicPath(topic) val jsonPartitionData = ZkUtils.replicaAssignmentZkdata(replicaAssignment.map(e => (e._1.toString -> e._2))) - info("topic creation " + jsonPartitionData.toString) + info("Topic creation " + jsonPartitionData.toString) if (!update) { ZkUtils.createPersistentPath(zkClient, zkPath, jsonPartitionData) } else { diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index f5a64ad..eebfd7b 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -216,7 +216,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg replicaStateMachine.startup() partitionStateMachine.startup() // register the partition change listeners for all existing topics on failover - ZkUtils.getAllTopics(zkClient).foreach(p => partitionStateMachine.registerPartitionChangeListener(p)) + controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic)) Utils.registerMBean(this, KafkaController.MBeanName) info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch)) initializeAndMaybeTriggerPartitionReassignment() diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala 
index e43eb99..3de91ed 100644 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -396,7 +396,6 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic)) val partitionsRemainingToBeAdded = addedPartitionReplicaAssignment.filter(p => !controllerContext.partitionLeadershipInfo.contains(p._1)) - controllerContext.partitionReplicaAssignment.++=(partitionsRemainingToBeAdded) info("New partitions to be added [%s]".format(partitionsRemainingToBeAdded)) controller.onNewPartitionCreation(partitionsRemainingToBeAdded.keySet.toSet) } catch { diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala index 2593732..63f39fd 100644 --- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala @@ -68,8 +68,8 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { // create topics with 1 partition, 2 replicas, one on each broker CreateTopicCommand.createTopic(zkClient, topic1, 1, 2, "0:1") CreateTopicCommand.createTopic(zkClient, topic2, 1, 2, "1:2") - CreateTopicCommand.createTopic(zkClient, topic3, 1, 2, "2:3") - CreateTopicCommand.createTopic(zkClient, topic4, 1, 2, "0:3") + CreateTopicCommand.createTopic(zkClient, topic3, 1, 4, "2:3:0:1") + CreateTopicCommand.createTopic(zkClient, topic4, 1, 4, "0:3") // wait until leader is elected -- 1.7.12.4 (Apple Git-37) From 8ed07bb5d5a7122bd8d338805631698128185631 Mon Sep 17 00:00:00 2001 From: Sriram Subramanian Date: Mon, 22 Jul 2013 11:02:24 -0700 Subject: [PATCH 5/6] Jun's review --- core/src/main/scala/kafka/admin/AddPartitionsCommand.scala | 5 ++--- core/src/main/scala/kafka/admin/AdminUtils.scala | 4 +++- core/src/main/scala/kafka/controller/KafkaController.scala | 2 +- core/src/main/scala/kafka/controller/PartitionStateMachine.scala | 8 ++++---- core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala | 2 -- 5 files changed, 10 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala index 714ca62..50cc844 100644 --- a/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala @@ -21,8 +21,7 @@ import joptsimple.OptionParser import kafka.utils._ import org.I0Itec.zkclient.ZkClient import scala.collection.mutable -import kafka.common.{TopicAndPartition, KafkaException, Topic} -import org.I0Itec.zkclient.exception.ZkNoNodeException +import kafka.common.TopicAndPartition object AddPartitionsCommand extends Logging { @@ -46,7 +45,7 @@ object AddPartitionsCommand extends Logging { .describedAs("replication factor") .ofType(classOf[java.lang.Integer]) .defaultsTo(1) - val replicaAssignmentOpt = parser.accepts("replica-assignment-list", "For manually assigning replicas to brokers") + val replicaAssignmentOpt = parser.accepts("replica-assignment-list", "For manually assigning replicas to brokers for the new partitions") .withRequiredArg .describedAs("broker_id_for_part1_replica1 : broker_id_for_part1_replica2, " + "broker_id_for_part2_replica1 : broker_id_for_part2_replica2, ...") diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index d7835cf..d8c5592 
100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -79,10 +79,12 @@ object AdminUtils extends Logging { try { val zkPath = ZkUtils.getTopicPath(topic) val jsonPartitionData = ZkUtils.replicaAssignmentZkdata(replicaAssignment.map(e => (e._1.toString -> e._2))) - info("Topic creation " + jsonPartitionData.toString) + if (!update) { + info("Topic creation " + jsonPartitionData.toString) ZkUtils.createPersistentPath(zkClient, zkPath, jsonPartitionData) } else { + info("Topic update " + jsonPartitionData.toString) ZkUtils.updatePersistentPath(zkClient, zkPath, jsonPartitionData) } debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionData)) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index eebfd7b..b07e27b 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -309,9 +309,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg */ def onNewTopicCreation(topics: Set[String], newPartitions: Set[TopicAndPartition]) { info("New topic creation callback for %s".format(newPartitions.mkString(","))) - onNewPartitionCreation(newPartitions) // subscribe to partition changes topics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic)) + onNewPartitionCreation(newPartitions) } /** diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index 3de91ed..0135d45 100644 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -393,13 +393,13 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { controllerContext.controllerLock synchronized { try { info("Add Partition triggered " + data.toString + " for path " + dataPath) - val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic)) - val partitionsRemainingToBeAdded = addedPartitionReplicaAssignment.filter(p => - !controllerContext.partitionLeadershipInfo.contains(p._1)) + val partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic)) + val partitionsRemainingToBeAdded = partitionReplicaAssignment.filter(p => + !controllerContext.partitionReplicaAssignment.contains(p._1)) info("New partitions to be added [%s]".format(partitionsRemainingToBeAdded)) controller.onNewPartitionCreation(partitionsRemainingToBeAdded.keySet.toSet) } catch { - case e => error("Error while handling add partitions", e ) + case e => error("Error while handling add partitions for data path " + dataPath, e ) } } } diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala index 63f39fd..df1dfb7 100644 --- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala @@ -19,12 +19,10 @@ package kafka.admin import org.scalatest.junit.JUnit3Suite import kafka.zk.ZooKeeperTestHarness -import kafka.admin.{AdministrationException, AddPartitionsCommand, CreateTopicCommand} import kafka.utils.TestUtils._ import junit.framework.Assert._ import kafka.utils.{ZkUtils, Utils, TestUtils} import kafka.cluster.Broker -import org.I0Itec.zkclient.ZkClient import 
kafka.client.ClientUtils import kafka.server.{KafkaConfig, KafkaServer} -- 1.7.12.4 (Apple Git-37) From cd28c3028f1a536693bb09bd27ed6af2928eebec Mon Sep 17 00:00:00 2001 From: Sriram Subramanian Date: Mon, 22 Jul 2013 16:39:47 -0700 Subject: [PATCH 6/6] more review changes --- .../scala/kafka/admin/AddPartitionsCommand.scala | 26 ++++++++++------------ core/src/main/scala/kafka/admin/AdminUtils.scala | 4 ++-- .../scala/unit/kafka/admin/AddPartitionsTest.scala | 18 +++++++++++---- 3 files changed, 28 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala index 50cc844..5757c32 100644 --- a/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala @@ -40,11 +40,6 @@ object AddPartitionsCommand extends Logging { .withRequiredArg .describedAs("# of partitions") .ofType(classOf[java.lang.Integer]) - val replicationFactorOpt = parser.accepts("replica", "Replication factor for each partitions in the topic") - .withRequiredArg - .describedAs("replication factor") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1) val replicaAssignmentOpt = parser.accepts("replica-assignment-list", "For manually assigning replicas to brokers for the new partitions") .withRequiredArg .describedAs("broker_id_for_part1_replica1 : broker_id_for_part1_replica2, " + @@ -66,12 +61,11 @@ object AddPartitionsCommand extends Logging { val topic = options.valueOf(topicOpt) val zkConnect = options.valueOf(zkConnectOpt) val nPartitions = options.valueOf(nPartitionsOpt).intValue - val replicationFactor = options.valueOf(replicationFactorOpt).intValue val replicaAssignmentStr = options.valueOf(replicaAssignmentOpt) var zkClient: ZkClient = null try { zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) - addPartitions(zkClient, topic, nPartitions, replicationFactor, replicaAssignmentStr) + addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr) println("adding partitions succeeded!") } catch { case e => @@ -83,22 +77,26 @@ object AddPartitionsCommand extends Logging { } } - def addPartitions(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicationFactor: Int = 1, replicaAssignmentStr: String = "") { + def addPartitions(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicaAssignmentStr: String = "") { val existingPartitionsReplicaList = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic)) if (existingPartitionsReplicaList.size == 0) throw new AdministrationException("The topic %s does not exist".format(topic)) - val partitionStartIndex = existingPartitionsReplicaList.get(TopicAndPartition(topic,0)).get - - if (partitionStartIndex.size != replicationFactor) - throw new AdministrationException("The replication factor provided " + replicationFactor + - " is not equal to the existing replication factor for the topic " + partitionStartIndex.size) + val existingReplicaList = existingPartitionsReplicaList.get(TopicAndPartition(topic,0)).get + // create the new partition replication list val brokerList = ZkUtils.getSortedBrokerList(zkClient) val newPartitionReplicaList = if (replicaAssignmentStr == "") - AdminUtils.assignReplicasToBrokers(brokerList, numPartitions, replicationFactor, partitionStartIndex.head, existingPartitionsReplicaList.size) + AdminUtils.assignReplicasToBrokers(brokerList, numPartitions, existingReplicaList.size, existingReplicaList.head, existingPartitionsReplicaList.size) else 
getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet, existingPartitionsReplicaList.size) + + // check if manual assignment has the right replication factor + val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => (p.size != existingReplicaList.size)) + if (unmatchedRepFactorList.size != 0) + throw new AdministrationException("The replication factor in manual replication assignment " + + " is not equal to the existing replication factor for the topic " + existingReplicaList.size) + info("Add partition list for %s is %s".format(topic, newPartitionReplicaList)) val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition -> p._2) // add the new list diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index d8c5592..c399bc7 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -84,8 +84,8 @@ object AdminUtils extends Logging { info("Topic creation " + jsonPartitionData.toString) ZkUtils.createPersistentPath(zkClient, zkPath, jsonPartitionData) } else { - info("Topic update " + jsonPartitionData.toString) - ZkUtils.updatePersistentPath(zkClient, zkPath, jsonPartitionData) + info("Topic update " + jsonPartitionData.toString) + ZkUtils.updatePersistentPath(zkClient, zkPath, jsonPartitionData) } debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionData)) } catch { diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala index df1dfb7..06be990 100644 --- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala @@ -100,7 +100,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { def testTopicDoesNotExist { try { - AddPartitionsCommand.addPartitions(zkClient, "Blah", 1, 1) + AddPartitionsCommand.addPartitions(zkClient, "Blah", 1) fail("Topic should not exist") } catch { case e: AdministrationException => //this is good @@ -108,8 +108,18 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { } } + def testWrongReplicaCount { + try { + AddPartitionsCommand.addPartitions(zkClient, topic1, 2, "0:1:2") + fail("Add partitions should fail") + } catch { + case e: AdministrationException => //this is good + case e2 => throw e2 + } + } + def testIncrementPartitions { - AddPartitionsCommand.addPartitions(zkClient, topic1, 2, 2) + AddPartitionsCommand.addPartitions(zkClient, topic1, 2) // wait until leader is elected var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 1, 500) var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 2, 500) @@ -134,7 +144,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { } def testManualAssignmentOfReplicas { - AddPartitionsCommand.addPartitions(zkClient, topic2, 2, 2, "0:1,2:3") + AddPartitionsCommand.addPartitions(zkClient, topic2, 2, "0:1,2:3") // wait until leader is elected var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 1, 500) var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 2, 500) @@ -160,7 +170,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { } def testReplicaPlacement { - AddPartitionsCommand.addPartitions(zkClient, topic3, 6, 4) + AddPartitionsCommand.addPartitions(zkClient, topic3, 6) // wait until leader is elected var leader1 = 
waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 1, 500) var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 2, 500) -- 1.7.12.4 (Apple Git-37)
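
A note for anyone exercising the series end to end: as the warning string added in PATCH 3/6 says, adding partitions changes the key-to-partition mapping, so the tool is only safe for topics whose producers do not partition by key. After PATCH 6/6 the --replica option is gone and addPartitions infers the replication factor from partition 0's existing assignment. The sketch below shows both entry points; the ZooKeeper address, topic names, and broker ids are placeholders, not values taken from the patches.

```scala
import org.I0Itec.zkclient.ZkClient
import kafka.admin.AddPartitionsCommand
import kafka.utils.ZKStringSerializer

object AddPartitionsExample {
  def main(args: Array[String]): Unit = {
    // Same serializer and 30s session/connection timeouts that the command's main() uses.
    val zkClient = new ZkClient("localhost:2181", 30000, 30000, ZKStringSerializer)
    try {
      // Grow an existing topic by two partitions; the replication factor of the
      // new partitions is copied from partition 0's current assignment.
      AddPartitionsCommand.addPartitions(zkClient, "my-topic", numPartitions = 2)

      // Or pin the new partitions' replicas by hand: one colon-separated broker
      // list per new partition, comma-separated between partitions. Each list
      // must match the topic's existing replication factor (see testWrongReplicaCount).
      AddPartitionsCommand.addPartitions(zkClient, "my-keyless-topic", 2, "0:1,2:3")
    } finally {
      zkClient.close()
    }
  }
}
```

The command-line equivalent of the first call would be along the lines of `bin/kafka-run-class.sh kafka.admin.AddPartitionsCommand --zookeeper localhost:2181 --topic my-topic --partition 2`; note the option registered in the parser is `--partition` (singular), and `--replica-assignment-list` covers the manual case.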
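
The heart of the AdminUtils change is threading startPartitionId through the round-robin placement so new partitions continue the sequence where the existing ones left off, rather than restarting at partition 0 and piling replicas onto the same brokers. The sketch below mirrors the patched loop; getWrappedIndex never appears in these diffs, so its body here — the usual `1 + (shift + j) % (n - 1)` offset from the first replica — is an assumption, though with brokers 0-3, replication factor 4, fixedStartIndex = 2 and startPartitionId = 1 (the values addPartitions derives for topic3) it reproduces exactly the placements asserted in testReplicaPlacement.

```scala
import scala.collection.mutable

object ReplicaPlacementSketch {
  // Assumed definition: not shown in the patches. Offsets the j-th extra replica
  // from the first one, never landing back on the first replica's broker.
  def getWrappedIndex(firstReplicaIndex: Int, shift: Int, j: Int, nBrokers: Int): Int = {
    val s = 1 + (shift + j) % (nBrokers - 1)
    (firstReplicaIndex + s) % nBrokers
  }

  // Mirrors the patched assignReplicasToBrokers, with the normally random start
  // index fixed so the run is deterministic.
  def assign(brokerList: Seq[Int], nPartitions: Int, replicationFactor: Int,
             fixedStartIndex: Int, startPartitionId: Int): Map[Int, Seq[Int]] = {
    val ret = new mutable.HashMap[Int, List[Int]]()
    var currentPartitionId = startPartitionId
    var nextReplicaShift = fixedStartIndex
    for (i <- 0 until nPartitions) {
      // Bump the shift each time the partition id wraps around the broker list,
      // keyed off the continued partition id rather than the loop counter.
      if (currentPartitionId > 0 && (currentPartitionId % brokerList.size == 0))
        nextReplicaShift += 1
      val firstReplicaIndex = (currentPartitionId + fixedStartIndex) % brokerList.size
      var replicaList = List(brokerList(firstReplicaIndex))
      for (j <- 0 until replicationFactor - 1)
        replicaList ::= brokerList(getWrappedIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size))
      ret.put(currentPartitionId, replicaList.reverse)
      currentPartitionId += 1
    }
    ret.toMap
  }

  def main(args: Array[String]): Unit = {
    // topic3 starts with one partition assigned "2:3:0:1", so addPartitions passes
    // head-of-assignment broker 2 as the start index and 1 as the first new
    // partition id. Expected output: p1 -> 3,2,0,1 ... p6 -> 0,1,2,3.
    assign(Seq(0, 1, 2, 3), nPartitions = 6, replicationFactor = 4,
           fixedStartIndex = 2, startPartitionId = 1)
      .toSeq.sortBy(_._1)
      .foreach { case (p, replicas) => println("partition " + p + " -> " + replicas.mkString(",")) }
  }
}
```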