diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala index ebcf669..336cd17 100644 --- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala +++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala @@ -21,6 +21,8 @@ import kafka.utils._ import org.I0Itec.zkclient.ZkClient import org.I0Itec.zkclient.exception.ZkNodeExistsException import kafka.common.{TopicAndPartition, AdminCommandFailedException} +import collection._ +import mutable.ListBuffer object PreferredReplicaLeaderElectionCommand extends Logging { @@ -28,7 +30,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging { val parser = new OptionParser val jsonFileOpt = parser.accepts("path-to-json-file", "The JSON file with the list of partitions " + "for which preferred replica leader election should be done, in the following format - \n" + - "[{\"topic\": \"foo\", \"partition\": \"1\"}, {\"topic\": \"foobar\", \"partition\": \"2\"}]. \n" + + "{\"version\": 1,\n \"partitions\":\n\t[{\"topic\": \"foo\", \"partition\": 1},\n\t {\"topic\": \"foobar\", \"partition\": 2}]\n}\n" + "Defaults to all existing partitions") .withRequiredArg .describedAs("list of partitions for which preferred replica leader election needs to be triggered") @@ -76,14 +78,18 @@ object PreferredReplicaLeaderElectionCommand extends Logging { } } - def parsePreferredReplicaJsonData(jsonString: String): Set[TopicAndPartition] = { + def parsePreferredReplicaJsonData(jsonString: String): immutable.Set[TopicAndPartition] = { Json.parseFull(jsonString) match { - case Some(partitionList) => - val partitions = (partitionList.asInstanceOf[List[Any]]) - Set.empty[TopicAndPartition] ++ partitions.map { m => - val topic = m.asInstanceOf[Map[String, String]].get("topic").get - val partition = m.asInstanceOf[Map[String, String]].get("partition").get.toInt - TopicAndPartition(topic, partition) + case Some(m) => + m.asInstanceOf[Map[String, Any]].get("partitions") match { + case Some(partitionsList) => + val partitions = partitionsList.asInstanceOf[List[Map[String, Any]]] + partitions.map { p => + val topic = p.get("topic").get.asInstanceOf[String] + val partition = p.get("partition").get.asInstanceOf[Int] + TopicAndPartition(topic, partition) + }.toSet + case None => throw new AdministrationException("Preferred replica election data is empty") } case None => throw new AdministrationException("Preferred replica election data is empty") } @@ -92,9 +98,13 @@ object PreferredReplicaLeaderElectionCommand extends Logging { def writePreferredReplicaElectionData(zkClient: ZkClient, partitionsUndergoingPreferredReplicaElection: scala.collection.Set[TopicAndPartition]) { val zkPath = ZkUtils.PreferredReplicaLeaderElectionPath - val jsonData = Utils.seqToJson(partitionsUndergoingPreferredReplicaElection.map { p => - Utils.mapToJson(Map(("topic" -> p.topic), ("partition" -> p.partition.toString)), valueInQuotes = true) - }.toSeq.sorted, valueInQuotes = false) + var partitionsData: mutable.ListBuffer[String] = ListBuffer[String]() + for (p <- partitionsUndergoingPreferredReplicaElection) { + partitionsData += Utils.mergeJsonFields(Utils.mapToJsonFields(Map("topic" -> p.topic), valueInQuotes = true) ++ + Utils.mapToJsonFields(Map("partition" -> p.partition.toString), valueInQuotes = false)) + } + val jsonPartitionsData = Utils.seqToJson(partitionsData, valueInQuotes = false) + val jsonData = Utils.mapToJson(Map("version" -> 1.toString, "partitions" -> jsonPartitionsData), valueInQuotes = false) try { ZkUtils.createPersistentPath(zkClient, zkPath, jsonData) info("Created preferred replica election path with %s".format(jsonData)) diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index b2204b8..7c34ce4 100644 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -28,7 +28,7 @@ object ReassignPartitionsCommand extends Logging { val parser = new OptionParser val jsonFileOpt = parser.accepts("path-to-json-file", "REQUIRED: The JSON file with the list of partitions and the " + "new replicas they should be reassigned to in the following format - \n" + - "[{\"topic\": \"foo\", \"partition\": \"1\", \"replicas\": \"1,2,3\" }]") + "{\"version\": 1,\n \"partitions\":\n\t[{\"topic\": \"foo\",\n\t \"partition\": 1,\n\t \"replicas\": [1,2,3] }]\n}") .withRequiredArg .describedAs("partition reassignment json file path") .ofType(classOf[String]) @@ -55,18 +55,9 @@ object ReassignPartitionsCommand extends Logging { try { // read the json file into a string - val partitionsToBeReassigned = Json.parseFull(jsonString) match { - case Some(reassignedPartitions) => - val partitions = reassignedPartitions.asInstanceOf[Array[Map[String, String]]] - partitions.map { m => - val topic = m.asInstanceOf[Map[String, String]].get("topic").get - val partition = m.asInstanceOf[Map[String, String]].get("partition").get.toInt - val replicasList = m.asInstanceOf[Map[String, String]].get("replicas").get - val newReplicas = replicasList.split(",").map(_.toInt) - (TopicAndPartition(topic, partition), newReplicas.toSeq) - }.toMap - case None => throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(jsonFile)) - } + val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(jsonString) + if (partitionsToBeReassigned.isEmpty) + throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(jsonFile)) zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned) @@ -94,15 +85,15 @@ object ReassignPartitionsCommand extends Logging { } } -class ReassignPartitionsCommand(zkClient: ZkClient, partitions: collection.immutable.Map[TopicAndPartition, Seq[Int]]) +class ReassignPartitionsCommand(zkClient: ZkClient, partitions: collection.Map[TopicAndPartition, collection.Seq[Int]]) extends Logging { def reassignPartitions(): Boolean = { try { val validPartitions = partitions.filter(p => validatePartition(zkClient, p._1.topic, p._1.partition)) - val jsonReassignmentData = Utils.mapWithSeqValuesToJson(validPartitions.map(p => ("%s,%s".format(p._1.topic, p._1.partition)) -> p._2)) + val jsonReassignmentData = ZkUtils.getPartitionReassignmentZkData(validPartitions) ZkUtils.createPersistentPath(zkClient, ZkUtils.ReassignPartitionsPath, jsonReassignmentData) true - }catch { + } catch { case ze: ZkNodeExistsException => val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient) throw new AdminCommandFailedException("Partition reassignment currently in " + diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala index 916fb59..587247e 100644 --- a/core/src/main/scala/kafka/api/ProducerRequest.scala +++ b/core/src/main/scala/kafka/api/ProducerRequest.scala @@ -19,7 +19,6 @@ package kafka.api import java.nio._ import kafka.message._ -import scala.collection.Map import kafka.api.ApiUtils._ import kafka.common._ import kafka.network.RequestChannel.Response diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index c9e4127..b266f3f 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -213,8 +213,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) = { info("begin registering consumer " + consumerIdString + " in ZK") val consumerRegistrationInfo = - Utils.mergeJsonObjects(Seq(Utils.mapToJson(Map("version" -> 1.toString, "subscription" -> topicCount.dbString), valueInQuotes = false), - Utils.mapToJson(Map("pattern" -> topicCount.pattern), valueInQuotes = true))) + Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString, "subscription" -> topicCount.dbString), valueInQuotes = false) + ++ Utils.mapToJsonFields(Map("pattern" -> topicCount.pattern), valueInQuotes = true)) createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo) info("end registering consumer " + consumerIdString + " in ZK") } diff --git a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java index f3a5095..7f0d1ce 100644 --- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java +++ b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java @@ -35,7 +35,9 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Properties; -import java.util.concurrent.*; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala index fe4c925..c639efb 100644 --- a/core/src/main/scala/kafka/utils/Utils.scala +++ b/core/src/main/scala/kafka/utils/Utils.scala @@ -465,27 +465,42 @@ object Utils extends Logging { def nullOrEmpty(s: String): Boolean = s == null || s.equals("") /** - * Format a Map[String, String] as JSON object. + * Merge JSON fields of the format "key" : value/object/array. */ - def mapToJson(jsonDataMap: Map[String, String], valueInQuotes: Boolean): String = { + def mergeJsonFields(objects: Seq[String]): String = { val builder = new StringBuilder builder.append("{ ") - var numElements = 0 - for ( (key, value) <- jsonDataMap.toList.sorted) { - if (numElements > 0) - builder.append(", ") + builder.append(objects.sorted.map(_.trim).mkString(", ")) + builder.append(" }") + builder.toString + } + + /** + * Format a Map[String, String] as JSON object. + */ + def mapToJsonFields(jsonDataMap: Map[String, String], valueInQuotes: Boolean): Seq[String] = { + val jsonFields: mutable.ListBuffer[String] = ListBuffer() + val builder = new StringBuilder + for ((key, value) <- jsonDataMap.toList.sorted) { builder.append("\"" + key + "\":") if (valueInQuotes) builder.append("\"" + value + "\"") else builder.append(value) - numElements += 1 + jsonFields += builder.toString + builder.clear() } - builder.append(" }") - builder.toString + jsonFields } /** + * Format a Map[String, String] as JSON object. + */ + def mapToJson(jsonDataMap: Map[String, String], valueInQuotes: Boolean): String = { + mergeJsonFields(mapToJsonFields(jsonDataMap, valueInQuotes)) + } + + /** * Format a Seq[String] as JSON array. */ def seqToJson(jsonData: Seq[String], valueInQuotes: Boolean): String = { @@ -504,45 +519,11 @@ object Utils extends Logging { */ def mapWithSeqValuesToJson(jsonDataMap: Map[String, Seq[Int]]): String = { - val builder = new StringBuilder - builder.append("{ ") - var numElements = 0 - for ((key, value) <- jsonDataMap.toList.sortBy(_._1)) { - if (numElements > 0) - builder.append(", ") - builder.append("\"" + key + "\": ") - builder.append(Utils.seqToJson(value.map(_.toString), valueInQuotes = false)) - numElements += 1 - } - builder.append(" }") - builder.toString - } - - - /** - * Merge arbitrary JSON objects. - */ - def mergeJsonObjects(objects: Seq[String]): String = { - val builder = new StringBuilder - builder.append("{ ") - var obs = List[String]() - objects.foreach(ob => obs = obs ::: getJsonContents(ob).split(',').toList) - obs = obs.sorted.map(_.trim) - builder.append(obs.mkString(", ")) - builder.append(" }") - builder.toString + mergeJsonFields(mapToJsonFields(jsonDataMap.map(e => (e._1 -> seqToJson(e._2.map(_.toString), valueInQuotes = false))), + valueInQuotes = false)) } /** - * Get the contents of a JSON object or array. - */ - def getJsonContents(str: String): String = { - str.trim().substring(1, str.length - 1) - } - - - - /** * Create a circular (looping) iterator over a collection. * @param coll An iterable over the underlying collection. * @return A circular iterator over the collection. diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index f0aba12..9a0e250 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -24,6 +24,7 @@ import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, import org.I0Itec.zkclient.serialize.ZkSerializer import collection._ import kafka.api.LeaderAndIsr +import mutable.ListBuffer import org.apache.zookeeper.data.Stat import java.util.concurrent.locks.{ReentrantLock, Condition} import kafka.admin._ @@ -183,8 +184,9 @@ object ZkUtils extends Logging { def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, jmxPort: Int) { val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id val brokerInfo = - Utils.mergeJsonObjects(Seq(Utils.mapToJson(Map("host" -> host), valueInQuotes = true), - Utils.mapToJson(Map("version" -> 1.toString, "jmx_port" -> jmxPort.toString, "port" -> port.toString), valueInQuotes = false))) + Utils.mergeJsonFields(Utils.mapToJsonFields(Map("host" -> host), valueInQuotes = true) ++ + Utils.mapToJsonFields(Map("version" -> 1.toString, "jmx_port" -> jmxPort.toString, "port" -> port.toString), + valueInQuotes = false)) try { createEphemeralPathExpectConflict(zkClient, brokerIdPath, brokerInfo) } catch { @@ -209,7 +211,7 @@ object ZkUtils extends Logging { * Get JSON partition to replica map from zookeeper. */ def replicaAssignmentZkdata(map: Map[String, Seq[Int]]): String = { - val jsonReplicaAssignmentMap = Utils.mapWithSeqValuesToJson(map.map(e => (e._1.toString -> e._2))) + val jsonReplicaAssignmentMap = Utils.mapWithSeqValuesToJson(map) Utils.mapToJson(Map("version" -> 1.toString, "partitions" -> jsonReplicaAssignmentMap), valueInQuotes = false) } @@ -559,26 +561,41 @@ object ZkUtils extends Logging { jsonPartitionMapOpt match { case Some(jsonPartitionMap) => val reassignedPartitions = parsePartitionReassignmentData(jsonPartitionMap) - reassignedPartitions.map { p => - val newReplicas = p._2 - (p._1 -> new ReassignedPartitionsContext(newReplicas)) - } + reassignedPartitions.map(p => (p._1 -> new ReassignedPartitionsContext(p._2))) case None => Map.empty[TopicAndPartition, ReassignedPartitionsContext] } } - def parsePartitionReassignmentData(jsonData: String):Map[TopicAndPartition, Seq[Int]] = { + def parsePartitionReassignmentData(jsonData: String): Map[TopicAndPartition, Seq[Int]] = { + val reassignedPartitions: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map() Json.parseFull(jsonData) match { case Some(m) => - val replicaMap = m.asInstanceOf[Map[String, Seq[Int]]] - replicaMap.map { reassignedPartitions => - val topic = reassignedPartitions._1.split(",").head.trim - val partition = reassignedPartitions._1.split(",").last.trim.toInt - val newReplicas = reassignedPartitions._2 - TopicAndPartition(topic, partition) -> newReplicas + m.asInstanceOf[Map[String, Any]].get("partitions") match { + case Some(partitionsSeq) => + partitionsSeq.asInstanceOf[Seq[Map[String, Any]]].foreach(p => { + val topic = p.get("topic").get.asInstanceOf[String] + val partition = p.get("partition").get.asInstanceOf[Int] + val newReplicas = p.get("replicas").get.asInstanceOf[Seq[Int]] + reassignedPartitions += TopicAndPartition(topic, partition) -> newReplicas + }) + case None => } - case None => Map.empty[TopicAndPartition, Seq[Int]] + case None => + } + reassignedPartitions + } + + def getPartitionReassignmentZkData(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]): String = { + var jsonPartitionsData: mutable.ListBuffer[String] = ListBuffer[String]() + for (p <- partitionsToBeReassigned) { + val jsonReplicasData = Utils.seqToJson(p._2.map(_.toString), valueInQuotes = false) + val jsonTopicData = Utils.mapToJsonFields(Map("topic" -> p._1.topic), valueInQuotes = true) + val jsonPartitionData = Utils.mapToJsonFields(Map("partition" -> p._1.partition.toString, "replicas" -> jsonReplicasData), + valueInQuotes = false) + jsonPartitionsData += Utils.mergeJsonFields(jsonTopicData ++ jsonPartitionData) } + Utils.mapToJson(Map("version" -> 1.toString, "partitions" -> Utils.seqToJson(jsonPartitionsData.toSeq, valueInQuotes = false)), + valueInQuotes = false) } def updatePartitionReassignmentData(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]) { @@ -588,11 +605,11 @@ object ZkUtils extends Logging { deletePath(zkClient, zkPath) info("No more partitions need to be reassigned. Deleting zk path %s".format(zkPath)) case _ => - val jsonData = Utils.mapWithSeqValuesToJson(partitionsToBeReassigned.map(p => ("%s,%s".format(p._1.topic, p._1.partition)) -> p._2)) + val jsonData = getPartitionReassignmentZkData(partitionsToBeReassigned) try { updatePersistentPath(zkClient, zkPath, jsonData) info("Updated partition reassignment path with %s".format(jsonData)) - }catch { + } catch { case nne: ZkNoNodeException => ZkUtils.createPersistentPath(zkClient, zkPath, jsonData) debug("Created path %s with %s for partition reassignment".format(zkPath, jsonData)) diff --git a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala index 0b6244f..6b21554 100644 --- a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala +++ b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala @@ -23,8 +23,7 @@ import org.apache.log4j.Logger import org.scalatest.junit.JUnitSuite import org.junit.Assert._ import kafka.common.KafkaException -import org.junit.{Test} -import kafka.tools.KafkaMigrationTool +import org.junit.Test class UtilsTest extends JUnitSuite {