diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 518d2df..7de911a 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -21,11 +21,11 @@ import kafka.admin.AdminUtils import kafka.utils.{ZkUtils, Pool, Time, Logging} import kafka.utils.Utils.inLock import kafka.api.{PartitionStateInfo, LeaderAndIsr} -import kafka.log.LogConfig +import kafka.log.{FileMessageSet, LogConfig} import kafka.server.{OffsetManager, ReplicaManager} import kafka.metrics.KafkaMetricsGroup import kafka.controller.KafkaController -import kafka.message.ByteBufferMessageSet +import kafka.message.{MessageSet, ByteBufferMessageSet} import java.io.IOException import java.util.concurrent.locks.ReentrantReadWriteLock @@ -307,11 +307,21 @@ class Partition(val topic: String, val oldHighWatermark = leaderReplica.highWatermark if(newHighWatermark > oldHighWatermark) { leaderReplica.highWatermark = newHighWatermark - debug("Highwatermark for partition [%s,%d] updated to %d".format(topic, partitionId, newHighWatermark)) + debug("High watermark for partition [%s,%d] updated to %d".format(topic, partitionId, newHighWatermark)) + // search for the corresponding log segment position and base offset, + // if the HW is the end offset, use the log end position + val newHighWatermarkInfo = leaderReplica.log.get.read(newHighWatermark, 1, None) + if (newHighWatermarkInfo.equals(MessageSet.Empty)) { + leaderReplica.logHWPosition.set(leaderReplica.logActiveSegmentSize.get()) + leaderReplica.logHWSegmentBaseOffset.set(leaderReplica.logActiveSegmentBaseOffset.get()) + } else { + leaderReplica.logHWPosition.set(newHighWatermarkInfo.asInstanceOf[FileMessageSet].startPosition()) + leaderReplica.logHWSegmentBaseOffset.set(newHighWatermarkInfo.asInstanceOf[FileMessageSet].baseOffset()) + } + } else { + warn("Skipping update high watermark since Old hw %d is larger than new hw is %d for 
partition [%s,%d]. All leo's are %s" + .format(oldHighWatermark, newHighWatermark, topic, partitionId, allLogEndOffsets.mkString(","))) } - else - debug("Old hw for partition [%s,%d] is %d. New hw is %d. All leo's are %s" - .format(topic, partitionId, oldHighWatermark, newHighWatermark, allLogEndOffsets.mkString(","))) } def maybeShrinkIsr(replicaMaxLagTimeMs: Long, replicaMaxLagMessages: Long) { @@ -363,6 +373,8 @@ class Partition(val topic: String, case Some(leaderReplica) => val log = leaderReplica.log.get val info = log.append(messages, assignOffsets = true) + leaderReplica.logActiveSegmentSize.set(info.endFilePos) + leaderReplica.logActiveSegmentBaseOffset.set(log.activeSegment.baseOffset) // we may need to increment high watermark since ISR could be down to 1 maybeIncrementLeaderHW(leaderReplica) info diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala index 5e659b4..889b3e0 100644 --- a/core/src/main/scala/kafka/cluster/Replica.scala +++ b/core/src/main/scala/kafka/cluster/Replica.scala @@ -21,18 +21,29 @@ import kafka.log.Log import kafka.utils.{SystemTime, Time, Logging} import kafka.common.KafkaException import kafka.server.ReplicaManager -import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} class Replica(val brokerId: Int, val partition: Partition, time: Time = SystemTime, initialHighWatermarkValue: Long = 0L, val log: Option[Log] = None) extends Logging { - //only defined in local replica - private[this] var highWatermarkValue: AtomicLong = new AtomicLong(initialHighWatermarkValue) - // only used for remote replica; logEndOffsetValue for local replica is kept in log - private[this] var logEndOffsetValue = new AtomicLong(ReplicaManager.UnknownLogEndOffset) - private[this] var logEndOffsetUpdateTimeMsValue: AtomicLong = new AtomicLong(time.milliseconds) + // the high watermark offset value, only defined in local replica + private[this] val 
highWatermarkValue: AtomicLong = new AtomicLong(initialHighWatermarkValue) + // the high watermark offset's corresponding file position, only defined in local replica + private[this] val logEndOffsetValue = new AtomicLong(ReplicaManager.UnknownLogEndOffset) + // the time when log offset is updated + private[this] val logEndOffsetUpdateTimeMsValue = new AtomicLong(time.milliseconds) + + // the log position of the high watermark, only kept in local replica + val logHWPosition = new AtomicInteger(ReplicaManager.UnknownLogPosition) + // the base offset of the log segment containing the high watermark, only kept in local replica + val logHWSegmentBaseOffset = new AtomicLong(ReplicaManager.UnknownlogSegmentBaseOffset) + // the log end position value, only kept in local replica + val logActiveSegmentSize = new AtomicInteger(ReplicaManager.UnknownLogPosition) + // the base offset of the last segment in the log, only kept in local replica + val logActiveSegmentBaseOffset = new AtomicLong(ReplicaManager.UnknownlogSegmentBaseOffset) + val topic = partition.topic val partitionId = partition.partitionId @@ -45,7 +56,6 @@ class Replica(val brokerId: Int, } else throw new KafkaException("Shouldn't set logEndOffset for replica %d partition [%s,%d] since it's local" .format(brokerId, topic, partitionId)) - } def logEndOffset = { @@ -54,7 +64,9 @@ class Replica(val brokerId: Int, else logEndOffsetValue.get() } - + + def logEndOffsetUpdateTimeMs = logEndOffsetUpdateTimeMsValue.get() + def isLocal: Boolean = { log match { case Some(l) => true @@ -62,8 +74,6 @@ class Replica(val brokerId: Int, } } - def logEndOffsetUpdateTimeMs = logEndOffsetUpdateTimeMsValue.get() - def highWatermark_=(newHighWatermark: Long) { if (isLocal) { trace("Setting hw for replica %d partition [%s,%d] on broker %d to %d" diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala index b2652dd..df6ec24 100644 --- 
a/core/src/main/scala/kafka/log/FileMessageSet.scala +++ b/core/src/main/scala/kafka/log/FileMessageSet.scala @@ -202,6 +202,16 @@ class FileMessageSet private[kafka](@volatile var file: File, * The number of bytes taken up by this file set */ def sizeInBytes(): Int = _size.get() + + /** + * Returns the start position in the segment file + */ + def startPosition(): Int = start + + /** + * Returns the base offset of the segment file + */ + def baseOffset() = file.getName().split("\\.")(0).toLong /** * Append these messages to the message set diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index b7bc5ff..e6a536e 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -278,8 +278,9 @@ class Log(val dir: File, } } - // now append to the log + // now append to the log and record the file end position as the segment file size segment.append(appendInfo.firstOffset, validMessages) + appendInfo.endFilePos = segment.log.sizeInBytes() // increment the log end offset nextOffset.set(appendInfo.lastOffset + 1) @@ -303,11 +304,12 @@ class Log(val dir: File, * @param lastOffset The last offset in the message set * @param shallowCount The number of shallow messages * @param validBytes The number of valid bytes + * @param endFilePos The end file position of the last appended message * @param codec The codec used in the message set * @param offsetsMonotonic Are the offsets in this message set monotonically increasing */ - case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec, shallowCount: Int, validBytes: Int, offsetsMonotonic: Boolean) - + case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec, shallowCount: Int, validBytes: Int, var endFilePos: Int, offsetsMonotonic: Boolean) + /** * Validate the following: *
    @@ -362,7 +364,7 @@ class Log(val dir: File, if(messageCodec != NoCompressionCodec) codec = messageCodec } - LogAppendInfo(firstOffset, lastOffset, codec, shallowMessageCount, validBytesCount, monotonic) + LogAppendInfo(firstOffset, lastOffset, codec, shallowMessageCount, validBytesCount, -1, monotonic) } /** @@ -412,14 +414,15 @@ class Log(val dir: File, // but if that segment doesn't contain any messages with an offset greater than that // continue to read from successive segments until we get some messages or we reach the end of the log while(entry != null) { - val messages = entry.getValue.read(startOffset, maxOffset, maxLength) - if(messages == null) + val messageSet = entry.getValue.read(startOffset, maxOffset, maxLength) + if(messageSet == null) entry = segments.higherEntry(entry.getKey) else - return messages + return messageSet } - // okay we are beyond the end of the last segment but less than the log end offset + // okay we are beyond the end of the last segment but less than the target offset, + // so return the empty message set MessageSet.Empty } @@ -433,7 +436,7 @@ class Log(val dir: File, // find any segments that match the user-supplied predicate UNLESS it is the final segment // and it is empty (since we would just end up re-creating it val lastSegment = activeSegment - var deletable = logSegments.takeWhile(s => predicate(s) && (s.baseOffset != lastSegment.baseOffset || s.size > 0)) + val deletable = logSegments.takeWhile(s => predicate(s) && (s.baseOffset != lastSegment.baseOffset || s.size > 0)) val numToDelete = deletable.size if(numToDelete > 0) { lock synchronized { diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 2faa196..01fc306 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -315,7 +315,6 @@ private[log] class Cleaner(val id: Int, * @param log The log being cleaned * @param segments The group of segments 
being cleaned * @param map The offset map to use for cleaning segments - * @param expectedTruncateCount A count used to check if the log is being truncated and rewritten under our feet * @param deleteHorizonMs The time to retain delete tombstones */ private[log] def cleanSegments(log: Log, diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 0d6926e..2af434f 100644 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -99,7 +99,7 @@ class LogSegment(val log: FileMessageSet, val mapping = index.lookup(offset) log.searchFor(offset, max(mapping.position, startingFilePosition)) } - + /** * Read a message set from this segment beginning with the first offset >= startOffset. The message set will include * no more than maxSize bytes and will end before maxOffset if a maxOffset is specified. @@ -108,7 +108,7 @@ class LogSegment(val log: FileMessageSet, * @param maxSize The maximum number of bytes to include in the message set we read * @param maxOffset An optional maximum offset for the message set we read * - * @return The message set read or null if the startOffset is larger than the largest offset in this log. 
+ * @return The message set read or null if the startOffset is larger than the largest offset in this log */ @threadsafe def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int): MessageSet = { diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 3b15254..7846dda 100644 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -92,12 +92,12 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke val partitionsWithError = new mutable.HashSet[TopicAndPartition] var response: FetchResponse = null try { - trace("issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest)) + trace("Issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest)) response = simpleConsumer.fetch(fetchRequest) } catch { case t: Throwable => if (isRunning.get) { - warn("Error in fetch %s. Possible cause: %s".format(fetchRequest, t.getMessage)) + warn("Error in fetch %s. 
Possible cause: %s".format(fetchRequest, t.toString)) partitionMapLock synchronized { partitionsWithError ++= partitionMap.keys } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 0b668f2..53de1e8 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -29,7 +29,6 @@ import kafka.controller.KafkaController import kafka.utils.{Pool, SystemTime, Logging} import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic._ import scala.collection._ import org.I0Itec.zkclient.ZkClient @@ -127,22 +126,6 @@ class KafkaApis(val requestChannel: RequestChannel, requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(controlledShutdownResponse))) } - /** - * Check if a partitionData from a produce request can unblock any - * DelayedFetch requests. - */ - def maybeUnblockDelayedFetchRequests(topic: String, partition: Int, messageSizeInBytes: Int) { - val satisfied = fetchRequestPurgatory.update(RequestKey(topic, partition), messageSizeInBytes) - trace("Producer request to (%s-%d) unblocked %d fetch requests.".format(topic, partition, satisfied.size)) - - // send any newly unblocked responses - for(fetchReq <- satisfied) { - val topicData = readMessageSets(fetchReq.fetch) - val response = FetchResponse(fetchReq.fetch.correlationId, topicData) - requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response))) - } - } - private def producerRequestFromOffsetCommit(offsetCommitRequest: OffsetCommitRequest) = { val msgs = offsetCommitRequest.filterLargeMetadata(config.offsetMetadataMaxSize).map { case (topicAndPartition, offset) => @@ -171,24 +154,21 @@ class KafkaApis(val requestChannel: RequestChannel, * Handle a produce request or offset commit request (which is really a specialized producer request) */ def handleProducerOrOffsetCommitRequest(request: RequestChannel.Request) { - - 
val (produceRequest, offsetCommitRequestOpt) = if (request.requestId == RequestKeys.OffsetCommitKey) { - val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest] - (producerRequestFromOffsetCommit(offsetCommitRequest), Some(offsetCommitRequest)) - } - else { - (request.requestObj.asInstanceOf[ProducerRequest], None) - } + val (produceRequest, offsetCommitRequestOpt) = + if (request.requestId == RequestKeys.OffsetCommitKey) { + val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest] + (producerRequestFromOffsetCommit(offsetCommitRequest), Some(offsetCommitRequest)) + } else { + (request.requestObj.asInstanceOf[ProducerRequest], None) + } val sTime = SystemTime.milliseconds val localProduceResults = appendToLocalLog(produceRequest) debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime)) + val firstErrorCode = localProduceResults.find(_.errorCode != ErrorMapping.NoError).map(_.errorCode).getOrElse(ErrorMapping.NoError) val numPartitionsInError = localProduceResults.count(_.error.isDefined) - produceRequest.data.foreach(partitionAndData => - maybeUnblockDelayedFetchRequests(partitionAndData._1.topic, partitionAndData._1.partition, partitionAndData._2.sizeInBytes)) - val allPartitionHaveReplicationFactorOne = !produceRequest.data.keySet.exists( m => replicaManager.getReplicationFactorForPartition(m.topic, m.partition) != 1) @@ -235,29 +215,22 @@ class KafkaApis(val requestChannel: RequestChannel, val delayedRequest = new DelayedProduce( producerRequestKeys, request, - statuses, - produceRequest, produceRequest.ackTimeoutMs.toLong, + produceRequest, + statuses, offsetCommitRequestOpt) - producerRequestPurgatory.watch(delayedRequest) - /* * Replica fetch requests may have arrived (and potentially satisfied) * delayedProduce requests while they were being added to the purgatory. - * Here, we explicitly check if any of them can be satisfied. 
+ In this case, respond immediately */ - var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce] - producerRequestKeys.foreach(key => - satisfiedProduceRequests ++= - producerRequestPurgatory.update(key, key)) - debug(satisfiedProduceRequests.size + - " producer requests unblocked during produce to local log.") - satisfiedProduceRequests.foreach(_.respond()) - - // we do not need the data anymore - produceRequest.emptyData() + if (!producerRequestPurgatory.checkAndMaybeWatch(delayedRequest)) + delayedRequest.respond() } + + // we do not need the data anymore + produceRequest.emptyData() } case class DelayedProduceResponseStatus(requiredOffset: Long, @@ -288,13 +261,12 @@ class KafkaApis(val requestChannel: RequestChannel, partitionAndData.map {case (topicAndPartition, messages) => try { val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition) - val info = - partitionOpt match { - case Some(partition) => partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet]) - case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d" - .format(topicAndPartition, brokerId)) - - } + val info = partitionOpt match { + case Some(partition) => + partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet]) + case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d" + .format(topicAndPartition, brokerId)) + } val numAppendedMessages = if (info.firstOffset == -1L || info.lastOffset == -1L) 0 else (info.lastOffset - info.firstOffset + 1) @@ -304,6 +276,13 @@ class KafkaApis(val requestChannel: RequestChannel, BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).messagesInRate.mark(numAppendedMessages) BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numAppendedMessages) + // some fetch request may be satisfied after local log append + // note for some other cases: + // 1. broker make leader + // 2. 
leader shrink ISR + // HW may also be moved but we do not need to check for delayed fetch requests + unblockDelayedFetchRequests(new RequestKey(topicAndPartition)) + trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d" .format(messages.size, topicAndPartition.topic, topicAndPartition.partition, info.firstOffset, info.lastOffset)) ProduceResult(topicAndPartition, info.firstOffset, info.lastOffset) @@ -338,25 +317,22 @@ class KafkaApis(val requestChannel: RequestChannel, */ def handleFetchRequest(request: RequestChannel.Request) { val fetchRequest = request.requestObj.asInstanceOf[FetchRequest] - if(fetchRequest.isFromFollower) { + // if the fetch request comes from the follower, check if the partition HW can be updated + if(fetchRequest.isFromFollower) maybeUpdatePartitionHw(fetchRequest) - // after updating HW, some delayed produce requests may be unblocked - var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce] - fetchRequest.requestInfo.foreach { - case (topicAndPartition, _) => - val key = new RequestKey(topicAndPartition) - satisfiedProduceRequests ++= producerRequestPurgatory.update(key, key) - } - debug("Replica %d fetch unblocked %d producer requests." 
- .format(fetchRequest.replicaId, satisfiedProduceRequests.size)) - satisfiedProduceRequests.foreach(_.respond()) - } val dataRead = readMessageSets(fetchRequest) val bytesReadable = dataRead.values.map(_.messages.sizeInBytes).sum + val errorReadingData = dataRead.values.foldLeft(false)((errorIncurred, currDataResponse) => + errorIncurred || (currDataResponse.error != ErrorMapping.NoError)) + // send the data immediately if 1) fetch request does not want to wait + // 2) fetch request does not require any data + // 3) has enough data to respond + // 4) some error happens while reading data if(fetchRequest.maxWait <= 0 || + fetchRequest.numPartitions <= 0 || bytesReadable >= fetchRequest.minBytes || - fetchRequest.numPartitions <= 0) { + errorReadingData) { debug("Returning fetch response %s for fetch request with correlation id %d to client %s" .format(dataRead.values.map(_.error).mkString(","), fetchRequest.correlationId, fetchRequest.clientId)) val response = new FetchResponse(fetchRequest.correlationId, dataRead) @@ -366,17 +342,40 @@ class KafkaApis(val requestChannel: RequestChannel, fetchRequest.clientId)) // create a list of (topic, partition) pairs to use as keys for this delayed request val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq.map(new RequestKey(_)) - val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest, fetchRequest.maxWait, bytesReadable) - fetchRequestPurgatory.watch(delayedFetch) + val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest.maxWait, fetchRequest, dataRead.map { + case (topicAndPartition, partitionData) => + if (partitionData.messages.isInstanceOf[FileMessageSet]) + topicAndPartition -> DelayedFetchRequestStatus( + partitionData.messages.asInstanceOf[FileMessageSet].baseOffset(), + partitionData.messages.asInstanceOf[FileMessageSet].startPosition()) + else + topicAndPartition -> DelayedFetchRequestStatus(-1, -1) + }) + + /* + * Replica fetch requests may have arrived (and 
potentially satisfied) + * delayedFetch requests while they were being added to the purgatory. + * In this case, respond immediately + */ + fetchRequestPurgatory.checkAndMaybeWatch(delayedFetch) } } private def maybeUpdatePartitionHw(fetchRequest: FetchRequest) { debug("Maybe update partition HW due to fetch request: %s ".format(fetchRequest)) - fetchRequest.requestInfo.foreach(info => { - val (topic, partition, offset) = (info._1.topic, info._1.partition, info._2.offset) - replicaManager.recordFollowerPosition(topic, partition, fetchRequest.replicaId, offset) - }) + fetchRequest.requestInfo.foreach { + case (topicAndPartition, partitionFetchInfo) => + replicaManager.recordFollowerPosition(topicAndPartition.topic, + topicAndPartition.partition, fetchRequest.replicaId, partitionFetchInfo.offset) + + // after updating HW, some delayed produce and fetch requests may be unblocked; + // note for some other cases: + // 1. broker make leader + // 2. leader shrink ISR + // HW may also be moved but we do not need to check for delayed fetch requests + unblockDelayedProduceRequests(new RequestKey(topicAndPartition)) + unblockDelayedFetchRequests(new RequestKey(topicAndPartition)) + } } /** @@ -390,16 +389,14 @@ class KafkaApis(val requestChannel: RequestChannel, case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) => val partitionData = try { - val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, fetchRequest.replicaId) - BrokerTopicStats.getBrokerTopicStats(topic).bytesOutRate.mark(messages.sizeInBytes) - BrokerTopicStats.getBrokerAllTopicsStats.bytesOutRate.mark(messages.sizeInBytes) - if (!isFetchFromFollower) { - new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, messages) - } else { + val (messageSet, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, fetchRequest.replicaId) + BrokerTopicStats.getBrokerTopicStats(topic).bytesOutRate.mark(messageSet.sizeInBytes) + 
BrokerTopicStats.getBrokerAllTopicsStats.bytesOutRate.mark(messageSet.sizeInBytes) + if (isFetchFromFollower) { debug("Leader %d for partition [%s,%d] received fetch request from follower %d" .format(brokerId, topic, partition, fetchRequest.replicaId)) - new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, messages) } + new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, messageSet) } catch { // NOTE: Failed fetch requests is not incremented for UnknownTopicOrPartitionException and NotLeaderForPartitionException // since failed fetch requests metric is supposed to indicate failure of a broker in handling a fetch request @@ -442,14 +439,14 @@ class KafkaApis(val requestChannel: RequestChannel, None else Some(localReplica.highWatermark) - val messages = localReplica.log match { + val messageSet = localReplica.log match { case Some(log) => log.read(offset, maxSize, maxOffsetOpt) case None => error("Leader for partition [%s,%d] on broker %d does not have a local log".format(topic, partition, brokerId)) MessageSet.Empty } - (messages, localReplica.highWatermark) + (messageSet, localReplica.highWatermark) } /** @@ -636,6 +633,29 @@ class KafkaApis(val requestChannel: RequestChannel, requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) } + def unblockDelayedProduceRequests(key: RequestKey) { + val satisfied = producerRequestPurgatory.update(key) + debug("Request key %s unblocked %d producer requests." + .format(key.keyLabel, satisfied.size)) + satisfied.foreach(_.respond()) + } + + /** + * Check if a partitionData from a produce request can unblock any + * DelayedFetch requests. 
+ */ + def unblockDelayedFetchRequests(key: RequestKey) { + val satisfied = fetchRequestPurgatory.update(key) + debug("Request key %s unblocked %d fetch requests.".format(key.keyLabel, satisfied.size)) + + // send any newly unblocked responses + for(fetchReq <- satisfied) { + fetchReq.respond() + } + } + + + def close() { debug("Shutting down.") fetchRequestPurgatory.shutdown() @@ -660,56 +680,99 @@ class KafkaApis(val requestChannel: RequestChannel, override def keyLabel = "%s-%d".format(topic, partition) } + case class DelayedFetchRequestStatus(segmentBaseOffset: Long, fetchOffsetPosition: Int) + /** - * A delayed fetch request + * A delayed fetch request, which is satisfied (or more + * accurately, unblocked) -- if: + * Case A: This broker is no longer the leader for ANY of the partitions it tries to fetch + * - should return error. + * Case B: The fetch offset is not on the last segment of the log + * - should return all the data on that segment. + * Case C: The accumulated bytes from all the fetching partitions exceed the minimum bytes + * - should return whatever data is available. 
*/ - class DelayedFetch(keys: Seq[RequestKey], request: RequestChannel.Request, val fetch: FetchRequest, delayMs: Long, initialSize: Long) + class DelayedFetch(keys: Seq[RequestKey], + request: RequestChannel.Request, + delayMs: Long, + val fetch: FetchRequest, + val partitionFetchInfo: immutable.Map[TopicAndPartition, DelayedFetchRequestStatus]) extends DelayedRequest(keys, request, delayMs) { - val bytesAccumulated = new AtomicLong(initialSize) + + def isSatisfied() : Boolean = { + var accumulatedSize = 0 + val fromFollower = fetch.isFromFollower + try { + partitionFetchInfo.foreach { + case (topicAndPartition, requestStatus) => + if (requestStatus.fetchOffsetPosition >= 0) { + val replica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition) + val fetchableSegmentBaseOffset = + if (fromFollower) replica.logActiveSegmentBaseOffset.get() + else replica.logHWSegmentBaseOffset.get() + if (fetchableSegmentBaseOffset > requestStatus.segmentBaseOffset) { + debug("Satisfying fetch request %s since it is fetching inactive segments.".format(fetch)) + return true + } + val fetchableEndPosition = + if (fromFollower) replica.logActiveSegmentSize.get() + else replica.logHWPosition.get() + accumulatedSize += fetchableEndPosition - requestStatus.fetchOffsetPosition + } + } + } catch { + case nle: NotLeaderForPartitionException => + debug("Satisfying fetch request %s since leader has changed.".format(fetch)) + return true + } + + // unblocked if there are enough accumulated data + accumulatedSize >= fetch.minBytes + } + + def respond() { + val topicData = readMessageSets(fetch) + val response = FetchResponse(fetch.correlationId, topicData) + requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response))) + } } /** * A holding pen for fetch requests waiting to be satisfied */ class FetchRequestPurgatory(requestChannel: RequestChannel, purgeInterval: Int) - extends RequestPurgatory[DelayedFetch, 
Int](brokerId, purgeInterval) { + extends RequestPurgatory[DelayedFetch](brokerId, purgeInterval) { this.logIdent = "[FetchRequestPurgatory-%d] ".format(brokerId) /** - * A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field + * Check if a specified delayed fetch request is satisfied */ - def checkSatisfied(messageSizeInBytes: Int, delayedFetch: DelayedFetch): Boolean = { - val accumulatedSize = delayedFetch.bytesAccumulated.addAndGet(messageSizeInBytes) - accumulatedSize >= delayedFetch.fetch.minBytes - } + def checkSatisfied(delayedFetch: DelayedFetch): Boolean = delayedFetch.isSatisfied() /** * When a request expires just answer it with whatever data is present */ def expire(delayed: DelayedFetch) { debug("Expiring fetch request %s.".format(delayed.fetch)) - try { - val topicData = readMessageSets(delayed.fetch) - val response = FetchResponse(delayed.fetch.correlationId, topicData) - val fromFollower = delayed.fetch.isFromFollower - delayedRequestMetrics.recordDelayedFetchExpired(fromFollower) - requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response))) - } - catch { - case e1: LeaderNotAvailableException => - debug("Leader changed before fetch request %s expired.".format(delayed.fetch)) - case e2: UnknownTopicOrPartitionException => - debug("Replica went offline before fetch request %s expired.".format(delayed.fetch)) - } + val fromFollower = delayed.fetch.isFromFollower + delayedRequestMetrics.recordDelayedFetchExpired(fromFollower) + delayed.respond() } } + /** A delayed produce request, which is satisfied (or more + * accurately, unblocked) -- if for every partition it produces to: + * Case A: This broker is not the leader: unblock - should return error. + * Case B: This broker is the leader: + * B.1 - If there was a localError (when writing to the local log): unblock - should return error + * B.2 - else, at least requiredAcks replicas should be caught up to this request. 
+ */ class DelayedProduce(keys: Seq[RequestKey], request: RequestChannel.Request, - val partitionStatus: immutable.Map[TopicAndPartition, DelayedProduceResponseStatus], - produce: ProducerRequest, delayMs: Long, - offsetCommitRequestOpt: Option[OffsetCommitRequest] = None) + val produce: ProducerRequest, + val partitionStatus: immutable.Map[TopicAndPartition, DelayedProduceResponseStatus], + val offsetCommitRequestOpt: Option[OffsetCommitRequest] = None) extends DelayedRequest(keys, request, delayMs) with Logging { // first update the acks pending variable according to the error code @@ -745,47 +808,34 @@ class KafkaApis(val requestChannel: RequestChannel, request, new BoundedByteBufferSend(response))) } - /** - * Returns true if this delayed produce request is satisfied (or more - * accurately, unblocked) -- this is the case if for every partition: - * Case A: This broker is not the leader: unblock - should return error. - * Case B: This broker is the leader: - * B.1 - If there was a localError (when writing to the local log): unblock - should return error - * B.2 - else, at least requiredAcks replicas should be caught up to this request. - * - * As partitions become acknowledged, we may be able to unblock - * DelayedFetchRequests that are pending on those partitions. 
- */ - def isSatisfied(followerFetchRequestKey: RequestKey) = { - val topic = followerFetchRequestKey.topic - val partitionId = followerFetchRequestKey.partition - val fetchPartitionStatus = partitionStatus(TopicAndPartition(topic, partitionId)) - trace("Checking producer request satisfaction for %s-%d, acksPending = %b" - .format(topic, partitionId, fetchPartitionStatus.acksPending)) - if (fetchPartitionStatus.acksPending) { - val partitionOpt = replicaManager.getPartition(topic, partitionId) - val (hasEnough, errorCode) = partitionOpt match { - case Some(partition) => - partition.checkEnoughReplicasReachOffset(fetchPartitionStatus.requiredOffset, produce.requiredAcks) - case None => - (false, ErrorMapping.UnknownTopicOrPartitionCode) - } - if (errorCode != ErrorMapping.NoError) { - fetchPartitionStatus. acksPending = false - fetchPartitionStatus.status.error = errorCode - } else if (hasEnough) { - fetchPartitionStatus.acksPending = false - fetchPartitionStatus.status.error = ErrorMapping.NoError - } - if (!fetchPartitionStatus.acksPending) { - val messageSizeInBytes = produce.topicPartitionMessageSizeMap(followerFetchRequestKey.topicAndPartition) - maybeUnblockDelayedFetchRequests(topic, partitionId, messageSizeInBytes) + def isSatisfied() = { + // check for each partition if it still has pending acks + partitionStatus.foreach { case (topicAndPartition, fetchPartitionStatus) => + trace("Checking producer request satisfaction for %s, acksPending = %b" + .format(topicAndPartition, fetchPartitionStatus.acksPending)) + // skip those partitions that have already been satisfied + if (fetchPartitionStatus.acksPending) { + val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition) + val (hasEnough, errorCode) = partitionOpt match { + case Some(partition) => + partition.checkEnoughReplicasReachOffset( + fetchPartitionStatus.requiredOffset, + produce.requiredAcks) + case None => + (false, ErrorMapping.UnknownTopicOrPartitionCode) + 
} + if (errorCode != ErrorMapping.NoError) { + fetchPartitionStatus.acksPending = false + fetchPartitionStatus.status.error = errorCode + } else if (hasEnough) { + fetchPartitionStatus.acksPending = false + fetchPartitionStatus.status.error = ErrorMapping.NoError + } } } // unblocked if there are no partitions with pending acks val satisfied = ! partitionStatus.exists(p => p._2.acksPending) - trace("Producer request satisfaction for %s-%d = %b".format(topic, partitionId, satisfied)) satisfied } } @@ -794,12 +844,11 @@ class KafkaApis(val requestChannel: RequestChannel, * A holding pen for produce requests waiting to be satisfied. */ private [kafka] class ProducerRequestPurgatory(purgeInterval: Int) - extends RequestPurgatory[DelayedProduce, RequestKey](brokerId, purgeInterval) { + extends RequestPurgatory[DelayedProduce](brokerId, purgeInterval) { this.logIdent = "[ProducerRequestPurgatory-%d] ".format(brokerId) - protected def checkSatisfied(followerFetchRequestKey: RequestKey, - delayedProduce: DelayedProduce) = - delayedProduce.isSatisfied(followerFetchRequestKey) + protected def checkSatisfied(delayedProduce: DelayedProduce) = + delayedProduce.isSatisfied() /** * Handle an expired delayed request diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 6a56a77..df811d1 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -33,6 +33,8 @@ import java.util.concurrent.TimeUnit object ReplicaManager { val UnknownLogEndOffset = -1L + val UnknownLogPosition = -1 + val UnknownlogSegmentBaseOffset = -1L val HighWatermarkFilename = "replication-offset-checkpoint" } diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala index c064c5c..9092d7b 100644 --- a/core/src/main/scala/kafka/server/RequestPurgatory.scala +++ 
b/core/src/main/scala/kafka/server/RequestPurgatory.scala @@ -45,7 +45,7 @@ class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, de * * For us the key is generally a (topic, partition) pair. * By calling - * watch(delayedRequest) + * checkAndMaybeWatch(delayedRequest) * we will add triggers for each of the given keys. It is up to the user to then call * val satisfied = update(key, request) * when a request relevant to the given key occurs. This triggers bookeeping logic and returns back any requests satisfied by this @@ -61,18 +61,19 @@ class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, de * this function handles delayed requests that have hit their time limit without being satisfied. * */ -abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purgeInterval: Int = 10000) +abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInterval: Int = 10000) extends Logging with KafkaMetricsGroup { /* a list of requests watching each key */ - private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers)) + private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers(key))) - private val requestCounter = new AtomicInteger(0) + /* the number of requests being watched, duplicates added on different watchers are also counted */ + private val watched = new AtomicInteger(0) newGauge( "PurgatorySize", new Gauge[Int] { - def value = watchersForKey.values.map(_.numRequests).sum + expiredRequestReaper.numRequests + def value = watched.get() + expiredRequestReaper.numRequests } ) @@ -91,33 +92,39 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge /** * Add a new delayed request watching the contained keys */ - def watch(delayedRequest: T) { - requestCounter.getAndIncrement() - + def checkAndMaybeWatch(delayedRequest: T): Boolean = { for(key <- delayedRequest.keys) { - var lst = watchersFor(key) - 
lst.add(delayedRequest) + val lst = watchersFor(key) + // the request could already be satisfied and + // hence not added to watchers; in this case we can + // return immediately without trying to add to the rest of the watchers + if(!lst.add(delayedRequest)) + return false } + + // if it is indeed watched, add to the expire queue also expiredRequestReaper.enqueue(delayedRequest) + + true } /** * Update any watchers and return a list of newly satisfied requests. */ - def update(key: Any, request: R): Seq[T] = { + def update(key: Any): Seq[T] = { val w = watchersForKey.get(key) if(w == null) Seq.empty else - w.collectSatisfiedRequests(request) + w.collectSatisfiedRequests() } private def watchersFor(key: Any) = watchersForKey.getAndMaybePut(key) /** - * Check if this request satisfied this delayed request + * Check if this delayed request is already satisfied */ - protected def checkSatisfied(request: R, delayed: T): Boolean + protected def checkSatisfied(request: T): Boolean /** * Handle an expired delayed request @@ -125,7 +132,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge protected def expire(delayed: T) /** - * Shutdown the expirey thread + * Shutdown the expire reaper thread */ def shutdown() { expiredRequestReaper.shutdown() @@ -135,16 +142,17 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge * A linked list of DelayedRequests watching some key with some associated * bookkeeping logic. 
*/ - private class Watchers { - - + private class Watchers(key: Any) { private val requests = new util.ArrayList[T] - def numRequests = requests.size - - def add(t: T) { + def add(t: T): Boolean = { synchronized { + // atomically check satisfaction criteria and add to the watch list + if (checkSatisfied(t)) + return false requests.add(t) + watched.getAndIncrement() + return true } } @@ -156,6 +164,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge val curr = iter.next if(curr.satisfied.get()) { iter.remove() + watched.getAndDecrement() purged += 1 } } @@ -163,7 +172,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge } } - def collectSatisfiedRequests(request: R): Seq[T] = { + def collectSatisfiedRequests(): Seq[T] = { val response = new mutable.ArrayBuffer[T] synchronized { val iter = requests.iterator() @@ -175,9 +184,10 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge } else { // synchronize on curr to avoid any race condition with expire // on client-side. 
- val satisfied = curr synchronized checkSatisfied(request, curr) + val satisfied = curr synchronized checkSatisfied(curr) if(satisfied) { iter.remove() + watched.getAndDecrement() val updated = curr.satisfied.compareAndSet(false, true) if(updated == true) { response += curr @@ -216,8 +226,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge expire(curr) } } - if (requestCounter.get >= purgeInterval) { // see if we need to force a full purge - requestCounter.set(0) + if (watched.get + numRequests >= purgeInterval) { // see if we need to force a full purge val purged = purgeSatisfied() debug("Purged %d requests from delay queue.".format(purged)) val numPurgedFromWatchers = watchersForKey.values.map(_.purgeSatisfied()).sum @@ -266,10 +275,12 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge } /** - * Delete all expired events from the delay queue + * Delete all satisfied events from the delay queue and the watcher lists */ private def purgeSatisfied(): Int = { var purged = 0 + + // purge the delayed queue val iter = delayed.iterator() while(iter.hasNext) { val curr = iter.next() @@ -278,6 +289,10 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge purged += 1 } } + + // purge the watcher lists + watchersForKey.values.foreach(_.purgeSatisfied()) + purged } } diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 1da1393..0d5315d 100644 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -575,15 +575,15 @@ class LogTest extends JUnitSuite { @Test def testAppendMessageWithNullPayload() { - var log = new Log(logDir, + val log = new Log(logDir, LogConfig(), recoveryPoint = 0L, time.scheduler, time) log.append(new ByteBufferMessageSet(new Message(bytes = null))) - val ms = log.read(0, 4096, None) - assertEquals(0, ms.head.offset) - assertTrue("Message 
payload should be null.", ms.head.message.isNull) + val messageSet = log.read(0, 4096, None) + assertEquals(0, messageSet.head.offset) + assertTrue("Message payload should be null.", messageSet.head.message.isNull) } @Test diff --git a/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala b/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala index 4f61f84..081203a 100644 --- a/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala @@ -46,17 +46,17 @@ class RequestPurgatoryTest extends JUnit3Suite { def testRequestSatisfaction() { val r1 = new DelayedRequest(Array("test1"), null, 100000L) val r2 = new DelayedRequest(Array("test2"), null, 100000L) - assertEquals("With no waiting requests, nothing should be satisfied", 0, purgatory.update("test1", producerRequest1).size) - purgatory.watch(r1) - assertEquals("Still nothing satisfied", 0, purgatory.update("test1", producerRequest1).size) - purgatory.watch(r2) - assertEquals("Still nothing satisfied", 0, purgatory.update("test2", producerRequest2).size) + assertEquals("With no waiting requests, nothing should be satisfied", 0, purgatory.update("test1").size) + assertTrue("r1 watched", purgatory.checkAndMaybeWatch(r1)) + assertEquals("Still nothing satisfied", 0, purgatory.update("test1").size) + assertTrue("r2 watched", purgatory.checkAndMaybeWatch(r2)) + assertEquals("Still nothing satisfied", 0, purgatory.update("test2").size) purgatory.satisfied += r1 - assertEquals("r1 satisfied", mutable.ArrayBuffer(r1), purgatory.update("test1", producerRequest1)) - assertEquals("Nothing satisfied", 0, purgatory.update("test1", producerRequest2).size) + assertEquals("r1 satisfied", mutable.ArrayBuffer(r1), purgatory.update("test1")) + assertEquals("Nothing satisfied", 0, purgatory.update("test1").size) purgatory.satisfied += r2 - assertEquals("r2 satisfied", mutable.ArrayBuffer(r2), purgatory.update("test2", producerRequest2)) - 
assertEquals("Nothing satisfied", 0, purgatory.update("test2", producerRequest2).size) + assertEquals("r2 satisfied", mutable.ArrayBuffer(r2), purgatory.update("test2")) + assertEquals("Nothing satisfied", 0, purgatory.update("test2").size) } @Test @@ -65,8 +65,8 @@ class RequestPurgatoryTest extends JUnit3Suite { val r1 = new DelayedRequest(Array("test1"), null, expiration) val r2 = new DelayedRequest(Array("test1"), null, 200000L) val start = System.currentTimeMillis - purgatory.watch(r1) - purgatory.watch(r2) + assertTrue("r1 watched", purgatory.checkAndMaybeWatch(r1)) + assertTrue("r2 watched", purgatory.checkAndMaybeWatch(r2)) purgatory.awaitExpiration(r1) val elapsed = System.currentTimeMillis - start assertTrue("r1 expired", purgatory.expired.contains(r1)) @@ -74,7 +74,7 @@ class RequestPurgatoryTest extends JUnit3Suite { assertTrue("Time for expiration %d should at least %d".format(elapsed, expiration), elapsed >= expiration) } - class MockRequestPurgatory extends RequestPurgatory[DelayedRequest, ProducerRequest] { + class MockRequestPurgatory extends RequestPurgatory[DelayedRequest] { val satisfied = mutable.Set[DelayedRequest]() val expired = mutable.Set[DelayedRequest]() def awaitExpiration(delayed: DelayedRequest) = { @@ -82,7 +82,7 @@ class RequestPurgatoryTest extends JUnit3Suite { delayed.wait() } } - def checkSatisfied(request: ProducerRequest, delayed: DelayedRequest): Boolean = satisfied.contains(delayed) + def checkSatisfied(delayed: DelayedRequest): Boolean = satisfied.contains(delayed) def expire(delayed: DelayedRequest) { expired += delayed delayed synchronized {