diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 2022c9f..ed4380f 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -18,20 +18,20 @@ package kafka.controller import collection._ import collection.immutable.Set -import kafka.cluster.Broker +import com.yammer.metrics.core.Gauge +import java.lang.{IllegalStateException, Object} +import java.util.concurrent.TimeUnit +import kafka.admin.PreferredReplicaLeaderElectionCommand import kafka.api._ +import kafka.cluster.Broker +import kafka.common.{TopicAndPartition, KafkaException} +import kafka.metrics.{KafkaTimer, KafkaMetricsGroup} +import kafka.server.{ZookeeperLeaderElector, KafkaConfig} import kafka.utils.ZkUtils._ +import kafka.utils.{Utils, ZkUtils, Logging} import org.apache.zookeeper.Watcher.Event.KeeperState -import kafka.server.{ZookeeperLeaderElector, KafkaConfig} -import java.util.concurrent.TimeUnit -import kafka.metrics.{KafkaTimer, KafkaMetricsGroup} -import com.yammer.metrics.core.Gauge import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient} -import kafka.utils.{Utils, ZkUtils, Logging} import org.I0Itec.zkclient.exception.ZkNoNodeException -import java.lang.{IllegalStateException, Object} -import kafka.admin.PreferredReplicaLeaderElectionCommand -import kafka.common.{TopicAndPartition, KafkaException} class ControllerContext(val zkClient: ZkClient, var controllerChannelManager: ControllerChannelManager = null, @@ -42,7 +42,7 @@ class ControllerContext(val zkClient: ZkClient, var partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = null, var allLeaders: mutable.Map[TopicAndPartition, Int] = null, var partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = - new mutable.HashMap, + new mutable.HashMap, var partitionsUndergoingPreferredReplicaElection: 
mutable.Set[TopicAndPartition] = new mutable.HashSet) class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup { @@ -84,7 +84,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg partitionStateMachine.startup() replicaStateMachine.startup() info("Broker %d is ready to serve as the new controller".format(config.brokerId)) - }else + } else info("Controller has been shut down, aborting startup/failover") } @@ -98,41 +98,46 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg /** * This callback is invoked by the replica state machine's broker change listener, with the list of newly started * brokers as input. It does the following - - * 1. Updates the leader and ISR cache. We have to do this since we don't register zookeeper listeners to update - * leader and ISR for every partition as they take place - * 2. Triggers the OnlinePartition state change for all new/offline partitions - * 3. Invokes the OnlineReplica state change on the input list of newly started brokers + * 1. Triggers the OnlinePartition state change for all new/offline partitions + * 2. It checks whether there are reassigned replicas assigned to any newly started brokers. If + * so, it performs the reassignment logic for each topic/partition. + * + * Note that we don't need to refresh the leader/isr cache for all topic/partitions at this point for two reasons: + * 1. The partition state machine, when triggering online state change, will refresh leader and ISR for only those + * partitions currently new or offline (rather than every partition this controller is aware of) + * 2. Even if we do refresh the cache, there is no guarantee that by the time the leader and ISR request reaches + * every broker that it is still valid. Brokers check the leader epoch to determine validity of the request. 
*/ def onBrokerStartup(newBrokers: Seq[Int]) { info("New broker startup callback for %s".format(newBrokers.mkString(","))) - // update leader and isr cache for broker - updateLeaderAndIsrCache() + val newBrokersSet = newBrokers.toSet // update partition state machine partitionStateMachine.triggerOnlinePartitionStateChange() replicaStateMachine.handleStateChanges(getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, newBrokers), OnlineReplica) // check if reassignment of some partitions need to be restarted - val partitionsWithReplicasOnNewBrokers = controllerContext.partitionsBeingReassigned.filter(p => - p._2.newReplicas.foldLeft(false)((a, replica) => newBrokers.contains(replica) || a)) + val partitionsWithReplicasOnNewBrokers = controllerContext.partitionsBeingReassigned.filter(partitionAndContext => + partitionAndContext._2.newReplicas.exists(newBrokersSet.contains(_))) partitionsWithReplicasOnNewBrokers.foreach(p => onPartitionReassignment(p._1, p._2)) } /** * This callback is invoked by the replica state machine's broker change listener with the list of failed brokers * as input. It does the following - - * 1. Updates the leader and ISR cache. We have to do this since we don't register zookeeper listeners to update - * leader and ISR for every partition as they take place - * 2. Mark partitions with dead leaders offline - * 3. Triggers the OnlinePartition state change for all new/offline partitions - * 4. Invokes the OfflineReplica state change on the input list of newly started brokers + * 1. Mark partitions with dead leaders as offline + * 2. Triggers the OnlinePartition state change for all new/offline partitions + * 3. Invokes the OfflineReplica state change on the input list of failed brokers + * + * Note that we don't need to refresh the leader/isr cache for all topic/partitions at this point.
This is because + * the partition state machine will refresh our cache for us when performing leader election for all new/offline + * partitions coming online. */ def onBrokerFailure(deadBrokers: Seq[Int]) { info("Broker failure callback for %s".format(deadBrokers.mkString(","))) - // update leader and isr cache for broker - updateLeaderAndIsrCache() + val deadBrokersSet = deadBrokers.toSet // trigger OfflinePartition state for all partitions whose current leader is one amongst the dead brokers val partitionsWithoutLeader = controllerContext.allLeaders.filter(partitionAndLeader => - deadBrokers.contains(partitionAndLeader._2)).keySet + deadBrokersSet.contains(partitionAndLeader._2)).keySet partitionStateMachine.handleStateChanges(partitionsWithoutLeader, OfflinePartition) // trigger OnlinePartition state changes for offline or new partitions partitionStateMachine.triggerOnlinePartitionStateChange() diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index cd1becd..da3bfd9 100644 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -17,13 +17,13 @@ package kafka.controller import collection._ +import collection.JavaConversions._ +import java.util.concurrent.atomic.AtomicBoolean import kafka.api.LeaderAndIsr +import kafka.common.{TopicAndPartition, StateChangeFailedException, PartitionOfflineException} import kafka.utils.{Logging, ZkUtils} import org.I0Itec.zkclient.IZkChildListener -import collection.JavaConversions._ -import java.util.concurrent.atomic.AtomicBoolean import org.I0Itec.zkclient.exception.ZkNodeExistsException -import kafka.common.{TopicAndPartition, StateChangeFailedException, PartitionOfflineException} /** * This class represents the state machine for partitions. 
It defines the states that a partition can be in, and @@ -81,13 +81,12 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { try { brokerRequestBatch.newBatch() // try to move all partitions in NewPartition or OfflinePartition state to OnlinePartition state - partitionState.filter(partitionAndState => - partitionAndState._2.equals(OfflinePartition) || partitionAndState._2.equals(NewPartition)).foreach { - partitionAndState => handleStateChange(partitionAndState._1.topic, partitionAndState._1.partition, OnlinePartition, - offlinePartitionSelector) + for((topicAndPartition, partitionState) <- partitionState) { + if(partitionState.equals(OfflinePartition) || partitionState.equals(NewPartition)) + handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition, offlinePartitionSelector) } brokerRequestBatch.sendRequestsToBrokers() - }catch { + } catch { case e => error("Error while moving some partitions to the online state", e) } } @@ -106,7 +105,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector) } brokerRequestBatch.sendRequestsToBrokers() - }catch { + } catch { case e => error("Error while moving some partitions to %s state".format(targetState), e) } } @@ -162,7 +161,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { partitionState.put(topicAndPartition, NonExistentPartition) // post: partition state is deleted from all brokers and zookeeper } - }catch { + } catch { case e => error("State change for partition [%s, %d] ".format(topic, partition) + "from %s to %s failed".format(partitionState(topicAndPartition), targetState), e) } @@ -246,7 +245,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { topicAndPartition.partition, leaderAndIsr, replicaAssignment.size) controllerContext.allLeaders.put(topicAndPartition, leaderAndIsr.leader) 
partitionState.put(topicAndPartition, OnlinePartition) - }catch { + } catch { case e: ZkNodeExistsException => ControllerStat.offlinePartitionRate.mark() throw new StateChangeFailedException("Error while changing partition %s's state from New to Online" @@ -264,7 +263,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { * this state change */ def electLeaderForPartition(topic: String, partition: Int, leaderSelector: PartitionLeaderSelector) { - /** handle leader election for the partitions whose leader is no longer alive **/ + // handle leader election for the partitions whose leader is no longer alive info("Electing leader for partition [%s, %d]".format(topic, partition)) try { var zookeeperPathUpdateSucceeded: Boolean = false @@ -284,11 +283,10 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { // update the leader cache controllerContext.allLeaders.put(TopicAndPartition(topic, partition), newLeaderAndIsr.leader) info("Elected leader %d for Offline partition [%s, %d]".format(newLeaderAndIsr.leader, topic, partition)) - // store new leader and isr info in cache - brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, - topic, partition, newLeaderAndIsr, + // notify all replicas of the new leader + brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition, newLeaderAndIsr, controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition)).size) - }catch { + } catch { case poe: PartitionOfflineException => throw new PartitionOfflineException("All replicas for partition [%s, %d] are dead." .format(topic, partition) + " Marking this partition offline", poe) case sce => throw new StateChangeFailedException(("Error while electing leader for partition" +