diff --git a/bin/kafka-create-topic.sh b/bin/kafka-create-topic.sh deleted file mode 100755 index fccda7b..0000000 --- a/bin/kafka-create-topic.sh +++ /dev/null @@ -1,19 +0,0 @@ -#!/bin/bash -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -base_dir=$(dirname $0) -export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties" -$base_dir/kafka-run-class.sh kafka.admin.CreateTopicCommand $@ diff --git a/bin/kafka-delete-topic.sh b/bin/kafka-delete-topic.sh deleted file mode 100755 index f266ae3..0000000 --- a/bin/kafka-delete-topic.sh +++ /dev/null @@ -1,19 +0,0 @@ -#!/bin/bash -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -base_dir=$(dirname $0) -export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties" -$base_dir/kafka-run-class.sh kafka.admin.DeleteTopicCommand $@ diff --git a/bin/kafka-list-topic.sh b/bin/kafka-list-topic.sh deleted file mode 100755 index 1235ad0..0000000 --- a/bin/kafka-list-topic.sh +++ /dev/null @@ -1,19 +0,0 @@ -#!/bin/bash -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -base_dir=$(dirname $0) -export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties" -$base_dir/kafka-run-class.sh kafka.admin.ListTopicCommand $@ diff --git a/bin/kafka-topics.sh b/bin/kafka-topics.sh new file mode 100755 index 0000000..b3195ee --- /dev/null +++ b/bin/kafka-topics.sh @@ -0,0 +1,19 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +base_dir=$(dirname $0) +export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties" +$base_dir/kafka-run-class.sh kafka.admin.TopicCommand $@ diff --git a/core/src/main/scala/kafka/admin/AdminOperationException.scala b/core/src/main/scala/kafka/admin/AdminOperationException.scala new file mode 100644 index 0000000..a45b3f7 --- /dev/null +++ b/core/src/main/scala/kafka/admin/AdminOperationException.scala @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.admin + +class AdminOperationException(val error: String, cause: Throwable) extends RuntimeException(error, cause) { + def this(error: Throwable) = this(error.getMessage, error) + def this(msg: String) = this(msg, null) +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 437a685..b7b2ffe 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -18,11 +18,15 @@ package kafka.admin import java.util.Random +import java.util.Properties +import java.io.{StringReader, StringWriter} import kafka.api.{TopicMetadata, PartitionMetadata} import kafka.cluster.Broker +import kafka.log.LogConfig +import kafka.server.TopicConfigManager import kafka.utils.{Logging, Utils, ZkUtils} import org.I0Itec.zkclient.ZkClient -import org.I0Itec.zkclient.exception.ZkNodeExistsException +import org.I0Itec.zkclient.exception._ import scala.collection._ import scala.collection.mutable import kafka.common._ @@ -30,7 +34,7 @@ import scala.Some object AdminUtils extends Logging { val rand = new Random - val AdminEpoch = -1 + val TopicConfigChangeZnodePrefix = "config_change_" /** * There are 2 goals of replica assignment: @@ -50,43 +54,32 @@ object AdminUtils extends Logging { * p3 p4 p0 p1 p2 (3nd replica) * p7 p8 p9 p5 p6 (3nd replica) */ - def assignReplicasToBrokers(brokerList: Seq[String], nPartitions: Int, replicationFactor: Int, - fixedStartIndex: Int = -1) // for testing only - : Map[Int, Seq[String]] = { - if (nPartitions <= 0) - throw new AdministrationException("number of partitions must be larger than 0") + def assignReplicasToBrokers(brokerList: Seq[Int], + partitions: Int, + replicationFactor: Int, + fixedStartIndex: Int = -1 /* for testing */): Map[Int, Seq[Int]] = { + if (partitions <= 0) + throw new AdminOperationException("Number of partitions must be larger than 0") if (replicationFactor <= 0) - throw new AdministrationException("replication factor must be larger than 0") + throw new AdminOperationException("Replication factor must be larger than 0") if (replicationFactor > brokerList.size) - throw new AdministrationException("replication factor: " + replicationFactor + - " larger than available brokers: " + brokerList.size) - val ret = new mutable.HashMap[Int, List[String]]() + throw new AdminOperationException("Replication factor " + replicationFactor + + " larger than the number of available brokers (" + brokerList.size + ")") + val ret = new mutable.HashMap[Int, List[Int]]() val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size) var secondReplicaShift = -1 - for (i <- 0 until nPartitions) { + for (i <- 0 until partitions) { if (i % brokerList.size == 0) secondReplicaShift += 1 val firstReplicaIndex = (i + startIndex) % brokerList.size var replicaList = List(brokerList(firstReplicaIndex)) for (j <- 0 until replicationFactor - 1) - replicaList ::= brokerList(getWrappedIndex(firstReplicaIndex, secondReplicaShift, j, brokerList.size)) + replicaList ::= brokerList(replicaIndex(firstReplicaIndex, secondReplicaShift, j, brokerList.size)) ret.put(i, replicaList.reverse) } ret.toMap } - - def createTopicPartitionAssignmentPathInZK(topic: String, replicaAssignment: Map[Int, Seq[String]], zkClient: ZkClient) { - try { - val zkPath = ZkUtils.getTopicPath(topic) - val jsonPartitionMap = Utils.mapToJson(replicaAssignment.map(e => (e._1.toString -> e._2))) - ZkUtils.createPersistentPath(zkClient, 
zkPath, jsonPartitionMap) - debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap)) - } catch { - case e: ZkNodeExistsException => throw new TopicExistsException("topic %s already exists".format(topic)) - case e2 => throw new AdministrationException(e2.toString) - } - } def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient): TopicMetadata = fetchTopicMetadataFromZk(topic, zkClient, new mutable.HashMap[Int, Broker]) @@ -96,37 +89,107 @@ object AdminUtils extends Logging { topics.map(topic => fetchTopicMetadataFromZk(topic, zkClient, cachedBrokerInfo)) } + def deleteTopic(zkClient: ZkClient, topic: String) { + zkClient.deleteRecursive(ZkUtils.getTopicPath(topic)) + } + + def topicExists(zkClient: ZkClient, topic: String): Boolean = + zkClient.exists(ZkUtils.getTopicPath(topic)) + + def createTopic(zkClient: ZkClient, + topic: String, + partitions: Int, + replicationFactor: Int, + topicConfig: Properties = new Properties) { + val brokerList = ZkUtils.getSortedBrokerList(zkClient) + val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, partitions, replicationFactor) + AdminUtils.createTopicWithAssignment(zkClient, topic, replicaAssignment, topicConfig) + } + + def createTopicWithAssignment(zkClient: ZkClient, + topic: String, + partitionReplicaAssignment: Map[Int, Seq[Int]], + config: Properties = new Properties) { + // validate arguments + Topic.validate(topic) + LogConfig.validate(config) + require(partitionReplicaAssignment.values.map(_.size).toSet.size == 1, "All partitions should have the same number of replicas.") + + val topicPath = ZkUtils.getTopicPath(topic) + if(zkClient.exists(topicPath)) + throw new TopicExistsException("Topic \"%s\" already exists.".format(topic)) + partitionReplicaAssignment.values.foreach(reps => require(reps.size == reps.toSet.size, "Duplicate replica assignment found: " + partitionReplicaAssignment)) + + // write out the config is there is any, this isn't transactional with the partition assignments + if(config.size > 0) + ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicConfigPath(topic), Utils.asString(config)) + + // create the partition assignment + val json = Utils.mapToJson(partitionReplicaAssignment.map(e => (e._1.toString -> e._2.map(_.toString)))) + try { + ZkUtils.createPersistentPath(zkClient, topicPath, json) + } catch { + case e: ZkNodeExistsException => throw new TopicExistsException("Topic \"%s\" already exists.".format(topic)) + } + } + + /** + * Update the config for an existing topic and create a change notification so the change will propagate to other brokers + */ + def changeTopicConfig(zkClient: ZkClient, topic: String, config: Properties) { + LogConfig.validate(config) + if(!topicExists(zkClient, topic)) + throw new AdminOperationException("Topic \"%s\" does not exist.".format(topic)) + + // write the new config--may not exist if there were previously no overrides + ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicConfigPath(topic), Utils.asString(config)) + + // create the change notification + zkClient.createPersistentSequential(ZkUtils.TopicConfigChangesPath + "/" + TopicConfigChangeZnodePrefix, topic) + } + + def fetchTopicConfig(zkClient: ZkClient, topic: String): Properties = { + val str: String = zkClient.readData(ZkUtils.getTopicConfigPath(topic), true) + val props = new Properties() + if(str != null) + props.load(new StringReader(str)) + props + } + + def fetchAllTopicConfigs(zkClient: ZkClient): Map[String, Properties] = + 
ZkUtils.getAllTopics(zkClient).map(topic => (topic, fetchTopicConfig(zkClient, topic))).toMap + private def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient, cachedBrokerInfo: mutable.HashMap[Int, Broker]): TopicMetadata = { if(ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) { val topicPartitionAssignment = ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic)).get(topic).get val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1) val partitionMetadata = sortedPartitions.map { partitionMap => - val partition = partitionMap._1 - val replicas = partitionMap._2 - val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partition) - val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition) - debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader) + val partition = partitionMap._1 + val replicas = partitionMap._2 + val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partition) + val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition) + debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader) - var leaderInfo: Option[Broker] = None - var replicaInfo: Seq[Broker] = Nil - var isrInfo: Seq[Broker] = Nil - try { + var leaderInfo: Option[Broker] = None + var replicaInfo: Seq[Broker] = Nil + var isrInfo: Seq[Broker] = Nil try { - leaderInfo = leader match { - case Some(l) => Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head) - case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition) + try { + leaderInfo = leader match { + case Some(l) => Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head) + case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition) + } + } catch { + case e => throw new LeaderNotAvailableException("Leader not available for topic %s partition %d".format(topic, partition)) } - } catch { - case e => throw new LeaderNotAvailableException("Leader not available for topic %s partition %d".format(topic, partition)) - } - try { - replicaInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas.map(id => id.toInt)) - isrInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas) - } catch { - case e => throw new ReplicaNotAvailableException(e) - } + try { + replicaInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas.map(id => id.toInt)) + isrInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas) + } catch { + case e => throw new ReplicaNotAvailableException(e) + } new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError) } catch { @@ -163,12 +226,9 @@ object AdminUtils extends Logging { } } - private def getWrappedIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = { + private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = { val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1) (firstReplicaIndex + shift) % nBrokers } } -class AdministrationException(val errorMessage: String) extends RuntimeException(errorMessage) { - def this() = this(null) -} diff --git a/core/src/main/scala/kafka/admin/CreateTopicCommand.scala b/core/src/main/scala/kafka/admin/CreateTopicCommand.scala deleted file mode 100644 index fd3a397..0000000 --- 
a/core/src/main/scala/kafka/admin/CreateTopicCommand.scala +++ /dev/null @@ -1,117 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.admin - -import joptsimple.OptionParser -import kafka.utils._ -import org.I0Itec.zkclient.ZkClient -import scala.collection.mutable -import kafka.common.Topic - -object CreateTopicCommand extends Logging { - - def main(args: Array[String]): Unit = { - val parser = new OptionParser - val topicOpt = parser.accepts("topic", "REQUIRED: The topic to be created.") - .withRequiredArg - .describedAs("topic") - .ofType(classOf[String]) - val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + - "Multiple URLS can be given to allow fail-over.") - .withRequiredArg - .describedAs("urls") - .ofType(classOf[String]) - val nPartitionsOpt = parser.accepts("partition", "number of partitions in the topic") - .withRequiredArg - .describedAs("# of partitions") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1) - val replicationFactorOpt = parser.accepts("replica", "replication factor for each partitions in the topic") - .withRequiredArg - .describedAs("replication factor") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1) - val replicaAssignmentOpt = parser.accepts("replica-assignment-list", "for manually assigning replicas to brokers") - .withRequiredArg - .describedAs("broker_id_for_part1_replica1 : broker_id_for_part1_replica2 , " + - "broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...") - .ofType(classOf[String]) - .defaultsTo("") - - val options = parser.parse(args : _*) - - for(arg <- List(topicOpt, zkConnectOpt)) { - if(!options.has(arg)) { - System.err.println("Missing required argument \"" + arg + "\"") - parser.printHelpOn(System.err) - System.exit(1) - } - } - - val topic = options.valueOf(topicOpt) - val zkConnect = options.valueOf(zkConnectOpt) - val nPartitions = options.valueOf(nPartitionsOpt).intValue - val replicationFactor = options.valueOf(replicationFactorOpt).intValue - val replicaAssignmentStr = options.valueOf(replicaAssignmentOpt) - var zkClient: ZkClient = null - try { - zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) - createTopic(zkClient, topic, nPartitions, replicationFactor, replicaAssignmentStr) - println("creation succeeded!") - } catch { - case e => - println("creation failed because of " + e.getMessage) - println(Utils.stackTrace(e)) - } finally { - if (zkClient != null) - zkClient.close() - } - } - - def createTopic(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicationFactor: Int = 1, replicaAssignmentStr: String = "") { - Topic.validate(topic) - - val brokerList = ZkUtils.getSortedBrokerList(zkClient) - - val 
partitionReplicaAssignment = if (replicaAssignmentStr == "") - AdminUtils.assignReplicasToBrokers(brokerList, numPartitions, replicationFactor) - else - getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet) - debug("Replica assignment list for %s is %s".format(topic, partitionReplicaAssignment)) - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, partitionReplicaAssignment, zkClient) - } - - def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[String]): Map[Int, List[String]] = { - val partitionList = replicaAssignmentList.split(",") - val ret = new mutable.HashMap[Int, List[String]]() - for (i <- 0 until partitionList.size) { - val brokerList = partitionList(i).split(":").map(s => s.trim()) - if (brokerList.size <= 0) - throw new AdministrationException("replication factor must be larger than 0") - if (brokerList.size != brokerList.toSet.size) - throw new AdministrationException("duplicate brokers in replica assignment: " + brokerList) - if (!brokerList.toSet.subsetOf(availableBrokerList)) - throw new AdministrationException("some specified brokers not available. specified brokers: " + brokerList.toString + - "available broker:" + availableBrokerList.toString) - ret.put(i, brokerList.toList) - if (ret(i).size != ret(0).size) - throw new AdministrationException("partition " + i + " has different replication factor: " + brokerList) - } - ret.toMap - } -} diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala index a2a4ff3..889617c 100644 --- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala +++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala @@ -85,7 +85,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging { val partition = m.asInstanceOf[Map[String, String]].get("partition").get.toInt TopicAndPartition(topic, partition) } - case None => throw new AdministrationException("Preferred replica election data is empty") + case None => throw new AdminOperationException("Preferred replica election data is empty") } } @@ -102,9 +102,9 @@ object PreferredReplicaLeaderElectionCommand extends Logging { case nee: ZkNodeExistsException => val partitionsUndergoingPreferredReplicaElection = PreferredReplicaLeaderElectionCommand.parsePreferredReplicaJsonData(ZkUtils.readData(zkClient, zkPath)._1) - throw new AdministrationException("Preferred replica leader election currently in progress for " + + throw new AdminOperationException("Preferred replica leader election currently in progress for " + "%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection)) - case e2 => throw new AdministrationException(e2.toString) + case e2 => throw new AdminOperationException(e2.toString) } } } diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala new file mode 100644 index 0000000..2c86c8e --- /dev/null +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.admin + +import joptsimple._ +import java.util.Properties +import kafka.utils._ +import org.I0Itec.zkclient.ZkClient +import scala.collection._ +import scala.collection.JavaConversions._ +import kafka.common.Topic +import kafka.cluster.Broker + +object TopicCommand { + + def main(args: Array[String]): Unit = { + + val opts = new TopicCommandOptions(args) + + // should have exactly one action + val actions = Seq(opts.createOpt, opts.deleteOpt, opts.listOpt, opts.alterOpt, opts.describeOpt).count(opts.options.has _) + if(actions != 1) { + System.err.println("Command must include exactly one action: --list, --describe, --create, --delete, or --alter") + opts.parser.printHelpOn(System.err) + System.exit(1) + } + + CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.zkConnectOpt) + + val zkClient = new ZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, ZKStringSerializer) + + if(opts.options.has(opts.createOpt)) + createTopic(zkClient, opts) + else if(opts.options.has(opts.alterOpt)) + alterTopic(zkClient, opts) + else if(opts.options.has(opts.deleteOpt)) + deleteTopic(zkClient, opts) + else if(opts.options.has(opts.listOpt)) + listTopics(zkClient) + else if(opts.options.has(opts.describeOpt)) + describeTopic(zkClient, opts) + + zkClient.close() + } + + def createTopic(zkClient: ZkClient, opts: TopicCommandOptions) { + CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt) + val topics = opts.options.valuesOf(opts.topicOpt) + val configs = parseTopicConfigs(opts) + for (topic <- topics) { + if (opts.options.has(opts.replicaAssignmentOpt)) { + val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt)) + AdminUtils.createTopicWithAssignment(zkClient, topic, assignment, configs) + } else { + CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt) + val partitions = opts.options.valueOf(opts.partitionsOpt).intValue + val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue + AdminUtils.createTopic(zkClient, topic, partitions, replicas, configs) + } + println("Created topic \"%s\".".format(topic)) + } + } + + def alterTopic(zkClient: ZkClient, opts: TopicCommandOptions) { + CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt) + val topics = opts.options.valuesOf(opts.topicOpt) + val configs = parseTopicConfigs(opts) + if(opts.options.has(opts.partitionsOpt)) + Utils.croak("Changing the number of partitions is not supported.") + if(opts.options.has(opts.replicationFactorOpt)) + Utils.croak("Changing the replication factor is not supported.") + for(topic <- topics) { + AdminUtils.changeTopicConfig(zkClient, topic, configs) + println("Updated config for topic \"%s\".".format(topic)) + } + } + + def deleteTopic(zkClient: ZkClient, opts: TopicCommandOptions) { + CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt) + for(topic <- opts.options.valuesOf(opts.topicOpt)) { + AdminUtils.deleteTopic(zkClient, topic) + println("Topic \"%s\" deleted.".format(topic)) + } + } + + def 
listTopics(zkClient: ZkClient) { + for(topic <- ZkUtils.getAllTopics(zkClient).sorted) + println(topic) + } + + def describeTopic(zkClient: ZkClient, opts: TopicCommandOptions) { + CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt) + val topics = opts.options.valuesOf(opts.topicOpt) + val metadata = AdminUtils.fetchTopicMetadataFromZk(topics.toSet, zkClient) + for(md <- metadata) { + println(md.topic) + val config = AdminUtils.fetchTopicConfig(zkClient, md.topic) + println("\tconfigs: " + config.map(kv => kv._1 + " = " + kv._2).mkString(", ")) + println("\tpartitions: " + md.partitionsMetadata.size) + for(pd <- md.partitionsMetadata) { + println("\t\tpartition " + pd.partitionId) + println("\t\tleader: " + (if(pd.leader.isDefined) formatBroker(pd.leader.get) else "none")) + println("\t\treplicas: " + pd.replicas.map(formatBroker).mkString(", ")) + println("\t\tisr: " + pd.isr.map(formatBroker).mkString(", ")) + } + } + } + + def formatBroker(broker: Broker) = broker.id + " (" + broker.host + ":" + broker.port + ")" + + def parseTopicConfigs(opts: TopicCommandOptions): Properties = { + val configs = opts.options.valuesOf(opts.configOpt).map(_.split("\\s*=\\s*")) + require(configs.forall(_.length == 2), "Invalid topic config: all configs must be in the format \"key=val\".") + val props = new Properties + configs.foreach(pair => props.setProperty(pair(0), pair(1))) + props + } + + def parseReplicaAssignment(replicaAssignmentList: String): Map[Int, List[Int]] = { + val partitionList = replicaAssignmentList.split(",") + val ret = new mutable.HashMap[Int, List[Int]]() + for (i <- 0 until partitionList.size) { + val brokerList = partitionList(i).split(":").map(s => s.trim().toInt) + ret.put(i, brokerList.toList) + if (ret(i).size != ret(0).size) + throw new AdminOperationException("Partition " + i + " has different replication factor: " + brokerList) + } + ret.toMap + } + + class TopicCommandOptions(args: Array[String]) { + val parser = new OptionParser + val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. 
" + + "Multiple URLS can be given to allow fail-over.") + .withRequiredArg + .describedAs("urls") + .ofType(classOf[String]) + val listOpt = parser.accepts("list", "List all available topics.") + val createOpt = parser.accepts("create", "Create a new topic.") + val alterOpt = parser.accepts("alter", "Alter the configuration for the topic.") + val deleteOpt = parser.accepts("delete", "Delete the topic.") + val describeOpt = parser.accepts("describe", "List details for the given topics.") + val helpOpt = parser.accepts("help", "Print usage information.") + val topicOpt = parser.accepts("topic", "The topic to be created.") + .withRequiredArg + .describedAs("topic") + .ofType(classOf[String]) + val configOpt = parser.accepts("config", "A topic configuration for this topic.") + .withRequiredArg + .describedAs("name=value") + .ofType(classOf[String]) + val partitionsOpt = parser.accepts("partitions", "The number of partitions for the topic.") + .withRequiredArg + .describedAs("# of partitions") + .ofType(classOf[java.lang.Integer]) + val replicationFactorOpt = parser.accepts("replication-factor", "The replication factor for each partition in the topic.") + .withRequiredArg + .describedAs("replication factor") + .ofType(classOf[java.lang.Integer]) + val replicaAssignmentOpt = parser.accepts("replica-assignment", "A list of manual partition-to-broker assignments.") + .withRequiredArg + .describedAs("broker_id_for_part1_replica1 : broker_id_for_part1_replica2 , " + + "broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...") + .ofType(classOf[String]) + + + val options = parser.parse(args : _*) + } + +} diff --git a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala index 4e1313e..b9cf850 100644 --- a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala +++ b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala @@ -24,14 +24,9 @@ import kafka.common.TopicAndPartition import kafka.utils.Logging object OffsetCommitResponse extends Logging { - val CurrentVersion: Short = 0 - val DefaultClientId = "" def readFrom(buffer: ByteBuffer): OffsetCommitResponse = { - // Read values from the envelope - val versionId = buffer.getShort val correlationId = buffer.getInt - val clientId = readShortString(buffer) // Read the OffsetResponse val topicCount = buffer.getInt @@ -44,23 +39,18 @@ object OffsetCommitResponse extends Logging { (TopicAndPartition(topic, partitionId), error) }) }) - OffsetCommitResponse(Map(pairs:_*), versionId, correlationId, clientId) + OffsetCommitResponse(Map(pairs:_*), correlationId) } } case class OffsetCommitResponse(requestInfo: Map[TopicAndPartition, Short], - versionId: Short = OffsetCommitResponse.CurrentVersion, - correlationId: Int = 0, - clientId: String = OffsetCommitResponse.DefaultClientId) + correlationId: Int = 0) extends RequestOrResponse { lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic) def writeTo(buffer: ByteBuffer) { - // Write envelope - buffer.putShort(versionId) buffer.putInt(correlationId) - writeShortString(buffer, clientId) // Write OffsetCommitResponse buffer.putInt(requestInfoGroupedByTopic.size) // number of topics @@ -75,9 +65,7 @@ case class OffsetCommitResponse(requestInfo: Map[TopicAndPartition, Short], } override def sizeInBytes = - 2 + /* versionId */ 4 + /* correlationId */ - shortStringLength(clientId) + 4 + /* topic count */ requestInfoGroupedByTopic.foldLeft(0)((count, topicAndOffsets) => { val (topic, offsets) = topicAndOffsets diff --git 
a/core/src/main/scala/kafka/api/OffsetFetchResponse.scala b/core/src/main/scala/kafka/api/OffsetFetchResponse.scala index fb5e6cb..bfa6295 100644 --- a/core/src/main/scala/kafka/api/OffsetFetchResponse.scala +++ b/core/src/main/scala/kafka/api/OffsetFetchResponse.scala @@ -24,14 +24,10 @@ import kafka.common.{TopicAndPartition, OffsetMetadataAndError} import kafka.utils.Logging object OffsetFetchResponse extends Logging { - val CurrentVersion: Short = 0 - val DefaultClientId = "" def readFrom(buffer: ByteBuffer): OffsetFetchResponse = { // Read values from the envelope - val versionId = buffer.getShort val correlationId = buffer.getInt - val clientId = readShortString(buffer) // Read the OffsetResponse val topicCount = buffer.getInt @@ -46,23 +42,18 @@ object OffsetFetchResponse extends Logging { (TopicAndPartition(topic, partitionId), OffsetMetadataAndError(offset, metadata, error)) }) }) - OffsetFetchResponse(Map(pairs:_*), versionId, correlationId, clientId) + OffsetFetchResponse(Map(pairs:_*), correlationId) } } case class OffsetFetchResponse(requestInfo: Map[TopicAndPartition, OffsetMetadataAndError], - versionId: Short = OffsetFetchResponse.CurrentVersion, - correlationId: Int = 0, - clientId: String = OffsetFetchResponse.DefaultClientId) + correlationId: Int = 0) extends RequestOrResponse { lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic) def writeTo(buffer: ByteBuffer) { - // Write envelope - buffer.putShort(versionId) buffer.putInt(correlationId) - writeShortString(buffer, clientId) // Write OffsetFetchResponse buffer.putInt(requestInfoGroupedByTopic.size) // number of topics @@ -79,9 +70,7 @@ case class OffsetFetchResponse(requestInfo: Map[TopicAndPartition, OffsetMetadat } override def sizeInBytes = - 2 + /* versionId */ 4 + /* correlationId */ - shortStringLength(clientId) + 4 + /* topic count */ requestInfoGroupedByTopic.foldLeft(0)((count, topicAndOffsets) => { val (topic, offsets) = topicAndOffsets diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala index 89ce92a..26cdc58 100644 --- a/core/src/main/scala/kafka/api/RequestKeys.scala +++ b/core/src/main/scala/kafka/api/RequestKeys.scala @@ -29,6 +29,7 @@ object RequestKeys { val StopReplicaKey: Short = 5 val OffsetCommitKey: Short = 6 val OffsetFetchKey: Short = 7 + val ModifyTopicKey: Short = 8 val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]= Map(ProduceKey -> ("Produce", ProducerRequest.readFrom), @@ -38,7 +39,8 @@ object RequestKeys { LeaderAndIsrKey -> ("LeaderAndIsr", LeaderAndIsrRequest.readFrom), StopReplicaKey -> ("StopReplica", StopReplicaRequest.readFrom), OffsetCommitKey -> ("OffsetCommit", OffsetCommitRequest.readFrom), - OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom)) + OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom), + ModifyTopicKey -> ("ModifyTopic", null)) def nameForKey(key: Short): String = { keyToNameAndDeserializerMap.get(key) match { diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index af80631..dd3eb1a 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -17,9 +17,11 @@ package kafka.cluster import scala.collection._ +import kafka.admin.AdminUtils import kafka.utils._ import java.lang.Object import kafka.api.LeaderAndIsr +import kafka.log.LogConfig import kafka.server.ReplicaManager import com.yammer.metrics.core.Gauge 
import kafka.metrics.KafkaMetricsGroup @@ -74,7 +76,8 @@ class Partition(val topic: String, case Some(replica) => replica case None => if (isReplicaLocal(replicaId)) { - val log = logManager.getOrCreateLog(topic, partitionId) + val config = LogConfig.fromProps(logManager.defaultConfig.toProps, AdminUtils.fetchTopicConfig(zkClient, topic)) + val log = logManager.createLog(TopicAndPartition(topic, partitionId), config) val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParent) val offset = checkpoint.read.getOrElse(TopicAndPartition(topic, partitionId), 0L).min(log.logEndOffset) val localReplica = new Replica(replicaId, this, time, offset, Some(log)) diff --git a/core/src/main/scala/kafka/common/KafkaZookeperClient.scala b/core/src/main/scala/kafka/common/KafkaZookeperClient.scala deleted file mode 100644 index bace1d2..0000000 --- a/core/src/main/scala/kafka/common/KafkaZookeperClient.scala +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package kafka.common - -import org.I0Itec.zkclient.ZkClient -import kafka.utils.{ZKStringSerializer, ZKConfig} -import java.util.concurrent.atomic.AtomicReference - -object KafkaZookeeperClient { - private val INSTANCE = new AtomicReference[ZkClient](null) - - def getZookeeperClient(config: ZKConfig): ZkClient = { - // TODO: This cannot be a singleton since unit tests break if we do that -// INSTANCE.compareAndSet(null, new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, -// ZKStringSerializer)) - INSTANCE.set(new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, - ZKStringSerializer)) - INSTANCE.get() - } -} diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index ac12b74..f48cde3 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -49,7 +49,7 @@ import com.yammer.metrics.core.Gauge */ @threadsafe class Log(val dir: File, - val config: LogConfig, + @volatile var config: LogConfig, val needsRecovery: Boolean, val scheduler: Scheduler, time: Time = SystemTime) extends Logging with KafkaMetricsGroup { diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index 5a10bef..dc42a74 100644 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -17,7 +17,7 @@ package kafka.log -import java.io.File +import java.util.Properties import scala.collection._ import kafka.common._ @@ -46,6 +46,99 @@ case class LogConfig(val segmentSize: Int = 1024*1024, val indexInterval: Int = 4096, val fileDeleteDelayMs: Long = 60*1000, val minCleanableRatio: Double = 0.5, - val dedupe: Boolean = false) + val dedupe: Boolean 
= false) { + + def toProps: Properties = { + val props = new Properties() + import LogConfig._ + props.put(SegmentBytesProp, segmentSize.toString) + props.put(SegmentMsProp, segmentMs.toString) + props.put(SegmentIndexBytesProp, maxIndexSize.toString) + props.put(FlushMessagesProp, flushInterval.toString) + props.put(FlushMsProp, flushMs.toString) + props.put(RetentionBytesProp, retentionSize.toString) + props.put(RententionMsProp, retentionMs.toString) + props.put(MaxMessageBytesProp, maxMessageSize.toString) + props.put(IndexIntervalBytesProp, indexInterval.toString) + props.put(FileDeleteDelayMsProp, fileDeleteDelayMs.toString) + props.put(MinCleanableDirtyRatioProp, minCleanableRatio.toString) + props.put(CleanupPolicyProp, if(dedupe) "dedupe" else "delete") + props + } + +} + +object LogConfig { + val SegmentBytesProp = "segment.bytes" + val SegmentMsProp = "segment.ms" + val SegmentIndexBytesProp = "segment.index.bytes" + val FlushMessagesProp = "flush.messages" + val FlushMsProp = "flush.ms" + val RetentionBytesProp = "retention.bytes" + val RententionMsProp = "retention.ms" + val MaxMessageBytesProp = "max.message.bytes" + val IndexIntervalBytesProp = "index.interval.bytes" + val FileDeleteDelayMsProp = "file.delete.delay.ms" + val MinCleanableDirtyRatioProp = "min.cleanable.dirty.ratio" + val CleanupPolicyProp = "cleanup.policy" + + val ConfigNames = Set(SegmentBytesProp, + SegmentMsProp, + SegmentIndexBytesProp, + FlushMessagesProp, + FlushMsProp, + RetentionBytesProp, + RententionMsProp, + MaxMessageBytesProp, + IndexIntervalBytesProp, + FileDeleteDelayMsProp, + MinCleanableDirtyRatioProp, + CleanupPolicyProp) + + + /** + * Parse the given properties instance into a LogConfig object + */ + def fromProps(props: Properties): LogConfig = { + new LogConfig(segmentSize = props.getProperty(SegmentBytesProp).toInt, + segmentMs = props.getProperty(SegmentMsProp).toLong, + maxIndexSize = props.getProperty(SegmentIndexBytesProp).toInt, + flushInterval = props.getProperty(FlushMessagesProp).toLong, + flushMs = props.getProperty(FlushMsProp).toLong, + retentionSize = props.getProperty(RetentionBytesProp).toLong, + retentionMs = props.getProperty(RententionMsProp).toLong, + maxMessageSize = props.getProperty(MaxMessageBytesProp).toInt, + indexInterval = props.getProperty(IndexIntervalBytesProp).toInt, + fileDeleteDelayMs = props.getProperty(FileDeleteDelayMsProp).toInt, + minCleanableRatio = props.getProperty(MinCleanableDirtyRatioProp).toDouble, + dedupe = props.getProperty(CleanupPolicyProp).trim.toLowerCase == "dedupe") + } + + /** + * Create a log config instance using the given properties and defaults + */ + def fromProps(defaults: Properties, overrides: Properties): LogConfig = { + val props = new Properties(defaults) + props.putAll(overrides) + fromProps(props) + } + + /** + * Check that property names are valid + */ + private def validateNames(props: Properties) { + for(name <- JavaConversions.asMap(props).keys) + require(LogConfig.ConfigNames.contains(name), "Unknown configuration \"%s\".".format(name)) + } + + /** + * Check that the given properties contain only valid log config names, and that all values can be parsed. 
+ */ + def validate(props: Properties) { + validateNames(props) + LogConfig.fromProps(LogConfig().toProps, props) // check that we can parse the values + } + +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 438d802..0d567e4 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -174,9 +174,8 @@ class LogManager(val logDirs: Array[File], /** * Get the log if it exists, otherwise return None */ - def getLog(topic: String, partition: Int): Option[Log] = { - val topicAndPartiton = TopicAndPartition(topic, partition) - val log = logs.get(topicAndPartiton) + def getLog(topicAndPartition: TopicAndPartition): Option[Log] = { + val log = logs.get(topicAndPartition) if (log == null) None else @@ -184,21 +183,10 @@ class LogManager(val logDirs: Array[File], } /** - * Create the log if it does not exist, otherwise just return it - */ - def getOrCreateLog(topic: String, partition: Int): Log = { - val topicAndPartition = TopicAndPartition(topic, partition) - logs.get(topicAndPartition) match { - case null => createLogIfNotExists(topicAndPartition) - case log: Log => log - } - } - - /** * Create a log for the given topic and the given partition * If the log already exists, just return a copy of the existing log */ - private def createLogIfNotExists(topicAndPartition: TopicAndPartition): Log = { + def createLog(topicAndPartition: TopicAndPartition, config: LogConfig): Log = { logCreationLock synchronized { var log = logs.get(topicAndPartition) @@ -211,12 +199,16 @@ class LogManager(val logDirs: Array[File], val dir = new File(dataDir, topicAndPartition.topic + "-" + topicAndPartition.partition) dir.mkdirs() log = new Log(dir, - defaultConfig, + config, needsRecovery = false, scheduler, time) - info("Created log for topic %s partition %d in %s.".format(topicAndPartition.topic, topicAndPartition.partition, dataDir.getAbsolutePath)) logs.put(topicAndPartition, log) + info("Created log for topic %s partition %d in %s with properties {%s}." + .format(topicAndPartition.topic, + topicAndPartition.partition, + dataDir.getAbsolutePath, + JavaConversions.asMap(config.toProps).mkString(", "))) log } } @@ -289,6 +281,11 @@ class LogManager(val logDirs: Array[File], * Get all the partition logs */ def allLogs(): Iterable[Log] = logs.values + + /** + * Get a map of TopicAndPartition => Log + */ + def logsByTopicPartition = logs.toMap /** * Flush any log which has exceeded its flush interval and has unwritten messages. 
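// --- Illustrative sketch (not part of the patch) ----------------------------
// The LogConfig/LogManager changes above layer per-topic overrides on top of
// the broker defaults via LogConfig.fromProps(defaults, overrides), which
// relies on java.util.Properties default-chaining: any key absent from the
// override falls through to the defaults. A minimal, self-contained Scala
// sketch of that merge; only the property names ("segment.bytes",
// "retention.ms") come from LogConfig above, the values are made up for
// illustration.
import java.util.Properties

object TopicConfigMergeSketch extends App {
  // broker-wide defaults, as LogConfig().toProps would produce them
  val defaults = new Properties()
  defaults.put("segment.bytes", (1024 * 1024).toString)
  defaults.put("retention.ms", (7L * 24 * 60 * 60 * 1000).toString)

  // per-topic overrides, as stored under the topic's config path in ZooKeeper
  // (ZkUtils.getTopicConfigPath) and read back by AdminUtils.fetchTopicConfig
  val overrides = new Properties()
  overrides.put("segment.bytes", (512 * 1024).toString)

  // the same layering LogConfig.fromProps(defaults, overrides) performs:
  // new Properties(defaults) makes lookups of unset keys fall back to defaults
  val merged = new Properties(defaults)
  merged.putAll(overrides)

  println(merged.getProperty("segment.bytes")) // 524288    (overridden per topic)
  println(merged.getProperty("retention.ms"))  // 604800000 (inherited broker default)
}
// -----------------------------------------------------------------------------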
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 9b0f7e9..8d71496 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -116,8 +116,9 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe } /** Get the next request or block until there is one */ - def receiveRequest(): RequestChannel.Request = + def receiveRequest(): RequestChannel.Request = { requestQueue.take() + } /** Get a response for the given processor if there is one */ def receiveResponse(processor: Int): RequestChannel.Response = diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index b056e25..f4f2a79 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -268,7 +268,7 @@ private[kafka] class Processor(val id: Int, debug("Ignoring response for closed socket.") close(key) } - }finally { + } finally { curr = requestChannel.receiveResponse(id) } } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 9b97ca6..32bab9c 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -17,7 +17,7 @@ package kafka.server -import kafka.admin.{CreateTopicCommand, AdminUtils} +import kafka.admin.AdminUtils import kafka.api._ import kafka.message._ import kafka.network._ @@ -26,6 +26,7 @@ import kafka.utils.ZKGroupTopicDirs import org.apache.log4j.Logger import scala.collection._ import kafka.network.RequestChannel.Response +import java.util.Properties import java.util.concurrent.TimeUnit import java.util.concurrent.atomic._ import kafka.metrics.KafkaMetricsGroup @@ -362,7 +363,7 @@ class KafkaApis(val requestChannel: RequestChannel, } def fetchOffsets(logManager: LogManager, topicAndPartition: TopicAndPartition, timestamp: Long, maxNumOffsets: Int): Seq[Long] = { - logManager.getLog(topicAndPartition.topic, topicAndPartition.partition) match { + logManager.getLog(topicAndPartition) match { case Some(log) => fetchOffsetsBefore(log, timestamp, maxNumOffsets) case None => @@ -437,7 +438,7 @@ class KafkaApis(val requestChannel: RequestChannel, /* check if auto creation of topics is turned on */ if (config.autoCreateTopicsEnable) { try { - CreateTopicCommand.createTopic(zkClient, topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor) + AdminUtils.createTopic(zkClient, topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor) info("Auto creation of topic %s with %d partitions and replication factor %d is successful!" 
.format(topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor)) } catch { @@ -472,26 +473,24 @@ class KafkaApis(val requestChannel: RequestChannel, val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest] if(requestLogger.isTraceEnabled) requestLogger.trace("Handling offset commit request " + offsetCommitRequest.toString) - trace("Handling offset commit request " + offsetCommitRequest.toString) val responseInfo = offsetCommitRequest.requestInfo.map( t => { - val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, t._1.topic) + val (topicAndPartition, metadataAndError) = t + val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicAndPartition.topic) try { - if(t._2.metadata.length > config.offsetMetadataMaxSize) { - (t._1, ErrorMapping.OffsetMetadataTooLargeCode) + if(metadataAndError.metadata.length > config.offsetMetadataMaxSize) { + (topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode) } else { ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + - t._1.partition, t._2.offset.toString) - (t._1, ErrorMapping.NoError) + topicAndPartition.partition, metadataAndError.offset.toString) + (topicAndPartition, ErrorMapping.NoError) } } catch { case e => - (t._1, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) + (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) } }) - val response = new OffsetCommitResponse(responseInfo, - offsetCommitRequest.versionId, - offsetCommitRequest.correlationId, - offsetCommitRequest.clientId) + val response = new OffsetCommitResponse(responseInfo, + offsetCommitRequest.correlationId) requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) } @@ -502,7 +501,6 @@ class KafkaApis(val requestChannel: RequestChannel, val offsetFetchRequest = request.requestObj.asInstanceOf[OffsetFetchRequest] if(requestLogger.isTraceEnabled) requestLogger.trace("Handling offset fetch request " + offsetFetchRequest.toString) - trace("Handling offset fetch request " + offsetFetchRequest.toString) val responseInfo = offsetFetchRequest.requestInfo.map( t => { val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, t.topic) try { @@ -521,9 +519,7 @@ class KafkaApis(val requestChannel: RequestChannel, } }) val response = new OffsetFetchResponse(collection.immutable.Map(responseInfo: _*), - offsetFetchRequest.versionId, - offsetFetchRequest.correlationId, - offsetFetchRequest.clientId) + offsetFetchRequest.correlationId) requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) } diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala new file mode 100644 index 0000000..a078707 --- /dev/null +++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.utils._ +import org.apache.zookeeper.Watcher.Event.KeeperState +import org.I0Itec.zkclient.{IZkStateListener, ZkClient} +import kafka.common._ +import java.net.InetAddress + + +/** + * This class registers the broker in zookeeper to allow + * other brokers and consumers to detect failures. It uses an ephemeral znode with the path: + * /brokers/[0...N] --> host:port + * + * Right now our definition of health is fairly naive. If we register in zk we are healthy, otherwise + * we are dead. + */ +class KafkaHealthcheck(private val brokerId: Int, + private val host: String, + private val port: Int, + private val zkClient: ZkClient) extends Logging { + + val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId + + def startup() { + zkClient.subscribeStateChanges(new SessionExpireListener) + register() + } + + /** + * Register this broker as "alive" in zookeeper + */ + def register() { + val hostName = + if(host == null || host.trim.isEmpty) + InetAddress.getLocalHost.getCanonicalHostName + else + host + val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt + ZkUtils.registerBrokerInZk(zkClient, brokerId, hostName, port, jmxPort) + } + + /** + * When we get a SessionExpired event, we lost all ephemeral nodes and zkclient has reestablished a + * connection for us. We need to re-register this broker in the broker registry. + */ + class SessionExpireListener() extends IZkStateListener { + @throws(classOf[Exception]) + def handleStateChanged(state: KeeperState) { + // do nothing, since zkclient will do reconnect for us. + } + + /** + * Called after the zookeeper session has expired and a new session has been created. You would have to re-create + * any ephemeral nodes here. + * + * @throws Exception + * On any error. 
+ */ + @throws(classOf[Exception]) + def handleNewSession() { + info("re-registering broker info in ZK for broker " + brokerId) + register() + info("done re-registering broker") + info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath)) + } + } + +} diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala index f0c05a5..2504157 100644 --- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala +++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala @@ -67,7 +67,7 @@ class KafkaRequestHandlerPool(val brokerId: Int, handler.shutdown for(thread <- threads) thread.join - info("shutted down completely") + info("shut down completely") } } diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index da6f716..08359c7 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -18,6 +18,7 @@ package kafka.server import kafka.network.SocketServer +import kafka.admin._ import kafka.log.LogConfig import kafka.log.CleanerConfig import kafka.log.LogManager @@ -39,7 +40,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg var socketServer: SocketServer = null var requestHandlerPool: KafkaRequestHandlerPool = null var logManager: LogManager = null - var kafkaZookeeper: KafkaZooKeeper = null + var kafkaHealthcheck: KafkaHealthcheck = null + var topicConfigManager: TopicConfigManager = null var replicaManager: ReplicaManager = null var apis: KafkaApis = null var kafkaController: KafkaController = null @@ -57,9 +59,12 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg /* start scheduler */ kafkaScheduler.startup() + + /* setup zookeeper */ + zkClient = initZk() /* start log manager */ - logManager = createLogManager(config) + logManager = createLogManager(zkClient) logManager.startup() socketServer = new SocketServer(config.brokerId, @@ -68,31 +73,39 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg config.numNetworkThreads, config.queuedMaxRequests, config.socketRequestMaxBytes) + socketServer.startup() - socketServer.startup + /* tell everyone we are alive */ + kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.hostName, config.port, zkClient) + kafkaHealthcheck.startup() - /* start client */ - kafkaZookeeper = new KafkaZooKeeper(config) - // starting relevant replicas and leader election for partitions assigned to this broker - kafkaZookeeper.startup - - info("Connecting to ZK: " + config.zkConnect) - - replicaManager = new ReplicaManager(config, time, kafkaZookeeper.getZookeeperClient, kafkaScheduler, logManager) - - kafkaController = new KafkaController(config, kafkaZookeeper.getZookeeperClient) - apis = new KafkaApis(socketServer.requestChannel, replicaManager, kafkaZookeeper.getZookeeperClient, config.brokerId, config) + replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager) + kafkaController = new KafkaController(config, zkClient) + + /* start processing requests */ + apis = new KafkaApis(socketServer.requestChannel, replicaManager, zkClient, config.brokerId, config) requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads) - Mx4jLoader.maybeLoad + + Mx4jLoader.maybeLoad() - // start the replica manager replicaManager.startup() - // start the 
controller + kafkaController.startup() - // register metrics beans + + topicConfigManager = new TopicConfigManager(zkClient, logManager) + topicConfigManager.startup() + registerStats() + info("started") } + + private def initZk(): ZkClient = { + info("Connecting to zookeeper on " + config.zkConnect) + val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer) + ZkUtils.setupCommonPaths(zkClient) + zkClient + } /** * Forces some dynamic jmx beans to be registered on server startup. @@ -116,15 +129,15 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg Utils.swallow(kafkaScheduler.shutdown()) if(apis != null) Utils.swallow(apis.close()) - if(kafkaZookeeper != null) - Utils.swallow(kafkaZookeeper.shutdown()) if(replicaManager != null) Utils.swallow(replicaManager.shutdown()) + if(zkClient != null) + Utils.swallow(zkClient.close()) if(socketServer != null) Utils.swallow(socketServer.shutdown()) if(logManager != null) Utils.swallow(logManager.shutdown()) - + if(kafkaController != null) Utils.swallow(kafkaController.shutdown()) @@ -140,13 +153,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg def getLogManager(): LogManager = logManager - private def createLogManager(config: KafkaConfig): LogManager = { - val topics = config.logCleanupPolicyMap.keys ++ - config.logSegmentBytesPerTopicMap.keys ++ - config.logFlushIntervalMsPerTopicMap.keys ++ - config.logRollHoursPerTopicMap.keys ++ - config.logRetentionBytesPerTopicMap.keys ++ - config.logRetentionHoursPerTopicMap.keys + private def createLogManager(zkClient: ZkClient): LogManager = { val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes, segmentMs = 60 * 60 * 1000 * config.logRollHours, flushInterval = config.logFlushIntervalMessages, @@ -159,13 +166,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg fileDeleteDelayMs = config.logDeleteDelayMs, minCleanableRatio = config.logCleanerMinCleanRatio, dedupe = config.logCleanupPolicy.trim.toLowerCase == "dedupe") - val logConfigs = for(topic <- topics) yield - topic -> defaultLogConfig.copy(segmentSize = config.logSegmentBytesPerTopicMap.getOrElse(topic, config.logSegmentBytes), - segmentMs = 60 * 60 * 1000 * config.logRollHoursPerTopicMap.getOrElse(topic, config.logRollHours), - flushMs = config.logFlushIntervalMsPerTopicMap.getOrElse(topic, config.logFlushIntervalMs).toLong, - retentionSize = config.logRetentionBytesPerTopicMap.getOrElse(topic, config.logRetentionBytes), - retentionMs = 60 * 60 * 1000 * config.logRetentionHoursPerTopicMap.getOrElse(topic, config.logRetentionHours), - dedupe = config.logCleanupPolicyMap.getOrElse(topic, config.logCleanupPolicy).trim.toLowerCase == "dedupe") + val defaultProps = defaultLogConfig.toProps + val configs = AdminUtils.fetchAllTopicConfigs(zkClient).mapValues(LogConfig.fromProps(defaultProps, _)) + // read the log configurations from zookeeper val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads, dedupeBufferSize = config.logCleanerDedupeBufferSize, ioBufferSize = config.logCleanerIoBufferSize, @@ -174,7 +177,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg backOffMs = config.logCleanerBackoffMs, enableCleaner = config.logCleanerEnable) new LogManager(logDirs = config.logDirs.map(new File(_)).toArray, - topicConfigs = logConfigs.toMap, + topicConfigs = configs, defaultConfig = defaultLogConfig, cleanerConfig = 
cleanerConfig, flushCheckMs = config.logFlushSchedulerIntervalMs, diff --git a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala deleted file mode 100644 index 0e6c656..0000000 --- a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.server - -import kafka.utils._ -import org.apache.zookeeper.Watcher.Event.KeeperState -import org.I0Itec.zkclient.{IZkStateListener, ZkClient} -import kafka.common._ -import java.net.InetAddress - - -/** - * Handles registering broker with zookeeper in the following path: - * /brokers/[0...N] --> host:port - */ -class KafkaZooKeeper(config: KafkaConfig) extends Logging { - - val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + config.brokerId - private var zkClient: ZkClient = null - - def startup() { - /* start client */ - info("connecting to ZK: " + config.zkConnect) - zkClient = KafkaZookeeperClient.getZookeeperClient(config) - zkClient.subscribeStateChanges(new SessionExpireListener) - registerBrokerInZk() - } - - private def registerBrokerInZk() { - val hostName = - if(config.hostName == null || config.hostName.trim.isEmpty) - InetAddress.getLocalHost.getCanonicalHostName - else - config.hostName - val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt - ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, config.port, jmxPort) - } - - /** - * When we get a SessionExpired event, we lost all ephemeral nodes and zkclient has reestablished a - * connection for us. We need to re-register this broker in the broker registry. - */ - class SessionExpireListener() extends IZkStateListener { - @throws(classOf[Exception]) - def handleStateChanged(state: KeeperState) { - // do nothing, since zkclient will do reconnect for us. - } - - /** - * Called after the zookeeper session has expired and a new session has been created. You would have to re-create - * any ephemeral nodes here. - * - * @throws Exception - * On any error. 
- */ - @throws(classOf[Exception]) - def handleNewSession() { - info("re-registering broker info in ZK for broker " + config.brokerId) - registerBrokerInZk() - info("done re-registering broker") - info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath)) - } - } - - def shutdown() { - if (zkClient != null) { - info("Closing zookeeper client...") - zkClient.close() - } - } - - def getZookeeperClient = { - zkClient - } -} diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 710c08b..0ca78ca 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -168,7 +168,7 @@ class ReplicaManager(val config: KafkaConfig, case Some(leaderReplica) => leaderReplica case None => throw new LeaderNotAvailableException("Leader not local for topic %s partition %d on broker %d" - .format(topic, partitionId, config.brokerId)) + .format(topic, partitionId, config.brokerId)) } } } diff --git a/core/src/main/scala/kafka/server/TopicConfigManager.scala b/core/src/main/scala/kafka/server/TopicConfigManager.scala new file mode 100644 index 0000000..e9320c7 --- /dev/null +++ b/core/src/main/scala/kafka/server/TopicConfigManager.scala @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import java.util.Properties +import scala.collection._ +import kafka.log._ +import kafka.utils._ +import kafka.admin.AdminUtils +import org.I0Itec.zkclient.{IZkChildListener, ZkClient} + +/** + * This class initiates and carries out topic config changes. + * + * It works as follows. 
+ * + * Config is stored under the path + * /brokers/topics/ 0) { + info("Processing %d change notifications...".format(notifications.size)) + val now = time.milliseconds + val logs = logManager.logsByTopicPartition.toBuffer + val logsByTopic = logs.groupBy(_._1.topic).mapValues(_.map(_._2)) + val lastChangeId = notifications.map(changeNumber).max + for (notification <- notifications) { + val changeId = changeNumber(notification) + val changeZnode = ZkUtils.TopicConfigChangesPath + "/" + notification + val (topic, stat) = ZkUtils.readData(zkClient, changeZnode) + if (changeId > lastExecutedChange && logsByTopic.contains(topic)) { + /* combine the default properties with the overrides in zk to create the new LogConfig */ + val props = new Properties(logManager.defaultConfig.toProps) + props.putAll(AdminUtils.fetchTopicConfig(zkClient, topic)) + val logConfig = LogConfig.fromProps(props) + for (log <- logsByTopic(topic)) + log.config = logConfig + lastExecutedChange = changeId + info("Processed topic config change %d for topic %s, setting new config to %s.".format(changeId, topic, props)) + } else if (now - stat.getCtime > changeExpirationMs && changeId != lastChangeId) { + /* this change is now obsolete, try to delete it unless it is the last change left */ + val deleted = ZkUtils.deletePath(zkClient, changeZnode) + if (deleted) + debug("Deleted old config change %d for topic %s.".format(changeId, topic)) + } + } + } + } + + /* get the change number from a change notification znode */ + private def changeNumber(name: String): Long = name.substring(AdminUtils.TopicConfigChangeZnodePrefix.length).toLong + + /** + * A listener that applies config changes to logs + */ + object ConfigChangeListener extends IZkChildListener { + override def handleChildChange(path: String, chillins: java.util.List[String]) { + try { + processConfigChanges(JavaConversions.asBuffer(chillins)) + } catch { + case e: Exception => error("Error processing config change:", e) + } + } + } + +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/utils/CommandLineUtils.scala b/core/src/main/scala/kafka/utils/CommandLineUtils.scala index 21c5d4a..29f1209 100644 --- a/core/src/main/scala/kafka/utils/CommandLineUtils.scala +++ b/core/src/main/scala/kafka/utils/CommandLineUtils.scala @@ -10,7 +10,7 @@ object CommandLineUtils extends Logging { def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) { for(arg <- required) { if(!options.has(arg)) { - error("Missing required argument \"" + arg + "\"") + System.err.println("Missing required argument \"" + arg + "\"") parser.printHelpOn(System.err) System.exit(1) } diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala index 1c88226..e059e19 100644 --- a/core/src/main/scala/kafka/utils/Utils.scala +++ b/core/src/main/scala/kafka/utils/Utils.scala @@ -553,6 +553,25 @@ object Utils extends Logging { } /** + * Turn a properties map into a string + */ + def asString(props: Properties): String = { + val writer = new StringWriter() + props.store(writer, "") + writer.toString + } + + /** + * Read some properties with the given default values + */ + def readProps(s: String, defaults: Properties): Properties = { + val reader = new StringReader(s) + val props = new Properties(defaults) + props.load(reader) + props + } + + /** * Read a big-endian integer from a byte array */ def readInt(bytes: Array[Byte], offset: Int): Int = { diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala 
b/core/src/main/scala/kafka/utils/ZkUtils.scala index 113ad37..b011dec 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -27,6 +27,8 @@ import kafka.api.LeaderAndIsr import mutable.HashMap import org.apache.zookeeper.data.Stat import java.util.concurrent.locks.{ReentrantLock, Condition} +import java.util.Properties +import java.io.{StringReader, StringWriter} import kafka.admin._ import kafka.common.{TopicAndPartition, KafkaException, NoEpochForPartitionException} import kafka.controller.{LeaderIsrAndControllerEpoch, PartitionAndReplica, ReassignedPartitionsContext} @@ -35,6 +37,8 @@ object ZkUtils extends Logging { val ConsumersPath = "/consumers" val BrokerIdsPath = "/brokers/ids" val BrokerTopicsPath = "/brokers/topics" + val TopicConfigPath = "/config/topics" + val TopicConfigChangesPath = "/config/changes" val ControllerPath = "/controller" val ControllerEpochPath = "/controllerEpoch" val ReassignPartitionsPath = "/admin/reassign_partitions" @@ -48,6 +52,9 @@ object ZkUtils extends Logging { getTopicPath(topic) + "/partitions" } + def getTopicConfigPath(topic: String): String = + TopicConfigPath + "/" + topic + def getController(zkClient: ZkClient): Int= { readDataMaybeNull(zkClient, ControllerPath)._1 match { case Some(controller) => controller.toInt @@ -63,8 +70,8 @@ object ZkUtils extends Logging { getTopicPartitionPath(topic, partitionId) + "/" + "leaderAndISR" } - def getSortedBrokerList(zkClient: ZkClient): Seq[String] ={ - ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted + def getSortedBrokerList(zkClient: ZkClient): Seq[Int] = { + ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).map(_.toInt).sorted } def getAllLiveBrokerIds(zkClient: ZkClient): Set[Int] = { @@ -90,6 +97,11 @@ object ZkUtils extends Logging { def getLeaderAndIsrForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderAndIsr] = { getLeaderIsrAndEpochForPartition(zkClient, topic, partition).map(_.leaderAndIsr) } + + def setupCommonPaths(zkClient: ZkClient) { + for(path <- Seq(ConsumersPath, BrokerIdsPath, BrokerTopicsPath, TopicConfigChangesPath, TopicConfigPath)) + makeSurePersistentPathExists(zkClient, path) + } def parseLeaderAndIsr(leaderAndIsrStr: String, topic: String, partition: Int, stat: Stat) : Option[LeaderIsrAndControllerEpoch] = { @@ -180,7 +192,7 @@ object ZkUtils extends Logging { debug("The list of replicas for topic %s, partition %d is %s".format(topic, partition, replicas)) replicas.contains(brokerId.toString) } - + def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, jmxPort: Int) { val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id val broker = new Broker(id, host, port) @@ -306,10 +318,8 @@ object ZkUtils extends Logging { case e: ZkNodeExistsException => stat = client.writeData(path, data) return stat.getVersion - case e2 => throw e2 } } - case e2 => throw e2 } } @@ -611,7 +621,7 @@ object ZkUtils extends Logging { case nne: ZkNoNodeException => ZkUtils.createPersistentPath(zkClient, zkPath, jsonData) debug("Created path %s with %s for partition reassignment".format(zkPath, jsonData)) - case e2 => throw new AdministrationException(e2.toString) + case e2 => throw new AdminOperationException(e2.toString) } } } diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index 3b61d2a..e6caa45 100644 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ 
b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -19,6 +19,9 @@ package kafka.admin import junit.framework.Assert._ import org.junit.Test import org.scalatest.junit.JUnit3Suite +import java.util.Properties +import kafka.utils._ +import kafka.log._ import kafka.zk.ZooKeeperTestHarness import kafka.server.KafkaConfig import kafka.utils.{ZkUtils, TestUtils} @@ -29,41 +32,31 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness { @Test def testReplicaAssignment() { - val brokerList = List("0", "1", "2", "3", "4") + val brokerList = List(0, 1, 2, 3, 4) // test 0 replication factor - try { + intercept[AdminOperationException] { AdminUtils.assignReplicasToBrokers(brokerList, 10, 0) - fail("shouldn't allow replication factor 0") - } - catch { - case e: AdministrationException => // this is good - case e2 => throw e2 } // test wrong replication factor - try { + intercept[AdminOperationException] { AdminUtils.assignReplicasToBrokers(brokerList, 10, 6) - fail("shouldn't allow replication factor larger than # of brokers") - } - catch { - case e: AdministrationException => // this is good - case e2 => throw e2 } // correct assignment { val expectedAssignment = Map( - 0 -> List("0", "1", "2"), - 1 -> List("1", "2", "3"), - 2 -> List("2", "3", "4"), - 3 -> List("3", "4", "0"), - 4 -> List("4", "0", "1"), - 5 -> List("0", "2", "3"), - 6 -> List("1", "3", "4"), - 7 -> List("2", "4", "0"), - 8 -> List("3", "0", "1"), - 9 -> List("4", "1", "2") + 0 -> List(0, 1, 2), + 1 -> List(1, 2, 3), + 2 -> List(2, 3, 4), + 3 -> List(3, 4, 0), + 4 -> List(4, 0, 1), + 5 -> List(0, 2, 3), + 6 -> List(1, 3, 4), + 7 -> List(2, 4, 0), + 8 -> List(3, 0, 1), + 9 -> List(4, 1, 2) ) val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerList, 10, 3, 0) @@ -74,71 +67,42 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness { @Test def testManualReplicaAssignment() { - val brokerList = Set("0", "1", "2", "3", "4") - - // duplicated brokers - try { - val replicationAssignmentStr = "0,0,1:1,2,3" - CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList) - fail("replication assginment shouldn't have duplicated brokers") - } - catch { - case e: AdministrationException => // this is good - case e2 => throw e2 - } + val brokers = List(0, 1, 2, 3, 4) + TestUtils.createBrokersInZk(zkClient, brokers) - // non-exist brokers - try { - val replicationAssignmentStr = "0,1,2:1,2,7" - CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList) - fail("replication assginment shouldn't contain non-exist brokers") - } - catch { - case e: AdministrationException => // this is good - case e2 => throw e2 + // duplicate brokers + intercept[IllegalArgumentException] { + AdminUtils.createTopicWithAssignment(zkClient, "test", Map(0->Seq(0,0))) } // inconsistent replication factor - try { - val replicationAssignmentStr = "0,1,2:1,2" - CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList) - fail("all partitions should have the same replication factor") - } - catch { - case e: AdministrationException => // this is good - case e2 => throw e2 + intercept[IllegalArgumentException] { + AdminUtils.createTopicWithAssignment(zkClient, "test", Map(0->Seq(0,1), 1->Seq(0))) } // good assignment - { - val replicationAssignmentStr = "0:1:2,1:2:3" - val expectedReplicationAssignment = Map( - 0 -> List("0", "1", "2"), - 1 -> List("1", "2", "3") - ) - val actualReplicationAssignment = CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, 
brokerList) - assertEquals(expectedReplicationAssignment.size, actualReplicationAssignment.size) - for( (part, replicas) <- expectedReplicationAssignment ) { - assertEquals(replicas, actualReplicationAssignment(part)) - } - } + val assignment = Map(0 -> List(0, 1, 2), + 1 -> List(1, 2, 3)) + AdminUtils.createTopicWithAssignment(zkClient, "test", assignment) + val found = ZkUtils.getPartitionAssignmentForTopics(zkClient, Seq("test")) + assertEquals(assignment, found("test")) } @Test def testTopicCreationInZK() { val expectedReplicaAssignment = Map( - 0 -> List("0", "1", "2"), - 1 -> List("1", "2", "3"), - 2 -> List("2", "3", "4"), - 3 -> List("3", "4", "0"), - 4 -> List("4", "0", "1"), - 5 -> List("0", "2", "3"), - 6 -> List("1", "3", "4"), - 7 -> List("2", "4", "0"), - 8 -> List("3", "0", "1"), - 9 -> List("4", "1", "2"), - 10 -> List("1", "2", "3"), - 11 -> List("1", "3", "4") + 0 -> List(0, 1, 2), + 1 -> List(1, 2, 3), + 2 -> List(2, 3, 4), + 3 -> List(3, 4, 0), + 4 -> List(4, 0, 1), + 5 -> List(0, 2, 3), + 6 -> List(1, 3, 4), + 7 -> List(2, 4, 0), + 8 -> List(3, 0, 1), + 9 -> List(4, 1, 2), + 10 -> List(1, 2, 3), + 11 -> List(1, 3, 4) ) val leaderForPartitionMap = Map( 0 -> 0, @@ -157,17 +121,17 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness { val topic = "test" TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4)) // create the topic - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment) // create leaders for all partitions TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1) val actualReplicaAssignment = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata.map(p => p.replicas) - val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id.toString).toList).toList + val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id).toList).toList assertEquals(expectedReplicaAssignment.size, actualReplicaList.size) for(i <- 0 until actualReplicaList.size) assertEquals(expectedReplicaAssignment.get(i).get, actualReplicaList(i)) try { - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment) fail("shouldn't be able to create a topic already exists") } catch { case e: TopicExistsException => // this is good @@ -177,17 +141,17 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness { @Test def testGetTopicMetadata() { + TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3)) val expectedReplicaAssignment = Map( - 0 -> List("0", "1", "2"), - 1 -> List("1", "2", "3") + 0 -> List(0, 1, 2), + 1 -> List(1, 2, 3) ) val leaderForPartitionMap = Map( 0 -> 0, 1 -> 1 ) val topic = "auto-topic" - TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3)) - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment) // create leaders for all partitions TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1) @@ -200,7 +164,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness { assertNotNull("partition metadata list cannot be null", newTopicMetadata.partitionsMetadata) assertEquals("partition metadata list length should be 2", 2, newTopicMetadata.partitionsMetadata.size) val actualReplicaAssignment = 
newTopicMetadata.partitionsMetadata.map(p => p.replicas) - val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id.toString).toList).toList + val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id).toList).toList assertEquals(expectedReplicaAssignment.size, actualReplicaList.size) for(i <- 0 until actualReplicaList.size) { assertEquals(expectedReplicaAssignment(i), actualReplicaList(i)) @@ -210,12 +174,12 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness { @Test def testPartitionReassignmentWithLeaderInNewReplicas() { - val expectedReplicaAssignment = Map(0 -> List("0", "1", "2")) + val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topic = "test" // create brokers val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment) // reassign partition 0 val newReplicas = Seq(0, 2, 3) val partitionToBeReassigned = 0 @@ -235,12 +199,12 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness { @Test def testPartitionReassignmentWithLeaderNotInNewReplicas() { - val expectedReplicaAssignment = Map(0 -> List("0", "1", "2")) + val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topic = "test" // create brokers val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment) // reassign partition 0 val newReplicas = Seq(1, 2, 3) val partitionToBeReassigned = 0 @@ -261,12 +225,12 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness { @Test def testPartitionReassignmentNonOverlappingReplicas() { - val expectedReplicaAssignment = Map(0 -> List("0", "1")) + val expectedReplicaAssignment = Map(0 -> List(0, 1)) val topic = "test" // create brokers val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment) // reassign partition 0 val newReplicas = Seq(2, 3) val partitionToBeReassigned = 0 @@ -304,10 +268,10 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness { @Test def testResumePartitionReassignmentThatWasCompleted() { - val expectedReplicaAssignment = Map(0 -> List("0", "1")) + val expectedReplicaAssignment = Map(0 -> List(0, 1)) val topic = "test" // create the topic - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment) // put the partition in the reassigned path as well // reassign partition 0 val newReplicas = Seq(0, 1) @@ -339,14 +303,14 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness { @Test def testBasicPreferredReplicaElection() { - val expectedReplicaAssignment = Map(1 -> List("0", "1", "2")) + val expectedReplicaAssignment = Map(1 -> List(0, 1, 2)) val topic = "test" val partition = 1 val preferredReplica = 0 // create brokers val serverConfigs = TestUtils.createBrokerConfigs(3).map(new KafkaConfig(_)) // create the topic - 
AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment) val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s)) // broker 2 should be the leader since it was started first val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, 1000, None).get @@ -360,13 +324,13 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness { @Test def testShutdownBroker() { - val expectedReplicaAssignment = Map(1 -> List("0", "1", "2")) + val expectedReplicaAssignment = Map(1 -> List(0, 1, 2)) val topic = "test" val partition = 1 // create brokers val serverConfigs = TestUtils.createBrokerConfigs(3).map(new KafkaConfig(_)) // create the topic - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + AdminUtils.createTopicWithAssignment(zkClient, topic, expectedReplicaAssignment) val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s)) // broker 2 should be the leader since it was started first @@ -403,6 +367,50 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness { servers.foreach(_.shutdown()) } } + + /** + * This test creates a topic with a few config overrides and checks that the configs are applied to the new topic + * then changes the config and checks that the new values take effect. + */ + @Test + def testTopicConfigChange() { + val partitions = 3 + val topic = "my-topic" + val server = TestUtils.createServer(new KafkaConfig(TestUtils.createBrokerConfig(0))) + + def makeConfig(messageSize: Int, retentionMs: Long) = { + var props = new Properties() + props.setProperty(LogConfig.MaxMessageBytesProp, messageSize.toString) + props.setProperty(LogConfig.RententionMsProp, retentionMs.toString) + props + } + + def checkConfig(messageSize: Int, retentionMs: Long) { + TestUtils.retry(10000) { + for(part <- 0 until partitions) { + val logOpt = server.logManager.getLog(TopicAndPartition(topic, part)) + assertTrue(logOpt.isDefined) + assertEquals(retentionMs, logOpt.get.config.retentionMs) + assertEquals(messageSize, logOpt.get.config.maxMessageSize) + } + } + } + + try { + // create a topic with a few config overrides and check that they are applied + val maxMessageSize = 1024 + val retentionMs = 1000*1000 + AdminUtils.createTopic(server.zkClient, topic, partitions, 1, makeConfig(maxMessageSize, retentionMs)) + checkConfig(maxMessageSize, retentionMs) + + // now double the config values for the topic and check that it is applied + AdminUtils.changeTopicConfig(server.zkClient, topic, makeConfig(2*maxMessageSize, 2 * retentionMs)) + checkConfig(2*maxMessageSize, 2 * retentionMs) + } finally { + server.shutdown() + server.config.logDirs.map(Utils.rm(_)) + } + } private def checkIfReassignPartitionPathExists(): Boolean = { ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath) diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala index 8ae30ea..fec17aa 100644 --- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala @@ -20,6 +20,7 @@ package kafka.consumer import java.util.concurrent._ import java.util.concurrent.atomic._ +import java.util.Properties import scala.collection._ import junit.framework.Assert._ @@ -27,7 +28,7 @@ import kafka.message._ import kafka.server._ import 
kafka.utils.TestUtils._ import kafka.utils._ -import kafka.admin.CreateTopicCommand +import kafka.admin.AdminUtils import org.junit.Test import kafka.serializer._ import kafka.cluster.{Broker, Cluster} @@ -60,7 +61,7 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness { override def setUp() { super.setUp - CreateTopicCommand.createTopic(zkClient, topic, 1, 1, configs.head.brokerId.toString) + AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0 -> Seq(configs.head.brokerId)), new Properties) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500) } diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala index f7ee914..c70a435 100644 --- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala @@ -26,7 +26,7 @@ import org.scalatest.junit.JUnit3Suite import org.apache.log4j.{Level, Logger} import kafka.message._ import kafka.serializer._ -import kafka.admin.CreateTopicCommand +import kafka.admin.AdminUtils import org.I0Itec.zkclient.ZkClient import kafka.utils._ import kafka.producer.{ProducerConfig, KeyedMessage, Producer} @@ -298,7 +298,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val zkClient = new ZkClient(zookeeperConnect, 6000, 30000, ZKStringSerializer) // create topic topic1 with 1 partition on broker 0 - CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0") + AdminUtils.createTopic(zkClient, topic, 1, 1) // send some messages to each broker val sentMessages1 = sendMessages(configs.head, nMessages, "batch1", NoCompressionCodec, 1) diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala index 4c646f0..c046a39 100644 --- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala +++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala @@ -32,8 +32,7 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with L val topic = "test_topic" val group = "default_group" val testConsumer = "consumer" - val BrokerPort = 9892 - val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0, BrokerPort))) + val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0))) val NumMessages = 10 val LargeOffset = 10000 val SmallOffset = -1 diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala index 5a57bd1..845b966 100644 --- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala +++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala @@ -30,7 +30,7 @@ import kafka.serializer._ import kafka.producer.{KeyedMessage, Producer} import kafka.utils.TestUtils._ import kafka.utils.TestUtils -import kafka.admin.CreateTopicCommand +import kafka.admin.AdminUtils class FetcherTest extends JUnit3Suite with KafkaServerTestHarness { @@ -55,7 +55,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness { override def setUp() { super.setUp - CreateTopicCommand.createTopic(zkClient, topic, 1, 1, configs.head.brokerId.toString) + AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0 -> Seq(configs.head.brokerId))) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500) fetcher = new ConsumerFetcherManager("consumer1", new 
ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), zkClient) fetcher.stopAllConnections() diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala index 402fced..ece163a 100644 --- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala +++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala @@ -31,7 +31,7 @@ import org.I0Itec.zkclient.ZkClient import kafka.zk.ZooKeeperTestHarness import org.scalatest.junit.JUnit3Suite import scala.collection._ -import kafka.admin.{AdminUtils, CreateTopicCommand} +import kafka.admin.AdminUtils import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionException, OffsetOutOfRangeException} /** @@ -48,7 +48,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with override def setUp() { super.setUp // temporarily set request handler logger to a higher level - requestHandlerLogger.setLevel(Level.FATAL) + //requestHandlerLogger.setLevel(Level.FATAL) } override def tearDown() { @@ -307,7 +307,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with def testConsumerEmptyTopic() { val newTopic = "new-topic" - CreateTopicCommand.createTopic(zkClient, newTopic, 1, 1, config.brokerId.toString) + AdminUtils.createTopic(zkClient, newTopic, 1, 1) assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() => AdminUtils.fetchTopicMetadataFromZk(newTopic, zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime)) TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, newTopic, 0, 500) @@ -321,7 +321,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with */ def createSimpleTopicsAndAwaitLeader(zkClient: ZkClient, topics: Seq[String], brokerId: Int) { for( topic <- topics ) { - CreateTopicCommand.createTopic(zkClient, topic, 1, 1, brokerId.toString) + AdminUtils.createTopic(zkClient, topic, 1, 1) TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500) } } diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala index 5169aea..be94254 100644 --- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala @@ -19,7 +19,7 @@ package kafka.integration import org.scalatest.junit.JUnit3Suite import kafka.zk.ZooKeeperTestHarness -import kafka.admin.CreateTopicCommand +import kafka.admin.AdminUtils import java.nio.ByteBuffer import junit.framework.Assert._ import org.easymock.EasyMock @@ -48,7 +48,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { def testTopicMetadataRequest { // create topic val topic = "test" - CreateTopicCommand.createTopic(zkClient, topic, 1) + AdminUtils.createTopic(zkClient, topic, 1, 1) // create a topic metadata request val topicMetadataRequest = new TopicMetadataRequest(List(topic), 0) @@ -64,7 +64,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { def testBasicTopicMetadata { // create topic val topic = "test" - CreateTopicCommand.createTopic(zkClient, topic, 1) + AdminUtils.createTopic(zkClient, topic, 1, 1) // set up leader for topic partition 0 val leaderForPartitionMap = Map( 0 -> configs.head.brokerId @@ -83,7 +83,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { def testGetAllTopicMetadata { // create topic 
val topic = "test" - CreateTopicCommand.createTopic(zkClient, topic, 1) + AdminUtils.createTopic(zkClient, topic, 1, 1) // set up leader for topic partition 0 val leaderForPartitionMap = Map( 0 -> configs.head.brokerId diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index fad3baa..6916df4 100644 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -58,7 +58,7 @@ class LogManagerTest extends JUnit3Suite { */ @Test def testCreateLog() { - val log = logManager.getOrCreateLog(name, 0) + val log = logManager.createLog(TopicAndPartition(name, 0), logConfig) val logFile = new File(logDir, name + "-0") assertTrue(logFile.exists) log.append(TestUtils.singleMessageSet("test".getBytes())) @@ -69,7 +69,7 @@ class LogManagerTest extends JUnit3Suite { */ @Test def testGetNonExistentLog() { - val log = logManager.getLog(name, 0) + val log = logManager.getLog(TopicAndPartition(name, 0)) assertEquals("No log should be found.", None, log) val logFile = new File(logDir, name + "-0") assertTrue(!logFile.exists) @@ -80,7 +80,7 @@ class LogManagerTest extends JUnit3Suite { */ @Test def testCleanupExpiredSegments() { - val log = logManager.getOrCreateLog(name, 0) + val log = logManager.createLog(TopicAndPartition(name, 0), logConfig) var offset = 0L for(i <- 0 until 200) { var set = TestUtils.singleMessageSet("test".getBytes()) @@ -120,7 +120,7 @@ class LogManagerTest extends JUnit3Suite { logManager.startup // create a log - val log = logManager.getOrCreateLog(name, 0) + val log = logManager.createLog(TopicAndPartition(name, 0), config) var offset = 0L // add a bunch of messages that should be larger than the retentionSize @@ -158,7 +158,7 @@ class LogManagerTest extends JUnit3Suite { val config = logConfig.copy(flushMs = 1000) logManager = new LogManager(Array(logDir), Map(), config, cleanerConfig, 1000L, 1000L, time.scheduler, time) logManager.startup - val log = logManager.getOrCreateLog(name, 0) + val log = logManager.createLog(TopicAndPartition(name, 0), config) val lastFlush = log.lastFlushTime for(i <- 0 until 200) { var set = TestUtils.singleMessageSet("test".getBytes()) @@ -182,7 +182,7 @@ class LogManagerTest extends JUnit3Suite { // verify that logs are always assigned to the least loaded partition for(partition <- 0 until 20) { - logManager.getOrCreateLog("test", partition) + logManager.createLog(TopicAndPartition("test", partition), logConfig) assertEquals("We should have created the right number of logs", partition + 1, logManager.allLogs.size) val counts = logManager.allLogs.groupBy(_.dir.getParent).values.map(_.size) assertTrue("Load should balance evenly", counts.max <= counts.min + 1) diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index 792919b..059421d 100644 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -27,7 +27,7 @@ import org.junit.Assert._ import org.junit.Test import kafka.utils._ import java.util -import kafka.admin.{AdminUtils, CreateTopicCommand} +import kafka.admin.AdminUtils import util.Properties import kafka.api.FetchRequestBuilder import kafka.common.{KafkaException, ErrorMapping, FailedToSendMessageException} @@ -77,17 +77,15 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ // restore set request handler logger to a higher 
level requestHandlerLogger.setLevel(Level.ERROR) server1.shutdown - server1.awaitShutdown() server2.shutdown - server2.awaitShutdown() Utils.rm(server1.config.logDirs) Utils.rm(server2.config.logDirs) super.tearDown() } - + @Test def testUpdateBrokerPartitionInfo() { - CreateTopicCommand.createTopic(zkClient, "new-topic", 1, 2) + AdminUtils.createTopic(zkClient, "new-topic", 1, 2) assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() => AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime)) TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500) @@ -152,7 +150,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ val producerConfig2 = new ProducerConfig(props2) // create topic with 1 partition and await leadership - CreateTopicCommand.createTopic(zkClient, "new-topic", 1, 2) + AdminUtils.createTopic(zkClient, "new-topic", 1, 2) assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() => AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime)) TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500) @@ -199,11 +197,13 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ props.put("serializer.class", "kafka.serializer.StringEncoder") props.put("partitioner.class", "kafka.utils.StaticPartitioner") props.put("request.timeout.ms", "2000") -// props.put("request.required.acks", "-1") props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) // create topic - CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0,0,0,0") + AdminUtils.createTopicWithAssignment(zkClient, "new-topic", Map(0->Seq(0), + 1->Seq(0), + 2->Seq(0), + 3->Seq(0))) assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() => AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime)) TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500) @@ -213,13 +213,8 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ val config = new ProducerConfig(props) val producer = new Producer[String, String](config) - try { - // Available partition ids should be 0, 1, 2 and 3, all lead and hosted only - // on broker 0 - producer.send(new KeyedMessage[String, String]("new-topic", "test", "test1")) - } catch { - case e => fail("Unexpected exception: " + e) - } + // Available partition ids should be 0, 1, 2 and 3, all lead and hosted only on broker 0 + producer.send(new KeyedMessage[String, String]("new-topic", "test", "test1")) // kill the broker server1.shutdown @@ -263,7 +258,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ val producer = new Producer[String, String](config) // create topics in ZK - CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0:1,0:1,0:1,0:1") + AdminUtils.createTopicWithAssignment(zkClient, "new-topic", Map(0->Seq(0,1))) assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() => AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime)) TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500) diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala 
b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala index 89ba944..8397001 100644 --- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala @@ -20,7 +20,7 @@ package kafka.producer import java.net.SocketTimeoutException import java.util.Properties import junit.framework.Assert -import kafka.admin.CreateTopicCommand +import kafka.admin.AdminUtils import kafka.integration.KafkaServerTestHarness import kafka.message._ import kafka.server.KafkaConfig @@ -98,7 +98,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { props.put("host", "localhost") props.put("port", server.socketServer.port.toString) val producer = new SyncProducer(new SyncProducerConfig(props)) - CreateTopicCommand.createTopic(zkClient, "test", 1, 1) + AdminUtils.createTopic(zkClient, "test", 1, 1) TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "test", 0, 500) val message1 = new Message(new Array[Byte](configs(0).messageMaxBytes + 1)) @@ -146,9 +146,9 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { } // #2 - test that we get correct offsets when partition is owned by broker - CreateTopicCommand.createTopic(zkClient, "topic1", 1, 1) + AdminUtils.createTopic(zkClient, "topic1", 1, 1) TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "topic1", 0, 500) - CreateTopicCommand.createTopic(zkClient, "topic3", 1, 1) + AdminUtils.createTopic(zkClient, "topic3", 1, 1) TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "topic3", 0, 500) val response2 = producer.send(request) diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala index 8a3e33b..9963502 100644 --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -62,7 +62,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { assertEquals(0L, fooPartition0Hw) val partition0 = replicaManager.getOrCreatePartition(topic, 0, 1) // create leader and follower replicas - val log0 = logManagers(0).getOrCreateLog(topic, 0) + val log0 = logManagers(0).createLog(TopicAndPartition(topic, 0), LogConfig()) val leaderReplicaPartition0 = new Replica(configs.head.brokerId, partition0, SystemTime, 0, Some(log0)) partition0.addReplicaIfNotExists(leaderReplicaPartition0) val followerReplicaPartition0 = new Replica(configs.last.brokerId, partition0, SystemTime) @@ -101,7 +101,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { assertEquals(0L, topic1Partition0Hw) val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0, 1) // create leader log - val topic1Log0 = logManagers(0).getOrCreateLog(topic1, 0) + val topic1Log0 = logManagers(0).createLog(TopicAndPartition(topic1, 0), LogConfig()) // create a local replica for topic1 val leaderReplicaTopic1Partition0 = new Replica(configs.head.brokerId, topic1Partition0, SystemTime, 0, Some(topic1Log0)) topic1Partition0.addReplicaIfNotExists(leaderReplicaTopic1Partition0) @@ -117,7 +117,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { // add another partition and set highwatermark val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0, 1) // create leader log - val topic2Log0 = logManagers(0).getOrCreateLog(topic2, 0) + val topic2Log0 = logManagers(0).createLog(TopicAndPartition(topic2, 0), LogConfig()) // create a local replica for topic2 val 
leaderReplicaTopic2Partition0 = new Replica(configs.head.brokerId, topic2Partition0, SystemTime, 0, Some(topic2Log0)) topic2Partition0.addReplicaIfNotExists(leaderReplicaTopic2Partition0) diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala index 129bc56..176718e 100644 --- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala @@ -19,7 +19,7 @@ package kafka.server import org.scalatest.junit.JUnit3Suite import kafka.zk.ZooKeeperTestHarness -import kafka.admin.CreateTopicCommand +import kafka.admin.AdminUtils import kafka.utils.TestUtils._ import junit.framework.Assert._ import kafka.utils.{ZkUtils, Utils, TestUtils} @@ -61,7 +61,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { val partitionId = 0 // create topic with 1 partition, 2 replicas, one on each broker - CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1") + AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0 -> Seq(0, 1))) // wait until leader is elected val leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500) @@ -108,7 +108,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { val partitionId = 0 // create topic with 1 partition, 2 replicas, one on each broker - CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1") + AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0 -> Seq(0, 1))) // wait until leader is elected val leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500) diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala index f857171..6801f4e 100644 --- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala @@ -26,7 +26,7 @@ import org.junit.{After, Before, Test} import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message} import kafka.zk.ZooKeeperTestHarness import org.scalatest.junit.JUnit3Suite -import kafka.admin.CreateTopicCommand +import kafka.admin.AdminUtils import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest} import kafka.utils.TestUtils._ import kafka.common.{ErrorMapping, TopicAndPartition} @@ -82,10 +82,10 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { val part = Integer.valueOf(topicPartition.split("-").last).intValue // setup brokers in zookeeper as owners of partitions for this test - CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1") + AdminUtils.createTopic(zkClient, topic, 1, 1) val logManager = server.getLogManager - val log = logManager.getOrCreateLog(topic, part) + val log = logManager.createLog(TopicAndPartition(topic, part), logManager.defaultConfig) val message = new Message(Integer.toString(42).getBytes()) for(i <- 0 until 20) @@ -120,7 +120,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { val topic = topicPartition.split("-").head // setup brokers in zookeeper as owners of partitions for this test - CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1") + AdminUtils.createTopic(zkClient, topic, 1, 1) TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500) var offsetChanged = false @@ -145,10 +145,10 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { val part = Integer.valueOf(topicPartition.split("-").last).intValue // setup 
brokers in zookeeper as owners of partitions for this test - CreateTopicCommand.createTopic(zkClient, topic, 3, 1, "1,1,1") + AdminUtils.createTopic(zkClient, topic, 3, 1) val logManager = server.getLogManager - val log = logManager.getOrCreateLog(topic, part) + val log = logManager.createLog(TopicAndPartition(topic, part), logManager.defaultConfig) val message = new Message(Integer.toString(42).getBytes()) for(i <- 0 until 20) log.append(new ByteBufferMessageSet(NoCompressionCodec, message)) @@ -174,10 +174,10 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { val part = Integer.valueOf(topicPartition.split("-").last).intValue // setup brokers in zookeeper as owners of partitions for this test - CreateTopicCommand.createTopic(zkClient, topic, 3, 1, "1,1,1") + AdminUtils.createTopic(zkClient, topic, 3, 1) val logManager = server.getLogManager - val log = logManager.getOrCreateLog(topic, part) + val log = logManager.createLog(TopicAndPartition(topic, part), logManager.defaultConfig) val message = new Message(Integer.toString(42).getBytes()) for(i <- 0 until 20) log.append(new ByteBufferMessageSet(NoCompressionCodec, message)) diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index 48487e8..e322908 100644 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -19,7 +19,7 @@ package kafka.server import org.scalatest.junit.JUnit3Suite import org.junit.Assert._ import java.io.File -import kafka.admin.CreateTopicCommand +import kafka.admin.AdminUtils import kafka.utils.TestUtils._ import kafka.utils.IntEncoder import kafka.utils.{Utils, TestUtils} @@ -53,7 +53,14 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000) producerProps.put("key.serializer.class", classOf[IntEncoder].getName.toString) producerProps.put("request.required.acks", "-1") - + + override def tearDown() { + super.tearDown() + for(server <- servers) { + server.shutdown() + Utils.rm(server.config.logDirs(0)) + } + } def testHWCheckpointNoFailuresSingleLogSegment { // start both servers @@ -64,7 +71,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { producer = new Producer[Int, String](new ProducerConfig(producerProps)) // create topic with 1 partition, 2 replicas, one on each broker - CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":")) + AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0->Seq(0,1))) // wait until leader is elected var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500) @@ -86,7 +93,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals(numMessages, leaderHW) val followerHW = hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L) assertEquals(numMessages, followerHW) - servers.foreach(server => { server.shutdown(); Utils.rm(server.config.logDirs(0))}) } def testHWCheckpointWithFailuresSingleLogSegment { @@ -98,7 +104,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { producer = new Producer[Int, String](new ProducerConfig(producerProps)) // create topic with 1 partition, 2 replicas, one on each broker - CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":")) + AdminUtils.createTopicWithAssignment(zkClient, topic, 
Map(0->Seq(0,1))) // wait until leader is elected var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500) @@ -148,7 +154,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { producer.close() assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L)) assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L)) - servers.foreach(server => Utils.rm(server.config.logDirs)) } def testHWCheckpointNoFailuresMultipleLogSegments { @@ -163,7 +168,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { producer = new Producer[Int, String](new ProducerConfig(producerProps)) // create topic with 1 partition, 2 replicas, one on each broker - CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":")) + AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0->Seq(0,1))) // wait until leader is elected var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500) @@ -182,7 +187,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals(hw, leaderHW) val followerHW = hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L) assertEquals(hw, followerHW) - servers.foreach(server => Utils.rm(server.config.logDirs)) } def testHWCheckpointWithFailuresMultipleLogSegments { @@ -197,7 +201,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { producer = new Producer[Int, String](new ProducerConfig(producerProps)) // create topic with 1 partition, 2 replicas, one on each broker - CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":")) + AdminUtils.createTopicWithAssignment(zkClient, topic, Map(0->Seq(server1.config.brokerId, server2.config.brokerId))) // wait until leader is elected var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500) @@ -241,7 +245,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { producer.close() assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L)) assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L)) - servers.foreach(server => Utils.rm(server.config.logDirs)) } private def sendMessages(n: Int = 1) { diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala index 48d5647..9d7e730 100644 --- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala @@ -26,7 +26,6 @@ import org.junit.{After, Before, Test} import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message} import kafka.zk.ZooKeeperTestHarness import org.scalatest.junit.JUnit3Suite -import kafka.admin.CreateTopicCommand import kafka.api.{OffsetCommitRequest, OffsetFetchRequest} import kafka.utils.TestUtils._ import kafka.common.{ErrorMapping, TopicAndPartition, OffsetMetadataAndError} diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala index d0e3590..5d26788 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala @@ -22,9 +22,10 @@ import kafka.zk.ZooKeeperTestHarness import kafka.utils.TestUtils._ import kafka.producer.KeyedMessage import kafka.serializer.StringEncoder -import kafka.admin.CreateTopicCommand +import kafka.admin.AdminUtils import kafka.utils.TestUtils import 
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
index d0e3590..5d26788 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
@@ -22,9 +22,10 @@ import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.TestUtils._
 import kafka.producer.KeyedMessage
 import kafka.serializer.StringEncoder
-import kafka.admin.CreateTopicCommand
+import kafka.admin.AdminUtils
 import kafka.utils.TestUtils
 import junit.framework.Assert._
+import kafka.common._
 
 class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness {
   val props = createBrokerConfigs(2)
@@ -50,7 +51,7 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness {
 
     // create a topic and partition and await leadership
     for (topic <- List(topic1,topic2)) {
-      CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(c => c.brokerId).mkString(":"))
+      AdminUtils.createTopic(zkClient, topic, 1, 2)
       TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
     }
 
@@ -65,9 +66,10 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness {
     def logsMatch(): Boolean = {
       var result = true
      for (topic <- List(topic1, topic2)) {
-        val expectedOffset = brokers.head.getLogManager().getLog(topic, partition).get.logEndOffset
+        val tandp = TopicAndPartition(topic, partition)
+        val expectedOffset = brokers.head.getLogManager().getLog(tandp).get.logEndOffset
         result = result && expectedOffset > 0 && brokers.foldLeft(true) { (total, item) => total &&
-          (expectedOffset == item.getLogManager().getLog(topic, partition).get.logEndOffset) }
+          (expectedOffset == item.getLogManager().getLog(tandp).get.logEndOffset) }
       }
       result
     }
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 7afbe54..9512604 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -26,7 +26,7 @@ import kafka.zk.ZooKeeperTestHarness
 import kafka.producer._
 import kafka.utils.IntEncoder
 import kafka.utils.TestUtils._
-import kafka.admin.CreateTopicCommand
+import kafka.admin.AdminUtils
 import kafka.api.FetchRequestBuilder
 import kafka.utils.{TestUtils, Utils}
 
@@ -49,7 +49,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
     var producer = new Producer[Int, String](new ProducerConfig(producerConfig))
 
     // create topic
-    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
+    AdminUtils.createTopic(zkClient, topic, 1, 1)
 
     // send some messages
     producer.send(sent1.map(m => new KeyedMessage[Int, String](topic, 0, m)):_*)
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 571e2df..1c6f615 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -67,7 +67,7 @@ class SimpleFetchTest extends JUnit3Suite {
     EasyMock.replay(log)
 
     val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
-    EasyMock.expect(logManager.getLog(topic, partitionId)).andReturn(Some(log)).anyTimes()
+    EasyMock.expect(logManager.getLog(TopicAndPartition(topic, partitionId))).andReturn(Some(log)).anyTimes()
     EasyMock.replay(logManager)
 
     val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager])
@@ -133,7 +133,7 @@ class SimpleFetchTest extends JUnit3Suite {
     EasyMock.replay(log)
 
     val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
-    EasyMock.expect(logManager.getLog(topic, 0)).andReturn(Some(log)).anyTimes()
+    EasyMock.expect(logManager.getLog(TopicAndPartition(topic, 0))).andReturn(Some(log)).anyTimes()
     EasyMock.replay(logManager)
 
     val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager])
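Aside (illustrative): the logsMatch helper above asks every broker's LogManager for the log end offset of the same TopicAndPartition and requires all brokers to agree on a non-zero value. The same consistency check over plain in-memory data, with hypothetical names and sample offsets, might look like:

// Stand-alone sketch of the logsMatch condition, not the Kafka test harness.
object OffsetsMatchSketch extends App {
  case class TopicAndPartition(topic: String, partition: Int)

  val topics    = List("topic1", "topic2")
  val partition = 0

  // One Map per broker: the offsets that broker reports for each partition.
  val brokerOffsets: Seq[Map[TopicAndPartition, Long]] = Seq(
    Map(TopicAndPartition("topic1", 0) -> 20L, TopicAndPartition("topic2", 0) -> 20L),
    Map(TopicAndPartition("topic1", 0) -> 20L, TopicAndPartition("topic2", 0) -> 20L)
  )

  def logsMatch(): Boolean =
    topics.forall { topic =>
      val key      = TopicAndPartition(topic, partition)
      val expected = brokerOffsets.head.getOrElse(key, 0L)
      expected > 0 && brokerOffsets.forall(_.get(key).contains(expected))
    }

  println(logsMatch()) // true for the sample data above
}

Using forall instead of the foldLeft in the patch short-circuits on the first mismatch but expresses the same condition.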
diff --git a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
index ec27ef9..b364ac2 100644
--- a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
@@ -76,7 +76,9 @@ class SchedulerTest {
   @Test
   def testNonPeriodicTask() {
     scheduler.schedule("test", counter1.getAndIncrement, delay = 0)
-    retry(30000, () => assertEquals(counter1.get, 1))
+    retry(30000) {
+      assertEquals(counter1.get, 1)
+    }
     Thread.sleep(5)
     assertEquals("Should only run once", 1, counter1.get)
   }
@@ -84,6 +86,8 @@ class SchedulerTest {
   @Test
   def testPeriodicTask() {
     scheduler.schedule("test", counter1.getAndIncrement, delay = 0, period = 5)
-    retry(30000, () => assertTrue("Should count to 20", counter1.get >= 20))
+    retry(30000){
+      assertTrue("Should count to 20", counter1.get >= 20)
+    }
   }
 }
\ No newline at end of file
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index d12d24e..1aa68ea 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -23,6 +23,7 @@ import java.nio._
 import java.nio.channels._
 import java.util.Random
 import java.util.Properties
+import junit.framework.AssertionFailedError
 import junit.framework.Assert._
 import kafka.server._
 import kafka.producer._
@@ -122,7 +123,7 @@ object TestUtils extends Logging {
   /**
    * Create a test config for the given node id
   */
-  def createBrokerConfig(nodeId: Int, port: Int): Properties = {
+  def createBrokerConfig(nodeId: Int, port: Int = choosePort()): Properties = {
    val props = new Properties
    props.put("broker.id", nodeId.toString)
    props.put("host.name", "localhost")
@@ -439,18 +440,20 @@ object TestUtils extends Logging {
  /**
   * Execute the given block. If it throws an assert error, retry. Repeat
   * until no error is thrown or the time limit ellapses
   */
-  def retry(maxWaitMs: Long, block: () => Unit) {
+  def retry(maxWaitMs: Long)(block: => Unit) {
    var wait = 1L
    val startTime = System.currentTimeMillis()
    while(true) {
      try {
-        block()
+        block
        return
      } catch {
-        case e: AssertionError =>
-          if(System.currentTimeMillis - startTime > maxWaitMs) {
+        case e: AssertionFailedError =>
+          val ellapsed = System.currentTimeMillis - startTime
+          if(ellapsed > maxWaitMs) {
            throw e
          } else {
+            info("Attempt failed, sleeping for " + wait + ", and then retrying.")
            Thread.sleep(wait)
            wait += math.min(wait, 1000)
          }
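// ---------------------------------------------------------------------------
// Aside (not part of the patch): a self-contained sketch of the retry helper
// shape the TestUtils hunk above moves to -- a curried signature taking a
// by-name block, so call sites read retry(30000) { ... } as in SchedulerTest.
// It catches plain AssertionError rather than junit's AssertionFailedError so
// the sketch compiles without a JUnit dependency, and mirrors the backoff
// growth of the wait += math.min(wait, 1000) line above.
// ---------------------------------------------------------------------------
object RetrySketch {
  def retry(maxWaitMs: Long)(block: => Unit): Unit = {
    var wait = 1L
    val startTime = System.currentTimeMillis()
    while (true) {
      try {
        block            // by-name: re-evaluated on every attempt
        return
      } catch {
        case e: AssertionError =>
          if (System.currentTimeMillis - startTime > maxWaitMs)
            throw e      // give up once the time budget is spent
          Thread.sleep(wait)
          wait += math.min(wait, 1000)   // grow the backoff, adding at most 1s per retry
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val counter = new java.util.concurrent.atomic.AtomicInteger(0)
    // Fails on the first few attempts, then succeeds once the counter reaches 3.
    retry(5000) {
      assert(counter.incrementAndGet() >= 3, "not there yet")
    }
    println(s"succeeded after ${counter.get()} attempts")
  }
}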