diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index c08eab0..b23a718 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -16,22 +16,23 @@ */ package kafka.cluster -import scala.collection._ +import kafka.common._ import kafka.admin.AdminUtils -import kafka.utils._ -import java.lang.Object +import kafka.utils.{ZkUtils, Pool, Time, Logging} +import kafka.utils.Utils.inLock import kafka.api.{PartitionStateInfo, LeaderAndIsr} import kafka.log.LogConfig import kafka.server.{OffsetManager, ReplicaManager} -import com.yammer.metrics.core.Gauge import kafka.metrics.KafkaMetricsGroup import kafka.controller.KafkaController -import org.apache.log4j.Logger import kafka.message.ByteBufferMessageSet -import kafka.common.{NotAssignedReplicaException, NotLeaderForPartitionException, ErrorMapping} + import java.io.IOException +import java.util.concurrent.locks.ReentrantReadWriteLock import scala.Some -import kafka.common.TopicAndPartition +import scala.collection._ + +import com.yammer.metrics.core.Gauge /** @@ -48,7 +49,7 @@ class Partition(val topic: String, var leaderReplicaIdOpt: Option[Int] = None var inSyncReplicas: Set[Replica] = Set.empty[Replica] private val assignedReplicaMap = new Pool[Int,Replica] - private val leaderIsrUpdateLock = new Object + private val leaderIsrUpdateLock = new ReentrantReadWriteLock() private var zkVersion: Int = LeaderAndIsr.initialZKVersion private var leaderEpoch: Int = LeaderAndIsr.initialLeaderEpoch - 1 /* Epoch of the controller that last changed the leader. This needs to be initialized correctly upon broker startup. @@ -72,7 +73,7 @@ class Partition(val topic: String, ) def isUnderReplicated(): Boolean = { - leaderIsrUpdateLock synchronized { + inLock(leaderIsrUpdateLock.readLock()) { leaderReplicaIfLocal() match { case Some(_) => inSyncReplicas.size < assignedReplicas.size @@ -114,7 +115,7 @@ class Partition(val topic: String, } def leaderReplicaIfLocal(): Option[Replica] = { - leaderIsrUpdateLock synchronized { + inLock(leaderIsrUpdateLock.readLock()) { leaderReplicaIdOpt match { case Some(leaderReplicaId) => if (leaderReplicaId == localBrokerId) @@ -140,7 +141,7 @@ class Partition(val topic: String, def delete() { // need to hold the lock to prevent appendMessagesToLeader() from hitting I/O exceptions due to log being deleted - leaderIsrUpdateLock synchronized { + inLock(leaderIsrUpdateLock.writeLock()) { assignedReplicaMap.clear() inSyncReplicas = Set.empty[Replica] leaderReplicaIdOpt = None @@ -155,7 +156,7 @@ class Partition(val topic: String, } def getLeaderEpoch(): Int = { - leaderIsrUpdateLock synchronized { + inLock(leaderIsrUpdateLock.readLock()) { return this.leaderEpoch } } @@ -167,7 +168,7 @@ class Partition(val topic: String, def makeLeader(controllerId: Int, partitionStateInfo: PartitionStateInfo, correlationId: Int, offsetManager: OffsetManager): Boolean = { - leaderIsrUpdateLock synchronized { + inLock(leaderIsrUpdateLock.writeLock()) { val allReplicas = partitionStateInfo.allReplicas val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr @@ -200,7 +201,7 @@ class Partition(val topic: String, def makeFollower(controllerId: Int, partitionStateInfo: PartitionStateInfo, correlationId: Int, offsetManager: OffsetManager): Boolean = { - leaderIsrUpdateLock synchronized { + inLock(leaderIsrUpdateLock.writeLock()) { val allReplicas = 
partitionStateInfo.allReplicas val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr @@ -234,7 +235,7 @@ class Partition(val topic: String, } def updateLeaderHWAndMaybeExpandIsr(replicaId: Int, offset: Long) { - leaderIsrUpdateLock synchronized { + inLock(leaderIsrUpdateLock.writeLock()) { debug("Recording follower %d position %d for partition [%s,%d].".format(replicaId, offset, topic, partitionId)) val replicaOpt = getReplica(replicaId) if(!replicaOpt.isDefined) { @@ -270,7 +271,7 @@ class Partition(val topic: String, } def checkEnoughReplicasReachOffset(requiredOffset: Long, requiredAcks: Int): (Boolean, Short) = { - leaderIsrUpdateLock synchronized { + inLock(leaderIsrUpdateLock.readLock()) { leaderReplicaIfLocal() match { case Some(_) => val numAcks = inSyncReplicas.count(r => { @@ -314,7 +315,7 @@ class Partition(val topic: String, } def maybeShrinkIsr(replicaMaxLagTimeMs: Long, replicaMaxLagMessages: Long) { - leaderIsrUpdateLock synchronized { + inLock(leaderIsrUpdateLock.writeLock()) { leaderReplicaIfLocal() match { case Some(leaderReplica) => val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs, replicaMaxLagMessages) @@ -356,7 +357,7 @@ class Partition(val topic: String, } def appendMessagesToLeader(messages: ByteBufferMessageSet) = { - leaderIsrUpdateLock synchronized { + inLock(leaderIsrUpdateLock.writeLock()) { val leaderReplicaOpt = leaderReplicaIfLocal() leaderReplicaOpt match { case Some(leaderReplica) => @@ -402,7 +403,7 @@ class Partition(val topic: String, } override def toString(): String = { - leaderIsrUpdateLock synchronized { + inLock(leaderIsrUpdateLock.readLock()) { val partitionString = new StringBuilder partitionString.append("Topic: " + topic) partitionString.append("; Partition: " + partitionId) diff --git a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala index fecee8d..d6d02a4 100644 --- a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala +++ b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala @@ -36,6 +36,8 @@ class ByteBufferMessageSet(@BeanProperty val buffer: ByteBuffer) extends Message def validBytes: Int = underlying.validBytes + def numMessages: Int = underlying.numMessages + override def iterator: java.util.Iterator[MessageAndOffset] = new java.util.Iterator[MessageAndOffset] { val underlyingIterator = underlying.iterator override def hasNext(): Boolean = { diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala index b2652dd..1e4c553 100644 --- a/core/src/main/scala/kafka/log/FileMessageSet.scala +++ b/core/src/main/scala/kafka/log/FileMessageSet.scala @@ -202,7 +202,17 @@ class FileMessageSet private[kafka](@volatile var file: File, * The number of bytes taken up by this file set */ def sizeInBytes(): Int = _size.get() - + + /** + * The number of messages in this file set + */ + def numMessages(): Int = { + var messages = 0 + for (messageAndOffset <- this) + messages += 1 + messages + } + /** * Append these messages to the message set */ diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 46df8d9..3ad1124 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -235,7 +235,7 @@ class Log(val dir: File, return appendInfo // trim any invalid bytes or partial 
messages before appending it to the on-disk log - var validMessages = trimInvalidBytes(messages) + var validMessages = trimInvalidBytes(messages, appendInfo) try { // they are valid, insert them in the log @@ -246,7 +246,7 @@ class Log(val dir: File, val segment = maybeRoll() if(assignOffsets) { - // assign offsets to the messageset + // assign offsets to the message set val offset = new AtomicLong(nextOffset.get) try { validMessages = validMessages.assignOffsets(offset, appendInfo.codec) @@ -260,8 +260,7 @@ class Log(val dir: File, throw new IllegalArgumentException("Out of order offsets found in " + messages) } - // Check if the message sizes are valid. This check is done after assigning offsets to ensure the comparison - // happens with the new message size (after re-compression, if any) + // Re-check if the message sizes are valid after possible re-compression. for(messageAndOffset <- validMessages.shallowIterator) { if(MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d." @@ -290,15 +289,18 @@ class Log(val dir: File, /** Struct to hold various quantities we compute about each message set before appending to the log * @param firstOffset The first offset in the message set * @param lastOffset The last offset in the message set + * @param shallowCount The number of shallow messages + * @param bytesCount The number of total bytes * @param codec The codec used in the message set * @param offsetsMonotonic Are the offsets in this message set monotonically increasing */ - case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec, shallowCount: Int, offsetsMonotonic: Boolean) + case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec, shallowCount: Int, bytesCount: Int, offsetsMonotonic: Boolean) /** * Validate the following: *
    *
    *  1. each message matches its CRC
+   *  2. each message size is valid
    *
    * Also compute the following quantities:
@@ -306,12 +308,14 @@ class Log(val dir: File,
    *  • First offset in the message set
    *  • Last offset in the message set
    *  • Number of messages
+   *  • Number of valid bytes
    *  • Whether the offsets are monotonically increasing
  • Whether any compression codec is used (if many are used, then the last one is given) * */ private def analyzeAndValidateMessageSet(messages: ByteBufferMessageSet): LogAppendInfo = { var messageCount = 0 + var bytesCount = 0 var firstOffset, lastOffset = -1L var codec: CompressionCodec = NoCompressionCodec var monotonic = true @@ -328,22 +332,31 @@ class Log(val dir: File, // check the validity of the message by checking CRC val m = messageAndOffset.message m.ensureValid() - messageCount += 1; + + // Check if the message sizes are valid. + val messageSize = MessageSet.entrySize(m) + if(messageSize > config.maxMessageSize) + throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d." + .format(MessageSet.entrySize(messageAndOffset.message), config.maxMessageSize)) + + messageCount += 1 + bytesCount += messageSize val messageCodec = m.compressionCodec if(messageCodec != NoCompressionCodec) codec = messageCodec } - LogAppendInfo(firstOffset, lastOffset, codec, messageCount, monotonic) + LogAppendInfo(firstOffset, lastOffset, codec, messageCount, bytesCount, monotonic) } /** * Trim any invalid bytes from the end of this message set (if there are any) * @param messages The message set to trim + * @param info The general information of the message set * @return A trimmed message set. This may be the same as what was passed in or it may not. */ - private def trimInvalidBytes(messages: ByteBufferMessageSet): ByteBufferMessageSet = { - val messageSetValidBytes = messages.validBytes + private def trimInvalidBytes(messages: ByteBufferMessageSet, info: LogAppendInfo): ByteBufferMessageSet = { + val messageSetValidBytes = info.bytesCount if(messageSetValidBytes < 0) throw new InvalidMessageSizeException("Illegal length of message set " + messageSetValidBytes + " Message set cannot be appended to log. Possible causes are corrupted produce requests") if(messageSetValidBytes == messages.sizeInBytes) { diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala index 73401c5..cf65063 100644 --- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala +++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala @@ -97,6 +97,7 @@ object ByteBufferMessageSet { */ class ByteBufferMessageSet(@BeanProperty val buffer: ByteBuffer) extends MessageSet with Logging { private var shallowValidByteCount = -1 + private var shallowMessageCount = -1 def this(compressionCodec: CompressionCodec, messages: Message*) { this(ByteBufferMessageSet.create(new AtomicLong(0), compressionCodec, messages:_*)) @@ -122,7 +123,20 @@ class ByteBufferMessageSet(@BeanProperty val buffer: ByteBuffer) extends Message } shallowValidByteCount } - + + private def shallowMessages: Int = { + if(shallowMessageCount < 0) { + var messages = 0 + val iter = this.internalIterator(true) + while(iter.hasNext) { + iter.next + messages += 1 + } + this.shallowMessageCount = messages + } + shallowMessageCount + } + /** Write the messages in this set to the given channel */ def writeTo(channel: GatheringByteChannel, offset: Long, size: Int): Int = { // Ignore offset and size from input. We just want to write the whole buffer to the channel. 
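(For illustration only, not part of the patch: a rough sketch of how the new shallow count behaves on a compressed set, using the `ByteBufferMessageSet(compressionCodec, messages*)` constructor shown in the hunk above; the expected values follow the shallow/deep semantics described in this diff.)

```scala
import kafka.message.{ByteBufferMessageSet, GZIPCompressionCodec, Message}

object NumMessagesSketch extends App {
  // Three messages compressed into a single shallow (wrapper) entry.
  val set = new ByteBufferMessageSet(
    GZIPCompressionCodec,
    new Message("a".getBytes), new Message("b".getBytes), new Message("c".getBytes))

  // numMessages counts shallow entries, so the compressed batch counts once.
  println(set.numMessages)   // expected: 1
  // The default iterator is deep: it decompresses and walks the wrapped messages.
  println(set.iterator.size) // expected: 3
}
```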
@@ -232,6 +246,11 @@ class ByteBufferMessageSet(@BeanProperty val buffer: ByteBuffer) extends Message def validBytes: Int = shallowValidBytes /** + * The total number of shallow messages in this message set not including any partial, trailing messages + */ + def numMessages: Int = shallowMessages + + /** * Two message sets are equal if their respective byte buffers are equal */ override def equals(other: Any): Boolean = { diff --git a/core/src/main/scala/kafka/message/MessageSet.scala b/core/src/main/scala/kafka/message/MessageSet.scala index a1b5c63..d3c7cab 100644 --- a/core/src/main/scala/kafka/message/MessageSet.scala +++ b/core/src/main/scala/kafka/message/MessageSet.scala @@ -82,14 +82,9 @@ abstract class MessageSet extends Iterable[MessageAndOffset] { def sizeInBytes: Int /** - * Validate the checksum of all the messages in the set. Throws an InvalidMessageException if the checksum doesn't - * match the payload for any message. + * Gives the total number of messages in the set */ - def validate(): Unit = { - for(messageAndOffset <- this) - if(!messageAndOffset.message.isValid) - throw new InvalidMessageException - } + def numMessages(): Int /** * Print this message set's contents. If the message set has more than 100 messages, just diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index bb0359d..0ad9b03 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -17,19 +17,21 @@ package kafka.server -import kafka.admin.AdminUtils import kafka.api._ +import kafka.common._ +import kafka.log._ import kafka.message._ import kafka.network._ -import kafka.log._ -import scala.collection._ -import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic._ +import kafka.admin.AdminUtils import kafka.metrics.KafkaMetricsGroup -import kafka.common._ -import kafka.utils.{Pool, SystemTime, Logging} import kafka.network.RequestChannel.Response import kafka.controller.KafkaController +import kafka.utils.{Pool, SystemTime, Logging} + +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic._ +import scala.collection._ + import org.I0Itec.zkclient.ZkClient /** @@ -284,9 +286,11 @@ class KafkaApis(val requestChannel: RequestChannel, val partitionAndData: Map[TopicAndPartition, MessageSet] = producerRequest.data trace("Append [%s] to local log ".format(partitionAndData.toString)) partitionAndData.map {case (topicAndPartition, messages) => - // update stats for incoming bytes rate + // update stats for incoming bytes rate and message rate BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesInRate.mark(messages.sizeInBytes) BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes) + BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).messagesInRate.mark(messages.numMessages) + BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(messages.numMessages) try { val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition) @@ -297,13 +301,10 @@ class KafkaApis(val requestChannel: RequestChannel, .format(topicAndPartition, brokerId)) } - val numAppendedMessages = if (info.firstOffset == -1L || info.lastOffset == -1L) 0 else (info.lastOffset - info.firstOffset + 1) // update stats for successfully appended messages BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).logBytesAppendRate.mark(messages.sizeInBytes) 
BrokerTopicStats.getBrokerAllTopicsStats.logBytesAppendRate.mark(messages.sizeInBytes) - BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).messagesInRate.mark(numAppendedMessages) - BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numAppendedMessages) trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d" .format(messages.size, topicAndPartition.topic, topicAndPartition.partition, info.firstOffset, info.lastOffset))
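(Also for illustration, not part of the patch: a minimal, self-contained sketch of the locking pattern that Partition.scala switches to above. The local inLock helper mirrors what kafka.utils.Utils.inLock is assumed to do, namely acquire the lock, evaluate the block, and release in a finally; the replica-set fields are hypothetical stand-ins for Partition's state.)

```scala
import java.util.concurrent.locks.{Lock, ReentrantReadWriteLock}

object InLockSketch extends App {
  // Shape assumed for kafka.utils.Utils.inLock: lock, run the block, always unlock.
  def inLock[T](lock: Lock)(fun: => T): T = {
    lock.lock()
    try fun finally lock.unlock()
  }

  val leaderIsrUpdateLock = new ReentrantReadWriteLock()
  var inSyncReplicaIds = Set(1, 2, 3) // hypothetical stand-in for inSyncReplicas

  // Read-only paths (isUnderReplicated, leaderReplicaIfLocal, toString) share the read lock.
  def isrSize: Int = inLock(leaderIsrUpdateLock.readLock()) { inSyncReplicaIds.size }

  // Mutating paths (makeLeader, maybeShrinkIsr, appendMessagesToLeader) take the write lock.
  def removeFromIsr(replicaId: Int): Unit = inLock(leaderIsrUpdateLock.writeLock()) {
    inSyncReplicaIds -= replicaId
  }

  removeFromIsr(3)
  println(isrSize) // expected: 2
}
```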