diff --git a/config/server.properties b/config/server.properties index bc6a521..0589c71 100644 --- a/config/server.properties +++ b/config/server.properties @@ -52,7 +52,7 @@ log.dir=/tmp/kafka-logs # The number of logical partitions per topic per server. More partitions allow greater parallelism # for consumption, but also mean more files. -num.partitions=1 +num.partitions=2 ############################# Log Flush Policy ############################# diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala index 0e6d9b8..33d7c2c 100644 --- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala +++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala @@ -74,7 +74,6 @@ object ConsumerOffsetChecker extends Logging { val lag = logSize - offset println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format(group, topic, pid, offset, logSize, lag, owner match {case Some(ownerStr) => ownerStr case None => "none"})) - consumer.close() case None => // ignore } case None => @@ -157,6 +156,11 @@ object ConsumerOffsetChecker extends Logging { if (options.has("broker-info")) printBrokerInfo(); + for ((_, consumerOpt) <- consumerMap) + consumerOpt match { + case Some(consumer) => consumer.close() + case None => // ignore + } } finally { for (consumerOpt <- consumerMap.values) {