diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 842c110..e1cf393 100644 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -42,7 +42,8 @@ object TopicCommand { } CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.zkConnectOpt) - if (!opts.options.has(opts.listOpt)) CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt) + if (!opts.options.has(opts.listOpt) && !opts.options.has(opts.describeOpt)) + CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt) val zkClient = new ZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, ZKStringSerializer) @@ -67,10 +68,13 @@ object TopicCommand { } private def getTopics(zkClient: ZkClient, opts: TopicCommandOptions): Seq[String] = { - val topicsSpec = opts.options.valueOf(opts.topicOpt) - val topicsFilter = new Whitelist(topicsSpec) - val allTopics = ZkUtils.getAllTopics(zkClient) - allTopics.filter(topicsFilter.isTopicAllowed).sorted + val allTopics = ZkUtils.getAllTopics(zkClient).sorted + if (opts.options.has(opts.topicOpt)) { + val topicsSpec = opts.options.valueOf(opts.topicOpt) + val topicsFilter = new Whitelist(topicsSpec) + allTopics.filter(topicsFilter.isTopicAllowed) + } else + allTopics } def createTopic(zkClient: ZkClient, opts: TopicCommandOptions) { @@ -107,7 +111,7 @@ object TopicCommand { val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue val replicaAssignmentStr = opts.options.valueOf(opts.replicaAssignmentOpt) AdminUtils.addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr) - println("adding partitions succeeded!") + println("Adding partitions succeeded!") } if(opts.options.has(opts.replicationFactorOpt)) Utils.croak("Changing the replication factor is not supported.") @@ -123,53 +127,51 @@ object TopicCommand { } def listTopics(zkClient: ZkClient, opts: TopicCommandOptions) 
{ - if(opts.options.has(opts.topicsWithOverridesOpt)) { - ZkUtils.getAllTopics(zkClient).sorted.foreach { topic => - val configs = AdminUtils.fetchTopicConfig(zkClient, topic) - if(configs.size() != 0) { - val replicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic)) - val numPartitions = replicaAssignment.size - val replicationFactor = replicaAssignment.head._2.size - println("\nTopic:%s\tPartitionCount:%d\tReplicationFactor:%d\tConfigs:%s".format(topic, numPartitions, - replicationFactor, configs.map(kv => kv._1 + "=" + kv._2).mkString(","))) - } - } - } else { - for(topic <- ZkUtils.getAllTopics(zkClient).sorted) + val topics = getTopics(zkClient, opts) + for(topic <- topics) println(topic) - } } def describeTopic(zkClient: ZkClient, opts: TopicCommandOptions) { + CommandLineUtils.checkExclusiveArgs(opts.parser, opts.options, + opts.reportUnderReplicatedPartitionsOpt, opts.reportUnavailablePartitionsOpt, opts.topicsWithOverridesOpt) val topics = getTopics(zkClient, opts) val reportUnderReplicatedPartitions = if (opts.options.has(opts.reportUnderReplicatedPartitionsOpt)) true else false val reportUnavailablePartitions = if (opts.options.has(opts.reportUnavailablePartitionsOpt)) true else false + val reportOverriddenConfigs = if (opts.options.has(opts.topicsWithOverridesOpt)) true else false val liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).map(_.id).toSet for (topic <- topics) { ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic)).get(topic) match { case Some(topicPartitionAssignment) => + val describeConfigs: Boolean = !reportUnavailablePartitions && !reportUnderReplicatedPartitions + val describePartitions: Boolean = !reportOverriddenConfigs val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1) - if (!reportUnavailablePartitions && !reportUnderReplicatedPartitions) { - println(topic) - val config = AdminUtils.fetchTopicConfig(zkClient, topic) - println("\tconfigs: " + config.map(kv 
=> kv._1 + " = " + kv._2).mkString(", ")) - println("\tpartitions: " + sortedPartitions.size) + if (describeConfigs) { + val configs = AdminUtils.fetchTopicConfig(zkClient, topic) + if (!reportOverriddenConfigs || configs.size() != 0) { + val numPartitions = topicPartitionAssignment.size + val replicationFactor = topicPartitionAssignment.head._2.size + println("Topic:%s\tPartitionCount:%d\tReplicationFactor:%d\tConfigs:%s" + .format(topic, numPartitions, replicationFactor, configs.map(kv => kv._1 + "=" + kv._2).mkString(","))) + } } - for ((partitionId, assignedReplicas) <- sortedPartitions) { - val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionId) - val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitionId) - if ((!reportUnderReplicatedPartitions && !reportUnavailablePartitions) || - (reportUnderReplicatedPartitions && inSyncReplicas.size < assignedReplicas.size) || - (reportUnavailablePartitions && (!leader.isDefined || !liveBrokers.contains(leader.get)))) { - print("\t\ttopic: " + topic) - print("\tpartition: " + partitionId) - print("\tleader: " + (if(leader.isDefined) leader.get else "none")) - print("\treplicas: " + assignedReplicas.mkString(",")) - println("\tisr: " + inSyncReplicas.mkString(",")) + if (describePartitions) { + for ((partitionId, assignedReplicas) <- sortedPartitions) { + val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionId) + val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitionId) + if ((!reportUnderReplicatedPartitions && !reportUnavailablePartitions) || + (reportUnderReplicatedPartitions && inSyncReplicas.size < assignedReplicas.size) || + (reportUnavailablePartitions && (!leader.isDefined || !liveBrokers.contains(leader.get)))) { + print("\tTopic: " + topic) + print("\tPartition: " + partitionId) + print("\tLeader: " + (if(leader.isDefined) leader.get else "none")) + print("\tReplicas: " + assignedReplicas.mkString(",")) + println("\tIsr: 
" + inSyncReplicas.mkString(",")) + } } } case None => - println("topic " + topic + " doesn't exist!") + println("Topic " + topic + " doesn't exist!") } } } @@ -255,7 +257,7 @@ object TopicCommand { val reportUnavailablePartitionsOpt = parser.accepts("unavailable-partitions", "if set when describing topics, only show partitions whose leader is not available") val topicsWithOverridesOpt = parser.accepts("topics-with-overrides", - "if set when listing topics, only show topics that have overridden configs") + "if set when describing topics, only show topics that have overridden configs") val options = parser.parse(args : _*) diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala index e6875d6..c8c4212 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala @@ -28,7 +28,6 @@ object ConsumerConfig extends Config { val SocketBufferSize = 64*1024 val FetchSize = 1024 * 1024 val MaxFetchSize = 10*FetchSize - val NumConsumerFetchers = 1 val DefaultFetcherBackoffMs = 1000 val AutoCommit = true val AutoCommitInterval = 60 * 1000 @@ -94,9 +93,6 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig( /** the number of byes of messages to attempt to fetch */ val fetchMessageMaxBytes = props.getInt("fetch.message.max.bytes", FetchSize) - - /** the number threads used to fetch data */ - val numConsumerFetchers = props.getInt("num.consumer.fetchers", NumConsumerFetchers) /** if true, periodically commit to zookeeper the offset of messages already fetched by the consumer */ val autoCommitEnable = props.getBoolean("auto.commit.enable", AutoCommit) diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala index b9e2bea..e4451bb 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala +++ 
b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala @@ -41,7 +41,7 @@ class ConsumerFetcherManager(private val consumerIdString: String, private val config: ConsumerConfig, private val zkClient : ZkClient) extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(SystemTime.milliseconds), - config.clientId, config.numConsumerFetchers) { + config.clientId, 1) { private var partitionMap: immutable.Map[TopicAndPartition, PartitionTopicInfo] = null private var cluster: Cluster = null private val noLeaderPartitionSet = new mutable.HashSet[TopicAndPartition] diff --git a/core/src/main/scala/kafka/utils/CommandLineUtils.scala b/core/src/main/scala/kafka/utils/CommandLineUtils.scala index 5f563ca..26083fe 100644 --- a/core/src/main/scala/kafka/utils/CommandLineUtils.scala +++ b/core/src/main/scala/kafka/utils/CommandLineUtils.scala @@ -18,7 +18,7 @@ import joptsimple.{OptionSpec, OptionSet, OptionParser} -/** +/** * Helper functions for dealing with command line utilities */ object CommandLineUtils extends Logging { @@ -33,4 +33,12 @@ object CommandLineUtils extends Logging { } } + def checkExclusiveArgs(parser: OptionParser, options: OptionSet, exclusiveOptions: OptionSpec[_]*) { + val usedOptions = exclusiveOptions.filter(options.has(_)).toSet + if (usedOptions.size > 1) { + System.err.println("Arguments " + usedOptions.map(_.toString).mkString(",") + " are mutually exclusive") + parser.printHelpOn(System.err) + System.exit(1) + } + } } \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 426b1a7..d88b6c3 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -150,7 +150,6 @@ object TestUtils extends Logging { props.put("auto.commit.interval.ms", "1000") props.put("rebalance.max.retries", "4") props.put("auto.offset.reset", "smallest") - props.put("num.consumer.fetchers", "2") 
props } diff --git a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala index 55ee01b..ec3cd29 100644 --- a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala +++ b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala @@ -112,11 +112,6 @@ object ConsumerPerformance { .describedAs("count") .ofType(classOf[java.lang.Integer]) .defaultsTo(10) - val numFetchersOpt = parser.accepts("num-fetch-threads", "Number of fetcher threads.") - .withRequiredArg - .describedAs("count") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1) val options = parser.parse(args : _*) @@ -135,7 +130,6 @@ object ConsumerPerformance { props.put("auto.offset.reset", if(options.has(resetBeginningOffsetOpt)) "largest" else "smallest") props.put("zookeeper.connect", options.valueOf(zkConnectOpt)) props.put("consumer.timeout.ms", "5000") - props.put("num.consumer.fetchers", options.valueOf(numFetchersOpt).toString) val consumerConfig = new ConsumerConfig(props) val numThreads = options.valueOf(numThreadsOpt).intValue val topic = options.valueOf(topicOpt)