diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index cbac5d0..0373df8 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -18,14 +18,15 @@
 package kafka.cluster

 import scala.collection._
 import kafka.utils._
-import java.lang.Object
 import kafka.api.LeaderAndIsr
 import kafka.server.ReplicaManager
 import com.yammer.metrics.core.Gauge
 import kafka.metrics.KafkaMetricsGroup
-import kafka.common.ErrorMapping
+import kafka.common.{NotLeaderForPartitionException, ErrorMapping}
 import kafka.controller.{LeaderIsrAndControllerEpoch, KafkaController}
 import org.apache.log4j.Logger
+import kafka.message.ByteBufferMessageSet
+import java.util.concurrent.locks.ReentrantReadWriteLock

 /**
@@ -43,7 +44,9 @@ class Partition(val topic: String,
   var leaderReplicaIdOpt: Option[Int] = None
   var inSyncReplicas: Set[Replica] = Set.empty[Replica]
   private val assignedReplicaMap = new Pool[Int,Replica]
-  private val leaderIsrUpdateLock = new Object
+  private val leaderIsrLock = new ReentrantReadWriteLock()
+  private val leaderIsrReadLock = leaderIsrLock.readLock()
+  private val leaderIsrWriteLock = leaderIsrLock.writeLock()
   private var zkVersion: Int = LeaderAndIsr.initialZKVersion
   private var leaderEpoch: Int = LeaderAndIsr.initialLeaderEpoch - 1
   /* Epoch of the controller that last changed the leader. This needs to be initialized correctly upon broker startup.
@@ -67,7 +70,7 @@ class Partition(val topic: String,
   )

   def isUnderReplicated(): Boolean = {
-    leaderIsrUpdateLock synchronized {
+    leaderIsrReadLock synchronized {
       inSyncReplicas.size < replicationFactor
     }
   }
@@ -100,7 +103,7 @@ class Partition(val topic: String,
   }

   def leaderReplicaIfLocal(): Option[Replica] = {
-    leaderIsrUpdateLock synchronized {
+    leaderIsrReadLock synchronized {
       leaderReplicaIdOpt match {
         case Some(leaderReplicaId) =>
           if (leaderReplicaId == localBrokerId)
@@ -130,7 +133,7 @@ class Partition(val topic: String,
    */
   def makeLeader(controllerId: Int, topic: String, partitionId: Int,
                  leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, correlationId: Int): Boolean = {
-    leaderIsrUpdateLock synchronized {
+    leaderIsrWriteLock synchronized {
       val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
       if (leaderEpoch >= leaderAndIsr.leaderEpoch){
         stateChangeLogger.trace(("Broker %d discarded the become-leader request with correlation id %d from " +
@@ -167,7 +170,7 @@ class Partition(val topic: String,
    */
   def makeFollower(controllerId: Int, topic: String, partitionId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
                    aliveLeaders: Set[Broker], correlationId: Int): Boolean = {
-    leaderIsrUpdateLock synchronized {
+    leaderIsrWriteLock synchronized {
       val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
       if (leaderEpoch >= leaderAndIsr.leaderEpoch) {
         stateChangeLogger.trace(("Broker %d discarded the become-follower request with correlation id %d from " +
@@ -207,7 +210,7 @@ class Partition(val topic: String,
   }

   def updateLeaderHWAndMaybeExpandIsr(replicaId: Int, offset: Long) {
-    leaderIsrUpdateLock synchronized {
+    leaderIsrReadLock synchronized {
       debug("Recording follower %d position %d for partition [%s,%d].".format(replicaId, offset, topic, partitionId))
       val replica = getOrCreateReplica(replicaId)
       replica.logEndOffset = offset
@@ -233,7 +236,7 @@ class Partition(val topic: String,
   }

   def checkEnoughReplicasReachOffset(requiredOffset: Long, requiredAcks: Int): (Boolean, Short) = {
-    leaderIsrUpdateLock synchronized {
+    leaderIsrReadLock synchronized {
       leaderReplicaIfLocal() match {
         case Some(_) =>
           val numAcks = inSyncReplicas.count(r => {
@@ -259,7 +262,11 @@ class Partition(val topic: String,
     }
   }

-  def maybeIncrementLeaderHW(leaderReplica: Replica) {
+  /**
+   * There is no need to acquire the leaderIsrUpdate lock here since all callers of this private API acquire that lock
+   * @param leaderReplica
+   */
+  private def maybeIncrementLeaderHW(leaderReplica: Replica) {
     val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset)
     val newHighWatermark = allLogEndOffsets.min
     val oldHighWatermark = leaderReplica.highWatermark
@@ -273,7 +280,7 @@ class Partition(val topic: String,
   }

   def maybeShrinkIsr(replicaMaxLagTimeMs: Long, replicaMaxLagMessages: Long) {
-    leaderIsrUpdateLock synchronized {
+    leaderIsrReadLock synchronized {
       leaderReplicaIfLocal() match {
         case Some(leaderReplica) =>
           val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs, replicaMaxLagMessages)
@@ -315,19 +322,38 @@ class Partition(val topic: String,
     stuckReplicas ++ slowReplicas
   }

+  def appendMessagesToLeader(messages: ByteBufferMessageSet): (Long, Long) = {
+    leaderIsrReadLock synchronized {
+      val leaderReplicaOpt = leaderReplicaIfLocal()
+      leaderReplicaOpt match {
+        case Some(leaderReplica) =>
+          val log = leaderReplica.log.get
+          val (start, end) = log.append(messages, assignOffsets = true)
+          // we may need to increment high watermark since ISR could be down to 1
+          maybeIncrementLeaderHW(leaderReplica)
+          (start, end)
+        case None =>
+          throw new NotLeaderForPartitionException("Leader not local for partition [%s,%d] on broker %d"
+            .format(topic, partitionId, localBrokerId))
+      }
+    }
+  }
+
   private def updateIsr(newIsr: Set[Replica]) {
-    debug("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newIsr.mkString(",")))
-    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
-    // use the epoch of the controller that made the leadership decision, instead of the current controller epoch
-    val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
-      ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId),
-      ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch), zkVersion)
-    if (updateSucceeded){
-      inSyncReplicas = newIsr
-      zkVersion = newVersion
-      trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newIsr.mkString(","), zkVersion))
-    } else {
-      info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))
+    leaderIsrWriteLock synchronized {
+      debug("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newIsr.mkString(",")))
+      val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
+      // use the epoch of the controller that made the leadership decision, instead of the current controller epoch
+      val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
+        ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId),
+        ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch), zkVersion)
+      if (updateSucceeded){
+        inSyncReplicas = newIsr
+        zkVersion = newVersion
+        trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newIsr.mkString(","), zkVersion))
+      } else {
+        info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))
+      }
     }
   }

@@ -345,7 +371,7 @@ class Partition(val topic: String,
   }

   override def toString(): String = {
-    leaderIsrUpdateLock synchronized {
+    leaderIsrReadLock synchronized {
       val partitionString = new StringBuilder
       partitionString.append("Topic: " + topic)
       partitionString.append("; Partition: " + partitionId)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 7ee81a1..6b6f8f2 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -188,11 +188,14 @@ class KafkaApis(val requestChannel: RequestChannel,
       BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes)

       try {
-        val localReplica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
-        val log = localReplica.log.get
-        val (start, end) = log.append(messages.asInstanceOf[ByteBufferMessageSet], assignOffsets = true)
-        // we may need to increment high watermark since ISR could be down to 1
-        localReplica.partition.maybeIncrementLeaderHW(localReplica)
+        val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
+        val (start, end) =
+          partitionOpt match {
+            case Some(partition) => partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet])
+            case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
+              .format(topicAndPartition, brokerId))
+
+          }
         trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d"
           .format(messages.size, topicAndPartition.topic, topicAndPartition.partition, start, end))
         ProduceResult(topicAndPartition, start, end)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 0d39a57..cc971aa 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -180,7 +180,7 @@ class ReplicaManager(val config: KafkaConfig,
     partition.leaderReplicaIfLocal match {
       case Some(leaderReplica) => leaderReplica
       case None =>
-        throw new NotLeaderForPartitionException("Leader not local for topic %s partition %d on broker %d"
+        throw new NotLeaderForPartitionException("Leader not local for partition [%s,%d] on broker %d"
          .format(topic, partitionId, config.brokerId))
     }
   }
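
Note on the locking pattern introduced above: java.util.concurrent.locks.ReentrantReadWriteLock hands out its read and write sides via readLock()/writeLock(), and those must be acquired with lock()/unlock(); wrapping them in Scala's `synchronized` only takes the monitor of the ReadLock/WriteLock object itself and does not give shared-reader/exclusive-writer semantics. Below is a minimal, self-contained sketch of the intended usage; it is not part of the patch, and the object name LockSketch, the placeholder state, and the inReadLock/inWriteLock helpers are illustrative only.

  import java.util.concurrent.locks.ReentrantReadWriteLock

  object LockSketch {
    private val leaderIsrLock = new ReentrantReadWriteLock()

    // Run `fun` while holding the read (shared) side of the lock.
    private def inReadLock[T](fun: => T): T = {
      leaderIsrLock.readLock().lock()
      try fun finally leaderIsrLock.readLock().unlock()
    }

    // Run `fun` while holding the write (exclusive) side of the lock.
    private def inWriteLock[T](fun: => T): T = {
      leaderIsrLock.writeLock().lock()
      try fun finally leaderIsrLock.writeLock().unlock()
    }

    // Placeholder state standing in for the ISR set that Partition.scala guards.
    private var inSyncReplicas: Set[Int] = Set.empty
    private val replicationFactor = 3

    // Readers (analogous to isUnderReplicated, leaderReplicaIfLocal, appendMessagesToLeader)
    // take the read lock and may run concurrently with each other.
    def isUnderReplicated: Boolean = inReadLock {
      inSyncReplicas.size < replicationFactor
    }

    // Writers (analogous to makeLeader, makeFollower, updateIsr) take the write lock
    // and exclude both readers and other writers.
    def updateIsr(newIsr: Set[Int]): Unit = inWriteLock {
      inSyncReplicas = newIsr
    }

    def main(args: Array[String]): Unit = {
      updateIsr(Set(1, 2))
      println("under-replicated: " + isUnderReplicated) // true: ISR of 2 < replication factor 3
    }
  }

With this split, produce requests that only append to the leader's log can proceed concurrently under the read lock, while leader and ISR changes serialize behind the write lock.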