diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index c08eab0..518d2df 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -16,22 +16,23 @@ */ package kafka.cluster -import scala.collection._ +import kafka.common._ import kafka.admin.AdminUtils -import kafka.utils._ -import java.lang.Object +import kafka.utils.{ZkUtils, Pool, Time, Logging} +import kafka.utils.Utils.inLock import kafka.api.{PartitionStateInfo, LeaderAndIsr} import kafka.log.LogConfig import kafka.server.{OffsetManager, ReplicaManager} -import com.yammer.metrics.core.Gauge import kafka.metrics.KafkaMetricsGroup import kafka.controller.KafkaController -import org.apache.log4j.Logger import kafka.message.ByteBufferMessageSet -import kafka.common.{NotAssignedReplicaException, NotLeaderForPartitionException, ErrorMapping} + import java.io.IOException +import java.util.concurrent.locks.ReentrantReadWriteLock import scala.Some -import kafka.common.TopicAndPartition +import scala.collection._ + +import com.yammer.metrics.core.Gauge /** @@ -48,7 +49,7 @@ class Partition(val topic: String, var leaderReplicaIdOpt: Option[Int] = None var inSyncReplicas: Set[Replica] = Set.empty[Replica] private val assignedReplicaMap = new Pool[Int,Replica] - private val leaderIsrUpdateLock = new Object + private val leaderIsrUpdateLock = new ReentrantReadWriteLock() private var zkVersion: Int = LeaderAndIsr.initialZKVersion private var leaderEpoch: Int = LeaderAndIsr.initialLeaderEpoch - 1 /* Epoch of the controller that last changed the leader. This needs to be initialized correctly upon broker startup. 
@@ -72,7 +73,7 @@ class Partition(val topic: String, ) def isUnderReplicated(): Boolean = { - leaderIsrUpdateLock synchronized { + inLock(leaderIsrUpdateLock.readLock()) { leaderReplicaIfLocal() match { case Some(_) => inSyncReplicas.size < assignedReplicas.size @@ -114,7 +115,7 @@ class Partition(val topic: String, } def leaderReplicaIfLocal(): Option[Replica] = { - leaderIsrUpdateLock synchronized { + inLock(leaderIsrUpdateLock.readLock()) { leaderReplicaIdOpt match { case Some(leaderReplicaId) => if (leaderReplicaId == localBrokerId) @@ -140,7 +141,7 @@ class Partition(val topic: String, def delete() { // need to hold the lock to prevent appendMessagesToLeader() from hitting I/O exceptions due to log being deleted - leaderIsrUpdateLock synchronized { + inLock(leaderIsrUpdateLock.writeLock()) { assignedReplicaMap.clear() inSyncReplicas = Set.empty[Replica] leaderReplicaIdOpt = None @@ -155,7 +156,7 @@ class Partition(val topic: String, } def getLeaderEpoch(): Int = { - leaderIsrUpdateLock synchronized { + inLock(leaderIsrUpdateLock.readLock()) { return this.leaderEpoch } } @@ -167,7 +168,7 @@ class Partition(val topic: String, def makeLeader(controllerId: Int, partitionStateInfo: PartitionStateInfo, correlationId: Int, offsetManager: OffsetManager): Boolean = { - leaderIsrUpdateLock synchronized { + inLock(leaderIsrUpdateLock.writeLock()) { val allReplicas = partitionStateInfo.allReplicas val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr @@ -200,7 +201,7 @@ class Partition(val topic: String, def makeFollower(controllerId: Int, partitionStateInfo: PartitionStateInfo, correlationId: Int, offsetManager: OffsetManager): Boolean = { - leaderIsrUpdateLock synchronized { + inLock(leaderIsrUpdateLock.writeLock()) { val allReplicas = partitionStateInfo.allReplicas val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch val leaderAndIsr = 
leaderIsrAndControllerEpoch.leaderAndIsr @@ -234,7 +235,7 @@ class Partition(val topic: String, } def updateLeaderHWAndMaybeExpandIsr(replicaId: Int, offset: Long) { - leaderIsrUpdateLock synchronized { + inLock(leaderIsrUpdateLock.writeLock()) { debug("Recording follower %d position %d for partition [%s,%d].".format(replicaId, offset, topic, partitionId)) val replicaOpt = getReplica(replicaId) if(!replicaOpt.isDefined) { @@ -270,7 +271,7 @@ class Partition(val topic: String, } def checkEnoughReplicasReachOffset(requiredOffset: Long, requiredAcks: Int): (Boolean, Short) = { - leaderIsrUpdateLock synchronized { + inLock(leaderIsrUpdateLock.readLock()) { leaderReplicaIfLocal() match { case Some(_) => val numAcks = inSyncReplicas.count(r => { @@ -314,7 +315,7 @@ class Partition(val topic: String, } def maybeShrinkIsr(replicaMaxLagTimeMs: Long, replicaMaxLagMessages: Long) { - leaderIsrUpdateLock synchronized { + inLock(leaderIsrUpdateLock.writeLock()) { leaderReplicaIfLocal() match { case Some(leaderReplica) => val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs, replicaMaxLagMessages) @@ -356,7 +357,7 @@ class Partition(val topic: String, } def appendMessagesToLeader(messages: ByteBufferMessageSet) = { - leaderIsrUpdateLock synchronized { + inLock(leaderIsrUpdateLock.readLock()) { val leaderReplicaOpt = leaderReplicaIfLocal() leaderReplicaOpt match { case Some(leaderReplica) => @@ -402,7 +403,7 @@ class Partition(val topic: String, } override def toString(): String = { - leaderIsrUpdateLock synchronized { + inLock(leaderIsrUpdateLock.readLock()) { val partitionString = new StringBuilder partitionString.append("Topic: " + topic) partitionString.append("; Partition: " + partitionId) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 46df8d9..9165db8 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -17,15 +17,18 @@ package kafka.log -import 
java.io.{IOException, File} -import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap} -import java.util.concurrent.atomic._ import kafka.utils._ -import scala.collection.JavaConversions -import java.text.NumberFormat import kafka.message._ import kafka.common._ import kafka.metrics.KafkaMetricsGroup +import kafka.server.BrokerTopicStats + +import java.io.{IOException, File} +import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap} +import java.util.concurrent.atomic._ +import java.text.NumberFormat +import scala.collection.JavaConversions + import com.yammer.metrics.core.Gauge @@ -235,7 +238,7 @@ class Log(val dir: File, return appendInfo // trim any invalid bytes or partial messages before appending it to the on-disk log - var validMessages = trimInvalidBytes(messages) + var validMessages = trimInvalidBytes(messages, appendInfo) try { // they are valid, insert them in the log @@ -246,7 +249,7 @@ class Log(val dir: File, val segment = maybeRoll() if(assignOffsets) { - // assign offsets to the messageset + // assign offsets to the message set val offset = new AtomicLong(nextOffset.get) try { validMessages = validMessages.assignOffsets(offset, appendInfo.codec) @@ -260,12 +263,14 @@ class Log(val dir: File, throw new IllegalArgumentException("Out of order offsets found in " + messages) } - // Check if the message sizes are valid. 
This check is done after assigning offsets to ensure the comparison - // happens with the new message size (after re-compression, if any) + // re-validate message sizes since after re-compression some may exceed the limit for(messageAndOffset <- validMessages.shallowIterator) { - if(MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) + if(MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) { + BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes) + BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes) throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d." .format(MessageSet.entrySize(messageAndOffset.message), config.maxMessageSize)) + } } // now append to the log @@ -287,18 +292,22 @@ class Log(val dir: File, } } - /** Struct to hold various quantities we compute about each message set before appending to the log + /** + * Struct to hold various quantities we compute about each message set before appending to the log * @param firstOffset The first offset in the message set * @param lastOffset The last offset in the message set + * @param shallowCount The number of shallow messages + * @param validBytes The number of valid bytes * @param codec The codec used in the message set * @param offsetsMonotonic Are the offsets in this message set monotonically increasing */ - case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec, shallowCount: Int, offsetsMonotonic: Boolean) + case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec, shallowCount: Int, validBytes: Int, offsetsMonotonic: Boolean) /** * Validate the following: *
<ol> *
<li> each message matches its CRC + *
<li> each message size is valid * </ol>
* * Also compute the following quantities: @@ -306,12 +315,14 @@ class Log(val dir: File, *
<li> First offset in the message set *
<li> Last offset in the message set *
<li> Number of messages + *
<li> Number of valid bytes *
<li> Whether the offsets are monotonically increasing *
  • Whether any compression codec is used (if many are used, then the last one is given) * */ private def analyzeAndValidateMessageSet(messages: ByteBufferMessageSet): LogAppendInfo = { - var messageCount = 0 + var shallowMessageCount = 0 + var validBytesCount = 0 var firstOffset, lastOffset = -1L var codec: CompressionCodec = NoCompressionCodec var monotonic = true @@ -325,25 +336,38 @@ class Log(val dir: File, // update the last offset seen lastOffset = messageAndOffset.offset - // check the validity of the message by checking CRC val m = messageAndOffset.message + + // Check if the message sizes are valid. + val messageSize = MessageSet.entrySize(m) + if(messageSize > config.maxMessageSize) { + BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes) + BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes) + throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d." + .format(MessageSet.entrySize(messageAndOffset.message), config.maxMessageSize)) + } + + // check the validity of the message by checking CRC m.ensureValid() - messageCount += 1; + + shallowMessageCount += 1 + validBytesCount += messageSize val messageCodec = m.compressionCodec if(messageCodec != NoCompressionCodec) codec = messageCodec } - LogAppendInfo(firstOffset, lastOffset, codec, messageCount, monotonic) + LogAppendInfo(firstOffset, lastOffset, codec, shallowMessageCount, validBytesCount, monotonic) } /** * Trim any invalid bytes from the end of this message set (if there are any) * @param messages The message set to trim + * @param info The general information of the message set * @return A trimmed message set. This may be the same as what was passed in or it may not. 
*/ - private def trimInvalidBytes(messages: ByteBufferMessageSet): ByteBufferMessageSet = { - val messageSetValidBytes = messages.validBytes + private def trimInvalidBytes(messages: ByteBufferMessageSet, info: LogAppendInfo): ByteBufferMessageSet = { + val messageSetValidBytes = info.validBytes if(messageSetValidBytes < 0) throw new InvalidMessageSizeException("Illegal length of message set " + messageSetValidBytes + " Message set cannot be appended to log. Possible causes are corrupted produce requests") if(messageSetValidBytes == messages.sizeInBytes) { diff --git a/core/src/main/scala/kafka/message/MessageSet.scala b/core/src/main/scala/kafka/message/MessageSet.scala index a1b5c63..f1b8432 100644 --- a/core/src/main/scala/kafka/message/MessageSet.scala +++ b/core/src/main/scala/kafka/message/MessageSet.scala @@ -80,17 +80,7 @@ abstract class MessageSet extends Iterable[MessageAndOffset] { * Gives the total size of this message set in bytes */ def sizeInBytes: Int - - /** - * Validate the checksum of all the messages in the set. Throws an InvalidMessageException if the checksum doesn't - * match the payload for any message. - */ - def validate(): Unit = { - for(messageAndOffset <- this) - if(!messageAndOffset.message.isValid) - throw new InvalidMessageException - } - + /** * Print this message set's contents. If the message set has more than 100 messages, just * print the first 100. 
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index bb0359d..0b668f2 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -17,19 +17,21 @@ package kafka.server -import kafka.admin.AdminUtils import kafka.api._ +import kafka.common._ +import kafka.log._ import kafka.message._ import kafka.network._ -import kafka.log._ -import scala.collection._ -import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic._ +import kafka.admin.AdminUtils import kafka.metrics.KafkaMetricsGroup -import kafka.common._ -import kafka.utils.{Pool, SystemTime, Logging} import kafka.network.RequestChannel.Response import kafka.controller.KafkaController +import kafka.utils.{Pool, SystemTime, Logging} + +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic._ +import scala.collection._ + import org.I0Itec.zkclient.ZkClient /** @@ -284,10 +286,6 @@ class KafkaApis(val requestChannel: RequestChannel, val partitionAndData: Map[TopicAndPartition, MessageSet] = producerRequest.data trace("Append [%s] to local log ".format(partitionAndData.toString)) partitionAndData.map {case (topicAndPartition, messages) => - // update stats for incoming bytes rate - BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesInRate.mark(messages.sizeInBytes) - BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes) - try { val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition) val info = @@ -297,11 +295,12 @@ class KafkaApis(val requestChannel: RequestChannel, .format(topicAndPartition, brokerId)) } + val numAppendedMessages = if (info.firstOffset == -1L || info.lastOffset == -1L) 0 else (info.lastOffset - info.firstOffset + 1) - // update stats for successfully appended messages - 
BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).logBytesAppendRate.mark(messages.sizeInBytes) - BrokerTopicStats.getBrokerAllTopicsStats.logBytesAppendRate.mark(messages.sizeInBytes) + // update stats for successfully appended bytes and messages as bytesInRate and messageInRate + BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesInRate.mark(messages.sizeInBytes) + BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes) BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).messagesInRate.mark(numAppendedMessages) BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numAppendedMessages) diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala index f11f6e2..00bcc06 100644 --- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala +++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala @@ -97,7 +97,7 @@ class BrokerTopicMetrics(name: String) extends KafkaMetricsGroup { val messagesInRate = newMeter(name + "MessagesInPerSec", "messages", TimeUnit.SECONDS) val bytesInRate = newMeter(name + "BytesInPerSec", "bytes", TimeUnit.SECONDS) val bytesOutRate = newMeter(name + "BytesOutPerSec", "bytes", TimeUnit.SECONDS) - val logBytesAppendRate = newMeter(name + "LogBytesAppendedPerSec", "bytes", TimeUnit.SECONDS) + val bytesRejectedRate = newMeter(name + "BytesRejectedPerSec", "bytes", TimeUnit.SECONDS) val failedProduceRequestRate = newMeter(name + "FailedProduceRequestsPerSec", "requests", TimeUnit.SECONDS) val failedFetchRequestRate = newMeter(name + "FailedFetchRequestsPerSec", "requests", TimeUnit.SECONDS) }