diff --git a/config/server.properties b/config/server.properties index 2ffe0eb..d36279e 100644 --- a/config/server.properties +++ b/config/server.properties @@ -57,9 +57,8 @@ socket.request.max.bytes=104857600 # A comma seperated list of directories under which to store log files log.dirs=/tmp/kafka-logs -# The default number of log partitions per topic. More partitions allow greater -# parallelism for consumption, but this will also result in more files across -# the brokers. +# The number of logical partitions per topic per server. More partitions allow greater parallelism +# for consumption, but also mean more files. num.partitions=2 ############################# Log Flush Policy ############################# diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java index f3fb3fd..4b1d117 100644 --- a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java +++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java @@ -86,7 +86,8 @@ public class DataGenerator { Long timestamp = RANDOM.nextLong(); if (timestamp < 0) timestamp = -timestamp; byte[] bytes = timestamp.toString().getBytes("UTF8"); - list.add(new KeyedMessage(_topic, null, bytes)); + Message message = new Message(bytes); + list.add(new KeyedMessage(_topic, null, message)); } // send events System.out.println(" send " + list.size() + " " + _topic + " count events to " + _uri); diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 2637586..70d1b81 100644 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -26,27 +26,127 @@ import kafka.common.{TopicAndPartition, AdminCommandFailedException} object ReassignPartitionsCommand extends Logging { def main(args: Array[String]): Unit = { + val parser = new OptionParser + val topicsToMoveJsonFileOpt = parser.accepts("topics-to-move-json-file", "The JSON file with the list of topics to reassign." + + "This option or manual-assignment-json-file needs to be specified. The format to use is - \n" + + "{\"topics\":\n\t[{\"topic\": \"foo\"},{\"topic\": \"foo1\"}],\n\"version\":1\n}") + .withRequiredArg + .describedAs("topics to reassign json file path") + .ofType(classOf[String]) + + val manualAssignmentJsonFileOpt = parser.accepts("manual-assignment-json-file", "The JSON file with the list of manual reassignments" + + "This option or topics-to-move-json-file needs to be specified. The format to use is - \n" + + "{\"partitions\":\n\t[{\"topic\": \"foo\",\n\t \"partition\": 1,\n\t \"replicas\": [1,2,3] }],\n\"version\":1\n}") + .withRequiredArg + .describedAs("manual assignment json file path") + .ofType(classOf[String]) - val opts = new ReassignPartitionsCommandOptions(args) + val brokerListOpt = parser.accepts("broker-list", "The list of brokers to which the partitions need to be reassigned" + + " in the form \"0,1,2\". 
This is required for automatic topic reassignment.") + .withRequiredArg + .describedAs("brokerlist") + .ofType(classOf[String]) - // should have exactly one action - val actions = Seq(opts.generateOpt, opts.executeOpt, opts.verifyOpt).count(opts.options.has _) - if(actions != 1) { - opts.parser.printHelpOn(System.err) - Utils.croak("Command must include exactly one action: --generate, --execute or --verify") + val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the " + + "form host:port. Multiple URLS can be given to allow fail-over.") + .withRequiredArg + .describedAs("urls") + .ofType(classOf[String]) + + val executeOpt = parser.accepts("execute", "This option does the actual reassignment. By default, the tool does a dry run") + .withOptionalArg() + .describedAs("execute") + .ofType(classOf[String]) + + val statusCheckJsonFileOpt = parser.accepts("status-check-json-file", "REQUIRED: The JSON file with the list of partitions and the " + + "new replicas they should be reassigned to, which can be obtained from the output of a dry run.") + .withRequiredArg + .describedAs("partition reassignment json file path") + .ofType(classOf[String]) + + val options = parser.parse(args : _*) + + for(arg <- List(zkConnectOpt)) { + if(!options.has(arg)) { + System.err.println("Missing required argument \"" + arg + "\"") + parser.printHelpOn(System.err) + System.exit(1) + } } - CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.zkConnectOpt) + if (options.has(topicsToMoveJsonFileOpt) && options.has(manualAssignmentJsonFileOpt)) { + System.err.println("Only one of the json files should be specified") + parser.printHelpOn(System.err) + System.exit(1) + } - val zkConnect = opts.options.valueOf(opts.zkConnectOpt) + val zkConnect = options.valueOf(zkConnectOpt) var zkClient: ZkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) try { - if(opts.options.has(opts.verifyOpt)) - verifyAssignment(zkClient, opts) - else if(opts.options.has(opts.generateOpt)) - generateAssignment(zkClient, opts) - else if (opts.options.has(opts.executeOpt)) - executeAssignment(zkClient, opts) + + var partitionsToBeReassigned : Map[TopicAndPartition, Seq[Int]] = new mutable.HashMap[TopicAndPartition, List[Int]]() + + if(options.has(statusCheckJsonFileOpt)) { + val jsonFile = options.valueOf(statusCheckJsonFileOpt) + val jsonString = Utils.readFileAsString(jsonFile) + val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(jsonString) + + println("Status of partition reassignment:") + val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkClient, partitionsToBeReassigned) + reassignedPartitionsStatus.foreach { partition => + partition._2 match { + case ReassignmentCompleted => + println("Reassignment of partition %s completed successfully".format(partition._1)) + case ReassignmentFailed => + println("Reassignment of partition %s failed".format(partition._1)) + case ReassignmentInProgress => + println("Reassignment of partition %s is still in progress".format(partition._1)) + } + } + } else if(options.has(topicsToMoveJsonFileOpt)) { + if(!options.has(brokerListOpt)) { + System.err.println("broker-list is required if topics-to-move-json-file is used") + parser.printHelpOn(System.err) + System.exit(1) + } + val topicsToMoveJsonFile = options.valueOf(topicsToMoveJsonFileOpt) + val brokerListToReassign = options.valueOf(brokerListOpt).split(',').map(_.toInt) + val topicsToMoveJsonString = 
Utils.readFileAsString(topicsToMoveJsonFile) + val topicsToReassign = ZkUtils.parseTopicsData(topicsToMoveJsonString) + val topicPartitionsToReassign = ZkUtils.getReplicaAssignmentForTopics(zkClient, topicsToReassign) + + val groupedByTopic = topicPartitionsToReassign.groupBy(tp => tp._1.topic) + groupedByTopic.foreach { topicInfo => + val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerListToReassign, topicInfo._2.size, + topicInfo._2.head._2.size) + partitionsToBeReassigned ++= assignedReplicas.map(replicaInfo => (TopicAndPartition(topicInfo._1, replicaInfo._1) -> replicaInfo._2)) + } + } else if (options.has(manualAssignmentJsonFileOpt)) { + val manualAssignmentJsonFile = options.valueOf(manualAssignmentJsonFileOpt) + val manualAssignmentJsonString = Utils.readFileAsString(manualAssignmentJsonFile) + partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(manualAssignmentJsonString) + if (partitionsToBeReassigned.isEmpty) + throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(manualAssignmentJsonFileOpt)) + } else { + System.err.println("Missing json file. One of the file needs to be specified") + parser.printHelpOn(System.err) + System.exit(1) + } + + if (options.has(topicsToMoveJsonFileOpt) || options.has(manualAssignmentJsonFileOpt)) { + if (options.has(executeOpt)) { + val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned) + + if(reassignPartitionsCommand.reassignPartitions()) + println("Successfully started reassignment of partitions %s".format(partitionsToBeReassigned)) + else + println("Failed to reassign partitions %s".format(partitionsToBeReassigned)) + } else { + System.out.println("This is a dry run (Use --execute to do the actual reassignment. " + + "The following is the replica assignment. 
Save it for the status check option.\n" + + ZkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned)) + } + } } catch { case e: Throwable => println("Partitions reassignment failed due to " + e.getMessage) @@ -57,76 +157,6 @@ object ReassignPartitionsCommand extends Logging { } } - def verifyAssignment(zkClient: ZkClient, opts: ReassignPartitionsCommandOptions) { - if(!opts.options.has(opts.reassignmentJsonFileOpt)) { - opts.parser.printHelpOn(System.err) - Utils.croak("If --verify option is used, command must include --reassignment-json-file that was used during the --execute option") - } - val jsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt) - val jsonString = Utils.readFileAsString(jsonFile) - val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(jsonString) - - println("Status of partition reassignment:") - val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkClient, partitionsToBeReassigned) - reassignedPartitionsStatus.foreach { partition => - partition._2 match { - case ReassignmentCompleted => - println("Reassignment of partition %s completed successfully".format(partition._1)) - case ReassignmentFailed => - println("Reassignment of partition %s failed".format(partition._1)) - case ReassignmentInProgress => - println("Reassignment of partition %s is still in progress".format(partition._1)) - } - } - } - - def generateAssignment(zkClient: ZkClient, opts: ReassignPartitionsCommandOptions) { - if(!(opts.options.has(opts.topicsToMoveJsonFileOpt) && opts.options.has(opts.brokerListOpt))) { - opts.parser.printHelpOn(System.err) - Utils.croak("If --generate option is used, command must include both --topics-to-move-json-file and --broker-list options") - } - val topicsToMoveJsonFile = opts.options.valueOf(opts.topicsToMoveJsonFileOpt) - val brokerListToReassign = opts.options.valueOf(opts.brokerListOpt).split(',').map(_.toInt) - val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile) - val topicsToReassign = ZkUtils.parseTopicsData(topicsToMoveJsonString) - val topicPartitionsToReassign = ZkUtils.getReplicaAssignmentForTopics(zkClient, topicsToReassign) - - var partitionsToBeReassigned : Map[TopicAndPartition, Seq[Int]] = new mutable.HashMap[TopicAndPartition, List[Int]]() - val groupedByTopic = topicPartitionsToReassign.groupBy(tp => tp._1.topic) - groupedByTopic.foreach { topicInfo => - val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerListToReassign, topicInfo._2.size, - topicInfo._2.head._2.size) - partitionsToBeReassigned ++= assignedReplicas.map(replicaInfo => (TopicAndPartition(topicInfo._1, replicaInfo._1) -> replicaInfo._2)) - } - val currentPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, partitionsToBeReassigned.map(_._1.topic).toSeq) - println("Current partition replica assignment\n\n%s" - .format(ZkUtils.getPartitionReassignmentZkData(currentPartitionReplicaAssignment))) - println("Proposed partition reassignment configuration\n\n%s".format(ZkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned))) - } - - def executeAssignment(zkClient: ZkClient, opts: ReassignPartitionsCommandOptions) { - if(!opts.options.has(opts.reassignmentJsonFileOpt)) { - opts.parser.printHelpOn(System.err) - Utils.croak("If --execute option is used, command must include --reassignment-json-file that was output " + - "during the --generate option") - } - val reassignmentJsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt) - val reassignmentJsonString = 
Utils.readFileAsString(reassignmentJsonFile) - val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(reassignmentJsonString) - if (partitionsToBeReassigned.isEmpty) - throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(reassignmentJsonFile)) - val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned) - // before starting assignment, output the current replica assignment to facilitate rollback - val currentPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, partitionsToBeReassigned.map(_._1.topic).toSeq) - println("Current partition replica assignment\n\n%s\n\nSave this to use as the --reassignment-json-file option during rollback" - .format(ZkUtils.getPartitionReassignmentZkData(currentPartitionReplicaAssignment))) - // start the reassignment - if(reassignPartitionsCommand.reassignPartitions()) - println("Successfully started reassignment of partitions %s".format(ZkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned))) - else - println("Failed to reassign partitions %s".format(partitionsToBeReassigned)) - } - private def checkIfReassignmentSucceeded(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]) :Map[TopicAndPartition, ReassignmentStatus] = { val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas) @@ -155,39 +185,6 @@ object ReassignPartitionsCommand extends Logging { } } } - - class ReassignPartitionsCommandOptions(args: Array[String]) { - val parser = new OptionParser - - val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the " + - "form host:port. Multiple URLS can be given to allow fail-over.") - .withRequiredArg - .describedAs("urls") - .ofType(classOf[String]) - val generateOpt = parser.accepts("generate", "Generate a candidate partition reassignment configuration." + - " Note that this only generates a candidate assignment, it does not execute it.") - val executeOpt = parser.accepts("execute", "Kick off the reassignment as specified by the --reassignment-json-file option.") - val verifyOpt = parser.accepts("verify", "Verify if the reassignment completed as specified by the --reassignment-json-file option.") - val reassignmentJsonFileOpt = parser.accepts("reassignment-json-file", "The JSON file with the partition reassignment configuration" + - "The format to use is - \n" + - "{\"partitions\":\n\t[{\"topic\": \"foo\",\n\t \"partition\": 1,\n\t \"replicas\": [1,2,3] }],\n\"version\":1\n}") - .withRequiredArg - .describedAs("manual assignment json file path") - .ofType(classOf[String]) - val topicsToMoveJsonFileOpt = parser.accepts("topics-to-move-json-file", "Generate a reassignment configuration to move the partitions" + - " of the specified topics to the list of brokers specified by the --broker-list option. The format to use is - \n" + - "{\"topics\":\n\t[{\"topic\": \"foo\"},{\"topic\": \"foo1\"}],\n\"version\":1\n}") - .withRequiredArg - .describedAs("topics to reassign json file path") - .ofType(classOf[String]) - val brokerListOpt = parser.accepts("broker-list", "The list of brokers to which the partitions need to be reassigned" + - " in the form \"0,1,2\". 
This is required if --topics-to-move-json-file is used to generate reassignment configuration") - .withRequiredArg - .describedAs("brokerlist") - .ofType(classOf[String]) - - val options = parser.parse(args : _*) - } } class ReassignPartitionsCommand(zkClient: ZkClient, partitions: collection.Map[TopicAndPartition, collection.Seq[Int]]) diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 083fd63..3c08dee 100644 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -25,7 +25,6 @@ import scala.collection._ import scala.collection.JavaConversions._ import kafka.cluster.Broker import kafka.log.LogConfig -import kafka.consumer.Whitelist object TopicCommand { @@ -44,104 +43,81 @@ object TopicCommand { CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.zkConnectOpt) val zkClient = new ZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, ZKStringSerializer) + + if(opts.options.has(opts.createOpt)) + createTopic(zkClient, opts) + else if(opts.options.has(opts.alterOpt)) + alterTopic(zkClient, opts) + else if(opts.options.has(opts.deleteOpt)) + deleteTopic(zkClient, opts) + else if(opts.options.has(opts.listOpt)) + listTopics(zkClient) + else if(opts.options.has(opts.describeOpt)) + describeTopic(zkClient, opts) - try { - if(opts.options.has(opts.createOpt)) - createTopic(zkClient, opts) - else if(opts.options.has(opts.alterOpt)) - alterTopic(zkClient, opts) - else if(opts.options.has(opts.deleteOpt)) - deleteTopic(zkClient, opts) - else if(opts.options.has(opts.listOpt)) - listTopics(zkClient, opts) - else if(opts.options.has(opts.describeOpt)) - describeTopic(zkClient, opts) - } catch { - case e => println("Error while executing topic command", e) - } finally { - zkClient.close() - } - } - - private def getTopics(zkClient: ZkClient, opts: TopicCommandOptions): Seq[String] = { - val topicsSpec = opts.options.valueOf(opts.topicOpt) - val topicsFilter = new Whitelist(topicsSpec) - val allTopics = ZkUtils.getAllTopics(zkClient) - allTopics.filter(topicsFilter.isTopicAllowed).sorted + zkClient.close() } def createTopic(zkClient: ZkClient, opts: TopicCommandOptions) { CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt) - val topic = opts.options.valueOf(opts.topicOpt) + val topics = opts.options.valuesOf(opts.topicOpt) val configs = parseTopicConfigsToBeAdded(opts) - if (opts.options.has(opts.replicaAssignmentOpt)) { - val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt)) - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, assignment, configs) - } else { - CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt) - val partitions = opts.options.valueOf(opts.partitionsOpt).intValue - val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue - AdminUtils.createTopic(zkClient, topic, partitions, replicas, configs) + for (topic <- topics) { + if (opts.options.has(opts.replicaAssignmentOpt)) { + val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt)) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, assignment, configs) + } else { + CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt) + val partitions = opts.options.valueOf(opts.partitionsOpt).intValue + val replicas = 
opts.options.valueOf(opts.replicationFactorOpt).intValue + AdminUtils.createTopic(zkClient, topic, partitions, replicas, configs) + } + println("Created topic \"%s\".".format(topic)) } - println("Created topic \"%s\".".format(topic)) } - + def alterTopic(zkClient: ZkClient, opts: TopicCommandOptions) { CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt) - val topics = getTopics(zkClient, opts) - topics.foreach { topic => - if(opts.options.has(opts.configOpt) || opts.options.has(opts.deleteConfigOpt)) { - val configsToBeAdded = parseTopicConfigsToBeAdded(opts) - val configsToBeDeleted = parseTopicConfigsToBeDeleted(opts) - // compile the final set of configs - val configs = AdminUtils.fetchTopicConfig(zkClient, topic) - configs.putAll(configsToBeAdded) - configsToBeDeleted.foreach(config => configs.remove(config)) - AdminUtils.changeTopicConfig(zkClient, topic, configs) - println("Updated config for topic \"%s\".".format(topic)) - } - if(opts.options.has(opts.partitionsOpt)) { - println("WARNING: If partitions are increased for a topic that has a key, the partition " + - "logic or ordering of the messages will be affected") - val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue - val replicaAssignmentStr = opts.options.valueOf(opts.replicaAssignmentOpt) - AdminUtils.addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr) - println("adding partitions succeeded!") - } - if(opts.options.has(opts.replicationFactorOpt)) - Utils.croak("Changing the replication factor is not supported.") + val topic = opts.options.valueOf(opts.topicOpt) + if(opts.options.has(opts.configOpt) || opts.options.has(opts.deleteConfigOpt)) { + val configsToBeAdded = parseTopicConfigsToBeAdded(opts) + val configsToBeDeleted = parseTopicConfigsToBeDeleted(opts) + // compile the final set of configs + val configs = AdminUtils.fetchTopicConfig(zkClient, topic) + configs.putAll(configsToBeAdded) + configsToBeDeleted.foreach(config => configs.remove(config)) + AdminUtils.changeTopicConfig(zkClient, topic, configs) + println("Updated config for topic \"%s\".".format(topic)) + } + if(opts.options.has(opts.partitionsOpt)) { + println("WARNING: If partitions are increased for a topic that has a key, the partition " + + "logic or ordering of the messages will be affected") + val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue + val replicaAssignmentStr = opts.options.valueOf(opts.replicaAssignmentOpt) + AdminUtils.addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr) + println("adding partitions succeeded!") } + if(opts.options.has(opts.replicationFactorOpt)) + Utils.croak("Changing the replication factor is not supported.") } def deleteTopic(zkClient: ZkClient, opts: TopicCommandOptions) { CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt) - val topics = getTopics(zkClient, opts) - topics.foreach { topic => + for(topic <- opts.options.valuesOf(opts.topicOpt)) { AdminUtils.deleteTopic(zkClient, topic) println("Topic \"%s\" deleted.".format(topic)) } } - def listTopics(zkClient: ZkClient, opts: TopicCommandOptions) { - if(opts.options.has(opts.topicsWithOverridesOpt)) { - ZkUtils.getAllTopics(zkClient).sorted.foreach { topic => - val configs = AdminUtils.fetchTopicConfig(zkClient, topic) - if(configs.size() != 0) { - val replicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic)) - val numPartitions = replicaAssignment.size - val replicationFactor = replicaAssignment.head._2.size - 
println("\nTopic:%s\tPartitionCount:%d\tReplicationFactor:%d\tConfigs:%s".format(topic, numPartitions, - replicationFactor, configs.map(kv => kv._1 + "=" + kv._2).mkString(","))) - } - } - } else { - for(topic <- ZkUtils.getAllTopics(zkClient).sorted) - println(topic) - } + def listTopics(zkClient: ZkClient) { + for(topic <- ZkUtils.getAllTopics(zkClient).sorted) + println(topic) } def describeTopic(zkClient: ZkClient, opts: TopicCommandOptions) { - var topics = getTopics(zkClient, opts) + var topics: Seq[String] = opts.options.valuesOf(opts.topicOpt).toSeq.sorted + if (topics.size <= 0) + topics = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerTopicsPath).sorted val reportUnderReplicatedPartitions = if (opts.options.has(opts.reportUnderReplicatedPartitionsOpt)) true else false val reportUnavailablePartitions = if (opts.options.has(opts.reportUnavailablePartitionsOpt)) true else false val liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).map(_.id).toSet @@ -223,8 +199,7 @@ object TopicCommand { val deleteOpt = parser.accepts("delete", "Delete the topic.") val describeOpt = parser.accepts("describe", "List details for the given topics.") val helpOpt = parser.accepts("help", "Print usage information.") - val topicOpt = parser.accepts("topic", "The topic to be create, alter, delete, or describe. Can also accept a regular " + - "expression except for --create option") + val topicOpt = parser.accepts("topic", "The topic to be create, alter, delete, or describe.") .withRequiredArg .describedAs("topic") .ofType(classOf[String]) @@ -254,8 +229,6 @@ object TopicCommand { "if set when describing topics, only show under replicated partitions") val reportUnavailablePartitionsOpt = parser.accepts("unavailable-partitions", "if set when describing topics, only show partitions whose leader is not available") - val topicsWithOverridesOpt = parser.accepts("topics-with-overrides", - "if set when listing topics, only show topics that have overridden configs") val options = parser.parse(args : _*) diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala index d41a705..fb2a230 100644 --- a/core/src/main/scala/kafka/api/FetchRequest.scala +++ b/core/src/main/scala/kafka/api/FetchRequest.scala @@ -132,7 +132,7 @@ case class FetchRequest private[kafka] (versionId: Short = FetchRequest.CurrentV }) } - def isFromFollower = Request.isReplicaIdFromFollower(replicaId) + def isFromFollower = replicaId != Request.OrdinaryConsumerId && replicaId != Request.DebuggingConsumerId def isFromOrdinaryConsumer = replicaId == Request.OrdinaryConsumerId diff --git a/core/src/main/scala/kafka/api/RequestOrResponse.scala b/core/src/main/scala/kafka/api/RequestOrResponse.scala index ba59c31..b62330b 100644 --- a/core/src/main/scala/kafka/api/RequestOrResponse.scala +++ b/core/src/main/scala/kafka/api/RequestOrResponse.scala @@ -24,9 +24,6 @@ import kafka.utils.Logging object Request { val OrdinaryConsumerId: Int = -1 val DebuggingConsumerId: Int = -2 - - // Followers use broker id as the replica id, which are non-negative int. 
- def isReplicaIdFromFollower(replicaId: Int): Boolean = (replicaId >= 0) } diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 5c9307d..02ccc17 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -72,7 +72,7 @@ class Partition(val topic: String, leaderIsrUpdateLock synchronized { leaderReplicaIfLocal() match { case Some(_) => - inSyncReplicas.size < assignedReplicas.size + inSyncReplicas.size < replicationFactor case None => false } @@ -310,15 +310,19 @@ class Partition(val topic: String, def getOutOfSyncReplicas(leaderReplica: Replica, keepInSyncTimeMs: Long, keepInSyncMessages: Long): Set[Replica] = { /** * there are two cases that need to be handled here - - * 1. Stuck followers: If the leo of the replica hasn't been updated for keepInSyncTimeMs ms, - * the follower is stuck and should be removed from the ISR + * 1. Stuck followers: If the leo of the replica is less than the leo of leader and the leo hasn't been updated + * for keepInSyncTimeMs ms, the follower is stuck and should be removed from the ISR * 2. Slow followers: If the leo of the slowest follower is behind the leo of the leader by keepInSyncMessages, the * follower is not catching up and should be removed from the ISR **/ val leaderLogEndOffset = leaderReplica.logEndOffset val candidateReplicas = inSyncReplicas - leaderReplica // Case 1 above - val stuckReplicas = candidateReplicas.filter(r => (time.milliseconds - r.logEndOffsetUpdateTimeMs) > keepInSyncTimeMs) + val possiblyStuckReplicas = candidateReplicas.filter(r => r.logEndOffset < leaderLogEndOffset) + if(possiblyStuckReplicas.size > 0) + debug("Possibly stuck replicas for partition [%s,%d] are %s".format(topic, partitionId, + possiblyStuckReplicas.map(_.brokerId).mkString(","))) + val stuckReplicas = possiblyStuckReplicas.filter(r => r.logEndOffsetUpdateTimeMs < (time.milliseconds - keepInSyncTimeMs)) if(stuckReplicas.size > 0) debug("Stuck replicas for partition [%s,%d] are %s".format(topic, partitionId, stuckReplicas.map(_.brokerId).mkString(","))) // Case 2 above diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala index ac491b4..a4227a4 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala @@ -101,7 +101,10 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk item.message.ensureValid() // validate checksum of message to ensure it is valid - new MessageAndMetadata(currentTopicInfo.topic, currentTopicInfo.partitionId, item.message, item.offset, keyDecoder, valueDecoder) + val keyBuffer = item.message.key + val key = if(keyBuffer == null) null.asInstanceOf[K] else keyDecoder.fromBytes(Utils.readBytes(keyBuffer)) + val value = if(item.message.isNull) null.asInstanceOf[V] else valueDecoder.fromBytes(Utils.readBytes(item.message.payload)) + new MessageAndMetadata(key, value, currentTopicInfo.topic, currentTopicInfo.partitionId, item.offset) } def clearCurrentChunk() { diff --git a/core/src/main/scala/kafka/consumer/TopicFilter.scala b/core/src/main/scala/kafka/consumer/TopicFilter.scala index 4f20823..cf3853b 100644 --- a/core/src/main/scala/kafka/consumer/TopicFilter.scala +++ b/core/src/main/scala/kafka/consumer/TopicFilter.scala @@ -41,10 +41,14 @@ sealed abstract class TopicFilter(rawRegex: String) extends Logging { override def toString = 
regex + def requiresTopicEventWatcher: Boolean + def isTopicAllowed(topic: String): Boolean } case class Whitelist(rawRegex: String) extends TopicFilter(rawRegex) { + override def requiresTopicEventWatcher = !regex.matches("""[\p{Alnum}-|]+""") + override def isTopicAllowed(topic: String) = { val allowed = topic.matches(regex) @@ -58,6 +62,8 @@ case class Whitelist(rawRegex: String) extends TopicFilter(rawRegex) { } case class Blacklist(rawRegex: String) extends TopicFilter(rawRegex) { + override def requiresTopicEventWatcher = true + override def isTopicAllowed(topic: String) = { val allowed = !topic.matches(regex) diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 703b2e2..6d0cfa6 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -91,7 +91,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private val messageStreamCreated = new AtomicBoolean(false) private var sessionExpirationListener: ZKSessionExpireListener = null - private var topicPartitionChangeListener: ZKTopicPartitionChangeListener = null + private var topicPartitionChangeListenner: ZKTopicPartitionChangeListener = null private var loadBalancerListener: ZKRebalancerListener = null private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null @@ -302,6 +302,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, registerConsumerInZK(dirs, consumerIdString, topicCount) // explicitly trigger load balancing for this consumer loadBalancerListener.syncedRebalance() + // There is no need to resubscribe to child and state changes. // The child change watchers will be set inside rebalance when we read the children list. 
} @@ -314,8 +315,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, def handleDataChange(dataPath : String, data: Object) { try { info("Topic info for path " + dataPath + " changed to " + data.toString + ", triggering rebalance") - // queue up the rebalance event - loadBalancerListener.rebalanceEventTriggered() + // explicitly trigger load balancing for this consumer + loadBalancerListener.syncedRebalance() + // There is no need to re-subscribe the watcher since it will be automatically // re-registered upon firing of this event by zkClient } catch { @@ -333,6 +335,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, class ZKRebalancerListener(val group: String, val consumerIdString: String, val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_,_]]]) extends IZkChildListener { + private val correlationId = new AtomicInteger(0) private var isWatcherTriggered = false private val lock = new ReentrantLock private val cond = lock.newCondition() @@ -364,10 +367,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, @throws(classOf[Exception]) def handleChildChange(parentPath : String, curChilds : java.util.List[String]) { - rebalanceEventTriggered() - } - - def rebalanceEventTriggered() { inLock(lock) { isWatcherTriggered = true cond.signalAll() @@ -656,8 +655,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, dirs, consumerIdString, topicCount, loadBalancerListener) // create listener for topic partition change event if not exist yet - if (topicPartitionChangeListener == null) - topicPartitionChangeListener = new ZKTopicPartitionChangeListener(loadBalancerListener) + if (topicPartitionChangeListenner == null) + topicPartitionChangeListenner = new ZKTopicPartitionChangeListener(loadBalancerListener) val topicStreamsMap = loadBalancerListener.kafkaMessageAndMetadataStreams @@ -715,7 +714,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, topicStreamsMap.foreach { topicAndStreams => // register on broker partition path changes val topicPath = BrokerTopicsPath + "/" + topicAndStreams._1 - zkClient.subscribeDataChanges(topicPath, topicPartitionChangeListener) + zkClient.subscribeDataChanges(topicPath, topicPartitionChangeListenner) } // explicitly trigger load balancing for this consumer @@ -755,11 +754,19 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, registerConsumerInZK(dirs, consumerIdString, wildcardTopicCount) reinitializeConsumer(wildcardTopicCount, wildcardQueuesAndStreams) - /* - * Topic events will trigger subsequent synced rebalances. - */ - info("Creating topic event watcher for topics " + topicFilter) - wildcardTopicWatcher = new ZookeeperTopicEventWatcher(zkClient, this) + if (!topicFilter.requiresTopicEventWatcher) { + info("Not creating event watcher for trivial whitelist " + topicFilter) + } + else { + info("Creating topic event watcher for whitelist " + topicFilter) + wildcardTopicWatcher = new ZookeeperTopicEventWatcher(config, this) + + /* + * Topic events will trigger subsequent synced rebalances. Also, the + * consumer will get registered only after an allowed topic becomes + * available. 
+ */ + } def handleTopicEvent(allTopics: Seq[String]) { debug("Handling topic event") diff --git a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala index 38f4ec0..a67c193 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala @@ -22,11 +22,14 @@ import kafka.utils.{ZkUtils, ZKStringSerializer, Logging} import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient} import org.apache.zookeeper.Watcher.Event.KeeperState -class ZookeeperTopicEventWatcher(val zkClient: ZkClient, +class ZookeeperTopicEventWatcher(val config:ConsumerConfig, val eventHandler: TopicEventHandler[String]) extends Logging { val lock = new Object() + private var zkClient: ZkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, + config.zkConnectionTimeoutMs, ZKStringSerializer) + startWatchingTopicEvents() private def startWatchingTopicEvents() { @@ -50,10 +53,11 @@ class ZookeeperTopicEventWatcher(val zkClient: ZkClient, info("Shutting down topic event watcher.") if (zkClient != null) { stopWatchingTopicEvents() + zkClient.close() + zkClient = null } - else { - warn("Cannot shutdown since the embedded zookeeper client has already closed.") - } + else + warn("Cannot shutdown already shutdown topic event watcher.") } } @@ -66,6 +70,7 @@ class ZookeeperTopicEventWatcher(val zkClient: ZkClient, if (zkClient != null) { val latestTopics = zkClient.getChildren(ZkUtils.BrokerTopicsPath).toList debug("all topics: %s".format(latestTopics)) + eventHandler.handleTopicEvent(latestTopics) } } @@ -88,8 +93,10 @@ class ZookeeperTopicEventWatcher(val zkClient: ZkClient, def handleNewSession() { lock.synchronized { if (zkClient != null) { - info("ZK expired: resubscribing topic event listener to topic registry") - zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicEventListener) + info( + "ZK expired: resubscribing topic event listener to topic registry") + zkClient.subscribeChildChanges( + ZkUtils.BrokerTopicsPath, topicEventListener) } } } diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 7991e42..beca460 100644 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -90,7 +90,6 @@ class ControllerChannelManager (private val controllerContext: ControllerContext private def removeExistingBroker(brokerId: Int) { try { brokerStateInfo(brokerId).channel.disconnect() - brokerStateInfo(brokerId).messageQueue.clear() brokerStateInfo(brokerId).requestSendThread.shutdown() brokerStateInfo.remove(brokerId) }catch { @@ -239,9 +238,8 @@ class ControllerBrokerRequestBatch(controllerContext: ControllerContext, sendReq val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId, controllerEpoch, correlationId, clientId) for (p <- partitionStateInfos) { val typeOfRequest = if (broker == p._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower" - stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s with correlationId %d to broker %d " + - "for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest, - p._2.leaderIsrAndControllerEpoch, correlationId, broker, + stateChangeLogger.trace(("Controller %d epoch %d sending %s 
LeaderAndIsr request with correlationId %d to broker %d " + + "for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest, correlationId, broker, p._1._1, p._1._2)) } sendRequest(broker, leaderAndIsrRequest, null) @@ -252,9 +250,8 @@ class ControllerBrokerRequestBatch(controllerContext: ControllerContext, sendReq val partitionStateInfos = m._2.toMap val updateMetadataRequest = new UpdateMetadataRequest(controllerId, controllerEpoch, correlationId, clientId, partitionStateInfos, controllerContext.liveOrShuttingDownBrokers) - partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s with " + - "correlationId %d to broker %d for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch, - correlationId, broker, p._1))) + partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request with " + + "correlationId %d to broker %d for partition %s").format(controllerId, controllerEpoch, correlationId, broker, p._1))) sendRequest(broker, updateMetadataRequest, null) } updateMetadataRequestMap.clear() diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 965d0e5..8017abb 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -21,14 +21,14 @@ import collection.immutable.Set import com.yammer.metrics.core.Gauge import java.lang.{IllegalStateException, Object} import java.util.concurrent.TimeUnit -import kafka.admin.PreferredReplicaLeaderElectionCommand +import kafka.admin.{AdminOperationException, PreferredReplicaLeaderElectionCommand} import kafka.api._ import kafka.cluster.Broker import kafka.common._ import kafka.metrics.{KafkaTimer, KafkaMetricsGroup} import kafka.server.{ZookeeperLeaderElector, KafkaConfig} import kafka.utils.ZkUtils._ -import kafka.utils.{Json, Utils, ZkUtils, Logging} +import kafka.utils.{Json, Utils, ZkUtils, Logging, KafkaScheduler} import org.apache.zookeeper.Watcher.Event.KeeperState import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient} import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException} @@ -112,6 +112,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg private val replicaStateMachine = new ReplicaStateMachine(this) private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover, config.brokerId) + // have a separate scheduler for the controller to be able to start and stop independently of the + // kafka server + private val autoRebalanceScheduler = new KafkaScheduler(1) val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext) private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext) private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext) @@ -134,7 +137,23 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg if (!isActive()) 0 else - controllerContext.partitionLeadershipInfo.count(p => !controllerContext.liveOrShuttingDownBrokerIds.contains(p._2.leaderAndIsr.leader)) + controllerContext.partitionLeadershipInfo.count(p => !controllerContext.liveBrokerIds.contains(p._2.leaderAndIsr.leader)) + } + } + } + ) + + newGauge( + "PreferredReplicaImbalanceCount", + 
new Gauge[Int] { + def value(): Int = { + controllerContext.controllerLock synchronized { + if (!isActive()) + 0 + else + controllerContext.partitionReplicaAssignment.count { + case (topicPartition, replicas) => controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != replicas.head + } } } } @@ -250,6 +269,12 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg initializeAndMaybeTriggerPreferredReplicaElection() /* send partition leadership info to all live brokers */ sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq) + if (config.autoLeaderRebalanceEnable) { + info("starting the partition rebalance scheduler") + autoRebalanceScheduler.startup() + autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance, + 5, config.leaderImbalanceCheckIntervalSeconds, TimeUnit.SECONDS) + } } else info("Controller has been shut down, aborting startup/failover") @@ -361,19 +386,12 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg * Reassigning replicas for a partition goes through a few stages - * RAR = Reassigned replicas * AR = Original list of replicas for partition - * 1. Write new AR = AR + RAR. At this time, update the leader epoch in zookeeper and send a LeaderAndIsr request with - * AR = AR + RAR to all replicas in (AR + RAR) + * 1. Write new AR = AR + RAR * 2. Start new replicas RAR - AR. * 3. Wait until new replicas are in sync with the leader - * 4. If the leader is not in RAR, elect a new leader from RAR. If new leader needs to be elected from RAR, a LeaderAndIsr - * will be sent. If not, then leader epoch will be incremented in zookeeper and a LeaderAndIsr request will be sent. - * In any case, the LeaderAndIsr request will have AR = RAR. This will prevent the leader from adding any replica in - * RAR - AR back in the ISR - * 5. Stop old replicas AR - RAR. As part of this, we make 2 state changes OfflineReplica and NonExistentReplica. As part - * of OfflineReplica state change, we shrink the ISR to remove RAR - AR in zookeeper and sent a LeaderAndIsr ONLY to - * the Leader to notify it of the shrunk ISR. After that, we send a StopReplica (delete = false) to the replicas in - * RAR - AR. Currently, NonExistentReplica state change is a NO-OP - * 6. Write new AR = RAR. As part of this, we finally change the AR in zookeeper to RAR. + * 4. If the leader is not in RAR, elect a new leader from RAR + * 5. Stop old replicas AR - RAR + * 6. Write new AR = RAR * 7. 
Remove partition from the /admin/reassign_partitions path */ def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) { @@ -500,6 +518,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg isRunning = false partitionStateMachine.shutdown() replicaStateMachine.shutdown() + if (config.autoLeaderRebalanceEnable) + autoRebalanceScheduler.shutdown() if(controllerContext.controllerChannelManager != null) { controllerContext.controllerChannelManager.shutdown() controllerContext.controllerChannelManager = null @@ -684,6 +704,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg brokerRequestBatch.newBatch() updateLeaderEpoch(topicAndPartition.topic, topicAndPartition.partition) match { case Some(updatedLeaderIsrAndControllerEpoch) => + // send the shrunk assigned replica list to all the replicas, including the leader, so that it no longer + // allows old replicas to enter ISR brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasToReceiveRequest, topicAndPartition.topic, topicAndPartition.partition, updatedLeaderIsrAndControllerEpoch, newAssignedReplicas) brokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch, controllerContext.correlationId.getAndIncrement) @@ -710,18 +732,16 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg } def removePartitionFromReassignedPartitions(topicAndPartition: TopicAndPartition) { - if(controllerContext.partitionsBeingReassigned.get(topicAndPartition).isDefined) { - // stop watching the ISR changes for this partition - zkClient.unsubscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition), - controllerContext.partitionsBeingReassigned(topicAndPartition).isrChangeListener) - } + // stop watching the ISR changes for this partition + zkClient.unsubscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition), + controllerContext.partitionsBeingReassigned(topicAndPartition).isrChangeListener) // read the current list of reassigned partitions from zookeeper val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient) // remove this partition from that list val updatedPartitionsBeingReassigned = partitionsBeingReassigned - topicAndPartition // write the new list to zookeeper ZkUtils.updatePartitionReassignmentData(zkClient, updatedPartitionsBeingReassigned.mapValues(_.newReplicas)) - // update the cache. NO-OP if the partition's reassignment was never started + // update the cache controllerContext.partitionsBeingReassigned.remove(topicAndPartition) } @@ -817,7 +837,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg info("New leader and ISR for partition %s is %s".format(topicAndPartition, newLeaderAndIsr.toString())) updateSucceeded } else { - warn("Cannot remove replica %d from ISR of partition %s since it is not in the ISR. Leader = %d ; ISR = %s" + warn("Cannot remove replica %d from ISR of %s. 
Leader = %d ; ISR = %s" .format(replicaId, topicAndPartition, leaderAndIsr.leader, leaderAndIsr.isr)) finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(leaderAndIsr, epoch)) controllerContext.partitionLeadershipInfo.put(topicAndPartition, finalLeaderIsrAndControllerEpoch.get) @@ -893,7 +913,6 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg */ @throws(classOf[Exception]) def handleNewSession() { - info("ZK expired; shut down all controller components and try to re-elect") controllerContext.controllerLock synchronized { Utils.unregisterMBean(KafkaController.MBeanName) partitionStateMachine.shutdown() @@ -906,6 +925,67 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg } } } + + private def checkAndTriggerPartitionRebalance(): Unit = { + if (isActive()) { + trace("checking need to trigger partition rebalance") + // get all the active brokers + var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] = null + controllerContext.controllerLock synchronized { + preferredReplicasForTopicsByBrokers = controllerContext.partitionReplicaAssignment.groupBy { + case(topicAndPartition, assignedReplicas) => assignedReplicas.head + } + } + debug("preferred replicas by broker " + preferredReplicasForTopicsByBrokers) + // for each broker, check if a preferred replica election needs to be triggered + preferredReplicasForTopicsByBrokers.foreach { + case(leaderBroker, topicAndPartitionsForBroker) => { + var imbalanceRatio: Double = 0 + var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null + controllerContext.controllerLock synchronized { + topicsNotInPreferredReplica = + topicAndPartitionsForBroker.filter { + case(topicPartition, replicas) => { + controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker + } + } + debug("topics not in preferred replica " + topicsNotInPreferredReplica) + val totalTopicPartitionsForBroker = topicAndPartitionsForBroker.size + val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size + imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker + trace("leader imbalance ratio for broker %d is %f".format(leaderBroker, imbalanceRatio)) + } + // check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions + // that need to be on this broker + if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) { + controllerContext.controllerLock synchronized { + // do this check only if the broker is live and there are no partitions being reassigned currently + // and preferred replica election is not in progress + if (controllerContext.liveBrokerIds.contains(leaderBroker) && + controllerContext.partitionsBeingReassigned.size == 0 && + controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0) { + val zkPath = ZkUtils.PreferredReplicaLeaderElectionPath + val partitionsList = topicsNotInPreferredReplica.keys.map(e => Map("topic" -> e.topic, "partition" -> e.partition)) + val jsonData = Json.encode(Map("version" -> 1, "partitions" -> partitionsList)) + try { + ZkUtils.createPersistentPath(zkClient, zkPath, jsonData) + info("Created preferred replica election path with %s".format(jsonData)) + } catch { + case e2: ZkNodeExistsException => + val partitionsUndergoingPreferredReplicaElection = + PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(ZkUtils.readData(zkClient, zkPath)._1) + 
error("Preferred replica leader election currently in progress for " + + "%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection)); + case e3: Throwable => + error("Error while trying to auto rebalance topics %s".format(topicsNotInPreferredReplica.keys)) + } + } + } + } + } + } + } + } } /** diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index 5859ce7..829163a 100644 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -401,8 +401,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { val partitionsRemainingToBeAdded = partitionReplicaAssignment.filter(p => !controllerContext.partitionReplicaAssignment.contains(p._1)) info("New partitions to be added [%s]".format(partitionsRemainingToBeAdded)) - if (partitionsRemainingToBeAdded.size > 0) - controller.onNewPartitionCreation(partitionsRemainingToBeAdded.keySet.toSet) + controller.onNewPartitionCreation(partitionsRemainingToBeAdded.keySet.toSet) } catch { case e: Throwable => error("Error while handling add partitions for data path " + dataPath, e ) } diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index ad4ee53..c52225a 100644 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -171,17 +171,24 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { val leaderAndIsrIsEmpty: Boolean = controllerContext.partitionLeadershipInfo.get(topicAndPartition) match { case Some(currLeaderIsrAndControllerEpoch) => - controller.removeReplicaFromIsr(topic, partition, replicaId) match { - case Some(updatedLeaderIsrAndControllerEpoch) => - // send the shrunk ISR state change request only to the leader - brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader), - topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment) - replicaState.put((topic, partition, replicaId), OfflineReplica) - stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OfflineReplica" - .format(controllerId, controller.epoch, replicaId, topicAndPartition)) - false - case None => - true + if (currLeaderIsrAndControllerEpoch.leaderAndIsr.isr.contains(replicaId)) + controller.removeReplicaFromIsr(topic, partition, replicaId) match { + case Some(updatedLeaderIsrAndControllerEpoch) => + // send the shrunk ISR state change request only to the leader + brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader), + topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment) + replicaState.put((topic, partition, replicaId), OfflineReplica) + stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OfflineReplica" + .format(controllerId, controller.epoch, replicaId, topicAndPartition)) + false + case None => + true + } + else { + replicaState.put((topic, partition, replicaId), OfflineReplica) + stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OfflineReplica" + .format(controllerId, controller.epoch, replicaId, topicAndPartition)) + false } case None => true diff --git 
a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala index e1f8b97..6c099da 100644 --- a/core/src/main/scala/kafka/log/FileMessageSet.scala +++ b/core/src/main/scala/kafka/log/FileMessageSet.scala @@ -123,8 +123,6 @@ class FileMessageSet private[kafka](@volatile var file: File, if(offset >= targetOffset) return OffsetPosition(offset, position) val messageSize = buffer.getInt() - if(messageSize < Message.MessageOverhead) - throw new IllegalStateException("Invalid message size: " + messageSize) position += MessageSet.LogOverhead + messageSize } null diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index beda421..9205128 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -155,19 +155,26 @@ class Log(val dir: File, activeSegment.index.resize(config.maxIndexSize) } - // sanity check the index file of every segment to ensure we don't proceed with a corrupt segment - for (s <- logSegments) - s.index.sanityCheck() + // sanity check the index file of every segment, if it's empty or its last offset is greater than its base offset. + for (s <- logSegments) { + require(s.index.entries == 0 || s.index.lastOffset > s.index.baseOffset, + "Corrupt index found, index file (%s) has non-zero size but the last offset is %d and the base offset is %d" + .format(s.index.file.getAbsolutePath, s.index.lastOffset, s.index.baseOffset)) + } } private def recoverLog() { - // if we have the clean shutdown marker, skip recovery - if(hasCleanShutdownFile) { - this.recoveryPoint = activeSegment.nextOffset + val lastOffset = try {activeSegment.nextOffset} catch {case _: Throwable => -1L} + val needsRecovery = !(new File(dir.getParentFile, CleanShutdownFile)).exists() + if(!needsRecovery) { + this.recoveryPoint = lastOffset + return + } + if(lastOffset <= this.recoveryPoint) { + info("Log '%s' is fully intact, skipping recovery".format(name)) + this.recoveryPoint = lastOffset return } - - // okay we need to actually recovery this log val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator while(unflushed.hasNext) { val curr = unflushed.next @@ -189,11 +196,6 @@ class Log(val dir: File, } } } - - /** - * Check if we have the "clean shutdown" file - */ - private def hasCleanShutdownFile() = new File(dir.getParentFile, CleanShutdownFile).exists() /** * The number of segments in the log. 
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 81be88a..390b759 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -175,8 +175,6 @@ class LogManager(val logDirs: Array[File], allLogs.foreach(_.close()) // update the last flush point checkpointRecoveryPointOffsets() - // mark that the shutdown was clean by creating the clean shutdown marker file - logDirs.foreach(dir => Utils.swallow(new File(dir, Log.CleanShutdownFile).createNewFile())) } finally { // regardless of whether the close succeeded, we need to unlock the data directories dirLocks.foreach(_.destroy()) diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala index 8a62dfa..2f4e303 100644 --- a/core/src/main/scala/kafka/log/OffsetIndex.scala +++ b/core/src/main/scala/kafka/log/OffsetIndex.scala @@ -69,8 +69,12 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi raf.setLength(roundToExactMultiple(maxIndexSize, 8)) } + val len = raf.length() + if(len < 0 || len % 8 != 0) + throw new IllegalStateException("Index file " + file.getName + " is corrupt, found " + len + + " bytes which is not positive or not a multiple of 8.") + /* memory-map the file */ - val len = raf.length() val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len) /* set the position in the index for the next entry */ @@ -95,20 +99,22 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi var maxEntries = mmap.limit / 8 /* the last offset in the index */ - var lastOffset = readLastEntry.offset + var lastOffset = readLastOffset() debug("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d" .format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset, mmap.position)) /** - * The last entry in the index + * The last offset written to the index */ - def readLastEntry(): OffsetPosition = { + private def readLastOffset(): Long = { inLock(lock) { - size.get match { - case 0 => OffsetPosition(baseOffset, 0) - case s => OffsetPosition(baseOffset + relativeOffset(this.mmap, s-1), physical(this.mmap, s-1)) - } + val offset = + size.get match { + case 0 => 0 + case s => relativeOffset(this.mmap, s-1) + } + baseOffset + offset } } @@ -173,7 +179,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi /* return the nth offset relative to the base offset */ private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8) - /* return the nth physical position */ + /* return the nth physical offset */ private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8 + 4) /** @@ -252,7 +258,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi inLock(lock) { this.size.set(entries) mmap.position(this.size.get * 8) - this.lastOffset = readLastEntry.offset + this.lastOffset = readLastOffset } } @@ -345,20 +351,6 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi } /** - * Do a basic sanity check on this index to detect obvious problems - * @throw IllegalArgumentException if any problems are found - */ - def sanityCheck() { - require(entries == 0 || lastOffset > baseOffset, - "Corrupt index found, index file (%s) has non-zero size but the last offset is %d and the base offset is %d" - .format(file.getAbsolutePath, lastOffset, baseOffset)) - 
val len = file.length() - require(len % 8 == 0, - "Index file " + file.getName + " is corrupt, found " + len + - " bytes which is not positive or not a multiple of 8.") - } - - /** * Round a number to the greatest exact multiple of the given factor less than the given number. * E.g. roundToExactMultiple(67, 8) == 64 */ diff --git a/core/src/main/scala/kafka/message/MessageAndMetadata.scala b/core/src/main/scala/kafka/message/MessageAndMetadata.scala index d693abc..20c0e70 100644 --- a/core/src/main/scala/kafka/message/MessageAndMetadata.scala +++ b/core/src/main/scala/kafka/message/MessageAndMetadata.scala @@ -17,18 +17,5 @@ package kafka.message -import kafka.serializer.Decoder -import kafka.utils.Utils - -case class MessageAndMetadata[K, V](topic: String, partition: Int, - private val rawMessage: Message, offset: Long, - keyDecoder: Decoder[K], valueDecoder: Decoder[V]) { - - /** - * Return the decoded message key and payload - */ - def key(): K = if(rawMessage.key == null) null.asInstanceOf[K] else keyDecoder.fromBytes(Utils.readBytes(rawMessage.key)) - - def message(): V = if(rawMessage.isNull) null.asInstanceOf[V] else valueDecoder.fromBytes(Utils.readBytes(rawMessage.payload)) -} +case class MessageAndMetadata[K, V](key: K, message: V, topic: String, partition: Int, offset: Long) diff --git a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala index 42239b2..88ae784 100644 --- a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala +++ b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala @@ -87,19 +87,15 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { } override def append(event: LoggingEvent) { - val message = subAppend(event) + val message : String = if( this.layout == null) { + event.getRenderedMessage + } + else this.layout.format(event) LogLog.debug("[" + new Date(event.getTimeStamp).toString + "]" + message) val messageData = new KeyedMessage[String, String](topic, message) producer.send(messageData); } - def subAppend(event: LoggingEvent): String = { - if(this.layout == null) - event.getRenderedMessage - else - this.layout.format(event) - } - override def close() { if(!this.closed) { this.closed = true diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala index 9390edf..394e981 100644 --- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala @@ -20,7 +20,7 @@ package kafka.server import scala.collection.mutable import scala.collection.Set import scala.collection.Map -import kafka.utils.{Utils, Logging} +import kafka.utils.Logging import kafka.cluster.Broker import kafka.metrics.KafkaMetricsGroup import kafka.common.TopicAndPartition @@ -63,7 +63,7 @@ abstract class AbstractFetcherManager(protected val name: String, metricPrefix: ) private def getFetcherId(topic: String, partitionId: Int) : Int = { - Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers + (31 * topic.hashCode() + partitionId) % numFetchers } // to be defined in subclass to create a specific fetcher diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 29abc46..80a70f1 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -392,10 +392,10 @@ class KafkaApis(val requestChannel: RequestChannel, 
replicaManager.getLeaderReplicaIfLocal(topic, partition) trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize)) val maxOffsetOpt = - if (Request.isReplicaIdFromFollower(fromReplicaId)) - None - else + if (fromReplicaId == Request.OrdinaryConsumerId) Some(localReplica.highWatermark) + else + None val messages = localReplica.log match { case Some(log) => log.read(offset, maxSize, maxOffsetOpt) @@ -519,11 +519,8 @@ class KafkaApis(val requestChannel: RequestChannel, uniqueTopics = { if(metadataRequest.topics.size > 0) metadataRequest.topics.toSet - else { - partitionMetadataLock synchronized { - leaderCache.keySet.map(_.topic) - } - } + else + leaderCache.keySet.map(_.topic) } val topicMetadataList = partitionMetadataLock synchronized { diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index a7e5b73..921f456 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -104,15 +104,27 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* the maximum size of a single log file */ val logSegmentBytes = props.getIntInRange("log.segment.bytes", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue)) + /* the maximum size of a single log file for some specific topic */ + val logSegmentBytesPerTopicMap = props.getMap("log.segment.bytes.per.topic", _.toInt > 0).mapValues(_.toInt) + /* the maximum time before a new log segment is rolled out */ val logRollHours = props.getIntInRange("log.roll.hours", 24*7, (1, Int.MaxValue)) + /* the number of hours before rolling out a new log segment for some specific topic */ + val logRollHoursPerTopicMap = props.getMap("log.roll.hours.per.topic", _.toInt > 0).mapValues(_.toInt) + /* the number of hours to keep a log file before deleting it */ val logRetentionTimeMillis = getLogRetentionTimeMillis + /* the number of hours to keep a log file before deleting it for some specific topic*/ + val logRetentionHoursPerTopicMap = props.getMap("log.retention.hours.per.topic", _.toInt > 0).mapValues(_.toInt) + /* the maximum size of the log before deleting it */ val logRetentionBytes = props.getLong("log.retention.bytes", -1) + /* the maximum size of the log for some specific topic before deleting it */ + val logRetentionBytesPerTopicMap = props.getMap("log.retention.bytes.per.topic", _.toLong > 0).mapValues(_.toLong) + /* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */ val logCleanupIntervalMs = props.getLongInRange("log.retention.check.interval.ms", 5*60*1000, (1, Long.MaxValue)) @@ -198,11 +210,8 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* the number of byes of messages to attempt to fetch */ val replicaFetchMaxBytes = props.getIntInRange("replica.fetch.max.bytes", ConsumerConfig.FetchSize, (messageMaxBytes, Int.MaxValue)) - /* max wait time for each fetcher request issued by follower replicas. 
This value should always be less than the - * replica.lag.time.max.ms at all times to prevent frequent shrinking of ISR for low throughput topics */ + /* max wait time for each fetcher request issued by follower replicas*/ val replicaFetchWaitMaxMs = props.getInt("replica.fetch.wait.max.ms", 500) - require(replicaFetchWaitMaxMs <= replicaLagTimeMaxMs, "replica.fetch.wait.max.ms should always be at least replica.lag.time.max.ms" + - " to prevent frequent changes in ISR") /* minimum bytes expected for each fetch response. If not enough bytes, wait up to replicaMaxWaitTimeMs */ val replicaFetchMinBytes = props.getInt("replica.fetch.min.bytes", 1) @@ -220,6 +229,18 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* the purge interval (in number of requests) of the producer request purgatory */ val producerPurgatoryPurgeIntervalRequests = props.getInt("producer.purgatory.purge.interval.requests", 10000) + /* Enables auto leader balancing. A background thread checks and triggers leader + * balance if required at regular intervals */ + val autoLeaderRebalanceEnable = props.getBoolean("auto.leader.rebalance.enable", false) + + /* the ratio of leader imbalance allowed per broker. The controller would trigger a leader balance if it goes above + * this value per broker. The value is specified in percentage. */ + val leaderImbalancePerBrokerPercentage = props.getInt("leader.imbalance.per.broker.percentage", 10) + + /* the frequency with which the partition rebalance check is triggered by the controller */ + val leaderImbalanceCheckIntervalSeconds = props.getInt("leader.imbalance.check.interval.seconds", 300) + + /*********** Controlled shutdown configuration ***********/ /** Controlled shutdown can fail for multiple reasons. 
This determines the number of retries when such failure happens */ diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 242c18d..161f581 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -77,17 +77,16 @@ class ReplicaManager(val config: KafkaConfig, newGauge( "UnderReplicatedPartitions", new Gauge[Int] { - def value = underReplicatedPartitionCount() + def value = { + leaderPartitionsLock synchronized { + leaderPartitions.count(_.isUnderReplicated) + } + } } ) val isrExpandRate = newMeter("IsrExpandsPerSec", "expands", TimeUnit.SECONDS) val isrShrinkRate = newMeter("IsrShrinksPerSec", "shrinks", TimeUnit.SECONDS) - def underReplicatedPartitionCount(): Int = { - leaderPartitionsLock synchronized { - leaderPartitions.count(_.isUnderReplicated) - } - } def startHighWaterMarksCheckPointThread() = { if(highWatermarkCheckPointThreadStarted.compareAndSet(false, true)) @@ -205,19 +204,17 @@ class ReplicaManager(val config: KafkaConfig, } def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): (collection.Map[(String, Int), Short], Short) = { - leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) => - stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]" - .format(localBrokerId, stateInfo, leaderAndISRRequest.correlationId, - leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch, topic, partition)) - } + leaderAndISRRequest.partitionStateInfos.foreach{ case ((topic, partition), stateInfo) => + stateChangeLogger.trace("Broker %d handling LeaderAndIsr request correlation id %d received from controller %d epoch %d for partition [%s,%d]" + .format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId, + leaderAndISRRequest.controllerEpoch, topic, partition))} + info("Handling LeaderAndIsr request %s".format(leaderAndISRRequest)) + replicaStateChangeLock synchronized { val responseMap = new collection.mutable.HashMap[(String, Int), Short] if(leaderAndISRRequest.controllerEpoch < controllerEpoch) { - leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) => - stateChangeLogger.warn(("Broker %d received LeaderAndIsr request correlation id %d with an old controller epoch %d." + - " Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.correlationId, - leaderAndISRRequest.controllerEpoch, controllerEpoch)) - } + stateChangeLogger.warn("Broker %d received LeaderAndIsr request correlation id %d with an old controller epoch %d. Latest known controller epoch is %d" + .format(localBrokerId, leaderAndISRRequest.controllerEpoch, leaderAndISRRequest.correlationId, controllerEpoch)) (responseMap, ErrorMapping.StaleControllerEpochCode) } else { val controllerId = leaderAndISRRequest.controllerId @@ -229,22 +226,15 @@ class ReplicaManager(val config: KafkaConfig, leaderAndISRRequest.partitionStateInfos.foreach{ case ((topic, partitionId), partitionStateInfo) => val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor) val partitionLeaderEpoch = partition.getLeaderEpoch() - // If the leader epoch is valid record the epoch of the controller that made the leadership decision. 
- // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path if (partitionLeaderEpoch < partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch) { - if(partitionStateInfo.allReplicas.contains(config.brokerId)) - partitionState.put(partition, partitionStateInfo) - else { - stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request with correlation id %d from " + - "controller %d epoch %d as broker is not in assigned replica list %s for partition [%s,%d]") - .format(localBrokerId, correlationId, controllerId, leaderAndISRRequest.controllerEpoch, - partitionStateInfo.allReplicas.mkString(","), topic, partition.partitionId)) - } + // If the leader epoch is valid record the epoch of the controller that made the leadership decision. + // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path + partitionState.put(partition, partitionStateInfo) } else { // Otherwise record the error code in response stateChangeLogger.warn(("Broker %d received invalid LeaderAndIsr request with correlation id %d from " + "controller %d epoch %d with an older leader epoch %d for partition [%s,%d], current leader epoch is %d") - .format(localBrokerId, correlationId, controllerId, leaderAndISRRequest.controllerEpoch, + .format(localBrokerId, correlationId, controllerId, partitionStateInfo.leaderIsrAndControllerEpoch.controllerEpoch, partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch, topic, partition.partitionId, partitionLeaderEpoch)) responseMap.put((topic, partitionId), ErrorMapping.StaleLeaderEpochCode) } @@ -257,6 +247,7 @@ class ReplicaManager(val config: KafkaConfig, if (!partitionsTobeLeader.isEmpty) makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, leaderAndISRRequest.correlationId, responseMap) if (!partitionsTobeFollower.isEmpty) makeFollowers(controllerId, controllerEpoch, partitionsTobeFollower, leaderAndISRRequest.leaders, leaderAndISRRequest.correlationId, responseMap) + info("Handled leader and isr request %s".format(leaderAndISRRequest)) // we initialize highwatermark thread after the first leaderisrrequest. 
This ensures that all the partitions // have been completely populated before starting the checkpointing there by avoiding weird race conditions if (!hwThreadInitialized) { @@ -283,10 +274,10 @@ class ReplicaManager(val config: KafkaConfig, private def makeLeaders(controllerId: Int, epoch: Int, partitionState: Map[Partition, PartitionStateInfo], correlationId: Int, responseMap: mutable.Map[(String, Int), Short]) = { - partitionState.foreach(state => - stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " + - "starting the become-leader transition for partition %s") - .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)))) + stateChangeLogger.trace(("Broker %d received LeaderAndIsr request correlationId %d from controller %d epoch %d " + + "starting the become-leader transition for partitions %s") + .format(localBrokerId, correlationId, controllerId, epoch, + partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(","))) for (partition <- partitionState.keys) responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError) @@ -294,11 +285,9 @@ class ReplicaManager(val config: KafkaConfig, try { // First stop fetchers for all the partitions replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_))) - partitionState.foreach { state => - stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-leader request from controller " + - "%d epoch %d with correlation id %d for partition %s") - .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(state._1.topic, state._1.partitionId))) - } + stateChangeLogger.trace("Broker %d stopped fetchers for partitions %s as per becoming-follower request from controller %d epoch %d" + .format(localBrokerId, partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(","), controllerId, correlationId)) + // Update the partition information to be the leader partitionState.foreach{ case (partition, partitionStateInfo) => partition.makeLeader(controllerId, partitionStateInfo, correlationId)} @@ -309,31 +298,27 @@ class ReplicaManager(val config: KafkaConfig, } } catch { case e: Throwable => - partitionState.foreach { state => - val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d" + - " epoch %d for partition %s").format(localBrokerId, correlationId, controllerId, epoch, - TopicAndPartition(state._1.topic, state._1.partitionId)) - stateChangeLogger.error(errorMsg, e) - } + val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d " + + "epoch %d").format(localBrokerId, correlationId, controllerId, epoch) + stateChangeLogger.error(errorMsg, e) // Re-throw the exception for it to be caught in KafkaApis throw e } - partitionState.foreach { state => - stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " + - "for the become-leader transition for partition %s") - .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId))) - } + stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " + + "for the become-leader transition for partitions %s") + .format(localBrokerId, correlationId, controllerId, epoch, + 
partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(","))) } /* * Make the current broker to become follower for a given set of partitions by: * - * 1. Remove these partitions from the leader partitions set. - * 2. Mark the replicas as followers so that no more data can be added from the producer clients. - * 3. Stop fetchers for these partitions so that no more data can be added by the replica fetcher threads. - * 4. Truncate the log and checkpoint offsets for these partitions. - * 5. If the broker is not shutting down, add the fetcher to the new leaders. + * 1. Stop fetchers for these partitions + * 2. Truncate the log and checkpoint offsets for these partitions. + * 3. If the broker is not shutting down, add the fetcher to the new leaders + * 4. Update the partition metadata in cache + * 5. Remove these partitions from the leader partitions set * * The ordering of doing these steps make sure that the replicas in transition will not * take any more messages before checkpointing offsets so that all messages before the checkpoint @@ -341,79 +326,61 @@ class ReplicaManager(val config: KafkaConfig, * * If an unexpected error is thrown in this function, it will be propagated to KafkaApis where * the error message will be set on each partition since we do not know which partition caused it + * TODO: the above may need to be fixed later */ - private def makeFollowers(controllerId: Int, epoch: Int, partitionState: Map[Partition, PartitionStateInfo], + private def makeFollowers(controllerId: Int, epoch: Int, + partitionState: Map[Partition, PartitionStateInfo], leaders: Set[Broker], correlationId: Int, responseMap: mutable.Map[(String, Int), Short]) { - partitionState.foreach(state => - stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " + - "starting the become-follower transition for partition %s") - .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)))) + stateChangeLogger.trace(("Broker %d received LeaderAndIsr request correlationId %d from controller %d epoch %d " + + "starting the become-follower transition for partitions %s") + .format(localBrokerId, correlationId, controllerId, epoch, + partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(","))) for (partition <- partitionState.keys) responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError) try { - leaderPartitionsLock synchronized { - leaderPartitions --= partitionState.keySet - } - - partitionState.foreach{ case (partition, leaderIsrAndControllerEpoch) => - partition.makeFollower(controllerId, leaderIsrAndControllerEpoch, leaders, correlationId)} - replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_))) - partitionState.foreach { state => - stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-follower request from controller " + - "%d epoch %d with correlation id %d for partition %s") - .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(state._1.topic, state._1.partitionId))) - } + stateChangeLogger.trace("Broker %d stopped fetchers for partitions %s as per becoming-follower request from controller %d epoch %d" + .format(localBrokerId, partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(","), controllerId, correlationId)) logManager.truncateTo(partitionState.map{ case(partition, leaderISRAndControllerEpoch) => new 
TopicAndPartition(partition) -> partition.getOrCreateReplica().highWatermark }) - partitionState.foreach { state => - stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition %s as part of " + - "become-follower request with correlation id %d from controller %d epoch %d").format(localBrokerId, - TopicAndPartition(state._1.topic, state._1.partitionId), correlationId, controllerId, epoch)) - } + stateChangeLogger.trace("Broker %d truncated logs and checkpoint recovery boundaries for partitions %s as per becoming-follower request from controller %d epoch %d" + .format(localBrokerId, partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(","), controllerId, correlationId)) + if (!isShuttingDown.get()) { - val partitionAndOffsets = mutable.Map[TopicAndPartition, BrokerAndInitialOffset]() - partitionState.foreach { - case (partition, partitionStateInfo) => - val leader = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader - leaders.find(_.id == leader) match { - case Some(leaderBroker) => - partitionAndOffsets.put(new TopicAndPartition(partition), - BrokerAndInitialOffset(leaderBroker, partition.getReplica().get.logEndOffset)) - case None => - stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d " + - "controller %d epoch %d for partition %s since the designated leader %d " + - "cannot be found in live or shutting down brokers %s").format(localBrokerId, - correlationId, controllerId, epoch, partition, leader, leaders.mkString(","))) - } - } - replicaFetcherManager.addFetcherForPartitions(partitionAndOffsets) + replicaFetcherManager.addFetcherForPartitions(partitionState.map{ case(partition, partitionStateInfo) => + new TopicAndPartition(partition) -> + BrokerAndInitialOffset(leaders.find(_.id == partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader).get, + partition.getReplica().get.logEndOffset)} + ) } else { - partitionState.foreach { state => - stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d from " + - "controller %d epoch %d for partition %s since it is shutting down").format(localBrokerId, correlationId, - controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId))) - } + stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d from " + + "controller %d epoch %d since it is shutting down") + .format(localBrokerId, correlationId, controllerId, epoch)) + } + + partitionState.foreach{ case (partition, leaderIsrAndControllerEpoch) => + partition.makeFollower(controllerId, leaderIsrAndControllerEpoch, leaders, correlationId)} + + leaderPartitionsLock synchronized { + leaderPartitions --= partitionState.keySet } } catch { case e: Throwable => - val errorMsg = ("Error on broker %d while processing LeaderAndIsr request with correlationId %d received from controller %d " + + val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d " + "epoch %d").format(localBrokerId, correlationId, controllerId, epoch) stateChangeLogger.error(errorMsg, e) // Re-throw the exception for it to be caught in KafkaApis throw e } - partitionState.foreach { state => - stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " + - "for the become-follower transition for partition %s") - .format(localBrokerId, correlationId, controllerId, epoch, 
TopicAndPartition(state._1.topic, state._1.partitionId))) - } + stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " + + "for the become-follower transition for partitions %s") + .format(localBrokerId, correlationId, controllerId, epoch, partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(","))) } private def maybeShrinkIsr(): Unit = { diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala index 5e8c56d..f1f139e 100644 --- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala +++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala @@ -269,40 +269,34 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa while (isMessageInAllReplicas) { var messageInfoFromFirstReplicaOpt: Option[MessageInfo] = None for ( (replicaId, messageIterator) <- messageIteratorMap) { - try { - if (messageIterator.hasNext) { - val messageAndOffset = messageIterator.next() - - // only verify up to the high watermark - if (messageAndOffset.offset >= fetchResponsePerReplica.get(replicaId).hw) - isMessageInAllReplicas = false - else { - messageInfoFromFirstReplicaOpt match { - case None => - messageInfoFromFirstReplicaOpt = Some( - MessageInfo(replicaId, messageAndOffset.offset,messageAndOffset.nextOffset, messageAndOffset.message.checksum)) - case Some(messageInfoFromFirstReplica) => - if (messageInfoFromFirstReplica.offset != messageAndOffset.offset) { - println(ReplicaVerificationTool.getCurrentTimeString + ": partition " + topicAndPartition - + ": replica " + messageInfoFromFirstReplica.replicaId + "'s offset " - + messageInfoFromFirstReplica.offset + " doesn't match replica " - + replicaId + "'s offset " + messageAndOffset.offset) - System.exit(1) - } - if (messageInfoFromFirstReplica.checksum != messageAndOffset.message.checksum) - println(ReplicaVerificationTool.getCurrentTimeString + ": partition " - + topicAndPartition + " has unmatched checksum at offset " + messageAndOffset.offset + "; replica " - + messageInfoFromFirstReplica.replicaId + "'s checksum " + messageInfoFromFirstReplica.checksum - + "; replica " + replicaId + "'s checksum " + messageAndOffset.message.checksum) - } - } - } else + if (messageIterator.hasNext) { + val messageAndOffset = messageIterator.next() + + // only verify up to the high watermark + if (messageAndOffset.offset >= fetchResponsePerReplica.get(replicaId).hw) isMessageInAllReplicas = false - } catch { - case t => - throw new RuntimeException("Error in processing replica %d in partition %s at offset %d." 
- .format(replicaId, topicAndPartition, fetchOffsetMap.get(topicAndPartition)), t) - } + else { + messageInfoFromFirstReplicaOpt match { + case None => + messageInfoFromFirstReplicaOpt = Some( + MessageInfo(replicaId, messageAndOffset.offset,messageAndOffset.nextOffset, messageAndOffset.message.checksum)) + case Some(messageInfoFromFirstReplica) => + if (messageInfoFromFirstReplica.offset != messageAndOffset.offset) { + println(ReplicaVerificationTool.getCurrentTimeString + ": partition " + topicAndPartition + + ": replica " + messageInfoFromFirstReplica.replicaId + "'s offset " + + messageInfoFromFirstReplica.offset + " doesn't match replica " + + replicaId + "'s offset " + messageAndOffset.offset) + System.exit(1) + } + if (messageInfoFromFirstReplica.checksum != messageAndOffset.message.checksum) + println(ReplicaVerificationTool.getCurrentTimeString + ": partition " + + topicAndPartition + " has unmatched checksum at offset " + messageAndOffset.offset + "; replica " + + messageInfoFromFirstReplica.replicaId + "'s checksum " + messageInfoFromFirstReplica.checksum + + "; replica " + replicaId + "'s checksum " + messageAndOffset.message.checksum) + } + } + } else + isMessageInAllReplicas = false } if (isMessageInAllReplicas) { val nextOffset = messageInfoFromFirstReplicaOpt.get.nextOffset diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index 52d35a3..c30069e 100644 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -23,7 +23,7 @@ import java.util.Properties import kafka.utils._ import kafka.log._ import kafka.zk.ZooKeeperTestHarness -import kafka.server.{KafkaServer, KafkaConfig} +import kafka.server.KafkaConfig import kafka.utils.{Logging, ZkUtils, TestUtils} import kafka.common.{TopicExistsException, ErrorMapping, TopicAndPartition} @@ -153,10 +153,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted; }, 1000) val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned) - // in sync replicas should not have any replica that is not in the new assigned replicas - checkForPhantomInSyncReplicas(topic, partitionToBeReassigned, assignedReplicas) assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas) - ensureNoUnderReplicatedPartitions(topic, partitionToBeReassigned, assignedReplicas, servers) servers.foreach(_.shutdown()) } @@ -182,8 +179,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { }, 1000) val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned) assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas) - checkForPhantomInSyncReplicas(topic, partitionToBeReassigned, assignedReplicas) - ensureNoUnderReplicatedPartitions(topic, partitionToBeReassigned, assignedReplicas, servers) + // leader should be 2 servers.foreach(_.shutdown()) } @@ -209,8 +205,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { }, 2000) val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned) assertEquals("Partition should have been reassigned to 2, 3", newReplicas, assignedReplicas) - checkForPhantomInSyncReplicas(topic, partitionToBeReassigned, assignedReplicas) - ensureNoUnderReplicatedPartitions(topic, 
partitionToBeReassigned, assignedReplicas, servers) + // leader should be 2 servers.foreach(_.shutdown()) } @@ -227,6 +222,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions()) val reassignedPartitions = ZkUtils.getPartitionsBeingReassigned(zkClient) assertFalse("Partition should not be reassigned", reassignedPartitions.contains(topicAndPartition)) + // leader should be 2 servers.foreach(_.shutdown()) } @@ -248,9 +244,6 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { TestUtils.waitUntilTrue(checkIfReassignPartitionPathExists, 1000) val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned) assertEquals("Partition should have been reassigned to 0, 1", newReplicas, assignedReplicas) - checkForPhantomInSyncReplicas(topic, partitionToBeReassigned, assignedReplicas) - // ensure that there are no under replicated partitions - ensureNoUnderReplicatedPartitions(topic, partitionToBeReassigned, assignedReplicas, servers) servers.foreach(_.shutdown()) } @@ -333,7 +326,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { servers.foreach(_.shutdown()) } } - + /** * This test creates a topic with a few config overrides and checks that the configs are applied to the new topic * then changes the config and checks that the new values take effect. @@ -343,14 +336,14 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val partitions = 3 val topic = "my-topic" val server = TestUtils.createServer(new KafkaConfig(TestUtils.createBrokerConfig(0))) - + def makeConfig(messageSize: Int, retentionMs: Long) = { var props = new Properties() props.setProperty(LogConfig.MaxMessageBytesProp, messageSize.toString) props.setProperty(LogConfig.RententionMsProp, retentionMs.toString) props } - + def checkConfig(messageSize: Int, retentionMs: Long) { TestUtils.retry(10000) { for(part <- 0 until partitions) { @@ -361,14 +354,14 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { } } } - + try { // create a topic with a few config overrides and check that they are applied val maxMessageSize = 1024 val retentionMs = 1000*1000 AdminUtils.createTopic(server.zkClient, topic, partitions, 1, makeConfig(maxMessageSize, retentionMs)) checkConfig(maxMessageSize, retentionMs) - + // now double the config values for the topic and check that it is applied AdminUtils.changeTopicConfig(server.zkClient, topic, makeConfig(2*maxMessageSize, 2 * retentionMs)) checkConfig(2*maxMessageSize, 2 * retentionMs) @@ -378,26 +371,6 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { } } - private def checkForPhantomInSyncReplicas(topic: String, partitionToBeReassigned: Int, assignedReplicas: Seq[Int]) { - val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionToBeReassigned) - // in sync replicas should not have any replica that is not in the new assigned replicas - val phantomInSyncReplicas = inSyncReplicas.toSet -- assignedReplicas.toSet - assertTrue("All in sync replicas %s must be in the assigned replica list %s".format(inSyncReplicas, assignedReplicas), - phantomInSyncReplicas.size == 0) - } - - private def ensureNoUnderReplicatedPartitions(topic: String, partitionToBeReassigned: Int, assignedReplicas: Seq[Int], - servers: Seq[KafkaServer]) { - val inSyncReplicas = 
ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionToBeReassigned) - assertFalse("Reassigned partition [%s,%d] is underreplicated".format(topic, partitionToBeReassigned), - inSyncReplicas.size < assignedReplicas.size) - val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitionToBeReassigned) - assertTrue("Reassigned partition [%s,%d] is unavailable".format(topic, partitionToBeReassigned), leader.isDefined) - val leaderBroker = servers.filter(s => s.config.brokerId == leader.get).head - assertTrue("Reassigned partition [%s,%d] is underreplicated as reported by the leader %d".format(topic, partitionToBeReassigned, leader.get), - leaderBroker.replicaManager.underReplicatedPartitionCount() == 0) - } - private def checkIfReassignPartitionPathExists(): Boolean = { ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath) } diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala index 9347ea6..ef1de83 100644 --- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala @@ -88,40 +88,4 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness { val unconsumed = messageSet.filter(_.offset >= consumedOffset).map(m => Utils.readString(m.message.payload)) assertEquals(unconsumed, receivedMessages) } - - @Test - def testConsumerIteratorDecodingFailure() { - val messageStrings = (0 until 10).map(_.toString).toList - val messages = messageStrings.map(s => new Message(s.getBytes)) - val messageSet = new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(0), messages:_*) - - topicInfos(0).enqueue(messageSet) - assertEquals(1, queue.size) - - val iter = new ConsumerIterator[String, String](queue, - ConsumerConfig.ConsumerTimeoutMs, - new FailDecoder(), - new FailDecoder(), - clientId = "") - - val receivedMessages = (0 until 5).map{ i => - assertTrue(iter.hasNext) - val message = iter.next - assertEquals(message.offset, i + consumedOffset) - - try { - message.message // should fail - } - catch { - case e: UnsupportedOperationException => // this is ok - case e2: Throwable => fail("Unexpected exception when iterating the message set. 
" + e2.getMessage) - } - } - } - - class FailDecoder(props: VerifiableProperties = null) extends Decoder[String] { - def fromBytes(bytes: Array[Byte]): String = { - throw new UnsupportedOperationException("This decoder does not work at all..") - } - } } diff --git a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala index cf2724b..40a2bf7 100644 --- a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala @@ -29,13 +29,16 @@ class TopicFilterTest extends JUnitSuite { def testWhitelists() { val topicFilter1 = new Whitelist("white1,white2") + assertFalse(topicFilter1.requiresTopicEventWatcher) assertTrue(topicFilter1.isTopicAllowed("white2")) assertFalse(topicFilter1.isTopicAllowed("black1")) val topicFilter2 = new Whitelist(".+") + assertTrue(topicFilter2.requiresTopicEventWatcher) assertTrue(topicFilter2.isTopicAllowed("alltopics")) val topicFilter3 = new Whitelist("white_listed-topic.+") + assertTrue(topicFilter3.requiresTopicEventWatcher) assertTrue(topicFilter3.isTopicAllowed("white_listed-topic1")) assertFalse(topicFilter3.isTopicAllowed("black1")) } @@ -43,5 +46,6 @@ class TopicFilterTest extends JUnitSuite { @Test def testBlacklists() { val topicFilter1 = new Blacklist("black1") + assertTrue(topicFilter1.requiresTopicEventWatcher) } } \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala index 6b76037..5f2c2e8 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala @@ -212,14 +212,15 @@ class LogSegmentTest extends JUnit3Suite { */ @Test def testRecoveryWithCorruptMessage() { + val rand = new Random(1) val messagesAppended = 20 for(iteration <- 0 until 10) { val seg = createSegment(0) for(i <- 0 until messagesAppended) seg.append(i, messages(i, i.toString)) - val offsetToBeginCorruption = TestUtils.random.nextInt(messagesAppended) + val offsetToBeginCorruption = rand.nextInt(messagesAppended) // start corrupting somewhere in the middle of the chosen record all the way to the end - val position = seg.log.searchFor(offsetToBeginCorruption, 0).position + TestUtils.random.nextInt(15) + val position = seg.log.searchFor(offsetToBeginCorruption, 0).position + rand.nextInt(15) TestUtils.writeNonsenseToFile(seg.log.file, position, seg.log.file.length.toInt - position) seg.recover(64*1024) assertEquals("Should have truncated off bad messages.", (0 until offsetToBeginCorruption).toList, seg.log.map(_.offset).toList) diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 1da1393..1571f1e 100644 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -592,29 +592,29 @@ class LogTest extends JUnitSuite { val config = logConfig.copy(indexInterval = 1, maxMessageSize = 64*1024, segmentSize = 1000) val set = TestUtils.singleMessageSet("test".getBytes()) val recoveryPoint = 50L - for(iteration <- 0 until 50) { + for(iteration <- 0 until 10) { // create a log and write some messages to it - logDir.mkdirs() var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time) - val numMessages = 50 + TestUtils.random.nextInt(50) - for(i <- 0 until numMessages) + for(i <- 0 until 100) log.append(set) - val messages = 
log.logSegments.flatMap(_.log.iterator.toList) + val seg = log.logSegments(0, recoveryPoint).last + val index = seg.index + val messages = seg.log + val filePosition = messages.searchFor(recoveryPoint, 0).position + val indexPosition = index.lookup(recoveryPoint).position log.close() - // corrupt index and log by appending random bytes - TestUtils.appendNonsenseToFile(log.activeSegment.index.file, TestUtils.random.nextInt(1024) + 1) - TestUtils.appendNonsenseToFile(log.activeSegment.log.file, TestUtils.random.nextInt(1024) + 1) + // corrupt file + TestUtils.writeNonsenseToFile(index.file, indexPosition, index.file.length.toInt - indexPosition) + TestUtils.writeNonsenseToFile(messages.file, filePosition, messages.file.length().toInt - filePosition) // attempt recovery log = new Log(logDir, config, recoveryPoint, time.scheduler, time) - assertEquals(numMessages, log.logEndOffset) - assertEquals("Messages in the log after recovery should be the same.", messages, log.logSegments.flatMap(_.log.iterator.toList)) - Utils.rm(logDir) + assertEquals(recoveryPoint, log.logEndOffset) } } diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala index 2cd3a3f..7026432 100644 --- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala @@ -31,7 +31,6 @@ class IsrExpirationTest extends JUnit3Suite { var topicPartitionIsr: Map[(String, Int), Seq[Int]] = new HashMap[(String, Int), Seq[Int]]() val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) { override val replicaLagTimeMaxMs = 100L - override val replicaFetchWaitMaxMs = 100 override val replicaLagMaxMessages = 10L }) val topic = "foo" diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index 17a99f1..34e39e7 100644 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -32,7 +32,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) { override val replicaLagTimeMaxMs = 5000L override val replicaLagMaxMessages = 10L - override val replicaFetchWaitMaxMs = 1000 override val replicaFetchMinBytes = 20 }) val topic = "new-topic" diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index 03e6266..bab436d 100644 --- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -34,7 +34,6 @@ class SimpleFetchTest extends JUnit3Suite { val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) { override val replicaLagTimeMaxMs = 100L - override val replicaFetchWaitMaxMs = 100 override val replicaLagMaxMessages = 10L }) val topic = "foo" diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index d88b6c3..777b315 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -518,15 +518,9 @@ object TestUtils extends Logging { def writeNonsenseToFile(fileName: File, position: Long, size: Int) { val file = new RandomAccessFile(fileName, "rw") file.seek(position) + val rand = new Random for(i <- 0 until size) - file.writeByte(random.nextInt(255)) - 
file.close() - } - - def appendNonsenseToFile(fileName: File, size: Int) { - val file = new FileOutputStream(fileName, true) - for(i <- 0 until size) - file.write(random.nextInt(255)) + file.writeByte(rand.nextInt(255)) file.close() } diff --git a/kafka-patch-review.py b/kafka-patch-review.py index dc6664d..7fa6cb5 100644 --- a/kafka-patch-review.py +++ b/kafka-patch-review.py @@ -37,21 +37,6 @@ def main(): st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d_%H:%M:%S') patch_file=tempfile.gettempdir() + "/" + opt.jira + '_' + st + '.patch' - # first check if rebase is needed - git_branch_hash="git rev-parse " + opt.branch - p_now=os.popen(git_branch_hash) - branch_now=p_now.read() - p_now.close() - - git_common_ancestor="git merge-base " + opt.branch + " HEAD" - p_then=os.popen(git_common_ancestor) - branch_then=p_then.read() - p_then.close() - - if branch_now != branch_then: - print 'ERROR: Your current working branch is from an older version of ' + opt.branch + '. Please rebase first by using git pull --rebase' - sys.exit(1) - git_configure_reviewboard="git config reviewboard.url https://reviews.apache.org" print "Configuring reviewboard url to https://reviews.apache.org" p=os.popen(git_configure_reviewboard) diff --git a/project/Build.scala b/project/Build.scala index 098e874..500040b 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -71,9 +71,7 @@ object KafkaBuild extends Build { - , - mappings in packageBin in Compile += file("LICENSE") -> "LICENSE", - mappings in packageBin in Compile += file("NOTICE") -> "NOTICE" + ) val hadoopSettings = Seq( diff --git a/system_test/migration_tool_testsuite/config/server.properties b/system_test/migration_tool_testsuite/config/server.properties index 54144a2..6ecbb71 100644 --- a/system_test/migration_tool_testsuite/config/server.properties +++ b/system_test/migration_tool_testsuite/config/server.properties @@ -51,9 +51,8 @@ socket.request.max.bytes=104857600 # The directory under which to store log files log.dir=/tmp/kafka_server_logs -# The default number of log partitions per topic. More partitions allow greater -# parallelism for consumption, but this will also result in more files across -# the brokers. +# The number of logical partitions per topic per server. More partitions allow greater parallelism +# for consumption, but also mean more files. num.partitions=5 # Overrides for for the default given by num.partitions on a per-topic basis diff --git a/system_test/mirror_maker_testsuite/config/server.properties b/system_test/mirror_maker_testsuite/config/server.properties index c628412..36dd68d 100644 --- a/system_test/mirror_maker_testsuite/config/server.properties +++ b/system_test/mirror_maker_testsuite/config/server.properties @@ -51,9 +51,8 @@ socket.request.max.bytes=104857600 # The directory under which to store log files log.dir=/tmp/kafka_server_logs -# The default number of log partitions per topic. More partitions allow greater -# parallelism for consumption, but this will also result in more files across -# the brokers. +# The number of logical partitions per topic per server. More partitions allow greater parallelism +# for consumption, but also mean more files. 
num.partitions=5 # Overrides for the default given by num.partitions on a per-topic basis diff --git a/system_test/replication_testsuite/config/server.properties b/system_test/replication_testsuite/config/server.properties index c628412..36dd68d 100644 --- a/system_test/replication_testsuite/config/server.properties +++ b/system_test/replication_testsuite/config/server.properties @@ -51,9 +51,8 @@ socket.request.max.bytes=104857600 # The directory under which to store log files log.dir=/tmp/kafka_server_logs -# The default number of log partitions per topic. More partitions allow greater -# parallelism for consumption, but this will also result in more files across -# the brokers. +# The number of logical partitions per topic per server. More partitions allow greater parallelism +# for consumption, but also mean more files. num.partitions=5 # Overrides for the default given by num.partitions on a per-topic basis
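Note on the new per-topic and leader-balancing settings added in the KafkaConfig hunk above: the *.per.topic properties are read via props.getMap, which in this codebase generally parses a comma-separated list of topic:value pairs. The snippet below is only an illustrative sketch under that assumption; the topic names are hypothetical and the map syntax is inferred from the parser rather than spelled out by this patch. The leader-imbalance values shown are the defaults declared in the hunk.

# hypothetical per-topic overrides (assumed format: topic1:value1,topic2:value2)
log.retention.hours.per.topic=topicA:168,topicB:24
log.segment.bytes.per.topic=topicA:536870912
# automatic leader rebalancing introduced by this patch (defaults to false)
auto.leader.rebalance.enable=true
leader.imbalance.per.broker.percentage=10
leader.imbalance.check.interval.seconds=300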