From 4d49721dac92b19ac56387b0ee094ee72afacb06 Mon Sep 17 00:00:00 2001
From: Sriram Subramanian
Date: Wed, 10 Jul 2013 18:43:17 -0700
Subject: [PATCH 1/3] Add Partitions implementation

---
 .../scala/kafka/admin/AddPartitionsCommand.scala  | 123 +++++++++++++++++
 core/src/main/scala/kafka/admin/AdminUtils.scala  |  20 ++-
 .../scala/kafka/admin/CreateTopicCommand.scala    |   2 +-
 .../consumer/ZookeeperConsumerConnector.scala     | 147 +++++++++++----------
 .../scala/kafka/controller/KafkaController.scala  |   4 +-
 .../kafka/controller/PartitionStateMachine.scala  |  29 +++-
 6 files changed, 242 insertions(+), 83 deletions(-)
 create mode 100644 core/src/main/scala/kafka/admin/AddPartitionsCommand.scala

diff --git a/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
new file mode 100644
index 0000000..4a1fc3a
--- /dev/null
+++ b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import joptsimple.OptionParser
+import kafka.utils._
+import org.I0Itec.zkclient.ZkClient
+import scala.collection.mutable
+import kafka.common.{TopicAndPartition, KafkaException, Topic}
+import org.I0Itec.zkclient.exception.ZkNoNodeException
+
+object AddPartitionsCommand extends Logging {
+
+  def main(args: Array[String]): Unit = {
+    val parser = new OptionParser
+    val topicOpt = parser.accepts("topic", "REQUIRED: The topic for which partitions need to be added.")
+      .withRequiredArg
+      .describedAs("topic")
+      .ofType(classOf[String])
+    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. 
" + + "Multiple URLS can be given to allow fail-over.") + .withRequiredArg + .describedAs("urls") + .ofType(classOf[String]) + val nPartitionsOpt = parser.accepts("partition", "REQUIRED: Number of partitions to add to the topic") + .withRequiredArg + .describedAs("# of partitions") + .ofType(classOf[java.lang.Integer]) + val replicationFactorOpt = parser.accepts("replica", "Replication factor for each partitions in the topic") + .withRequiredArg + .describedAs("replication factor") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1) + val replicaAssignmentOpt = parser.accepts("replica-assignment-list", "For manually assigning replicas to brokers") + .withRequiredArg + .describedAs("broker_id_for_part1_replica1 : broker_id_for_part1_replica2, " + + "broker_id_for_part2_replica1 : broker_id_for_part2_replica2, ...") + .ofType(classOf[String]) + .defaultsTo("") + + val options = parser.parse(args : _*) + + for(arg <- List(topicOpt, zkConnectOpt, nPartitionsOpt)) { + if(!options.has(arg)) { + System.err.println("Missing required argument \"" + arg + "\"") + parser.printHelpOn(System.err) + System.exit(1) + } + } + + val topic = options.valueOf(topicOpt) + val zkConnect = options.valueOf(zkConnectOpt) + val nPartitions = options.valueOf(nPartitionsOpt).intValue + val replicationFactor = options.valueOf(replicationFactorOpt).intValue + val replicaAssignmentStr = options.valueOf(replicaAssignmentOpt) + var zkClient: ZkClient = null + try { + zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) + addPartitions(zkClient, topic, nPartitions, replicationFactor, replicaAssignmentStr) + println("adding partitions succeeded!") + } catch { + case e => + println("adding partitions failed because of " + e.getMessage) + println(Utils.stackTrace(e)) + } finally { + if (zkClient != null) + zkClient.close() + } + } + + def addPartitions(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicationFactor: Int = 1, replicaAssignmentStr: String = "") { + val existingPartitionsReplicaList = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic)) + if (existingPartitionsReplicaList.size == 0) + throw new AdministrationException("The topic %s does not exist".format(topic)) + + val partitionStartIndex = existingPartitionsReplicaList.get(TopicAndPartition(topic,0)).get + + val brokerList = ZkUtils.getSortedBrokerList(zkClient) + val newPartitionReplicaList = if (replicaAssignmentStr == "") + AdminUtils.assignReplicasToBrokers(brokerList, numPartitions, replicationFactor, partitionStartIndex.head, existingPartitionsReplicaList.size) + else + getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet) + info("Add partition list for %s is %s".format(topic, newPartitionReplicaList)) + val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition -> p._2) + // add the new list + partitionReplicaList ++= newPartitionReplicaList + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, partitionReplicaList, zkClient, true) + } + + def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int]): Map[Int, List[Int]] = { + val partitionList = replicaAssignmentList.split(",") + val ret = new mutable.HashMap[Int, List[Int]]() + for (i <- 0 until partitionList.size) { + val brokerList = partitionList(i).split(":").map(s => s.trim().toInt) + if (brokerList.size <= 0) + throw new AdministrationException("replication factor must be larger than 0") + if (brokerList.size != brokerList.toSet.size) + throw new 
AdministrationException("duplicate brokers in replica assignment: " + brokerList) + if (!brokerList.toSet.subsetOf(availableBrokerList)) + throw new AdministrationException("some specified brokers not available. specified brokers: " + brokerList.toString + + "available broker:" + availableBrokerList.toString) + ret.put(i, brokerList.toList) + if (ret(i).size != ret(0).size) + throw new AdministrationException("partition " + i + " has different replication factor: " + brokerList) + } + ret.toMap + } +} diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 41cb764..38d5bd9 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -48,7 +48,7 @@ object AdminUtils extends Logging { * p7 p8 p9 p5 p6 (3nd replica) */ def assignReplicasToBrokers(brokerList: Seq[Int], nPartitions: Int, replicationFactor: Int, - fixedStartIndex: Int = -1) // for testing only + fixedStartIndex: Int = -1, startPartitionId: Int = -1) // for testing only : Map[Int, Seq[Int]] = { if (nPartitions <= 0) throw new AdministrationException("number of partitions must be larger than 0") @@ -59,25 +59,33 @@ object AdminUtils extends Logging { " larger than available brokers: " + brokerList.size) val ret = new mutable.HashMap[Int, List[Int]]() val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size) + var currentPartitionId = if (startPartitionId >= 0) startPartitionId else 0 var secondReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size) for (i <- 0 until nPartitions) { - if (i > 0 && (i % brokerList.size == 0)) + if (currentPartitionId > 0 && (currentPartitionId % brokerList.size == 0)) secondReplicaShift += 1 - val firstReplicaIndex = (i + startIndex) % brokerList.size + val firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size var replicaList = List(brokerList(firstReplicaIndex)) for (j <- 0 until replicationFactor - 1) replicaList ::= brokerList(getWrappedIndex(firstReplicaIndex, secondReplicaShift, j, brokerList.size)) - ret.put(i, replicaList.reverse) + ret.put(currentPartitionId, replicaList.reverse) + currentPartitionId = currentPartitionId + 1 } ret.toMap } - def createTopicPartitionAssignmentPathInZK(topic: String, replicaAssignment: Map[Int, Seq[Int]], zkClient: ZkClient) { + def createOrUpdateTopicPartitionAssignmentPathInZK(topic: String, replicaAssignment: Map[Int, Seq[Int]], zkClient: ZkClient, update: Boolean = false) { try { val zkPath = ZkUtils.getTopicPath(topic) val jsonPartitionData = ZkUtils.replicaAssignmentZkdata(replicaAssignment.map(e => (e._1.toString -> e._2))) - ZkUtils.createPersistentPath(zkClient, zkPath, jsonPartitionData) + info("topic creation " + jsonPartitionData.toString) + if (!update) { + ZkUtils.createPersistentPath(zkClient, zkPath, jsonPartitionData) + } + else { + ZkUtils.updatePersistentPath(zkClient, zkPath, jsonPartitionData) + } debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionData)) } catch { case e: ZkNodeExistsException => throw new TopicExistsException("topic %s already exists".format(topic)) diff --git a/core/src/main/scala/kafka/admin/CreateTopicCommand.scala b/core/src/main/scala/kafka/admin/CreateTopicCommand.scala index e762115..21c1186 100644 --- a/core/src/main/scala/kafka/admin/CreateTopicCommand.scala +++ b/core/src/main/scala/kafka/admin/CreateTopicCommand.scala @@ -93,7 +93,7 @@ object CreateTopicCommand extends 
Logging { else getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet) debug("Replica assignment list for %s is %s".format(topic, partitionReplicaAssignment)) - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, partitionReplicaAssignment, zkClient) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, partitionReplicaAssignment, zkClient) } def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int]): Map[Int, List[Int]] = { diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index d952187..28f71e2 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -401,81 +401,90 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString, zkClient).getConsumerThreadIdsPerTopic val consumersPerTopicMap = getConsumersPerTopic(zkClient, group) val brokers = getAllBrokersInCluster(zkClient) - val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet, - brokers, - config.clientId, - config.socketTimeoutMs, - correlationId.getAndIncrement).topicsMetadata - val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]] - topicsMetadata.foreach(m => { - val topic = m.topic - val partitions = m.partitionsMetadata.map(m1 => m1.partitionId) - partitionsPerTopicMap.put(topic, partitions) - }) - - /** - * fetchers must be stopped to avoid data duplication, since if the current - * rebalancing attempt fails, the partitions that are released could be owned by another consumer. - * But if we don't stop the fetchers first, this consumer would continue returning data for released - * partitions in parallel. So, not stopping the fetchers leads to duplicate data. - */ - closeFetchers(cluster, kafkaMessageAndMetadataStreams, myTopicThreadIdsMap) - - releasePartitionOwnership(topicRegistry) - - var partitionOwnershipDecision = new collection.mutable.HashMap[(String, Int), String]() - val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]] - - for ((topic, consumerThreadIdSet) <- myTopicThreadIdsMap) { - currentTopicRegistry.put(topic, new Pool[Int, PartitionTopicInfo]) - - val topicDirs = new ZKGroupTopicDirs(group, topic) - val curConsumers = consumersPerTopicMap.get(topic).get - val curPartitions: Seq[Int] = partitionsPerTopicMap.get(topic).get - - val nPartsPerConsumer = curPartitions.size / curConsumers.size - val nConsumersWithExtraPart = curPartitions.size % curConsumers.size - - info("Consumer " + consumerIdString + " rebalancing the following partitions: " + curPartitions + - " for topic " + topic + " with consumers: " + curConsumers) - - for (consumerThreadId <- consumerThreadIdSet) { - val myConsumerPosition = curConsumers.findIndexOf(_ == consumerThreadId) - assert(myConsumerPosition >= 0) - val startPart = nPartsPerConsumer*myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart) - val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1) + if (brokers.size == 0) { + // This can happen in a rare case when there are no brokers available in the cluster when the consumer is started. + // We log an error and register for child changes on brokers/id so that rebalance can be triggered when the brokers + // are up. 
+ error("no brokers found when trying to rebalance.") + zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, loadBalancerListener) + true + } else { + val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet, + brokers, + config.clientId, + config.socketTimeoutMs, + correlationId.getAndIncrement).topicsMetadata + val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]] + topicsMetadata.foreach(m => { + val topic = m.topic + val partitions = m.partitionsMetadata.map(m1 => m1.partitionId) + partitionsPerTopicMap.put(topic, partitions) + }) /** - * Range-partition the sorted partitions to consumers for better locality. - * The first few consumers pick up an extra partition, if any. + * fetchers must be stopped to avoid data duplication, since if the current + * rebalancing attempt fails, the partitions that are released could be owned by another consumer. + * But if we don't stop the fetchers first, this consumer would continue returning data for released + * partitions in parallel. So, not stopping the fetchers leads to duplicate data. */ - if (nParts <= 0) - warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic) - else { - for (i <- startPart until startPart + nParts) { - val partition = curPartitions(i) - info(consumerThreadId + " attempting to claim partition " + partition) - addPartitionTopicInfo(currentTopicRegistry, topicDirs, partition, topic, consumerThreadId) - // record the partition ownership decision - partitionOwnershipDecision += ((topic, partition) -> consumerThreadId) + closeFetchers(cluster, kafkaMessageAndMetadataStreams, myTopicThreadIdsMap) + + releasePartitionOwnership(topicRegistry) + + var partitionOwnershipDecision = new collection.mutable.HashMap[(String, Int), String]() + val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]] + + for ((topic, consumerThreadIdSet) <- myTopicThreadIdsMap) { + currentTopicRegistry.put(topic, new Pool[Int, PartitionTopicInfo]) + + val topicDirs = new ZKGroupTopicDirs(group, topic) + val curConsumers = consumersPerTopicMap.get(topic).get + val curPartitions: Seq[Int] = partitionsPerTopicMap.get(topic).get + + val nPartsPerConsumer = curPartitions.size / curConsumers.size + val nConsumersWithExtraPart = curPartitions.size % curConsumers.size + + info("Consumer " + consumerIdString + " rebalancing the following partitions: " + curPartitions + + " for topic " + topic + " with consumers: " + curConsumers) + + for (consumerThreadId <- consumerThreadIdSet) { + val myConsumerPosition = curConsumers.findIndexOf(_ == consumerThreadId) + assert(myConsumerPosition >= 0) + val startPart = nPartsPerConsumer*myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart) + val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1) + + /** + * Range-partition the sorted partitions to consumers for better locality. + * The first few consumers pick up an extra partition, if any. 
+ */ + if (nParts <= 0) + warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic) + else { + for (i <- startPart until startPart + nParts) { + val partition = curPartitions(i) + info(consumerThreadId + " attempting to claim partition " + partition) + addPartitionTopicInfo(currentTopicRegistry, topicDirs, partition, topic, consumerThreadId) + // record the partition ownership decision + partitionOwnershipDecision += ((topic, partition) -> consumerThreadId) + } + } } } - } - } - /** - * move the partition ownership here, since that can be used to indicate a truly successful rebalancing attempt - * A rebalancing attempt is completed successfully only after the fetchers have been started correctly - */ - if(reflectPartitionOwnershipDecision(partitionOwnershipDecision.toMap)) { - info("Updating the cache") - debug("Partitions per topic cache " + partitionsPerTopicMap) - debug("Consumers per topic cache " + consumersPerTopicMap) - topicRegistry = currentTopicRegistry - updateFetcher(cluster) - true - } else { - false + /** + * move the partition ownership here, since that can be used to indicate a truly successful rebalancing attempt + * A rebalancing attempt is completed successfully only after the fetchers have been started correctly + */ + if(reflectPartitionOwnershipDecision(partitionOwnershipDecision.toMap)) { + info("Updating the cache") + debug("Partitions per topic cache " + partitionsPerTopicMap) + debug("Consumers per topic cache " + consumersPerTopicMap) + topicRegistry = currentTopicRegistry + updateFetcher(cluster) + true + } else { + false + } } } diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 5ac38fd..a620bcf 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -210,6 +210,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks registerReassignedPartitionsListener() registerPreferredReplicaElectionListener() + // register the partition change listeners for all existing topics on failover + ZkUtils.getAllTopics(zkClient).foreach(p => registerPartitionChangeListener(p)) partitionStateMachine.registerListeners() replicaStateMachine.registerListeners() initializeControllerContext() @@ -307,9 +309,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg */ def onNewTopicCreation(topics: Set[String], newPartitions: Set[TopicAndPartition]) { info("New topic creation callback for %s".format(newPartitions.mkString(","))) + onNewPartitionCreation(newPartitions) // subscribe to partition changes topics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic)) - onNewPartitionCreation(newPartitions) } /** diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index deebed0..e43eb99 100644 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean import kafka.api.LeaderAndIsr import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException} import kafka.utils.{Logging, ZkUtils} -import 
org.I0Itec.zkclient.IZkChildListener +import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener} import org.I0Itec.zkclient.exception.ZkNodeExistsException import org.apache.log4j.Logger @@ -334,7 +334,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { } def registerPartitionChangeListener(topic: String) = { - zkClient.subscribeChildChanges(ZkUtils.getTopicPath(topic), new PartitionChangeListener(topic)) + zkClient.subscribeDataChanges(ZkUtils.getTopicPath(topic), new AddPartitionsListener(topic)) } private def getLeaderIsrAndEpochOrThrowException(topic: String, partition: Int): LeaderIsrAndControllerEpoch = { @@ -383,15 +383,32 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { } } - class PartitionChangeListener(topic: String) extends IZkChildListener with Logging { - this.logIdent = "[Controller " + controller.config.brokerId + "]: " + + class AddPartitionsListener(topic: String) extends IZkDataListener with Logging { + + this.logIdent = "[AddPartitionsListener on " + controller.config.brokerId + "]: " @throws(classOf[Exception]) - def handleChildChange(parentPath : String, children : java.util.List[String]) { + def handleDataChange(dataPath : String, data: Object) { controllerContext.controllerLock synchronized { - // TODO: To be completed as part of KAFKA-41 + try { + info("Add Partition triggered " + data.toString + " for path " + dataPath) + val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic)) + val partitionsRemainingToBeAdded = addedPartitionReplicaAssignment.filter(p => + !controllerContext.partitionLeadershipInfo.contains(p._1)) + controllerContext.partitionReplicaAssignment.++=(partitionsRemainingToBeAdded) + info("New partitions to be added [%s]".format(partitionsRemainingToBeAdded)) + controller.onNewPartitionCreation(partitionsRemainingToBeAdded.keySet.toSet) + } catch { + case e => error("Error while handling add partitions", e ) + } } } + + @throws(classOf[Exception]) + def handleDataDeleted(parentPath : String) { + // this is not implemented for partition change + } } } -- 1.7.12.4 (Apple Git-37) From 7e7bf68764fc613ce56531c87de5055cc7619671 Mon Sep 17 00:00:00 2001 From: Sriram Subramanian Date: Thu, 11 Jul 2013 10:00:53 -0700 Subject: [PATCH 2/3] add test for add partitions --- core/src/main/scala/kafka/admin/AdminUtils.scala | 5 +- .../scala/kafka/controller/KafkaController.scala | 4 +- .../scala/unit/kafka/admin/AddPartitionsTest.scala | 134 +++++++++++++++++++++ .../test/scala/unit/kafka/admin/AdminTest.scala | 16 +-- 4 files changed, 146 insertions(+), 13 deletions(-) create mode 100644 core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 38d5bd9..0d38eaa 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -82,9 +82,8 @@ object AdminUtils extends Logging { info("topic creation " + jsonPartitionData.toString) if (!update) { ZkUtils.createPersistentPath(zkClient, zkPath, jsonPartitionData) - } - else { - ZkUtils.updatePersistentPath(zkClient, zkPath, jsonPartitionData) + } else { + ZkUtils.updatePersistentPath(zkClient, zkPath, jsonPartitionData) } debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionData)) } catch { diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala 
b/core/src/main/scala/kafka/controller/KafkaController.scala index a620bcf..f5a64ad 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -210,13 +210,13 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks registerReassignedPartitionsListener() registerPreferredReplicaElectionListener() - // register the partition change listeners for all existing topics on failover - ZkUtils.getAllTopics(zkClient).foreach(p => registerPartitionChangeListener(p)) partitionStateMachine.registerListeners() replicaStateMachine.registerListeners() initializeControllerContext() replicaStateMachine.startup() partitionStateMachine.startup() + // register the partition change listeners for all existing topics on failover + ZkUtils.getAllTopics(zkClient).foreach(p => partitionStateMachine.registerPartitionChangeListener(p)) Utils.registerMBean(this, KafkaController.MBeanName) info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch)) initializeAndMaybeTriggerPartitionReassignment() diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala new file mode 100644 index 0000000..e51ca92 --- /dev/null +++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.admin + +import org.scalatest.junit.JUnit3Suite +import kafka.zk.ZooKeeperTestHarness +import kafka.admin.{AdministrationException, AddPartitionsCommand, CreateTopicCommand} +import kafka.utils.TestUtils._ +import junit.framework.Assert._ +import kafka.utils.{ZkUtils, Utils, TestUtils} +import kafka.cluster.Broker +import org.I0Itec.zkclient.ZkClient +import kafka.client.ClientUtils +import kafka.server.{KafkaConfig, KafkaServer} + +class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { + val brokerId1 = 0 + val brokerId2 = 1 + val brokerId3 = 2 + val brokerId4 = 3 + + val port1 = TestUtils.choosePort() + val port2 = TestUtils.choosePort() + val port3 = TestUtils.choosePort() + val port4 = TestUtils.choosePort() + + val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1) + val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2) + val configProps3 = TestUtils.createBrokerConfig(brokerId3, port3) + val configProps4 = TestUtils.createBrokerConfig(brokerId4, port4) + + var servers: Seq[KafkaServer] = Seq.empty[KafkaServer] + var brokers: Seq[Broker] = Seq.empty[Broker] + + val partitionId = 0 + + val topic1 = "new-topic1" + val topic2 = "new-topic2" + val topic3 = "new-topic3" + val topic4 = "new-topic4" + + override def setUp() { + super.setUp() + // start all the servers + val server1 = TestUtils.createServer(new KafkaConfig(configProps1)) + val server2 = TestUtils.createServer(new KafkaConfig(configProps2)) + val server3 = TestUtils.createServer(new KafkaConfig(configProps3)) + val server4 = TestUtils.createServer(new KafkaConfig(configProps4)) + + servers ++= List(server1, server2, server3, server4) + brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, s.config.port)) + + // create topics with 1 partition, 2 replicas, one on each broker + CreateTopicCommand.createTopic(zkClient, topic1, 1, 2, "0:1") + CreateTopicCommand.createTopic(zkClient, topic2, 1, 2, "1:2") + CreateTopicCommand.createTopic(zkClient, topic3, 1, 2, "2:3") + CreateTopicCommand.createTopic(zkClient, topic4, 1, 2, "0:3") + + + // wait until leader is elected + var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, partitionId, 500) + var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, partitionId, 500) + var leader3 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, partitionId, 500) + var leader4 = waitUntilLeaderIsElectedOrChanged(zkClient, topic4, partitionId, 500) + + debug("Leader for " + topic1 + " is elected to be: %s".format(leader1.getOrElse(-1))) + debug("Leader for " + topic2 + " is elected to be: %s".format(leader1.getOrElse(-1))) + debug("Leader for " + topic3 + "is elected to be: %s".format(leader1.getOrElse(-1))) + debug("Leader for " + topic4 + "is elected to be: %s".format(leader1.getOrElse(-1))) + + assertTrue("Leader should get elected", leader1.isDefined) + assertTrue("Leader should get elected", leader2.isDefined) + assertTrue("Leader should get elected", leader3.isDefined) + assertTrue("Leader should get elected", leader4.isDefined) + + assertTrue("Leader could be broker 0 or broker 1 for " + topic1, (leader1.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 1)) + assertTrue("Leader could be broker 1 or broker 2 for " + topic2, (leader2.getOrElse(-1) == 1) || (leader1.getOrElse(-1) == 2)) + assertTrue("Leader could be broker 2 or broker 3 for " + topic3, (leader3.getOrElse(-1) == 2) || (leader1.getOrElse(-1) == 3)) + assertTrue("Leader could be broker 3 or broker 4 for " + topic4, 
(leader4.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 3)) + } + + override def tearDown() { + servers.map(server => server.shutdown()) + servers.map(server => Utils.rm(server.config.logDirs)) + super.tearDown() + } + + def testTopicDoesNotExist { + try { + AddPartitionsCommand.addPartitions(zkClient, "Blah", 1, 1) + fail("Topic should not exist") + } catch { + case e: AdministrationException => //this is good + case e2 => throw e2 + } + } + + def testIncrementPartitions { + AddPartitionsCommand.addPartitions(zkClient, topic1, 2, 2) + // wait until leader is elected + var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 1, 500) + var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 2, 500) + val leader1FromZk = ZkUtils.getLeaderForPartition(zkClient, topic1, 1).get + val leader2FromZk = ZkUtils.getLeaderForPartition(zkClient, topic1, 2).get + assertEquals(leader1.get, leader1FromZk) + assertEquals(leader2.get, leader2FromZk) + // read metadata from a broker and verify the new topic partitions exist + TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic1, 1, 1000) + TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic1, 2, 1000) + val metadata = ClientUtils.fetchTopicMetadata(Set(topic1), brokers, "AddPartitionsTest-testIncrementPartitions", + 2000,0).topicsMetadata + val metaDataForTopic1 = metadata.filter(p => p.topic.equals(topic1)) + val partitionDataForTopic1 = metaDataForTopic1.head.partitionsMetadata + assertEquals(partitionDataForTopic1(1).partitionId, 1) + assertEquals(partitionDataForTopic1(2).partitionId, 2) + partitionDataForTopic1(1). + assertEquals(partitionDataForTopic1(1).leader.get, ) + } +} \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index 0d8b70f..dc0013f 100644 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -157,7 +157,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val topic = "test" TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4)) // create the topic - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) // create leaders for all partitions TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1) val actualReplicaList = leaderForPartitionMap.keys.toArray.map(p => (p -> ZkUtils.getReplicasForPartition(zkClient, topic, p))).toMap @@ -166,7 +166,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { assertEquals(expectedReplicaAssignment.get(i).get, actualReplicaList(i)) try { - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) fail("shouldn't be able to create a topic already exists") } catch { case e: TopicExistsException => // this is good @@ -181,7 +181,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { // create brokers val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) // 
reassign partition 0 val newReplicas = Seq(0, 2, 3) val partitionToBeReassigned = 0 @@ -206,7 +206,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { // create brokers val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) // reassign partition 0 val newReplicas = Seq(1, 2, 3) val partitionToBeReassigned = 0 @@ -232,7 +232,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { // create brokers val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) // reassign partition 0 val newReplicas = Seq(2, 3) val partitionToBeReassigned = 0 @@ -273,7 +273,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val expectedReplicaAssignment = Map(0 -> List(0, 1)) val topic = "test" // create the topic - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) // put the partition in the reassigned path as well // reassign partition 0 val newReplicas = Seq(0, 1) @@ -312,7 +312,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { // create brokers val serverConfigs = TestUtils.createBrokerConfigs(3).map(new KafkaConfig(_)) // create the topic - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s)) // broker 2 should be the leader since it was started first val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, 1000, None).get @@ -333,7 +333,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val serverConfigs = TestUtils.createBrokerConfigs(3).map(new KafkaConfig(_)) val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s)) // create the topic - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) TestUtils.waitUntilMetadataIsPropagated(servers, topic, partition, 1000) -- 1.7.12.4 (Apple Git-37) From f07a9eb5f7de1f6c060ff40f4d2f004880f8a631 Mon Sep 17 00:00:00 2001 From: Sriram Subramanian Date: Fri, 12 Jul 2013 00:11:21 -0700 Subject: [PATCH 3/3] More unit tests --- .../scala/kafka/admin/AddPartitionsCommand.scala | 13 ++- .../scala/unit/kafka/admin/AddPartitionsTest.scala | 117 ++++++++++++++++++++- 2 files changed, 121 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala index 4a1fc3a..60dd6c9 100644 --- a/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala @@ -57,7 +57,8 @@ object AddPartitionsCommand extends Logging 
{ for(arg <- List(topicOpt, zkConnectOpt, nPartitionsOpt)) { if(!options.has(arg)) { - System.err.println("Missing required argument \"" + arg + "\"") + System.err.println("***Please note that this tool can only be used to add partitions when data for a topic does not use a key.***\n" + + "Missing required argument. " + " \"" + arg + "\"") parser.printHelpOn(System.err) System.exit(1) } @@ -94,7 +95,7 @@ object AddPartitionsCommand extends Logging { val newPartitionReplicaList = if (replicaAssignmentStr == "") AdminUtils.assignReplicasToBrokers(brokerList, numPartitions, replicationFactor, partitionStartIndex.head, existingPartitionsReplicaList.size) else - getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet) + getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet, existingPartitionsReplicaList.size) info("Add partition list for %s is %s".format(topic, newPartitionReplicaList)) val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition -> p._2) // add the new list @@ -102,9 +103,10 @@ object AddPartitionsCommand extends Logging { AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, partitionReplicaList, zkClient, true) } - def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int]): Map[Int, List[Int]] = { + def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int], startPartitionId: Int): Map[Int, List[Int]] = { val partitionList = replicaAssignmentList.split(",") val ret = new mutable.HashMap[Int, List[Int]]() + var partitionId = startPartitionId for (i <- 0 until partitionList.size) { val brokerList = partitionList(i).split(":").map(s => s.trim().toInt) if (brokerList.size <= 0) @@ -114,9 +116,10 @@ object AddPartitionsCommand extends Logging { if (!brokerList.toSet.subsetOf(availableBrokerList)) throw new AdministrationException("some specified brokers not available. 
specified brokers: " + brokerList.toString + "available broker:" + availableBrokerList.toString) - ret.put(i, brokerList.toList) - if (ret(i).size != ret(0).size) + ret.put(partitionId, brokerList.toList) + if (ret(partitionId).size != ret(startPartitionId).size) throw new AdministrationException("partition " + i + " has different replication factor: " + brokerList) + partitionId = partitionId + 1 } ret.toMap } diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala index e51ca92..2593732 100644 --- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala @@ -119,16 +119,125 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { val leader2FromZk = ZkUtils.getLeaderForPartition(zkClient, topic1, 2).get assertEquals(leader1.get, leader1FromZk) assertEquals(leader2.get, leader2FromZk) + // read metadata from a broker and verify the new topic partitions exist - TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic1, 1, 1000) - TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic1, 2, 1000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 1, 1000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 2, 1000) val metadata = ClientUtils.fetchTopicMetadata(Set(topic1), brokers, "AddPartitionsTest-testIncrementPartitions", 2000,0).topicsMetadata val metaDataForTopic1 = metadata.filter(p => p.topic.equals(topic1)) val partitionDataForTopic1 = metaDataForTopic1.head.partitionsMetadata + assertEquals(partitionDataForTopic1.size, 3) assertEquals(partitionDataForTopic1(1).partitionId, 1) assertEquals(partitionDataForTopic1(2).partitionId, 2) - partitionDataForTopic1(1). 
- assertEquals(partitionDataForTopic1(1).leader.get, ) + val replicas = partitionDataForTopic1(1).replicas + assertEquals(replicas.size, 2) + assert(replicas.contains(partitionDataForTopic1(1).leader.get)) + } + + def testManualAssignmentOfReplicas { + AddPartitionsCommand.addPartitions(zkClient, topic2, 2, 2, "0:1,2:3") + // wait until leader is elected + var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 1, 500) + var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 2, 500) + val leader1FromZk = ZkUtils.getLeaderForPartition(zkClient, topic2, 1).get + val leader2FromZk = ZkUtils.getLeaderForPartition(zkClient, topic2, 2).get + assertEquals(leader1.get, leader1FromZk) + assertEquals(leader2.get, leader2FromZk) + + // read metadata from a broker and verify the new topic partitions exist + TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 1, 1000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 2, 1000) + val metadata = ClientUtils.fetchTopicMetadata(Set(topic2), brokers, "AddPartitionsTest-testManualAssignmentOfReplicas", + 2000,0).topicsMetadata + val metaDataForTopic2 = metadata.filter(p => p.topic.equals(topic2)) + val partitionDataForTopic2 = metaDataForTopic2.head.partitionsMetadata + assertEquals(partitionDataForTopic2.size, 3) + assertEquals(partitionDataForTopic2(1).partitionId, 1) + assertEquals(partitionDataForTopic2(2).partitionId, 2) + val replicas = partitionDataForTopic2(1).replicas + assertEquals(replicas.size, 2) + assert(replicas(0).id == 0 || replicas(0).id == 1) + assert(replicas(1).id == 0 || replicas(1).id == 1) + } + + def testReplicaPlacement { + AddPartitionsCommand.addPartitions(zkClient, topic3, 6, 4) + // wait until leader is elected + var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 1, 500) + var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 2, 500) + var leader3 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 3, 500) + var leader4 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 4, 500) + var leader5 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 5, 500) + var leader6 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 6, 500) + + val leader1FromZk = ZkUtils.getLeaderForPartition(zkClient, topic3, 1).get + val leader2FromZk = ZkUtils.getLeaderForPartition(zkClient, topic3, 2).get + val leader3FromZk = ZkUtils.getLeaderForPartition(zkClient, topic3, 3).get + val leader4FromZk = ZkUtils.getLeaderForPartition(zkClient, topic3, 4).get + val leader5FromZk = ZkUtils.getLeaderForPartition(zkClient, topic3, 5).get + val leader6FromZk = ZkUtils.getLeaderForPartition(zkClient, topic3, 6).get + + assertEquals(leader1.get, leader1FromZk) + assertEquals(leader2.get, leader2FromZk) + assertEquals(leader3.get, leader3FromZk) + assertEquals(leader4.get, leader4FromZk) + assertEquals(leader5.get, leader5FromZk) + assertEquals(leader6.get, leader6FromZk) + + // read metadata from a broker and verify the new topic partitions exist + TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 1, 1000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 2, 1000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 3, 1000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 4, 1000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 5, 1000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 6, 1000) + + val metadata = ClientUtils.fetchTopicMetadata(Set(topic3), brokers, "AddPartitionsTest-testReplicaPlacement", + 2000,0).topicsMetadata + + val 
metaDataForTopic3 = metadata.filter(p => p.topic.equals(topic3)).head + val partition1DataForTopic3 = metaDataForTopic3.partitionsMetadata(1) + val partition2DataForTopic3 = metaDataForTopic3.partitionsMetadata(2) + val partition3DataForTopic3 = metaDataForTopic3.partitionsMetadata(3) + val partition4DataForTopic3 = metaDataForTopic3.partitionsMetadata(4) + val partition5DataForTopic3 = metaDataForTopic3.partitionsMetadata(5) + val partition6DataForTopic3 = metaDataForTopic3.partitionsMetadata(6) + + assertEquals(partition1DataForTopic3.replicas.size, 4) + assertEquals(partition1DataForTopic3.replicas(0).id, 3) + assertEquals(partition1DataForTopic3.replicas(1).id, 2) + assertEquals(partition1DataForTopic3.replicas(2).id, 0) + assertEquals(partition1DataForTopic3.replicas(3).id, 1) + + assertEquals(partition2DataForTopic3.replicas.size, 4) + assertEquals(partition2DataForTopic3.replicas(0).id, 0) + assertEquals(partition2DataForTopic3.replicas(1).id, 3) + assertEquals(partition2DataForTopic3.replicas(2).id, 1) + assertEquals(partition2DataForTopic3.replicas(3).id, 2) + + assertEquals(partition3DataForTopic3.replicas.size, 4) + assertEquals(partition3DataForTopic3.replicas(0).id, 1) + assertEquals(partition3DataForTopic3.replicas(1).id, 0) + assertEquals(partition3DataForTopic3.replicas(2).id, 2) + assertEquals(partition3DataForTopic3.replicas(3).id, 3) + + assertEquals(partition4DataForTopic3.replicas.size, 4) + assertEquals(partition4DataForTopic3.replicas(0).id, 2) + assertEquals(partition4DataForTopic3.replicas(1).id, 3) + assertEquals(partition4DataForTopic3.replicas(2).id, 0) + assertEquals(partition4DataForTopic3.replicas(3).id, 1) + + assertEquals(partition5DataForTopic3.replicas.size, 4) + assertEquals(partition5DataForTopic3.replicas(0).id, 3) + assertEquals(partition5DataForTopic3.replicas(1).id, 0) + assertEquals(partition5DataForTopic3.replicas(2).id, 1) + assertEquals(partition5DataForTopic3.replicas(3).id, 2) + + assertEquals(partition6DataForTopic3.replicas.size, 4) + assertEquals(partition6DataForTopic3.replicas(0).id, 0) + assertEquals(partition6DataForTopic3.replicas(1).id, 1) + assertEquals(partition6DataForTopic3.replicas(2).id, 2) + assertEquals(partition6DataForTopic3.replicas(3).id, 3) } } \ No newline at end of file -- 1.7.12.4 (Apple Git-37)
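
To exercise the new command outside of the unit tests, here is a minimal sketch (not part of the patches; the ZooKeeper connect string, topic name, and wrapper object are placeholders) that follows the same call path AddPartitionsCommand.main() takes after parsing its --topic/--zookeeper/--partition/--replica options:

  import kafka.admin.AddPartitionsCommand
  import kafka.utils.ZKStringSerializer
  import org.I0Itec.zkclient.ZkClient

  object AddPartitionsExample {
    def main(args: Array[String]): Unit = {
      // Placeholder connect string; the 30s session/connection timeouts mirror AddPartitionsCommand.main().
      val zkClient = new ZkClient("localhost:2181", 30000, 30000, ZKStringSerializer)
      try {
        // Add 2 partitions with replication factor 2 to an existing topic named "test".
        // addPartitions() throws AdministrationException if the topic does not already exist.
        AddPartitionsCommand.addPartitions(zkClient, "test", numPartitions = 2, replicationFactor = 2)
      } finally {
        zkClient.close()
      }
    }
  }

As in the command-line path, replicationFactor defaults to 1 and the manual replica-assignment string defaults to "", in which case the new partitions are placed by AdminUtils.assignReplicasToBrokers() with partition ids starting at the topic's existing partition count.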