diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index c08eab0..518d2df 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -16,22 +16,23 @@
  */
 package kafka.cluster
 
-import scala.collection._
+import kafka.common._
 import kafka.admin.AdminUtils
-import kafka.utils._
-import java.lang.Object
+import kafka.utils.{ZkUtils, Pool, Time, Logging}
+import kafka.utils.Utils.inLock
 import kafka.api.{PartitionStateInfo, LeaderAndIsr}
 import kafka.log.LogConfig
 import kafka.server.{OffsetManager, ReplicaManager}
-import com.yammer.metrics.core.Gauge
 import kafka.metrics.KafkaMetricsGroup
 import kafka.controller.KafkaController
-import org.apache.log4j.Logger
 import kafka.message.ByteBufferMessageSet
-import kafka.common.{NotAssignedReplicaException, NotLeaderForPartitionException, ErrorMapping}
+
 import java.io.IOException
+import java.util.concurrent.locks.ReentrantReadWriteLock
 import scala.Some
-import kafka.common.TopicAndPartition
+import scala.collection._
+
+import com.yammer.metrics.core.Gauge
 
 
 /**
@@ -48,7 +49,7 @@ class Partition(val topic: String,
   var leaderReplicaIdOpt: Option[Int] = None
   var inSyncReplicas: Set[Replica] = Set.empty[Replica]
   private val assignedReplicaMap = new Pool[Int,Replica]
-  private val leaderIsrUpdateLock = new Object
+  private val leaderIsrUpdateLock = new ReentrantReadWriteLock()
   private var zkVersion: Int = LeaderAndIsr.initialZKVersion
   private var leaderEpoch: Int = LeaderAndIsr.initialLeaderEpoch - 1
   /* Epoch of the controller that last changed the leader. This needs to be initialized correctly upon broker startup.
@@ -72,7 +73,7 @@ class Partition(val topic: String,
   )
 
   def isUnderReplicated(): Boolean = {
-    leaderIsrUpdateLock synchronized {
+    inLock(leaderIsrUpdateLock.readLock()) {
       leaderReplicaIfLocal() match {
         case Some(_) =>
           inSyncReplicas.size < assignedReplicas.size
@@ -114,7 +115,7 @@ class Partition(val topic: String,
   }
 
   def leaderReplicaIfLocal(): Option[Replica] = {
-    leaderIsrUpdateLock synchronized {
+    inLock(leaderIsrUpdateLock.readLock()) {
       leaderReplicaIdOpt match {
         case Some(leaderReplicaId) =>
           if (leaderReplicaId == localBrokerId)
@@ -140,7 +141,7 @@ class Partition(val topic: String,
 
   def delete() {
     // need to hold the lock to prevent appendMessagesToLeader() from hitting I/O exceptions due to log being deleted
-    leaderIsrUpdateLock synchronized {
+    inLock(leaderIsrUpdateLock.writeLock()) {
       assignedReplicaMap.clear()
       inSyncReplicas = Set.empty[Replica]
       leaderReplicaIdOpt = None
@@ -155,7 +156,7 @@ class Partition(val topic: String,
   }
 
   def getLeaderEpoch(): Int = {
-    leaderIsrUpdateLock synchronized {
+    inLock(leaderIsrUpdateLock.readLock()) {
       return this.leaderEpoch
     }
   }
@@ -167,7 +168,7 @@ class Partition(val topic: String,
   def makeLeader(controllerId: Int,
                  partitionStateInfo: PartitionStateInfo, correlationId: Int,
                  offsetManager: OffsetManager): Boolean = {
-    leaderIsrUpdateLock synchronized {
+    inLock(leaderIsrUpdateLock.writeLock()) {
       val allReplicas = partitionStateInfo.allReplicas
       val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
       val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
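
(Reviewer note: every `synchronized` block in this file becomes an `inLock` call on one side of the new read/write lock, with read-only paths such as `isUnderReplicated`, `leaderReplicaIfLocal`, and `getLeaderEpoch` on the read lock and state mutations such as `delete` and `makeLeader` on the write lock. The helper comes from the `kafka.utils.Utils.inLock` import above; the sketch below is an illustrative reimplementation of the pattern, not the Kafka source, and `LockSketch` is a made-up name.)

    import java.util.concurrent.locks.{Lock, ReentrantReadWriteLock}

    object LockSketch {
      // Acquire the lock, evaluate the by-name body, and always release it,
      // even if the body throws -- the shape the converted blocks rely on.
      def inLock[T](lock: Lock)(body: => T): T = {
        lock.lock()
        try {
          body
        } finally {
          lock.unlock()
        }
      }

      private val leaderIsrUpdateLock = new ReentrantReadWriteLock()
      private var leaderEpoch = 0

      // Readers share the read lock, so they no longer serialize against
      // each other the way plain `synchronized` on an Object did...
      def getLeaderEpoch(): Int = inLock(leaderIsrUpdateLock.readLock()) { leaderEpoch }

      // ...while writers take the exclusive write lock, blocking all readers.
      def bumpLeaderEpoch(): Unit = inLock(leaderIsrUpdateLock.writeLock()) { leaderEpoch += 1 }
    }
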
@@ -200,7 +201,7 @@ class Partition(val topic: String,
   def makeFollower(controllerId: Int,
                    partitionStateInfo: PartitionStateInfo, correlationId: Int,
                    offsetManager: OffsetManager): Boolean = {
-    leaderIsrUpdateLock synchronized {
+    inLock(leaderIsrUpdateLock.writeLock()) {
       val allReplicas = partitionStateInfo.allReplicas
       val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
       val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
@@ -234,7 +235,7 @@ class Partition(val topic: String,
   }
 
   def updateLeaderHWAndMaybeExpandIsr(replicaId: Int, offset: Long) {
-    leaderIsrUpdateLock synchronized {
+    inLock(leaderIsrUpdateLock.writeLock()) {
       debug("Recording follower %d position %d for partition [%s,%d].".format(replicaId, offset, topic, partitionId))
       val replicaOpt = getReplica(replicaId)
       if(!replicaOpt.isDefined) {
@@ -270,7 +271,7 @@ class Partition(val topic: String,
   }
 
   def checkEnoughReplicasReachOffset(requiredOffset: Long, requiredAcks: Int): (Boolean, Short) = {
-    leaderIsrUpdateLock synchronized {
+    inLock(leaderIsrUpdateLock.readLock()) {
       leaderReplicaIfLocal() match {
         case Some(_) =>
           val numAcks = inSyncReplicas.count(r => {
@@ -314,7 +315,7 @@ class Partition(val topic: String,
   }
 
   def maybeShrinkIsr(replicaMaxLagTimeMs: Long, replicaMaxLagMessages: Long) {
-    leaderIsrUpdateLock synchronized {
+    inLock(leaderIsrUpdateLock.writeLock()) {
       leaderReplicaIfLocal() match {
         case Some(leaderReplica) =>
           val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs, replicaMaxLagMessages)
@@ -356,7 +357,7 @@ class Partition(val topic: String,
   }
 
   def appendMessagesToLeader(messages: ByteBufferMessageSet) = {
-    leaderIsrUpdateLock synchronized {
+    inLock(leaderIsrUpdateLock.readLock()) {
       val leaderReplicaOpt = leaderReplicaIfLocal()
       leaderReplicaOpt match {
         case Some(leaderReplica) =>
@@ -402,7 +403,7 @@ class Partition(val topic: String,
   }
 
   override def toString(): String = {
-    leaderIsrUpdateLock synchronized {
+    inLock(leaderIsrUpdateLock.readLock()) {
       val partitionString = new StringBuilder
       partitionString.append("Topic: " + topic)
       partitionString.append("; Partition: " + partitionId)
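
(Reviewer note: `getOutOfSyncReplicas`, called under the write lock in `maybeShrinkIsr` above, is not shown in this diff. For context, here is a simplified sketch of its two lag conditions; `ReplicaView` and its fields are illustrative stand-ins, not the actual Kafka types.)

    object IsrSketch {
      // Minimal stand-in for a follower replica (field names assumed).
      case class ReplicaView(id: Int, logEndOffset: Long, lastFetchTimeMs: Long)

      // A follower falls out of sync if it is "stuck" (no fetch progress within
      // replicaMaxLagTimeMs) or "slow" (more than replicaMaxLagMessages behind
      // the leader's log end offset). maybeShrinkIsr removes both kinds, which
      // is why it takes the exclusive write lock: it mutates the ISR.
      def getOutOfSyncReplicas(followers: Set[ReplicaView],
                               leaderLogEndOffset: Long,
                               nowMs: Long,
                               replicaMaxLagTimeMs: Long,
                               replicaMaxLagMessages: Long): Set[ReplicaView] = {
        val stuck = followers.filter(r => nowMs - r.lastFetchTimeMs > replicaMaxLagTimeMs)
        val slow  = followers.filter(r => leaderLogEndOffset - r.logEndOffset > replicaMaxLagMessages)
        stuck ++ slow
      }
    }
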
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 46df8d9..4a71f82 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -17,15 +17,18 @@
 package kafka.log
 
-import java.io.{IOException, File}
-import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
-import java.util.concurrent.atomic._
 import kafka.utils._
-import scala.collection.JavaConversions
-import java.text.NumberFormat
 import kafka.message._
 import kafka.common._
 import kafka.metrics.KafkaMetricsGroup
+import kafka.server.BrokerTopicStats
+
+import java.io.{IOException, File}
+import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
+import java.util.concurrent.atomic._
+import java.text.NumberFormat
+import scala.collection.JavaConversions
+
 import com.yammer.metrics.core.Gauge
 
 
@@ -235,7 +238,7 @@ class Log(val dir: File,
       return appendInfo
 
     // trim any invalid bytes or partial messages before appending it to the on-disk log
-    var validMessages = trimInvalidBytes(messages)
+    var validMessages = trimInvalidBytes(messages, appendInfo)
 
     try {
       // they are valid, insert them in the log
@@ -246,7 +249,7 @@ class Log(val dir: File,
       val segment = maybeRoll()
 
       if(assignOffsets) {
-        // assign offsets to the messageset
+        // assign offsets to the message set
        val offset = new AtomicLong(nextOffset.get)
         try {
          validMessages = validMessages.assignOffsets(offset, appendInfo.codec)
@@ -260,12 +263,16 @@ class Log(val dir: File,
           throw new IllegalArgumentException("Out of order offsets found in " + messages)
       }
 
-      // Check if the message sizes are valid. This check is done after assigning offsets to ensure the comparison
-      // happens with the new message size (after re-compression, if any)
+      // re-validate message sizes since after re-compression some may exceed the limit
       for(messageAndOffset <- validMessages.shallowIterator) {
-        if(MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize)
+        if(MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) {
+          // we record the original message set size instead of the trimmed size
+          // to be consistent with pre-compression bytesRejectedRate recording
+          BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)
+          BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)
           throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
             .format(MessageSet.entrySize(messageAndOffset.message), config.maxMessageSize))
+        }
       }
 
       // now append to the log
@@ -287,18 +294,22 @@
     }
   }
 
-  /** Struct to hold various quantities we compute about each message set before appending to the log
+  /**
+   * Struct to hold various quantities we compute about each message set before appending to the log
+   *
    * @param firstOffset The first offset in the message set
    * @param lastOffset The last offset in the message set
+   * @param shallowCount The number of shallow messages
+   * @param validBytes The number of valid bytes
    * @param codec The codec used in the message set
    * @param offsetsMonotonic Are the offsets in this message set monotonically increasing
    */
-  case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec, shallowCount: Int, offsetsMonotonic: Boolean)
+  case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec, shallowCount: Int, validBytes: Int, offsetsMonotonic: Boolean)
 
   /**
    * Validate the following:
   *
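
(Reviewer note: the new `validBytes` field is what enables the `trimInvalidBytes(messages, appendInfo)` signature change earlier in this file: the byte count accumulated during validation lets the append path truncate trailing partial messages without re-scanning the buffer. The sketch below shows how the pieces plausibly fit together; the method body is inferred from the signatures visible in this diff, not copied from the Kafka source, and it takes the `validBytes` count directly rather than the full `LogAppendInfo`.)

    import kafka.message.ByteBufferMessageSet

    object TrimSketch {
      // Keep only the bytes that validated cleanly (LogAppendInfo.validBytes);
      // anything past that offset is a trailing partial or corrupt message.
      def trimInvalidBytes(messages: ByteBufferMessageSet, validBytes: Int): ByteBufferMessageSet = {
        if (validBytes == messages.sizeInBytes) {
          messages // the whole set is valid, nothing to trim
        } else {
          val valid = messages.buffer.duplicate()
          valid.limit(validBytes) // cut the buffer off at the last valid byte
          new ByteBufferMessageSet(valid)
        }
      }
    }
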