diff --git a/bin/kafka-check-reassignment-status.sh b/bin/kafka-check-reassignment-status.sh
new file mode 100755
index 0000000..1f21858
--- /dev/null
+++ b/bin/kafka-check-reassignment-status.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+$(dirname $0)/kafka-run-class.sh kafka.admin.CheckReassignmentStatus $@
diff --git a/config/test-log4j.properties b/config/test-log4j.properties
deleted file mode 100644
index a3ae33f..0000000
--- a/config/test-log4j.properties
+++ /dev/null
@@ -1,68 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-log4j.rootLogger=INFO, stdout
-
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
-
-log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
-log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH
-log4j.appender.kafkaAppender.File=logs/server.log
-log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
-log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
-
-log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender
-log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH
-log4j.appender.stateChangeAppender.File=logs/state-change.log
-log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
-log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
-
-log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
-log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH
-log4j.appender.requestAppender.File=logs/kafka-request.log
-log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
-log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
-
-log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
-log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
-log4j.appender.controllerAppender.File=logs/controller.log
-log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
-log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
-
-# Turn on all our debugging info
-#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender
-#log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender
-log4j.logger.kafka.perf=DEBUG, kafkaAppender
-log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender
-#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
-log4j.logger.kafka=INFO, kafkaAppender
-
-log4j.logger.kafka.network.RequestChannel$=TRACE, requestAppender
-log4j.additivity.kafka.network.RequestChannel$=false
-
-#log4j.logger.kafka.network.Processor=TRACE, requestAppender
-#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
-#log4j.additivity.kafka.server.KafkaApis=false
-log4j.logger.kafka.request.logger=TRACE, requestAppender
-log4j.additivity.kafka.request.logger=false
-
-log4j.logger.kafka.controller=TRACE, controllerAppender
-log4j.additivity.kafka.controller=false
-
-log4j.logger.state.change.logger=TRACE, stateChangeAppender
-log4j.additivity.state.change.logger=false
-
-
diff --git a/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala b/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala
new file mode 100644
index 0000000..7e85f87
--- /dev/null
+++ b/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import joptsimple.OptionParser
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils._
+import scala.collection.Map
+import kafka.common.TopicAndPartition
+
+object CheckReassignmentStatus extends Logging {
+
+  def main(args: Array[String]): Unit = {
+    val parser = new OptionParser
+    val jsonFileOpt = parser.accepts("path-to-json-file", "REQUIRED: The JSON file with the list of partitions and the " +
+      "new replicas they should be reassigned to")
+      .withRequiredArg
+      .describedAs("partition reassignment json file path")
+      .ofType(classOf[String])
+    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the " +
+      "form host:port. Multiple URLS can be given to allow fail-over.")
+      .withRequiredArg
+      .describedAs("urls")
+      .ofType(classOf[String])
+
+    val options = parser.parse(args : _*)
+
+    for(arg <- List(jsonFileOpt, zkConnectOpt)) {
+      if(!options.has(arg)) {
+        System.err.println("Missing required argument \"" + arg + "\"")
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+    }
+
+    val jsonFile = options.valueOf(jsonFileOpt)
+    val zkConnect = options.valueOf(zkConnectOpt)
+    val jsonString = Utils.readFileAsString(jsonFile)
+    val zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
+
+    try {
+      // parse the json file contents into the requested partition reassignment
+      val partitionsToBeReassigned = Json.parseFull(jsonString) match {
+        case Some(reassignedPartitions) =>
+          val partitions = reassignedPartitions.asInstanceOf[Array[Map[String, String]]]
+          partitions.map { m =>
+            val topic = m.asInstanceOf[Map[String, String]].get("topic").get
+            val partition = m.asInstanceOf[Map[String, String]].get("partition").get.toInt
+            val replicasList = m.asInstanceOf[Map[String, String]].get("replicas").get
+            val newReplicas = replicasList.split(",").map(_.toInt)
+            (TopicAndPartition(topic, partition), newReplicas.toSeq)
+          }.toMap
+        case None => Map.empty[TopicAndPartition, Seq[Int]]
+      }
+
+      val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkClient, partitionsToBeReassigned)
+      reassignedPartitionsStatus.foreach { partition =>
+        partition._2 match {
+          case ReassignmentCompleted =>
+            println("Partition %s reassignment completed successfully".format(partition._1))
+          case ReassignmentFailed =>
+            println("Partition %s reassignment failed".format(partition._1))
+          case ReassignmentInProgress =>
+            println("Partition %s reassignment in progress".format(partition._1))
+        }
+      }
+    }
+  }
+
+  def checkIfReassignmentSucceeded(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]])
+  :Map[TopicAndPartition, ReassignmentStatus] = {
+    val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas)
+    // check the status of every partition in the reassignment request
+    partitionsToBeReassigned.map { topicAndPartition =>
+      (topicAndPartition._1, checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition._1,
+        topicAndPartition._2, partitionsToBeReassigned, partitionsBeingReassigned))
+    }
+  }
+
+  def checkIfPartitionReassignmentSucceeded(zkClient: ZkClient, topicAndPartition: TopicAndPartition,
+                                            reassignedReplicas: Seq[Int],
+                                            partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]],
+                                            partitionsBeingReassigned: Map[TopicAndPartition, Seq[Int]]): ReassignmentStatus = {
+    val newReplicas = partitionsToBeReassigned(topicAndPartition)
+    partitionsBeingReassigned.get(topicAndPartition) match {
+      case Some(partition) => ReassignmentInProgress
+      case None =>
+        // check if AR == RAR
+        val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topicAndPartition.topic, topicAndPartition.partition)
+        if(assignedReplicas == newReplicas)
+          ReassignmentCompleted
+        else
+          ReassignmentFailed
+    }
+  }
+}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index c6fc4ab..f333d29 100644
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -58,12 +58,6 @@ object ReassignPartitionsCommand extends Logging {
       .describedAs("execute")
       .ofType(classOf[String])
 
-    val statusCheckJsonFileOpt = parser.accepts("status-check-json-file", "REQUIRED: The JSON file with the list of partitions and the " +
-      "new replicas they should be reassigned to, which can be obtained from the output of a dry run.")
-      .withRequiredArg
-      .describedAs("partition reassignment json file path")
-      .ofType(classOf[String])
-
     val options = parser.parse(args : _*)
 
    for(arg <- List(zkConnectOpt)) {
@@ -86,24 +80,7 @@ object ReassignPartitionsCommand extends Logging {
 
      var partitionsToBeReassigned : Map[TopicAndPartition, Seq[Int]] = new mutable.HashMap[TopicAndPartition, List[Int]]()
 
-      if(options.has(statusCheckJsonFileOpt)) {
-        val jsonFile = options.valueOf(statusCheckJsonFileOpt)
-        val jsonString = Utils.readFileAsString(jsonFile)
-        val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(jsonString)
-
-        println("Status of partition reassignment:")
-        val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkClient, partitionsToBeReassigned)
-        reassignedPartitionsStatus.foreach { partition =>
-          partition._2 match {
-            case ReassignmentCompleted =>
-              println("Reassignment of partition %s completed successfully".format(partition._1))
-            case ReassignmentFailed =>
-              println("Reassignment of partition %s failed".format(partition._1))
-            case ReassignmentInProgress =>
-              println("Reassignment of partition %s is still in progress".format(partition._1))
-          }
-        }
-      } else if(options.has(topicsToMoveJsonFileOpt)) {
+      if(options.has(topicsToMoveJsonFileOpt)) {
        val topicsToMoveJsonFile = options.valueOf(topicsToMoveJsonFileOpt)
        val brokerList = options.valueOf(brokerListOpt)
        val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile)
@@ -130,19 +107,16 @@ object ReassignPartitionsCommand extends Logging {
        System.exit(1)
      }
 
-      if (options.has(topicsToMoveJsonFileOpt) || options.has(manualAssignmentJsonFileOpt)) {
-        if (options.has(executeOpt)) {
-          val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned)
-
-          if(reassignPartitionsCommand.reassignPartitions())
-            println("Successfully started reassignment of partitions %s".format(partitionsToBeReassigned))
-          else
-            println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
-        } else {
-          System.out.println("This is a dry run (Use --execute to do the actual reassignment. " +
-            "The following is the replica assignment. Save it for the status check option.\n" +
-            ZkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned))
-        }
+      if (options.has(executeOpt)) {
+        val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned)
+
+        if(reassignPartitionsCommand.reassignPartitions())
+          println("Successfully started reassignment of partitions %s".format(partitionsToBeReassigned))
+        else
+          println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
+      } else {
+        System.out.println("This is a dry run (use --execute to do the actual reassignment). " +
+          "The replica assignment is \n" + partitionsToBeReassigned.toString())
      }
    } catch {
      case e: Throwable =>
@@ -153,32 +127,6 @@ object ReassignPartitionsCommand extends Logging {
      zkClient.close()
    }
  }
-
-  private def checkIfReassignmentSucceeded(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]])
-  :Map[TopicAndPartition, ReassignmentStatus] = {
-    val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas)
-    partitionsToBeReassigned.map { topicAndPartition =>
-      (topicAndPartition._1, checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition._1,
-        topicAndPartition._2, partitionsToBeReassigned, partitionsBeingReassigned))
-    }
-  }
-
-  private def checkIfPartitionReassignmentSucceeded(zkClient: ZkClient, topicAndPartition: TopicAndPartition,
-                                                    reassignedReplicas: Seq[Int],
-                                                    partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]],
-                                                    partitionsBeingReassigned: Map[TopicAndPartition, Seq[Int]]): ReassignmentStatus = {
-    val newReplicas = partitionsToBeReassigned(topicAndPartition)
-    partitionsBeingReassigned.get(topicAndPartition) match {
-      case Some(partition) => ReassignmentInProgress
-      case None =>
-        // check if the current replica assignment matches the expected one after reassignment
-        val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topicAndPartition.topic, topicAndPartition.partition)
-        if(assignedReplicas == newReplicas)
-          ReassignmentCompleted
-        else
-          ReassignmentFailed
-    }
-  }
 }
 
 class ReassignPartitionsCommand(zkClient: ZkClient, partitions: collection.Map[TopicAndPartition, collection.Seq[Int]])
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 08b4b72..72e5dd5 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -25,7 +25,7 @@ import kafka.cluster._
 import kafka.utils._
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import java.net.InetAddress
-import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
+import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, IZkChildListener, ZkClient}
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import java.util.UUID
 import kafka.serializer._
@@ -90,6 +90,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   private val messageStreamCreated = new AtomicBoolean(false)
 
   private var sessionExpirationListener: ZKSessionExpireListener = null
+  private var topicPartitionChangeListener: ZKTopicPartitionChangeListener = null
   private var loadBalancerListener: ZKRebalancerListener = null
 
   private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null
@@ -268,8 +269,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     }
   }
 
-
-
   class ZKSessionExpireListener(val dirs: ZKGroupDirs,
                                val consumerIdString: String,
                                val topicCount: TopicCount,
@@ -306,6 +305,29 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
   }
 
+  class ZKTopicPartitionChangeListener(val loadBalancerListener: ZKRebalancerListener)
+    extends IZkDataListener {
+
+    def handleDataChange(dataPath : String, data: Object) {
+      try {
+        info("Topic info for path " + dataPath + " changed to " + data.toString + ", triggering rebalance")
+        // explicitly trigger load balancing for this consumer
+
+        // There is no need to resubscribe to data changes.
+        // The data change watchers will be set inside rebalance when we read the path to get the partition info.
+        loadBalancerListener.syncedRebalance()
+      } catch {
+        case e: Throwable => error("Error while handling topic partition change for data path " + dataPath, e)
+      }
+    }
+
+    @throws(classOf[Exception])
+    def handleDataDeleted(dataPath : String) {
+      // TODO: This needs to be implemented when we support delete topic
+      warn("Topic for path " + dataPath + " was deleted, which should not happen at this time")
+    }
+  }
+
   class ZKRebalancerListener(val group: String, val consumerIdString: String,
                              val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_,_]]])
     extends IZkChildListener {
@@ -626,11 +648,15 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
         config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_,_]]]])
     }
 
-    // register listener for session expired event
+    // create listener for session expired event if it does not exist yet
     if (sessionExpirationListener == null)
      sessionExpirationListener = new ZKSessionExpireListener(
        dirs, consumerIdString, topicCount, loadBalancerListener)
 
+    // create listener for topic partition change event if it does not exist yet
+    if (topicPartitionChangeListener == null)
+      topicPartitionChangeListener = new ZKTopicPartitionChangeListener(loadBalancerListener)
+
     val topicStreamsMap = loadBalancerListener.kafkaMessageAndMetadataStreams
 
     // map of {topic -> Set(thread-1, thread-2, ...)}
@@ -686,8 +712,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
     topicStreamsMap.foreach { topicAndStreams =>
       // register on broker partition path changes
-      val partitionPath = BrokerTopicsPath + "/" + topicAndStreams._1
-      zkClient.subscribeChildChanges(partitionPath, loadBalancerListener)
+      val topicPath = BrokerTopicsPath + "/" + topicAndStreams._1
+      zkClient.subscribeDataChanges(topicPath, topicPartitionChangeListener)
     }
 
     // explicitly trigger load balancing for this consumer
diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py
index 1b473eb..35fc383 100644
--- a/system_test/utils/kafka_system_test_utils.py
+++ b/system_test/utils/kafka_system_test_utils.py
@@ -1010,7 +1010,6 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk
     cmdList = ["ssh " + host,
                "'JAVA_HOME=" + javaHome,
                "JMX_PORT=" + jmxPort,
-               "KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%s/config/test-log4j.properties" % kafkaHome,
                kafkaRunClassBin + " kafka.perf.ProducerPerformance",
                "--broker-list " + brokerListStr,
                "--initial-message-id " + str(initMsgId),
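
For reviewers, a minimal usage sketch of the new status checker (not part of the patch itself): the script name, option names, and JSON keys come from the code above, while the file name, topic, partition ids, broker ids, and ZooKeeper address below are made up. CheckReassignmentStatus parses the input as a JSON array of string-valued maps with "topic", "partition", and a comma-separated "replicas" field, so a reassignment file and invocation might look like this:

    # partitions-to-verify.json (hypothetical contents)
    [{"topic": "test-topic", "partition": "0", "replicas": "1,2,3"},
     {"topic": "test-topic", "partition": "1", "replicas": "2,3,4"}]

    # after starting the move with ReassignPartitionsCommand (--execute),
    # poll the status with the new tool:
    bin/kafka-check-reassignment-status.sh \
        --path-to-json-file partitions-to-verify.json \
        --zookeeper localhost:2181

Each listed partition is reported as in progress (it is still present in the reassignment path in ZooKeeper), completed (its currently assigned replicas equal the requested replicas), or failed.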