diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index d117f10..90c03c6 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -150,8 +150,8 @@ object FetchResponse {
 }
 
-case class FetchResponse(correlationId: Int,
-                         data: Map[TopicAndPartition, FetchResponsePartitionData]) {
+case class FetchResponse(val correlationId: Int,
+                         val data: Map[TopicAndPartition, FetchResponsePartitionData]) {
 
   /**
    * Partitions the data into a map of maps (one for each topic).
diff --git a/core/src/main/scala/kafka/api/ProducerResponse.scala b/core/src/main/scala/kafka/api/ProducerResponse.scala
index 5a1d801..7110d6c 100644
--- a/core/src/main/scala/kafka/api/ProducerResponse.scala
+++ b/core/src/main/scala/kafka/api/ProducerResponse.scala
@@ -44,7 +44,7 @@ object ProducerResponse {
 case class ProducerResponseStatus(var error: Short, offset: Long)
 
 case class ProducerResponse(override val correlationId: Int,
-                            status: Map[TopicAndPartition, ProducerResponseStatus])
+                            val status: Map[TopicAndPartition, ProducerResponseStatus])
     extends RequestOrResponse(correlationId = correlationId) {
 
   /**
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 518d2df..ef222a8 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -22,7 +22,7 @@ import kafka.utils.{ZkUtils, Pool, Time, Logging}
 import kafka.utils.Utils.inLock
 import kafka.api.{PartitionStateInfo, LeaderAndIsr}
 import kafka.log.LogConfig
-import kafka.server.{OffsetManager, ReplicaManager}
+import kafka.server.{TopicPartitionRequestKey, LogOffsetMetadata, OffsetManager, ReplicaManager}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.controller.KafkaController
 import kafka.message.ByteBufferMessageSet
@@ -40,8 +40,8 @@ import com.yammer.metrics.core.Gauge
  */
 class Partition(val topic: String,
                 val partitionId: Int,
-                var replicationFactor: Int,
-                time: Time,
+                val replicationFactor: Int,
+                val time: Time,
                 val replicaManager: ReplicaManager) extends Logging with KafkaMetricsGroup {
   private val localBrokerId = replicaManager.config.brokerId
   private val logManager = replicaManager.logManager
@@ -180,14 +183,17 @@ class Partition(val topic: String,
       val newInSyncReplicas = leaderAndIsr.isr.map(r => getOrCreateReplica(r)).toSet
       // remove assigned replicas that have been removed by the controller
       (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))
-      // reset LogEndOffset for remote replicas
-      assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = ReplicaManager.UnknownLogEndOffset)
       inSyncReplicas = newInSyncReplicas
       leaderEpoch = leaderAndIsr.leaderEpoch
       zkVersion = leaderAndIsr.zkVersion
       leaderReplicaIdOpt = Some(localBrokerId)
+      // construct the high watermark metadata for the new leader replica
+      val newLeaderReplica = getReplica().get
+      newLeaderReplica.highWatermark = newLeaderReplica.log.get.find(newLeaderReplica.highWatermark.messageOffset)
+      // reset log end offset for remote replicas
+      assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = LogOffsetMetadata.UnknownOffsetMetadata)
       // we may need to increment high watermark since ISR could be down to 1
-      maybeIncrementLeaderHW(getReplica().get)
+      maybeIncrementLeaderHW(newLeaderReplica)
       if (topic == OffsetManager.OffsetsTopicName)
         offsetManager.loadOffsetsFromLog(partitionId)
       true
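For orientation: `makeLeader` now rebuilds the leader's high-watermark metadata because a checkpointed high watermark survives only as a bare message offset, and `log.find` re-attaches the segment base offset and byte position that later byte-level comparisons rely on. A minimal sketch of that lookup under simplified assumptions (`OffsetMeta` and `Segment` are hypothetical stand-ins, not the patch's classes):

```scala
// Illustrative only: model a log as segments of (baseOffset, message offsets)
// and rebuild full offset metadata from a bare message offset.
case class OffsetMeta(messageOffset: Long, segmentBaseOffset: Long, relativePositionInSegment: Int)
case class Segment(baseOffset: Long, offsets: Vector[Long]) {
  // fake byte positions: pretend every message occupies 100 bytes
  def positionOf(offset: Long): Int = offsets.indexWhere(_ >= offset) * 100
}

def find(segments: Seq[Segment], offset: Long): OffsetMeta = {
  // the segment holding `offset` is the one with the largest baseOffset <= offset
  val seg = segments.filter(_.baseOffset <= offset).maxBy(_.baseOffset)
  OffsetMeta(offset, seg.baseOffset, seg.positionOf(offset))
}

val segments = Seq(Segment(0L, Vector(0L, 1L, 2L)), Segment(3L, Vector(3L, 4L)))
assert(find(segments, 4L) == OffsetMeta(4L, 3L, 100))
```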
@@ -234,18 +237,8 @@ class Partition(val topic: String,
     }
   }
 
-  def updateLeaderHWAndMaybeExpandIsr(replicaId: Int, offset: Long) {
+  def updateLeaderHWAndMaybeExpandIsr(replicaId: Int) {
     inLock(leaderIsrUpdateLock.writeLock()) {
-      debug("Recording follower %d position %d for partition [%s,%d].".format(replicaId, offset, topic, partitionId))
-      val replicaOpt = getReplica(replicaId)
-      if(!replicaOpt.isDefined) {
-        throw new NotAssignedReplicaException(("Leader %d failed to record follower %d's position %d for partition [%s,%d] since the replica %d" +
-          " is not recognized to be one of the assigned replicas %s for partition [%s,%d]").format(localBrokerId, replicaId,
-          offset, topic, partitionId, replicaId, assignedReplicas().map(_.brokerId).mkString(","), topic, partitionId))
-      }
-      val replica = replicaOpt.get
-      replica.logEndOffset = offset
-
       // check if this replica needs to be added to the ISR
       leaderReplicaIfLocal() match {
         case Some(leaderReplica) =>
@@ -254,8 +247,10 @@ class Partition(val topic: String,
           // For a replica to get added back to ISR, it has to satisfy 3 conditions-
           // 1. It is not already in the ISR
           // 2. It is part of the assigned replica list. See KAFKA-1097
-          // 3. It's log end offset >= leader's highwatermark
+          // 3. Its log end offset >= leader's high watermark
-          if (!inSyncReplicas.contains(replica) && assignedReplicas.map(_.brokerId).contains(replicaId) && replica.logEndOffset >= leaderHW) {
+          if (!inSyncReplicas.contains(replica)
+              && assignedReplicas.map(_.brokerId).contains(replicaId)
+              && !replica.logEndOffset.precedes(leaderHW)) {
             // expand ISR
             val newInSyncReplicas = inSyncReplicas + replica
             info("Expanding ISR for partition [%s,%d] from %s to %s"
@@ -276,7 +271,7 @@ class Partition(val topic: String,
       case Some(_) =>
         val numAcks = inSyncReplicas.count(r => {
           if (!r.isLocal)
-            r.logEndOffset >= requiredOffset
+            r.logEndOffset.messageOffset >= requiredOffset
           else
             true /* also count the local (leader) replica */
         })
@@ -303,15 +298,18 @@ class Partition(val topic: String,
    */
   private def maybeIncrementLeaderHW(leaderReplica: Replica) {
     val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset)
-    val newHighWatermark = allLogEndOffsets.min
+    val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.LogOrdering)
     val oldHighWatermark = leaderReplica.highWatermark
-    if(newHighWatermark > oldHighWatermark) {
+    if(oldHighWatermark.precedes(newHighWatermark)) {
       leaderReplica.highWatermark = newHighWatermark
-      debug("Highwatermark for partition [%s,%d] updated to %d".format(topic, partitionId, newHighWatermark))
+      debug("High watermark for partition [%s,%d] updated to %s".format(topic, partitionId, newHighWatermark))
+      // some delayed requests may be unblocked after HW changed
+      replicaManager.unblockDelayedFetchRequests(new TopicPartitionRequestKey(this.topic, this.partitionId))
+    } else {
+      warn("Skipping update high watermark since old hw %s is larger than new hw %s for partition [%s,%d]. All leo's are %s"
+        .format(oldHighWatermark, newHighWatermark, topic, partitionId, allLogEndOffsets.mkString(",")))
     }
-    else
-      debug("Old hw for partition [%s,%d] is %d. New hw is %d. All leo's are %s"
-        .format(topic, partitionId, oldHighWatermark, newHighWatermark, allLogEndOffsets.mkString(",")))
   }
 
   def maybeShrinkIsr(replicaMaxLagTimeMs: Long, replicaMaxLagMessages: Long) {
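The high watermark is now the minimum of the in-sync replicas' log end offsets under an ordering that compares message offsets only, and moving it forward is what makes parked consumer fetches on the partition eligible to complete (hence the unblock call above). A tiny model of the min step, with `Leo` as a hypothetical stand-in type:

```scala
// Illustrative only: the slowest in-sync replica bounds the high watermark.
case class Leo(messageOffset: Long, segmentBaseOffset: Long)
val byMessageOffset: Ordering[Leo] = Ordering.by(_.messageOffset)

val isrLogEndOffsets = Seq(Leo(45L, 30L), Leo(40L, 30L), Leo(42L, 30L))
val newHighWatermark = isrLogEndOffsets.min(byMessageOffset)
assert(newHighWatermark == Leo(40L, 30L))  // the follower at offset 40 holds the HW back
```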
All leo's are %s" - .format(topic, partitionId, oldHighWatermark, newHighWatermark, allLogEndOffsets.mkString(","))) } def maybeShrinkIsr(replicaMaxLagTimeMs: Long, replicaMaxLagMessages: Long) { @@ -350,7 +348,8 @@ class Partition(val topic: String, if(stuckReplicas.size > 0) debug("Stuck replicas for partition [%s,%d] are %s".format(topic, partitionId, stuckReplicas.map(_.brokerId).mkString(","))) // Case 2 above - val slowReplicas = candidateReplicas.filter(r => r.logEndOffset >= 0 && (leaderLogEndOffset - r.logEndOffset) > keepInSyncMessages) + val slowReplicas = candidateReplicas.filter(r => r.logEndOffset.messageOffset >= 0 + && (leaderLogEndOffset.messageOffset - r.logEndOffset.messageOffset) > keepInSyncMessages) if(slowReplicas.size > 0) debug("Slow replicas for partition [%s,%d] are %s".format(topic, partitionId, slowReplicas.map(_.brokerId).mkString(","))) stuckReplicas ++ slowReplicas @@ -363,6 +362,8 @@ class Partition(val topic: String, case Some(leaderReplica) => val log = leaderReplica.log.get val info = log.append(messages, assignOffsets = true) + // probably unblock some follower fetch requests since log end offset has been updated + replicaManager.unblockDelayedFetchRequests(new TopicPartitionRequestKey(this.topic, this.partitionId)) // we may need to increment high watermark since ISR could be down to 1 maybeIncrementLeaderHW(leaderReplica) info diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala index 5e659b4..744d2fa 100644 --- a/core/src/main/scala/kafka/cluster/Replica.scala +++ b/core/src/main/scala/kafka/cluster/Replica.scala @@ -19,42 +19,27 @@ package kafka.cluster import kafka.log.Log import kafka.utils.{SystemTime, Time, Logging} -import kafka.common.KafkaException -import kafka.server.ReplicaManager +import kafka.server.LogOffsetMetadata + import java.util.concurrent.atomic.AtomicLong +import kafka.common.KafkaException class Replica(val brokerId: Int, val partition: Partition, time: Time = SystemTime, initialHighWatermarkValue: Long = 0L, val log: Option[Log] = None) extends Logging { - //only defined in local replica - private[this] var highWatermarkValue: AtomicLong = new AtomicLong(initialHighWatermarkValue) - // only used for remote replica; logEndOffsetValue for local replica is kept in log - private[this] var logEndOffsetValue = new AtomicLong(ReplicaManager.UnknownLogEndOffset) - private[this] var logEndOffsetUpdateTimeMsValue: AtomicLong = new AtomicLong(time.milliseconds) + // the high watermark offset value, in non-leader replicas only its message offsets are kept + @volatile private[this] var highWatermarkValue: LogOffsetMetadata = new LogOffsetMetadata(initialHighWatermarkValue) + // the log end offset value, kept in all replicas; + // for local replica it is the log's end offset, for remote replicas its value is only updated by follower fetch + @volatile private[this] var logEndOffsetValue: LogOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata + // the time when log offset is updated + private[this] val logEndOffsetUpdateTimeMsValue = new AtomicLong(time.milliseconds) + val topic = partition.topic val partitionId = partition.partitionId - def logEndOffset_=(newLogEndOffset: Long) { - if (!isLocal) { - logEndOffsetValue.set(newLogEndOffset) - logEndOffsetUpdateTimeMsValue.set(time.milliseconds) - trace("Setting log end offset for replica %d for partition [%s,%d] to %d" - .format(brokerId, topic, partitionId, logEndOffsetValue.get())) - } else - throw new KafkaException("Shouldn't 
-  def logEndOffset = {
-    if (isLocal)
-      log.get.logEndOffset
-    else
-      logEndOffsetValue.get()
-  }
-
   def isLocal: Boolean = {
     log match {
       case Some(l) => true
@@ -62,26 +47,37 @@ class Replica(val brokerId: Int,
     }
   }
 
-  def logEndOffsetUpdateTimeMs = logEndOffsetUpdateTimeMsValue.get()
-
-  def highWatermark_=(newHighWatermark: Long) {
+  def logEndOffset_=(newLogEndOffset: LogOffsetMetadata) {
     if (isLocal) {
-      trace("Setting hw for replica %d partition [%s,%d] on broker %d to %d"
-        .format(brokerId, topic, partitionId, brokerId, newHighWatermark))
-      highWatermarkValue.set(newHighWatermark)
-    } else
-      throw new KafkaException("Unable to set highwatermark for replica %d partition [%s,%d] since it's not local"
-        .format(brokerId, topic, partitionId))
+      throw new KafkaException("Should not set log end offset on partition [%s,%d]'s local replica %d".format(topic, partitionId, brokerId))
+    } else {
+      logEndOffsetValue = newLogEndOffset
+      logEndOffsetUpdateTimeMsValue.set(time.milliseconds)
+      trace("Setting log end offset for replica %d for partition [%s,%d] to [%s]"
+        .format(brokerId, topic, partitionId, logEndOffsetValue))
+    }
   }
 
-  def highWatermark = {
+  def logEndOffset =
     if (isLocal)
-      highWatermarkValue.get()
+      log.get.logEndOffsetMetadata
     else
-      throw new KafkaException("Unable to get highwatermark for replica %d partition [%s,%d] since it's not local"
-        .format(brokerId, topic, partitionId))
+      logEndOffsetValue
+
+  def logEndOffsetUpdateTimeMs = logEndOffsetUpdateTimeMsValue.get()
+
+  def highWatermark_=(newHighWatermark: LogOffsetMetadata) {
+    if (isLocal) {
+      highWatermarkValue = newHighWatermark
+      trace("Setting high watermark for replica %d partition [%s,%d] on broker %d to [%s]"
+        .format(brokerId, topic, partitionId, brokerId, newHighWatermark))
+    } else {
+      throw new KafkaException("Should not set high watermark on partition [%s,%d]'s non-local replica %d".format(topic, partitionId, brokerId))
+    }
   }
+
+  def highWatermark = highWatermarkValue
 
   override def equals(that: Any): Boolean = {
     if(!(that.isInstanceOf[Replica]))
       return false
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index b2652dd..df6ec24 100644
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -202,6 +202,16 @@ class FileMessageSet private[kafka](@volatile var file: File,
    * The number of bytes taken up by this file set
    */
   def sizeInBytes(): Int = _size.get()
+
+  /**
+   * Returns the start position in the segment file
+   */
+  def startPosition(): Int = start
+
+  /**
+   * Returns the base offset of the segment file
+   */
+  def baseOffset() = file.getName().split("\\.")(0).toLong
 
   /**
    * Append these messages to the message set
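The new `baseOffset()` helper leans on Kafka's segment-file naming convention, where each `.log` file is named by its zero-padded base offset. A quick check of the same parsing expression, in isolation:

```scala
// Illustrative only: same expression as FileMessageSet.baseOffset() above.
def baseOffsetOf(fileName: String): Long = fileName.split("\\.")(0).toLong

assert(baseOffsetOf("00000000000000000000.log") == 0L)
assert(baseOffsetOf("00000000000000368769.log") == 368769L)
```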
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index b7bc5ff..dd6bfc8 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -21,7 +21,7 @@ import kafka.utils._
 import kafka.message._
 import kafka.common._
 import kafka.metrics.KafkaMetricsGroup
-import kafka.server.BrokerTopicStats
+import kafka.server.{LogOffsetMetadata, FetchDataInfo, BrokerTopicStats}
 
 import java.io.{IOException, File}
 import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
@@ -52,10 +52,10 @@ class Log(val dir: File,
           @volatile var config: LogConfig,
           @volatile var recoveryPoint: Long = 0L,
           val scheduler: Scheduler,
-          time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
+          val time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
 
   import kafka.log.Log._
-  
+
   /* A lock that guards all modifications to the log */
   private val lock = new Object
 
@@ -67,7 +67,8 @@ class Log(val dir: File,
   loadSegments()
 
   /* Calculate the offset of the next message */
-  private val nextOffset: AtomicLong = new AtomicLong(activeSegment.nextOffset())
+  private val nextOffset = new AtomicLong(activeSegment.nextOffset())
+  @volatile var nextOffsetMetadata = new LogOffsetMetadata(nextOffset.get(), activeSegment.baseOffset, activeSegment.size.toInt)
 
   val topicAndPartition: TopicAndPartition = Log.parseTopicPartitionName(name)
 
@@ -167,6 +168,11 @@ class Log(val dir: File,
     for (s <- logSegments)
       s.index.sanityCheck()
   }
+
+  private def updateLogEndOffset(messageOffset: Long) {
+    nextOffset.set(messageOffset)
+    nextOffsetMetadata = new LogOffsetMetadata(nextOffset.get(), activeSegment.baseOffset, activeSegment.size.toInt)
+  }
 
   private def recoverLog() {
     // if we have the clean shutdown marker, skip recovery
@@ -278,11 +284,12 @@ class Log(val dir: File,
         }
       }
 
-      // now append to the log
+      // now append to the log and record the file end position as the segment file size
       segment.append(appendInfo.firstOffset, validMessages)
+      appendInfo.endFilePos = segment.log.sizeInBytes()
 
       // increment the log end offset
-      nextOffset.set(appendInfo.lastOffset + 1)
+      updateLogEndOffset(appendInfo.lastOffset + 1)
 
       trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s"
        .format(this.name, appendInfo.firstOffset, nextOffset.get(), validMessages))
@@ -303,11 +310,12 @@ class Log(val dir: File,
    * @param lastOffset The last offset in the message set
    * @param shallowCount The number of shallow messages
    * @param validBytes The number of valid bytes
+   * @param endFilePos The end file position of the last appended message
    * @param codec The codec used in the message set
    * @param offsetsMonotonic Are the offsets in this message set monotonically increasing
    */
-  case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec, shallowCount: Int, validBytes: Int, offsetsMonotonic: Boolean)
-
+  case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec, shallowCount: Int, validBytes: Int, var endFilePos: Int, offsetsMonotonic: Boolean)
+
   /**
    * Validate the following:
    *
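`nextOffsetMetadata` always describes where the next append will land: the next message offset, the active segment's base offset, and the segment's current size as the byte position. A sketch of that invariant, with made-up numbers and a hypothetical `OffsetMeta` stand-in:

```scala
// Illustrative only: how the log-end-offset metadata evolves across an append.
case class OffsetMeta(messageOffset: Long, segmentBaseOffset: Long, relativePositionInSegment: Int)

var nextOffset = 368770L
val activeSegmentBase = 368769L
var activeSegmentSize = 1024  // bytes already written to the active segment file

def logEndOffsetMetadata = OffsetMeta(nextOffset, activeSegmentBase, activeSegmentSize)

// append one 100-byte message at offset 368770:
nextOffset += 1
activeSegmentSize += 100
assert(logEndOffsetMetadata == OffsetMeta(368771L, 368769L, 1124))
```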
@@ -362,7 +370,7 @@ class Log(val dir: File,
       if(messageCodec != NoCompressionCodec)
         codec = messageCodec
     }
-    LogAppendInfo(firstOffset, lastOffset, codec, shallowMessageCount, validBytesCount, monotonic)
+    LogAppendInfo(firstOffset, lastOffset, codec, shallowMessageCount, validBytesCount, -1, monotonic)
   }
 
   /**
@@ -392,15 +400,15 @@ class Log(val dir: File,
    * @param maxOffset -The offset to read up to, exclusive. (i.e. the first offset NOT included in the resulting message set).
    *
    * @throws OffsetOutOfRangeException If startOffset is beyond the log end offset or before the base offset of the first segment.
-   * @return The messages read
+   * @return The fetch data information including offset metadata and messages read
    */
-  def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): MessageSet = {
+  def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): FetchDataInfo = {
     trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size))
 
     // check if the offset is valid and in range
     val next = nextOffset.get
     if(startOffset == next)
-      return MessageSet.Empty
+      return FetchDataInfo(logEndOffsetMetadata, MessageSet.Empty)
 
     var entry = segments.floorEntry(startOffset)
 
@@ -412,15 +420,31 @@ class Log(val dir: File,
     // but if that segment doesn't contain any messages with an offset greater than that
     // continue to read from successive segments until we get some messages or we reach the end of the log
     while(entry != null) {
-      val messages = entry.getValue.read(startOffset, maxOffset, maxLength)
-      if(messages == null)
+      val messageSet = entry.getValue.read(startOffset, maxOffset, maxLength)
+      if(messageSet == null) {
         entry = segments.higherEntry(entry.getKey)
-      else
-        return messages
+      } else {
+        if (messageSet.isInstanceOf[FileMessageSet]) {
+          val fileMessageSet = messageSet.asInstanceOf[FileMessageSet]
+          val offsetMetadata = new LogOffsetMetadata(startOffset, fileMessageSet.baseOffset(), fileMessageSet.startPosition())
+          return FetchDataInfo(offsetMetadata, messageSet)
+        } else {
+          return FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, messageSet)
+        }
+      }
     }
 
-    // okay we are beyond the end of the last segment but less than the log end offset
-    MessageSet.Empty
+    // okay we are beyond the end of the last segment but less than the target offset,
+    // so return the empty message set
+    FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty)
+  }
+
+  /**
+   * Given a message offset, find its log metadata
+   */
+  def find(offset: Long): LogOffsetMetadata = {
+    val fetchDataInfo = read(offset, 1)
+    fetchDataInfo.fetchOffset
   }
 
   /**
@@ -433,7 +457,7 @@ class Log(val dir: File,
     // find any segments that match the user-supplied predicate UNLESS it is the final segment
     // and it is empty (since we would just end up re-creating it
     val lastSegment = activeSegment
-    var deletable = logSegments.takeWhile(s => predicate(s) && (s.baseOffset != lastSegment.baseOffset || s.size > 0))
+    val deletable = logSegments.takeWhile(s => predicate(s) && (s.baseOffset != lastSegment.baseOffset || s.size > 0))
     val numToDelete = deletable.size
     if(numToDelete > 0) {
       lock synchronized {
@@ -463,6 +487,11 @@ class Log(val dir: File,
   def logEndOffset: Long = nextOffset.get
 
   /**
+   * The offset metadata of the next message that will be appended to the log
+   */
+  def logEndOffsetMetadata: LogOffsetMetadata = nextOffsetMetadata
+
+  /**
    * Roll the log over to a new empty log segment if necessary
    * @return The currently active segment after (perhaps) rolling to a new segment
   */
@@ -582,7 +611,7 @@ class Log(val dir: File,
       val deletable = logSegments.filter(segment => segment.baseOffset > targetOffset)
       deletable.foreach(deleteSegment(_))
       activeSegment.truncateTo(targetOffset)
-      this.nextOffset.set(targetOffset)
+      updateLogEndOffset(targetOffset)
       this.recoveryPoint = math.min(targetOffset, this.recoveryPoint)
     }
   }
@@ -602,7 +631,7 @@ class Log(val dir: File,
                                 indexIntervalBytes = config.indexInterval,
                                 maxIndexSize = config.maxIndexSize,
                                 time = time))
-    this.nextOffset.set(newOffset)
+    updateLogEndOffset(newOffset)
     this.recoveryPoint = math.min(newOffset, this.recoveryPoint)
   }
 }
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 2faa196..01fc306 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -315,7 +315,6 @@ private[log] class Cleaner(val id: Int,
    * @param log The log being cleaned
    * @param segments The group of segments being cleaned
    * @param map The offset map to use for cleaning segments
-   * @param expectedTruncateCount A count used to check if the log is being truncated and rewritten under our feet
    * @param deleteHorizonMs The time to retain delete tombstones
    */
   private[log] def cleanSegments(log: Log,
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 0d6926e..2af434f 100644
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -99,7 +99,7 @@ class LogSegment(val log: FileMessageSet,
     val mapping = index.lookup(offset)
     log.searchFor(offset, max(mapping.position, startingFilePosition))
   }
-  
+
   /**
    * Read a message set from this segment beginning with the first offset >= startOffset. The message set will include
    * no more than maxSize bytes and will end before maxOffset if a maxOffset is specified.
@@ -108,7 +108,7 @@ class LogSegment(val log: FileMessageSet,
    * @param maxSize The maximum number of bytes to include in the message set we read
    * @param maxOffset An optional maximum offset for the message set we read
    *
-   * @return The message set read or null if the startOffset is larger than the largest offset in this log.
+   * @return The message set read or null if the startOffset is larger than the largest offset in this log
    */
   @threadsafe
   def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int): MessageSet = {
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 3b15254..7846dda 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -92,12 +92,12 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
     val partitionsWithError = new mutable.HashSet[TopicAndPartition]
     var response: FetchResponse = null
     try {
-      trace("issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
+      trace("Issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
       response = simpleConsumer.fetch(fetchRequest)
     } catch {
       case t: Throwable =>
         if (isRunning.get) {
-          warn("Error in fetch %s. Possible cause: %s".format(fetchRequest, t.getMessage))
+          warn("Error in fetch %s. Possible cause: %s".format(fetchRequest, t.toString))
           partitionMapLock synchronized {
             partitionsWithError ++= partitionMap.keys
           }
         }
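Before the new server-side files, it is worth spelling out the `Log.read` contract introduced above: every read now returns the resolved byte position alongside the messages, so a blocked fetch can later measure progress in bytes without re-reading. A sketch of how a caller can treat the two outcomes, with hypothetical stand-in types:

```scala
// Illustrative only: pattern the fetch path follows on the returned FetchDataInfo.
case class OffsetMeta(messageOffset: Long, segmentBaseOffset: Long, relativePositionInSegment: Int)
val Unknown = OffsetMeta(-1L, 0L, 0)
case class FetchInfo(fetchOffset: OffsetMeta, messageSizeInBytes: Int)

def describe(info: FetchInfo): String = info.fetchOffset match {
  case `Unknown` => "no byte position attached; cannot park this fetch by position"
  case meta      => "parked at segment %d, byte %d".format(meta.segmentBaseOffset, meta.relativePositionInSegment)
}

assert(describe(FetchInfo(Unknown, 0)).startsWith("no byte position"))
assert(describe(FetchInfo(OffsetMeta(100L, 90L, 4096), 512)) == "parked at segment 90, byte 4096")
```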
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
new file mode 100644
index 0000000..fb74afb
--- /dev/null
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import scala.collection.immutable.Map
+import scala.collection.Seq
+import kafka.network.RequestChannel
+import kafka.api.{FetchResponse, FetchRequest}
+import kafka.common.{NotLeaderForPartitionException, TopicAndPartition}
+
+
+/**
+ * A delayed fetch request, which is satisfied (or more
+ * accurately, unblocked) if:
+ * Case A: This broker is no longer the leader for ANY of the partitions it tries to fetch
+ *   - should return error.
+ * Case B: The fetch offset is not on the last segment of the log
+ *   - should return all the data on that segment.
+ * Case C: The accumulated bytes from all the fetching partitions exceed the minimum bytes
+ *   - should return whatever data is available.
+ */
+
+class DelayedFetch(override val keys: Seq[TopicPartitionRequestKey],
+                   override val request: RequestChannel.Request,
+                   override val delayMs: Long,
+                   val fetch: FetchRequest,
+                   val partitionFetchOffsets: Map[TopicAndPartition, LogOffsetMetadata])
+  extends DelayedRequest(keys, request, delayMs) {
+
+  def isSatisfied(replicaManager: ReplicaManager): Boolean = {
+    var accumulatedSize = 0
+    val fromFollower = fetch.isFromFollower
+    try {
+      partitionFetchOffsets.foreach {
+        case (topicAndPartition, fetchOffset) =>
+          if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
+            val replica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
+            val endOffset =
+              if (fromFollower)
+                replica.logEndOffset
+              else
+                replica.highWatermark
+
+            if (endOffset.older(fetchOffset)) {
+              // if fetching on a later segment, skip
+              debug("Skipping partition %s for fetch request %s since it is fetching later segments.".format(topicAndPartition, fetch))
+            } else if (fetchOffset.older(endOffset)) {
+              // unblock if there is one fetching partition on older segments
+              debug("Satisfying fetch request %s immediately since it is fetching older segments.".format(fetch))
+              return true
+            } else if (fetchOffset.precedes(endOffset)) {
+              accumulatedSize += endOffset.bytesDiff(fetchOffset)
+            }
+          }
+      }
+    } catch {
+      case nle: NotLeaderForPartitionException =>
+        debug("Satisfying fetch request %s since leader has changed.".format(fetch))
+        return true
+    }
+
+    // unblocked if there is enough accumulated data
+    accumulatedSize >= fetch.minBytes
+  }
+
+  def respond(replicaManager: ReplicaManager): FetchResponse = {
+    val topicData = replicaManager.readMessageSets(fetch)
+    FetchResponse(fetch.correlationId, topicData.mapValues(data => data._1))
+  }
+}
\ No newline at end of file
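A compact model of `isSatisfied` above. Case A (leadership loss) surfaces as a `NotLeaderForPartitionException` and unblocks immediately; the sketch below covers cases B and C over simplified offsets (the types are hypothetical stand-ins mirroring the comparison helpers defined in LogOffsetMetadata later in this patch):

```scala
// Illustrative only: cases B and C of DelayedFetch.isSatisfied.
case class OffsetMeta(messageOffset: Long, segmentBaseOffset: Long, relativePositionInSegment: Int) {
  def older(that: OffsetMeta): Boolean    = this.segmentBaseOffset < that.segmentBaseOffset
  def precedes(that: OffsetMeta): Boolean = this.messageOffset < that.messageOffset
  def bytesDiff(that: OffsetMeta): Int    = this.relativePositionInSegment - that.relativePositionInSegment
}

// parked: (fetch position, current leader end position) per partition
def isSatisfied(parked: Seq[(OffsetMeta, OffsetMeta)], minBytes: Int): Boolean = {
  var accumulated = 0
  for ((fetchOffset, endOffset) <- parked) {
    if (fetchOffset.older(endOffset))
      return true  // case B: a whole older segment is available, respond now
    if (!endOffset.older(fetchOffset) && fetchOffset.precedes(endOffset))
      accumulated += endOffset.bytesDiff(fetchOffset)  // case C: bytes appended since parking
  }
  accumulated >= minBytes  // case C threshold
}

val fetchAt   = OffsetMeta(10L, 0L, 512)
val leaderEnd = OffsetMeta(12L, 0L, 2048)
assert(isSatisfied(Seq((fetchAt, leaderEnd)), minBytes = 1024))  // 1536 bytes accumulated
```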
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
new file mode 100644
index 0000000..3b38128
--- /dev/null
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.network.RequestChannel
+import kafka.api._
+import kafka.common.ErrorMapping
+import kafka.utils.Logging
+import kafka.api.ProducerResponseStatus
+import kafka.common.TopicAndPartition
+import scala.Some
+import scala.collection.immutable.Map
+import scala.collection.Seq
+
+/** A delayed produce request, which is satisfied (or more
+ *  accurately, unblocked) if for every partition it produces to:
+ *  Case A: This broker is not the leader: unblock - should return error.
+ *  Case B: This broker is the leader:
+ *    B.1 - If there was a local error (when writing to the local log): unblock - should return error
+ *    B.2 - else, at least requiredAcks replicas should be caught up to this request.
+ */
+
+class DelayedProduce(override val keys: Seq[TopicPartitionRequestKey],
+                     override val request: RequestChannel.Request,
+                     override val delayMs: Long,
+                     val produce: ProducerRequest,
+                     val partitionStatus: Map[TopicAndPartition, DelayedProduceResponseStatus],
+                     val offsetCommitRequestOpt: Option[OffsetCommitRequest] = None)
+  extends DelayedRequest(keys, request, delayMs) with Logging {
+
+  // first update the acks pending variable according to the error code
+  partitionStatus foreach { case (topicAndPartition, delayedStatus) =>
+    if (delayedStatus.responseStatus.error == ErrorMapping.NoError) {
+      // Timeout error state will be cleared when required acks are received
+      delayedStatus.acksPending = true
+      delayedStatus.responseStatus.error = ErrorMapping.RequestTimedOutCode
+    } else {
+      delayedStatus.acksPending = false
+    }
+
+    trace("Initial partition status for %s is %s".format(topicAndPartition, delayedStatus))
+  }
+
+  def respond(offsetManager: OffsetManager): RequestOrResponse = {
+    val responseStatus = partitionStatus.mapValues(status => status.responseStatus)
+
+    val errorCode = responseStatus.find { case (_, status) =>
+      status.error != ErrorMapping.NoError
+    }.map(_._2.error).getOrElse(ErrorMapping.NoError)
+
+    if (errorCode == ErrorMapping.NoError) {
+      offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo))
+    }
+
+    val response = offsetCommitRequestOpt.map(_.responseFor(errorCode, offsetManager.config.maxMetadataSize))
+      .getOrElse(ProducerResponse(produce.correlationId, responseStatus))
+
+    response
+  }
+
+  def isSatisfied(replicaManager: ReplicaManager) = {
+    // check for each partition if it still has pending acks
+    partitionStatus.foreach { case (topicAndPartition, fetchPartitionStatus) =>
+      trace("Checking producer request satisfaction for %s, acksPending = %b"
+        .format(topicAndPartition, fetchPartitionStatus.acksPending))
+      // skip those partitions that have already been satisfied
+      if (fetchPartitionStatus.acksPending) {
+        val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
+        val (hasEnough, errorCode) = partitionOpt match {
+          case Some(partition) =>
+            partition.checkEnoughReplicasReachOffset(
+              fetchPartitionStatus.requiredOffset,
+              produce.requiredAcks)
+          case None =>
+            (false, ErrorMapping.UnknownTopicOrPartitionCode)
+        }
+        if (errorCode != ErrorMapping.NoError) {
+          fetchPartitionStatus.acksPending = false
+          fetchPartitionStatus.responseStatus.error = errorCode
+        } else if (hasEnough) {
+          fetchPartitionStatus.acksPending = false
+          fetchPartitionStatus.responseStatus.error = ErrorMapping.NoError
+        }
+      }
+    }
+
+    // unblocked if there are no partitions with pending acks
+    val satisfied = !partitionStatus.exists(p => p._2.acksPending)
+    satisfied
+  }
+}
+
+case class DelayedProduceResponseStatus(val requiredOffset: Long,
+                                        val responseStatus: ProducerResponseStatus) {
+  var acksPending = false
+
+  override def toString =
+    "acksPending:%b, error: %d, startOffset: %d, requiredOffset: %d".format(
+      acksPending, responseStatus.error, responseStatus.offset, requiredOffset)
+}
diff --git a/core/src/main/scala/kafka/server/FetchDataInfo.scala b/core/src/main/scala/kafka/server/FetchDataInfo.scala
new file mode 100644
index 0000000..26f278f
--- /dev/null
+++ b/core/src/main/scala/kafka/server/FetchDataInfo.scala
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.message.MessageSet
+
+case class FetchDataInfo(fetchOffset: LogOffsetMetadata, messageSet: MessageSet)
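Stepping back to `DelayedProduce` above: the per-partition bookkeeping pre-loads `RequestTimedOutCode` so that an expired request already carries the right error, and each re-check flips `acksPending` off exactly once. A minimal model of that state machine, where `probe` is a hypothetical stand-in for `Partition.checkEnoughReplicasReachOffset` and the error codes are stand-in values:

```scala
// Illustrative only: error codes are stand-ins for ErrorMapping constants.
val NoError: Short = 0
val RequestTimedOut: Short = 7

case class AckStatus(requiredOffset: Long, var error: Short, var acksPending: Boolean)

def recheck(status: AckStatus, probe: Long => (Boolean, Short)): Unit =
  if (status.acksPending) {
    val (hasEnough, errorCode) = probe(status.requiredOffset)
    if (errorCode != NoError) { status.acksPending = false; status.error = errorCode }
    else if (hasEnough)       { status.acksPending = false; status.error = NoError }
    // otherwise keep waiting; the pre-set timeout error stays in place
  }

val st = AckStatus(requiredOffset = 100L, error = RequestTimedOut, acksPending = true)
recheck(st, _ => (true, NoError))  // enough ISR members have reached offset 100
assert(!st.acksPending && st.error == NoError)
```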
diff --git a/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala b/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala
new file mode 100644
index 0000000..a1dee88
--- /dev/null
+++ b/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.metrics.KafkaMetricsGroup
+import kafka.network.{BoundedByteBufferSend, RequestChannel}
+import kafka.api.{FetchResponseSend, RequestOrResponse}
+import java.util.concurrent.TimeUnit
+
+/**
+ * The purgatory holding delayed fetch requests
+ */
+class FetchRequestPurgatory(replicaManager: ReplicaManager, requestChannel: RequestChannel)
+  extends RequestPurgatory[DelayedFetch](replicaManager.config.brokerId, replicaManager.config.fetchPurgatoryPurgeIntervalRequests) {
+  this.logIdent = "[FetchRequestPurgatory-%d] ".format(replicaManager.config.brokerId)
+
+  private class DelayedFetchRequestMetrics(forFollower: Boolean) extends KafkaMetricsGroup {
+    private val metricPrefix = if (forFollower) "Follower" else "Consumer"
+
+    val expiredRequestMeter = newMeter(metricPrefix + "ExpiresPerSecond", "requests", TimeUnit.SECONDS)
+  }
+
+  private val aggregateFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = true)
+  private val aggregateNonFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = false)
+
+  def recordDelayedFetchExpired(forFollower: Boolean) {
+    val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics
+      else aggregateNonFollowerFetchRequestMetrics
+
+    metrics.expiredRequestMeter.mark()
+  }
+
+  /**
+   * Check if a specified delayed fetch request is satisfied
+   */
+  def checkSatisfied(delayedFetch: DelayedFetch): Boolean = delayedFetch.isSatisfied(replicaManager)
+
+  /**
+   * When a request expires just answer it with whatever data is present
+   */
+  def expire(delayedFetch: DelayedFetch) {
+    debug("Expiring fetch request %s.".format(delayedFetch.fetch))
+    val fromFollower = delayedFetch.fetch.isFromFollower
+    recordDelayedFetchExpired(fromFollower)
+    respond(delayedFetch)
+  }
+
+
+  // TODO: purgatory should not be responsible for sending back the responses
+  def respond(delayedFetch: DelayedFetch) {
+    val response = delayedFetch.respond(replicaManager)
+    requestChannel.sendResponse(new RequestChannel.Response(delayedFetch.request, new FetchResponseSend(response)))
+  }
+}
\ No newline at end of file
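Both purgatories are now driven through the same watch-or-respond idiom, which closes the race the old code papered over with an explicit re-check after `watch()`. A sketch of the idiom under simplified assumptions (`Delayed` and `Purgatory` here are hypothetical stand-ins for `DelayedRequest` and `RequestPurgatory`):

```scala
// Illustrative only: checkAndMaybeWatch parks the request unless it is already
// satisfiable, in which case the caller responds immediately.
trait Delayed { def isSatisfied: Boolean }

class Purgatory[T <: Delayed] {
  private val watching = scala.collection.mutable.Buffer[T]()
  /** returns true if the request was parked for later; false if already satisfied */
  def checkAndMaybeWatch(d: T): Boolean = synchronized {
    if (d.isSatisfied) false
    else { watching += d; true }
  }
}

def handle[T <: Delayed](p: Purgatory[T], d: T)(respond: T => Unit): Unit =
  if (!p.checkAndMaybeWatch(d)) respond(d)
```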
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0b668f2..1ccbdf3 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -23,13 +23,10 @@ import kafka.log._
 import kafka.message._
 import kafka.network._
 import kafka.admin.AdminUtils
-import kafka.metrics.KafkaMetricsGroup
 import kafka.network.RequestChannel.Response
 import kafka.controller.KafkaController
-import kafka.utils.{Pool, SystemTime, Logging}
+import kafka.utils.{SystemTime, Logging}
 
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic._
 import scala.collection._
 
 import org.I0Itec.zkclient.ZkClient
@@ -45,11 +42,8 @@ class KafkaApis(val requestChannel: RequestChannel,
                 val config: KafkaConfig,
                 val controller: KafkaController) extends Logging {
 
-  private val producerRequestPurgatory =
-    new ProducerRequestPurgatory(replicaManager.config.producerPurgatoryPurgeIntervalRequests)
-  private val fetchRequestPurgatory =
-    new FetchRequestPurgatory(requestChannel, replicaManager.config.fetchPurgatoryPurgeIntervalRequests)
-  private val delayedRequestMetrics = new DelayedRequestMetrics
+  val producerRequestPurgatory = new ProducerRequestPurgatory(replicaManager, offsetManager, requestChannel)
+  val fetchRequestPurgatory = new FetchRequestPurgatory(replicaManager, requestChannel)
 
   var metadataCache = new MetadataCache
   this.logIdent = "[KafkaApi-%d] ".format(brokerId)
@@ -127,22 +121,6 @@ class KafkaApis(val requestChannel: RequestChannel,
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(controlledShutdownResponse)))
   }
 
-  /**
-   * Check if a partitionData from a produce request can unblock any
-   * DelayedFetch requests.
-   */
-  def maybeUnblockDelayedFetchRequests(topic: String, partition: Int, messageSizeInBytes: Int) {
-    val satisfied = fetchRequestPurgatory.update(RequestKey(topic, partition), messageSizeInBytes)
-    trace("Producer request to (%s-%d) unblocked %d fetch requests.".format(topic, partition, satisfied.size))
-
-    // send any newly unblocked responses
-    for(fetchReq <- satisfied) {
-      val topicData = readMessageSets(fetchReq.fetch)
-      val response = FetchResponse(fetchReq.fetch.correlationId, topicData)
-      requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response)))
-    }
-  }
-
   private def producerRequestFromOffsetCommit(offsetCommitRequest: OffsetCommitRequest) = {
     val msgs = offsetCommitRequest.filterLargeMetadata(config.offsetMetadataMaxSize).map {
       case (topicAndPartition, offset) =>
@@ -171,24 +149,21 @@ class KafkaApis(val requestChannel: RequestChannel,
    * Handle a produce request or offset commit request (which is really a specialized producer request)
    */
   def handleProducerOrOffsetCommitRequest(request: RequestChannel.Request) {
-
-    val (produceRequest, offsetCommitRequestOpt) = if (request.requestId == RequestKeys.OffsetCommitKey) {
-      val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
-      (producerRequestFromOffsetCommit(offsetCommitRequest), Some(offsetCommitRequest))
-    }
-    else {
-      (request.requestObj.asInstanceOf[ProducerRequest], None)
-    }
+    val (produceRequest, offsetCommitRequestOpt) =
+      if (request.requestId == RequestKeys.OffsetCommitKey) {
+        val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
+        (producerRequestFromOffsetCommit(offsetCommitRequest), Some(offsetCommitRequest))
+      } else {
+        (request.requestObj.asInstanceOf[ProducerRequest], None)
+      }
 
     val sTime = SystemTime.milliseconds
     val localProduceResults = appendToLocalLog(produceRequest)
     debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
 
+    val firstErrorCode = localProduceResults.find(_.errorCode != ErrorMapping.NoError).map(_.errorCode).getOrElse(ErrorMapping.NoError)
     val numPartitionsInError = localProduceResults.count(_.error.isDefined)
-    produceRequest.data.foreach(partitionAndData =>
-      maybeUnblockDelayedFetchRequests(partitionAndData._1.topic, partitionAndData._1.partition, partitionAndData._2.sizeInBytes))
-
     val allPartitionHaveReplicationFactorOne =
       !produceRequest.data.keySet.exists(
         m => replicaManager.getReplicationFactorForPartition(m.topic, m.partition) != 1)
@@ -229,46 +204,26 @@ class KafkaApis(val requestChannel: RequestChannel,
     } else {
       // create a list of (topic, partition) pairs to use as keys for this delayed request
       val producerRequestKeys = produceRequest.data.keys.map(
-        topicAndPartition => new RequestKey(topicAndPartition)).toSeq
+        topicAndPartition => new TopicPartitionRequestKey(topicAndPartition)).toSeq
       val statuses = localProduceResults.map(r =>
         r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap
       val delayedRequest = new DelayedProduce(
         producerRequestKeys,
         request,
-        statuses,
-        produceRequest,
         produceRequest.ackTimeoutMs.toLong,
+        produceRequest,
+        statuses,
         offsetCommitRequestOpt)
 
-      producerRequestPurgatory.watch(delayedRequest)
-
-      /*
-       * Replica fetch requests may have arrived (and potentially satisfied)
-       * delayedProduce requests while they were being added to the purgatory.
-       * Here, we explicitly check if any of them can be satisfied.
-       */
-      var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce]
-      producerRequestKeys.foreach(key =>
-        satisfiedProduceRequests ++=
-          producerRequestPurgatory.update(key, key))
-      debug(satisfiedProduceRequests.size +
-        " producer requests unblocked during produce to local log.")
-      satisfiedProduceRequests.foreach(_.respond())
-
-      // we do not need the data anymore
-      produceRequest.emptyData()
+      // add the produce request for watch if it's not satisfied, otherwise send the response back
+      if (!producerRequestPurgatory.checkAndMaybeWatch(delayedRequest))
+        producerRequestPurgatory.respond(delayedRequest)
     }
-  }
-
-  case class DelayedProduceResponseStatus(requiredOffset: Long,
-                                          status: ProducerResponseStatus) {
-    var acksPending = false
 
-    override def toString =
-      "acksPending:%b, error: %d, startOffset: %d, requiredOffset: %d".format(
-        acksPending, status.error, status.offset, requiredOffset)
+    // we do not need the data anymore
+    produceRequest.emptyData()
   }
-
+
   case class ProduceResult(key: TopicAndPartition, start: Long, end: Long, error: Option[Throwable] = None) {
     def this(key: TopicAndPartition, throwable: Throwable) =
       this(key, -1L, -1L, Some(throwable))
@@ -288,13 +243,12 @@ class KafkaApis(val requestChannel: RequestChannel,
       partitionAndData.map {case (topicAndPartition, messages) =>
         try {
           val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
-          val info =
-            partitionOpt match {
-              case Some(partition) => partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet])
-              case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
-                .format(topicAndPartition, brokerId))
-
-            }
+          val info = partitionOpt match {
+            case Some(partition) =>
+              partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet])
+            case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
+              .format(topicAndPartition, brokerId))
+          }
 
           val numAppendedMessages = if (info.firstOffset == -1L || info.lastOffset == -1L) 0 else (info.lastOffset - info.firstOffset + 1)
 
@@ -338,118 +292,54 @@ class KafkaApis(val requestChannel: RequestChannel,
    */
   def handleFetchRequest(request: RequestChannel.Request) {
     val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
-    if(fetchRequest.isFromFollower) {
-      maybeUpdatePartitionHw(fetchRequest)
-      // after updating HW, some delayed produce requests may be unblocked
-      var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce]
-      fetchRequest.requestInfo.foreach {
-        case (topicAndPartition, _) =>
-          val key = new RequestKey(topicAndPartition)
-          satisfiedProduceRequests ++= producerRequestPurgatory.update(key, key)
-      }
-      debug("Replica %d fetch unblocked %d producer requests."
-        .format(fetchRequest.replicaId, satisfiedProduceRequests.size))
-      satisfiedProduceRequests.foreach(_.respond())
-    }
-
-    val dataRead = readMessageSets(fetchRequest)
-    val bytesReadable = dataRead.values.map(_.messages.sizeInBytes).sum
+    val dataRead = replicaManager.readMessageSets(fetchRequest)
+
+    // if the fetch request comes from the follower,
+    // update its corresponding log end offset
+    if(fetchRequest.isFromFollower)
+      recordFollowerLogEndOffsets(fetchRequest.replicaId, dataRead.mapValues(data => data._2))
+
+    // check if this fetch request can be satisfied right away
+    val bytesReadable = dataRead.values.map(_._1.messages.sizeInBytes).sum
+    val errorReadingData = dataRead.values.foldLeft(false)((errorIncurred, currDataResponse) =>
+      errorIncurred || (currDataResponse._1.error != ErrorMapping.NoError))
+    // send the data immediately if 1) fetch request does not want to wait
+    //                              2) fetch request does not require any data
+    //                              3) has enough data to respond
+    //                              4) some error happens while reading data
     if(fetchRequest.maxWait <= 0 ||
+       fetchRequest.numPartitions <= 0 ||
        bytesReadable >= fetchRequest.minBytes ||
-       fetchRequest.numPartitions <= 0) {
+       errorReadingData) {
       debug("Returning fetch response %s for fetch request with correlation id %d to client %s"
-        .format(dataRead.values.map(_.error).mkString(","), fetchRequest.correlationId, fetchRequest.clientId))
-      val response = new FetchResponse(fetchRequest.correlationId, dataRead)
+        .format(dataRead.values.map(_._1.error).mkString(","), fetchRequest.correlationId, fetchRequest.clientId))
+      val response = new FetchResponse(fetchRequest.correlationId, dataRead.mapValues(data => data._1))
       requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response)))
     } else {
       debug("Putting fetch request with correlation id %d from client %s into purgatory".format(fetchRequest.correlationId,
         fetchRequest.clientId))
       // create a list of (topic, partition) pairs to use as keys for this delayed request
-      val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq.map(new RequestKey(_))
-      val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest, fetchRequest.maxWait, bytesReadable)
-      fetchRequestPurgatory.watch(delayedFetch)
-    }
-  }
+      val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq.map(new TopicPartitionRequestKey(_))
+      val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest.maxWait, fetchRequest,
+        dataRead.mapValues(data => data._2))
 
-  private def maybeUpdatePartitionHw(fetchRequest: FetchRequest) {
-    debug("Maybe update partition HW due to fetch request: %s ".format(fetchRequest))
-    fetchRequest.requestInfo.foreach(info => {
-      val (topic, partition, offset) = (info._1.topic, info._1.partition, info._2.offset)
-      replicaManager.recordFollowerPosition(topic, partition, fetchRequest.replicaId, offset)
-    })
-  }
-
-  /**
-   * Read from all the offset details given and return a map of
-   * (topic, partition) -> PartitionData
-   */
-  private def readMessageSets(fetchRequest: FetchRequest) = {
-    val isFetchFromFollower = fetchRequest.isFromFollower
-    fetchRequest.requestInfo.map
-    {
-      case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) =>
-        val partitionData =
-          try {
-            val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, fetchRequest.replicaId)
-            BrokerTopicStats.getBrokerTopicStats(topic).bytesOutRate.mark(messages.sizeInBytes)
-            BrokerTopicStats.getBrokerAllTopicsStats.bytesOutRate.mark(messages.sizeInBytes)
-            if (!isFetchFromFollower) {
-              new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, messages)
-            } else {
-              debug("Leader %d for partition [%s,%d] received fetch request from follower %d"
-                .format(brokerId, topic, partition, fetchRequest.replicaId))
-              new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, messages)
-            }
-          } catch {
-            // NOTE: Failed fetch requests is not incremented for UnknownTopicOrPartitionException and NotLeaderForPartitionException
-            // since failed fetch requests metric is supposed to indicate failure of a broker in handling a fetch request
-            // for a partition it is the leader for
-            case utpe: UnknownTopicOrPartitionException =>
-              warn("Fetch request with correlation id %d from client %s on partition [%s,%d] failed due to %s".format(
-                fetchRequest.correlationId, fetchRequest.clientId, topic, partition, utpe.getMessage))
-              new FetchResponsePartitionData(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty)
-            case nle: NotLeaderForPartitionException =>
-              warn("Fetch request with correlation id %d from client %s on partition [%s,%d] failed due to %s".format(
-                fetchRequest.correlationId, fetchRequest.clientId, topic, partition, nle.getMessage))
-              new FetchResponsePartitionData(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty)
-            case t: Throwable =>
-              BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
-              BrokerTopicStats.getBrokerAllTopicsStats.failedFetchRequestRate.mark()
-              error("Error when processing fetch request for partition [%s,%d] offset %d from %s with correlation id %d. Possible cause: %s"
                .format(topic, partition, offset, if (isFetchFromFollower) "follower" else "consumer", fetchRequest.correlationId, t.getMessage))
-              new FetchResponsePartitionData(ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty)
-          }
-        (TopicAndPartition(topic, partition), partitionData)
+      // add the fetch request for watch if it's not satisfied, otherwise send the response back
+      if (!fetchRequestPurgatory.checkAndMaybeWatch(delayedFetch))
+        fetchRequestPurgatory.respond(delayedFetch)
     }
   }
 
-  /**
-   * Read from a single topic/partition at the given offset upto maxSize bytes
-   */
-  private def readMessageSet(topic: String,
-                             partition: Int,
-                             offset: Long,
-                             maxSize: Int,
-                             fromReplicaId: Int): (MessageSet, Long) = {
-    // check if the current broker is the leader for the partitions
-    val localReplica = if(fromReplicaId == Request.DebuggingConsumerId)
-      replicaManager.getReplicaOrException(topic, partition)
-    else
-      replicaManager.getLeaderReplicaIfLocal(topic, partition)
-    trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
-    val maxOffsetOpt =
-      if (Request.isValidBrokerId(fromReplicaId))
-        None
-      else
-        Some(localReplica.highWatermark)
-    val messages = localReplica.log match {
-      case Some(log) =>
-        log.read(offset, maxSize, maxOffsetOpt)
-      case None =>
-        error("Leader for partition [%s,%d] on broker %d does not have a local log".format(topic, partition, brokerId))
-        MessageSet.Empty
+  private def recordFollowerLogEndOffsets(replicaId: Int, offsets: Map[TopicAndPartition, LogOffsetMetadata]) {
+    debug("Record follower log end offsets: %s ".format(offsets))
+    offsets.foreach {
+      case (topicAndPartition, offset) =>
+        replicaManager.updateReplicaLEOAndUpdateHW(topicAndPartition.topic,
+          topicAndPartition.partition, replicaId, offset)
+
+        // for producer requests with ack > 1 (but not -1) we need to check
+        // if they can be unblocked after some follower's log end offsets have moved
+        replicaManager.unblockDelayedProduceRequests(new TopicPartitionRequestKey(topicAndPartition))
     }
-    (messages, localReplica.highWatermark)
   }
 
   /**
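`readMessageSets` now yields a pair per partition — the response payload and the offset metadata of where the read stopped — and the call sites above each project one half of it. A sketch of that shape with hypothetical stand-in types:

```scala
// Illustrative only: splitting the (payload, position) pair returned per partition.
case class PartitionData(error: Short, sizeInBytes: Int)
case class OffsetMeta(messageOffset: Long)

val dataRead: Map[String, (PartitionData, OffsetMeta)] =
  Map("topic0-0" -> ((PartitionData(0, 512), OffsetMeta(42L))))

val responsePayload = dataRead.mapValues(_._1)  // becomes the FetchResponse body
val fetchPositions  = dataRead.mapValues(_._2)  // recorded as follower LEOs or parked fetch offsets
assert(fetchPositions("topic0-0").messageOffset == 42L)
```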
@@ -473,7 +363,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       if (!offsetRequest.isFromOrdinaryClient) {
         allOffsets
       } else {
-        val hw = localReplica.highWatermark
+        val hw = localReplica.highWatermark.messageOffset
         if (allOffsets.exists(_ > hw))
           hw +: allOffsets.dropWhile(_ > hw)
         else
@@ -642,209 +532,5 @@ class KafkaApis(val requestChannel: RequestChannel,
     producerRequestPurgatory.shutdown()
     debug("Shut down complete.")
   }
-
-  private [kafka] trait MetricKey {
-    def keyLabel: String
-  }
-  private [kafka] object MetricKey {
-    val globalLabel = "All"
-  }
-
-  private [kafka] case class RequestKey(topic: String, partition: Int)
-          extends MetricKey {
-
-    def this(topicAndPartition: TopicAndPartition) = this(topicAndPartition.topic, topicAndPartition.partition)
-
-    def topicAndPartition = TopicAndPartition(topic, partition)
-
-    override def keyLabel = "%s-%d".format(topic, partition)
-  }
-
-  /**
-   * A delayed fetch request
-   */
-  class DelayedFetch(keys: Seq[RequestKey], request: RequestChannel.Request, val fetch: FetchRequest, delayMs: Long, initialSize: Long)
-    extends DelayedRequest(keys, request, delayMs) {
-    val bytesAccumulated = new AtomicLong(initialSize)
-  }
-
-  /**
-   * A holding pen for fetch requests waiting to be satisfied
-   */
-  class FetchRequestPurgatory(requestChannel: RequestChannel, purgeInterval: Int)
-    extends RequestPurgatory[DelayedFetch, Int](brokerId, purgeInterval) {
-    this.logIdent = "[FetchRequestPurgatory-%d] ".format(brokerId)
-
-    /**
-     * A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field
-     */
-    def checkSatisfied(messageSizeInBytes: Int, delayedFetch: DelayedFetch): Boolean = {
-      val accumulatedSize = delayedFetch.bytesAccumulated.addAndGet(messageSizeInBytes)
-      accumulatedSize >= delayedFetch.fetch.minBytes
-    }
-
-    /**
-     * When a request expires just answer it with whatever data is present
-     */
-    def expire(delayed: DelayedFetch) {
-      debug("Expiring fetch request %s.".format(delayed.fetch))
-      try {
-        val topicData = readMessageSets(delayed.fetch)
-        val response = FetchResponse(delayed.fetch.correlationId, topicData)
-        val fromFollower = delayed.fetch.isFromFollower
-        delayedRequestMetrics.recordDelayedFetchExpired(fromFollower)
-        requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response)))
-      }
-      catch {
-        case e1: LeaderNotAvailableException =>
-          debug("Leader changed before fetch request %s expired.".format(delayed.fetch))
-        case e2: UnknownTopicOrPartitionException =>
-          debug("Replica went offline before fetch request %s expired.".format(delayed.fetch))
-      }
-    }
-  }
-
-  class DelayedProduce(keys: Seq[RequestKey],
-                       request: RequestChannel.Request,
-                       val partitionStatus: immutable.Map[TopicAndPartition, DelayedProduceResponseStatus],
-                       produce: ProducerRequest,
-                       delayMs: Long,
-                       offsetCommitRequestOpt: Option[OffsetCommitRequest] = None)
-    extends DelayedRequest(keys, request, delayMs) with Logging {
-
-    // first update the acks pending variable according to the error code
-    partitionStatus foreach { case (topicAndPartition, delayedStatus) =>
-      if (delayedStatus.status.error == ErrorMapping.NoError) {
-        // Timeout error state will be cleared when requiredAcks are received
-        delayedStatus.acksPending = true
-        delayedStatus.status.error = ErrorMapping.RequestTimedOutCode
-      } else {
-        delayedStatus.acksPending = false
-      }
-
-      trace("Initial partition status for %s is %s".format(topicAndPartition, delayedStatus))
-    }
-
-    def respond() {
-      val responseStatus = partitionStatus.map { case (topicAndPartition, delayedStatus) =>
-        topicAndPartition -> delayedStatus.status
-      }
-
-      val errorCode = responseStatus.find { case (_, status) =>
-        status.error != ErrorMapping.NoError
-      }.map(_._2.error).getOrElse(ErrorMapping.NoError)
-
-      if (errorCode == ErrorMapping.NoError) {
-        offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo) )
-      }
-
-      val response = offsetCommitRequestOpt.map(_.responseFor(errorCode, config.offsetMetadataMaxSize))
-        .getOrElse(ProducerResponse(produce.correlationId, responseStatus))
-
-      requestChannel.sendResponse(new RequestChannel.Response(
-        request, new BoundedByteBufferSend(response)))
-    }
-
-    /**
-     * Returns true if this delayed produce request is satisfied (or more
-     * accurately, unblocked) -- this is the case if for every partition:
-     * Case A: This broker is not the leader: unblock - should return error.
-     * Case B: This broker is the leader:
-     *   B.1 - If there was a localError (when writing to the local log): unblock - should return error
-     *   B.2 - else, at least requiredAcks replicas should be caught up to this request.
-     *
-     * As partitions become acknowledged, we may be able to unblock
-     * DelayedFetchRequests that are pending on those partitions.
-     */
-    def isSatisfied(followerFetchRequestKey: RequestKey) = {
-      val topic = followerFetchRequestKey.topic
-      val partitionId = followerFetchRequestKey.partition
-      val fetchPartitionStatus = partitionStatus(TopicAndPartition(topic, partitionId))
-      trace("Checking producer request satisfaction for %s-%d, acksPending = %b"
-        .format(topic, partitionId, fetchPartitionStatus.acksPending))
-      if (fetchPartitionStatus.acksPending) {
-        val partitionOpt = replicaManager.getPartition(topic, partitionId)
-        val (hasEnough, errorCode) = partitionOpt match {
-          case Some(partition) =>
-            partition.checkEnoughReplicasReachOffset(fetchPartitionStatus.requiredOffset, produce.requiredAcks)
-          case None =>
-            (false, ErrorMapping.UnknownTopicOrPartitionCode)
-        }
-        if (errorCode != ErrorMapping.NoError) {
-          fetchPartitionStatus.acksPending = false
-          fetchPartitionStatus.status.error = errorCode
-        } else if (hasEnough) {
-          fetchPartitionStatus.acksPending = false
-          fetchPartitionStatus.status.error = ErrorMapping.NoError
-        }
-        if (!fetchPartitionStatus.acksPending) {
-          val messageSizeInBytes = produce.topicPartitionMessageSizeMap(followerFetchRequestKey.topicAndPartition)
-          maybeUnblockDelayedFetchRequests(topic, partitionId, messageSizeInBytes)
-        }
-      }
-
-      // unblocked if there are no partitions with pending acks
-      val satisfied = ! partitionStatus.exists(p => p._2.acksPending)
-      trace("Producer request satisfaction for %s-%d = %b".format(topic, partitionId, satisfied))
-      satisfied
-    }
-  }
-
-  /**
-   * A holding pen for produce requests waiting to be satisfied.
-   */
- */ - private [kafka] class ProducerRequestPurgatory(purgeInterval: Int) - extends RequestPurgatory[DelayedProduce, RequestKey](brokerId, purgeInterval) { - this.logIdent = "[ProducerRequestPurgatory-%d] ".format(brokerId) - - protected def checkSatisfied(followerFetchRequestKey: RequestKey, - delayedProduce: DelayedProduce) = - delayedProduce.isSatisfied(followerFetchRequestKey) - - /** - * Handle an expired delayed request - */ - protected def expire(delayedProduce: DelayedProduce) { - for ((topicPartition, responseStatus) <- delayedProduce.partitionStatus if responseStatus.acksPending) - delayedRequestMetrics.recordDelayedProducerKeyExpired(RequestKey(topicPartition.topic, topicPartition.partition)) - - delayedProduce.respond() - } - } - - private class DelayedRequestMetrics { - private class DelayedProducerRequestMetrics(keyLabel: String = MetricKey.globalLabel) extends KafkaMetricsGroup { - val expiredRequestMeter = newMeter(keyLabel + "ExpiresPerSecond", "requests", TimeUnit.SECONDS) - } - - - private class DelayedFetchRequestMetrics(forFollower: Boolean) extends KafkaMetricsGroup { - private val metricPrefix = if (forFollower) "Follower" else "Consumer" - - val expiredRequestMeter = newMeter(metricPrefix + "ExpiresPerSecond", "requests", TimeUnit.SECONDS) - } - - private val producerRequestMetricsForKey = { - val valueFactory = (k: MetricKey) => new DelayedProducerRequestMetrics(k.keyLabel + "-") - new Pool[MetricKey, DelayedProducerRequestMetrics](Some(valueFactory)) - } - - private val aggregateProduceRequestMetrics = new DelayedProducerRequestMetrics - - private val aggregateFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = true) - private val aggregateNonFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = false) - - def recordDelayedProducerKeyExpired(key: MetricKey) { - val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(key) - List(keyMetrics, aggregateProduceRequestMetrics).foreach(_.expiredRequestMeter.mark()) - } - - def recordDelayedFetchExpired(forFollower: Boolean) { - val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics - else aggregateNonFollowerFetchRequestMetrics - - metrics.expiredRequestMeter.mark() - } - } } diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index c22e51e..3446234 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -103,6 +103,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg /* start processing requests */ apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, zkClient, config.brokerId, config, kafkaController) + // TODO: the following line will be removed in 0.9 + replicaManager.initWithRequestPurgatory(apis.producerRequestPurgatory, apis.fetchRequestPurgatory) requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads) brokerState.newState(RunningAsBroker) diff --git a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala new file mode 100644 index 0000000..6bef129 --- /dev/null +++ b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import org.apache.kafka.common.KafkaException
+
+object LogOffsetMetadata {
+  val UnknownOffsetMetadata = new LogOffsetMetadata(-1, 0, 0)
+
+  class LogOrdering extends Ordering[LogOffsetMetadata] {
+    override def compare(x: LogOffsetMetadata, y: LogOffsetMetadata): Int =
+      // reduce the Long difference to its sign to avoid Int overflow on large diffs
+      math.signum(x.messagesDiff(y)).toInt
+  }
+
+}
+
+/*
+ * A log offset structure, including:
+ *  1. the message offset
+ *  2. the base message offset of the located segment
+ *  3. the physical position within the located segment
+ */
+class LogOffsetMetadata(val messageOffset: Long, val segmentBaseOffset: Long, val relativePositionInSegment: Int) {
+
+  def this(messageOffset: Long) = this(messageOffset, -1, -1)
+
+  // check if this offset is on an older segment than the given offset
+  def older(that: LogOffsetMetadata): Boolean = this.segmentBaseOffset < that.segmentBaseOffset
+
+  // check if this offset is on the same segment as the given offset
+  def peer(that: LogOffsetMetadata): Boolean = this.segmentBaseOffset == that.segmentBaseOffset
+
+  // check if this offset is before the given offset
+  def precedes(that: LogOffsetMetadata): Boolean = this.messageOffset < that.messageOffset
+
+  // compute the number of messages between this offset and the given offset
+  def messagesDiff(that: LogOffsetMetadata): Long = this.messageOffset - that.messageOffset
+
+  // compute the number of bytes between this offset and the given offset,
+  // assuming they are on the same segment and this offset precedes the given one
+  def bytesDiff(that: LogOffsetMetadata): Int = {
+    if (!peer(that))
+      throw new KafkaException("Cannot compute bytes diff: this offset metadata is not on the same segment as the given offset")
+    this.relativePositionInSegment - that.relativePositionInSegment
+  }
+
+  override def toString() = messageOffset.toString + " [" + segmentBaseOffset + " : " + relativePositionInSegment + "]"
+
+  override def equals(that: Any) = (that.isInstanceOf[LogOffsetMetadata]
+    && this.messageOffset == that.asInstanceOf[LogOffsetMetadata].messageOffset
+    && this.segmentBaseOffset == that.asInstanceOf[LogOffsetMetadata].segmentBaseOffset
+    && this.relativePositionInSegment == that.asInstanceOf[LogOffsetMetadata].relativePositionInSegment)
+
+  // keep hashCode consistent with the field-wise equals above
+  override def hashCode() = (messageOffset, segmentBaseOffset, relativePositionInSegment).hashCode()
+}
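Since the comparison helpers above are easy to confuse, here is a quick illustration of how they compose (not part of the patch; the values are made up):

    // illustration only -- hypothetical offsets on two different segments
    val cur = new LogOffsetMetadata(150L, 100L, 4096)  // offset 150, segment starting at 100
    val old = new LogOffsetMetadata(42L, 0L, 1024)     // offset 42, segment starting at 0

    old.precedes(cur)      // true:  42 < 150
    old.older(cur)         // true:  segment base 0 < segment base 100
    old.peer(cur)          // false: the two offsets sit on different segments
    cur.messagesDiff(old)  // 108L
    cur.bytesDiff(old)     // throws KafkaException: only defined within one segment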
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index 0e22897..d8cd01c 100644
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -271,7 +271,7 @@ class OffsetManager(val config: OffsetManagerConfig,
     // loop breaks if leader changes at any time during the load, since getHighWatermark is -1
     while (currOffset < getHighWatermark(offsetsPartition) && !shuttingDown.get()) {
       buffer.clear()
-      val messages = log.read(currOffset, config.loadBufferSize).asInstanceOf[FileMessageSet]
+      val messages = log.read(currOffset, config.loadBufferSize).messageSet.asInstanceOf[FileMessageSet]
       messages.readInto(buffer, 0)
       val messageSet = new ByteBufferMessageSet(buffer)
       messageSet.foreach { msgAndOffset =>
@@ -312,7 +312,7 @@ class OffsetManager(val config: OffsetManagerConfig,
     val partitionOpt = replicaManager.getPartition(OffsetManager.OffsetsTopicName, partitionId)
     val hw = partitionOpt.map { partition =>
-      partition.leaderReplicaIfLocal().map(_.highWatermark).getOrElse(-1L)
+      partition.leaderReplicaIfLocal().map(_.highWatermark.messageOffset).getOrElse(-1L)
     }.getOrElse(-1L)
     hw
diff --git a/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala b/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala
new file mode 100644
index 0000000..3cfc291
--- /dev/null
+++ b/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.TimeUnit
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.Pool
+import kafka.network.{BoundedByteBufferSend, RequestChannel}
+import kafka.api.RequestOrResponse
+
+/**
+ * The purgatory holding delayed producer requests
+ */
+class ProducerRequestPurgatory(replicaManager: ReplicaManager, offsetManager: OffsetManager, requestChannel: RequestChannel)
+  extends RequestPurgatory[DelayedProduce](replicaManager.config.brokerId, replicaManager.config.producerPurgatoryPurgeIntervalRequests) {
+  this.logIdent = "[ProducerRequestPurgatory-%d] ".format(replicaManager.config.brokerId)
+
+  private class DelayedProducerRequestMetrics(keyLabel: String = RequestKey.globalLabel) extends KafkaMetricsGroup {
+    val expiredRequestMeter = newMeter(keyLabel + "ExpiresPerSecond", "requests", TimeUnit.SECONDS)
+  }
+
+  private val producerRequestMetricsForKey = {
+    val valueFactory = (k: RequestKey) => new DelayedProducerRequestMetrics(k.keyLabel + "-")
+    new Pool[RequestKey, DelayedProducerRequestMetrics](Some(valueFactory))
+  }
+
+  private val aggregateProduceRequestMetrics = new DelayedProducerRequestMetrics
+
+  def recordDelayedProducerKeyExpired(key: RequestKey) {
+    val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(key)
+    List(keyMetrics, aggregateProduceRequestMetrics).foreach(_.expiredRequestMeter.mark())
+  }
+
+  def checkSatisfied(delayedProduce: DelayedProduce) = delayedProduce.isSatisfied(replicaManager)
+
+  /**
+   * Handle an expired delayed request
+   */
+  def expire(delayedProduce: DelayedProduce) {
+    debug("Expiring produce request %s.".format(delayedProduce.produce))
+    for ((topicPartition, responseStatus) <- delayedProduce.partitionStatus if responseStatus.acksPending)
+      recordDelayedProducerKeyExpired(new TopicPartitionRequestKey(topicPartition))
+    respond(delayedProduce)
+  }
+
+  // TODO: the purgatory should not be responsible for sending back the responses
+  def respond(delayedProduce: DelayedProduce) {
+    val response = delayedProduce.respond(offsetManager)
+    requestChannel.sendResponse(new RequestChannel.Response(delayedProduce.request, new BoundedByteBufferSend(response)))
+  }
+}
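Before moving on to the fetcher thread, it helps to see how this purgatory is driven. A sketch of the unblocking path, pieced together from the call sites added elsewhere in this patch (onFollowerCaughtUp is a made-up name for illustration):

    // sketch: how a follower fetch ends up answering blocked producers
    def onFollowerCaughtUp(rm: ReplicaManager, tp: kafka.common.TopicAndPartition) {
      // the leader has recorded a new follower LEO and possibly raised the HW,
      // so DelayedProduce requests waiting on this partition may now be satisfiable
      rm.unblockDelayedProduceRequests(new TopicPartitionRequestKey(tp))
      // internally, producerRequestPurgatory.update(key) collects the newly
      // satisfied requests and respond(_) sends each response back on the channel
    }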
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 75ae1e1..6879e73 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -47,16 +47,19 @@ class ReplicaFetcherThread(name:String,
       val replica = replicaMgr.getReplica(topic, partitionId).get
       val messageSet = partitionData.messages.asInstanceOf[ByteBufferMessageSet]

-      if (fetchOffset != replica.logEndOffset)
-        throw new RuntimeException("Offset mismatch: fetched offset = %d, log end offset = %d.".format(fetchOffset, replica.logEndOffset))
+      if (fetchOffset != replica.logEndOffset.messageOffset)
+        throw new RuntimeException("Offset mismatch: fetched offset = %d, log end offset = %d.".format(fetchOffset, replica.logEndOffset.messageOffset))
       trace("Follower %d has replica log end offset %d for partition %s. Received %d messages and leader hw %d"
-        .format(replica.brokerId, replica.logEndOffset, topicAndPartition, messageSet.sizeInBytes, partitionData.hw))
+        .format(replica.brokerId, replica.logEndOffset.messageOffset, topicAndPartition, messageSet.sizeInBytes, partitionData.hw))
       replica.log.get.append(messageSet, assignOffsets = false)
       trace("Follower %d has replica log end offset %d after appending %d bytes of messages for partition %s"
-        .format(replica.brokerId, replica.logEndOffset, messageSet.sizeInBytes, topicAndPartition))
-      val followerHighWatermark = replica.logEndOffset.min(partitionData.hw)
-      replica.highWatermark = followerHighWatermark
-      trace("Follower %d set replica highwatermark for partition [%s,%d] to %d"
+        .format(replica.brokerId, replica.logEndOffset.messageOffset, messageSet.sizeInBytes, topicAndPartition))
+      val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.hw)
+      // for the follower replica we do not need to keep the segment base offset
+      // and the physical position; these values will be computed upon becoming the leader
+      replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)
+      trace("Follower %d set replica high watermark for partition [%s,%d] to %s"
         .format(replica.brokerId, topic, partitionId, followerHighWatermark))
     } catch {
       case e: KafkaStorageException =>
@@ -82,7 +85,7 @@ class ReplicaFetcherThread(name:String,
    * There is a potential for a mismatch between the logs of the two replicas here. We don't fix this mismatch as of now.
    */
     val leaderEndOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.LatestTime, brokerConfig.brokerId)
-    if (leaderEndOffset < replica.logEndOffset) {
+    if (leaderEndOffset < replica.logEndOffset.messageOffset) {
      // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
      // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
      // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
@@ -91,13 +94,13 @@ class ReplicaFetcherThread(name:String,
      // Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur.
fatal("Halting because log truncation is not allowed for topic %s,".format(topicAndPartition.topic) + " Current leader %d's latest offset %d is less than replica %d's latest offset %d" - .format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId, replica.logEndOffset)) + .format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId, replica.logEndOffset.messageOffset)) Runtime.getRuntime.halt(1) } replicaMgr.logManager.truncateTo(Map(topicAndPartition -> leaderEndOffset)) warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's latest offset %d" - .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset, sourceBroker.id, leaderEndOffset)) + .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderEndOffset)) leaderEndOffset } else { /** @@ -109,7 +112,7 @@ class ReplicaFetcherThread(name:String, val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, brokerConfig.brokerId) replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset) warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's start offset %d" - .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset, sourceBroker.id, leaderStartOffset)) + .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderStartOffset)) leaderStartOffset } } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 6a56a77..f0370c3 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -16,23 +16,28 @@ */ package kafka.server -import collection._ -import mutable.HashMap import kafka.cluster.{Broker, Partition, Replica} import kafka.utils._ import kafka.log.LogManager import kafka.metrics.KafkaMetricsGroup import kafka.common._ -import kafka.api.{UpdateMetadataRequest, StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest} +import kafka.api._ import kafka.controller.KafkaController +import kafka.common.TopicAndPartition +import kafka.message.MessageSet import org.I0Itec.zkclient.ZkClient import com.yammer.metrics.core.Gauge import java.util.concurrent.atomic.AtomicBoolean import java.io.{IOException, File} import java.util.concurrent.TimeUnit +import scala.Predef._ +import scala.collection._ +import scala.collection.mutable.HashMap +import scala.collection.Map +import scala.collection.Set +import scala.Some object ReplicaManager { - val UnknownLogEndOffset = -1L val HighWatermarkFilename = "replication-offset-checkpoint" } @@ -56,6 +61,9 @@ class ReplicaManager(val config: KafkaConfig, this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: " val stateChangeLogger = KafkaController.stateChangeLogger + var producerRequestPurgatory: ProducerRequestPurgatory = null + var fetchRequestPurgatory: FetchRequestPurgatory = null + newGauge( "LeaderCount", new Gauge[Int] { @@ -93,6 +101,40 @@ class ReplicaManager(val config: KafkaConfig, } /** + * Initialize the replica manager with the request purgatory + * + * TODO: will be removed in 0.9 where we refactor server structure + */ + + def initWithRequestPurgatory(producerRequestPurgatory: ProducerRequestPurgatory, fetchRequestPurgatory: FetchRequestPurgatory) { + this.producerRequestPurgatory = producerRequestPurgatory + this.fetchRequestPurgatory = fetchRequestPurgatory + } + + /** + 
* Unblock some delayed produce requests with the request key
+   */
+  def unblockDelayedProduceRequests(key: RequestKey) {
+    val satisfied = producerRequestPurgatory.update(key)
+    debug("Request key %s unblocked %d producer requests."
+      .format(key.keyLabel, satisfied.size))
+
+    // send any newly unblocked responses
+    satisfied.foreach(producerRequestPurgatory.respond(_))
+  }
+
+  /**
+   * Unblock some delayed fetch requests with the request key
+   */
+  def unblockDelayedFetchRequests(key: RequestKey) {
+    val satisfied = fetchRequestPurgatory.update(key)
+    debug("Request key %s unblocked %d fetch requests.".format(key.keyLabel, satisfied.size))
+
+    // send any newly unblocked responses
+    satisfied.foreach(fetchRequestPurgatory.respond(_))
+  }
+
   /**
    * This function is only used in two places: in Partition.updateISR() and KafkaApis.handleProducerRequest().
    * In the former case, the partition should have been created, in the latter case, return -1 will put the request into purgatory
    */
@@ -212,6 +254,77 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }

+  /**
+   * Read from all the offset details given and return a map of
+   * (topic, partition) -> PartitionData
+   */
+  def readMessageSets(fetchRequest: FetchRequest) = {
+    val isFetchFromFollower = fetchRequest.isFromFollower
+    fetchRequest.requestInfo.map {
+      case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) =>
+        val partitionDataAndOffsetInfo =
+          try {
+            val (fetchInfo, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, fetchRequest.replicaId)
+            BrokerTopicStats.getBrokerTopicStats(topic).bytesOutRate.mark(fetchInfo.messageSet.sizeInBytes)
+            BrokerTopicStats.getBrokerAllTopicsStats.bytesOutRate.mark(fetchInfo.messageSet.sizeInBytes)
+            if (isFetchFromFollower) {
+              debug("Partition [%s,%d] received fetch request from follower %d"
+                .format(topic, partition, fetchRequest.replicaId))
+            }
+            (new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, fetchInfo.messageSet), fetchInfo.fetchOffset)
+          } catch {
+            // NOTE: the failed-fetch-request metric is not incremented for UnknownTopicOrPartitionException and
+            // NotLeaderForPartitionException, since it is supposed to indicate the failure of a broker to handle a
+            // fetch request for a partition it leads
+            case utpe: UnknownTopicOrPartitionException =>
+              warn("Fetch request with correlation id %d from client %s on partition [%s,%d] failed due to %s".format(
+                fetchRequest.correlationId, fetchRequest.clientId, topic, partition, utpe.getMessage))
+              (new FetchResponsePartitionData(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty), LogOffsetMetadata.UnknownOffsetMetadata)
+            case nle: NotLeaderForPartitionException =>
+              warn("Fetch request with correlation id %d from client %s on partition [%s,%d] failed due to %s".format(
+                fetchRequest.correlationId, fetchRequest.clientId, topic, partition, nle.getMessage))
+              (new FetchResponsePartitionData(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty), LogOffsetMetadata.UnknownOffsetMetadata)
+            case t: Throwable =>
+              BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
+              BrokerTopicStats.getBrokerAllTopicsStats.failedFetchRequestRate.mark()
+              error("Error when processing fetch request for partition [%s,%d] offset %d from %s with correlation id %d. Possible cause: %s"
+                .format(topic, partition, offset, if (isFetchFromFollower) "follower" else "consumer", fetchRequest.correlationId, t.getMessage))
+              (new FetchResponsePartitionData(ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty), LogOffsetMetadata.UnknownOffsetMetadata)
+          }
+        (TopicAndPartition(topic, partition), partitionDataAndOffsetInfo)
+    }
+  }
+
+  /**
+   * Read from a single topic/partition at the given offset up to maxSize bytes
+   */
+  private def readMessageSet(topic: String,
+                             partition: Int,
+                             offset: Long,
+                             maxSize: Int,
+                             fromReplicaId: Int): (FetchDataInfo, Long) = {
+    // check if the current broker is the leader for the partition
+    val localReplica = if (fromReplicaId == Request.DebuggingConsumerId)
+      getReplicaOrException(topic, partition)
+    else
+      getLeaderReplicaIfLocal(topic, partition)
+    trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
+    val maxOffsetOpt =
+      if (Request.isValidBrokerId(fromReplicaId))
+        None
+      else
+        Some(localReplica.highWatermark.messageOffset)
+    val fetchInfo = localReplica.log match {
+      case Some(log) =>
+        log.read(offset, maxSize, maxOffsetOpt)
+      case None =>
+        error("Leader for partition [%s,%d] does not have a local log".format(topic, partition))
+        FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty)
+    }
+    (fetchInfo, localReplica.highWatermark.messageOffset)
+  }
+
   def maybeUpdateMetadataCache(updateMetadataRequest: UpdateMetadataRequest, metadataCache: MetadataCache) {
     replicaStateChangeLock synchronized {
       if(updateMetadataRequest.controllerEpoch < controllerEpoch) {
@@ -419,7 +532,7 @@ class ReplicaManager(val config: KafkaConfig,
             .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(partition.topic, partition.partitionId)))
         }
-      logManager.truncateTo(partitionsToMakeFollower.map(partition => (new TopicAndPartition(partition), partition.getOrCreateReplica().highWatermark)).toMap)
+      logManager.truncateTo(partitionsToMakeFollower.map(partition => (new TopicAndPartition(partition), partition.getOrCreateReplica().highWatermark.messageOffset)).toMap)
       partitionsToMakeFollower.foreach { partition =>
         stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition [%s,%d] as part of " +
@@ -437,7 +550,9 @@ class ReplicaManager(val config: KafkaConfig,
       else {
         // we do not need to check if the leader exists again since this has been done at the beginning of this process
         val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
-          new TopicAndPartition(partition) -> BrokerAndInitialOffset(leaders.find(_.id == partition.leaderReplicaIdOpt.get).get, partition.getReplica().get.logEndOffset)).toMap
+          new TopicAndPartition(partition) -> BrokerAndInitialOffset(
+            leaders.find(_.id == partition.leaderReplicaIdOpt.get).get,
+            partition.getReplica().get.logEndOffset.messageOffset)).toMap
         replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
         partitionsToMakeFollower.foreach { partition =>
@@ -471,12 +586,24 @@ class ReplicaManager(val config: KafkaConfig,
     curLeaderPartitions.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs, config.replicaLagMaxMessages))
   }

-  def recordFollowerPosition(topic: String, partitionId: Int, replicaId: Int, offset: Long) = {
+  def updateReplicaLEOAndUpdateHW(topic: String, partitionId: Int, replicaId: Int, offset: LogOffsetMetadata) = {
    val partitionOpt =
getPartition(topic, partitionId) - if(partitionOpt.isDefined) { - partitionOpt.get.updateLeaderHWAndMaybeExpandIsr(replicaId, offset) - } else { + if(!partitionOpt.isDefined) { warn("While recording the follower position, the partition [%s,%d] hasn't been created, skip updating leader HW".format(topic, partitionId)) + } else { + debug("Recording follower %d position %d for partition [%s,%d].".format(replicaId, offset.messageOffset, topic, partitionId)) + val partition = partitionOpt.get + val replicaOpt = partition.getReplica(replicaId) + if(!replicaOpt.isDefined) { + throw new NotAssignedReplicaException(("Leader %d failed to record follower %d's position %d for partition [%s,%d] since the replica %d" + + " is not recognized to be one of the assigned replicas %s for partition [%s,%d]").format(localBrokerId, replicaId, + offset.messageOffset, topic, partitionId, replicaId, partition.assignedReplicas().map(_.brokerId).mkString(","), topic, partitionId)) + } + val replica = replicaOpt.get + replica.logEndOffset = offset + + // check if we need to update HW and expand Isr + partition.updateLeaderHWAndMaybeExpandIsr(replicaId) } } @@ -487,7 +614,7 @@ class ReplicaManager(val config: KafkaConfig, val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect{case Some(replica) => replica} val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParentFile.getAbsolutePath) for((dir, reps) <- replicasByDir) { - val hwms = reps.map(r => (new TopicAndPartition(r) -> r.highWatermark)).toMap + val hwms = reps.map(r => (new TopicAndPartition(r) -> r.highWatermark.messageOffset)).toMap try { highWatermarkCheckpoints(dir).write(hwms) } catch { diff --git a/core/src/main/scala/kafka/server/RequestKey.scala b/core/src/main/scala/kafka/server/RequestKey.scala new file mode 100644 index 0000000..74ba1ed --- /dev/null +++ b/core/src/main/scala/kafka/server/RequestKey.scala @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package kafka.server
+
+import kafka.common.TopicAndPartition
+
+/**
+ * Keys used for delayed request metrics recording
+ */
+trait RequestKey {
+  def keyLabel: String
+}
+
+object RequestKey {
+  val globalLabel = "All"
+}
+
+case class TopicPartitionRequestKey(topic: String, partition: Int) extends RequestKey {
+
+  def this(topicAndPartition: TopicAndPartition) = this(topicAndPartition.topic, topicAndPartition.partition)
+
+  override def keyLabel = "%s-%d".format(topic, partition)
+}
diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala
index 3d0ff1e..ba5f991 100644
--- a/core/src/main/scala/kafka/server/RequestPurgatory.scala
+++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala
@@ -32,7 +32,7 @@ import com.yammer.metrics.core.Gauge
- * The associated keys are used for bookeeping, and represent the "trigger" that causes this request to check if it is satisfied,
+ * The associated keys are used for bookkeeping, and represent the "trigger" that causes this request to check if it is satisfied,
  * for example a key could be a (topic, partition) pair.
  */
-class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, delayMs: Long) extends DelayedItem[RequestChannel.Request](request, delayMs) {
+class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, override val delayMs: Long) extends DelayedItem[RequestChannel.Request](request, delayMs) {
   val satisfied = new AtomicBoolean(false)
 }

@@ -45,8 +45,8 @@ class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, de
 *
 * For us the key is generally a (topic, partition) pair.
 * By calling
- *   watch(delayedRequest)
- * we will add triggers for each of the given keys. It is up to the user to then call
+ *   checkAndMaybeWatch(delayedRequest)
+ * we watch the request only if it is not already satisfied, adding a trigger for each of the given keys. It is up to the user to then call
- *   val satisfied = update(key, request)
+ *   val satisfied = update(key)
- * when a request relevant to the given key occurs. This triggers bookeeping logic and returns back any requests satisfied by this
+ * when a request relevant to the given key occurs. This triggers bookkeeping logic and returns any requests satisfied by this
 * new request.
@@ -61,18 +61,19 @@ class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, de
 * this function handles delayed requests that have hit their time limit without being satisfied.
 *
 */
-abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purgeInterval: Int = 1000)
+abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInterval: Int = 1000)
         extends Logging with KafkaMetricsGroup {

   /* a list of requests watching each key */
-  private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers))
+  private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers(key)))

-  private val requestCounter = new AtomicInteger(0)
+  /* the number of watched requests; a request watching multiple keys is counted once per watcher list */
+  private val watched = new AtomicInteger(0)

   newGauge(
     "PurgatorySize",
     new Gauge[Int] {
-      def value = watchersForKey.values.map(_.numRequests).sum + expiredRequestReaper.numRequests
+      def value = watched.get() + expiredRequestReaper.numRequests
     }
   )

@@ -91,33 +92,39 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
   /**
    * Add a new delayed request watching the contained keys
    */
-  def watch(delayedRequest: T) {
-    requestCounter.getAndIncrement()
-
+  def checkAndMaybeWatch(delayedRequest: T): Boolean = {
     for(key <- delayedRequest.keys) {
-      var lst = watchersFor(key)
-      lst.add(delayedRequest)
+      val lst = watchersFor(key)
+      // the request could already be satisfied and hence not added to the watchers;
+      // in this case we can return immediately without trying to add it to the remaining watchers
+      if(!lst.add(delayedRequest))
+        return false
     }
+
+    // if the request is indeed watched, also add it to the expiration queue
     expiredRequestReaper.enqueue(delayedRequest)
+
+    true
   }

   /**
    * Update any watchers and return a list of newly satisfied requests.
    */
-  def update(key: Any, request: R): Seq[T] = {
+  def update(key: Any): Seq[T] = {
     val w = watchersForKey.get(key)
     if(w == null)
       Seq.empty
     else
-      w.collectSatisfiedRequests(request)
+      w.collectSatisfiedRequests()
   }

   private def watchersFor(key: Any) = watchersForKey.getAndMaybePut(key)

   /**
-   * Check if this request satisfied this delayed request
+   * Check if this delayed request is already satisfied
    */
-  protected def checkSatisfied(request: R, delayed: T): Boolean
+  protected def checkSatisfied(request: T): Boolean

   /**
    * Handle an expired delayed request
@@ -125,7 +132,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
   protected def expire(delayed: T)

   /**
-   * Shutdown the expirey thread
+   * Shutdown the expiration reaper thread
    */
   def shutdown() {
     expiredRequestReaper.shutdown()
@@ -135,15 +142,17 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
-  * A linked list of DelayedRequests watching some key with some associated
+  * A list of DelayedRequests watching some key with some associated
   * bookkeeping logic.
  */
-  private class Watchers {
-
-    private val requests = new util.LinkedList[T]
+  private class Watchers(key: Any) {
+    private val requests = new util.ArrayList[T]

-    def numRequests = requests.size
-
-    def add(t: T) {
+    def add(t: T): Boolean = {
       synchronized {
+        // atomically check the satisfaction criteria and add to the watch list
+        if (checkSatisfied(t))
+          return false
         requests.add(t)
+        watched.getAndIncrement()
+        return true
       }
     }

@@ -155,6 +164,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
         val curr = iter.next
         if(curr.satisfied.get()) {
           iter.remove()
+          watched.getAndDecrement()
           purged += 1
         }
       }
@@ -162,7 +172,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
       }
     }

-    def collectSatisfiedRequests(request: R): Seq[T] = {
+    def collectSatisfiedRequests(): Seq[T] = {
       val response = new mutable.ArrayBuffer[T]
       synchronized {
         val iter = requests.iterator()
@@ -174,9 +184,10 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
           } else {
             // synchronize on curr to avoid any race condition with expire
             // on client-side.
-            val satisfied = curr synchronized checkSatisfied(request, curr)
+            val satisfied = curr synchronized checkSatisfied(curr)
             if(satisfied) {
               iter.remove()
+              watched.getAndDecrement()
               val updated = curr.satisfied.compareAndSet(false, true)
               if(updated == true) {
                 response += curr
@@ -215,9 +226,8 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
             expire(curr)
           }
         }
-        if (requestCounter.get >= purgeInterval) { // see if we need to force a full purge
+        if (watched.get + numRequests >= purgeInterval) { // see if we need to force a full purge
           debug("Beginning purgatory purge")
-          requestCounter.set(0)
           val purged = purgeSatisfied()
           debug("Purged %d requests from delay queue.".format(purged))
           val numPurgedFromWatchers = watchersForKey.values.map(_.purgeSatisfied()).sum
@@ -266,10 +276,12 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
     }

     /**
-     * Delete all expired events from the delay queue
+     * Delete all satisfied events from the delay queue (the watcher lists are purged separately)
      */
     private def purgeSatisfied(): Int = {
       var purged = 0
+
+      // purge the delayed queue
       val iter = delayed.iterator()
       while(iter.hasNext) {
         val curr = iter.next()
@@ -278,6 +290,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
           purged += 1
         }
       }
+
       purged
     }
   }
diff --git a/core/src/main/scala/kafka/utils/DelayedItem.scala b/core/src/main/scala/kafka/utils/DelayedItem.scala
index d727649..8fae43c 100644
--- a/core/src/main/scala/kafka/utils/DelayedItem.scala
+++ b/core/src/main/scala/kafka/utils/DelayedItem.scala
@@ -20,7 +20,7 @@ package kafka.utils
 import java.util.concurrent._
 import scala.math._

-class DelayedItem[T](val item: T, delay: Long, unit: TimeUnit) extends Delayed with Logging {
+class DelayedItem[T](val item: T, val delay: Long, val unit: TimeUnit) extends Delayed with Logging {

   val createdMs = SystemTime.milliseconds
   val delayMs = {
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala
index 8fcd068..e19b8b2 100644
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -91,7 +91,7 @@ object StressTestLog {
     @volatile var offset = 0
     override def work() {
       try {
-        log.read(offset, 1024, Some(offset+1)) match {
+        log.read(offset, 1024, Some(offset+1)).messageSet match {
          case read: FileMessageSet if
read.sizeInBytes > 0 => { val first = read.head require(first.offset == offset, "We should either read nothing or the message we asked for.") diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala index 9f04bd3..a5386a0 100644 --- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala +++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala @@ -74,7 +74,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with val replica = servers.head.replicaManager.getReplica(topic, 0).get assertTrue("HighWatermark should equal logEndOffset with just 1 replica", - replica.logEndOffset > 0 && replica.logEndOffset == replica.highWatermark) + replica.logEndOffset.messageOffset > 0 && replica.logEndOffset.equals(replica.highWatermark)) val request = new FetchRequestBuilder() .clientId("test-client") @@ -248,13 +248,13 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with "Published messages should be in the log") val replicaId = servers.head.config.brokerId - TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test1", 0, replicaId).get.highWatermark == 2 }, + TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test1", 0, replicaId).get.highWatermark.messageOffset == 2 }, "High watermark should equal to log end offset") - TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test2", 0, replicaId).get.highWatermark == 2 }, + TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test2", 0, replicaId).get.highWatermark.messageOffset == 2 }, "High watermark should equal to log end offset") - TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test3", 0, replicaId).get.highWatermark == 2 }, + TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test3", 0, replicaId).get.highWatermark.messageOffset == 2 }, "High watermark should equal to log end offset") - TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test4", 0, replicaId).get.highWatermark == 2 }, + TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test4", 0, replicaId).get.highWatermark.messageOffset == 2 }, "High watermark should equal to log end offset") // test if the consumer received the messages in the correct order when producer has enabled request pipelining diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index d03d4c4..d7279ea 100644 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -104,7 +104,7 @@ class LogManagerTest extends JUnit3Suite { assertEquals("Now there should only be only one segment in the index.", 1, log.numberOfSegments) time.sleep(log.config.fileDeleteDelayMs + 1) assertEquals("Files should have been deleted", log.numberOfSegments * 2, log.dir.list.length) - assertEquals("Should get empty fetch off new log.", 0, log.read(offset+1, 1024).sizeInBytes) + assertEquals("Should get empty fetch off new log.", 0, log.read(offset+1, 1024).messageSet.sizeInBytes) try { log.read(0, 1024) @@ -158,7 +158,7 @@ class LogManagerTest extends JUnit3Suite { assertEquals("Now there should be exactly 6 segments", 6, log.numberOfSegments) time.sleep(log.config.fileDeleteDelayMs + 1) assertEquals("Files should have been deleted", log.numberOfSegments * 2, 
log.dir.list.length) - assertEquals("Should get empty fetch off new log.", 0, log.read(offset + 1, 1024).sizeInBytes) + assertEquals("Should get empty fetch off new log.", 0, log.read(offset + 1, 1024).messageSet.sizeInBytes) try { log.read(0, 1024) fail("Should get exception from fetching earlier.") diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 1da1393..577d102 100644 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -131,11 +131,11 @@ class LogTest extends JUnitSuite { for(i <- 0 until messages.length) log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = messages(i))) for(i <- 0 until messages.length) { - val read = log.read(i, 100, Some(i+1)).head + val read = log.read(i, 100, Some(i+1)).messageSet.head assertEquals("Offset read should match order appended.", i, read.offset) assertEquals("Message should match appended.", messages(i), read.message) } - assertEquals("Reading beyond the last message returns nothing.", 0, log.read(messages.length, 100, None).size) + assertEquals("Reading beyond the last message returns nothing.", 0, log.read(messages.length, 100, None).messageSet.size) } /** @@ -153,7 +153,7 @@ class LogTest extends JUnitSuite { log.append(new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(messageIds(i)), messages = messages(i)), assignOffsets = false) for(i <- 50 until messageIds.max) { val idx = messageIds.indexWhere(_ >= i) - val read = log.read(i, 100, None).head + val read = log.read(i, 100, None).messageSet.head assertEquals("Offset read should match message id.", messageIds(idx), read.offset) assertEquals("Message should match appended.", messages(idx), read.message) } @@ -176,7 +176,7 @@ class LogTest extends JUnitSuite { // now manually truncate off all but one message from the first segment to create a gap in the messages log.logSegments.head.truncateTo(1) - assertEquals("A read should now return the last message in the log", log.logEndOffset-1, log.read(1, 200, None).head.offset) + assertEquals("A read should now return the last message in the log", log.logEndOffset-1, log.read(1, 200, None).messageSet.head.offset) } /** @@ -188,7 +188,7 @@ class LogTest extends JUnitSuite { def testReadOutOfRange() { createEmptyLogs(logDir, 1024) val log = new Log(logDir, logConfig.copy(segmentSize = 1024), recoveryPoint = 0L, time.scheduler, time = time) - assertEquals("Reading just beyond end of log should produce 0 byte read.", 0, log.read(1024, 1000).sizeInBytes) + assertEquals("Reading just beyond end of log should produce 0 byte read.", 0, log.read(1024, 1000).messageSet.sizeInBytes) try { log.read(0, 1024) fail("Expected exception on invalid read.") @@ -219,12 +219,12 @@ class LogTest extends JUnitSuite { /* do successive reads to ensure all our messages are there */ var offset = 0L for(i <- 0 until numMessages) { - val messages = log.read(offset, 1024*1024) + val messages = log.read(offset, 1024*1024).messageSet assertEquals("Offsets not equal", offset, messages.head.offset) assertEquals("Messages not equal at offset " + offset, messageSets(i).head.message, messages.head.message) offset = messages.head.offset + 1 } - val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1)) + val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1)).messageSet assertEquals("Should be no more messages", 0, lastRead.size) // check 
that rolling the log forced a flushed the log--the flush is asyn so retry in case of failure @@ -245,7 +245,7 @@ class LogTest extends JUnitSuite { log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes))) log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("alpha".getBytes), new Message("beta".getBytes))) - def read(offset: Int) = ByteBufferMessageSet.decompress(log.read(offset, 4096).head.message) + def read(offset: Int) = ByteBufferMessageSet.decompress(log.read(offset, 4096).messageSet.head.message) /* we should always get the first message in the compressed set when reading any offset in the set */ assertEquals("Read at offset 0 should produce 0", 0, read(0).head.offset) @@ -363,7 +363,7 @@ class LogTest extends JUnitSuite { log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time) assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset) for(i <- 0 until numMessages) - assertEquals(i, log.read(i, 100, None).head.offset) + assertEquals(i, log.read(i, 100, None).messageSet.head.offset) log.close() } @@ -575,15 +575,15 @@ class LogTest extends JUnitSuite { @Test def testAppendMessageWithNullPayload() { - var log = new Log(logDir, + val log = new Log(logDir, LogConfig(), recoveryPoint = 0L, time.scheduler, time) log.append(new ByteBufferMessageSet(new Message(bytes = null))) - val ms = log.read(0, 4096, None) - assertEquals(0, ms.head.offset) - assertTrue("Message payload should be null.", ms.head.message.isNull) + val messageSet = log.read(0, 4096, None).messageSet + assertEquals(0, messageSet.head.offset) + assertTrue("Message payload should be null.", messageSet.head.message.isNull) } @Test diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala index 558a5d6..90aba19 100644 --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -72,18 +72,12 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { partition0.addReplicaIfNotExists(followerReplicaPartition0) replicaManager.checkpointHighWatermarks() fooPartition0Hw = hwmFor(replicaManager, topic, 0) - assertEquals(leaderReplicaPartition0.highWatermark, fooPartition0Hw) - try { - followerReplicaPartition0.highWatermark - fail("Should fail with KafkaException") - }catch { - case e: KafkaException => // this is ok - } - // set the highwatermark for local replica - partition0.getReplica().get.highWatermark = 5L + assertEquals(leaderReplicaPartition0.highWatermark.messageOffset, fooPartition0Hw) + // set the high watermark for local replica + partition0.getReplica().get.highWatermark = new LogOffsetMetadata(5L) replicaManager.checkpointHighWatermarks() fooPartition0Hw = hwmFor(replicaManager, topic, 0) - assertEquals(leaderReplicaPartition0.highWatermark, fooPartition0Hw) + assertEquals(leaderReplicaPartition0.highWatermark.messageOffset, fooPartition0Hw) EasyMock.verify(zkClient) } @@ -110,12 +104,12 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { topic1Partition0.addReplicaIfNotExists(leaderReplicaTopic1Partition0) replicaManager.checkpointHighWatermarks() topic1Partition0Hw = hwmFor(replicaManager, topic1, 0) - assertEquals(leaderReplicaTopic1Partition0.highWatermark, topic1Partition0Hw) - // set the highwatermark for local replica - 
topic1Partition0.getReplica().get.highWatermark = 5L + assertEquals(leaderReplicaTopic1Partition0.highWatermark.messageOffset, topic1Partition0Hw) + // set the high watermark for local replica + topic1Partition0.getReplica().get.highWatermark = new LogOffsetMetadata(5L) replicaManager.checkpointHighWatermarks() topic1Partition0Hw = hwmFor(replicaManager, topic1, 0) - assertEquals(5L, leaderReplicaTopic1Partition0.highWatermark) + assertEquals(5L, leaderReplicaTopic1Partition0.highWatermark.messageOffset) assertEquals(5L, topic1Partition0Hw) // add another partition and set highwatermark val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0, 1) @@ -126,13 +120,13 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { topic2Partition0.addReplicaIfNotExists(leaderReplicaTopic2Partition0) replicaManager.checkpointHighWatermarks() var topic2Partition0Hw = hwmFor(replicaManager, topic2, 0) - assertEquals(leaderReplicaTopic2Partition0.highWatermark, topic2Partition0Hw) + assertEquals(leaderReplicaTopic2Partition0.highWatermark.messageOffset, topic2Partition0Hw) // set the highwatermark for local replica - topic2Partition0.getReplica().get.highWatermark = 15L - assertEquals(15L, leaderReplicaTopic2Partition0.highWatermark) + topic2Partition0.getReplica().get.highWatermark = new LogOffsetMetadata(15L) + assertEquals(15L, leaderReplicaTopic2Partition0.highWatermark.messageOffset) // change the highwatermark for topic1 - topic1Partition0.getReplica().get.highWatermark = 10L - assertEquals(10L, leaderReplicaTopic1Partition0.highWatermark) + topic1Partition0.getReplica().get.highWatermark = new LogOffsetMetadata(10L) + assertEquals(10L, leaderReplicaTopic1Partition0.highWatermark.messageOffset) replicaManager.checkpointHighWatermarks() // verify checkpointed hw for topic 2 topic2Partition0Hw = hwmFor(replicaManager, topic2, 0) diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala index 2cd3a3f..7d9af93 100644 --- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala @@ -46,7 +46,7 @@ class IsrExpirationTest extends JUnit3Suite { val leaderReplica = partition0.getReplica(configs.head.brokerId).get // let the follower catch up to 10 - (partition0.assignedReplicas() - leaderReplica).foreach(r => r.logEndOffset = 10) + (partition0.assignedReplicas() - leaderReplica).foreach(r => r.logEndOffset = new LogOffsetMetadata(10L)) var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs, configs.head.replicaLagMaxMessages) assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId)) @@ -69,7 +69,7 @@ class IsrExpirationTest extends JUnit3Suite { assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId)) val leaderReplica = partition0.getReplica(configs.head.brokerId).get // set remote replicas leo to something low, like 4 - (partition0.assignedReplicas() - leaderReplica).foreach(r => r.logEndOffset = 4L) + (partition0.assignedReplicas() - leaderReplica).foreach(r => r.logEndOffset = new LogOffsetMetadata(4L)) // now follower (broker id 1) has caught up to only 4, while the leader is at 15. Since the gap it larger than // replicaMaxLagBytes, the follower is out of sync. 
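As an aside, the message-lag arithmetic this test exercises now goes through LogOffsetMetadata. Roughly (a sketch only; the actual check lives in Partition.getOutOfSyncReplicas, which this diff does not touch):

    // illustrative shape of the message-lag check, not the real implementation
    def laggingByMessages(leaderLEO: LogOffsetMetadata,
                          followerLEO: LogOffsetMetadata,
                          replicaLagMaxMessages: Long): Boolean =
      leaderLEO.messagesDiff(followerLEO) > replicaLagMaxMessages

    // leader at 15, follower stuck at 4: a gap of 11 messages, so with a
    // hypothetical replicaLagMaxMessages of 10 the follower falls out of sync
    laggingByMessages(new LogOffsetMetadata(15L), new LogOffsetMetadata(4L), 10L)  // true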
@@ -97,7 +97,7 @@ class IsrExpirationTest extends JUnit3Suite { private def getLogWithLogEndOffset(logEndOffset: Long, expectedCalls: Int): Log = { val log1 = EasyMock.createMock(classOf[kafka.log.Log]) - EasyMock.expect(log1.logEndOffset).andReturn(logEndOffset).times(expectedCalls) + EasyMock.expect(log1.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(logEndOffset)).times(expectedCalls) EasyMock.replay(log1) log1 diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index b349fce..7c10c2c 100644 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -85,7 +85,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { // give some time for the follower 1 to record leader HW TestUtils.waitUntilTrue(() => - server2.replicaManager.getReplica(topic, 0).get.highWatermark == numMessages, + server2.replicaManager.getReplica(topic, 0).get.highWatermark.messageOffset == numMessages, "Failed to update high watermark for follower after timeout") servers.foreach(server => server.replicaManager.checkpointHighWatermarks()) @@ -134,7 +134,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { // give some time for follower 1 to record leader HW of 60 TestUtils.waitUntilTrue(() => - server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw, + server2.replicaManager.getReplica(topic, 0).get.highWatermark.messageOffset == hw, "Failed to update high watermark for follower after timeout") // shutdown the servers to allow the hw to be checkpointed servers.foreach(server => server.shutdown()) @@ -147,7 +147,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { val hw = 20L // give some time for follower 1 to record leader HW of 600 TestUtils.waitUntilTrue(() => - server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw, + server2.replicaManager.getReplica(topic, 0).get.highWatermark.messageOffset == hw, "Failed to update high watermark for follower after timeout") // shutdown the servers to allow the hw to be checkpointed servers.foreach(server => server.shutdown()) @@ -165,7 +165,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { // allow some time for the follower to get the leader HW TestUtils.waitUntilTrue(() => - server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw, + server2.replicaManager.getReplica(topic, 0).get.highWatermark.messageOffset == hw, "Failed to update high watermark for follower after timeout") // kill the server hosting the preferred replica server1.shutdown() @@ -191,7 +191,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { // allow some time for the follower to get the leader HW TestUtils.waitUntilTrue(() => - server1.replicaManager.getReplica(topic, 0).get.highWatermark == hw, + server1.replicaManager.getReplica(topic, 0).get.highWatermark.messageOffset == hw, "Failed to update high watermark for follower after timeout") // shutdown the servers to allow the hw to be checkpointed servers.foreach(server => server.shutdown()) diff --git a/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala b/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala index 4f61f84..081203a 100644 --- a/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala @@ -46,17 +46,17 @@ class RequestPurgatoryTest 
extends JUnit3Suite { def testRequestSatisfaction() { val r1 = new DelayedRequest(Array("test1"), null, 100000L) val r2 = new DelayedRequest(Array("test2"), null, 100000L) - assertEquals("With no waiting requests, nothing should be satisfied", 0, purgatory.update("test1", producerRequest1).size) - purgatory.watch(r1) - assertEquals("Still nothing satisfied", 0, purgatory.update("test1", producerRequest1).size) - purgatory.watch(r2) - assertEquals("Still nothing satisfied", 0, purgatory.update("test2", producerRequest2).size) + assertEquals("With no waiting requests, nothing should be satisfied", 0, purgatory.update("test1").size) + assertTrue("r1 watched", purgatory.checkAndMaybeWatch(r1)) + assertEquals("Still nothing satisfied", 0, purgatory.update("test1").size) + assertTrue("r2 watched", purgatory.checkAndMaybeWatch(r2)) + assertEquals("Still nothing satisfied", 0, purgatory.update("test2").size) purgatory.satisfied += r1 - assertEquals("r1 satisfied", mutable.ArrayBuffer(r1), purgatory.update("test1", producerRequest1)) - assertEquals("Nothing satisfied", 0, purgatory.update("test1", producerRequest2).size) + assertEquals("r1 satisfied", mutable.ArrayBuffer(r1), purgatory.update("test1")) + assertEquals("Nothing satisfied", 0, purgatory.update("test1").size) purgatory.satisfied += r2 - assertEquals("r2 satisfied", mutable.ArrayBuffer(r2), purgatory.update("test2", producerRequest2)) - assertEquals("Nothing satisfied", 0, purgatory.update("test2", producerRequest2).size) + assertEquals("r2 satisfied", mutable.ArrayBuffer(r2), purgatory.update("test2")) + assertEquals("Nothing satisfied", 0, purgatory.update("test2").size) } @Test @@ -65,8 +65,8 @@ class RequestPurgatoryTest extends JUnit3Suite { val r1 = new DelayedRequest(Array("test1"), null, expiration) val r2 = new DelayedRequest(Array("test1"), null, 200000L) val start = System.currentTimeMillis - purgatory.watch(r1) - purgatory.watch(r2) + assertTrue("r1 watched", purgatory.checkAndMaybeWatch(r1)) + assertTrue("r2 watched", purgatory.checkAndMaybeWatch(r2)) purgatory.awaitExpiration(r1) val elapsed = System.currentTimeMillis - start assertTrue("r1 expired", purgatory.expired.contains(r1)) @@ -74,7 +74,7 @@ class RequestPurgatoryTest extends JUnit3Suite { assertTrue("Time for expiration %d should at least %d".format(elapsed, expiration), elapsed >= expiration) } - class MockRequestPurgatory extends RequestPurgatory[DelayedRequest, ProducerRequest] { + class MockRequestPurgatory extends RequestPurgatory[DelayedRequest] { val satisfied = mutable.Set[DelayedRequest]() val expired = mutable.Set[DelayedRequest]() def awaitExpiration(delayed: DelayedRequest) = { @@ -82,7 +82,7 @@ class RequestPurgatoryTest extends JUnit3Suite { delayed.wait() } } - def checkSatisfied(request: ProducerRequest, delayed: DelayedRequest): Boolean = satisfied.contains(delayed) + def checkSatisfied(delayed: DelayedRequest): Boolean = satisfied.contains(delayed) def expire(delayed: DelayedRequest) { expired += delayed delayed synchronized { diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index b1c4ce9..cd27e1b 100644 --- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -26,7 +26,9 @@ import org.I0Itec.zkclient.ZkClient import org.scalatest.junit.JUnit3Suite import kafka.api._ import scala.Some -import kafka.common.TopicAndPartition +import kafka.common.{ErrorMapping, TopicAndPartition} +import 
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index b1c4ce9..cd27e1b 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -26,7 +26,9 @@ import org.I0Itec.zkclient.ZkClient
 import org.scalatest.junit.JUnit3Suite
 import kafka.api._
 import scala.Some
-import kafka.common.TopicAndPartition
+import kafka.common.{ErrorMapping, TopicAndPartition}
+import java.util.Collections
+import scala.collection.JavaConversions._
 
 class SimpleFetchTest extends JUnit3Suite {
 
@@ -45,13 +47,13 @@ class SimpleFetchTest extends JUnit3Suite {
    * with a HW matching the leader's ("5") and LEO of "15", meaning it's not in-sync
    * but is still in ISR (hasn't yet expired from ISR).
    *
-   * When a normal consumer fetches data, it only should only see data upto the HW of the leader,
+   * When a normal consumer fetches data, it should only see data up to the HW of the leader,
    * in this case up an offset of "5".
    */
   def testNonReplicaSeesHwWhenFetching() {
     /* setup */
     val time = new MockTime
-    val leo = 20
+    val leo = 20L
     val hw = 5
     val fetchSize = 100
     val messages = new Message("test-message".getBytes())
@@ -64,7 +66,11 @@ class SimpleFetchTest extends JUnit3Suite {
     val log = EasyMock.createMock(classOf[kafka.log.Log])
     EasyMock.expect(log.logEndOffset).andReturn(leo).anyTimes()
     EasyMock.expect(log)
-    EasyMock.expect(log.read(0, fetchSize, Some(hw))).andReturn(new ByteBufferMessageSet(messages))
+    EasyMock.expect(log.read(0, fetchSize, Some(hw))).andReturn(
+      new FetchDataInfo(
+        new LogOffsetMetadata(0L, 0L, leo.toInt),
+        new ByteBufferMessageSet(messages)
+      )).anyTimes()
     EasyMock.replay(log)
 
     val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
@@ -76,14 +82,25 @@ class SimpleFetchTest extends JUnit3Suite {
     EasyMock.expect(replicaManager.logManager).andReturn(logManager)
     EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager]))
     EasyMock.expect(replicaManager.zkClient).andReturn(zkClient)
+    EasyMock.expect(replicaManager.readMessageSets(EasyMock.anyObject())).andReturn({
+      val fetchInfo = log.read(0, fetchSize, Some(hw))
+      val partitionData = new FetchResponsePartitionData(ErrorMapping.NoError, hw.toLong, fetchInfo.messageSet)
+      Collections.singletonMap(TopicAndPartition(topic, partitionId), (partitionData, fetchInfo.fetchOffset)).toMap
+    }).anyTimes()
     EasyMock.replay(replicaManager)
 
     val partition = getPartitionWithAllReplicasInISR(topic, partitionId, time, configs.head.brokerId, log, hw, replicaManager)
-    partition.getReplica(configs(1).brokerId).get.logEndOffset = leo - 5L
+    partition.getReplica(configs(1).brokerId).get.logEndOffset = new LogOffsetMetadata(leo - 5L, 0L, leo.toInt - 5)
 
     EasyMock.reset(replicaManager)
     EasyMock.expect(replicaManager.config).andReturn(configs.head).anyTimes()
     EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes()
+    EasyMock.expect(replicaManager.readMessageSets(EasyMock.anyObject())).andReturn({
+      val fetchInfo = log.read(0, fetchSize, Some(hw))
+      val partitionData = new FetchResponsePartitionData(ErrorMapping.NoError, hw.toLong, fetchInfo.messageSet)
+      Collections.singletonMap(TopicAndPartition(topic, partitionId), (partitionData, fetchInfo.fetchOffset)).toMap
+    }).anyTimes()
+    EasyMock.replay(replicaManager)
 
     val offsetManager = EasyMock.createMock(classOf[kafka.server.OffsetManager])
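The mocked reads above now return FetchDataInfo carrying a LogOffsetMetadata rather than a bare message set. Judging only from the call sites in this patch — new LogOffsetMetadata(0L, 0L, leo.toInt) here, and a one-argument form used further down for the high watermark — the three components appear to be the message offset, the base offset of the containing log segment, and a relative position within that segment, with the latter two defaulting to "unknown". A hypothetical reduced model (field names inferred, not copied from Kafka):

// Hypothetical reduced model; defaults mark segment info as unknown, which
// is what a one-argument construction like new LogOffsetMetadata(hw) implies.
case class SketchOffsetMetadata(messageOffset: Long,
                                segmentBaseOffset: Long = -1L,
                                relativePositionInSegment: Int = -1) {
  def onSameSegmentAs(that: SketchOffsetMetadata): Boolean =
    segmentBaseOffset >= 0 && segmentBaseOffset == that.segmentBaseOffset

  // With full metadata, "how far behind" can be answered in bytes when two
  // offsets share a segment, without re-reading the log itself.
  def byteDistanceTo(that: SketchOffsetMetadata): Option[Int] =
    if (onSameSegmentAs(that)) Some(that.relativePositionInSegment - relativePositionInSegment)
    else None
}

If that reading is right, it explains why the test must now assign follower LEOs as metadata triples instead of plain Longs: satisfaction checks can compare positions directly.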
@@ -138,7 +155,11 @@ class SimpleFetchTest extends JUnit3Suite {
 
     val log = EasyMock.createMock(classOf[kafka.log.Log])
     EasyMock.expect(log.logEndOffset).andReturn(leo).anyTimes()
-    EasyMock.expect(log.read(followerLEO, Integer.MAX_VALUE, None)).andReturn(new ByteBufferMessageSet(messages))
+    EasyMock.expect(log.read(followerLEO, Integer.MAX_VALUE, None)).andReturn(
+      new FetchDataInfo(
+        new LogOffsetMetadata(followerLEO, 0L, followerLEO),
+        new ByteBufferMessageSet(messages)
+      )).anyTimes()
     EasyMock.replay(log)
 
     val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
@@ -150,16 +171,27 @@ class SimpleFetchTest extends JUnit3Suite {
     EasyMock.expect(replicaManager.logManager).andReturn(logManager)
     EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager]))
     EasyMock.expect(replicaManager.zkClient).andReturn(zkClient)
+    EasyMock.expect(replicaManager.readMessageSets(EasyMock.anyObject())).andReturn({
+      val fetchInfo = log.read(followerLEO, Integer.MAX_VALUE, None)
+      val partitionData = new FetchResponsePartitionData(ErrorMapping.NoError, hw.toLong, fetchInfo.messageSet)
+      Collections.singletonMap(TopicAndPartition(topic, partitionId), (partitionData, fetchInfo.fetchOffset)).toMap
+    }).anyTimes()
     EasyMock.replay(replicaManager)
 
     val partition = getPartitionWithAllReplicasInISR(topic, partitionId, time, configs.head.brokerId, log, hw, replicaManager)
-    partition.getReplica(followerReplicaId).get.logEndOffset = followerLEO.asInstanceOf[Long]
+    partition.getReplica(followerReplicaId).get.logEndOffset = new LogOffsetMetadata(followerLEO.asInstanceOf[Long], 0L, followerLEO)
 
     EasyMock.reset(replicaManager)
     EasyMock.expect(replicaManager.config).andReturn(configs.head).anyTimes()
-    EasyMock.expect(replicaManager.recordFollowerPosition(topic, partitionId, followerReplicaId, followerLEO))
+    EasyMock.expect(replicaManager.updateReplicaLEOAndUpdateHW(topic, partitionId, followerReplicaId, new LogOffsetMetadata(followerLEO.asInstanceOf[Long], 0L, followerLEO)))
     EasyMock.expect(replicaManager.getReplica(topic, partitionId, followerReplicaId)).andReturn(partition.inSyncReplicas.find(_.brokerId == configs(1).brokerId))
     EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes()
+    EasyMock.expect(replicaManager.readMessageSets(EasyMock.anyObject())).andReturn({
+      val fetchInfo = log.read(followerLEO, Integer.MAX_VALUE, None)
+      val partitionData = new FetchResponsePartitionData(ErrorMapping.NoError, hw.toLong, fetchInfo.messageSet)
+      Collections.singletonMap(TopicAndPartition(topic, partitionId), (partitionData, fetchInfo.fetchOffset)).toMap
+    }).anyTimes()
+    EasyMock.expect(replicaManager.unblockDelayedProduceRequests(EasyMock.anyObject())).anyTimes()
     EasyMock.replay(replicaManager)
 
     val offsetManager = EasyMock.createMock(classOf[kafka.server.OffsetManager])
@@ -204,7 +236,7 @@ class SimpleFetchTest extends JUnit3Suite {
     partition.inSyncReplicas = allReplicas.toSet
     // set the leader and its hw and the hw update time
     partition.leaderReplicaIdOpt = Some(leaderId)
-    leaderReplica.highWatermark = leaderHW
+    leaderReplica.highWatermark = new LogOffsetMetadata(leaderHW)
     partition
   }
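Taken together, the two tests pin down the visibility rule this patch preserves while moving to offset metadata: an ordinary consumer reads only up to the leader's high watermark, while a fetch from a replica in the ISR may read up to the log end offset. A condensed, hypothetical restatement (not Kafka code) using the HW of 5 and leader LEO of 20 from the first test:

// Hypothetical restatement of the visibility rule the two tests assert.
object FetchVisibilitySketch {
  def visibleUpTo(fetchIsFromFollower: Boolean, hw: Long, leo: Long): Long =
    if (fetchIsFromFollower) leo  // replicas may read past the high watermark
    else hw                       // ordinary consumers are bounded by it

  def main(args: Array[String]): Unit = {
    assert(visibleUpTo(fetchIsFromFollower = false, hw = 5L, leo = 20L) == 5L)
    assert(visibleUpTo(fetchIsFromFollower = true, hw = 5L, leo = 20L) == 20L)
    println("consumer bounded by HW; follower bounded by LEO")
  }
}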