diff --git core/src/main/scala/kafka/admin/ShutdownBroker.scala core/src/main/scala/kafka/admin/ShutdownBroker.scala new file mode 100644 index 0000000..9571fd5 --- /dev/null +++ core/src/main/scala/kafka/admin/ShutdownBroker.scala @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.admin + + +import joptsimple.OptionParser +import kafka.utils._ +import org.I0Itec.zkclient.ZkClient +import javax.management.remote.{JMXServiceURL, JMXConnectorFactory} +import javax.management.ObjectName +import kafka.controller.KafkaController +import scala.Some + + +object ShutdownBroker extends Logging { + + private case class ShutdownParams(zkConnect: String, brokerId: java.lang.Integer, jmxUrl: String) + + private def invokeShutdown(params: ShutdownParams): Boolean = { + var zkClient: ZkClient = null + try { + zkClient = new ZkClient(params.zkConnect, 30000, 30000, ZKStringSerializer) + val controllerBrokerId = ZkUtils.getController(zkClient) + val controllerOpt = ZkUtils.getBrokerInfo(zkClient, controllerBrokerId) + controllerOpt match { + case Some(controller) => + val jmxUrl = new JMXServiceURL(params.jmxUrl) + val jmxc = JMXConnectorFactory.connect(jmxUrl, null) + val mbsc = jmxc.getMBeanServerConnection + val leaderPartitionsRemaining = mbsc.invoke(new ObjectName(KafkaController.MBeanName), + "shutdownBroker", + Array(params.brokerId), + Array(classOf[Int].getName)).asInstanceOf[Int] + val shutdownComplete = (leaderPartitionsRemaining == 0) + info("Shutdown status: " + (if (shutdownComplete) + "complete" else + "incomplete (broker still leads %d partitions)".format(leaderPartitionsRemaining))) + shutdownComplete + case None => + error("Operation failed due to controller failure on %d.".format(controllerBrokerId)) + false + } + } + catch { + case t: Throwable => + error("Operation failed due to %s.".format(t.getMessage), t) + false + } + finally { + if (zkClient != null) + zkClient.close() + } + } + + def main(args: Array[String]) { + val parser = new OptionParser + val brokerOpt = parser.accepts("broker", "REQUIRED: The broker to shutdown.") + .withRequiredArg + .describedAs("Broker Id") + .ofType(classOf[java.lang.Integer]) + val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. 
" + + "Multiple URLS can be given to allow fail-over.") + .withRequiredArg + .describedAs("urls") + .ofType(classOf[String]) + val numRetriesOpt = parser.accepts("num.retries", "Number of attempts to retry if shutdown does not complete.") + .withRequiredArg + .describedAs("number of retries") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(0) + val retryIntervalOpt = parser.accepts("retry.interval.ms", "Retry interval if retries requested.") + .withRequiredArg + .describedAs("retry interval in ms (> 1000)") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1000) + val jmxUrlOpt = parser.accepts("jmx.url", "Controller's JMX URL.") + .withRequiredArg + .describedAs("JMX url.") + .ofType(classOf[String]) + .defaultsTo("service:jmx:rmi:///jndi/rmi://127.0.0.1:9999/jmxrmi") + + val options = parser.parse(args : _*) + CommandLineUtils.checkRequiredArgs(parser, options, brokerOpt, zkConnectOpt) + + val retryIntervalMs = options.valueOf(retryIntervalOpt).intValue.max(1000) + val numRetries = options.valueOf(numRetriesOpt).intValue + + val shutdownParams = + ShutdownParams(options.valueOf(zkConnectOpt), options.valueOf(brokerOpt), options.valueOf(jmxUrlOpt)) + + if (!invokeShutdown(shutdownParams)) { + (1 to numRetries).takeWhile(attempt => { + info("Retry " + attempt) + try { + Thread.sleep(retryIntervalMs) + } + catch { + case ie: InterruptedException => // ignore + } + !invokeShutdown(shutdownParams) + }) + } + } + +} + diff --git core/src/main/scala/kafka/api/StopReplicaRequest.scala core/src/main/scala/kafka/api/StopReplicaRequest.scala index c3db6f9..7993b3e 100644 --- core/src/main/scala/kafka/api/StopReplicaRequest.scala +++ core/src/main/scala/kafka/api/StopReplicaRequest.scala @@ -20,8 +20,11 @@ package kafka.api import java.nio._ import kafka.api.ApiUtils._ +import kafka.utils.Logging +import kafka.network.InvalidRequestException -object StopReplicaRequest { + +object StopReplicaRequest extends Logging { val CurrentVersion = 1.shortValue() val DefaultClientId = "" val DefaultAckTimeout = 100 @@ -30,28 +33,38 @@ object StopReplicaRequest { val versionId = buffer.getShort val clientId = readShortString(buffer) val ackTimeoutMs = buffer.getInt + val deletePartitions = buffer.get match { + case 1 => true + case 0 => false + case x => + throw new InvalidRequestException("Invalid byte %d in delete partitions field. 
(Assuming false.)".format(x)) + } val topicPartitionPairCount = buffer.getInt val topicPartitionPairSet = new collection.mutable.HashSet[(String, Int)]() - for (i <- 0 until topicPartitionPairCount) + (1 to topicPartitionPairCount) foreach { _ => topicPartitionPairSet.add(readShortString(buffer), buffer.getInt) - new StopReplicaRequest(versionId, clientId, ackTimeoutMs, topicPartitionPairSet.toSet) + } + StopReplicaRequest(versionId, clientId, ackTimeoutMs, deletePartitions, topicPartitionPairSet.toSet) } } case class StopReplicaRequest(versionId: Short, clientId: String, ackTimeoutMs: Int, + deletePartitions: Boolean, partitions: Set[(String, Int)]) extends RequestOrResponse(Some(RequestKeys.StopReplicaKey)) { - def this(partitions: Set[(String, Int)]) = { + + def this(deletePartitions: Boolean, partitions: Set[(String, Int)]) = { this(StopReplicaRequest.CurrentVersion, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout, - partitions) + deletePartitions, partitions) } def writeTo(buffer: ByteBuffer) { buffer.putShort(versionId) writeShortString(buffer, clientId) buffer.putInt(ackTimeoutMs) + buffer.put(if (deletePartitions) 1.toByte else 0.toByte) buffer.putInt(partitions.size) for ((topic, partitionId) <- partitions){ writeShortString(buffer, topic) @@ -60,9 +73,15 @@ case class StopReplicaRequest(versionId: Short, } def sizeInBytes(): Int = { - var size = 2 + (2 + clientId.length()) + 4 + 4 + var size = + 2 + /* versionId */ + ApiUtils.shortStringLength(clientId) + + 4 + /* ackTimeoutMs */ + 1 + /* deletePartitions */ + 4 /* partition count */ for ((topic, partitionId) <- partitions){ - size += (2 + topic.length()) + 4 + size += (ApiUtils.shortStringLength(topic)) + + 4 /* partition id */ } size } diff --git core/src/main/scala/kafka/common/BrokerNotExistException.scala core/src/main/scala/kafka/common/BrokerNotExistException.scala deleted file mode 100644 index e69de29..0000000 diff --git core/src/main/scala/kafka/common/TopicAndPartition.scala core/src/main/scala/kafka/common/TopicAndPartition.scala index 4b1f3a3..63596b7 100644 --- core/src/main/scala/kafka/common/TopicAndPartition.scala +++ core/src/main/scala/kafka/common/TopicAndPartition.scala @@ -25,5 +25,7 @@ case class TopicAndPartition(topic: String, partition: Int) { def this(tuple: (String, Int)) = this(tuple._1, tuple._2) def asTuple = (topic, partition) + + override def toString = "[%s,%d]".format(topic, partition) } diff --git core/src/main/scala/kafka/controller/ControllerChannelManager.scala core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 3791a03..922be48 100644 --- core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -49,7 +49,13 @@ class ControllerChannelManager private (config: KafkaConfig) extends Logging { def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) { brokerLock synchronized { - brokerStateInfo(brokerId).messageQueue.put((request, callback)) + val stateInfoOpt = brokerStateInfo.get(brokerId) + stateInfoOpt match { + case Some(stateInfo) => + stateInfo.messageQueue.put((request, callback)) + case None => + warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId)) + } } } @@ -123,7 +129,7 @@ class RequestSendThread(val controllerId: Int, case RequestKeys.StopReplicaKey => response = StopReplicaResponse.readFrom(receive.buffer) } - trace("got a response %s".format(controllerId, 
response, toBrokerId)) + trace("Controller %d request to broker %d got a response %s".format(controllerId, toBrokerId, response)) if(callback != null){ callback(response) @@ -141,6 +147,7 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques extends Logging { val leaderAndIsrRequestMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), PartitionStateInfo]] val stopReplicaRequestMap = new mutable.HashMap[Int, Seq[(String, Int)]] + val stopAndDeleteReplicaRequestMap = new mutable.HashMap[Int, Seq[(String, Int)]] def newBatch() { // raise error if the previous batch is not empty @@ -149,6 +156,7 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques "a new one. Some state changes %s might be lost ".format(leaderAndIsrRequestMap.toString())) leaderAndIsrRequestMap.clear() stopReplicaRequestMap.clear() + stopAndDeleteReplicaRequestMap.clear() } def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, leaderAndIsr: LeaderAndIsr, replicationFactor: Int) { @@ -160,10 +168,18 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques } } - def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int) { + def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, deletePartition: Boolean) { brokerIds.foreach { brokerId => stopReplicaRequestMap.getOrElseUpdate(brokerId, Seq.empty[(String, Int)]) - stopReplicaRequestMap(brokerId) :+ (topic, partition) + stopAndDeleteReplicaRequestMap.getOrElseUpdate(brokerId, Seq.empty[(String, Int)]) + if (deletePartition) { + val v = stopAndDeleteReplicaRequestMap(brokerId) + stopAndDeleteReplicaRequestMap(brokerId) = v :+ (topic, partition) + } + else { + val v = stopReplicaRequestMap(brokerId) + stopReplicaRequestMap(brokerId) = v :+ (topic, partition) + } } } @@ -176,12 +192,19 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques sendRequest(broker, leaderAndIsrRequest, null) } leaderAndIsrRequestMap.clear() - stopReplicaRequestMap.foreach { r => - val broker = r._1 - debug("The stop replica request sent to broker %d is %s".format(broker, r._2.mkString(","))) - sendRequest(broker, new StopReplicaRequest(Set.empty[(String, Int)] ++ r._2), null) + Seq((stopReplicaRequestMap, false), (stopAndDeleteReplicaRequestMap, true)) foreach { + case(m, deletePartitions) => { + m.foreach(r => { + val broker = r._1 + if (r._2.size > 0) { + debug("The stop replica request (delete = %s) sent to broker %d is %s" + .format(deletePartitions, broker, r._2.mkString(","))) + sendRequest(broker, new StopReplicaRequest(deletePartitions, Set.empty[(String, Int)] ++ r._2), null) + } + }) + m.clear() + } } - stopReplicaRequestMap.clear() } } diff --git core/src/main/scala/kafka/controller/KafkaController.scala core/src/main/scala/kafka/controller/KafkaController.scala index 2022c9f..932fb66 100644 --- core/src/main/scala/kafka/controller/KafkaController.scala +++ core/src/main/scala/kafka/controller/KafkaController.scala @@ -31,21 +31,46 @@ import kafka.utils.{Utils, ZkUtils, Logging} import org.I0Itec.zkclient.exception.ZkNoNodeException import java.lang.{IllegalStateException, Object} import kafka.admin.PreferredReplicaLeaderElectionCommand -import kafka.common.{TopicAndPartition, KafkaException} +import kafka.common.{BrokerNotAvailableException, TopicAndPartition, KafkaException} class ControllerContext(val zkClient: ZkClient, var controllerChannelManager: ControllerChannelManager = 
null, val controllerLock: Object = new Object, - var liveBrokers: Set[Broker] = null, - var liveBrokerIds: Set[Int] = null, - var allTopics: Set[String] = null, - var partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = null, - var allLeaders: mutable.Map[TopicAndPartition, Int] = null, + var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty, + val brokerShutdownLock: Object = new Object, + var allTopics: Set[String] = Set.empty, + var partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map.empty, + var allLeaders: mutable.Map[TopicAndPartition, Int] = mutable.Map.empty, var partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = - new mutable.HashMap, - var partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] = new mutable.HashSet) + new mutable.HashMap, + var partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] = + new mutable.HashSet) { -class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup { + private var liveBrokersUnderlying: Set[Broker] = Set.empty + private var liveBrokerIdsUnderlying: Set[Int] = Set.empty + + // setter + def liveBrokers_=(brokers: Set[Broker]) { + liveBrokersUnderlying = brokers + liveBrokerIdsUnderlying = liveBrokersUnderlying.map(_.id) + } + + // getter + def liveBrokers = liveBrokersUnderlying.filter(broker => !shuttingDownBrokerIds.contains(broker.id)) + def liveBrokerIds = liveBrokerIdsUnderlying.filter(brokerId => !shuttingDownBrokerIds.contains(brokerId)) + + def liveOrShuttingDownBrokerIds = liveBrokerIdsUnderlying ++ shuttingDownBrokerIds +} + +trait KafkaControllerMBean { + def shutdownBroker(id: Int): Int +} + +object KafkaController { + val MBeanName = "kafka.controller:type=KafkaController,name=ControllerOps" +} + +class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup with KafkaControllerMBean { this.logIdent = "[Controller " + config.brokerId + "]: " private var isRunning = true val controllerContext = new ControllerContext(zkClient) @@ -55,6 +80,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg config.brokerId) private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext) private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext) + private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext) + private val brokerRequestBatch = new ControllerBrokerRequestBatch(sendRequest) newGauge( "ActiveControllerCount", @@ -63,6 +90,90 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg } ) + def shutdownBroker(id: Int) = { + + controllerContext.brokerShutdownLock synchronized { + info("Shutting down broker " + id) + + controllerContext.controllerLock synchronized { + if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id)) + throw new BrokerNotAvailableException("Broker id %d does not exist.".format(id)) + + controllerContext.shuttingDownBrokerIds.add(id) + + debug("All shutting down brokers: " + controllerContext.shuttingDownBrokerIds.mkString(",")) + debug("Live brokers: " + controllerContext.liveBrokerIds.mkString(",")) + } + + val allPartitionsAndReplicationFactorOnBroker = controllerContext.controllerLock synchronized { + getPartitionsAssignedToBroker(zkClient, controllerContext.allTopics.toSeq, 
id).map { + case(topic, partition) => + val topicAndPartition = TopicAndPartition(topic, partition) + (topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition).size) + } + } + + def replicatedPartitionsBrokerLeads = controllerContext.controllerLock.synchronized { + trace("All leaders = " + controllerContext.allLeaders.mkString(",")) + controllerContext.allLeaders.filter { + case (topicAndPartition, leader) => + leader == id && controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1 + }.map(_._1) + } + debug("Partitions to move leadership from broker %d: %s".format(id, replicatedPartitionsBrokerLeads.mkString(","))) + + val partitionsToMove = replicatedPartitionsBrokerLeads.toSet + + partitionsToMove.foreach(topicAndPartition => { + val (topic, partition) = topicAndPartition.asTuple + // move leadership serially to relinquish lock. + controllerContext.controllerLock synchronized { + controllerContext.allLeaders.get(topicAndPartition).foreach(currLeader => { + if (currLeader == id) { + partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition, + controlledShutdownPartitionLeaderSelector) + val newLeader = controllerContext.allLeaders(topicAndPartition) + + // mark replica offline only if leadership was moved successfully + if (newLeader != currLeader) + replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topic, partition, id)), OfflineReplica) + } + else + debug("Partition %s moved from leader %d to new leader %d during shutdown." + .format(topicAndPartition, id, currLeader)) + }) + } + }) + + /* + * Force the shutting down broker out of the ISR of partitions that it + * follows, and shutdown the corresponding replica fetcher threads. + * This is really an optimization, so no need to register any callback + * to wait until completion. + */ + brokerRequestBatch.newBatch() + allPartitionsAndReplicationFactorOnBroker foreach { + case(topicAndPartition, replicationFactor) => + val (topic, partition) = topicAndPartition.asTuple + if (controllerContext.allLeaders(topicAndPartition) != id) { + brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topic, partition, deletePartition = false) + removeReplicaFromIsr(topic, partition, id) match { + case Some(updatedLeaderAndIsr) => + brokerRequestBatch.addLeaderAndIsrRequestForBrokers( + Seq(updatedLeaderAndIsr.leader), topic, partition, updatedLeaderAndIsr, replicationFactor) + case None => + // ignore + } + } + } + brokerRequestBatch.sendRequestsToBrokers() + + val partitionsRemaining = replicatedPartitionsBrokerLeads.toSet + debug("Remaining partitions to move on broker %d: %s".format(id, partitionsRemaining.mkString(","))) + partitionsRemaining.size + } + } + /** * This callback is invoked by the zookeeper leader elector on electing the current broker as the new controller. 
* It does the following things on the become-controller state change - @@ -84,7 +195,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg partitionStateMachine.startup() replicaStateMachine.startup() info("Broker %d is ready to serve as the new controller".format(config.brokerId)) - }else + } + else info("Controller has been shut down, aborting startup/failover") } @@ -105,6 +217,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg */ def onBrokerStartup(newBrokers: Seq[Int]) { info("New broker startup callback for %s".format(newBrokers.mkString(","))) + // update leader and isr cache for broker updateLeaderAndIsrCache() // update partition state machine @@ -128,6 +241,11 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg */ def onBrokerFailure(deadBrokers: Seq[Int]) { info("Broker failure callback for %s".format(deadBrokers.mkString(","))) + + val deadBrokersThatWereShuttingDown = + deadBrokers.filter(id => controllerContext.shuttingDownBrokerIds.remove(id)) + info("Removed %s from list of shutting down brokers.".format(deadBrokersThatWereShuttingDown)) + // update leader and isr cache for broker updateLeaderAndIsrCache() // trigger OfflinePartition state for all partitions whose current leader is one amongst the dead brokers @@ -228,6 +346,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg registerSessionExpirationListener() isRunning = true controllerElector.startup + Utils.registerMBean(this, KafkaController.MBeanName) info("Controller startup complete") } } @@ -260,7 +379,6 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg private def initializeControllerContext() { controllerContext.liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet - controllerContext.liveBrokerIds = controllerContext.liveBrokers.map(_.id) controllerContext.allTopics = ZkUtils.getAllTopics(zkClient).toSet controllerContext.partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, controllerContext.allTopics.toSeq) @@ -270,6 +388,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg // start the channel manager startChannelManager() info("Currently active brokers in the cluster: %s".format(controllerContext.liveBrokerIds)) + info("Currently shutting brokers in the cluster: %s".format(controllerContext.shuttingDownBrokerIds)) info("Current list of topics in the cluster: %s".format(controllerContext.allTopics)) initializeAndMaybeTriggerPartitionReassignment() initializeAndMaybeTriggerPreferredReplicaElection() @@ -341,7 +460,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg "is not in the new list of replicas %s. Re-electing leader".format(reassignedReplicas.mkString(","))) // move the leader to one of the alive and caught up new replicas partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition, reassignedPartitionLeaderSelector) - }else { + } + else { // check if the leader is alive or not controllerContext.liveBrokerIds.contains(currentLeader) match { case true => @@ -440,7 +560,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg val preferredReplica = controllerContext.partitionReplicaAssignment(partition).head if(currentLeader == preferredReplica) { info("Partition %s completed preferred replica leader election. 
New leader is %d".format(partition, preferredReplica)) - }else { + } + else { warn("Partition %s failed to complete preferred replica leader election. Leader is %d".format(partition, currentLeader)) } } @@ -455,6 +576,55 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg }.flatten } + /** + * Removes a given partition replica from the ISR; if it is not the current + * leader and there are sufficient remaining replicas in ISR. + * @param topic topic + * @param partition partition + * @param replicaId replica Id + * @return the new leaderAndIsr (with the replica removed if it was present), + * or None if leaderAndIsr is empty. + */ + def removeReplicaFromIsr(topic: String, partition: Int, replicaId: Int): Option[LeaderAndIsr] = { + val topicAndPartition = TopicAndPartition(topic, partition) + debug("Removing replica %d from ISR of %s.".format(replicaId, topicAndPartition)) + var finalLeaderAndIsr: Option[LeaderAndIsr] = None + var zkWriteCompleteOrUnnecessary = false + while (!zkWriteCompleteOrUnnecessary) { + // refresh leader and isr from zookeeper again + val leaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) + zkWriteCompleteOrUnnecessary = leaderAndIsrOpt match { + case Some(leaderAndIsr) => // increment the leader epoch even if the ISR changes + if (leaderAndIsr.isr.contains(replicaId)) { + val newLeaderAndIsr = new LeaderAndIsr(leaderAndIsr.leader, leaderAndIsr.leaderEpoch + 1, + leaderAndIsr.isr.filter(b => b != replicaId), leaderAndIsr.zkVersion + 1) + // update the new leadership decision in zookeeper or retry + val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath( + zkClient, + ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), newLeaderAndIsr.toString(), + leaderAndIsr.zkVersion) + newLeaderAndIsr.zkVersion = newVersion + + finalLeaderAndIsr = Some(newLeaderAndIsr) + if (updateSucceeded) + info("New leader and ISR for partition [%s, %d] is %s" + .format(topic, partition, newLeaderAndIsr.toString())) + updateSucceeded + } + else { + warn("Cannot remove replica %d from ISR of %s. Leader = %d ; ISR = %s" + .format(replicaId, topicAndPartition, leaderAndIsr.leader, leaderAndIsr.isr)) + finalLeaderAndIsr = Some(leaderAndIsr) + true + } + case None => + warn("Cannot remove replica %d from ISR of %s - leaderAndIsr is empty.".format(replicaId, topicAndPartition)) + true + } + } + finalLeaderAndIsr + } + class SessionExpirationListener() extends IZkStateListener with Logging { this.logIdent = "[SessionExpirationListener on " + config.brokerId + "], " @throws(classOf[Exception]) @@ -523,14 +693,16 @@ class PartitionsReassignedListener(controller: KafkaController) extends IZkDataL throw new KafkaException("Partition %s to be reassigned is already assigned to replicas" .format(topicAndPartition) + " %s. Ignoring request for partition reassignment".format(newReplicas.mkString(","))) - }else { + } + else { if(aliveNewReplicas == newReplicas) { info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(","))) val context = createReassignmentContextForPartition(topic, partition, newReplicas) controllerContext.partitionsBeingReassigned.put(topicAndPartition, context) controller.onPartitionReassignment(topicAndPartition, context) - }else { + } + else { // some replica in RAR is not alive. 
Fail partition reassignment throw new KafkaException("Only %s replicas out of the new set of replicas".format(aliveNewReplicas.mkString(",")) + " %s for partition %s to be reassigned are alive. ".format(newReplicas.mkString(","), topicAndPartition) + @@ -609,7 +781,8 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic: .format(caughtUpReplicas.size, reassignedReplicas.size, topic, partition) + "Resuming partition reassignment") controller.onPartitionReassignment(topicAndPartition, reassignedPartitionContext) - }else { + } + else { info("%d/%d replicas have caught up with the leader for partition [%s, %d] being reassigned." .format(caughtUpReplicas.size, reassignedReplicas.size, topic, partition) + "Replica(s) %s still need to catch up".format((reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(","))) diff --git core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala index cb19edd..a378c5e 100644 --- core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala +++ core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala @@ -25,10 +25,9 @@ trait PartitionLeaderSelector { /** * @param topic The topic of the partition whose leader needs to be elected * @param partition The partition whose leader needs to be elected - * @param assignedReplicas The list of replicas assigned to the input partition * @param currentLeaderAndIsr The current leader and isr of input partition read from zookeeper * @throws PartitionOfflineException If no replica in the assigned replicas list is alive - * @returns The leader and isr request, with the newly selected leader info, to send to the brokers + * @return The leader and isr request, with the newly selected leader info, to send to the brokers * Also, returns the list of replicas the returned leader and isr request should be sent to * This API selects a new leader for the input partition */ @@ -146,4 +145,41 @@ with Logging { } } } -} \ No newline at end of file +} + +/** + * Picks one of the alive replicas (other than the current leader) in ISR as + * new leader, fails if there are no other replicas in ISR. 
+ */ +class ControlledShutdownLeaderSelector(controllerContext: ControllerContext) + extends PartitionLeaderSelector + with Logging { + + this.logIdent = "[ControlledShutdownLeaderSelector]: " + + def selectLeader(topic: String, partition: Int, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = { + val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch + val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion + + val currentLeader = currentLeaderAndIsr.leader + + val assignedReplicas = controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition)) + val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds + val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r)) + + val newIsr = currentLeaderAndIsr.isr.filter(brokerId => brokerId != currentLeader && + !controllerContext.shuttingDownBrokerIds.contains(brokerId)) + val newLeaderOpt = newIsr.headOption + newLeaderOpt match { + case Some(newLeader) => + debug("Partition [%s,%d] : current leader = %d, new leader = %d" + .format(topic, partition, currentLeader, newLeader)) + (LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1), + liveAssignedReplicas) + case None => + throw new StateChangeFailedException("No other replicas in ISR for %s-%s.".format(topic, partition)) + } + } + +} + diff --git core/src/main/scala/kafka/controller/PartitionStateMachine.scala core/src/main/scala/kafka/controller/PartitionStateMachine.scala index cd1becd..e813da7 100644 --- core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -121,8 +121,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { private def handleStateChange(topic: String, partition: Int, targetState: PartitionState, leaderSelector: PartitionLeaderSelector) { val topicAndPartition = TopicAndPartition(topic, partition) + val currState = partitionState.getOrElseUpdate(topicAndPartition, NonExistentPartition) try { - partitionState.getOrElseUpdate(topicAndPartition, NonExistentPartition) targetState match { case NewPartition => // pre: partition did not exist before this @@ -163,8 +163,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { // post: partition state is deleted from all brokers and zookeeper } }catch { - case e => error("State change for partition [%s, %d] ".format(topic, partition) + - "from %s to %s failed".format(partitionState(topicAndPartition), targetState), e) + case t: Throwable => error("State change for partition [%s, %d] ".format(topic, partition) + + "from %s to %s failed".format(currState, targetState), t) } } @@ -203,8 +203,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { /** * Invoked on the NonExistentPartition->NewPartition state transition to update the controller's cache with the * partition's replica assignment. 
- * @topic The topic of the partition whose replica assignment is to be cached - * @partition The partition whose replica assignment is to be cached + * @param topic The topic of the partition whose replica assignment is to be cached + * @param partition The partition whose replica assignment is to be cached */ private def assignReplicasToPartitions(topic: String, partition: Int) { val assignedReplicas = ZkUtils.getReplicasForPartition(controllerContext.zkClient, topic, partition) @@ -216,10 +216,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { * a leader and isr path in zookeeper. Once the partition moves to the OnlinePartition state, it's leader and isr * path gets initialized and it never goes back to the NewPartition state. From here, it can only go to the * OfflinePartition state. - * @topic The topic of the partition whose leader and isr path is to be initialized - * @partition The partition whose leader and isr path is to be initialized - * @brokerRequestBatch The object that holds the leader and isr requests to be sent to each broker as a result of - * this state change + * @param topicAndPartition The topic/partition whose leader and isr path is to be initialized */ private def initializeLeaderAndIsrForPartition(topicAndPartition: TopicAndPartition) { debug("Initializing leader and isr for partition %s".format(topicAndPartition)) @@ -258,10 +255,9 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { /** * Invoked on the OfflinePartition->OnlinePartition state change. It invokes the leader election API to elect a leader * for the input offline partition - * @topic The topic of the offline partition - * @partition The offline partition - * @brokerRequestBatch The object that holds the leader and isr requests to be sent to each broker as a result of - * this state change + * @param topic The topic of the offline partition + * @param partition The offline partition + * @param leaderSelector Specific leader selector (e.g., offline/reassigned/etc.) */ def electLeaderForPartition(topic: String, partition: Int, leaderSelector: PartitionLeaderSelector) { /** handle leader election for the partitions whose leader is no longer alive **/ @@ -291,8 +287,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { }catch { case poe: PartitionOfflineException => throw new PartitionOfflineException("All replicas for partition [%s, %d] are dead." 
.format(topic, partition) + " Marking this partition offline", poe) - case sce => throw new StateChangeFailedException(("Error while electing leader for partition" + - " [%s, %d]").format(topic, partition), sce) + case sce => throw new StateChangeFailedException(("Error while electing leader for partition " + + " [%s, %d] due to: %s.").format(topic, partition, sce.getMessage), sce) } debug("After leader election, leader cache is updated to %s".format(controllerContext.allLeaders.map(l => (l._1, l._2)))) } diff --git core/src/main/scala/kafka/controller/ReplicaStateMachine.scala core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index f0ccb89..474c326 100644 --- core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -19,7 +19,6 @@ package kafka.controller import collection._ import kafka.utils.{ZkUtils, Logging} import collection.JavaConversions._ -import kafka.api.LeaderAndIsr import java.util.concurrent.atomic.AtomicBoolean import org.I0Itec.zkclient.IZkChildListener import kafka.common.{TopicAndPartition, StateChangeFailedException} @@ -75,7 +74,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { /** * This API is invoked by the broker change controller callbacks and the startup API of the state machine - * @param brokerIds The list of brokers that need to be transitioned to the target state + * @param replicas The list of replicas (brokers) that need to be transitioned to the target state * @param targetState The state that the replicas should be moved to * The controller's allLeaders cache should have been updated before this */ @@ -122,7 +121,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { case NonExistentReplica => assertValidPreviousStates(topic, partition, replicaId, List(OfflineReplica), targetState) // send stop replica command - brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition) + brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true) // remove this replica from the assigned replicas list for its partition val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition) controllerContext.partitionReplicaAssignment.put(topicAndPartition, @@ -159,38 +158,38 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { case OfflineReplica => assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica), targetState) // As an optimization, the controller removes dead replicas from the ISR - var zookeeperPathUpdateSucceeded: Boolean = false - var newLeaderAndIsr: LeaderAndIsr = null - while(!zookeeperPathUpdateSucceeded) { - // refresh leader and isr from zookeeper again - val leaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) - leaderAndIsrOpt match { - case Some(leaderAndIsr) => // increment the leader epoch even if the ISR changes - newLeaderAndIsr = new LeaderAndIsr(leaderAndIsr.leader, leaderAndIsr.leaderEpoch + 1, - leaderAndIsr.isr.filter(b => b != replicaId), leaderAndIsr.zkVersion + 1) - info("New leader and ISR for partition [%s, %d] is %s".format(topic, partition, newLeaderAndIsr.toString())) - // update the new leadership decision in zookeeper or retry - val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient, - ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), newLeaderAndIsr.toString(), - 
leaderAndIsr.zkVersion) - newLeaderAndIsr.zkVersion = newVersion - zookeeperPathUpdateSucceeded = updateSucceeded - case None => throw new StateChangeFailedException("Failed to change state of replica %d".format(replicaId) + - " for partition [%s, %d] since the leader and isr path in zookeeper is empty".format(topic, partition)) - } + val currLeaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) + val leaderAndIsrIsEmpty: Boolean = currLeaderAndIsrOpt match { + case Some(currLeaderAndIsr) => + if (currLeaderAndIsr.isr.contains(replicaId)) + controller.removeReplicaFromIsr(topic, partition, replicaId) match { + case Some(updatedLeaderAndIsr) => + // send the shrunk ISR state change request only to the leader + brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderAndIsr.leader), + topic, partition, updatedLeaderAndIsr, + replicaAssignment.size) + replicaState.put((topic, partition, replicaId), OfflineReplica) + info("Replica %d for partition [%s, %d] state changed to OfflineReplica" + .format(replicaId, topic, partition)) + info("Removed offline replica %d from ISR for partition [%s, %d]" + .format(replicaId, topic, partition)) + false + case None => + true + } + else false + case None => + true } - // send the shrunk ISR state change request only to the leader - brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(newLeaderAndIsr.leader), - topic, partition, newLeaderAndIsr, replicaAssignment.size) - // update the local leader and isr cache - controllerContext.allLeaders.put(topicAndPartition, newLeaderAndIsr.leader) - replicaState.put((topic, partition, replicaId), OfflineReplica) - info("Replica %d for partition [%s, %d] state changed to OfflineReplica".format(replicaId, topic, partition)) - info("Removed offline replica %d from ISR for partition [%s, %d]".format(replicaId, topic, partition)) + if (leaderAndIsrIsEmpty) + throw new StateChangeFailedException( + "Failed to change state of replica %d for partition [%s, %d] since the leader and isr path in zookeeper is empty" + .format(replicaId, topic, partition)) } - }catch { - case e => error("Error while changing state of replica %d for partition ".format(replicaId) + - "[%s, %d] to %s".format(topic, partition, targetState), e) + } + catch { + case t: Throwable => error("Error while changing state of replica %d for partition ".format(replicaId) + + "[%s, %d] to %s".format(topic, partition, targetState), t) } } @@ -239,12 +238,11 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { controllerContext.controllerLock synchronized { try { val curBrokerIds = currentBrokerList.map(_.toInt).toSet - val newBrokerIds = curBrokerIds -- controllerContext.liveBrokerIds + val newBrokerIds = curBrokerIds -- controllerContext.liveOrShuttingDownBrokerIds val newBrokers = newBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get) - val deadBrokerIds = controllerContext.liveBrokerIds -- curBrokerIds + val deadBrokerIds = controllerContext.liveOrShuttingDownBrokerIds -- curBrokerIds controllerContext.liveBrokers = curBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get) - controllerContext.liveBrokerIds = controllerContext.liveBrokers.map(_.id) - info("Newly added brokers: %s, deleted brokers: %s, all brokers: %s" + info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s" .format(newBrokerIds.mkString(","), deadBrokerIds.mkString(","), controllerContext.liveBrokerIds.mkString(","))) 
newBrokers.foreach(controllerContext.controllerChannelManager.addBroker(_)) deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker(_)) diff --git core/src/main/scala/kafka/server/AbstractFetcherManager.scala core/src/main/scala/kafka/server/AbstractFetcherManager.scala index c956a02..66728e3 100644 --- core/src/main/scala/kafka/server/AbstractFetcherManager.scala +++ core/src/main/scala/kafka/server/AbstractFetcherManager.scala @@ -25,7 +25,7 @@ abstract class AbstractFetcherManager(protected val name: String, numFetchers: I // map of (source brokerid, fetcher Id per source broker) => fetcher private val fetcherThreadMap = new mutable.HashMap[(Broker, Int), AbstractFetcherThread] private val mapLock = new Object - this.logIdent = "[" + name + "], " + this.logIdent = "[" + name + "] " private def getFetcherId(topic: String, partitionId: Int) : Int = { (topic.hashCode() + 31 * partitionId) % numFetchers diff --git core/src/main/scala/kafka/server/KafkaApis.scala core/src/main/scala/kafka/server/KafkaApis.scala index 97fbe76..f192ed2 100644 --- core/src/main/scala/kafka/server/KafkaApis.scala +++ core/src/main/scala/kafka/server/KafkaApis.scala @@ -88,11 +88,11 @@ class KafkaApis(val requestChannel: RequestChannel, trace("Handling stop replica request " + stopReplicaRequest) val responseMap = new HashMap[(String, Int), Short] - for((topic, partitionId) <- stopReplicaRequest.partitions) { - val errorCode = replicaManager.stopReplica(topic, partitionId) + val errorCode = replicaManager.stopReplica(topic, partitionId, stopReplicaRequest.deletePartitions) responseMap.put((topic, partitionId), errorCode) } + val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.versionId, responseMap) requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse))) } @@ -455,11 +455,20 @@ class KafkaApis(val requestChannel: RequestChannel, * When a request expires just answer it with whatever data is present */ def expire(delayed: DelayedFetch) { - val topicData = readMessageSets(delayed.fetch) - val response = FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData) - val fromFollower = delayed.fetch.isFromFollower - delayedRequestMetrics.recordDelayedFetchExpired(fromFollower) - requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response))) + debug("Expiring fetch request %s.".format(delayed.fetch)) + try { + val topicData = readMessageSets(delayed.fetch) + val response = FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData) + val fromFollower = delayed.fetch.isFromFollower + delayedRequestMetrics.recordDelayedFetchExpired(fromFollower) + requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response))) + } + catch { + case e1: LeaderNotAvailableException => + debug("Leader changed before fetch request %s expired.".format(delayed.fetch)) + case e2: UnknownTopicOrPartitionException => + debug("Replica went offline before fetch request %s expired.".format(delayed.fetch)) + } } } diff --git core/src/main/scala/kafka/server/ReplicaFetcherManager.scala core/src/main/scala/kafka/server/ReplicaFetcherManager.scala index 69db208..9f696dd 100644 --- core/src/main/scala/kafka/server/ReplicaFetcherManager.scala +++ core/src/main/scala/kafka/server/ReplicaFetcherManager.scala @@ -20,7 +20,7 @@ package kafka.server import kafka.cluster.Broker class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private 
val replicaMgr: ReplicaManager) - extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId + ", ", brokerConfig.numReplicaFetchers) { + extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId, brokerConfig.numReplicaFetchers) { override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = { new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d-on-broker-%d".format(sourceBroker.id, fetcherId, brokerConfig.brokerId), sourceBroker, brokerConfig, replicaMgr) diff --git core/src/main/scala/kafka/server/ReplicaManager.scala core/src/main/scala/kafka/server/ReplicaManager.scala index 8461dbe..11d08f4 100644 --- core/src/main/scala/kafka/server/ReplicaManager.scala +++ core/src/main/scala/kafka/server/ReplicaManager.scala @@ -91,14 +91,15 @@ class ReplicaManager(val config: KafkaConfig, kafkaScheduler.scheduleWithRate(maybeShrinkIsr, "isr-expiration-thread-", 0, config.replicaMaxLagTimeMs) } - def stopReplica(topic: String, partitionId: Int): Short = { + def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Short = { trace("Handling stop replica for partition [%s, %d]".format(topic, partitionId)) val errorCode = ErrorMapping.NoError getReplica(topic, partitionId) match { case Some(replica) => replicaFetcherManager.removeFetcher(topic, partitionId) /* TODO: handle deleteLog in a better way */ - //logManager.deleteLog(topic, partition) + //if (deletePartition) + // logManager.deleteLog(topic, partition) leaderPartitionsLock synchronized { leaderPartitions -= replica.partition } diff --git core/src/main/scala/kafka/utils/ZkUtils.scala core/src/main/scala/kafka/utils/ZkUtils.scala index 114bc98..c87ea69 100644 --- core/src/main/scala/kafka/utils/ZkUtils.scala +++ core/src/main/scala/kafka/utils/ZkUtils.scala @@ -650,7 +650,7 @@ object ZkUtils extends Logging { * or throws an exception if the broker dies before the query to zookeeper finishes * @param brokerId The broker id * @param zkClient The zookeeper client connection - * @returns An optional Broker object encapsulating the broker metadata + * @return An optional Broker object encapsulating the broker metadata */ def getBrokerInfo(zkClient: ZkClient, brokerId: Int): Option[Broker] = { ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1 match { diff --git core/src/test/scala/unit/kafka/admin/AdminTest.scala core/src/test/scala/unit/kafka/admin/AdminTest.scala index e5cfc33..69973b8 100644 --- core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -22,7 +22,8 @@ import org.scalatest.junit.JUnit3Suite import kafka.zk.ZooKeeperTestHarness import kafka.server.KafkaConfig import kafka.utils.{ZkUtils, TestUtils} -import kafka.common.{TopicAndPartition, ErrorMapping} +import kafka.common.{ErrorMapping, TopicAndPartition} + class AdminTest extends JUnit3Suite with ZooKeeperTestHarness { @@ -357,6 +358,53 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness { servers.foreach(_.shutdown()) } + @Test + def testShutdownBroker() { + val expectedReplicaAssignment = Map(1 -> List("0", "1", "2")) + val topic = "test" + val partition = 1 + // create brokers + val serverConfigs = TestUtils.createBrokerConfigs(3).map(new KafkaConfig(_)) + // create the topic + AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s)) + + // broker 2 should be 
the leader since it was started first + var leaderBeforeShutdown = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, 1000, None).get + var controllerId = ZkUtils.getController(zkClient) + var controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController + var partitionsRemaining = controller.shutdownBroker(2) + assertEquals(0, partitionsRemaining) + var topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient) + var leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id + assertTrue(leaderAfterShutdown != leaderBeforeShutdown) + assertEquals(2, topicMetadata.partitionsMetadata.head.isr.size) + + leaderBeforeShutdown = leaderAfterShutdown + controllerId = ZkUtils.getController(zkClient) + controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController + partitionsRemaining = controller.shutdownBroker(1) + assertEquals(0, partitionsRemaining) + topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient) + leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id + assertTrue(leaderAfterShutdown != leaderBeforeShutdown) + assertEquals(1, topicMetadata.partitionsMetadata.head.isr.size) + + leaderBeforeShutdown = leaderAfterShutdown + controllerId = ZkUtils.getController(zkClient) + controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController + partitionsRemaining = controller.shutdownBroker(0) + assertEquals(1, partitionsRemaining) + topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient) + leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id + assertTrue(leaderAfterShutdown == leaderBeforeShutdown) + assertEquals(1, topicMetadata.partitionsMetadata.head.isr.size) + + servers.foreach(_.shutdown()) + + + } + private def checkIfReassignPartitionPathExists(): Boolean = { ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath) } diff --git core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index c50f91a..9d212bf 100644 --- core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -97,7 +97,7 @@ object SerializationTestUtils{ } def createTestStopReplicaRequest() : StopReplicaRequest = { - new StopReplicaRequest(collection.immutable.Set((topic1, 0), (topic2, 0))) + new StopReplicaRequest(deletePartitions = true, partitions = collection.immutable.Set((topic1, 0), (topic2, 0))) } def createTestStopReplicaResponse() : StopReplicaResponse = {
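
For reference, the new kafka.admin.ShutdownBroker tool works by invoking the controller's new JMX operation and retrying until the broker leads no replicated partitions. The following is a minimal standalone sketch (not part of the patch) of that same call, using the MBean name registered by KafkaController and the tool's default JMX URL; the broker id 2 and the local JMX port are illustrative assumptions and depend on how the controller broker was started.

    import javax.management.ObjectName
    import javax.management.remote.{JMXConnectorFactory, JMXServiceURL}
    import kafka.controller.KafkaController

    object ControlledShutdownSketch {
      def main(args: Array[String]) {
        // Connect to the controller broker's JMX endpoint. Port 9999 is only the
        // tool's default; use whatever JMX port the controller was started with.
        val url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:9999/jmxrmi")
        val connector = JMXConnectorFactory.connect(url, null)
        try {
          val mbsc = connector.getMBeanServerConnection
          // Ask the controller to move leadership off broker 2 (id chosen for illustration).
          val remaining = mbsc.invoke(new ObjectName(KafkaController.MBeanName),
                                      "shutdownBroker",
                                      Array[AnyRef](Integer.valueOf(2)),
                                      Array(classOf[Int].getName)).asInstanceOf[Int]
          // A result of 0 means the broker no longer leads any replicated partition
          // and can be stopped; a positive value is the number of partitions it still leads.
          println("Partitions still led by broker 2: " + remaining)
        } finally {
          connector.close()
        }
      }
    }

ShutdownBroker wraps exactly this call in a --num.retries / --retry.interval.ms loop, sleeping between attempts until the returned count reaches zero.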
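
On the wire, StopReplicaRequest now carries the deletePartitions flag as a single byte (1 = true, 0 = false; any other value raises InvalidRequestException) between ackTimeoutMs and the partition count; controlled shutdown sends false so the replica stops fetching but keeps its log, while the NonExistentReplica transition sends true. A minimal round-trip sketch of the new format, assuming the companion object's readFrom(ByteBuffer) shown in the hunk and an illustrative topic name:

    import java.nio.ByteBuffer
    import kafka.api.StopReplicaRequest

    object StopReplicaWireFormatSketch {
      def main(args: Array[String]) {
        // deletePartitions = false is what the controller sends during controlled shutdown.
        val request = new StopReplicaRequest(deletePartitions = false,
                                             partitions = Set(("test-topic", 0))) // illustrative topic
        val buffer = ByteBuffer.allocate(request.sizeInBytes())
        request.writeTo(buffer)
        buffer.rewind()
        // The decoded request should carry the flag and partition set through unchanged.
        val decoded = StopReplicaRequest.readFrom(buffer)
        assert(decoded.deletePartitions == request.deletePartitions)
        assert(decoded.partitions == request.partitions)
      }
    }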