diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index c08eab0..b23a718 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -16,22 +16,23 @@
  */
 package kafka.cluster
 
-import scala.collection._
+import kafka.common._
 import kafka.admin.AdminUtils
-import kafka.utils._
-import java.lang.Object
+import kafka.utils.{ZkUtils, Pool, Time, Logging}
+import kafka.utils.Utils.inLock
 import kafka.api.{PartitionStateInfo, LeaderAndIsr}
 import kafka.log.LogConfig
 import kafka.server.{OffsetManager, ReplicaManager}
-import com.yammer.metrics.core.Gauge
 import kafka.metrics.KafkaMetricsGroup
 import kafka.controller.KafkaController
-import org.apache.log4j.Logger
 import kafka.message.ByteBufferMessageSet
-import kafka.common.{NotAssignedReplicaException, NotLeaderForPartitionException, ErrorMapping}
+
 import java.io.IOException
+import java.util.concurrent.locks.ReentrantReadWriteLock
 import scala.Some
-import kafka.common.TopicAndPartition
+import scala.collection._
+
+import com.yammer.metrics.core.Gauge
 
 
 /**
@@ -48,7 +49,7 @@ class Partition(val topic: String,
   var leaderReplicaIdOpt: Option[Int] = None
   var inSyncReplicas: Set[Replica] = Set.empty[Replica]
   private val assignedReplicaMap = new Pool[Int,Replica]
-  private val leaderIsrUpdateLock = new Object
+  private val leaderIsrUpdateLock = new ReentrantReadWriteLock()
   private var zkVersion: Int = LeaderAndIsr.initialZKVersion
   private var leaderEpoch: Int = LeaderAndIsr.initialLeaderEpoch - 1
   /* Epoch of the controller that last changed the leader. This needs to be initialized correctly upon broker startup.
@@ -72,7 +73,7 @@ class Partition(val topic: String,
   )
 
   def isUnderReplicated(): Boolean = {
-    leaderIsrUpdateLock synchronized {
+    inLock(leaderIsrUpdateLock.readLock()) {
       leaderReplicaIfLocal() match {
         case Some(_) =>
           inSyncReplicas.size < assignedReplicas.size
@@ -114,7 +115,7 @@ class Partition(val topic: String,
   }
 
   def leaderReplicaIfLocal(): Option[Replica] = {
-    leaderIsrUpdateLock synchronized {
+    inLock(leaderIsrUpdateLock.readLock()) {
       leaderReplicaIdOpt match {
         case Some(leaderReplicaId) =>
           if (leaderReplicaId == localBrokerId)
@@ -140,7 +141,7 @@ class Partition(val topic: String,
 
   def delete() {
     // need to hold the lock to prevent appendMessagesToLeader() from hitting I/O exceptions due to log being deleted
-    leaderIsrUpdateLock synchronized {
+    inLock(leaderIsrUpdateLock.writeLock()) {
       assignedReplicaMap.clear()
       inSyncReplicas = Set.empty[Replica]
       leaderReplicaIdOpt = None
@@ -155,7 +156,7 @@ class Partition(val topic: String,
   }
 
   def getLeaderEpoch(): Int = {
-    leaderIsrUpdateLock synchronized {
+    inLock(leaderIsrUpdateLock.readLock()) {
       return this.leaderEpoch
     }
   }
@@ -167,7 +168,7 @@ class Partition(val topic: String,
   def makeLeader(controllerId: Int,
                  partitionStateInfo: PartitionStateInfo, correlationId: Int,
                  offsetManager: OffsetManager): Boolean = {
-    leaderIsrUpdateLock synchronized {
+    inLock(leaderIsrUpdateLock.writeLock()) {
       val allReplicas = partitionStateInfo.allReplicas
       val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
       val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
@@ -200,7 +201,7 @@ class Partition(val topic: String,
   def makeFollower(controllerId: Int,
                    partitionStateInfo: PartitionStateInfo, correlationId: Int,
                    offsetManager: OffsetManager): Boolean = {
-    leaderIsrUpdateLock synchronized {
+    inLock(leaderIsrUpdateLock.writeLock()) {
       val allReplicas = partitionStateInfo.allReplicas
       val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
       val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
@@ -234,7 +235,7 @@ class Partition(val topic: String,
   }
 
   def updateLeaderHWAndMaybeExpandIsr(replicaId: Int, offset: Long) {
-    leaderIsrUpdateLock synchronized {
+    inLock(leaderIsrUpdateLock.writeLock()) {
       debug("Recording follower %d position %d for partition [%s,%d].".format(replicaId, offset, topic, partitionId))
       val replicaOpt = getReplica(replicaId)
       if(!replicaOpt.isDefined) {
@@ -270,7 +271,7 @@ class Partition(val topic: String,
   }
 
   def checkEnoughReplicasReachOffset(requiredOffset: Long, requiredAcks: Int): (Boolean, Short) = {
-    leaderIsrUpdateLock synchronized {
+    inLock(leaderIsrUpdateLock.readLock()) {
       leaderReplicaIfLocal() match {
         case Some(_) =>
           val numAcks = inSyncReplicas.count(r => {
@@ -314,7 +315,7 @@ class Partition(val topic: String,
   }
 
   def maybeShrinkIsr(replicaMaxLagTimeMs: Long, replicaMaxLagMessages: Long) {
-    leaderIsrUpdateLock synchronized {
+    inLock(leaderIsrUpdateLock.writeLock()) {
       leaderReplicaIfLocal() match {
         case Some(leaderReplica) =>
           val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs, replicaMaxLagMessages)
@@ -356,7 +357,7 @@ class Partition(val topic: String,
   }
 
   def appendMessagesToLeader(messages: ByteBufferMessageSet) = {
-    leaderIsrUpdateLock synchronized {
+    inLock(leaderIsrUpdateLock.writeLock()) {
       val leaderReplicaOpt = leaderReplicaIfLocal()
       leaderReplicaOpt match {
         case Some(leaderReplica) =>
@@ -402,7 +403,7 @@ class Partition(val topic: String,
   }
 
   override def toString(): String = {
-    leaderIsrUpdateLock synchronized {
+    inLock(leaderIsrUpdateLock.readLock()) {
       val partitionString = new StringBuilder
       partitionString.append("Topic: " + topic)
       partitionString.append("; Partition: " + partitionId)
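
The core of this change: Partition's coarse Object monitor becomes a ReentrantReadWriteLock, so read-only paths (isUnderReplicated, getLeaderEpoch, checkEnoughReplicasReachOffset, toString) can proceed concurrently while mutating paths (makeLeader, makeFollower, updateLeaderHWAndMaybeExpandIsr, maybeShrinkIsr, appendMessagesToLeader, delete) serialize behind the write lock. Below is a minimal sketch of the lock-wrapping pattern the diff relies on, assuming kafka.utils.Utils.inLock takes a java.util.concurrent.locks.Lock and a by-name block; the helper body and LockSketch object are illustrative, not the exact Kafka implementation:

    import java.util.concurrent.locks.{Lock, ReentrantReadWriteLock}

    object LockSketch {
      // Assumed shape of the inLock helper: acquire, run the block, always release.
      def inLock[T](lock: Lock)(body: => T): T = {
        lock.lock()
        try body
        finally lock.unlock() // released even if body throws
      }

      def main(args: Array[String]) {
        val rwLock = new ReentrantReadWriteLock()
        // Many readers may hold the read lock at the same time...
        val state = inLock(rwLock.readLock()) { "reading shared state" }
        // ...but a writer holds the write lock exclusively.
        inLock(rwLock.writeLock()) { println("mutating ISR / leader epoch: " + state) }
      }
    }

One consequence worth noting: ReentrantReadWriteLock does not allow upgrading a read lock to a write lock, which is presumably why a caller like updateLeaderHWAndMaybeExpandIsr takes the write lock up front even though it only sometimes mutates the ISR.
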
diff --git a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
index fecee8d..d6d02a4 100644
--- a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
@@ -36,6 +36,8 @@ class ByteBufferMessageSet(@BeanProperty val buffer: ByteBuffer) extends Message
 
   def validBytes: Int = underlying.validBytes
 
+  def numMessages: Int = underlying.numMessages
+
   override def iterator: java.util.Iterator[MessageAndOffset] = new java.util.Iterator[MessageAndOffset] {
     val underlyingIterator = underlying.iterator
     override def hasNext(): Boolean = {
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index b2652dd..1e4c553 100644
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -202,7 +202,17 @@ class FileMessageSet private[kafka](@volatile var file: File,
    * The number of bytes taken up by this file set
    */
   def sizeInBytes(): Int = _size.get()
-
+
+  /**
+   * The number of messages in this file set
+   */
+  def numMessages(): Int = {
+    var messages = 0
+    for (messageAndOffset <- this)
+      messages += 1
+    messages
+  }
+
   /**
    * Append these messages to the message set
   */
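
On the numMessages additions: the Java API method simply delegates to the underlying Scala ByteBufferMessageSet, while FileMessageSet.numMessages() counts by iterating every entry, so each call is a full O(n) scan of the file rather than a cached field like sizeInBytes(). That is fine for tests and occasional inspection, but callers on a hot path may want to count once and reuse the result. A small self-contained sketch of the same counting pattern; CountSketch and its types are hypothetical, not Kafka API:

    object CountSketch {
      // Count by full traversal, mirroring the loop in the diff: O(n) per call.
      def numMessages(entries: Iterable[Long]): Int = {
        var messages = 0
        for (entry <- entries)
          messages += 1
        messages
      }

      def main(args: Array[String]) {
        val offsets = Seq(0L, 1L, 2L)
        println(numMessages(offsets))  // 3
        println(offsets.iterator.size) // idiomatic equivalent, also O(n)
      }
    }
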
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 46df8d9..3ad1124 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -235,7 +235,7 @@ class Log(val dir: File,
       return appendInfo
 
     // trim any invalid bytes or partial messages before appending it to the on-disk log
-    var validMessages = trimInvalidBytes(messages)
+    var validMessages = trimInvalidBytes(messages, appendInfo)
 
     try {
       // they are valid, insert them in the log
@@ -246,7 +246,7 @@ class Log(val dir: File,
       val segment = maybeRoll()
 
       if(assignOffsets) {
-        // assign offsets to the messageset
+        // assign offsets to the message set
         val offset = new AtomicLong(nextOffset.get)
         try {
           validMessages = validMessages.assignOffsets(offset, appendInfo.codec)
@@ -260,8 +260,7 @@ class Log(val dir: File,
           throw new IllegalArgumentException("Out of order offsets found in " + messages)
       }
 
-      // Check if the message sizes are valid. This check is done after assigning offsets to ensure the comparison
-      // happens with the new message size (after re-compression, if any)
+      // Re-check if the message sizes are valid after possible re-compression.
       for(messageAndOffset <- validMessages.shallowIterator) {
         if(MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize)
           throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
@@ -290,15 +289,18 @@ class Log(val dir: File,
   /** Struct to hold various quantities we compute about each message set before appending to the log
    * @param firstOffset The first offset in the message set
    * @param lastOffset The last offset in the message set
+   * @param shallowCount The number of shallow messages
+   * @param bytesCount The number of total bytes
    * @param codec The codec used in the message set
    * @param offsetsMonotonic Are the offsets in this message set monotonically increasing
    */
-  case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec, shallowCount: Int, offsetsMonotonic: Boolean)
+  case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec, shallowCount: Int, bytesCount: Int, offsetsMonotonic: Boolean)
 
   /**
    * Validate the following:
   *
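
On the LogAppendInfo change: the struct now carries shallowCount and bytesCount, and trimInvalidBytes receives the already-computed appendInfo, so the append path can reuse a single analysis pass instead of re-scanning the message set. A sketch of how those two fields could be accumulated in one shallow pass; only the field names shallowCount and bytesCount come from the diff, the Entry type and analyze function are illustrative:

    object AppendInfoSketch {
      // Hypothetical stand-in for a shallow (outer, possibly compressed) entry.
      case class Entry(sizeInBytes: Int)

      // Accumulate both counts in a single traversal of the shallow entries.
      def analyze(shallowEntries: Seq[Entry]): (Int, Int) = {
        var shallowCount = 0
        var bytesCount = 0
        for (entry <- shallowEntries) {
          shallowCount += 1               // one per outer message
          bytesCount += entry.sizeInBytes // running total of bytes
        }
        (shallowCount, bytesCount)
      }

      def main(args: Array[String]) {
        println(analyze(Seq(Entry(100), Entry(250)))) // prints (2,350)
      }
    }
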