diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index cbac5d0..9a29fb2 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -23,9 +23,10 @@ import kafka.api.LeaderAndIsr import kafka.server.ReplicaManager import com.yammer.metrics.core.Gauge import kafka.metrics.KafkaMetricsGroup -import kafka.common.ErrorMapping +import kafka.common.{NotLeaderForPartitionException, ErrorMapping} import kafka.controller.{LeaderIsrAndControllerEpoch, KafkaController} import org.apache.log4j.Logger +import kafka.message.ByteBufferMessageSet /** @@ -259,7 +260,11 @@ class Partition(val topic: String, } } - def maybeIncrementLeaderHW(leaderReplica: Replica) { + /** + * There is no need to acquire the leaderIsrUpdate lock here since all callers of this private API acquire that lock + * @param leaderReplica + */ + private def maybeIncrementLeaderHW(leaderReplica: Replica) { val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset) val newHighWatermark = allLogEndOffsets.min val oldHighWatermark = leaderReplica.highWatermark @@ -315,6 +320,23 @@ class Partition(val topic: String, stuckReplicas ++ slowReplicas } + def appendMessagesToLeader(messages: ByteBufferMessageSet): (Long, Long) = { + leaderIsrUpdateLock synchronized { + val leaderReplicaOpt = leaderReplicaIfLocal() + leaderReplicaOpt match { + case Some(leaderReplica) => + val log = leaderReplica.log.get + val (start, end) = log.append(messages, assignOffsets = true) + // we may need to increment high watermark since ISR could be down to 1 + maybeIncrementLeaderHW(leaderReplica) + (start, end) + case None => + throw new NotLeaderForPartitionException("Leader not local for partition [%s,%d] on broker %d" + .format(topic, partitionId, localBrokerId)) + } + } + } + private def updateIsr(newIsr: Set[Replica]) { debug("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newIsr.mkString(","))) val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 7ee81a1..6b6f8f2 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -188,11 +188,14 @@ class KafkaApis(val requestChannel: RequestChannel, BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes) try { - val localReplica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition) - val log = localReplica.log.get - val (start, end) = log.append(messages.asInstanceOf[ByteBufferMessageSet], assignOffsets = true) - // we may need to increment high watermark since ISR could be down to 1 - localReplica.partition.maybeIncrementLeaderHW(localReplica) + val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition) + val (start, end) = + partitionOpt match { + case Some(partition) => partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet]) + case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d" + .format(topicAndPartition, brokerId)) + + } trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d" .format(messages.size, topicAndPartition.topic, topicAndPartition.partition, start, end)) ProduceResult(topicAndPartition, start, end) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 0d39a57..cc971aa 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -180,7 +180,7 @@ class ReplicaManager(val config: KafkaConfig, partition.leaderReplicaIfLocal match { case Some(leaderReplica) => leaderReplica case None => - throw new NotLeaderForPartitionException("Leader not local for topic %s partition %d on broker %d" + throw new NotLeaderForPartitionException("Leader not local for partition [%s,%d] on broker %d" .format(topic, partitionId, config.brokerId)) } }