Index: core/src/test/scala/unit/kafka/admin/AdminTest.scala =================================================================== --- core/src/test/scala/unit/kafka/admin/AdminTest.scala (revision 0) +++ core/src/test/scala/unit/kafka/admin/AdminTest.scala (revision 0) @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.admin + +import junit.framework.Assert._ +import org.junit.Test +import org.scalatest.junit.JUnit3Suite +import kafka.zk.ZooKeeperTestHarness +import kafka.utils.TestZKUtils + +class AdminTest extends JUnit3Suite with ZooKeeperTestHarness { + val zkConnect = TestZKUtils.zookeeperConnect + + @Test + def testReplicaAssignment() { + val brokerList = List("0", "1", "2", "3", "4") + + // test 0 replication factor + try { + AdminUtils.assginReplicasToBrokers(brokerList, 10, 0) + fail("shouldn't allow replication factor 0") + } + catch { + case e: AdministrationException => // this is good + case e2 => throw e2 + } + + // test wrong replication factor + try { + AdminUtils.assginReplicasToBrokers(brokerList, 10, 6) + fail("shouldn't allow replication factor larger than # of brokers") + } + catch { + case e: AdministrationException => // this is good + case e2 => throw e2 + } + + // correct assignment + { + val expectedAssignment = Array( + List("0", "1", "2"), + List("1", "2", "3"), + List("2", "3", "4"), + List("3", "4", "0"), + List("4", "0", "1"), + List("0", "2", "3"), + List("1", "3", "4"), + List("2", "4", "0"), + List("3", "0", "1"), + List("4", "1", "2") + ) + + val actualAssignment = AdminUtils.assginReplicasToBrokers(brokerList, 10, 3, 0) + val e = (expectedAssignment.toList == actualAssignment.toList) + assertTrue(expectedAssignment.toList == actualAssignment.toList) + } + } + + @Test + def testManualReplicaAssignment() { + val brokerList = Set("0", "1", "2", "3", "4") + + // duplicated brokers + try { + val replicationAssignmentStr = "0,0,1:1,2,3" + CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList) + fail("replication assginment shouldn't have duplicated brokers") + } + catch { + case e: AdministrationException => // this is good + case e2 => throw e2 + } + + // non-exist brokers + try { + val replicationAssignmentStr = "0,1,2:1,2,7" + CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList) + fail("replication assginment shouldn't contain non-exist brokers") + } + catch { + case e: AdministrationException => // this is good + case e2 => throw e2 + } + + // inconsistent replication factor + try { + val replicationAssignmentStr = "0,1,2:1,2" + CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList) + fail("all partitions should have the same replication factor") + } + catch { + case e: AdministrationException => // this is good + case e2 => throw e2 + } + + // good assignment + { + val replicationAssignmentStr = "0:1:2,1:2:3" + val expectedReplicationAssignment = Array( + List("0", "1", "2"), + List("1", "2", "3") + ) + val actualReplicationAssignment = CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList) + assertTrue(expectedReplicationAssignment.toList == actualReplicationAssignment.toList) + } + } + + @Test + def testTopicCreationInZK() { + val expectedReplicationAssignment = Array( + List("0", "1", "2"), + List("1", "2", "3"), + List("2", "3", "4"), + List("3", "4", "0"), + List("4", "0", "1"), + List("0", "2", "3"), + List("1", "3", "4"), + List("2", "4", "0"), + List("3", "0", "1"), + List("4", "1", "2"), + List("1", "2", "3"), + List("1", "3", "4") + ) + val topic = "test" + AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicationAssignment, zookeeper.client) + val actualReplicationAssignment = AdminUtils.getTopicMetaDataFromZK(topic, zookeeper.client).get.map(p => p.replicaList) + assertTrue(expectedReplicationAssignment.toList == actualReplicationAssignment.toList) + + try { + AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicationAssignment, zookeeper.client) + fail("shouldn't be able to create a topic already exist") + } + catch { + case e: AdministrationException => // this is good + case e2 => throw e2 + } + } +} Index: core/src/main/scala/kafka/admin/DeleteTopicCommand.scala =================================================================== --- core/src/main/scala/kafka/admin/DeleteTopicCommand.scala (revision 0) +++ core/src/main/scala/kafka/admin/DeleteTopicCommand.scala (revision 0) @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.admin + +import joptsimple.OptionParser +import org.I0Itec.zkclient.ZkClient +import kafka.utils.{Utils, ZKStringSerializer, ZkUtils} + +object DeleteTopicCommand { + + def main(args: Array[String]): Unit = { + val parser = new OptionParser + val topicOpt = parser.accepts("topic", "REQUIRED: The topic to be deleted.") + .withRequiredArg + .describedAs("topic") + .ofType(classOf[String]) + val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + + "Multiple URLS can be given to allow fail-over.") + .withRequiredArg + .describedAs("urls") + .ofType(classOf[String]) + + val options = parser.parse(args : _*) + + for(arg <- List(topicOpt, zkConnectOpt)) { + if(!options.has(arg)) { + System.err.println("Missing required argument \"" + arg + "\"") + parser.printHelpOn(System.err) + System.exit(1) + } + } + + val topic = options.valueOf(topicOpt) + val zkConnect = options.valueOf(zkConnectOpt) + var zkClient: ZkClient = null + try { + zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) + zkClient.deleteRecursive(ZkUtils.getTopicPath(topic)) + println("deletion succeeded!") + } + catch { + case e => + println("delection failed because of " + e.getMessage) + println(Utils.stackTrace(e)) + } + finally { + if (zkClient != null) + zkClient.close() + } + } +} \ No newline at end of file Index: core/src/main/scala/kafka/admin/CreateTopicCommand.scala =================================================================== --- core/src/main/scala/kafka/admin/CreateTopicCommand.scala (revision 0) +++ core/src/main/scala/kafka/admin/CreateTopicCommand.scala (revision 0) @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.admin + +import joptsimple.OptionParser +import kafka.utils.{Utils, ZKStringSerializer, ZkUtils} +import org.I0Itec.zkclient.ZkClient + +object CreateTopicCommand { + + def main(args: Array[String]): Unit = { + val parser = new OptionParser + val topicOpt = parser.accepts("topic", "REQUIRED: The topic to be created.") + .withRequiredArg + .describedAs("topic") + .ofType(classOf[String]) + val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + + "Multiple URLS can be given to allow fail-over.") + .withRequiredArg + .describedAs("urls") + .ofType(classOf[String]) + val nPartitionsOpt = parser.accepts("partition", "number of partitions in the topic") + .withRequiredArg + .describedAs("# of partitions") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1) + val replicationFactorOpt = parser.accepts("replica", "replication factor for each partitions in the topic") + .withRequiredArg + .describedAs("replication factor") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1) + val replicaAssignmentOpt = parser.accepts("replica-assignment-list", "for manuallly assigning replicas to brokers") + .withRequiredArg + .describedAs("broker_id_for_part1_replica1:broker_id_for_part1_replica2,broker_id_for_part2_replica1:broker_id_for_part2_replica2, ...") + .ofType(classOf[String]) + .defaultsTo("") + + val options = parser.parse(args : _*) + + for(arg <- List(topicOpt, zkConnectOpt)) { + if(!options.has(arg)) { + System.err.println("Missing required argument \"" + arg + "\"") + parser.printHelpOn(System.err) + System.exit(1) + } + } + + val topic = options.valueOf(topicOpt) + val zkConnect = options.valueOf(zkConnectOpt) + val nPartitions = options.valueOf(nPartitionsOpt).intValue + val replicationFactor = options.valueOf(replicationFactorOpt).intValue + val replicaAssignmentStr = options.valueOf(replicaAssignmentOpt) + var zkClient: ZkClient = null + try { + zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) + val brokerList = ZkUtils.getSortedBrokerList(zkClient) + var replicaAssignment: Seq[List[String]] = null + + if (replicaAssignmentStr == "") + replicaAssignment = AdminUtils.assginReplicasToBrokers(brokerList, nPartitions, replicationFactor) + else + replicaAssignment = getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet) + AdminUtils.createReplicaAssignmentPathInZK(topic, replicaAssignment, zkClient) + println("creation succeeded!") + } + catch { + case e => + println("creation failed because of " + e.getMessage) + println(Utils.stackTrace(e)) + } + finally { + if (zkClient != null) + zkClient.close() + } + } + + def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[String]): Array[List[String]] = { + val partitionList = replicaAssignmentList.split(",") + val ret = new Array[List[String]](partitionList.size) + for (i <- 0 until partitionList.size) { + val brokerList = partitionList(i).split(":") + if (brokerList.size <= 0) + throw new AdministrationException("replication factor must be larger than 0") + if (brokerList.size != brokerList.toSet.size) + throw new AdministrationException("duplicate brokers in replica assignment: " + brokerList) + if (!brokerList.toSet.subsetOf(availableBrokerList)) + throw new AdministrationException("some specified brokers not available. specified brokers: " + brokerList + + "available broker:" + availableBrokerList) + ret(i) = brokerList.toList + if (ret(i).size != ret(0).size) + throw new AdministrationException("partition " + i + " has different replication factor: " + brokerList) + } + ret + } +} Index: core/src/main/scala/kafka/admin/AdminUtils.scala =================================================================== --- core/src/main/scala/kafka/admin/AdminUtils.scala (revision 0) +++ core/src/main/scala/kafka/admin/AdminUtils.scala (revision 0) @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.admin + +import java.util.Random +import org.I0Itec.zkclient.ZkClient +import org.I0Itec.zkclient.exception.ZkNodeExistsException +import kafka.utils.{SystemTime, Utils, ZkUtils} + +object AdminUtils { + val rand = new Random + + /** + * There are 2 goals of replica assignment: + * 1. Spread the replicas evenly among brokers. + * 2. For partitions assigned to a particular broker, their other replicas are spread over the other brokers. + * + * To achieve this goal, we: + * 1. Assign the first replica of each partition by round-robin, starting from a random position in the broker list. + * 2. Assign the remaining replicas of each partition with an increasing shift. + * + * Here is an example of assigning + * broker-0 broker-1 broker-2 broker-3 broker-4 + * p0 p1 p2 p3 p4 (1st replica) + * p5 p6 p7 p8 p9 (1st replica) + * p4 p0 p1 p2 p3 (2nd replica) + * p8 p9 p5 p6 p7 (2nd replica) + * p3 p4 p0 p1 p2 (3nd replica) + * p7 p8 p9 p5 p6 (3nd replica) + */ + def assginReplicasToBrokers(brokerList: Seq[String], nPartitions: Int, replicationFactor: Int, + fixedStartIndex: Int = -1) // for testing only + : Array[List[String]] = { + if (nPartitions <= 0) + throw new AdministrationException("number of partitions must be larger than 0") + if (replicationFactor <= 0) + throw new AdministrationException("replication factor must be larger than 0") + if (replicationFactor > brokerList.size) + throw new AdministrationException("replication factor: " + replicationFactor + + " larger than available brokers: " + brokerList.size) + val ret = new Array[List[String]](nPartitions) + val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size) + + var secondReplicaShift = -1 + for (i <- 0 until nPartitions) { + if (i % brokerList.size == 0) + secondReplicaShift += 1 + val firstReplicaIndex = (i + startIndex) % brokerList.size + var replicaList = List(brokerList(firstReplicaIndex)) + for (j <- 0 until replicationFactor - 1) + replicaList ::= brokerList(getWrappedIndex(firstReplicaIndex, secondReplicaShift, j, brokerList.size)) + ret(i) = replicaList.reverse + } + + ret + } + + def createReplicaAssignmentPathInZK(topic: String, replicaAssignmentList: Seq[List[String]], zkClient: ZkClient) { + try { + val topicVersion = SystemTime.milliseconds + ZkUtils.createPersistentPath(zkClient, ZkUtils.BrokerTopicsPath + "/" + topic, topicVersion.toString) + for (i <- 0 until replicaAssignmentList.size) { + val zkPath = ZkUtils.getTopicPartReplicasPath(topic, i.toString) + ZkUtils.updatePersistentPath(zkClient, zkPath, Utils.seqToCSV(replicaAssignmentList(i))) + } + } + catch { + case e: ZkNodeExistsException => + throw new AdministrationException("topic " + topic + " already exists, with version " + + ZkUtils.getTopicVersion (zkClient, topic)) + case e2 => + throw new AdministrationException(e2.toString) + } + } + + def getTopicMetaDataFromZK(topic: String, zkClient: ZkClient): Option[Seq[PartitionMetaData]] = { + if (!ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) + return None + + val topicPartitionsPath = ZkUtils.getTopicPartsPath(topic) + val partitions = ZkUtils.getChildrenParentMayNotExist(zkClient, topicPartitionsPath).sortWith((s,t) => s.toInt < t.toInt) + val ret = new Array[PartitionMetaData](partitions.size) + for (i <-0 until ret.size) { + val replicas = ZkUtils.readData(zkClient, ZkUtils.getTopicPartReplicasPath(topic, partitions(i))) + val inSync = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.getTopicPartInSyncPath(topic, partitions(i))) + val leader = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.getTopicPartLeaderPath(topic, partitions(i))) + ret(i) = new PartitionMetaData(partitions(i), + Utils.getCSVList(replicas).toList, + Utils.getCSVList(inSync).toList, + if (leader == null) None else Some(leader) + ) + } + Some(ret) + } + + private def getWrappedIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = { + val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1) + (firstReplicaIndex + shift) % nBrokers + } +} + +class AdministrationException(val errorMessage: String) extends RuntimeException(errorMessage) { + def this() = this(null) +} Index: core/src/main/scala/kafka/admin/ListTopicCommand.scala =================================================================== --- core/src/main/scala/kafka/admin/ListTopicCommand.scala (revision 0) +++ core/src/main/scala/kafka/admin/ListTopicCommand.scala (revision 0) @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.admin + +import joptsimple.OptionParser +import org.I0Itec.zkclient.ZkClient +import kafka.utils.{Utils, ZKStringSerializer, ZkUtils} + +object ListTopicCommand { + + def main(args: Array[String]): Unit = { + val parser = new OptionParser + val topicOpt = parser.accepts("topic", "REQUIRED: The topic to be deleted.") + .withRequiredArg + .describedAs("topic") + .ofType(classOf[String]) + .defaultsTo("") + val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + + "Multiple URLS can be given to allow fail-over.") + .withRequiredArg + .describedAs("urls") + .ofType(classOf[String]) + + val options = parser.parse(args : _*) + + for(arg <- List(zkConnectOpt)) { + if(!options.has(arg)) { + System.err.println("Missing required argument \"" + arg + "\"") + parser.printHelpOn(System.err) + System.exit(1) + } + } + + val topic = options.valueOf(topicOpt) + val zkConnect = options.valueOf(zkConnectOpt) + var zkClient: ZkClient = null + try { + var topicList: Seq[String] = Nil + zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) + + if (topic == "") + topicList = ZkUtils.getChildren(zkClient, ZkUtils.BrokerTopicsPath) + else + topicList = List(topic) + + if (topicList.size <= 0) + println("no topics exist!") + + for (t <- topicList) + showTopic(t, zkClient) + } + catch { + case e => + println("list topic failed because of " + e.getMessage) + println(Utils.stackTrace(e)) + } + finally { + if (zkClient != null) + zkClient.close() + } + } + + def showTopic(topic: String, zkClient: ZkClient) { + val topicMetaData = AdminUtils.getTopicMetaDataFromZK(topic, zkClient) + topicMetaData match { + case None => + println("topic " + topic + " doesn't exist!") + case Some(tmd) => + println("topic: " + topic) + for (part <- tmd) + println(part.toString) + } + } +} \ No newline at end of file Index: core/src/main/scala/kafka/admin/PartitionMetaData.scala =================================================================== --- core/src/main/scala/kafka/admin/PartitionMetaData.scala (revision 0) +++ core/src/main/scala/kafka/admin/PartitionMetaData.scala (revision 0) @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.admin + +class PartitionMetaData(val partitionId: String, + val replicaList: Seq[String], + val inSyncList: Seq[String], + val leaderId: Option[String]) { + + override def toString(): String = { + val builder = new StringBuilder + builder.append("partition id: " + partitionId) + builder.append(" replica list: " + replicaList.mkString(",")) + builder.append(" in-sync list: " + inSyncList.mkString(",")) + builder.append(" leader: " + leaderId) + builder.toString + } + +} \ No newline at end of file Index: core/src/main/scala/kafka/utils/Utils.scala =================================================================== --- core/src/main/scala/kafka/utils/Utils.scala (revision 1230848) +++ core/src/main/scala/kafka/utils/Utils.scala (working copy) @@ -553,6 +553,16 @@ } } + def seqToCSV(seq: Seq[String]): String = { + var csvString = "" + for (i <- 0 until seq.size) { + if (i > 0) + csvString = csvString + ',' + csvString = csvString + seq(i) + } + csvString + } + def getTopicRentionHours(retentionHours: String) : Map[String, Int] = { val exceptionMsg = "Malformed token for topic.log.retention.hours in server.properties: " val successMsg = "The retention hour for " Index: core/src/main/scala/kafka/utils/ZkUtils.scala =================================================================== --- core/src/main/scala/kafka/utils/ZkUtils.scala (revision 1230848) +++ core/src/main/scala/kafka/utils/ZkUtils.scala (working copy) @@ -29,6 +29,38 @@ val BrokerIdsPath = "/brokers/ids" val BrokerTopicsPath = "/brokers/topics" + def getTopicPath(topic: String): String ={ + BrokerTopicsPath + "/" + topic + } + + def getTopicPartsPath(topic: String): String ={ + getTopicPath(topic) + "/" + "partitions" + } + + def getTopicPartPath(topic: String, partitionId: String): String ={ + getTopicPartsPath(topic) + "/" + partitionId + } + + def getTopicVersion(zkClient: ZkClient, topic: String): String ={ + readDataMaybeNull(zkClient, getTopicPath(topic)) + } + + def getTopicPartReplicasPath(topic: String, partitionId: String): String ={ + getTopicPartPath(topic, partitionId) + "/" + "replicas" + } + + def getTopicPartInSyncPath(topic: String, partitionId: String): String ={ + getTopicPartPath(topic, partitionId) + "/" + "isr" + } + + def getTopicPartLeaderPath(topic: String, partitionId: String): String ={ + getTopicPartPath(topic, partitionId) + "/" + "leader" + } + + def getSortedBrokerList(zkClient: ZkClient): Seq[String] ={ + ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted + } + /** * make sure a persistent path exists in ZK. Create the path if not exist. */ @@ -94,6 +126,21 @@ } /** + * Create an persistent node with the given path and data. Create parents if necessary. + */ + def createPersistentPath(client: ZkClient, path: String, data: String): Unit = { + try { + client.createPersistent(path, data) + } + catch { + case e: ZkNoNodeException => { + createParentPath(client, path) + client.createPersistent(path, data) + } + } + } + + /** * Update the value of a persistent node with the given path and data. * create parrent directory if necessary. Never throw NodeExistException. */ Index: bin/kafka-delete-topic.sh =================================================================== --- bin/kafka-delete-topic.sh (revision 0) +++ bin/kafka-delete-topic.sh (revision 0) @@ -0,0 +1,19 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +base_dir=$(dirname $0) +export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties" +$base_dir/kafka-run-class.sh kafka.admin.DeleteTopicCommand $@ Property changes on: bin/kafka-delete-topic.sh ___________________________________________________________________ Added: svn:executable + * Index: bin/kafka-create-topic.sh =================================================================== --- bin/kafka-create-topic.sh (revision 0) +++ bin/kafka-create-topic.sh (revision 0) @@ -0,0 +1,19 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +base_dir=$(dirname $0) +export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties" +$base_dir/kafka-run-class.sh kafka.admin.CreateTopicCommand $@ Property changes on: bin/kafka-create-topic.sh ___________________________________________________________________ Added: svn:executable + * Index: bin/kafka-list-topic.sh =================================================================== --- bin/kafka-list-topic.sh (revision 0) +++ bin/kafka-list-topic.sh (revision 0) @@ -0,0 +1,19 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +base_dir=$(dirname $0) +export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties" +$base_dir/kafka-run-class.sh kafka.admin.ListTopicCommand $@ Property changes on: bin/kafka-list-topic.sh ___________________________________________________________________ Added: svn:executable + *