diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 65c04ed..2637586 100644
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -122,7 +122,7 @@ object ReassignPartitionsCommand extends Logging {
         .format(ZkUtils.getPartitionReassignmentZkData(currentPartitionReplicaAssignment)))
       // start the reassignment
       if(reassignPartitionsCommand.reassignPartitions())
-        println("Successfully started reassignment of partitions %s".format(partitionsToBeReassigned))
+        println("Successfully started reassignment of partitions %s".format(ZkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned)))
       else
         println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
     }
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 13f48ba..5c9307d 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -72,7 +72,7 @@ class Partition(val topic: String,
     leaderIsrUpdateLock synchronized {
       leaderReplicaIfLocal() match {
         case Some(_) =>
-          inSyncReplicas.size < replicationFactor
+          inSyncReplicas.size < assignedReplicas.size
         case None =>
           false
       }
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index fd92c65..0c3faad 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -677,8 +677,6 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     brokerRequestBatch.newBatch()
     updateLeaderEpoch(topicAndPartition.topic, topicAndPartition.partition) match {
       case Some(updatedLeaderIsrAndControllerEpoch) =>
-        // send the shrunk assigned replica list to all the replicas, including the leader, so that it no longer
-        // allows old replicas to enter ISR
         brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasToReceiveRequest, topicAndPartition.topic,
           topicAndPartition.partition, updatedLeaderIsrAndControllerEpoch, newAssignedReplicas)
         brokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch, controllerContext.correlationId.getAndIncrement)
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index 394e981..9390edf 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -20,7 +20,7 @@ package kafka.server
 import scala.collection.mutable
 import scala.collection.Set
 import scala.collection.Map
-import kafka.utils.Logging
+import kafka.utils.{Utils, Logging}
 import kafka.cluster.Broker
 import kafka.metrics.KafkaMetricsGroup
 import kafka.common.TopicAndPartition
@@ -63,7 +63,7 @@ abstract class AbstractFetcherManager(protected val name: String, metricPrefix:
   )
 
   private def getFetcherId(topic: String, partitionId: Int) : Int = {
-    (31 * topic.hashCode() + partitionId) % numFetchers
+    Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers
   }
 
   // to be defined in subclass to create a specific fetcher
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index b0a3962..242c18d 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -77,16 +77,17 @@ class ReplicaManager(val config: KafkaConfig,
   newGauge(
     "UnderReplicatedPartitions",
     new Gauge[Int] {
-      def value = {
-        leaderPartitionsLock synchronized {
-          leaderPartitions.count(_.isUnderReplicated)
-        }
-      }
+      def value = underReplicatedPartitionCount()
     }
   )
 
   val isrExpandRate = newMeter("IsrExpandsPerSec", "expands", TimeUnit.SECONDS)
   val isrShrinkRate = newMeter("IsrShrinksPerSec", "shrinks", TimeUnit.SECONDS)
+  def underReplicatedPartitionCount(): Int = {
+    leaderPartitionsLock synchronized {
+      leaderPartitions.count(_.isUnderReplicated)
+    }
+  }
 
   def startHighWaterMarksCheckPointThread() = {
     if(highWatermarkCheckPointThreadStarted.compareAndSet(false, true))
@@ -206,7 +207,7 @@ class ReplicaManager(val config: KafkaConfig,
   def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): (collection.Map[(String, Int), Short], Short) = {
     leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) =>
       stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]"
-                                .format(localBrokerId, stateInfo.leaderIsrAndControllerEpoch, leaderAndISRRequest.correlationId,
+                                .format(localBrokerId, stateInfo, leaderAndISRRequest.correlationId,
                                         leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch, topic, partition))
     }
     replicaStateChangeLock synchronized {
@@ -228,10 +229,17 @@ class ReplicaManager(val config: KafkaConfig,
       leaderAndISRRequest.partitionStateInfos.foreach{ case ((topic, partitionId), partitionStateInfo) =>
         val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
         val partitionLeaderEpoch = partition.getLeaderEpoch()
+        // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
+        // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
         if (partitionLeaderEpoch < partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch) {
-          // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
-          // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
-          partitionState.put(partition, partitionStateInfo)
+          if(partitionStateInfo.allReplicas.contains(config.brokerId))
+            partitionState.put(partition, partitionStateInfo)
+          else {
+            stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request with correlation id %d from " +
+              "controller %d epoch %d as broker is not in assigned replica list %s for partition [%s,%d]")
+              .format(localBrokerId, correlationId, controllerId, leaderAndISRRequest.controllerEpoch,
+              partitionStateInfo.allReplicas.mkString(","), topic, partition.partitionId))
+          }
         } else {
           // Otherwise record the error code in response
           stateChangeLogger.warn(("Broker %d received invalid LeaderAndIsr request with correlation id %d from " +
@@ -303,7 +311,7 @@ class ReplicaManager(val config: KafkaConfig,
       case e: Throwable =>
         partitionState.foreach { state =>
           val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d" +
-            "epoch %d for partition %s").format(localBrokerId, correlationId, controllerId, epoch,
+            " epoch %d for partition %s").format(localBrokerId, correlationId, controllerId, epoch,
                                                 TopicAndPartition(state._1.topic, state._1.partitionId))
           stateChangeLogger.error(errorMsg, e)
         }
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 86f88f5..52d35a3 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -23,7 +23,7 @@ import java.util.Properties
 import kafka.utils._
 import kafka.log._
 import kafka.zk.ZooKeeperTestHarness
-import kafka.server.KafkaConfig
+import kafka.server.{KafkaServer, KafkaConfig}
 import kafka.utils.{Logging, ZkUtils, TestUtils}
 import kafka.common.{TopicExistsException, ErrorMapping, TopicAndPartition}
 
@@ -153,10 +153,10 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
         Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted;
     }, 1000)
     val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
-    val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionToBeReassigned)
     // in sync replicas should not have any replica that is not in the new assigned replicas
     checkForPhantomInSyncReplicas(topic, partitionToBeReassigned, assignedReplicas)
     assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
+    ensureNoUnderReplicatedPartitions(topic, partitionToBeReassigned, assignedReplicas, servers)
     servers.foreach(_.shutdown())
   }
 
@@ -183,6 +183,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
     assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
     checkForPhantomInSyncReplicas(topic, partitionToBeReassigned, assignedReplicas)
+    ensureNoUnderReplicatedPartitions(topic, partitionToBeReassigned, assignedReplicas, servers)
     servers.foreach(_.shutdown())
   }
 
@@ -209,6 +210,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
     assertEquals("Partition should have been reassigned to 2, 3", newReplicas, assignedReplicas)
     checkForPhantomInSyncReplicas(topic, partitionToBeReassigned, assignedReplicas)
+    ensureNoUnderReplicatedPartitions(topic, partitionToBeReassigned, assignedReplicas, servers)
     servers.foreach(_.shutdown())
   }
 
@@ -247,6 +249,8 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
     assertEquals("Partition should have been reassigned to 0, 1", newReplicas, assignedReplicas)
     checkForPhantomInSyncReplicas(topic, partitionToBeReassigned, assignedReplicas)
+    // ensure that there are no under replicated partitions
+    ensureNoUnderReplicatedPartitions(topic, partitionToBeReassigned, assignedReplicas, servers)
     servers.foreach(_.shutdown())
   }
 
@@ -382,6 +386,18 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
       phantomInSyncReplicas.size == 0)
   }
 
+  private def ensureNoUnderReplicatedPartitions(topic: String, partitionToBeReassigned: Int, assignedReplicas: Seq[Int],
+                                                servers: Seq[KafkaServer]) {
+    val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionToBeReassigned)
+    assertFalse("Reassigned partition [%s,%d] is underreplicated".format(topic, partitionToBeReassigned),
+      inSyncReplicas.size < assignedReplicas.size)
+    val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitionToBeReassigned)
+    assertTrue("Reassigned partition [%s,%d] is unavailable".format(topic, partitionToBeReassigned), leader.isDefined)
+    val leaderBroker = servers.filter(s => s.config.brokerId == leader.get).head
+    assertTrue("Reassigned partition [%s,%d] is underreplicated as reported by the leader %d".format(topic, partitionToBeReassigned, leader.get),
+      leaderBroker.replicaManager.underReplicatedPartitionCount() == 0)
+  }
+
   private def checkIfReassignPartitionPathExists(): Boolean = {
     ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath)
   }
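
Note on the AbstractFetcherManager change above: String.hashCode can be negative, and in Scala/Java the % operator takes the sign of its left operand, so the old expression could return a negative fetcher id. Wrapping the hash in Utils.abs keeps the id in [0, numFetchers). The sketch below is illustration only and is not part of the patch; the object and helper names are hypothetical, and nonNegative merely mimics what kafka.utils.Utils.abs is assumed to do (mask the sign bit).

// Illustration only -- hypothetical names, not code from this patch.
object FetcherIdSketch {
  private val numFetchers = 4

  // Masking the sign bit keeps the result non-negative even for Int.MinValue,
  // unlike math.abs; this mimics the assumed behaviour of kafka.utils.Utils.abs.
  private def nonNegative(n: Int): Int = n & 0x7fffffff

  // Old expression: may return a negative fetcher id when the hash is negative.
  def oldFetcherId(topic: String, partitionId: Int): Int =
    (31 * topic.hashCode + partitionId) % numFetchers

  // Patched expression: always lands in [0, numFetchers).
  def newFetcherId(topic: String, partitionId: Int): Int =
    nonNegative(31 * topic.hashCode + partitionId) % numFetchers

  def main(args: Array[String]): Unit = {
    // Any topic whose 31 * hashCode + partitionId is negative exposes the difference.
    val topic = "reassigned-topic"
    println("old=%d new=%d".format(oldFetcherId(topic, 0), newFetcherId(topic, 0)))
  }
}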