diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 65c04ed..2637586 100644 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -122,7 +122,7 @@ object ReassignPartitionsCommand extends Logging { .format(ZkUtils.getPartitionReassignmentZkData(currentPartitionReplicaAssignment))) // start the reassignment if(reassignPartitionsCommand.reassignPartitions()) - println("Successfully started reassignment of partitions %s".format(partitionsToBeReassigned)) + println("Successfully started reassignment of partitions %s".format(ZkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned))) else println("Failed to reassign partitions %s".format(partitionsToBeReassigned)) } diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 13f48ba..5c9307d 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -72,7 +72,7 @@ class Partition(val topic: String, leaderIsrUpdateLock synchronized { leaderReplicaIfLocal() match { case Some(_) => - inSyncReplicas.size < replicationFactor + inSyncReplicas.size < assignedReplicas.size case None => false } diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala index 394e981..9390edf 100644 --- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala @@ -20,7 +20,7 @@ package kafka.server import scala.collection.mutable import scala.collection.Set import scala.collection.Map -import kafka.utils.Logging +import kafka.utils.{Utils, Logging} import kafka.cluster.Broker import kafka.metrics.KafkaMetricsGroup import kafka.common.TopicAndPartition @@ -63,7 +63,7 @@ abstract class AbstractFetcherManager(protected val name: String, metricPrefix: ) private def getFetcherId(topic: String, partitionId: Int) : Int = { - (31 * topic.hashCode() + partitionId) % numFetchers + Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers } // to be defined in subclass to create a specific fetcher diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index b0a3962..64f7eca 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -206,7 +206,7 @@ class ReplicaManager(val config: KafkaConfig, def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): (collection.Map[(String, Int), Short], Short) = { leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) => stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]" - .format(localBrokerId, stateInfo.leaderIsrAndControllerEpoch, leaderAndISRRequest.correlationId, + .format(localBrokerId, stateInfo, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch, topic, partition)) } replicaStateChangeLock synchronized { @@ -228,10 +228,17 @@ class ReplicaManager(val config: KafkaConfig, leaderAndISRRequest.partitionStateInfos.foreach{ case ((topic, partitionId), partitionStateInfo) => val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor) val partitionLeaderEpoch = partition.getLeaderEpoch() + // If the leader epoch is valid record the epoch of the controller that made the leadership decision. + // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path if (partitionLeaderEpoch < partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch) { - // If the leader epoch is valid record the epoch of the controller that made the leadership decision. - // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path - partitionState.put(partition, partitionStateInfo) + if(partitionStateInfo.allReplicas.contains(config.brokerId)) + partitionState.put(partition, partitionStateInfo) + else { + stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request with correlation id %d from " + + "controller %d epoch %d as broker is not in assigned replica list %s for partition [%s,%d]") + .format(localBrokerId, correlationId, controllerId, leaderAndISRRequest.controllerEpoch, + partitionStateInfo.allReplicas.mkString(","), topic, partition.partitionId)) + } } else { // Otherwise record the error code in response stateChangeLogger.warn(("Broker %d received invalid LeaderAndIsr request with correlation id %d from " + @@ -303,7 +310,7 @@ class ReplicaManager(val config: KafkaConfig, case e: Throwable => partitionState.foreach { state => val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d" + - "epoch %d for partition %s").format(localBrokerId, correlationId, controllerId, epoch, + " epoch %d for partition %s").format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)) stateChangeLogger.error(errorMsg, e) }