diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 518d2df..7de911a 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -21,11 +21,11 @@ import kafka.admin.AdminUtils
import kafka.utils.{ZkUtils, Pool, Time, Logging}
import kafka.utils.Utils.inLock
import kafka.api.{PartitionStateInfo, LeaderAndIsr}
-import kafka.log.LogConfig
+import kafka.log.{FileMessageSet, LogConfig}
import kafka.server.{OffsetManager, ReplicaManager}
import kafka.metrics.KafkaMetricsGroup
import kafka.controller.KafkaController
-import kafka.message.ByteBufferMessageSet
+import kafka.message.{MessageSet, ByteBufferMessageSet}
import java.io.IOException
import java.util.concurrent.locks.ReentrantReadWriteLock
@@ -307,11 +307,21 @@ class Partition(val topic: String,
val oldHighWatermark = leaderReplica.highWatermark
if(newHighWatermark > oldHighWatermark) {
leaderReplica.highWatermark = newHighWatermark
- debug("Highwatermark for partition [%s,%d] updated to %d".format(topic, partitionId, newHighWatermark))
+ debug("High watermark for partition [%s,%d] updated to %d".format(topic, partitionId, newHighWatermark))
+ // search for the corresponding log segment position and base offset,
+ // if the HW is the end offset, use the log end position
+ val newHighWatermarkInfo = leaderReplica.log.get.read(newHighWatermark, 1, None)
+ if (newHighWatermarkInfo.equals(MessageSet.Empty)) {
+ leaderReplica.logHWPosition.set(leaderReplica.logActiveSegmentSize.get())
+ leaderReplica.logHWSegmentBaseOffset.set(leaderReplica.logActiveSegmentBaseOffset.get())
+ } else {
+ leaderReplica.logHWPosition.set(newHighWatermarkInfo.asInstanceOf[FileMessageSet].startPosition())
+ leaderReplica.logHWSegmentBaseOffset.set(newHighWatermarkInfo.asInstanceOf[FileMessageSet].baseOffset())
+ }
+ } else {
+ warn("Skipping update high watermark since Old hw %d is larger than new hw is %d for partition [%s,%d]. All leo's are %s"
+ .format(oldHighWatermark, newHighWatermark, topic, partitionId, allLogEndOffsets.mkString(",")))
}
- else
- debug("Old hw for partition [%s,%d] is %d. New hw is %d. All leo's are %s"
- .format(topic, partitionId, oldHighWatermark, newHighWatermark, allLogEndOffsets.mkString(",")))
}
def maybeShrinkIsr(replicaMaxLagTimeMs: Long, replicaMaxLagMessages: Long) {
@@ -363,6 +373,8 @@ class Partition(val topic: String,
case Some(leaderReplica) =>
val log = leaderReplica.log.get
val info = log.append(messages, assignOffsets = true)
+ leaderReplica.logActiveSegmentSize.set(info.endFilePos)
+ leaderReplica.logActiveSegmentBaseOffset.set(log.activeSegment.baseOffset)
// we may need to increment high watermark since ISR could be down to 1
maybeIncrementLeaderHW(leaderReplica)
info
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index 5e659b4..889b3e0 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -21,18 +21,29 @@ import kafka.log.Log
import kafka.utils.{SystemTime, Time, Logging}
import kafka.common.KafkaException
import kafka.server.ReplicaManager
-import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
class Replica(val brokerId: Int,
val partition: Partition,
time: Time = SystemTime,
initialHighWatermarkValue: Long = 0L,
val log: Option[Log] = None) extends Logging {
- //only defined in local replica
- private[this] var highWatermarkValue: AtomicLong = new AtomicLong(initialHighWatermarkValue)
- // only used for remote replica; logEndOffsetValue for local replica is kept in log
- private[this] var logEndOffsetValue = new AtomicLong(ReplicaManager.UnknownLogEndOffset)
- private[this] var logEndOffsetUpdateTimeMsValue: AtomicLong = new AtomicLong(time.milliseconds)
+ // the high watermark offset value, only defined in local replica
+ private[this] val highWatermarkValue: AtomicLong = new AtomicLong(initialHighWatermarkValue)
+  // the log end offset value, only used for remote replicas; a local replica's log end offset is kept in its log
+ private[this] val logEndOffsetValue = new AtomicLong(ReplicaManager.UnknownLogEndOffset)
+  // the time when the log end offset was last updated
+ private[this] val logEndOffsetUpdateTimeMsValue = new AtomicLong(time.milliseconds)
+
+ // the log position of the high watermark, only kept in local replica
+ val logHWPosition = new AtomicInteger(ReplicaManager.UnknownLogPosition)
+ // the base offset of the log segment containing the high watermark, only kept in local replica
+ val logHWSegmentBaseOffset = new AtomicLong(ReplicaManager.UnknownlogSegmentBaseOffset)
+  // the size (i.e. end position) of the active log segment, only kept in local replica
+ val logActiveSegmentSize = new AtomicInteger(ReplicaManager.UnknownLogPosition)
+ // the base offset of the last segment in the log, only kept in local replica
+ val logActiveSegmentBaseOffset = new AtomicLong(ReplicaManager.UnknownlogSegmentBaseOffset)
+
val topic = partition.topic
val partitionId = partition.partitionId
@@ -45,7 +56,6 @@ class Replica(val brokerId: Int,
} else
throw new KafkaException("Shouldn't set logEndOffset for replica %d partition [%s,%d] since it's local"
.format(brokerId, topic, partitionId))
-
}
def logEndOffset = {
@@ -54,7 +64,9 @@ class Replica(val brokerId: Int,
else
logEndOffsetValue.get()
}
-
+
+ def logEndOffsetUpdateTimeMs = logEndOffsetUpdateTimeMsValue.get()
+
def isLocal: Boolean = {
log match {
case Some(l) => true
@@ -62,8 +74,6 @@ class Replica(val brokerId: Int,
}
}
- def logEndOffsetUpdateTimeMs = logEndOffsetUpdateTimeMsValue.get()
-
def highWatermark_=(newHighWatermark: Long) {
if (isLocal) {
trace("Setting hw for replica %d partition [%s,%d] on broker %d to %d"
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index b2652dd..df6ec24 100644
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -202,6 +202,16 @@ class FileMessageSet private[kafka](@volatile var file: File,
* The number of bytes taken up by this file set
*/
def sizeInBytes(): Int = _size.get()
+
+ /**
+ * Returns the start position in the segment file
+ */
+ def startPosition(): Int = start
+
+ /**
+ * Returns the base offset of the segment file
+ */
+ def baseOffset() = file.getName().split("\\.")(0).toLong
/**
* Append these messages to the message set
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index b7bc5ff..e6a536e 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -278,8 +278,9 @@ class Log(val dir: File,
}
}
- // now append to the log
+ // now append to the log and record the file end position as the segment file size
segment.append(appendInfo.firstOffset, validMessages)
+ appendInfo.endFilePos = segment.log.sizeInBytes()
// increment the log end offset
nextOffset.set(appendInfo.lastOffset + 1)
@@ -303,11 +304,12 @@ class Log(val dir: File,
* @param lastOffset The last offset in the message set
* @param shallowCount The number of shallow messages
* @param validBytes The number of valid bytes
+ * @param endFilePos The end file position of the last appended message
* @param codec The codec used in the message set
* @param offsetsMonotonic Are the offsets in this message set monotonically increasing
*/
- case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec, shallowCount: Int, validBytes: Int, offsetsMonotonic: Boolean)
-
+ case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec, shallowCount: Int, validBytes: Int, var endFilePos: Int, offsetsMonotonic: Boolean)
+
/**
* Validate the following:
*
@@ -362,7 +364,7 @@ class Log(val dir: File,
if(messageCodec != NoCompressionCodec)
codec = messageCodec
}
- LogAppendInfo(firstOffset, lastOffset, codec, shallowMessageCount, validBytesCount, monotonic)
+ LogAppendInfo(firstOffset, lastOffset, codec, shallowMessageCount, validBytesCount, -1, monotonic)
}
/**
@@ -412,14 +414,15 @@ class Log(val dir: File,
// but if that segment doesn't contain any messages with an offset greater than that
// continue to read from successive segments until we get some messages or we reach the end of the log
while(entry != null) {
- val messages = entry.getValue.read(startOffset, maxOffset, maxLength)
- if(messages == null)
+ val messageSet = entry.getValue.read(startOffset, maxOffset, maxLength)
+ if(messageSet == null)
entry = segments.higherEntry(entry.getKey)
else
- return messages
+ return messageSet
}
- // okay we are beyond the end of the last segment but less than the log end offset
+ // okay we are beyond the end of the last segment but less than the target offset,
+ // so return the empty message set
MessageSet.Empty
}
@@ -433,7 +436,7 @@ class Log(val dir: File,
// find any segments that match the user-supplied predicate UNLESS it is the final segment
// and it is empty (since we would just end up re-creating it
val lastSegment = activeSegment
- var deletable = logSegments.takeWhile(s => predicate(s) && (s.baseOffset != lastSegment.baseOffset || s.size > 0))
+ val deletable = logSegments.takeWhile(s => predicate(s) && (s.baseOffset != lastSegment.baseOffset || s.size > 0))
val numToDelete = deletable.size
if(numToDelete > 0) {
lock synchronized {
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 2faa196..01fc306 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -315,7 +315,6 @@ private[log] class Cleaner(val id: Int,
* @param log The log being cleaned
* @param segments The group of segments being cleaned
* @param map The offset map to use for cleaning segments
- * @param expectedTruncateCount A count used to check if the log is being truncated and rewritten under our feet
* @param deleteHorizonMs The time to retain delete tombstones
*/
private[log] def cleanSegments(log: Log,
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 0d6926e..2af434f 100644
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -99,7 +99,7 @@ class LogSegment(val log: FileMessageSet,
val mapping = index.lookup(offset)
log.searchFor(offset, max(mapping.position, startingFilePosition))
}
-
+
/**
* Read a message set from this segment beginning with the first offset >= startOffset. The message set will include
* no more than maxSize bytes and will end before maxOffset if a maxOffset is specified.
@@ -108,7 +108,7 @@ class LogSegment(val log: FileMessageSet,
* @param maxSize The maximum number of bytes to include in the message set we read
* @param maxOffset An optional maximum offset for the message set we read
*
- * @return The message set read or null if the startOffset is larger than the largest offset in this log.
+ * @return The message set read or null if the startOffset is larger than the largest offset in this log
*/
@threadsafe
def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int): MessageSet = {
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 3b15254..7846dda 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -92,12 +92,12 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
val partitionsWithError = new mutable.HashSet[TopicAndPartition]
var response: FetchResponse = null
try {
- trace("issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
+ trace("Issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
response = simpleConsumer.fetch(fetchRequest)
} catch {
case t: Throwable =>
if (isRunning.get) {
- warn("Error in fetch %s. Possible cause: %s".format(fetchRequest, t.getMessage))
+ warn("Error in fetch %s. Possible cause: %s".format(fetchRequest, t.toString))
partitionMapLock synchronized {
partitionsWithError ++= partitionMap.keys
}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0b668f2..53de1e8 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -29,7 +29,6 @@ import kafka.controller.KafkaController
import kafka.utils.{Pool, SystemTime, Logging}
import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic._
import scala.collection._
import org.I0Itec.zkclient.ZkClient
@@ -127,22 +126,6 @@ class KafkaApis(val requestChannel: RequestChannel,
requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(controlledShutdownResponse)))
}
- /**
- * Check if a partitionData from a produce request can unblock any
- * DelayedFetch requests.
- */
- def maybeUnblockDelayedFetchRequests(topic: String, partition: Int, messageSizeInBytes: Int) {
- val satisfied = fetchRequestPurgatory.update(RequestKey(topic, partition), messageSizeInBytes)
- trace("Producer request to (%s-%d) unblocked %d fetch requests.".format(topic, partition, satisfied.size))
-
- // send any newly unblocked responses
- for(fetchReq <- satisfied) {
- val topicData = readMessageSets(fetchReq.fetch)
- val response = FetchResponse(fetchReq.fetch.correlationId, topicData)
- requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response)))
- }
- }
-
private def producerRequestFromOffsetCommit(offsetCommitRequest: OffsetCommitRequest) = {
val msgs = offsetCommitRequest.filterLargeMetadata(config.offsetMetadataMaxSize).map {
case (topicAndPartition, offset) =>
@@ -171,24 +154,21 @@ class KafkaApis(val requestChannel: RequestChannel,
* Handle a produce request or offset commit request (which is really a specialized producer request)
*/
def handleProducerOrOffsetCommitRequest(request: RequestChannel.Request) {
-
- val (produceRequest, offsetCommitRequestOpt) = if (request.requestId == RequestKeys.OffsetCommitKey) {
- val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
- (producerRequestFromOffsetCommit(offsetCommitRequest), Some(offsetCommitRequest))
- }
- else {
- (request.requestObj.asInstanceOf[ProducerRequest], None)
- }
+ val (produceRequest, offsetCommitRequestOpt) =
+ if (request.requestId == RequestKeys.OffsetCommitKey) {
+ val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
+ (producerRequestFromOffsetCommit(offsetCommitRequest), Some(offsetCommitRequest))
+ } else {
+ (request.requestObj.asInstanceOf[ProducerRequest], None)
+ }
val sTime = SystemTime.milliseconds
val localProduceResults = appendToLocalLog(produceRequest)
debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
+
val firstErrorCode = localProduceResults.find(_.errorCode != ErrorMapping.NoError).map(_.errorCode).getOrElse(ErrorMapping.NoError)
val numPartitionsInError = localProduceResults.count(_.error.isDefined)
- produceRequest.data.foreach(partitionAndData =>
- maybeUnblockDelayedFetchRequests(partitionAndData._1.topic, partitionAndData._1.partition, partitionAndData._2.sizeInBytes))
-
val allPartitionHaveReplicationFactorOne =
!produceRequest.data.keySet.exists(
m => replicaManager.getReplicationFactorForPartition(m.topic, m.partition) != 1)
@@ -235,29 +215,22 @@ class KafkaApis(val requestChannel: RequestChannel,
val delayedRequest = new DelayedProduce(
producerRequestKeys,
request,
- statuses,
- produceRequest,
produceRequest.ackTimeoutMs.toLong,
+ produceRequest,
+ statuses,
offsetCommitRequestOpt)
- producerRequestPurgatory.watch(delayedRequest)
-
/*
* Replica fetch requests may have arrived (and potentially satisfied)
* delayedProduce requests while they were being added to the purgatory.
- * Here, we explicitly check if any of them can be satisfied.
+ * In this case, respond immediately
*/
- var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce]
- producerRequestKeys.foreach(key =>
- satisfiedProduceRequests ++=
- producerRequestPurgatory.update(key, key))
- debug(satisfiedProduceRequests.size +
- " producer requests unblocked during produce to local log.")
- satisfiedProduceRequests.foreach(_.respond())
-
- // we do not need the data anymore
- produceRequest.emptyData()
+ if (!producerRequestPurgatory.checkAndMaybeWatch(delayedRequest))
+ delayedRequest.respond()
}
+
+ // we do not need the data anymore
+ produceRequest.emptyData()
}
case class DelayedProduceResponseStatus(requiredOffset: Long,
@@ -288,13 +261,12 @@ class KafkaApis(val requestChannel: RequestChannel,
partitionAndData.map {case (topicAndPartition, messages) =>
try {
val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
- val info =
- partitionOpt match {
- case Some(partition) => partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet])
- case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
- .format(topicAndPartition, brokerId))
-
- }
+ val info = partitionOpt match {
+ case Some(partition) =>
+ partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet])
+ case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
+ .format(topicAndPartition, brokerId))
+ }
val numAppendedMessages = if (info.firstOffset == -1L || info.lastOffset == -1L) 0 else (info.lastOffset - info.firstOffset + 1)
@@ -304,6 +276,13 @@ class KafkaApis(val requestChannel: RequestChannel,
BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).messagesInRate.mark(numAppendedMessages)
BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numAppendedMessages)
+          // some fetch requests may be satisfied after the local log append;
+          // note that in some other cases:
+          // 1. the broker becomes the leader
+          // 2. the leader shrinks the ISR
+          // the HW may also be moved, but we do not need to check for delayed fetch requests
+ unblockDelayedFetchRequests(new RequestKey(topicAndPartition))
+
trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d"
.format(messages.size, topicAndPartition.topic, topicAndPartition.partition, info.firstOffset, info.lastOffset))
ProduceResult(topicAndPartition, info.firstOffset, info.lastOffset)
@@ -338,25 +317,22 @@ class KafkaApis(val requestChannel: RequestChannel,
*/
def handleFetchRequest(request: RequestChannel.Request) {
val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
- if(fetchRequest.isFromFollower) {
+ // if the fetch request comes from the follower, check if the partition HW can be updated
+ if(fetchRequest.isFromFollower)
maybeUpdatePartitionHw(fetchRequest)
- // after updating HW, some delayed produce requests may be unblocked
- var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce]
- fetchRequest.requestInfo.foreach {
- case (topicAndPartition, _) =>
- val key = new RequestKey(topicAndPartition)
- satisfiedProduceRequests ++= producerRequestPurgatory.update(key, key)
- }
- debug("Replica %d fetch unblocked %d producer requests."
- .format(fetchRequest.replicaId, satisfiedProduceRequests.size))
- satisfiedProduceRequests.foreach(_.respond())
- }
val dataRead = readMessageSets(fetchRequest)
val bytesReadable = dataRead.values.map(_.messages.sizeInBytes).sum
+ val errorReadingData = dataRead.values.foldLeft(false)((errorIncurred, currDataResponse) =>
+ errorIncurred || (currDataResponse.error != ErrorMapping.NoError))
+ // send the data immediately if 1) fetch request does not want to wait
+ // 2) fetch request does not require any data
+ // 3) has enough data to respond
+ // 4) some error happens while reading data
if(fetchRequest.maxWait <= 0 ||
+ fetchRequest.numPartitions <= 0 ||
bytesReadable >= fetchRequest.minBytes ||
- fetchRequest.numPartitions <= 0) {
+ errorReadingData) {
debug("Returning fetch response %s for fetch request with correlation id %d to client %s"
.format(dataRead.values.map(_.error).mkString(","), fetchRequest.correlationId, fetchRequest.clientId))
val response = new FetchResponse(fetchRequest.correlationId, dataRead)
@@ -366,17 +342,40 @@ class KafkaApis(val requestChannel: RequestChannel,
fetchRequest.clientId))
// create a list of (topic, partition) pairs to use as keys for this delayed request
val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq.map(new RequestKey(_))
- val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest, fetchRequest.maxWait, bytesReadable)
- fetchRequestPurgatory.watch(delayedFetch)
+ val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest.maxWait, fetchRequest, dataRead.map {
+ case (topicAndPartition, partitionData) =>
+ if (partitionData.messages.isInstanceOf[FileMessageSet])
+ topicAndPartition -> DelayedFetchRequestStatus(
+ partitionData.messages.asInstanceOf[FileMessageSet].baseOffset(),
+ partitionData.messages.asInstanceOf[FileMessageSet].startPosition())
+ else
+ topicAndPartition -> DelayedFetchRequestStatus(-1, -1)
+ })
+
+ /*
+ * Replica fetch requests may have arrived (and potentially satisfied)
+ * delayedFetch requests while they were being added to the purgatory.
+ * In this case, respond immediately
+ */
+ fetchRequestPurgatory.checkAndMaybeWatch(delayedFetch)
}
}
private def maybeUpdatePartitionHw(fetchRequest: FetchRequest) {
debug("Maybe update partition HW due to fetch request: %s ".format(fetchRequest))
- fetchRequest.requestInfo.foreach(info => {
- val (topic, partition, offset) = (info._1.topic, info._1.partition, info._2.offset)
- replicaManager.recordFollowerPosition(topic, partition, fetchRequest.replicaId, offset)
- })
+ fetchRequest.requestInfo.foreach {
+ case (topicAndPartition, partitionFetchInfo) =>
+ replicaManager.recordFollowerPosition(topicAndPartition.topic,
+ topicAndPartition.partition, fetchRequest.replicaId, partitionFetchInfo.offset)
+
+ // after updating HW, some delayed produce and fetch requests may be unblocked;
+        // note that in some other cases:
+        // 1. the broker becomes the leader
+        // 2. the leader shrinks the ISR
+        // the HW may also be moved, but we do not need to check for delayed fetch requests
+ unblockDelayedProduceRequests(new RequestKey(topicAndPartition))
+ unblockDelayedFetchRequests(new RequestKey(topicAndPartition))
+ }
}
/**
@@ -390,16 +389,14 @@ class KafkaApis(val requestChannel: RequestChannel,
case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) =>
val partitionData =
try {
- val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, fetchRequest.replicaId)
- BrokerTopicStats.getBrokerTopicStats(topic).bytesOutRate.mark(messages.sizeInBytes)
- BrokerTopicStats.getBrokerAllTopicsStats.bytesOutRate.mark(messages.sizeInBytes)
- if (!isFetchFromFollower) {
- new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, messages)
- } else {
+ val (messageSet, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, fetchRequest.replicaId)
+ BrokerTopicStats.getBrokerTopicStats(topic).bytesOutRate.mark(messageSet.sizeInBytes)
+ BrokerTopicStats.getBrokerAllTopicsStats.bytesOutRate.mark(messageSet.sizeInBytes)
+ if (isFetchFromFollower) {
debug("Leader %d for partition [%s,%d] received fetch request from follower %d"
.format(brokerId, topic, partition, fetchRequest.replicaId))
- new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, messages)
}
+ new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, messageSet)
} catch {
// NOTE: Failed fetch requests is not incremented for UnknownTopicOrPartitionException and NotLeaderForPartitionException
// since failed fetch requests metric is supposed to indicate failure of a broker in handling a fetch request
@@ -442,14 +439,14 @@ class KafkaApis(val requestChannel: RequestChannel,
None
else
Some(localReplica.highWatermark)
- val messages = localReplica.log match {
+ val messageSet = localReplica.log match {
case Some(log) =>
log.read(offset, maxSize, maxOffsetOpt)
case None =>
error("Leader for partition [%s,%d] on broker %d does not have a local log".format(topic, partition, brokerId))
MessageSet.Empty
}
- (messages, localReplica.highWatermark)
+ (messageSet, localReplica.highWatermark)
}
/**
@@ -636,6 +633,29 @@ class KafkaApis(val requestChannel: RequestChannel,
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
}
+ def unblockDelayedProduceRequests(key: RequestKey) {
+ val satisfied = producerRequestPurgatory.update(key)
+ debug("Request key %s unblocked %d producer requests."
+ .format(key.keyLabel, satisfied.size))
+ satisfied.foreach(_.respond())
+ }
+
+ /**
+ * Check if a partitionData from a produce request can unblock any
+ * DelayedFetch requests.
+ */
+ def unblockDelayedFetchRequests(key: RequestKey) {
+ val satisfied = fetchRequestPurgatory.update(key)
+ debug("Request key %s unblocked %d fetch requests.".format(key.keyLabel, satisfied.size))
+
+ // send any newly unblocked responses
+ for(fetchReq <- satisfied) {
+ fetchReq.respond()
+ }
+ }
+
+
+
def close() {
debug("Shutting down.")
fetchRequestPurgatory.shutdown()
@@ -660,56 +680,99 @@ class KafkaApis(val requestChannel: RequestChannel,
override def keyLabel = "%s-%d".format(topic, partition)
}
+ case class DelayedFetchRequestStatus(segmentBaseOffset: Long, fetchOffsetPosition: Int)
+
/**
- * A delayed fetch request
+   * A delayed fetch request, which is satisfied (or more
+ * accurately, unblocked) -- if:
+ * Case A: This broker is no longer the leader for ANY of the partitions it tries to fetch
+ * - should return error.
+   * Case B: The fetch offset does not lie on the last (active) segment of the log
+ * - should return all the data on that segment.
+ * Case C: The accumulated bytes from all the fetching partitions exceeds the minimum bytes
+ * - should return whatever data is available.
*/
- class DelayedFetch(keys: Seq[RequestKey], request: RequestChannel.Request, val fetch: FetchRequest, delayMs: Long, initialSize: Long)
+ class DelayedFetch(keys: Seq[RequestKey],
+ request: RequestChannel.Request,
+ delayMs: Long,
+ val fetch: FetchRequest,
+ val partitionFetchInfo: immutable.Map[TopicAndPartition, DelayedFetchRequestStatus])
extends DelayedRequest(keys, request, delayMs) {
- val bytesAccumulated = new AtomicLong(initialSize)
+
+ def isSatisfied() : Boolean = {
+ var accumulatedSize = 0
+ val fromFollower = fetch.isFromFollower
+ try {
+ partitionFetchInfo.foreach {
+ case (topicAndPartition, requestStatus) =>
+ if (requestStatus.fetchOffsetPosition >= 0) {
+ val replica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
+ val fetchableSegmentBaseOffset =
+ if (fromFollower) replica.logActiveSegmentBaseOffset.get()
+ else replica.logHWSegmentBaseOffset.get()
+ if (fetchableSegmentBaseOffset > requestStatus.segmentBaseOffset) {
+ debug("Satisfying fetch request %s since it is fetching inactive segments.".format(fetch))
+ return true
+ }
+ val fetchableEndPosition =
+ if (fromFollower) replica.logActiveSegmentSize.get()
+ else replica.logHWPosition.get()
+ accumulatedSize += fetchableEndPosition - requestStatus.fetchOffsetPosition
+ }
+ }
+ } catch {
+ case nle: NotLeaderForPartitionException =>
+ debug("Satisfying fetch request %s since leader has changed.".format(fetch))
+ return true
+ }
+
+ // unblocked if there are enough accumulated data
+ accumulatedSize >= fetch.minBytes
+ }
+
+ def respond() {
+ val topicData = readMessageSets(fetch)
+ val response = FetchResponse(fetch.correlationId, topicData)
+ requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response)))
+ }
}
/**
* A holding pen for fetch requests waiting to be satisfied
*/
class FetchRequestPurgatory(requestChannel: RequestChannel, purgeInterval: Int)
- extends RequestPurgatory[DelayedFetch, Int](brokerId, purgeInterval) {
+ extends RequestPurgatory[DelayedFetch](brokerId, purgeInterval) {
this.logIdent = "[FetchRequestPurgatory-%d] ".format(brokerId)
/**
- * A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field
+ * Check if a specified delayed fetch request is satisfied
*/
- def checkSatisfied(messageSizeInBytes: Int, delayedFetch: DelayedFetch): Boolean = {
- val accumulatedSize = delayedFetch.bytesAccumulated.addAndGet(messageSizeInBytes)
- accumulatedSize >= delayedFetch.fetch.minBytes
- }
+ def checkSatisfied(delayedFetch: DelayedFetch): Boolean = delayedFetch.isSatisfied()
/**
* When a request expires just answer it with whatever data is present
*/
def expire(delayed: DelayedFetch) {
debug("Expiring fetch request %s.".format(delayed.fetch))
- try {
- val topicData = readMessageSets(delayed.fetch)
- val response = FetchResponse(delayed.fetch.correlationId, topicData)
- val fromFollower = delayed.fetch.isFromFollower
- delayedRequestMetrics.recordDelayedFetchExpired(fromFollower)
- requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response)))
- }
- catch {
- case e1: LeaderNotAvailableException =>
- debug("Leader changed before fetch request %s expired.".format(delayed.fetch))
- case e2: UnknownTopicOrPartitionException =>
- debug("Replica went offline before fetch request %s expired.".format(delayed.fetch))
- }
+ val fromFollower = delayed.fetch.isFromFollower
+ delayedRequestMetrics.recordDelayedFetchExpired(fromFollower)
+ delayed.respond()
}
}
+  /** A delayed produce request, which is satisfied (or more
+   *  accurately, unblocked) -- if for every partition it produces to:
+ * Case A: This broker is not the leader: unblock - should return error.
+ * Case B: This broker is the leader:
+ * B.1 - If there was a localError (when writing to the local log): unblock - should return error
+ * B.2 - else, at least requiredAcks replicas should be caught up to this request.
+ */
class DelayedProduce(keys: Seq[RequestKey],
request: RequestChannel.Request,
- val partitionStatus: immutable.Map[TopicAndPartition, DelayedProduceResponseStatus],
- produce: ProducerRequest,
delayMs: Long,
- offsetCommitRequestOpt: Option[OffsetCommitRequest] = None)
+ val produce: ProducerRequest,
+ val partitionStatus: immutable.Map[TopicAndPartition, DelayedProduceResponseStatus],
+ val offsetCommitRequestOpt: Option[OffsetCommitRequest] = None)
extends DelayedRequest(keys, request, delayMs) with Logging {
// first update the acks pending variable according to the error code
@@ -745,47 +808,34 @@ class KafkaApis(val requestChannel: RequestChannel,
request, new BoundedByteBufferSend(response)))
}
- /**
- * Returns true if this delayed produce request is satisfied (or more
- * accurately, unblocked) -- this is the case if for every partition:
- * Case A: This broker is not the leader: unblock - should return error.
- * Case B: This broker is the leader:
- * B.1 - If there was a localError (when writing to the local log): unblock - should return error
- * B.2 - else, at least requiredAcks replicas should be caught up to this request.
- *
- * As partitions become acknowledged, we may be able to unblock
- * DelayedFetchRequests that are pending on those partitions.
- */
- def isSatisfied(followerFetchRequestKey: RequestKey) = {
- val topic = followerFetchRequestKey.topic
- val partitionId = followerFetchRequestKey.partition
- val fetchPartitionStatus = partitionStatus(TopicAndPartition(topic, partitionId))
- trace("Checking producer request satisfaction for %s-%d, acksPending = %b"
- .format(topic, partitionId, fetchPartitionStatus.acksPending))
- if (fetchPartitionStatus.acksPending) {
- val partitionOpt = replicaManager.getPartition(topic, partitionId)
- val (hasEnough, errorCode) = partitionOpt match {
- case Some(partition) =>
- partition.checkEnoughReplicasReachOffset(fetchPartitionStatus.requiredOffset, produce.requiredAcks)
- case None =>
- (false, ErrorMapping.UnknownTopicOrPartitionCode)
- }
- if (errorCode != ErrorMapping.NoError) {
- fetchPartitionStatus. acksPending = false
- fetchPartitionStatus.status.error = errorCode
- } else if (hasEnough) {
- fetchPartitionStatus.acksPending = false
- fetchPartitionStatus.status.error = ErrorMapping.NoError
- }
- if (!fetchPartitionStatus.acksPending) {
- val messageSizeInBytes = produce.topicPartitionMessageSizeMap(followerFetchRequestKey.topicAndPartition)
- maybeUnblockDelayedFetchRequests(topic, partitionId, messageSizeInBytes)
+ def isSatisfied() = {
+ // check for each partition if it still has pending acks
+ partitionStatus.foreach { case (topicAndPartition, fetchPartitionStatus) =>
+ trace("Checking producer request satisfaction for %s, acksPending = %b"
+ .format(topicAndPartition, fetchPartitionStatus.acksPending))
+ // skip those partitions that have already been satisfied
+ if (fetchPartitionStatus.acksPending) {
+ val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
+ val (hasEnough, errorCode) = partitionOpt match {
+ case Some(partition) =>
+ partition.checkEnoughReplicasReachOffset(
+ fetchPartitionStatus.requiredOffset,
+ produce.requiredAcks)
+ case None =>
+ (false, ErrorMapping.UnknownTopicOrPartitionCode)
+ }
+ if (errorCode != ErrorMapping.NoError) {
+ fetchPartitionStatus.acksPending = false
+ fetchPartitionStatus.status.error = errorCode
+ } else if (hasEnough) {
+ fetchPartitionStatus.acksPending = false
+ fetchPartitionStatus.status.error = ErrorMapping.NoError
+ }
}
}
// unblocked if there are no partitions with pending acks
val satisfied = ! partitionStatus.exists(p => p._2.acksPending)
- trace("Producer request satisfaction for %s-%d = %b".format(topic, partitionId, satisfied))
satisfied
}
}
@@ -794,12 +844,11 @@ class KafkaApis(val requestChannel: RequestChannel,
* A holding pen for produce requests waiting to be satisfied.
*/
private [kafka] class ProducerRequestPurgatory(purgeInterval: Int)
- extends RequestPurgatory[DelayedProduce, RequestKey](brokerId, purgeInterval) {
+ extends RequestPurgatory[DelayedProduce](brokerId, purgeInterval) {
this.logIdent = "[ProducerRequestPurgatory-%d] ".format(brokerId)
- protected def checkSatisfied(followerFetchRequestKey: RequestKey,
- delayedProduce: DelayedProduce) =
- delayedProduce.isSatisfied(followerFetchRequestKey)
+ protected def checkSatisfied(delayedProduce: DelayedProduce) =
+ delayedProduce.isSatisfied()
/**
* Handle an expired delayed request
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 6a56a77..df811d1 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -33,6 +33,8 @@ import java.util.concurrent.TimeUnit
object ReplicaManager {
val UnknownLogEndOffset = -1L
+ val UnknownLogPosition = -1
+ val UnknownlogSegmentBaseOffset = -1L
val HighWatermarkFilename = "replication-offset-checkpoint"
}
diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala
index c064c5c..9092d7b 100644
--- a/core/src/main/scala/kafka/server/RequestPurgatory.scala
+++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala
@@ -45,7 +45,7 @@ class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, de
*
* For us the key is generally a (topic, partition) pair.
* By calling
- * watch(delayedRequest)
+ * checkAndMaybeWatch(delayedRequest)
* we will add triggers for each of the given keys. It is up to the user to then call
* val satisfied = update(key, request)
* when a request relevant to the given key occurs. This triggers bookeeping logic and returns back any requests satisfied by this
@@ -61,18 +61,19 @@ class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, de
* this function handles delayed requests that have hit their time limit without being satisfied.
*
*/
-abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purgeInterval: Int = 10000)
+abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInterval: Int = 10000)
extends Logging with KafkaMetricsGroup {
/* a list of requests watching each key */
- private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers))
+ private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers(key)))
- private val requestCounter = new AtomicInteger(0)
+ /* the number of requests being watched, duplicates added on different watchers are also counted */
+ private val watched = new AtomicInteger(0)
newGauge(
"PurgatorySize",
new Gauge[Int] {
- def value = watchersForKey.values.map(_.numRequests).sum + expiredRequestReaper.numRequests
+ def value = watched.get() + expiredRequestReaper.numRequests
}
)
@@ -91,33 +92,39 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
/**
* Add a new delayed request watching the contained keys
*/
- def watch(delayedRequest: T) {
- requestCounter.getAndIncrement()
-
+ def checkAndMaybeWatch(delayedRequest: T): Boolean = {
for(key <- delayedRequest.keys) {
- var lst = watchersFor(key)
- lst.add(delayedRequest)
+ val lst = watchersFor(key)
+ // the request could already be satisfied and
+ // hence not added to watchers; in this case we can
+ // return immediately without trying to add to the rest of the watchers
+ if(!lst.add(delayedRequest))
+ return false
}
+
+ // if it is indeed watched, add to the expire queue also
expiredRequestReaper.enqueue(delayedRequest)
+
+ true
}
/**
* Update any watchers and return a list of newly satisfied requests.
*/
- def update(key: Any, request: R): Seq[T] = {
+ def update(key: Any): Seq[T] = {
val w = watchersForKey.get(key)
if(w == null)
Seq.empty
else
- w.collectSatisfiedRequests(request)
+ w.collectSatisfiedRequests()
}
private def watchersFor(key: Any) = watchersForKey.getAndMaybePut(key)
/**
- * Check if this request satisfied this delayed request
+ * Check if this delayed request is already satisfied
*/
- protected def checkSatisfied(request: R, delayed: T): Boolean
+ protected def checkSatisfied(request: T): Boolean
/**
* Handle an expired delayed request
@@ -125,7 +132,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
protected def expire(delayed: T)
/**
- * Shutdown the expirey thread
+ * Shutdown the expire reaper thread
*/
def shutdown() {
expiredRequestReaper.shutdown()
@@ -135,16 +142,17 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
* A linked list of DelayedRequests watching some key with some associated
* bookkeeping logic.
*/
- private class Watchers {
-
-
+ private class Watchers(key: Any) {
private val requests = new util.ArrayList[T]
- def numRequests = requests.size
-
- def add(t: T) {
+ def add(t: T): Boolean = {
synchronized {
+ // atomically check the satisfaction criterion and add to the watch list
+ if (checkSatisfied(t))
+ return false
requests.add(t)
+ watched.getAndIncrement()
+ return true
}
}
@@ -156,6 +164,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
val curr = iter.next
if(curr.satisfied.get()) {
iter.remove()
+ watched.getAndDecrement()
purged += 1
}
}
@@ -163,7 +172,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
}
}
- def collectSatisfiedRequests(request: R): Seq[T] = {
+ def collectSatisfiedRequests(): Seq[T] = {
val response = new mutable.ArrayBuffer[T]
synchronized {
val iter = requests.iterator()
@@ -175,9 +184,10 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
} else {
// synchronize on curr to avoid any race condition with expire
// on client-side.
- val satisfied = curr synchronized checkSatisfied(request, curr)
+ val satisfied = curr synchronized checkSatisfied(curr)
if(satisfied) {
iter.remove()
+ watched.getAndDecrement()
val updated = curr.satisfied.compareAndSet(false, true)
if(updated == true) {
response += curr
@@ -216,8 +226,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
expire(curr)
}
}
- if (requestCounter.get >= purgeInterval) { // see if we need to force a full purge
- requestCounter.set(0)
+ if (watched.get + numRequests >= purgeInterval) { // see if we need to force a full purge
val purged = purgeSatisfied()
debug("Purged %d requests from delay queue.".format(purged))
val numPurgedFromWatchers = watchersForKey.values.map(_.purgeSatisfied()).sum
@@ -266,10 +275,12 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
}
/**
- * Delete all expired events from the delay queue
+ * Delete all satisfied events from the delay queue and the watcher lists
*/
private def purgeSatisfied(): Int = {
var purged = 0
+
+ // purge the delayed queue
val iter = delayed.iterator()
while(iter.hasNext) {
val curr = iter.next()
@@ -278,6 +289,10 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
purged += 1
}
}
+
+ // purge the watcher lists
+ watchersForKey.values.foreach(_.purgeSatisfied())
+
purged
}
}
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 1da1393..0d5315d 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -575,15 +575,15 @@ class LogTest extends JUnitSuite {
@Test
def testAppendMessageWithNullPayload() {
- var log = new Log(logDir,
+ val log = new Log(logDir,
LogConfig(),
recoveryPoint = 0L,
time.scheduler,
time)
log.append(new ByteBufferMessageSet(new Message(bytes = null)))
- val ms = log.read(0, 4096, None)
- assertEquals(0, ms.head.offset)
- assertTrue("Message payload should be null.", ms.head.message.isNull)
+ val messageSet = log.read(0, 4096, None)
+ assertEquals(0, messageSet.head.offset)
+ assertTrue("Message payload should be null.", messageSet.head.message.isNull)
}
@Test
diff --git a/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala b/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
index 4f61f84..081203a 100644
--- a/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
@@ -46,17 +46,17 @@ class RequestPurgatoryTest extends JUnit3Suite {
def testRequestSatisfaction() {
val r1 = new DelayedRequest(Array("test1"), null, 100000L)
val r2 = new DelayedRequest(Array("test2"), null, 100000L)
- assertEquals("With no waiting requests, nothing should be satisfied", 0, purgatory.update("test1", producerRequest1).size)
- purgatory.watch(r1)
- assertEquals("Still nothing satisfied", 0, purgatory.update("test1", producerRequest1).size)
- purgatory.watch(r2)
- assertEquals("Still nothing satisfied", 0, purgatory.update("test2", producerRequest2).size)
+ assertEquals("With no waiting requests, nothing should be satisfied", 0, purgatory.update("test1").size)
+ assertTrue("r1 watched", purgatory.checkAndMaybeWatch(r1))
+ assertEquals("Still nothing satisfied", 0, purgatory.update("test1").size)
+ assertTrue("r2 watched", purgatory.checkAndMaybeWatch(r2))
+ assertEquals("Still nothing satisfied", 0, purgatory.update("test2").size)
purgatory.satisfied += r1
- assertEquals("r1 satisfied", mutable.ArrayBuffer(r1), purgatory.update("test1", producerRequest1))
- assertEquals("Nothing satisfied", 0, purgatory.update("test1", producerRequest2).size)
+ assertEquals("r1 satisfied", mutable.ArrayBuffer(r1), purgatory.update("test1"))
+ assertEquals("Nothing satisfied", 0, purgatory.update("test1").size)
purgatory.satisfied += r2
- assertEquals("r2 satisfied", mutable.ArrayBuffer(r2), purgatory.update("test2", producerRequest2))
- assertEquals("Nothing satisfied", 0, purgatory.update("test2", producerRequest2).size)
+ assertEquals("r2 satisfied", mutable.ArrayBuffer(r2), purgatory.update("test2"))
+ assertEquals("Nothing satisfied", 0, purgatory.update("test2").size)
}
@Test
@@ -65,8 +65,8 @@ class RequestPurgatoryTest extends JUnit3Suite {
val r1 = new DelayedRequest(Array("test1"), null, expiration)
val r2 = new DelayedRequest(Array("test1"), null, 200000L)
val start = System.currentTimeMillis
- purgatory.watch(r1)
- purgatory.watch(r2)
+ assertTrue("r1 watched", purgatory.checkAndMaybeWatch(r1))
+ assertTrue("r2 watched", purgatory.checkAndMaybeWatch(r2))
purgatory.awaitExpiration(r1)
val elapsed = System.currentTimeMillis - start
assertTrue("r1 expired", purgatory.expired.contains(r1))
@@ -74,7 +74,7 @@ class RequestPurgatoryTest extends JUnit3Suite {
assertTrue("Time for expiration %d should at least %d".format(elapsed, expiration), elapsed >= expiration)
}
- class MockRequestPurgatory extends RequestPurgatory[DelayedRequest, ProducerRequest] {
+ class MockRequestPurgatory extends RequestPurgatory[DelayedRequest] {
val satisfied = mutable.Set[DelayedRequest]()
val expired = mutable.Set[DelayedRequest]()
def awaitExpiration(delayed: DelayedRequest) = {
@@ -82,7 +82,7 @@ class RequestPurgatoryTest extends JUnit3Suite {
delayed.wait()
}
}
- def checkSatisfied(request: ProducerRequest, delayed: DelayedRequest): Boolean = satisfied.contains(delayed)
+ def checkSatisfied(delayed: DelayedRequest): Boolean = satisfied.contains(delayed)
def expire(delayed: DelayedRequest) {
expired += delayed
delayed synchronized {