diff --git bin/kafka-check-reassignment-status.sh bin/kafka-check-reassignment-status.sh new file mode 100644 index 0000000..d337c9e --- /dev/null +++ bin/kafka-check-reassignment-status.sh @@ -0,0 +1,19 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +base_dir=$(dirname $0) +export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties" +$base_dir/kafka-run-class.sh kafka.admin.CheckReassignmentStatus $@ diff --git bin/kafka-reassign-partitions.sh bin/kafka-reassign-partitions.sh new file mode 100644 index 0000000..8d006ac --- /dev/null +++ bin/kafka-reassign-partitions.sh @@ -0,0 +1,19 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +base_dir=$(dirname $0) +export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties" +$base_dir/kafka-run-class.sh kafka.admin.ReassignPartitionsCommand $@ diff --git core/src/main/scala/kafka/admin/AdminUtils.scala core/src/main/scala/kafka/admin/AdminUtils.scala index 9e9a428..af4c382 100644 --- core/src/main/scala/kafka/admin/AdminUtils.scala +++ core/src/main/scala/kafka/admin/AdminUtils.scala @@ -50,7 +50,7 @@ object AdminUtils extends Logging { */ def assignReplicasToBrokers(brokerList: Seq[String], nPartitions: Int, replicationFactor: Int, fixedStartIndex: Int = -1) // for testing only - : Map[Int, List[String]] = { + : Map[Int, Seq[String]] = { if (nPartitions <= 0) throw new AdministrationException("number of partitions must be larger than 0") if (replicationFactor <= 0) @@ -74,7 +74,7 @@ object AdminUtils extends Logging { ret.toMap } - def createTopicPartitionAssignmentPathInZK(topic: String, replicaAssignment: Map[Int, List[String]], zkClient: ZkClient) { + def createTopicPartitionAssignmentPathInZK(topic: String, replicaAssignment: Map[Int, Seq[String]], zkClient: ZkClient) { try { val zkPath = ZkUtils.getTopicPath(topic) val jsonPartitionMap = Utils.mapToJson(replicaAssignment.map(e => (e._1.toString -> e._2))) diff --git core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala new file mode 100644 index 0000000..981d510 --- /dev/null +++ core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala @@ -0,0 +1,97 @@ + +package kafka.admin + +import joptsimple.OptionParser +import org.I0Itec.zkclient.ZkClient +import kafka.utils._ +import scala.collection.Map + +object CheckReassignmentStatus extends Logging { + + def main(args: Array[String]): Unit = { + val parser = new OptionParser + val jsonFileOpt = parser.accepts("path to json file", "REQUIRED: The JSON file with the list of partitions and the " + + "new replicas they should be reassigned to") + .withRequiredArg + .describedAs("partition reassignment json file path") + .ofType(classOf[String]) + val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the " + + "form host:port. 
Multiple URLs can be given to allow fail-over.") + .withRequiredArg + .describedAs("urls") + .ofType(classOf[String]) + + val options = parser.parse(args : _*) + + for(arg <- List(jsonFileOpt, zkConnectOpt)) { + if(!options.has(arg)) { + System.err.println("Missing required argument \"" + arg + "\"") + parser.printHelpOn(System.err) + System.exit(1) + } + } + + val jsonFile = options.valueOf(jsonFileOpt) + val zkConnect = options.valueOf(zkConnectOpt) + val jsonString = Utils.readFileIntoString(jsonFile) + val zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) + + try { + // read the json file into a string + val partitionsToBeReassigned = SyncJSON.parseFull(jsonString) match { + case Some(reassignedPartitions) => + val partitions = reassignedPartitions.asInstanceOf[Array[Map[String, String]]] + partitions.map { m => + val topic = m.asInstanceOf[Map[String, String]].get("topic").get + val partition = m.asInstanceOf[Map[String, String]].get("partition").get.toInt + val replicasList = m.asInstanceOf[Map[String, String]].get("replicas").get + val newReplicas = replicasList.split(",").map(_.toInt) + ((topic, partition), newReplicas.toSeq) + }.toMap + case None => Map.empty[(String, Int), Seq[Int]] + } + + val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkClient, partitionsToBeReassigned) + reassignedPartitionsStatus.foreach { partition => + partition._2 match { + case ReassignmentCompleted => + println("Partition [%s,%d] reassignment to %s completed successfully".format(partition._1._1, partition._1._2, + partitionsToBeReassigned((partition._1._1, partition._1._2)))) + case ReassignmentFailed => + println("Partition [%s,%d] reassignment to %s failed".format(partition._1._1, partition._1._2, + partitionsToBeReassigned((partition._1._1, partition._1._2)))) + case ReassignmentInProgress => + println("Partition [%s,%d] reassignment to %s in progress".format(partition._1._1, partition._1._2, + partitionsToBeReassigned((partition._1._1, partition._1._2)))) + } + } + } + } + + def checkIfReassignmentSucceeded(zkClient: ZkClient, partitionsToBeReassigned: Map[(String, Int), Seq[Int]]) + :Map[(String, Int), ReassignmentStatus] = { + val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas) + // for all partitions whose replica reassignment is complete, check the status + partitionsToBeReassigned.map { topicAndPartition => + (topicAndPartition._1, checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition._1._1, topicAndPartition._1._2, + topicAndPartition._2, partitionsToBeReassigned, partitionsBeingReassigned)) + } + } + + def checkIfPartitionReassignmentSucceeded(zkClient: ZkClient, topic: String, partition: Int, + reassignedReplicas: Seq[Int], + partitionsToBeReassigned: Map[(String, Int), Seq[Int]], + partitionsBeingReassigned: Map[(String, Int), Seq[Int]]): ReassignmentStatus = { + val newReplicas = partitionsToBeReassigned((topic, partition)) + partitionsBeingReassigned.get((topic, partition)) match { + case Some(_) => ReassignmentInProgress + case None => + // check if AR == RAR + val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partition) + if(assignedReplicas == newReplicas) + ReassignmentCompleted + else + ReassignmentFailed + } + } +} \ No newline at end of file diff --git core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala new file mode 100644 index 0000000..aa74251 --- /dev/null +++
core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.admin + +import joptsimple.OptionParser +import kafka.utils._ +import org.I0Itec.zkclient.ZkClient +import kafka.common.AdminCommandFailedException + +object ReassignPartitionsCommand extends Logging { + + def main(args: Array[String]): Unit = { + val parser = new OptionParser + val jsonFileOpt = parser.accepts("path to json file", "REQUIRED: The JSON file with the list of partitions and the " + + "new replicas they should be reassigned to in the following format - \n" + + "[{\"topic\": \"foo\", \"partition\": \"1\", \"replicas\": \"1,2,3\" }]") + .withRequiredArg + .describedAs("partition reassignment json file path") + .ofType(classOf[String]) + val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the " + + "form host:port. Multiple URLS can be given to allow fail-over.") + .withRequiredArg + .describedAs("urls") + .ofType(classOf[String]) + + val options = parser.parse(args : _*) + + for(arg <- List(jsonFileOpt, zkConnectOpt)) { + if(!options.has(arg)) { + System.err.println("Missing required argument \"" + arg + "\"") + parser.printHelpOn(System.err) + System.exit(1) + } + } + + val jsonFile = options.valueOf(jsonFileOpt) + val zkConnect = options.valueOf(zkConnectOpt) + val jsonString = Utils.readFileIntoString(jsonFile) + var zkClient: ZkClient = null + + try { + // read the json file into a string + val partitionsToBeReassigned = SyncJSON.parseFull(jsonString) match { + case Some(reassignedPartitions) => + val partitions = reassignedPartitions.asInstanceOf[Array[Map[String, String]]] + partitions.map { m => + val topic = m.asInstanceOf[Map[String, String]].get("topic").get + val partition = m.asInstanceOf[Map[String, String]].get("partition").get.toInt + val replicasList = m.asInstanceOf[Map[String, String]].get("replicas").get + val newReplicas = replicasList.split(",").map(_.toInt) + ((topic, partition), newReplicas.toSeq) + }.toMap + case None => throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(jsonFile)) + } + + zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) + val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned) + + // attach shutdown handler to catch control-c + Runtime.getRuntime().addShutdownHook(new Thread() { + override def run() = { + // delete the admin path so it can be retried + ZkUtils.deletePathRecursive(zkClient, ZkUtils.ReassignPartitionsPath) + } + }) + + if(reassignPartitionsCommand.reassignPartitions()) + println("Successfully started reassignment of partitions %s".format(partitionsToBeReassigned)) + else 
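For reference, a minimal sketch of what the command consumes and produces: the reassignment file follows the format quoted in the option help above, and the parsing code turns it into a map keyed by (topic, partition). Values here are illustrative only.

    // Illustrative only: a file passed via the json file option could contain
    //   [{"topic": "foo", "partition": "1", "replicas": "1,2,3" }]
    // which the parsing above converts into:
    val partitionsToBeReassigned: Map[(String, Int), Seq[Int]] = Map(("foo", 1) -> Seq(1, 2, 3))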
+ println("Failed to reassign partitions %s".format(partitionsToBeReassigned)) + } catch { + case e => + println("Partitions reassignment failed due to " + e.getMessage) + println(Utils.stackTrace(e)) + } finally { + if (zkClient != null) + zkClient.close() + } + } +} + +class ReassignPartitionsCommand(zkClient: ZkClient, partitions: collection.immutable.Map[(String, Int), Seq[Int]]) + extends Logging { + def reassignPartitions(): Boolean = { + try { + // create the /admin/reassign_replicas path, if one doesn't exist + if(ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath)) { + val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient) + throw new AdminCommandFailedException("Partition reassignment currently in progress for " + + "%s. Aborting operation".format(partitionsBeingReassigned)) + false + }else { + val validPartitions = partitions.filter(p => validatePartition(zkClient, p._1._1, p._1._2)) + ZkUtils.updatePartitionReassignmentData(zkClient, validPartitions) + true + } + }catch { + case e => error("Admin command failed", e); false + } + } + + def validatePartition(zkClient: ZkClient, topic: String, partition: Int): Boolean = { + // check if partition exists + val partitionsOpt = ZkUtils.getPartitionsForTopics(zkClient, List(topic)).get(topic) + partitionsOpt match { + case Some(partitions) => + if(partitions.contains(partition)) { + true + }else{ + error("Skipping reassignment of partition [%s,%d] ".format(topic, partition) + + "since it doesn't exist") + false + } + case None => error("Skipping reassignment of partition " + + "[%s,%d] since topic %s doesn't exist".format(topic, partition, topic)) + false + } + } +} + +sealed trait ReassignmentStatus { def status: Int } +case object ReassignmentCompleted extends ReassignmentStatus { val status = 1 } +case object ReassignmentInProgress extends ReassignmentStatus { val status = 0 } +case object ReassignmentFailed extends ReassignmentStatus { val status = -1 } diff --git core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala index 26f2bd8..b507851 100644 --- core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala +++ core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala @@ -23,7 +23,6 @@ import kafka.utils._ import collection.mutable.Map import collection.mutable.HashMap - object LeaderAndIsr { val initialLeaderEpoch: Int = 0 val initialZKVersion: Int = 0 diff --git core/src/main/scala/kafka/api/StopReplicaRequest.scala core/src/main/scala/kafka/api/StopReplicaRequest.scala index 2f2ba44..99a5f95 100644 --- core/src/main/scala/kafka/api/StopReplicaRequest.scala +++ core/src/main/scala/kafka/api/StopReplicaRequest.scala @@ -20,8 +20,6 @@ package kafka.api import java.nio._ import kafka.utils._ -import collection.mutable.HashSet -import collection.mutable.Set object StopReplicaRequest { val CurrentVersion = 1.shortValue() @@ -33,29 +31,30 @@ object StopReplicaRequest { val clientId = Utils.readShortString(buffer) val ackTimeoutMs = buffer.getInt val topicPartitionPairCount = buffer.getInt - val topicPartitionPairSet = new HashSet[(String, Int)]() - for (i <- 0 until topicPartitionPairCount){ - topicPartitionPairSet.add((Utils.readShortString(buffer, "UTF-8"), buffer.getInt)) + val topicPartitionPairSet = new collection.mutable.HashSet[(String, Int)]() + for (i <- 0 until topicPartitionPairCount) { + topicPartitionPairSet.add(Utils.readShortString(buffer, "UTF-8"), buffer.getInt) } - new StopReplicaRequest(versionId, clientId, ackTimeoutMs, 
topicPartitionPairSet) + new StopReplicaRequest(versionId, clientId, ackTimeoutMs, topicPartitionPairSet.toSet) } } case class StopReplicaRequest(versionId: Short, clientId: String, ackTimeoutMs: Int, - stopReplicaSet: Set[(String, Int)]) + partitions: Set[(String, Int)]) extends RequestOrResponse(Some(RequestKeys.StopReplicaKey)) { - def this(stopReplicaSet: Set[(String, Int)]) = { - this(StopReplicaRequest.CurrentVersion, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout, stopReplicaSet) + def this(partitions: Set[(String, Int)]) = { + this(StopReplicaRequest.CurrentVersion, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout, + partitions) } def writeTo(buffer: ByteBuffer) { buffer.putShort(versionId) Utils.writeShortString(buffer, clientId) buffer.putInt(ackTimeoutMs) - buffer.putInt(stopReplicaSet.size) - for ((topic, partitionId) <- stopReplicaSet){ + buffer.putInt(partitions.size) + for ((topic, partitionId) <- partitions){ Utils.writeShortString(buffer, topic, "UTF-8") buffer.putInt(partitionId) } @@ -63,7 +62,7 @@ case class StopReplicaRequest(versionId: Short, def sizeInBytes(): Int = { var size = 2 + (2 + clientId.length()) + 4 + 4 - for ((topic, partitionId) <- stopReplicaSet){ + for ((topic, partitionId) <- partitions){ size += (2 + topic.length()) + 4 } size diff --git core/src/main/scala/kafka/common/AdminCommandFailedException.scala core/src/main/scala/kafka/common/AdminCommandFailedException.scala new file mode 100644 index 0000000..94e2864 --- /dev/null +++ core/src/main/scala/kafka/common/AdminCommandFailedException.scala @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
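As a quick sanity check of sizeInBytes above, a hedged worked example using the field widths from this hunk (the clientId and partitions are made up):

    val clientId   = ""                                  // illustrative; the real default may differ
    val partitions = Set(("foo", 0), ("foo", 1))
    val header = 2 + (2 + clientId.length) + 4 + 4       // version, clientId, ackTimeoutMs, count = 12 bytes
    val body   = partitions.toSeq.map { case (t, _) => (2 + t.length) + 4 }.sum // 2 * 9 = 18 bytes
    val total  = header + body                           // 30 bytes for this request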
+ */ + +package kafka.common + +class AdminCommandFailedException(message: String, cause: Throwable) extends RuntimeException(message, cause) { + def this(message: String) = this(message, null) + def this() = this(null, null) +} diff --git core/src/main/scala/kafka/common/PartitionOfflineException.scala core/src/main/scala/kafka/common/PartitionOfflineException.scala index ab7a095..3367708 100644 --- core/src/main/scala/kafka/common/PartitionOfflineException.scala +++ core/src/main/scala/kafka/common/PartitionOfflineException.scala @@ -17,10 +17,12 @@ package kafka.common + /** * This exception is thrown by the leader elector in the controller when leader election fails for a partition since * all the replicas for a partition are offline */ -class PartitionOfflineException(message: String) extends RuntimeException(message) { - def this() = this(null) +class PartitionOfflineException(message: String, cause: Throwable) extends RuntimeException(message, cause) { + def this(message: String) = this(message, null) + def this() = this(null, null) } \ No newline at end of file diff --git core/src/main/scala/kafka/common/StateChangeFailedException.scala core/src/main/scala/kafka/common/StateChangeFailedException.scala index a78ca6b..fd56796 100644 --- core/src/main/scala/kafka/common/StateChangeFailedException.scala +++ core/src/main/scala/kafka/common/StateChangeFailedException.scala @@ -17,7 +17,7 @@ package kafka.common -class StateChangeFailedException(message: String) extends RuntimeException(message) { - def this(message: String, cause: Throwable) = this(message + " Root cause -> " + cause.toString) - def this() = this(null) +class StateChangeFailedException(message: String, cause: Throwable) extends RuntimeException(message, cause) { + def this(message: String) = this(message, null) + def this() = this(null, null) } \ No newline at end of file diff --git core/src/main/scala/kafka/controller/ControllerChannelManager.scala core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 90cf187..fb6ea76 100644 --- core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -137,36 +137,49 @@ class RequestSendThread(val controllerId: Int, } } -// TODO: When we add more types of requests, we can generalize this class a bit. Right now, it just handles LeaderAndIsr -// request class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (RequestOrResponse) => Unit) => Unit) extends Logging { - val brokerRequestMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), LeaderAndIsr]] + val leaderAndIsrRequestMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), LeaderAndIsr]] + val stopReplicaRequestMap = new mutable.HashMap[Int, Seq[(String, Int)]] def newBatch() { // raise error if the previous batch is not empty - if(brokerRequestMap.size > 0) + if(leaderAndIsrRequestMap.size > 0 || stopReplicaRequestMap.size > 0) throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating " + - "a new one. Some state changes %s might be lost ".format(brokerRequestMap.toString())) - brokerRequestMap.clear() + "a new one. 
Some state changes %s might be lost ".format(leaderAndIsrRequestMap.toString()) + leaderAndIsrRequestMap.clear() + stopReplicaRequestMap.clear() } - def addRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, leaderAndIsr: LeaderAndIsr) { + def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, leaderAndIsr: LeaderAndIsr) { brokerIds.foreach { brokerId => - brokerRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[(String, Int), LeaderAndIsr]) - brokerRequestMap(brokerId).put((topic, partition), leaderAndIsr) + leaderAndIsrRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[(String, Int), LeaderAndIsr]) + leaderAndIsrRequestMap(brokerId).put((topic, partition), leaderAndIsr) + } + } + + def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int) { + brokerIds.foreach { brokerId => + stopReplicaRequestMap.getOrElseUpdate(brokerId, Seq.empty[(String, Int)]) + stopReplicaRequestMap(brokerId) = stopReplicaRequestMap(brokerId) :+ ((topic, partition)) } } def sendRequestsToBrokers() { - brokerRequestMap.foreach { m => + leaderAndIsrRequestMap.foreach { m => val broker = m._1 val leaderAndIsr = m._2 val leaderAndIsrRequest = new LeaderAndIsrRequest(leaderAndIsr) - info("Sending to broker %d leaderAndIsr request of %s".format(broker, leaderAndIsrRequest)) + debug("The leaderAndIsr request sent to broker %d is %s".format(broker, leaderAndIsrRequest)) sendRequest(broker, leaderAndIsrRequest, null) } - brokerRequestMap.clear() + leaderAndIsrRequestMap.clear() + stopReplicaRequestMap.foreach { r => + val broker = r._1 + debug("The stop replica request sent to broker %d is %s".format(broker, r._2.mkString(","))) + sendRequest(broker, new StopReplicaRequest(Set.empty[(String, Int)] ++ r._2), null) + } + stopReplicaRequestMap.clear() } } diff --git core/src/main/scala/kafka/controller/KafkaController.scala core/src/main/scala/kafka/controller/KafkaController.scala index d43af7f..768856c 100644 --- core/src/main/scala/kafka/controller/KafkaController.scala +++ core/src/main/scala/kafka/controller/KafkaController.scala @@ -20,14 +20,17 @@ import collection._ import collection.immutable.Set import kafka.cluster.Broker import kafka.api._ -import org.I0Itec.zkclient.{IZkStateListener, ZkClient} +import kafka.utils.ZkUtils._ import org.apache.zookeeper.Watcher.Event.KeeperState -import kafka.utils.{ZkUtils, Logging} -import java.lang.Object import kafka.server.{ZookeeperLeaderElector, KafkaConfig} import java.util.concurrent.TimeUnit import kafka.metrics.{KafkaTimer, KafkaMetricsGroup} import com.yammer.metrics.core.Gauge +import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient} +import kafka.utils.{Utils, ZkUtils, Logging} +import org.I0Itec.zkclient.exception.ZkNoNodeException +import java.lang.{IllegalStateException, Object} +import kafka.common.KafkaException class ControllerContext(val zkClient: ZkClient, var controllerChannelManager: ControllerChannelManager = null, @@ -36,16 +39,19 @@ class ControllerContext(val zkClient: ZkClient, var liveBrokerIds: Set[Int] = null, var allTopics: Set[String] = null, var partitionReplicaAssignment: mutable.Map[(String, Int), Seq[Int]] = null, - var allLeaders: mutable.Map[(String, Int), Int] = null) + var allLeaders: mutable.Map[(String, Int), Int] = null, + var partitionsBeingReassigned: mutable.Map[(String, Int), ReassignedPartitionsContext] = + new mutable.HashMap) class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup { - this.logIdent =
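A self-contained note on the addStopReplicaRequestForBrokers fix above: Seq is immutable, so the result of :+ has to be stored back into the map, otherwise the (topic, partition) pair is silently dropped. A minimal sketch:

    import scala.collection.mutable

    val stopReplicas = mutable.HashMap(3 -> Seq.empty[(String, Int)])
    stopReplicas(3) :+ (("foo", 0))                   // returns a new Seq that is immediately discarded
    stopReplicas(3) = stopReplicas(3) :+ (("foo", 0)) // stores the appended Seq back into the map
    println(stopReplicas(3))                          // List((foo,0))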
"[Controller " + config.brokerId + "], " + this.logIdent = "[Controller " + config.brokerId + "]: " private var isRunning = true val controllerContext = new ControllerContext(zkClient) private val partitionStateMachine = new PartitionStateMachine(this) private val replicaStateMachine = new ReplicaStateMachine(this) private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover, config.brokerId) + private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext) newGauge( "ActiveControllerCount", @@ -67,6 +73,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg if(isRunning) { info("Broker %d starting become controller state transition".format(config.brokerId)) // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks + registerReassignedPartitionsListener() partitionStateMachine.registerListeners() replicaStateMachine.registerListeners() initializeControllerContext() @@ -98,7 +105,12 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg updateLeaderAndIsrCache() // update partition state machine partitionStateMachine.triggerOnlinePartitionStateChange() - replicaStateMachine.handleStateChanges(newBrokers, OnlineReplica) + replicaStateMachine.handleStateChanges(getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, newBrokers), + OnlineReplica) + // check if reassignment of some partitions need to be restarted + val partitionsWithReplicasOnNewBrokers = controllerContext.partitionsBeingReassigned.filter(p => + p._2.newReplicas.foldLeft(false)((a, replica) => newBrokers.contains(replica) || a)) + partitionsWithReplicasOnNewBrokers.foreach(p => onPartitionReassignment(p._1._1, p._1._2, p._2)) } /** @@ -121,7 +133,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg // trigger OnlinePartition state changes for offline or new partitions partitionStateMachine.triggerOnlinePartitionStateChange() // handle dead replicas - replicaStateMachine.handleStateChanges(deadBrokers, OfflineReplica) + replicaStateMachine.handleStateChanges(getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, deadBrokers), + OfflineReplica) } /** @@ -146,29 +159,74 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg def onNewPartitionCreation(newPartitions: Seq[(String, Int)]) { info("New partition creation callback for %s".format(newPartitions.mkString(","))) partitionStateMachine.handleStateChanges(newPartitions, NewPartition) + replicaStateMachine.handleStateChanges(getAllReplicasForPartition(newPartitions), NewReplica) partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition) + replicaStateMachine.handleStateChanges(getAllReplicasForPartition(newPartitions), OnlineReplica) } - /* TODO: kafka-330 This API is unused until we introduce the delete topic functionality. 
- remove the unneeded leaderAndISRPath that the previous controller didn't get a chance to remove*/ - def onTopicDeletion(topics: Set[String], replicaAssignment: mutable.Map[(String, Int), Seq[Int]]) { - val brokerToPartitionToStopReplicaMap = new collection.mutable.HashMap[Int, collection.mutable.HashSet[(String, Int)]] - for((topicPartition, brokers) <- replicaAssignment){ - for (broker <- brokers){ - if (!brokerToPartitionToStopReplicaMap.contains(broker)) - brokerToPartitionToStopReplicaMap.put(broker, new collection.mutable.HashSet[(String, Int)]) - brokerToPartitionToStopReplicaMap(broker).add(topicPartition) - } - controllerContext.allLeaders.remove(topicPartition) - ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topicPartition._1, topicPartition._2)) - } - for((broker, partitionToStopReplica) <- brokerToPartitionToStopReplicaMap){ - val stopReplicaRequest = new StopReplicaRequest(partitionToStopReplica) - info("Handling deleted topics: [%s] the stopReplicaRequest sent to broker %d is [%s]".format(topics, broker, stopReplicaRequest)) - sendRequest(broker, stopReplicaRequest) + /** + * This callback is invoked by the reassigned partitions listener. When an admin command initiates a partition + * reassignment, it creates the /admin/reassign_partitions path that triggers the zookeeper listener. + * Reassigning replicas for a partition goes through a few stages - + * RAR = Reassigned replicas + * AR = Original list of replicas for partition + * 1. Register listener for ISR changes to detect when the RAR is a subset of the ISR + * 2. Start new replicas RAR - AR. + * 3. Wait until new replicas are in sync with the leader + * 4. If the leader is not in RAR, elect a new leader from RAR + * 5. Stop old replicas AR - RAR + * 6. Write new AR + * 7. Remove partition from the /admin/reassign_partitions path + */ + def onPartitionReassignment(topic: String, partition: Int, reassignedPartitionContext: ReassignedPartitionsContext) { + val reassignedReplicas = reassignedPartitionContext.newReplicas + areReplicasInIsr(topic, partition, reassignedReplicas) match { + case true => + // mark the new replicas as online + reassignedReplicas.foreach { replica => + replicaStateMachine.handleStateChanges(Seq(new PartitionAndReplica(topic, partition, replica)), + OnlineReplica) + } + // check if current leader is in the new replicas list. 
If not, controller needs to trigger leader election + moveReassignedPartitionLeaderIfRequired(topic, partition, reassignedPartitionContext) + // stop older replicas + stopOldReplicasOfReassignedPartition(topic, partition, reassignedPartitionContext) + // write the new list of replicas for this partition in zookeeper + updateAssignedReplicasForPartition(topic, partition, reassignedPartitionContext) + // update the /admin/reassign_partitions path to remove this partition + removePartitionFromReassignedPartitions(topic, partition) + info("Removed partition [%s, %d] from the list of reassigned partitions in zookeeper".format(topic, partition)) + controllerContext.partitionsBeingReassigned.remove((topic, partition)) + case false => + info("New replicas %s for partition [%s, %d] being ".format(reassignedReplicas.mkString(","), topic, partition) + + "reassigned not yet caught up with the leader") + // start new replicas + startNewReplicasForReassignedPartition(topic, partition, reassignedPartitionContext) + info("Waiting for new replicas %s for partition [%s, %d] being ".format(reassignedReplicas.mkString(","), topic, partition) + + "reassigned to catch up with the leader") } } + /* TODO: kafka-330 This API is unused until we introduce the delete topic functionality. + remove the unneeded leaderAndISRPath that the previous controller didn't get a chance to remove*/ + // def onTopicDeletion(topics: Set[String], replicaAssignment: mutable.Map[(String, Int), Seq[Int]]) { + // val brokerToPartitionToStopReplicaMap = new collection.mutable.HashMap[Int, collection.mutable.HashSet[(String, Int)]] + // for((topicPartition, brokers) <- replicaAssignment){ + // for (broker <- brokers){ + // if (!brokerToPartitionToStopReplicaMap.contains(broker)) + // brokerToPartitionToStopReplicaMap.put(broker, new collection.mutable.HashSet[(String, Int)]) + // brokerToPartitionToStopReplicaMap(broker).add(topicPartition) + // } + // controllerContext.allLeaders.remove(topicPartition) + // ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topicPartition._1, topicPartition._2)) + // } + // for((broker, partitionToStopReplica) <- brokerToPartitionToStopReplicaMap){ + // val stopReplicaRequest = new StopReplicaRequest(partitionToStopReplica) + // info("Handling deleted topics: [%s] the stopReplicaRequest sent to broker %d is [%s]".format(topics, broker, stopReplicaRequest)) + // sendRequest(broker, stopReplicaRequest) + // } + // } + /** * Invoked when the controller module of a Kafka server is started up. This does not assume that the current broker * is the controller. 
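To make the RAR/AR bookkeeping in the callback above concrete, a worked example with illustrative broker ids covering steps 2 and 5 of the documented sequence:

    val ar  = Set(1, 2, 3)            // AR: replicas currently assigned to the partition
    val rar = Set(4, 5, 6)            // RAR: replicas requested by the admin command
    val replicasToStart = rar -- ar   // step 2: new replicas to start -> Set(4, 5, 6)
    val replicasToStop  = ar -- rar   // step 5: old replicas to stop  -> Set(1, 2, 3)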
It merely registers the session expiration listener and starts the controller leader @@ -207,7 +265,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg } private def registerSessionExpirationListener() = { - zkClient.subscribeStateChanges(new SessionExpireListener()) + zkClient.subscribeStateChanges(new SessionExpirationListener()) } private def initializeControllerContext() { @@ -223,6 +281,23 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg startChannelManager() info("Currently active brokers in the cluster: %s".format(controllerContext.liveBrokerIds)) info("Current list of topics in the cluster: %s".format(controllerContext.allTopics)) + initializeReassignedPartitionsContext() + } + + private def initializeReassignedPartitionsContext() { + // read the partitions being reassigned from zookeeper path /admin/reassign_partitions + val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient) + // check if they are already completed + val reassignedPartitions = partitionsBeingReassigned.filter(partition => + controllerContext.partitionReplicaAssignment(partition._1) == partition._2.newReplicas).map(_._1) + reassignedPartitions.foreach(p => removePartitionFromReassignedPartitions(p._1, p._2)) + controllerContext.partitionsBeingReassigned ++= partitionsBeingReassigned + controllerContext.partitionsBeingReassigned --= reassignedPartitions + info("Partitions being reassigned: %s".format(partitionsBeingReassigned.toString())) + info("Partitions already reassigned: %s".format(reassignedPartitions.toString())) + info("Resuming reassignment of partitions: %s".format(controllerContext.partitionsBeingReassigned.toString())) + controllerContext.partitionsBeingReassigned.foreach(partition => + onPartitionReassignment(partition._1._1, partition._1._2, partition._2)) } private def startChannelManager() { @@ -244,8 +319,122 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg } } - class SessionExpireListener() extends IZkStateListener with Logging { - this.logIdent = "[Controller " + config.brokerId + "], " + private def areReplicasInIsr(topic: String, partition: Int, replicas: Seq[Int]): Boolean = { + getLeaderAndIsrForPartition(zkClient, topic, partition) match { + case Some(leaderAndIsr) => + val replicasNotInIsr = replicas.filterNot(r => leaderAndIsr.isr.contains(r)) + replicasNotInIsr.isEmpty + case None => false + } + } + + private def moveReassignedPartitionLeaderIfRequired(topic: String, partition: Int, + reassignedPartitionContext: ReassignedPartitionsContext) { + val reassignedReplicas = reassignedPartitionContext.newReplicas + val currentLeader = controllerContext.allLeaders((topic, partition)) + if(!reassignedPartitionContext.newReplicas.contains(currentLeader)) { + info("Leader %s for partition [%s, %d] being reassigned, ".format(currentLeader, topic, partition) + + "is not in the new list of replicas %s. 
Re-electing leader".format(reassignedReplicas.mkString(","))) + // move the leader to one of the alive and caught up new replicas + partitionStateMachine.handleStateChanges(List((topic, partition)), OnlinePartition, + reassignedPartitionLeaderSelector) + }else { + // check if the leader is alive or not + controllerContext.liveBrokerIds.contains(currentLeader) match { + case true => + info("Leader %s for partition [%s, %d] being reassigned, ".format(currentLeader, topic, partition) + + "is already in the new list of replicas %s and is alive".format(reassignedReplicas.mkString(","))) + case false => + info("Leader %s for partition [%s, %d] being reassigned, ".format(currentLeader, topic, partition) + + "is already in the new list of replicas %s but is dead".format(reassignedReplicas.mkString(","))) + partitionStateMachine.handleStateChanges(List((topic, partition)), OnlinePartition, + reassignedPartitionLeaderSelector) + } + } + } + + private def stopOldReplicasOfReassignedPartition(topic: String, partition: Int, + reassignedPartitionContext: ReassignedPartitionsContext) { + val reassignedReplicas = reassignedPartitionContext.newReplicas + // send stop replica state change request to the old replicas + val oldReplicas = controllerContext.partitionReplicaAssignment((topic, partition)).toSet -- reassignedReplicas.toSet + // first move the replica to offline state (the controller removes it from the ISR) + oldReplicas.foreach { replica => + replicaStateMachine.handleStateChanges(Seq(new PartitionAndReplica(topic, partition, replica)), OfflineReplica) + } + // send stop replica command to the old replicas + oldReplicas.foreach { replica => + replicaStateMachine.handleStateChanges(Seq(new PartitionAndReplica(topic, partition, replica)), NonExistentReplica) + } + } + + private def updateAssignedReplicasForPartition(topic: String, partition: Int, + reassignedPartitionContext: ReassignedPartitionsContext) { + val reassignedReplicas = reassignedPartitionContext.newReplicas + val partitionsAndReplicasForThisTopic = controllerContext.partitionReplicaAssignment.filter(_._1._1.equals(topic)) + partitionsAndReplicasForThisTopic.put((topic, partition), reassignedReplicas) + updateAssignedReplicasForPartition(topic, partition, partitionsAndReplicasForThisTopic) + info("Updated assigned replicas for partition [%s, %d] being reassigned ".format(topic, partition) + + "to %s".format(reassignedReplicas.mkString(","))) + // update the assigned replica list after a successful zookeeper write + controllerContext.partitionReplicaAssignment.put((topic, partition), reassignedReplicas) + // stop watching the ISR changes for this partition + zkClient.unsubscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), + controllerContext.partitionsBeingReassigned((topic, partition)).isrChangeListener) + // update the assigned replica list + controllerContext.partitionReplicaAssignment.put((topic, partition), reassignedReplicas) + } + + private def startNewReplicasForReassignedPartition(topic: String, partition: Int, + reassignedPartitionContext: ReassignedPartitionsContext) { + // send the start replica request to the brokers in the reassigned replicas list that are not in the assigned + // replicas list + val assignedReplicaSet = Set.empty[Int] ++ controllerContext.partitionReplicaAssignment((topic, partition)) + val reassignedReplicaSet = Set.empty[Int] ++ reassignedPartitionContext.newReplicas + val newReplicas: Seq[Int] = (reassignedReplicaSet -- assignedReplicaSet).toSeq + newReplicas.foreach { 
replica => + replicaStateMachine.handleStateChanges(Seq(new PartitionAndReplica(topic, partition, replica)), NewReplica) + } + } + + private def registerReassignedPartitionsListener() = { + zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, new PartitionsReassignedListener(this)) + } + + def removePartitionFromReassignedPartitions(topic: String, partition: Int) { + // read the current list of reassigned partitions from zookeeper + val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient) + // remove this partition from that list + val updatedPartitionsBeingReassigned = partitionsBeingReassigned - ((topic, partition)) + // write the new list to zookeeper + ZkUtils.updatePartitionReassignmentData(zkClient, updatedPartitionsBeingReassigned.mapValues(_.newReplicas)) + // update the cache + controllerContext.partitionsBeingReassigned.remove((topic, partition)) + } + + def updateAssignedReplicasForPartition(topic: String, partition: Int, + newReplicaAssignmentForTopic: Map[(String, Int), Seq[Int]]) { + try { + val zkPath = ZkUtils.getTopicPath(topic) + val jsonPartitionMap = Utils.mapToJson(newReplicaAssignmentForTopic.map(e => + (e._1._2.toString -> e._2.map(_.toString)))) + ZkUtils.updatePersistentPath(zkClient, zkPath, jsonPartitionMap) + debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap)) + } catch { + case e: ZkNoNodeException => throw new IllegalStateException("Topic %s doesn't exist".format(topic)) + case e2 => throw new KafkaException(e2.toString) + } + } + + private def getAllReplicasForPartition(partitions: Seq[(String, Int)]): Seq[PartitionAndReplica] = { + partitions.map { p => + val replicas = controllerContext.partitionReplicaAssignment(p) + replicas.map(r => new PartitionAndReplica(p._1, p._2, r)) + }.flatten + } + + class SessionExpirationListener() extends IZkStateListener with Logging { + this.logIdent = "[SessionExpirationListener on " + config.brokerId + "], " @throws(classOf[Exception]) def handleStateChanged(state: KeeperState) { // do nothing, since zkclient will do reconnect for us. @@ -274,6 +463,159 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg } } +/** + * Starts the partition reassignment process unless - + * 1. Partition previously existed + * 2. New replicas are the same as existing replicas + * 3. Any replica in the new set of replicas are dead + * If any of the above conditions are satisfied, it logs an error and removes the partition from list of reassigned + * partitions. + */ +class PartitionsReassignedListener(controller: KafkaController) extends IZkDataListener with Logging { + this.logIdent = "[PartitionsReassignedListener on " + controller.config.brokerId + "]: " + val zkClient = controller.controllerContext.zkClient + val controllerContext = controller.controllerContext + + /** + * Invoked when some partitions are reassigned by the admin command + * @throws Exception On any error. + */ + @throws(classOf[Exception]) + def handleDataChange(dataPath: String, data: Object) { + debug("Partitions reassigned listener fired for path %s. 
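getAllReplicasOnBroker/getAllReplicasForPartition above expand each (topic, partition) into one entry per replica; a sketch of that expansion using plain tuples in place of PartitionAndReplica, with an illustrative assignment:

    val assignment = Map(("foo", 0) -> Seq(1, 2), ("foo", 1) -> Seq(2, 3))
    val allReplicas = Seq(("foo", 0), ("foo", 1)).flatMap { case (topic, partition) =>
      assignment((topic, partition)).map(replica => (topic, partition, replica))
    }
    // List((foo,0,1), (foo,0,2), (foo,1,2), (foo,1,3))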
Record partitions to be reassigned %s" + .format(dataPath, data)) + val partitionsReassignmentData = ZkUtils.parsePartitionReassignmentData(data.toString) + val newPartitions = partitionsReassignmentData.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1)) + newPartitions.foreach { partitionToBeReassigned => + controllerContext.controllerLock synchronized { + val topic = partitionToBeReassigned._1._1 + val partition = partitionToBeReassigned._1._2 + val newReplicas = partitionToBeReassigned._2 + val aliveNewReplicas = newReplicas.filter(r => controllerContext.liveBrokerIds.contains(r)) + try { + val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get((topic, partition)) + assignedReplicasOpt match { + case Some(assignedReplicas) => + if(assignedReplicas == newReplicas) { + throw new KafkaException("Partition [%s, %d] to be reassigned is already assigned to replicas" + .format(topic, partition) + + " %s. Ignoring request for partition reassignment".format(newReplicas.mkString(","))) + }else { + if(aliveNewReplicas == newReplicas) { + info("Handling reassignment of partition [%s, %d] to new replicas %s".format(topic, partition, + newReplicas.mkString(","))) + val context = createReassignmentContextForPartition(topic, partition, newReplicas) + controllerContext.partitionsBeingReassigned.put((topic, partition), context) + controller.onPartitionReassignment(topic, partition, context) + }else { + // some replica in RAR is not alive. Fail partition reassignment + throw new KafkaException("Only %s replicas out of the new set of replicas".format(aliveNewReplicas.mkString(",")) + + " %s for partition [%s, %d] to be reassigned are alive. ".format(newReplicas.mkString(","), topic, partition) + + "Failing partition reassignment") + } + } + case None => throw new KafkaException("Attempt to reassign partition [%s, %d] that doesn't exist" + .format(topic, partition)) + } + }catch { + case e => error("Error completing reassignment of partition [%s, %d]".format(topic, partition), e) + // remove the partition from the admin path to unblock the admin client + controller.removePartitionFromReassignedPartitions(topic, partition) + } + } + } + } + + /** + * Called when the leader information stored in zookeeper has been delete. Try to elect as the leader + * @throws Exception + * On any error. 
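The liveness validation in the listener above accepts a reassignment only when every requested replica sits on a live broker; for example, with made-up ids:

    val liveBrokerIds = Set(1, 2, 3)
    val newReplicas   = Seq(2, 4)
    val aliveNewReplicas = newReplicas.filter(liveBrokerIds.contains) // Seq(2)
    val accepted = aliveNewReplicas == newReplicas                    // false: broker 4 is down, so the request fails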
+ */ + @throws(classOf[Exception]) + def handleDataDeleted(dataPath: String) { + } + + private def createReassignmentContextForPartition(topic: String, + partition: Int, + newReplicas: Seq[Int]): ReassignedPartitionsContext = { + val context = new ReassignedPartitionsContext(newReplicas) + // first register ISR change listener + watchIsrChangesForReassignedPartition(topic, partition, context) + context + } + + private def watchIsrChangesForReassignedPartition(topic: String, partition: Int, + reassignedPartitionContext: ReassignedPartitionsContext) { + val reassignedReplicas = reassignedPartitionContext.newReplicas + val isrChangeListener = new ReassignedPartitionsIsrChangeListener(controller, topic, partition, + reassignedReplicas.toSet) + reassignedPartitionContext.isrChangeListener = isrChangeListener + // register listener on the leader and isr path to wait until they catch up with the current leader + zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), isrChangeListener) + } +} + +class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic: String, partition: Int, + reassignedReplicas: Set[Int]) + extends IZkDataListener with Logging { + this.logIdent = "[ReassignedPartitionsIsrChangeListener on controller " + controller.config.brokerId + "]: " + val zkClient = controller.controllerContext.zkClient + val controllerContext = controller.controllerContext + + /** + * Invoked when some partitions are reassigned by the admin command + * @throws Exception On any error. + */ + @throws(classOf[Exception]) + def handleDataChange(dataPath: String, data: Object) { + try { + controllerContext.controllerLock synchronized { + debug("Reassigned partitions isr change listener fired for path %s with children %s".format(dataPath, data)) + // check if this partition is still being reassigned or not + controllerContext.partitionsBeingReassigned.get((topic, partition)) match { + case Some(reassignedPartitionContext) => + // need to re-read leader and isr from zookeeper since the zkclient callback doesn't return the Stat object + val newLeaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) + newLeaderAndIsrOpt match { + case Some(leaderAndIsr) => // check if new replicas have joined ISR + val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet + if(caughtUpReplicas == reassignedReplicas) { + // resume the partition reassignment process + info("%d/%d replicas have caught up with the leader for partition [%s, %d] being reassigned." + .format(caughtUpReplicas.size, reassignedReplicas.size, topic, partition) + + "Resuming partition reassignment") + controller.onPartitionReassignment(topic, partition, reassignedPartitionContext) + }else { + info("%d/%d replicas have caught up with the leader for partition [%s, %d] being reassigned." + .format(caughtUpReplicas.size, reassignedReplicas.size, topic, partition) + + "Replica(s) %s still need to catch up".format((reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(","))) + } + case None => error("Error handling reassignment of partition [%s, %d] to replicas %s as it was never created" + .format(topic, partition, reassignedReplicas.mkString(","))) + } + case None => + } + } + }catch { + case e => error("Error while handling partition reassignment", e) + } + } + + /** + * Called when the leader information stored in zookeeper has been delete. Try to elect as the leader + * @throws Exception + * On any error. 
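The caught-up test in the ISR change listener above is a set intersection against the reassigned replicas, e.g.:

    val reassignedReplicas = Set(4, 5, 6)
    val isr                = Set(1, 4, 5)         // current ISR read back from zookeeper
    val caughtUp = reassignedReplicas & isr       // Set(4, 5)
    val resume   = caughtUp == reassignedReplicas // false: replica 6 still has to join the ISR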
+ */ + @throws(classOf[Exception]) + def handleDataDeleted(dataPath: String) { + } +} + +case class ReassignedPartitionsContext(var newReplicas: Seq[Int] = Seq.empty, + var isrChangeListener: ReassignedPartitionsIsrChangeListener = null) + +case class PartitionAndReplica(topic: String, partition: Int, replica: Int) + object ControllerStat extends KafkaMetricsGroup { val offlinePartitionRate = newMeter("OfflinePartitionsPerSec", "partitions", TimeUnit.SECONDS) val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS) diff --git core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala new file mode 100644 index 0000000..2d77d99 --- /dev/null +++ core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package kafka.controller + +import kafka.api.LeaderAndIsr +import kafka.utils.Logging +import kafka.common.{StateChangeFailedException, PartitionOfflineException} + +trait PartitionLeaderSelector { + + /** + * @param topic The topic of the partition whose leader needs to be elected + * @param partition The partition whose leader needs to be elected + * @param assignedReplicas The list of replicas assigned to the input partition + * @param currentLeaderAndIsr The current leader and isr of input partition read from zookeeper + * @throws PartitionOfflineException If no replica in the assigned replicas list is alive + * @returns The leader and isr request, with the newly selected leader info, to send to the brokers + * Also, returns the list of replicas the returned leader and isr request should be sent to + * This API selects a new leader for the input partition + */ + def selectLeader(topic: String, partition: Int, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) + +} + +/** + * This API selects a new leader for the input partition - + * 1. If at least one broker from the isr is alive, it picks a broker from the isr as the new leader + * 2. Else, it picks some alive broker from the assigned replica list as the new leader + * 3. 
If no broker in the assigned replica list is alive, it throws PartitionOfflineException + * Once the leader is successfully registered in zookeeper, it updates the allLeaders cache + */ +class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging { + + def selectLeader(topic: String, partition: Int, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = { + controllerContext.partitionReplicaAssignment.get((topic, partition)) match { + case Some(assignedReplicas) => + val liveAssignedReplicasToThisPartition = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r)) + val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r)) + val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch + val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion + debug("Leader, epoch, ISR and zkPathVersion for partition (%s, %d) are: [%d], [%d], [%s], [%d]" + .format(topic, partition, currentLeaderAndIsr.leader, currentLeaderEpoch, currentLeaderAndIsr.isr, + currentLeaderIsrZkPathVersion)) + val newLeaderAndIsr = liveBrokersInIsr.isEmpty match { + case true => + debug("No broker in ISR is alive, picking the leader from the alive assigned replicas: %s" + .format(liveAssignedReplicasToThisPartition.mkString(","))) + liveAssignedReplicasToThisPartition.isEmpty match { + case true => + ControllerStat.offlinePartitionRate.mark() + throw new PartitionOfflineException(("No replica for partition " + + "([%s, %d]) is alive. Live brokers are: [%s],".format(topic, partition, controllerContext.liveBrokerIds)) + + " Assigned replicas are: [%s]".format(assignedReplicas)) + case false => + ControllerStat.uncleanLeaderElectionRate.mark() + val newLeader = liveAssignedReplicasToThisPartition.head + warn("No broker in ISR is alive, elected leader from the alive replicas is [%s], ".format(newLeader) + + "There's potential data loss") + new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1) + } + case false => + val newLeader = liveBrokersInIsr.head + debug("Some broker in ISR is alive, selecting the leader from the ISR: " + newLeader) + new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1) + } + info("Selected new leader and ISR %s for offline partition [%s, %d]".format(newLeaderAndIsr.toString(), topic, + partition)) + (newLeaderAndIsr, liveAssignedReplicasToThisPartition) + case None => + ControllerStat.offlinePartitionRate.mark() + throw new PartitionOfflineException("Partition [%s, %d] doesn't have ".format(topic, partition) + + "replicas assigned to it") + } + } +} + +/** + * Picks one of the alive in-sync reassigned replicas as the new leader + */ +class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging { + + def selectLeader(topic: String, partition: Int, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = { + val reassignedReplicas = controllerContext.partitionsBeingReassigned((topic, partition)).newReplicas + val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch + val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion + debug("Leader, epoch, ISR and zkPathVersion for partition (%s, %d) are: [%d], [%d], [%s], [%d]" + .format(topic, partition, currentLeaderAndIsr.leader, currentLeaderEpoch, currentLeaderAndIsr.isr, + currentLeaderIsrZkPathVersion)) + // pick any replica from the newly
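A minimal, self-contained sketch of the preference order implemented by OfflinePartitionLeaderSelector above, with made-up broker ids (the real code also bumps the leader epoch and zkVersion):

    val assignedReplicas = Seq(1, 2, 3)
    val isr              = Seq(2, 3)
    val liveBrokers      = Set(3, 4)
    val liveIsr          = isr.filter(liveBrokers.contains)              // Seq(3)
    val liveAssigned     = assignedReplicas.filter(liveBrokers.contains) // Seq(3)
    val newLeader =
      if (liveIsr.nonEmpty) liveIsr.head                 // preferred: clean election from the live ISR
      else if (liveAssigned.nonEmpty) liveAssigned.head  // unclean election, potential data loss
      else sys.error("no assigned replica is alive")     // PartitionOfflineException in the patch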
assigned replicas list that is in the ISR + val aliveReassignedReplicas = reassignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r)) + val newLeaderOpt = aliveReassignedReplicas.headOption + newLeaderOpt match { + case Some(newLeader) => (new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, currentLeaderAndIsr.isr, + currentLeaderIsrZkPathVersion + 1), reassignedReplicas) + case None => + reassignedReplicas.size match { + case 0 => + throw new StateChangeFailedException("List of reassigned replicas for partition " + + "([%s, %d]) is empty. Current leader and ISR: [%s]".format(topic, partition, currentLeaderAndIsr)) + case _ => + throw new StateChangeFailedException("None of the reassigned replicas for partition " + + "([%s, %d]) are alive. Current leader and ISR: [%s]".format(topic, partition, currentLeaderAndIsr)) + } + } + } +} \ No newline at end of file diff --git core/src/main/scala/kafka/controller/PartitionStateMachine.scala core/src/main/scala/kafka/controller/PartitionStateMachine.scala index a9c094c..59fbae3 100644 --- core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -21,7 +21,7 @@ import kafka.api.LeaderAndIsr import kafka.utils.{Logging, ZkUtils} import org.I0Itec.zkclient.IZkChildListener import collection.JavaConversions._ -import kafka.common.{StateChangeFailedException, PartitionOfflineException, KafkaException} +import kafka.common.{StateChangeFailedException, PartitionOfflineException} import java.util.concurrent.atomic.AtomicBoolean import org.I0Itec.zkclient.exception.ZkNodeExistsException @@ -43,6 +43,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { private val zkClient = controllerContext.zkClient var partitionState: mutable.Map[(String, Int), PartitionState] = mutable.Map.empty val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest) + val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext) private var isShuttingDown = new AtomicBoolean(false) /** @@ -82,7 +83,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { // try to move all partitions in NewPartition or OfflinePartition state to OnlinePartition state partitionState.filter(partitionAndState => partitionAndState._2.equals(OfflinePartition) || partitionAndState._2.equals(NewPartition)).foreach { - partitionAndState => handleStateChange(partitionAndState._1._1, partitionAndState._1._2, OnlinePartition) + partitionAndState => handleStateChange(partitionAndState._1._1, partitionAndState._1._2, OnlinePartition, + offlinePartitionSelector) } brokerRequestBatch.sendRequestsToBrokers() }catch { @@ -95,12 +97,13 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { * @param partitions The list of partitions that need to be transitioned to the target state * @param targetState The state that the partitions should be moved to */ - def handleStateChanges(partitions: Seq[(String, Int)], targetState: PartitionState) { + def handleStateChanges(partitions: Seq[(String, Int)], targetState: PartitionState, + leaderSelector: PartitionLeaderSelector = offlinePartitionSelector) { info("Invoking state change to %s for partitions %s".format(targetState, partitions.mkString(","))) try { brokerRequestBatch.newBatch() partitions.foreach { topicAndPartition => - handleStateChange(topicAndPartition._1, topicAndPartition._2, targetState) + handleStateChange(topicAndPartition._1, topicAndPartition._2, 
targetState, leaderSelector) } brokerRequestBatch.sendRequestsToBrokers() }catch { @@ -115,7 +118,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { * @param partition The partition for which the state transition is invoked * @param targetState The end state that the partition should be moved to */ - private def handleStateChange(topic: String, partition: Int, targetState: PartitionState) { + private def handleStateChange(topic: String, partition: Int, targetState: PartitionState, + leaderSelector: PartitionLeaderSelector) { try { partitionState.getOrElseUpdate((topic, partition), NonExistentPartition) targetState match { @@ -128,17 +132,18 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { info("Partition [%s, %d] state changed from NotExists to New with assigned replicas ".format(topic, partition) + "%s".format(controllerContext.partitionReplicaAssignment(topic, partition).mkString(","))) case OnlinePartition => - // pre: partition should be in New state - assertValidPreviousStates(topic, partition, List(NewPartition, OfflinePartition), OnlinePartition) + assertValidPreviousStates(topic, partition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition) partitionState(topic, partition) match { case NewPartition => // initialize leader and isr path for new partition - initializeLeaderAndIsrForPartition(topic, partition, brokerRequestBatch) + initializeLeaderAndIsrForPartition(topic, partition) case OfflinePartition => - electLeaderForOfflinePartition(topic, partition, brokerRequestBatch) + electLeaderForPartition(topic, partition, leaderSelector) + case OnlinePartition => // invoked when the leader needs to be re-elected + electLeaderForPartition(topic, partition, leaderSelector) case _ => // should never come here since illegal previous states are checked above } - info("Partition [%s, %d] state changed from %s to Online with leader %d".format(topic, partition, + info("Partition [%s, %d] state changed from %s to OnlinePartition with leader %d".format(topic, partition, partitionState(topic, partition), controllerContext.allLeaders(topic, partition))) partitionState.put((topic, partition), OnlinePartition) // post: partition has a leader @@ -215,8 +220,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { * @brokerRequestBatch The object that holds the leader and isr requests to be sent to each broker as a result of * this state change */ - private def initializeLeaderAndIsrForPartition(topic: String, partition: Int, - brokerRequestBatch: ControllerBrokerRequestBatch) { + private def initializeLeaderAndIsrForPartition(topic: String, partition: Int) { debug("Initializing leader and isr for partition [%s, %d]".format(topic, partition)) val replicaAssignment = controllerContext.partitionReplicaAssignment((topic, partition)) val liveAssignedReplicas = replicaAssignment.filter(r => controllerContext.liveBrokerIds.contains(r)) @@ -234,10 +238,10 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { try { ZkUtils.createPersistentPath(controllerContext.zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), leaderAndIsr.toString) - // TODO: the above write can fail only if the current controller lost its zk session and the new controller + // NOTE: the above write can fail only if the current controller lost its zk session and the new controller // took over and initialized this partition. 
This can happen if the current controller went into a long // GC pause - brokerRequestBatch.addRequestForBrokers(liveAssignedReplicas, topic, partition, leaderAndIsr) + brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topic, partition, leaderAndIsr) controllerContext.allLeaders.put((topic, partition), leaderAndIsr.leader) partitionState.put((topic, partition), OnlinePartition) }catch { @@ -257,101 +261,38 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { * @brokerRequestBatch The object that holds the leader and isr requests to be sent to each broker as a result of * this state change */ - private def electLeaderForOfflinePartition(topic: String, partition: Int, - brokerRequestBatch: ControllerBrokerRequestBatch) { + def electLeaderForPartition(topic: String, partition: Int, leaderSelector: PartitionLeaderSelector) { /** handle leader election for the partitions whose leader is no longer alive **/ - info("Electing leader for Offline partition [%s, %d]".format(topic, partition)) + info("Electing leader for partition [%s, %d]".format(topic, partition)) try { - controllerContext.partitionReplicaAssignment.get((topic, partition)) match { - case Some(assignedReplicas) => - val liveAssignedReplicasToThisPartition = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r)) - try { - // elect new leader or throw exception - val newLeaderAndIsr = electLeaderForPartition(topic, partition, assignedReplicas) - info("Elected leader %d for Offline partition [%s, %d]".format(newLeaderAndIsr.leader, topic, partition)) - // store new leader and isr info in cache - brokerRequestBatch.addRequestForBrokers(liveAssignedReplicasToThisPartition, topic, partition, - newLeaderAndIsr) - }catch { - case e => throw new StateChangeFailedException(("Error while electing leader for partition" + - " [%s, %d]").format(topic, partition), e) - } - case None => throw new KafkaException(("While handling broker changes, the " + - "partition [%s, %d] doesn't have assigned replicas. The replica assignment for other partitions is %s") - .format(topic, partition, controllerContext.partitionReplicaAssignment)) + var zookeeperPathUpdateSucceeded: Boolean = false + var newLeaderAndIsr: LeaderAndIsr = null + var replicasForThisPartition: Seq[Int] = Seq.empty[Int] + while(!zookeeperPathUpdateSucceeded) { + val currentLeaderAndIsr = getLeaderAndIsrOrThrowException(topic, partition) + // elect new leader or throw exception + val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topic, partition, currentLeaderAndIsr) + val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient, + ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), leaderAndIsr.toString, currentLeaderAndIsr.zkVersion) + newLeaderAndIsr = leaderAndIsr + newLeaderAndIsr.zkVersion = newVersion + zookeeperPathUpdateSucceeded = updateSucceeded + replicasForThisPartition = replicas } + // update the leader cache + controllerContext.allLeaders.put((topic, partition), newLeaderAndIsr.leader) + info("Elected leader %d for Offline partition [%s, %d]".format(newLeaderAndIsr.leader, topic, partition)) + // store new leader and isr info in cache + brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition, newLeaderAndIsr) }catch { - case e => throw new PartitionOfflineException("All replicas for partition [%s, %d] are dead." 
- .format(topic, partition) + " Marking this partition offline") + case poe: PartitionOfflineException => throw new PartitionOfflineException("All replicas for partition [%s, %d] are dead." + .format(topic, partition) + " Marking this partition offline", poe) + case sce => throw new StateChangeFailedException(("Error while electing leader for partition" + + " [%s, %d]").format(topic, partition), sce) } debug("After leader election, leader cache is updated to %s".format(controllerContext.allLeaders.map(l => (l._1, l._2)))) } - /** - * @param topic The topic of the partition whose leader needs to be elected - * @param partition The partition whose leader needs to be elected - * @param assignedReplicas The list of replicas assigned to the input partition - * @throws PartitionOfflineException If no replica in the assigned replicas list is alive - * This API selects a new leader for the input partition - - * 1. If at least one broker from the isr is alive, it picks a broker from the isr as the new leader - * 2. Else, it picks some alive broker from the assigned replica list as the new leader - * 3. If no broker in the assigned replica list is alive, it throws PartitionOfflineException - * Once the leader is successfully registered in zookeeper, it updates the allLeaders cache - */ - private def electLeaderForPartition(topic: String, partition: Int, assignedReplicas: Seq[Int]):LeaderAndIsr = { - var zookeeperPathUpdateSucceeded: Boolean = false - var newLeaderAndIsr: LeaderAndIsr = null - while(!zookeeperPathUpdateSucceeded) { - newLeaderAndIsr = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) match { - case Some(currentLeaderAndIsr) => - var newLeaderAndIsr: LeaderAndIsr = currentLeaderAndIsr - val liveAssignedReplicasToThisPartition = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r)) - val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r)) - val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch - val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion - debug("Leader, epoch, ISR and zkPathVersion for partition (%s, %d) are: [%d], [%d], [%s], [%d]" - .format(topic, partition, currentLeaderAndIsr.leader, currentLeaderEpoch, currentLeaderAndIsr.isr, - currentLeaderIsrZkPathVersion)) - newLeaderAndIsr = liveBrokersInIsr.isEmpty match { - case true => - debug("No broker is ISR is alive, picking the leader from the alive assigned replicas: %s" - .format(liveAssignedReplicasToThisPartition.mkString(","))) - liveAssignedReplicasToThisPartition.isEmpty match { - case true => - ControllerStat.offlinePartitionRate.mark() - throw new PartitionOfflineException(("No replica for partition " + - "([%s, %d]) is alive. 
Live brokers are: [%s],".format(topic, partition, controllerContext.liveBrokerIds)) + - " Assigned replicas are: [%s]".format(assignedReplicas)) - case false => - ControllerStat.uncleanLeaderElectionRate.mark() - val newLeader = liveAssignedReplicasToThisPartition.head - warn("No broker in ISR is alive, elected leader from the alive replicas is [%s], ".format(newLeader) + - "There's potential data loss") - new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1) - } - case false => - val newLeader = liveBrokersInIsr.head - debug("Some broker in ISR is alive, selecting the leader from the ISR: " + newLeader) - new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1) - } - info("New leader and ISR for partition [%s, %d] is %s".format(topic, partition, newLeaderAndIsr.toString())) - // update the new leadership decision in zookeeper or retry - val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient, - ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), - newLeaderAndIsr.toString, currentLeaderAndIsr.zkVersion) - newLeaderAndIsr.zkVersion = newVersion - zookeeperPathUpdateSucceeded = updateSucceeded - newLeaderAndIsr - case None => - throw new StateChangeFailedException("On broker changes, " + - "there's no leaderAndISR information for partition (%s, %d) in zookeeper".format(topic, partition)) - } - } - // update the leader cache - controllerContext.allLeaders.put((topic, partition), newLeaderAndIsr.leader) - newLeaderAndIsr - } - private def registerTopicChangeListener() = { zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, new TopicChangeListener()) } @@ -360,6 +301,15 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { zkClient.subscribeChildChanges(ZkUtils.getTopicPath(topic), new PartitionChangeListener(topic)) } + private def getLeaderAndIsrOrThrowException(topic: String, partition: Int): LeaderAndIsr = { + ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) match { + case Some(currentLeaderAndIsr) => currentLeaderAndIsr + case None => + throw new StateChangeFailedException("Leader and ISR information doesn't exist for partition " + + "[%s, %d] in %s state".format(topic, partition, partitionState((topic, partition)))) + } + } + /** * This is the zookeeper listener that triggers all the state transitions for a partition */ diff --git core/src/main/scala/kafka/controller/ReplicaStateMachine.scala core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index 3574d3a..fbef90d 100644 --- core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -27,9 +27,15 @@ import org.I0Itec.zkclient.IZkChildListener /** * This class represents the state machine for replicas. It defines the states that a replica can be in, and * transitions to move the replica to another legal state. The different states that a replica can be in are - - * 1. OnlineReplica : Once a replica is started, it is in this state. Valid previous state are OnlineReplica or - * OfflineReplica - * 2. OfflineReplica : If a replica dies, it moves to this state. Valid previous state is OnlineReplica + * 1. NewReplica : The controller can create new replicas during partition reassignment. In this state, a + * replica can only get become follower state change request. Valid previous + * state is NonExistentReplica + * 2. 
OnlineReplica : Once a replica is started and part of the assigned replicas for its partition, it is in this + * state. In this state, it can get either become leader or become follower state change requests. + * Valid previous state are NewReplica, OnlineReplica or OfflineReplica + * 3. OfflineReplica : If a replica dies, it moves to this state. This happens when the broker hosting the replica + * is down. Valid previous state are NewReplica, OnlineReplica + * 4. NonExistentReplica: If a replica is deleted, it is moved to this state. Valid previous state is OfflineReplica */ class ReplicaStateMachine(controller: KafkaController) extends Logging { this.logIdent = "[Replica state machine on Controller " + controller.config.brokerId + "]: " @@ -49,7 +55,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { // initialize replica state initializeReplicaState() // move all Online replicas to Online - handleStateChanges(controllerContext.liveBrokerIds.toSeq, OnlineReplica) + handleStateChanges(ZkUtils.getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, + controllerContext.liveBrokerIds.toSeq), OnlineReplica) info("Started replica state machine with initial state -> " + replicaState.toString()) } @@ -72,20 +79,11 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { * @param targetState The state that the replicas should be moved to * The controller's allLeaders cache should have been updated before this */ - def handleStateChanges(brokerIds: Seq[Int], targetState: ReplicaState) { - info("Invoking state change to %s for brokers %s".format(targetState, brokerIds.mkString(","))) + def handleStateChanges(replicas: Seq[PartitionAndReplica], targetState: ReplicaState) { + info("Invoking state change to %s for replicas %s".format(targetState, replicas.mkString(","))) try { brokerRequestBatch.newBatch() - brokerIds.foreach { brokerId => - // read all the partitions and their assigned replicas into a map organized by - // { replica id -> partition 1, partition 2... 
- val partitionsAssignedToThisBroker = getPartitionsAssignedToBroker(controllerContext.allTopics.toSeq, brokerId) - partitionsAssignedToThisBroker.foreach { topicAndPartition => - handleStateChange(topicAndPartition._1, topicAndPartition._2, brokerId, targetState) - } - if(partitionsAssignedToThisBroker.size == 0) - info("No state transitions triggered since no partitions are assigned to brokers %s".format(brokerIds.mkString(","))) - } + replicas.foreach(r => handleStateChange(r.topic, r.partition, r.replica, targetState)) brokerRequestBatch.sendRequestsToBrokers() }catch { case e => error("Error while moving some replicas to %s state".format(targetState), e) @@ -100,28 +98,62 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { * @param replicaId The replica for which the state transition is invoked * @param targetState The end state that the replica should be moved to */ - private def handleStateChange(topic: String, partition: Int, replicaId: Int, targetState: ReplicaState) { + def handleStateChange(topic: String, partition: Int, replicaId: Int, targetState: ReplicaState) { try { + replicaState.getOrElseUpdate((topic, partition, replicaId), NonExistentReplica) targetState match { - case OnlineReplica => - assertValidPreviousStates(topic, partition, replicaId, List(OnlineReplica, OfflineReplica), targetState) - // check if the leader for this partition is alive or even exists - // NOTE: technically, we could get the leader from the allLeaders cache, but we need to read zookeeper - // for the ISR anyways + case NewReplica => + assertValidPreviousStates(topic, partition, replicaId, List(NonExistentReplica), targetState) + // start replica as a follower to the current leader for its partition val leaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) leaderAndIsrOpt match { case Some(leaderAndIsr) => - controllerContext.liveBrokerIds.contains(leaderAndIsr.leader) match { - case true => // leader is alive - brokerRequestBatch.addRequestForBrokers(List(replicaId), topic, partition, leaderAndIsr) - replicaState.put((topic, partition, replicaId), OnlineReplica) - info("Replica %d for partition [%s, %d] state changed to OnlineReplica".format(replicaId, topic, partition)) - case false => // ignore partitions whose leader is not alive + if(leaderAndIsr.leader == replicaId) + throw new StateChangeFailedException("Replica %d for partition [%s, %d] cannot be moved to NewReplica" + .format(replicaId, topic, partition) + "state as it is being requested to become leader") + brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderAndIsr) + case None => // new leader request will be sent to this replica when one gets elected + } + replicaState.put((topic, partition, replicaId), NewReplica) + info("Replica %d for partition [%s, %d] state changed to NewReplica".format(replicaId, topic, partition)) + case NonExistentReplica => + assertValidPreviousStates(topic, partition, replicaId, List(OfflineReplica), targetState) + // send stop replica command + brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition) + // remove this replica from the assigned replicas list for its partition + val currentAssignedReplicas = controllerContext.partitionReplicaAssignment((topic, partition)) + controllerContext.partitionReplicaAssignment.put((topic, partition), + currentAssignedReplicas.filterNot(_ == replicaId)) + info("Replica %d for partition [%s, %d] state changed to NonExistentReplica".format(replicaId, 
topic, partition)) + replicaState.remove((topic, partition, replicaId)) + case OnlineReplica => + assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica, OfflineReplica), targetState) + replicaState((topic, partition, replicaId)) match { + case NewReplica => + // add this replica to the assigned replicas list for its partition + val currentAssignedReplicas = controllerContext.partitionReplicaAssignment((topic, partition)) + controllerContext.partitionReplicaAssignment.put((topic, partition), currentAssignedReplicas :+ replicaId) + info("Replica %d for partition [%s, %d] state changed to OnlineReplica".format(replicaId, topic, partition)) + case _ => + // check if the leader for this partition is alive or even exists + // NOTE: technically, we could get the leader from the allLeaders cache, but we need to read zookeeper + // for the ISR anyways + val leaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) + leaderAndIsrOpt match { + case Some(leaderAndIsr) => + controllerContext.liveBrokerIds.contains(leaderAndIsr.leader) match { + case true => // leader is alive + brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderAndIsr) + replicaState.put((topic, partition, replicaId), OnlineReplica) + info("Replica %d for partition [%s, %d] state changed to OnlineReplica".format(replicaId, topic, partition)) + case false => // ignore partitions whose leader is not alive + } + case None => // ignore partitions who don't have a leader yet } - case None => // ignore partitions who don't have a leader yet } + replicaState.put((topic, partition, replicaId), OnlineReplica) case OfflineReplica => - assertValidPreviousStates(topic, partition, replicaId, List(OnlineReplica), targetState) + assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica), targetState) // As an optimization, the controller removes dead replicas from the ISR var zookeeperPathUpdateSucceeded: Boolean = false var newLeaderAndIsr: LeaderAndIsr = null @@ -144,7 +176,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { } } // send the shrunk ISR state change request only to the leader - brokerRequestBatch.addRequestForBrokers(List(newLeaderAndIsr.leader), topic, partition, newLeaderAndIsr) + brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(newLeaderAndIsr.leader), topic, partition, newLeaderAndIsr) // update the local leader and isr cache controllerContext.allLeaders.put((topic, partition), newLeaderAndIsr.leader) replicaState.put((topic, partition, replicaId), OfflineReplica) @@ -226,7 +258,9 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { } sealed trait ReplicaState { def state: Byte } -case object OnlineReplica extends ReplicaState { val state: Byte = 1 } -case object OfflineReplica extends ReplicaState { val state: Byte = 2 } +case object NewReplica extends ReplicaState { val state: Byte = 1 } +case object OnlineReplica extends ReplicaState { val state: Byte = 2 } +case object OfflineReplica extends ReplicaState { val state: Byte = 3 } +case object NonExistentReplica extends ReplicaState { val state: Byte = 4 } diff --git core/src/main/scala/kafka/producer/ProducerPool.scala core/src/main/scala/kafka/producer/ProducerPool.scala index cf3e229..eb8ead3 100644 --- core/src/main/scala/kafka/producer/ProducerPool.scala +++ core/src/main/scala/kafka/producer/ProducerPool.scala @@ -21,7 +21,7 @@ import kafka.cluster.Broker import java.util.Properties import 
collection.mutable.HashMap import java.lang.Object -import kafka.utils.{Utils, Logging} +import kafka.utils.Logging import kafka.api.TopicMetadata import kafka.common.UnavailableProducerException diff --git core/src/main/scala/kafka/server/AbstractFetcherThread.scala core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 819baec..5bcb3c4 100644 --- core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -112,7 +112,7 @@ abstract class AbstractFetcherThread(name: String, sourceBroker: Broker, socket warn("current offset %d for topic %s partition %d out of range; reset offset to %d" .format(currentOffset.get, topic, partitionId, newOffset)) case _ => - error("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.id), + error("error for %s %d to broker %s ".format(topic, partitionId, sourceBroker.host), ErrorMapping.exceptionFor(partitionData.error)) partitionsWithError += topicAndPartition fetchMap.remove(topicAndPartition) diff --git core/src/main/scala/kafka/server/KafkaApis.scala core/src/main/scala/kafka/server/KafkaApis.scala index e6de51f..8a691a8 100644 --- core/src/main/scala/kafka/server/KafkaApis.scala +++ core/src/main/scala/kafka/server/KafkaApis.scala @@ -17,7 +17,6 @@ package kafka.server -import java.io.IOException import kafka.admin.{CreateTopicCommand, AdminUtils} import kafka.api._ import kafka.message._ @@ -75,7 +74,6 @@ class KafkaApis(val requestChannel: RequestChannel, requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndISRResponse))) } - def handleStopReplicaRequest(request: RequestChannel.Request){ val stopReplicaRequest = request.requestObj.asInstanceOf[StopReplicaRequest] if(requestLogger.isTraceEnabled) @@ -84,7 +82,7 @@ class KafkaApis(val requestChannel: RequestChannel, val responseMap = new HashMap[(String, Int), Short] - for((topic, partitionId) <- stopReplicaRequest.stopReplicaSet){ + for((topic, partitionId) <- stopReplicaRequest.partitions){ val errorCode = replicaManager.stopReplica(topic, partitionId) responseMap.put((topic, partitionId), errorCode) } diff --git core/src/main/scala/kafka/server/KafkaConfig.scala core/src/main/scala/kafka/server/KafkaConfig.scala index b6bab26..b43fa44 100644 --- core/src/main/scala/kafka/server/KafkaConfig.scala +++ core/src/main/scala/kafka/server/KafkaConfig.scala @@ -21,7 +21,7 @@ import java.util.Properties import kafka.message.Message import kafka.consumer.ConsumerConfig import java.net.InetAddress -import kafka.utils.{Topic, Utils, VerifiableProperties, ZKConfig} +import kafka.utils.{Utils, VerifiableProperties, ZKConfig} /** * Configuration settings for the kafka server diff --git core/src/main/scala/kafka/server/ReplicaFetcherManager.scala core/src/main/scala/kafka/server/ReplicaFetcherManager.scala index 69db208..05f5233 100644 --- core/src/main/scala/kafka/server/ReplicaFetcherManager.scala +++ core/src/main/scala/kafka/server/ReplicaFetcherManager.scala @@ -20,7 +20,8 @@ package kafka.server import kafka.cluster.Broker class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val replicaMgr: ReplicaManager) - extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId + ", ", brokerConfig.numReplicaFetchers) { + extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId + ", ", + brokerConfig.numReplicaFetchers) { override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): 
AbstractFetcherThread = { new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d-on-broker-%d".format(sourceBroker.id, fetcherId, brokerConfig.brokerId), sourceBroker, brokerConfig, replicaMgr) diff --git core/src/main/scala/kafka/server/ReplicaManager.scala core/src/main/scala/kafka/server/ReplicaManager.scala index f078b99..e061924 100644 --- core/src/main/scala/kafka/server/ReplicaManager.scala +++ core/src/main/scala/kafka/server/ReplicaManager.scala @@ -159,19 +159,6 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient } responseMap.put(partitionInfo, errorCode) } - - /** - * If IsInit flag is on, this means that the controller wants to treat topics not in the request - * as deleted. - * TODO: Handle this properly as part of KAFKA-330 - */ -// if(leaderAndISRRequest.isInit == LeaderAndIsrRequest.IsInit){ -// startHighWaterMarksCheckPointThread -// val partitionsToRemove = allPartitions.filter(p => !leaderAndISRRequest.leaderAndISRInfos.contains(p._1)).map(entry => entry._1) -// info("Init flag is set in leaderAndISR request, partitions to remove: %s".format(partitionsToRemove)) -// partitionsToRemove.foreach(p => stopReplica(p._1, p._2)) -// } - responseMap } diff --git core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala index 76ecebe..fdcc690 100644 --- core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala +++ core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala @@ -22,7 +22,6 @@ import joptsimple._ import org.I0Itec.zkclient.ZkClient import kafka.utils.{ZkUtils, ZKStringSerializer, Logging} import kafka.consumer.SimpleConsumer -import collection.mutable.Map import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest} import kafka.common.TopicAndPartition import scala.collection._ diff --git core/src/main/scala/kafka/utils/Utils.scala core/src/main/scala/kafka/utils/Utils.scala index 70d3b02..f030d30 100644 --- core/src/main/scala/kafka/utils/Utils.scala +++ core/src/main/scala/kafka/utils/Utils.scala @@ -19,6 +19,7 @@ package kafka.utils import java.io._ import java.nio._ +import charset.Charset import java.nio.channels._ import java.lang.management._ import java.util.zip.CRC32 @@ -638,7 +639,7 @@ object Utils extends Logging { builder.toString } - def mapToJson[T <: Any](map: Map[String, List[String]]): String = { + def mapToJson[T <: Any](map: Map[String, Seq[String]]): String = { val builder = new StringBuilder builder.append("{ ") var numElements = 0 @@ -715,6 +716,18 @@ object Utils extends Logging { for (forever <- Stream.continually(1); t <- coll) yield t stream.iterator } + + def readFileIntoString(path: String): String = { + val stream = new FileInputStream(new File(path)) + try { + val fc = stream.getChannel() + val bb = fc.map(FileChannel.MapMode.READ_ONLY, 0, fc.size()) + Charset.defaultCharset().decode(bb).toString() + } + finally { + stream.close() + } + } } /** diff --git core/src/main/scala/kafka/utils/ZkUtils.scala core/src/main/scala/kafka/utils/ZkUtils.scala index 66332a4..dcc0cda 100644 --- core/src/main/scala/kafka/utils/ZkUtils.scala +++ core/src/main/scala/kafka/utils/ZkUtils.scala @@ -27,12 +27,15 @@ import kafka.api.LeaderAndIsr import org.apache.zookeeper.data.Stat import java.util.concurrent.locks.{ReentrantLock, Condition} import kafka.common.{KafkaException, NoEpochForPartitionException} +import kafka.controller.{PartitionAndReplica, ReassignedPartitionsContext} +import kafka.admin._ object ZkUtils extends Logging { val ConsumersPath = 
"/consumers" val BrokerIdsPath = "/brokers/ids" val BrokerTopicsPath = "/brokers/topics" val ControllerPath = "/controller" + val ReassignPartitionsPath = "/admin/reassign_partitions" def getTopicPath(topic: String): String ={ BrokerTopicsPath + "/" + topic @@ -76,20 +79,23 @@ object ZkUtils extends Logging { val leaderAndIsrOpt = leaderAndIsrInfo._1 val stat = leaderAndIsrInfo._2 leaderAndIsrOpt match { - case Some(leaderAndIsrStr) => - SyncJSON.parseFull(leaderAndIsrStr) match { - case Some(m) => - val leader = m.asInstanceOf[Map[String, String]].get("leader").get.toInt - val epoch = m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt - val isrString = m.asInstanceOf[Map[String, String]].get("ISR").get - val isr = Utils.getCSVList(isrString).map(r => r.toInt) - val zkPathVersion = stat.getVersion - debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for topic %s and partition %d".format(leader, epoch, - isr.toString(), zkPathVersion, topic, partition)) - Some(LeaderAndIsr(leader, epoch, isr.toList, zkPathVersion)) - case None => None - } - case None => None // TODO: Handle if leader and isr info is not available in zookeeper + case Some(leaderAndIsrStr) => parseLeaderAndIsr(leaderAndIsrStr, topic, partition, stat) + case None => None + } + } + + def parseLeaderAndIsr(leaderAndIsrStr: String, topic: String, partition: Int, stat: Stat): Option[LeaderAndIsr] = { + SyncJSON.parseFull(leaderAndIsrStr) match { + case Some(m) => + val leader = m.asInstanceOf[Map[String, String]].get("leader").get.toInt + val epoch = m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt + val isrString = m.asInstanceOf[Map[String, String]].get("ISR").get + val isr = Utils.getCSVList(isrString).map(r => r.toInt) + val zkPathVersion = stat.getVersion + debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for topic %s and partition %d".format(leader, epoch, + isr.toString(), zkPathVersion, topic, partition)) + Some(LeaderAndIsr(leader, epoch, isr.toList, zkPathVersion)) + case None => None } } @@ -295,11 +301,13 @@ object ZkUtils extends Logging { def conditionalUpdatePersistentPath(client: ZkClient, path: String, data: String, expectVersion: Int): (Boolean, Int) = { try { val stat = client.writeData(path, data, expectVersion) - info("Conditional update the zkPath %s with expected version %d succeed and return the new version: %d".format(path, expectVersion, stat.getVersion)) + info("Conditional update of zkPath %s with value %s and expected version %d succeeded, returning the new version: %d" + .format(path, data, expectVersion, stat.getVersion)) (true, stat.getVersion) } catch { case e: Exception => - info("Conditional update the zkPath %s with expected version %d failed".format(path, expectVersion)) + error("Conditional update of zkPath %s with data %s and expected version %d failed".format(path, data, + expectVersion), e) (false, -1) } } @@ -511,6 +519,66 @@ object ZkUtils extends Logging { }.flatten[(String, Int)].toSeq } + def getPartitionsBeingReassigned(zkClient: ZkClient): Map[(String, Int), ReassignedPartitionsContext] = { + // read the partitions and their new replica list + val jsonPartitionMapOpt = readDataMaybeNull(zkClient, ReassignPartitionsPath)._1 + jsonPartitionMapOpt match { + case Some(jsonPartitionMap) => + val reassignedPartitions = parsePartitionReassignmentData(jsonPartitionMap) + reassignedPartitions.map { p => + val newReplicas = p._2 + (p._1 -> new ReassignedPartitionsContext(newReplicas)) + } + case None => Map.empty[(String, Int), 
ReassignedPartitionsContext] + } + } + + def parsePartitionReassignmentData(jsonData: String):Map[(String, Int), Seq[Int]] = { + SyncJSON.parseFull(jsonData) match { + case Some(m) => + val replicaMap = m.asInstanceOf[Map[String, Seq[String]]] + replicaMap.map { reassignedPartitions => + val topic = reassignedPartitions._1.split(",").head + val partition = reassignedPartitions._1.split(",").last.toInt + val newReplicas = reassignedPartitions._2.map(_.toInt) + (topic, partition) -> newReplicas + } + case None => Map.empty[(String, Int), Seq[Int]] + } + } + + def updatePartitionReassignmentData(zkClient: ZkClient, partitionsToBeReassigned: Map[(String, Int), Seq[Int]]) { + val zkPath = ZkUtils.ReassignPartitionsPath + partitionsToBeReassigned.size match { + case 0 => // need to delete the /admin/reassign_partitions path + deletePath(zkClient, zkPath) + info("No more partitions need to be reassigned. Deleting zk path %s".format(zkPath)) + case _ => + val jsonData = Utils.mapToJson(partitionsToBeReassigned.map(p => ("%s,%s".format(p._1._1, p._1._2)) -> p._2.map(_.toString))) + try { + updatePersistentPath(zkClient, zkPath, jsonData) + info("Updated partition reassignment path with %s".format(jsonData)) + }catch { + case nne: ZkNoNodeException => + ZkUtils.createPersistentPath(zkClient, zkPath, jsonData) + debug("Created path %s with %s for partition reassignment".format(zkPath, jsonData)) + case e2 => throw new AdministrationException(e2.toString) + } + } + } + + def getAllReplicasOnBroker(zkClient: ZkClient, topics: Seq[String], brokerIds: Seq[Int]): Seq[PartitionAndReplica] = { + brokerIds.map { brokerId => + // read all the partitions and their assigned replicas into a map organized by + // { replica id -> partition 1, partition 2... + val partitionsAssignedToThisBroker = getPartitionsAssignedToBroker(zkClient, topics, brokerId) + if(partitionsAssignedToThisBroker.size == 0) + info("No state transitions triggered since no partitions are assigned to brokers %s".format(brokerIds.mkString(","))) + partitionsAssignedToThisBroker.map(p => new PartitionAndReplica(p._1, p._2, brokerId)) + }.flatten + } + + def deletePartition(zkClient : ZkClient, brokerId: Int, topic: String) { val brokerIdPath = BrokerIdsPath + "/" + brokerId zkClient.delete(brokerIdPath) diff --git core/src/test/scala/unit/kafka/admin/AdminTest.scala core/src/test/scala/unit/kafka/admin/AdminTest.scala index c9e2229..bd3a6d9 100644 --- core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -20,8 +20,10 @@ import junit.framework.Assert._ import org.junit.Test import org.scalatest.junit.JUnit3Suite import kafka.zk.ZooKeeperTestHarness +import kafka.server.{KafkaServer, KafkaConfig} +import collection.mutable.ListBuffer import kafka.common.ErrorMapping -import kafka.utils.TestUtils +import kafka.utils.{Utils, ZkUtils, TestUtils} class AdminTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -207,4 +209,182 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness { } } } + + @Test + def testPartitionReassignmentWithLeaderInNewReplicas() { + val expectedReplicaAssignment = Map(0 -> List("0", "1", "2")) + val topic = "test" + // create brokers + val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b))) + // create the topic + AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + // reassign partition 0 + val newReplicas = Seq(0, 2, 3) + val partitionToBeReassigned = 0 + val 
reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map((topic, partitionToBeReassigned) -> newReplicas)) + assertTrue("Partition reassignment attempt failed for [test, 0]", reassignPartitionsCommand.reassignPartitions()) + // wait until reassignment is completed + TestUtils.waitUntilTrue(() => { + val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas); + CheckReassignmentStatus.checkIfPartitionReassignmentSucceeded(zkClient, topic, partitionToBeReassigned, newReplicas, + Map((topic, partitionToBeReassigned) -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted; + }, 1000) + val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned) + assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas) + servers.foreach(_.shutdown()) + } + + @Test + def testPartitionReassignmentWithLeaderNotInNewReplicas() { + val expectedReplicaAssignment = Map(0 -> List("0", "1", "2")) + val topic = "test" + // create brokers + val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b))) + // create the topic + AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + // reassign partition 0 + val newReplicas = Seq(1, 2, 3) + val partitionToBeReassigned = 0 + val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map((topic, partitionToBeReassigned) -> newReplicas)) + assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions()) + // wait until reassignment is completed + TestUtils.waitUntilTrue(() => { + val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas); + CheckReassignmentStatus.checkIfPartitionReassignmentSucceeded(zkClient, topic, partitionToBeReassigned, newReplicas, + Map((topic, partitionToBeReassigned) -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted; + }, 1000) + val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned) + assertEquals("Partition should have been reassigned to 1, 2, 3", newReplicas, assignedReplicas) + // leader should be 2 + servers.foreach(_.shutdown()) + } + + @Test + def testPartitionReassignmentNonOverlappingReplicas() { + val expectedReplicaAssignment = Map(0 -> List("0", "1")) + val topic = "test" + // create brokers + val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b))) + // create the topic + AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + // reassign partition 0 + val newReplicas = Seq(2, 3) + val partitionToBeReassigned = 0 + val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map((topic, partitionToBeReassigned) -> newReplicas)) + assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions()) + // wait until reassignment is completed + TestUtils.waitUntilTrue(() => { + val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas); + CheckReassignmentStatus.checkIfPartitionReassignmentSucceeded(zkClient, topic, partitionToBeReassigned, newReplicas, + Map((topic, partitionToBeReassigned) -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted; + }, 1000) + val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned) + assertEquals("Partition
should have been reassigned to 2, 3", newReplicas, assignedReplicas) + // leader should be 2 + servers.foreach(_.shutdown()) + } + + @Test + def testReassigningNonExistingPartition() { + val topic = "test" + // create brokers + val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b))) + // reassign partition 0 + val newReplicas = Seq(2, 3) + val partitionToBeReassigned = 0 + val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map((topic, partitionToBeReassigned) -> newReplicas)) + assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions()) + val reassignedPartitions = ZkUtils.getPartitionsBeingReassigned(zkClient) + assertFalse("Partition should not be reassigned", reassignedPartitions.contains((topic, partitionToBeReassigned))) + // leader should be 2 + servers.foreach(_.shutdown()) + } + + @Test + def testResumePartitionReassignmentThatWasCompleted() { + val expectedReplicaAssignment = Map(0 -> List("0", "1")) + val topic = "test" + // create the topic + AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + // put the partition in the reassigned path as well + // reassign partition 0 + val newReplicas = Seq(0, 1) + val partitionToBeReassigned = 0 + val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map((topic, partitionToBeReassigned) -> newReplicas)) + reassignPartitionsCommand.reassignPartitions + // create brokers + val servers = TestUtils.createBrokerConfigs(2).map(b => TestUtils.createServer(new KafkaConfig(b))) + TestUtils.waitUntilTrue(checkIfReassignPartitionPathExists, 1000) + val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned) + assertEquals("Partition should have been reassigned to 0, 1", newReplicas, assignedReplicas) + servers.foreach(_.shutdown()) + } + + @Test + def testResumePartitionReassignmentAfterLeaderWasMoved() { + var expectedReplicaAssignment = Map(0 -> List(1, 0, 2, 3)) + val leaderForPartitionMap = Map(0 -> 2) + val topic = "test" + val serverConfigs = TestUtils.createBrokerConfigs(4).map(b => new KafkaConfig(b)) + val servers = new ListBuffer[KafkaServer] + // create the topic + AdminUtils.createTopicPartitionAssignmentPathInZK(topic, + expectedReplicaAssignment.map(r => (r._1) -> r._2.map(_.toString)), zkClient) + // bring up just brokers 0 and 1 + servers.append(TestUtils.createServer(serverConfigs(0))) + servers.append(TestUtils.createServer(serverConfigs(1))) + val newReplicas = Seq(2, 3) + val partitionToBeReassigned = 0 + val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map((topic, partitionToBeReassigned) -> newReplicas)) + reassignPartitionsCommand.reassignPartitions + // this partition reassignment request should be ignored since replicas 2 and 3 are not online + // and the admin path should be deleted as well + TestUtils.waitUntilTrue(checkIfReassignmentPathIsDeleted, 1000) + var assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned) + assertEquals("Partition should not be reassigned to 2, 3 yet", expectedReplicaAssignment(0), assignedReplicas) + // create brokers + servers.append(TestUtils.createServer(serverConfigs(2))) + servers.append(TestUtils.createServer(serverConfigs(3))) + // wait until new replicas catch up with leader + TestUtils.waitUntilTrue(checkIfNewReplicasInIsr, 2000) + // change the assigned replicas to 0 and 1 + updateAssignedReplicasForPartition("test", 0, 
List(0, 1)) + // reissue the partition reassignment + reassignPartitionsCommand.reassignPartitions + // create leaders for the partition to be reassigned + TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap) + // bounce controller + servers.head.shutdown() + servers.head.startup() + TestUtils.waitUntilTrue(checkIfReassignPartitionPathExists, 1000) + assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned) + assertEquals("Partition should have been reassigned to 0, 1", newReplicas, assignedReplicas) + servers.foreach(_.shutdown()) + } + + private def checkIfReassignPartitionPathExists(): Boolean = { + ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath) + } + + private def checkIfReassignmentPathIsDeleted(): Boolean = { + !ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath) + } + + private def checkIfNewReplicasInIsr(): Boolean = { + val leaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, "test", 0) + leaderAndIsrOpt match { + case Some(leaderAndIsr) => + if(leaderAndIsr.isr.contains(2) && leaderAndIsr.isr.contains(3)) + true + else + false + case None => false + } + } + + private def updateAssignedReplicasForPartition(topic: String, partition: Int, newAssignedReplicas: Seq[Int]) { + val zkPath = ZkUtils.getTopicPath(topic) + val jsonPartitionMap = Utils.mapToJson(Map(partition.toString -> newAssignedReplicas.map(_.toString))) + ZkUtils.updatePersistentPath(zkClient, zkPath, jsonPartitionMap) + } } diff --git core/src/test/scala/unit/kafka/log/LogOffsetTest.scala core/src/test/scala/unit/kafka/log/LogOffsetTest.scala index fd248ee..c04ec0d 100644 --- core/src/test/scala/unit/kafka/log/LogOffsetTest.scala +++ core/src/test/scala/unit/kafka/log/LogOffsetTest.scala @@ -30,7 +30,7 @@ import org.scalatest.junit.JUnit3Suite import kafka.admin.CreateTopicCommand import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest} import kafka.utils.TestUtils._ -import kafka.common.{ErrorMapping, TopicAndPartition, UnknownTopicOrPartitionException} +import kafka.common.{ErrorMapping, TopicAndPartition} object LogOffsetTest { val random = new Random() diff --git core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala index 8c48fd4..e36bcca 100644 --- core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala +++ core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala @@ -95,7 +95,7 @@ object RpcDataSerializationTestUtils{ } def createTestStopReplicaRequest() : StopReplicaRequest = { - new StopReplicaRequest(Set((topic1, 0), (topic2, 0))) + new StopReplicaRequest(collection.immutable.Set((topic1, 0), (topic2, 0))) } def createTestStopReplicaResponse() : StopReplicaResponse = { diff --git core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala index d926813..8239b64 100644 --- core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala +++ core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala @@ -70,20 +70,22 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { // kill the server hosting the preferred replica servers.last.shutdown() // check if leader moves to the other server - val leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 1500, if(leader1.get == 0) None else leader1) + val leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 1500, + 
if(leader1.get == 0) None else leader1) + val leaderEpoch2 = ZkUtils.getEpochForPartition(zkClient, topic, partitionId) debug("Leader is elected to be: %s".format(leader1.getOrElse(-1))) debug("leader Epoc: " + leaderEpoch2) assertEquals("Leader must move to broker 0", 0, leader2.getOrElse(-1)) if(leader1.get == leader2.get) - assertEquals("Second epoch value should be " + leaderEpoch1, leaderEpoch1, leaderEpoch2) + assertEquals("Second epoch value should be " + (leaderEpoch1+1), leaderEpoch1+1, leaderEpoch2) else assertEquals("Second epoch value should be %d".format(leaderEpoch1+1) , leaderEpoch1+1, leaderEpoch2) servers.last.startup() servers.head.shutdown() Thread.sleep(zookeeper.tickTime) - val leader3 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 1500, if(leader2.get == 1) None else leader2) + val leader3 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 1500, + if(leader2.get == 1) None else leader2) val leaderEpoch3 = ZkUtils.getEpochForPartition(zkClient, topic, partitionId) debug("leader Epoc: " + leaderEpoch3) debug("Leader is elected to be: %s".format(leader3.getOrElse(-1))) diff --git core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala index 4f61f84..f44cc0b 100644 --- core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala +++ core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala @@ -13,7 +13,7 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - */ +*/ package kafka.server @@ -69,9 +69,10 @@ class RequestPurgatoryTest extends JUnit3Suite { purgatory.watch(r2) purgatory.awaitExpiration(r1) val elapsed = System.currentTimeMillis - start + println("Start = %d, Elapsed = %d".format(start, elapsed)) assertTrue("r1 expired", purgatory.expired.contains(r1)) assertTrue("r2 hasn't expired", !purgatory.expired.contains(r2)) - assertTrue("Time for expiration %d should at least %d".format(elapsed, expiration), elapsed >= expiration) + assertTrue("Time for expiration %d should be close to %d".format(elapsed, expiration), (elapsed - expiration).abs < 10L) } class MockRequestPurgatory extends RequestPurgatory[DelayedRequest, ProducerRequest] { diff --git core/src/test/scala/unit/kafka/utils/TestUtils.scala core/src/test/scala/unit/kafka/utils/TestUtils.scala index 7fee744..581b558 100644 --- core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -389,7 +389,8 @@ object TestUtils extends Logging { newLeaderAndISR.leaderEpoch += 1 newLeaderAndISR.zkVersion += 1 } - ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath( topic, partition), newLeaderAndISR.toString) + ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), + newLeaderAndISR.toString) } catch { case oe => error("Error while electing leader for topic %s partition %d".format(topic, partition), oe) }
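
The conditional-update retry loop that this patch introduces in electLeaderForPartition is the core idea behind the controller changes above: read the current leaderAndIsr value and its zkVersion, ask a PartitionLeaderSelector for a new assignment, and write it back only if the zkVersion has not moved, retrying otherwise. The following minimal Scala sketch illustrates the same compare-and-set pattern under stated assumptions: InMemoryVersionedStore and pickLeader are hypothetical stand-ins for ZooKeeper's versioned znode (ZkUtils.conditionalUpdatePersistentPath in the patch) and the leader selector; it is an illustration, not Kafka code.

object ConditionalUpdateSketch {
  // Hypothetical stand-in for a versioned znode: a value plus a version that is
  // bumped on every successful write (not a Kafka or ZooKeeper API).
  class InMemoryVersionedStore(initial: String) {
    private var value = initial
    private var version = 0
    def read: (String, Int) = synchronized { (value, version) }
    // Succeeds only if the caller's expected version matches the current one.
    def conditionalUpdate(newValue: String, expectedVersion: Int): (Boolean, Int) = synchronized {
      if (expectedVersion == version) { value = newValue; version += 1; (true, version) }
      else (false, -1)
    }
  }

  // Same shape as the retry loop in electLeaderForPartition: re-read, select, CAS, retry.
  def electLeader(store: InMemoryVersionedStore, pickLeader: String => String): String = {
    var updated = false
    var result = ""
    while (!updated) {
      val (current, version) = store.read          // re-read current leader/ISR state and its version
      val proposed = pickLeader(current)           // the selector decides the new leader/ISR
      val (ok, _) = store.conditionalUpdate(proposed, version)
      updated = ok                                 // retry if a concurrent writer won the race
      if (ok) result = proposed
    }
    result
  }

  def main(args: Array[String]): Unit = {
    val store = new InMemoryVersionedStore("leader=1,isr=1")
    println(electLeader(store, _ => "leader=2,isr=2"))   // prints leader=2,isr=2
  }
}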