diff --git a/.gitignore b/.gitignore index 99b32a6..b1cbc97 100644 --- a/.gitignore +++ b/.gitignore @@ -20,4 +20,5 @@ TAGS .settings .gradle kafka.ipr -kafka.iws \ No newline at end of file +kafka.iws +gradle/wrapper/ diff --git a/core/src/main/scala/kafka/admin/BrokerBalanceCommand.scala b/core/src/main/scala/kafka/admin/BrokerBalanceCommand.scala new file mode 100644 index 0000000..1452139 --- /dev/null +++ b/core/src/main/scala/kafka/admin/BrokerBalanceCommand.scala @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.admin + +import kafka.utils._ +import joptsimple.OptionParser +import org.I0Itec.zkclient.ZkClient +import kafka.cluster.Broker +import scala.collection._ +import scala.collection.mutable.ListBuffer +import scala.util.control.Breaks +import util.control.Breaks._ +import kafka.common.TopicAndPartition +import scala.Some + +object BrokerBalanceCommand extends Logging { + + def main(args : Array[String]) { + + val opts = new BrokerBalanceCommandOptions(args) + if(args.size == 0) { + CommandLineUtils.printUsageAndDie(opts.parser, "cluster load balance") + } + + opts.checkArgs() + + val zkClient = new ZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, ZKStringSerializer) + val modeOpt = opts.options.valueOf(opts.modeOpt) + val mode = if(modeOpt == null) true + else modeOpt.toLowerCase + match { + case "y" => true + case "n" => false + case _ => false + } + val includeTopics = opts.options.valueOf(opts.includeOpt) + val excludeTopics = opts.options.valueOf(opts.excludeOpt) + val includeTopicSet = if(includeTopics == null) Set[String]() else includeTopics.split(",").toSet + val excludeTopicSet = if(excludeTopics == null) Set[String]() else excludeTopics.split(",").toSet + + + + val brokerBalanceCommand = new BrokerBalanceCommand(zkClient, includeTopicSet, excludeTopicSet, mode) + + Runtime.getRuntime().addShutdownHook(new Thread() { + override def run() = { + brokerBalanceCommand.shutdown + } + }) + + brokerBalanceCommand.executeBalance() + + } + + class BrokerBalanceCommandOptions(args : Array[String]) { + + val parser = new OptionParser + val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + + "Multiple URLS can be given to allow fail-over.") + .withRequiredArg + .describedAs("urls") + .ofType(classOf[String]) + + val modeOpt = parser.accepts("interactive", "OPTIONAL: The balance mode, decide whether to confirm before balancing each time(interactive by default)") + .withOptionalArg() + .describedAs("Y/N") + .ofType(classOf[String]) + + val includeOpt = parser.accepts("include", "OPTIONAL: The topic list need to balance") + .withOptionalArg() + .describedAs("topic1,topic2,...topicN") + .ofType(classOf[String]) + + val excludeOpt = parser.accepts("exclude", "OPTIONAL: The topic list that do not need to balance") + .withOptionalArg() + .describedAs("topic1,topic2,...topicN") + .ofType(classOf[String]) + + val helpOpt = parser.accepts("help", "Print usage information.") + + val options = parser.parse(args : _*) + + def checkArgs() { + CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt) + } + + } +} + +class BrokerBalanceCommand(zkClient: ZkClient, includeTopicSet: Set[String], excludeTopicSet: Set[String], mode: Boolean = true) extends Logging { + + private var isStop = false + private var brokers: Seq[Broker] = null + private var topics: Seq[String] = null + private var allTopicsAssignment: Map[TopicAndPartition, Seq[Int]] = null + + + def refreshMeta() { + brokers = ZkUtils.getAllBrokersInCluster(zkClient) + topics = ZkUtils.getAllTopics(zkClient) + println("All topics[%s]".format(topics.toString())) + println("Brokers size: %d".format(brokers.size)) + allTopicsAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, topics) + + } + + def filterValidTopicAssignment() = { + val groupedByTopic = allTopicsAssignment.groupBy(tp => tp._1.topic) + /** + * check replicas: + * replicas amount should be more than 0 and less than 3 + * all partitions should have the same amount of replicas + * + */ + var validTopicAssignment = groupedByTopic.filter( + t => { + t._2.head._2.size > 0 && t._2.head._2.size < 3 && t._2.values.map(seq => seq.length).toSet.size == 1 + } + ) + + if(includeTopicSet.size != 0) { + validTopicAssignment = validTopicAssignment.filter(topicInfo => includeTopicSet.contains(topicInfo._1)) + } + + if(excludeTopicSet.size != 0) { + validTopicAssignment = validTopicAssignment.filter(topicInfo => (! excludeTopicSet.contains(topicInfo._1))) + } + + validTopicAssignment + } + + def executeBalance() { + + // make sure execute leaderRebalance first + + refreshMeta() + + val validTopicAssignment = filterValidTopicAssignment() + + val partitionOptimizer = new PartitionOptimizer(brokers) + + for((topicName, topicAssignment) <- validTopicAssignment) { + + breakable { + if(! isStop) { + + if(mode) { + println("Please confirm to process topic [%s]".format(topicName)) + val ret = StdInUtils.readYesOrNo() + if(! ret) { + println("Cancel processing topic [%s]".format(topicName)) + break + } + } + + val newTopicAssignment = partitionOptimizer.optimize(topicAssignment) + + val partitionsToBeReassigned = removeUnchanged(topicAssignment, newTopicAssignment) + + if(partitionsToBeReassigned.size > 0) { + + executeAssignment(partitionsToBeReassigned) + var isFinished = false + do { + isFinished = verifyAssignment(partitionsToBeReassigned) + Thread.sleep(10000) + } while((! isFinished) && (! isStop)) + + println("Finished processing topic [%s]".format(topicName)) + + } else { + println("Topic[%s] is well distribute, do not need to balance".format(topicName)) + } + } else { + println("Shutdown complete") + return + } + } + } + + } + + def removeUnchanged(oldTopicAssignment: Map[TopicAndPartition, Seq[Int]], + newTopicAssignment: Map[TopicAndPartition, Seq[Int]]): mutable.Map[TopicAndPartition, Seq[Int]] = { + + val retTopicAssignment = new mutable.HashMap[TopicAndPartition, Seq[Int]]() + newTopicAssignment.foreach( + e => { + oldTopicAssignment.get(e._1) match { + case Some(seq) => + if(seq != e._2) { + retTopicAssignment.put(e._1, e._2) + } + case None => + } + } + ) + retTopicAssignment + + } + + def executeAssignment(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]) { + + val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned) + // before starting assignment, output the current replica assignment to facilitate rollback + val currentPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, partitionsToBeReassigned.map(_._1.topic).toSeq) + println("Current partition replica assignment\n\n%s\n\nSave this to use as the --reassignment-json-file option during rollback" + .format(ZkUtils.getPartitionReassignmentZkData(currentPartitionReplicaAssignment))) + // start the reassignment + if(reassignPartitionsCommand.reassignPartitions()) + println("Successfully started reassignment of partitions %s".format(ZkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned))) + else + println("Failed to reassign partitions %s".format(partitionsToBeReassigned)) + + } + + def checkIfReassignmentSucceeded(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]) + :Map[TopicAndPartition, ReassignmentStatus] = { + + + val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas) + partitionsToBeReassigned.map { topicAndPartition => + (topicAndPartition._1, ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition._1, + topicAndPartition._2, partitionsToBeReassigned, partitionsBeingReassigned)) + } + } + + def verifyAssignment(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]): Boolean = { + println("Status of partition reassignment:") + var isFinished = true + val reassignedPartitionsStatus = checkIfReassignmentSucceeded(partitionsToBeReassigned) + reassignedPartitionsStatus.foreach { partition => + partition._2 match { + case ReassignmentCompleted => + println("Reassignment of partition %s completed successfully".format(partition._1)) + case ReassignmentFailed => + println("Reassignment of partition %s failed".format(partition._1)) + case ReassignmentInProgress => + println("Reassignment of partition %s is still in progress".format(partition._1)) + isFinished = false + } + } + isFinished + } + + def shutdown() { + isStop = true + } + + +} + + + diff --git a/core/src/main/scala/kafka/admin/PartitionOptimizer.scala b/core/src/main/scala/kafka/admin/PartitionOptimizer.scala new file mode 100644 index 0000000..059f44e --- /dev/null +++ b/core/src/main/scala/kafka/admin/PartitionOptimizer.scala @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.admin + +import kafka.cluster.Broker +import kafka.common.TopicAndPartition +import scala.collection._ + +class PartitionOptimizer(brokers: Seq[Broker]) { + + val brokerIdList = brokers.map(_.id.toInt).sorted + val nBrokers = brokerIdList.size + + + def findStartIndex(targetBrokerId: Int) = brokerIdList.indexOf(targetBrokerId) + + + def optimize(topicAssignment: Map[TopicAndPartition, Seq[Int]]): Map[TopicAndPartition, Seq[Int]] = { + + val replicaFactor = topicAssignment.head._2.size + val nPartitions = topicAssignment.size + + val oldReplicaAssignment = new Array[Array[Int]](replicaFactor) + for(i <- 0 until replicaFactor) { + val replicaAssignment = new Array[Int](nPartitions) + topicAssignment.foreach(ta => replicaAssignment(ta._1.partition) = ta._2(i)) + oldReplicaAssignment(i) = replicaAssignment + } + + val newReplicaAssignment = new Array[Array[Int]](replicaFactor) + + for(i <- 0 until replicaFactor) { + val replicaAssignment = new Array[Int](nPartitions) + replicaAssignment(0) = oldReplicaAssignment(i)(0) + val startIndex = findStartIndex(replicaAssignment(0)) + for(j <- 1 until nPartitions) { + replicaAssignment(j) = brokerIdList((startIndex + j) % nBrokers) + } + newReplicaAssignment(i) = replicaAssignment + } + + val topicName = topicAssignment.head._1.topic + val newTopicAssignment = new mutable.HashMap[TopicAndPartition, Seq[Int]]() + + for(i <- 0 until nPartitions) { + val arr = new Array[Int](replicaFactor) + + for(j <- 0 until replicaFactor) { + arr(j) = newReplicaAssignment(j)(i) + } + newTopicAssignment.put(new TopicAndPartition(topicName, i), arr.toSeq) + + } + + immutable.HashMap(newTopicAssignment.toSeq:_*) + + } + + + +} diff --git a/core/src/main/scala/kafka/admin/StdInUtils.scala b/core/src/main/scala/kafka/admin/StdInUtils.scala new file mode 100644 index 0000000..887a8ac --- /dev/null +++ b/core/src/main/scala/kafka/admin/StdInUtils.scala @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.admin + +import util.control.Breaks._ + +object StdInUtils { + + def readYesOrNo(): Boolean = { + + val prompt = "Please input y/n" + println(prompt) + + while(true) { + var line: String = readLine() + breakable { + if(line == null) { + break + } + line = line.toLowerCase + if(line.equals("y") || line.equals("n")) { + val ret = line.toLowerCase match { + case "y" => true + case "n" => false + case _ => false + + } + return ret + } else { + println(prompt) + } + + } + } + + false + } + +} diff --git a/core/src/test/scala/unit/kafka/admin/BrokerBalanceTest.scala b/core/src/test/scala/unit/kafka/admin/BrokerBalanceTest.scala new file mode 100644 index 0000000..fabb907 --- /dev/null +++ b/core/src/test/scala/unit/kafka/admin/BrokerBalanceTest.scala @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package unit.kafka.admin + +import org.scalatest.junit.JUnit3Suite +import kafka.utils.{ZkUtils, Utils, TestUtils, Logging} +import org.junit.Test +import junit.framework.Assert._ +import kafka.zk.ZooKeeperTestHarness +import kafka.server.{KafkaConfig, KafkaServer} +import kafka.cluster.Broker +import kafka.utils.TestUtils._ +import kafka.admin.BrokerBalanceCommand +import scala.collection.mutable +import scala.collection.mutable.ListBuffer + +class BrokerBalanceTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { + + val brokerId1 = 1 + val brokerId2 = 2 + val brokerId3 = 3 + val brokerId4 = 4 + + val port1 = TestUtils.choosePort() + val port2 = TestUtils.choosePort() + val port3 = TestUtils.choosePort() + val port4 = TestUtils.choosePort() + + val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1, false) + val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2, false) + val configProps3 = TestUtils.createBrokerConfig(brokerId3, port3, false) + val configProps4 = TestUtils.createBrokerConfig(brokerId4, port4, false) + + var servers: Seq[KafkaServer] = Seq.empty[KafkaServer] + var brokers: Seq[Broker] = Seq.empty[Broker] + + val partitionId = 0 + + val topic1 = "new-topic1" + val topic2 = "new-topic2" + val topic3 = "new-topic3" + + override def setUp() { + super.setUp() + // start all the servers + val server1 = TestUtils.createServer(new KafkaConfig(configProps1)) + val server2 = TestUtils.createServer(new KafkaConfig(configProps2)) + val server3 = TestUtils.createServer(new KafkaConfig(configProps3)) + + + servers ++= List(server1, server2, server3) + brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, s.config.port)) + + // create topics first +// createTopic(zkClient, topic1, partitionReplicaAssignment = Map(0->Seq(1,2)), servers = servers) +// createTopic(zkClient, topic2, partitionReplicaAssignment = Map(0->Seq(1,2), 1->Seq(1,2)), servers = servers) + createTopic(zkClient, topic3, partitionReplicaAssignment = Map(0->Seq(2,1), 1->Seq(1,2), 2->Seq(1,3), 3->Seq(1,3), 4->Seq(1,3), 5->Seq(1,2)), servers = servers) + + } + + override def tearDown() { + servers.map(server => server.shutdown()) + servers.map(server => Utils.rm(server.config.logDirs)) + super.tearDown() + } + + @Test + def testBrokerBalance() { + val brokerBalanceCommand = new BrokerBalanceCommand(zkClient, Set[String](), Set[String](), false) + brokerBalanceCommand.executeBalance() + + val topicAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, Seq(topic3)) + val replicaFactor = topicAssignment.head._2.size + val brokerToReplicaArr = new Array[mutable.HashMap[Int, mutable.Set[Int]]](replicaFactor) + for(i <- 0 until replicaFactor) { + brokerToReplicaArr(i) = mutable.HashMap.empty[Int, mutable.Set[Int]] + for(broker <- brokers) { + brokerToReplicaArr(i).put(broker.id, new mutable.HashSet[Int]()) + } + } + + topicAssignment.foreach( + assignment => { + val replicas = assignment._2 + for(i <- 0 until replicas.size) { + brokerToReplicaArr(i).get(replicas(i)) match { + case Some(set) => + set += assignment._1.partition + case None => + val set = new mutable.HashSet[Int]() + set.add(assignment._1.partition) + brokerToReplicaArr(i).put(replicas(i), set) + } + } + } + ) + + for(i <- 0 until replicaFactor) { + val max = brokerToReplicaArr(i).maxBy(_._2.size)._2.size + val min = brokerToReplicaArr(i).minBy(_._2.size)._2.size + assertTrue("replica max %d, replica min %d".format(max, min), max - min < 2) + } + + println("TestBrokerBalance finished") + + + + + } + +} diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar new file mode 100644 index 0000000..b761216 Binary files /dev/null and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 0000000..1d24391 --- /dev/null +++ b/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,6 @@ +#Wed Apr 08 20:30:35 CST 2015 +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-2.0-bin.zip