From 4f4e1623ed2b8f626e687f9d199fb6e9aa5256f2 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Mon, 1 Sep 2014 14:48:57 -0700 Subject: [PATCH 1/3] dummy --- core/src/main/scala/kafka/api/FetchRequest.scala | 1 - core/src/main/scala/kafka/api/FetchResponse.scala | 10 +- .../main/scala/kafka/api/OffsetCommitRequest.scala | 24 +- .../src/main/scala/kafka/api/ProducerRequest.scala | 5 - .../main/scala/kafka/api/ProducerResponse.scala | 3 +- core/src/main/scala/kafka/cluster/Partition.scala | 47 ++- .../src/main/scala/kafka/common/ErrorMapping.scala | 2 + core/src/main/scala/kafka/log/Log.scala | 45 ++- .../kafka/network/BoundedByteBufferSend.scala | 4 +- .../src/main/scala/kafka/server/DelayedFetch.scala | 116 ++++--- .../main/scala/kafka/server/DelayedProduce.scala | 138 ++++---- .../scala/kafka/server/FetchRequestPurgatory.scala | 69 ---- core/src/main/scala/kafka/server/KafkaApis.scala | 311 ++++++------------ .../main/scala/kafka/server/OffsetManager.scala | 88 ++++- .../kafka/server/ProducerRequestPurgatory.scala | 69 ---- .../main/scala/kafka/server/ReplicaManager.scala | 345 +++++++++++++------- .../main/scala/kafka/server/RequestPurgatory.scala | 333 +++++++++---------- core/src/main/scala/kafka/utils/DelayedItem.scala | 8 +- .../server/HighwatermarkPersistenceTest.scala | 7 + .../unit/kafka/server/ISRExpirationTest.scala | 17 +- .../unit/kafka/server/ReplicaManagerTest.scala | 6 + .../unit/kafka/server/RequestPurgatoryTest.scala | 84 ++--- .../unit/kafka/server/ServerShutdownTest.scala | 2 +- .../scala/unit/kafka/server/SimpleFetchTest.scala | 270 +++++---------- 24 files changed, 964 insertions(+), 1040 deletions(-) delete mode 100644 core/src/main/scala/kafka/server/FetchRequestPurgatory.scala delete mode 100644 core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala index 51cdccf..56dab5f 100644 --- a/core/src/main/scala/kafka/api/FetchRequest.scala +++ b/core/src/main/scala/kafka/api/FetchRequest.scala @@ -30,7 +30,6 @@ import scala.collection.immutable.Map case class PartitionFetchInfo(offset: Long, fetchSize: Int) - object FetchRequest { val CurrentVersion = 0.shortValue val DefaultMaxWait = 0 diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala index af93087..c2b84e6 100644 --- a/core/src/main/scala/kafka/api/FetchResponse.scala +++ b/core/src/main/scala/kafka/api/FetchResponse.scala @@ -25,6 +25,8 @@ import kafka.message.{MessageSet, ByteBufferMessageSet} import kafka.network.{MultiSend, Send} import kafka.api.ApiUtils._ +import scala.collection._ + object FetchResponsePartitionData { def readFrom(buffer: ByteBuffer): FetchResponsePartitionData = { val error = buffer.getShort @@ -150,9 +152,11 @@ object FetchResponse { } } - -case class FetchResponse(correlationId: Int, - data: Map[TopicAndPartition, FetchResponsePartitionData]) { +/* + * Note that FetchResponse does not extend from RequestOrResponse as other responses does since it will + * be sent through the FetchResponseSend instead of the BoundedByteBufferSend. + */ +case class FetchResponse(correlationId: Int, data: Map[TopicAndPartition, FetchResponsePartitionData]) { /** * Partitions the data into a map of maps (one for each topic). 
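// A minimal stand-alone sketch, not from the patch itself, of the grouping described in the
// scaladoc just above ("Partitions the data into a map of maps (one for each topic)"). It assumes
// the kafka.common.TopicAndPartition and kafka.api.FetchResponsePartitionData types this patch
// already uses; the helper name dataGroupedByTopic is only illustrative.

import kafka.api.FetchResponsePartitionData
import kafka.common.TopicAndPartition

def dataGroupedByTopic(data: Map[TopicAndPartition, FetchResponsePartitionData])
    : Map[String, Map[TopicAndPartition, FetchResponsePartitionData]] =
  // grouping by topic yields one inner map of per-partition fetch data for each topic
  data.groupBy { case (topicAndPartition, _) => topicAndPartition.topic }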
diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala index 861a6cf..050615c 100644 --- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala +++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala @@ -78,28 +78,12 @@ case class OffsetCommitRequest(groupId: String, groupGenerationId: Int = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_GENERATION_ID, consumerId: String = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_CONSUMER_ID) extends RequestOrResponse(Some(RequestKeys.OffsetCommitKey)) { + assert(versionId == 0 || versionId == 1, "Version " + versionId + " is invalid for OffsetCommitRequest. Valid versions are 0 or 1.") lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic) - def filterLargeMetadata(maxMetadataSize: Int) = - requestInfo.filter(info => info._2.metadata == null || info._2.metadata.length <= maxMetadataSize) - - def responseFor(errorCode: Short, offsetMetadataMaxSize: Int) = { - val commitStatus = requestInfo.map {info => - (info._1, if (info._2.metadata != null && info._2.metadata.length > offsetMetadataMaxSize) - ErrorMapping.OffsetMetadataTooLargeCode - else if (errorCode == ErrorMapping.UnknownTopicOrPartitionCode) - ErrorMapping.ConsumerCoordinatorNotAvailableCode - else if (errorCode == ErrorMapping.NotLeaderForPartitionCode) - ErrorMapping.NotCoordinatorForConsumerCode - else - errorCode) - }.toMap - OffsetCommitResponse(commitStatus, correlationId) - } - def writeTo(buffer: ByteBuffer) { // Write envelope buffer.putShort(versionId) @@ -150,8 +134,10 @@ case class OffsetCommitRequest(groupId: String, override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { val errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]) - val errorResponse = responseFor(errorCode, Int.MaxValue) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + val commitStatus = requestInfo.mapValues(_ => errorCode) + val commitResponse = OffsetCommitResponse(commitStatus, correlationId) + + requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(commitResponse))) } override def describe(details: Boolean): String = { diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala index b2366e7..b062406 100644 --- a/core/src/main/scala/kafka/api/ProducerRequest.scala +++ b/core/src/main/scala/kafka/api/ProducerRequest.scala @@ -152,10 +152,5 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion, producerRequest.append("; TopicAndPartition: " + topicPartitionMessageSizeMap.mkString(",")) producerRequest.toString() } - - - def emptyData(){ - data.clear() - } } diff --git a/core/src/main/scala/kafka/api/ProducerResponse.scala b/core/src/main/scala/kafka/api/ProducerResponse.scala index a286272..5d1fac4 100644 --- a/core/src/main/scala/kafka/api/ProducerResponse.scala +++ b/core/src/main/scala/kafka/api/ProducerResponse.scala @@ -43,8 +43,7 @@ object ProducerResponse { case class ProducerResponseStatus(var error: Short, offset: Long) -case class ProducerResponse(correlationId: Int, - status: Map[TopicAndPartition, ProducerResponseStatus]) +case class ProducerResponse(correlationId: Int, status: Map[TopicAndPartition, ProducerResponseStatus]) extends RequestOrResponse() { /** diff --git a/core/src/main/scala/kafka/cluster/Partition.scala 
b/core/src/main/scala/kafka/cluster/Partition.scala index ff106b4..1c62afc 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -230,7 +230,33 @@ class Partition(val topic: String, } } - def updateLeaderHWAndMaybeExpandIsr(replicaId: Int) { + /** + * Update the log end offset of a certain replica of this partition + */ + def updateReplicaLEO(replicaId: Int, offset: LogOffsetMetadata) = { + getReplica(replicaId) match { + case Some(replica) => + replica.logEndOffset = offset + + // check if we need to expand ISR to include this replica + // if it is not in the ISR yet + maybeExpandIsr(replicaId) + + debug("Recorded replica %d LEO position %d for partition [%s,%d]." + .format(replicaId, offset.messageOffset, topic, partitionId)) + case None => + throw new NotAssignedReplicaException(("Leader %d failed to record follower %d's position %d since the replica" + + " is not recognized to be one of the assigned replicas %s for partition [%s,%d]").format(localBrokerId, replicaId, + offset.messageOffset, assignedReplicas().map(_.brokerId).mkString(","), topic, partitionId)) + } + } + + /** + * Check and maybe expand the ISR of the partition. + * + * This function can be triggered when a replica's LEO has incremented + */ + def maybeExpandIsr(replicaId: Int) { inWriteLock(leaderIsrUpdateLock) { // check if this replica needs to be added to the ISR leaderReplicaIfLocal() match { @@ -252,7 +278,10 @@ class Partition(val topic: String, updateIsr(newInSyncReplicas) replicaManager.isrExpandRate.mark() } + // check if the HW of the partition can now be incremented + // since the replica maybe now in the ISR and its LEO has just incremented maybeIncrementLeaderHW(leaderReplica) + case None => // nothing to do if no longer leader } } @@ -286,8 +315,14 @@ class Partition(val topic: String, } /** - * There is no need to acquire the leaderIsrUpdate lock here since all callers of this private API acquire that lock - * @param leaderReplica + * Check and maybe increment the high watermark of the partition; + * this function can be triggered when + * + * 1. Partition ISR changed + * 2. Any replica's LEO changed (e.g. leader LEO changed and the ISR is down to 1) + * + * Note There is no need to acquire the leaderIsrUpdate lock here + * since all callers of this private API acquire that lock */ private def maybeIncrementLeaderHW(leaderReplica: Replica) { val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset) @@ -298,8 +333,8 @@ class Partition(val topic: String, debug("High watermark for partition [%s,%d] updated to %s".format(topic, partitionId, newHighWatermark)) // some delayed requests may be unblocked after HW changed val requestKey = new TopicPartitionRequestKey(this.topic, this.partitionId) - replicaManager.unblockDelayedFetchRequests(requestKey) - replicaManager.unblockDelayedProduceRequests(requestKey) + replicaManager.tryCompleteDelayedFetch(requestKey) + replicaManager.tryCompleteDelayedProduce(requestKey) } else { debug("Skipping update high watermark since Old hw %s is larger than new hw %s for partition [%s,%d]. 
All leo's are %s" .format(oldHighWatermark, newHighWatermark, topic, partitionId, allLogEndOffsets.mkString(","))) @@ -358,7 +393,7 @@ class Partition(val topic: String, val log = leaderReplica.log.get val info = log.append(messages, assignOffsets = true) // probably unblock some follower fetch requests since log end offset has been updated - replicaManager.unblockDelayedFetchRequests(new TopicPartitionRequestKey(this.topic, this.partitionId)) + replicaManager.tryCompleteDelayedFetch(new TopicPartitionRequestKey(this.topic, this.partitionId)) // we may need to increment high watermark since ISR could be down to 1 maybeIncrementLeaderHW(leaderReplica) info diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala index 3fae791..8232e35 100644 --- a/core/src/main/scala/kafka/common/ErrorMapping.scala +++ b/core/src/main/scala/kafka/common/ErrorMapping.scala @@ -79,4 +79,6 @@ object ErrorMapping { throw codeToException(code).newInstance() def exceptionFor(code: Short) : Throwable = codeToException(code).newInstance() + + def exceptionNameFor(code: Short) : String = codeToException(code).getName() } diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 0ddf97b..fe13806 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -31,6 +31,40 @@ import scala.collection.JavaConversions import com.yammer.metrics.core.Gauge +object LogAppendInfo { + val UnknownLogAppendInfo = LogAppendInfo(-1, -1, NoCompressionCodec, -1, -1, false) +} + +/** + * Struct to hold various quantities we compute about each message set before appending to the log + * @param firstOffset The first offset in the message set + * @param lastOffset The last offset in the message set + * @param shallowCount The number of shallow messages + * @param validBytes The number of valid bytes + * @param codec The codec used in the message set + * @param offsetsMonotonic Are the offsets in this message set monotonically increasing + */ +case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec, shallowCount: Int, validBytes: Int, offsetsMonotonic: Boolean) + +/* + * Result metadata of a log append operation on the log + */ +case class LogAppendResult(info: LogAppendInfo, error: Option[Throwable] = None) { + def errorCode = error match { + case None => ErrorMapping.NoError + case Some(e) => ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]) + } +} + +/* + * Result metadata of a log read operation on the log + */ +case class LogReadResult(info: FetchDataInfo, hw: Long, readSize: Int, error: Option[Throwable] = None) { + def errorCode = error match { + case None => ErrorMapping.NoError + case Some(e) => ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]) + } +} /** * An append-only log for storing messages. 
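// A minimal sketch, not from the patch itself, of how the LogAppendResult wrapper introduced
// above is meant to be used: a failed local append is captured as Some(exception) instead of
// being rethrown, and errorCode later translates it into a wire error code via ErrorMapping.
// The helper name safeAppend is only illustrative; log.append(messages, assignOffsets = true)
// is the append call used elsewhere in this patch.

import kafka.log.{Log, LogAppendInfo, LogAppendResult}
import kafka.message.ByteBufferMessageSet

def safeAppend(log: Log, messages: ByteBufferMessageSet): LogAppendResult =
  try {
    // a successful append carries the returned LogAppendInfo and no error
    LogAppendResult(log.append(messages, assignOffsets = true))
  } catch {
    // any failure travels back to the caller as data rather than as a thrown exception
    case e: Throwable => LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e))
  }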
@@ -302,17 +336,6 @@ class Log(val dir: File, } /** - * Struct to hold various quantities we compute about each message set before appending to the log - * @param firstOffset The first offset in the message set - * @param lastOffset The last offset in the message set - * @param shallowCount The number of shallow messages - * @param validBytes The number of valid bytes - * @param codec The codec used in the message set - * @param offsetsMonotonic Are the offsets in this message set monotonically increasing - */ - case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec, shallowCount: Int, validBytes: Int, offsetsMonotonic: Boolean) - - /** * Validate the following: *
    *
  1. each message matches its CRC diff --git a/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala b/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala index a624359..55ecac2 100644 --- a/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala +++ b/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala @@ -25,7 +25,7 @@ import kafka.api.RequestOrResponse @nonthreadsafe private[kafka] class BoundedByteBufferSend(val buffer: ByteBuffer) extends Send { - private var sizeBuffer = ByteBuffer.allocate(4) + private val sizeBuffer = ByteBuffer.allocate(4) // Avoid possibility of overflow for 2GB-4 byte buffer if(buffer.remaining > Int.MaxValue - sizeBuffer.limit) @@ -53,7 +53,7 @@ private[kafka] class BoundedByteBufferSend(val buffer: ByteBuffer) extends Send def writeTo(channel: GatheringByteChannel): Int = { expectIncomplete() - var written = channel.write(Array(sizeBuffer, buffer)) + val written = channel.write(Array(sizeBuffer, buffer)) // if we are done, mark it off if(!buffer.hasRemaining) complete = true diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index e0f14e2..68bf028 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -17,75 +17,107 @@ package kafka.server -import kafka.network.RequestChannel -import kafka.api.{FetchResponse, FetchRequest} -import kafka.common.{UnknownTopicOrPartitionException, NotLeaderForPartitionException, TopicAndPartition} +import kafka.api.FetchResponsePartitionData +import kafka.api.PartitionFetchInfo +import kafka.common.UnknownTopicOrPartitionException +import kafka.common.NotLeaderForPartitionException +import kafka.common.TopicAndPartition -import scala.collection.immutable.Map -import scala.collection.Seq +import scala.collection._ + +case class FetchPartitionStatus(startOffsetMetadata: LogOffsetMetadata, fetchInfo: PartitionFetchInfo) { + + override def toString = "[startOffsetMetadata: " + startOffsetMetadata + ", " + + "fetchInfo: " + fetchInfo + "]" +} /** - * A delayed fetch request, which is satisfied (or more - * accurately, unblocked) -- if: - * Case A: This broker is no longer the leader for some partitions it tries to fetch - * - should return whatever data is available for the rest partitions. - * Case B: This broker is does not know of some partitions it tries to fetch - * - should return whatever data is available for the rest partitions. - * Case C: The fetch offset locates not on the last segment of the log - * - should return all the data on that segment. - * Case D: The accumulated bytes from all the fetching partitions exceeds the minimum bytes - * - should return whatever data is available. 
+ * The fetch metadata maintained by the delayed produce request */ +case class FetchMetadata(fetchMinBytes: Int, + fetchOnlyLeader: Boolean, + fetchOnlyCommitted: Boolean, + fetchPartitionStatus: Map[TopicAndPartition, FetchPartitionStatus]) { + + override def toString = "[minBytes: " + fetchMinBytes + ", " + + "onlyLeader:" + fetchOnlyLeader + ", " + "onlyCommitted: " + fetchOnlyCommitted + ", " + "partitionStatus: " + fetchPartitionStatus + "]" +} -class DelayedFetch(override val keys: Seq[TopicPartitionRequestKey], - override val request: RequestChannel.Request, - override val delayMs: Long, - val fetch: FetchRequest, - private val partitionFetchOffsets: Map[TopicAndPartition, LogOffsetMetadata]) - extends DelayedRequest(keys, request, delayMs) { +/** + * A delayed fetch request that can be created by the replica manager and watched + * in the fetch request purgatory + */ +class DelayedFetch(delayMs: Long, + fetchMetadata: FetchMetadata, + replicaManager: ReplicaManager, + callbackOnComplete: Map[TopicAndPartition, FetchResponsePartitionData] => Unit) + extends DelayedRequest(delayMs) { - def isSatisfied(replicaManager: ReplicaManager) : Boolean = { + /** + * The request can be completed if: + * + * Case A: This broker is no longer the leader for some partitions it tries to fetch + * Case B: This broker is does not know of some partitions it tries to fetch + * Case C: The fetch offset locates not on the last segment of the log + * Case D: The accumulated bytes from all the fetching partitions exceeds the minimum bytes + */ + override def tryComplete() : Boolean = { var accumulatedSize = 0 - val fromFollower = fetch.isFromFollower - partitionFetchOffsets.foreach { - case (topicAndPartition, fetchOffset) => + fetchMetadata.fetchPartitionStatus.foreach { + case (topicAndPartition, fetchStatus) => + val fetchOffset = fetchStatus.startOffsetMetadata try { if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) { val replica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition) val endOffset = - if (fromFollower) - replica.logEndOffset - else + if (fetchMetadata.fetchOnlyCommitted) replica.highWatermark + else + replica.logEndOffset if (endOffset.offsetOnOlderSegment(fetchOffset)) { // Case C, this can happen when the new follower replica fetching on a truncated leader - debug("Satisfying fetch request %s since it is fetching later segments of partition %s.".format(fetch, topicAndPartition)) - return true + debug("Satisfying fetch %s since it is fetching later segments of partition %s.".format(fetchMetadata, topicAndPartition)) + return forceComplete() } else if (fetchOffset.offsetOnOlderSegment(endOffset)) { - // Case C, this can happen when the folloer replica is lagging too much - debug("Satisfying fetch request %s immediately since it is fetching older segments.".format(fetch)) - return true + // Case C, this can happen when the follower replica is lagging too much + debug("Satisfying fetch %s immediately since it is fetching older segments.".format(fetchMetadata)) + return forceComplete() } else if (fetchOffset.precedes(endOffset)) { - accumulatedSize += endOffset.positionDiff(fetchOffset) + // we need take the partition fetch size as upper bound when accumulating the bytes + accumulatedSize += math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.fetchSize) } } } catch { case utpe: UnknownTopicOrPartitionException => // Case A - debug("Broker no longer know of %s, satisfy %s immediately".format(topicAndPartition, fetch)) - 
return true + debug("Broker no longer know of %s, satisfy %s immediately".format(topicAndPartition, fetchMetadata)) + return forceComplete() case nle: NotLeaderForPartitionException => // Case B - debug("Broker is no longer the leader of %s, satisfy %s immediately".format(topicAndPartition, fetch)) - return true + debug("Broker is no longer the leader of %s, satisfy %s immediately".format(topicAndPartition, fetchMetadata)) + return forceComplete() } } // Case D - accumulatedSize >= fetch.minBytes + if (accumulatedSize >= fetchMetadata.fetchMinBytes) + forceComplete() + else + false } - def respond(replicaManager: ReplicaManager): FetchResponse = { - val topicData = replicaManager.readMessageSets(fetch) - FetchResponse(fetch.correlationId, topicData.mapValues(_.data)) + /** + * Upon completion, read whatever data is available and pass to the complete callback + */ + override def complete() { + val logReadResults = replicaManager.readFromLocalLog(fetchMetadata.fetchOnlyLeader, + fetchMetadata.fetchOnlyCommitted, + fetchMetadata.fetchPartitionStatus.mapValues(status => status.fetchInfo)) + + val fetchPartitionData = logReadResults.mapValues(result => + FetchResponsePartitionData(result.errorCode, result.hw, result.info.messageSet)) + + callbackOnComplete(fetchPartitionData) } } \ No newline at end of file diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala index 9481508..cc40c51 100644 --- a/core/src/main/scala/kafka/server/DelayedProduce.scala +++ b/core/src/main/scala/kafka/server/DelayedProduce.scala @@ -17,99 +17,109 @@ package kafka.server -import kafka.api._ + +import kafka.api.ProducerResponseStatus import kafka.common.ErrorMapping import kafka.common.TopicAndPartition -import kafka.utils.Logging -import kafka.network.RequestChannel import scala.Some -import scala.collection.immutable.Map -import scala.collection.Seq - -/** A delayed produce request, which is satisfied (or more - * accurately, unblocked) -- if for every partition it produce to: - * Case A: This broker is not the leader: unblock - should return error. - * Case B: This broker is the leader: - * B.1 - If there was a localError (when writing to the local log): unblock - should return error - * B.2 - else, at least requiredAcks replicas should be caught up to this request. 
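// A minimal sketch, not from the patch itself, of the contract shared by DelayedFetch (above)
// and DelayedProduce (below) after this refactoring: tryComplete() re-evaluates the completion
// criterion and calls forceComplete() once it holds, and complete() then hands the result to the
// callback supplied by the caller. The DelayedFlag class, its isDone predicate and its onComplete
// callback are hypothetical, and the assumption is that forceComplete() triggers complete() at
// most once, which is what the DelayedFetch and DelayedProduce implementations here rely on.

package kafka.server

class DelayedFlag(delayMs: Long, isDone: () => Boolean, onComplete: () => Unit)
  extends DelayedRequest(delayMs) {

  // invoked by the purgatory, e.g. from checkAndComplete(key), to test the completion criterion
  override def tryComplete(): Boolean =
    if (isDone()) forceComplete() else false

  // invoked after forceComplete() succeeds; passes the outcome back to the caller
  override def complete() {
    onComplete()
  }
}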
- */ - -class DelayedProduce(override val keys: Seq[TopicPartitionRequestKey], - override val request: RequestChannel.Request, - override val delayMs: Long, - val produce: ProducerRequest, - val partitionStatus: Map[TopicAndPartition, DelayedProduceResponseStatus], - val offsetCommitRequestOpt: Option[OffsetCommitRequest] = None) - extends DelayedRequest(keys, request, delayMs) with Logging { +import scala.collection._ - // first update the acks pending variable according to the error code - partitionStatus foreach { case (topicAndPartition, delayedStatus) => - if (delayedStatus.responseStatus.error == ErrorMapping.NoError) { - // Timeout error state will be cleared when required acks are received - delayedStatus.acksPending = true - delayedStatus.responseStatus.error = ErrorMapping.RequestTimedOutCode - } else { - delayedStatus.acksPending = false - } +case class ProducePartitionStatus(requiredOffset: Long, responseStatus: ProducerResponseStatus) { + @volatile var acksPending = false - trace("Initial partition status for %s is %s".format(topicAndPartition, delayedStatus)) - } + override def toString = "[acksPending: %b, error: %d, startOffset: %d, requiredOffset: %d]" + .format(acksPending, responseStatus.error, responseStatus.offset, requiredOffset) +} - def respond(offsetManager: OffsetManager): RequestOrResponse = { - val responseStatus = partitionStatus.mapValues(status => status.responseStatus) +/** + * The produce metadata maintained by the delayed produce request + */ +case class ProduceMetadata(produceRequiredAcks: Short, + produceStatus: Map[TopicAndPartition, ProducePartitionStatus]) { - val errorCode = responseStatus.find { case (_, status) => - status.error != ErrorMapping.NoError - }.map(_._2.error).getOrElse(ErrorMapping.NoError) + override def toString = "[requiredAcks: %d, partitionStatus: %s]" + .format(produceRequiredAcks, produceStatus) +} - if (errorCode == ErrorMapping.NoError) { - offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo) ) +/** + * A delayed produce request that can be created by the replica manager and watched + * in the produce request purgatory + */ +class DelayedProduce(delayMs: Long, + produceMetadata: ProduceMetadata, + replicaManager: ReplicaManager, + callbackOnComplete: Map[TopicAndPartition, ProducerResponseStatus] => Unit) + extends DelayedRequest(delayMs) { + + // first update the acks pending variable according to the error code + produceMetadata.produceStatus.foreach { case (topicAndPartition, status) => + if (status.responseStatus.error == ErrorMapping.NoError) { + // Timeout error state will be cleared when required acks are received + status.acksPending = true + status.responseStatus.error = ErrorMapping.RequestTimedOutCode + } else { + status.acksPending = false } - val response = offsetCommitRequestOpt.map(_.responseFor(errorCode, offsetManager.config.maxMetadataSize)) - .getOrElse(ProducerResponse(produce.correlationId, responseStatus)) + trace("Initial partition status for %s is %s".format(topicAndPartition, status)) + } - response + produceMetadata.produceStatus.foreach { case (topicAndPartition, status) => + trace("Debugging: initial partition status for %s is %s".format(topicAndPartition, status)) } - def isSatisfied(replicaManager: ReplicaManager) = { + /** + * The delayed produce request can be completed if every partition + * it produces to is satisfied by one of the following: + * + * Case A: This broker is no longer the leader: should return error + * Case B: This broker is the leader: + 
* B.1 - If there was a localError (when writing to the local log): should return error + * B.2 - else, at least requiredAcks replicas should be caught up to this request. + */ + override def tryComplete(): Boolean = { // check for each partition if it still has pending acks - partitionStatus.foreach { case (topicAndPartition, fetchPartitionStatus) => - trace("Checking producer request satisfaction for %s, acksPending = %b" - .format(topicAndPartition, fetchPartitionStatus.acksPending)) + produceMetadata.produceStatus.foreach { case (topicAndPartition, status) => + trace("Checking produce satisfaction for %s, current status %s" + .format(topicAndPartition, status)) // skip those partitions that have already been satisfied - if (fetchPartitionStatus.acksPending) { + if (status.acksPending) { val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition) val (hasEnough, errorCode) = partitionOpt match { case Some(partition) => partition.checkEnoughReplicasReachOffset( - fetchPartitionStatus.requiredOffset, - produce.requiredAcks) + status.requiredOffset, + produceMetadata.produceRequiredAcks) case None => + // Case A (false, ErrorMapping.UnknownTopicOrPartitionCode) } if (errorCode != ErrorMapping.NoError) { - fetchPartitionStatus.acksPending = false - fetchPartitionStatus.responseStatus.error = errorCode + // Case B.1 + status.acksPending = false + status.responseStatus.error = errorCode } else if (hasEnough) { - fetchPartitionStatus.acksPending = false - fetchPartitionStatus.responseStatus.error = ErrorMapping.NoError + // Case B.2 + status.acksPending = false + status.responseStatus.error = ErrorMapping.NoError } } } - // unblocked if there are no partitions with pending acks - val satisfied = ! partitionStatus.exists(p => p._2.acksPending) - satisfied + // check if each partition has satisfied at lease one of case A and case B + if (! produceMetadata.produceStatus.values.exists(p => p.acksPending)) + forceComplete() + else + false } -} - -case class DelayedProduceResponseStatus(val requiredOffset: Long, - val responseStatus: ProducerResponseStatus) { - @volatile var acksPending = false - override def toString = - "acksPending:%b, error: %d, startOffset: %d, requiredOffset: %d".format( - acksPending, responseStatus.error, responseStatus.offset, requiredOffset) + /** + * Upon completion, return the current response status along with the error code per partition + */ + override def complete() { + val responseStatus = produceMetadata.produceStatus.mapValues(status => status.responseStatus) + callbackOnComplete(responseStatus) + } } + diff --git a/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala b/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala deleted file mode 100644 index ed13188..0000000 --- a/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.server - -import kafka.metrics.KafkaMetricsGroup -import kafka.network.RequestChannel -import kafka.api.FetchResponseSend - -import java.util.concurrent.TimeUnit - -/** - * The purgatory holding delayed fetch requests - */ -class FetchRequestPurgatory(replicaManager: ReplicaManager, requestChannel: RequestChannel) - extends RequestPurgatory[DelayedFetch](replicaManager.config.brokerId, replicaManager.config.fetchPurgatoryPurgeIntervalRequests) { - this.logIdent = "[FetchRequestPurgatory-%d] ".format(replicaManager.config.brokerId) - - private class DelayedFetchRequestMetrics(forFollower: Boolean) extends KafkaMetricsGroup { - private val metricPrefix = if (forFollower) "Follower" else "Consumer" - - val expiredRequestMeter = newMeter(metricPrefix + "ExpiresPerSecond", "requests", TimeUnit.SECONDS) - } - - private val aggregateFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = true) - private val aggregateNonFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = false) - - private def recordDelayedFetchExpired(forFollower: Boolean) { - val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics - else aggregateNonFollowerFetchRequestMetrics - - metrics.expiredRequestMeter.mark() - } - - /** - * Check if a specified delayed fetch request is satisfied - */ - def checkSatisfied(delayedFetch: DelayedFetch): Boolean = delayedFetch.isSatisfied(replicaManager) - - /** - * When a delayed fetch request expires just answer it with whatever data is present - */ - def expire(delayedFetch: DelayedFetch) { - debug("Expiring fetch request %s.".format(delayedFetch.fetch)) - val fromFollower = delayedFetch.fetch.isFromFollower - recordDelayedFetchExpired(fromFollower) - respond(delayedFetch) - } - - // TODO: purgatory should not be responsible for sending back the responses - def respond(delayedFetch: DelayedFetch) { - val response = delayedFetch.respond(replicaManager) - requestChannel.sendResponse(new RequestChannel.Response(delayedFetch.request, new FetchResponseSend(response))) - } -} \ No newline at end of file diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index c584b55..32696c1 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -20,7 +20,6 @@ package kafka.server import kafka.api._ import kafka.common._ import kafka.log._ -import kafka.message._ import kafka.network._ import kafka.admin.AdminUtils import kafka.network.RequestChannel.Response @@ -42,12 +41,8 @@ class KafkaApis(val requestChannel: RequestChannel, val config: KafkaConfig, val controller: KafkaController) extends Logging { - val producerRequestPurgatory = new ProducerRequestPurgatory(replicaManager, offsetManager, requestChannel) - val fetchRequestPurgatory = new FetchRequestPurgatory(replicaManager, requestChannel) - // TODO: the following line will be removed in 0.9 - replicaManager.initWithRequestPurgatory(producerRequestPurgatory, fetchRequestPurgatory) - var metadataCache = new MetadataCache this.logIdent = 
"[KafkaApi-%d] ".format(brokerId) + val metadataCache = new MetadataCache /** * Top-level method that handles all requests and multiplexes to the right api @@ -56,7 +51,7 @@ class KafkaApis(val requestChannel: RequestChannel, try{ trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress) request.requestId match { - case RequestKeys.ProduceKey => handleProducerOrOffsetCommitRequest(request) + case RequestKeys.ProduceKey => handleProducerRequest(request) case RequestKeys.FetchKey => handleFetchRequest(request) case RequestKeys.OffsetsKey => handleOffsetRequest(request) case RequestKeys.MetadataKey => handleTopicMetadataRequest(request) @@ -64,7 +59,7 @@ class KafkaApis(val requestChannel: RequestChannel, case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request) case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request) case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request) - case RequestKeys.OffsetCommitKey => handleProducerOrOffsetCommitRequest(request) + case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request) case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request) case RequestKeys.ConsumerMetadataKey => handleConsumerMetadataRequest(request) case requestId => throw new KafkaException("Unknown api code " + requestId) @@ -123,175 +118,81 @@ class KafkaApis(val requestChannel: RequestChannel, requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(controlledShutdownResponse))) } - private def producerRequestFromOffsetCommit(offsetCommitRequest: OffsetCommitRequest) = { - val msgs = offsetCommitRequest.filterLargeMetadata(config.offsetMetadataMaxSize).map { - case (topicAndPartition, offset) => - new Message( - bytes = OffsetManager.offsetCommitValue(offset), - key = OffsetManager.offsetCommitKey(offsetCommitRequest.groupId, topicAndPartition.topic, topicAndPartition.partition) - ) - }.toSeq - - val producerData = mutable.Map( - TopicAndPartition(OffsetManager.OffsetsTopicName, offsetManager.partitionFor(offsetCommitRequest.groupId)) -> - new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, msgs:_*) - ) - - val request = ProducerRequest( - correlationId = offsetCommitRequest.correlationId, - clientId = offsetCommitRequest.clientId, - requiredAcks = config.offsetCommitRequiredAcks, - ackTimeoutMs = config.offsetCommitTimeoutMs, - data = producerData) - trace("Created producer request %s for offset commit request %s.".format(request, offsetCommitRequest)) - request - } /** - * Handle a produce request or offset commit request (which is really a specialized producer request) + * Handle an offset commit request */ - def handleProducerOrOffsetCommitRequest(request: RequestChannel.Request) { - val (produceRequest, offsetCommitRequestOpt) = - if (request.requestId == RequestKeys.OffsetCommitKey) { - val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest] - (producerRequestFromOffsetCommit(offsetCommitRequest), Some(offsetCommitRequest)) - } else { - (request.requestObj.asInstanceOf[ProducerRequest], None) - } - - val sTime = SystemTime.milliseconds - val localProduceResults = appendToLocalLog(produceRequest, offsetCommitRequestOpt.nonEmpty) - debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime)) - - val firstErrorCode = localProduceResults.find(_.errorCode != ErrorMapping.NoError).map(_.errorCode).getOrElse(ErrorMapping.NoError) - - val numPartitionsInError = localProduceResults.count(_.error.isDefined) - 
if(produceRequest.requiredAcks == 0) { - // no operation needed if producer request.required.acks = 0; however, if there is any exception in handling the request, since - // no response is expected by the producer the handler will send a close connection response to the socket server - // to close the socket so that the producer client will know that some exception has happened and will refresh its metadata - if (numPartitionsInError != 0) { - info(("Send the close connection response due to error handling produce request " + - "[clientId = %s, correlationId = %s, topicAndPartition = %s] with Ack=0") - .format(produceRequest.clientId, produceRequest.correlationId, produceRequest.topicPartitionMessageSizeMap.keySet.mkString(","))) - requestChannel.closeConnection(request.processor, request) - } else { - - if (firstErrorCode == ErrorMapping.NoError) - offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo)) - - if (offsetCommitRequestOpt.isDefined) { - val response = offsetCommitRequestOpt.get.responseFor(firstErrorCode, config.offsetMetadataMaxSize) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) - } else - requestChannel.noOperation(request.processor, request) - } - } else if (produceRequest.requiredAcks == 1 || - produceRequest.numPartitions <= 0 || - numPartitionsInError == produceRequest.numPartitions) { - - if (firstErrorCode == ErrorMapping.NoError) { - offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo) ) + def handleOffsetCommitRequest(request: RequestChannel.Request) { + val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest] + + // the callback for sending the response + def sendResponseCallback(commitStatus: immutable.Map[TopicAndPartition, Short]) { + commitStatus.foreach { case (topicAndPartition, errorCode) => + // Here we only print warnings for known errors; if it is unknown, it will cause + // an error message in the replica manager already and hence can be ignored here + if (errorCode != ErrorMapping.NoError && errorCode != ErrorMapping.UnknownCode) { + warn("Offset commit request with correlation id %d from client %s on partition %s failed due to %s" + .format(offsetCommitRequest.correlationId, offsetCommitRequest.clientId, + topicAndPartition, ErrorMapping.exceptionNameFor(errorCode))) + } } - val statuses = localProduceResults.map(r => r.key -> ProducerResponseStatus(r.errorCode, r.start)).toMap - val response = offsetCommitRequestOpt.map(_.responseFor(firstErrorCode, config.offsetMetadataMaxSize)) - .getOrElse(ProducerResponse(produceRequest.correlationId, statuses)) - + val response = OffsetCommitResponse(commitStatus, offsetCommitRequest.correlationId) requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) - } else { - // create a list of (topic, partition) pairs to use as keys for this delayed request - val producerRequestKeys = produceRequest.data.keys.map( - topicAndPartition => new TopicPartitionRequestKey(topicAndPartition)).toSeq - val statuses = localProduceResults.map(r => - r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap - val delayedRequest = new DelayedProduce( - producerRequestKeys, - request, - produceRequest.ackTimeoutMs.toLong, - produceRequest, - statuses, - offsetCommitRequestOpt) - - // add the produce request for watch if it's not satisfied, otherwise send the response back - val satisfiedByMe 
= producerRequestPurgatory.checkAndMaybeWatch(delayedRequest) - if (satisfiedByMe) - producerRequestPurgatory.respond(delayedRequest) } - // we do not need the data anymore - produceRequest.emptyData() - } - - case class ProduceResult(key: TopicAndPartition, start: Long, end: Long, error: Option[Throwable] = None) { - def this(key: TopicAndPartition, throwable: Throwable) = - this(key, -1L, -1L, Some(throwable)) - - def errorCode = error match { - case None => ErrorMapping.NoError - case Some(error) => ErrorMapping.codeFor(error.getClass.asInstanceOf[Class[Throwable]]) - } + // call offset manager to store offsets + offsetManager.storeOffsets( + offsetCommitRequest.groupId, + offsetCommitRequest.consumerId, + offsetCommitRequest.groupGenerationId, + offsetCommitRequest.requestInfo, + sendResponseCallback) } /** - * Helper method for handling a parsed producer request + * Handle a produce request */ - private def appendToLocalLog(producerRequest: ProducerRequest, isOffsetCommit: Boolean): Iterable[ProduceResult] = { - val partitionAndData: Map[TopicAndPartition, MessageSet] = producerRequest.data - trace("Append [%s] to local log ".format(partitionAndData.toString)) - partitionAndData.map {case (topicAndPartition, messages) => - try { - if (Topic.InternalTopics.contains(topicAndPartition.topic) && - !(isOffsetCommit && topicAndPartition.topic == OffsetManager.OffsetsTopicName)) { - throw new InvalidTopicException("Cannot append to internal topic %s".format(topicAndPartition.topic)) - } - val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition) - val info = partitionOpt match { - case Some(partition) => - partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet]) - case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d" - .format(topicAndPartition, brokerId)) + def handleProducerRequest(request: RequestChannel.Request) { + val produceRequest = request.requestObj.asInstanceOf[ProducerRequest] + + // the callback for sending the response + def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) { + var errorInResponse = false + responseStatus.foreach { case (topicAndPartition, status) => + // Here we only print warnings for known errors; if it is unknown, it will cause + // an error message in the replica manager + if (status.error != ErrorMapping.NoError && status.error != ErrorMapping.UnknownCode) { + warn("Produce request with correlation id %d from client %s on partition %s failed due to %s" + .format(produceRequest.correlationId, produceRequest.clientId, + topicAndPartition, ErrorMapping.exceptionNameFor(status.error))) + errorInResponse = true } + } - val numAppendedMessages = if (info.firstOffset == -1L || info.lastOffset == -1L) 0 else (info.lastOffset - info.firstOffset + 1) - - // update stats for successfully appended bytes and messages as bytesInRate and messageInRate - BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesInRate.mark(messages.sizeInBytes) - BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes) - BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).messagesInRate.mark(numAppendedMessages) - BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numAppendedMessages) - - trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d" - .format(messages.size, topicAndPartition.topic, topicAndPartition.partition, info.firstOffset, info.lastOffset)) - 
ProduceResult(topicAndPartition, info.firstOffset, info.lastOffset) - } catch { - // NOTE: Failed produce requests is not incremented for UnknownTopicOrPartitionException and NotLeaderForPartitionException - // since failed produce requests metric is supposed to indicate failure of a broker in handling a produce request - // for a partition it is the leader for - case e: KafkaStorageException => - fatal("Halting due to unrecoverable I/O error while handling produce request: ", e) - Runtime.getRuntime.halt(1) - null - case ite: InvalidTopicException => - warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format( - producerRequest.correlationId, producerRequest.clientId, topicAndPartition, ite.getMessage)) - new ProduceResult(topicAndPartition, ite) - case utpe: UnknownTopicOrPartitionException => - warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format( - producerRequest.correlationId, producerRequest.clientId, topicAndPartition, utpe.getMessage)) - new ProduceResult(topicAndPartition, utpe) - case nle: NotLeaderForPartitionException => - warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format( - producerRequest.correlationId, producerRequest.clientId, topicAndPartition, nle.getMessage)) - new ProduceResult(topicAndPartition, nle) - case e: Throwable => - BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark() - BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark() - error("Error processing ProducerRequest with correlation id %d from client %s on partition %s" - .format(producerRequest.correlationId, producerRequest.clientId, topicAndPartition), e) - new ProduceResult(topicAndPartition, e) - } + if (produceRequest.requiredAcks == 0) { + // no operation needed if producer request.required.acks = 0; however, if there is any error in handling + // the request, since no response is expected by the producer, the server will close socket server so that + // the producer client will know that some error has happened and will refresh its metadata + if (errorInResponse) { + info("Close connection due to error handling produce request with correlation id %d from client id %s with ack=0" + .format(produceRequest.correlationId, produceRequest.clientId)) + requestChannel.closeConnection(request.processor, request) + } else { + requestChannel.noOperation(request.processor, request) + } + } else { + val response = ProducerResponse(produceRequest.correlationId, responseStatus) + requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + } } + + // call the replica manager to append messages to the replicas + replicaManager.appendMessages( + produceRequest.ackTimeoutMs.toLong, + produceRequest.requiredAcks, + produceRequest.data, + sendResponseCallback) } /** @@ -299,59 +200,38 @@ class KafkaApis(val requestChannel: RequestChannel, */ def handleFetchRequest(request: RequestChannel.Request) { val fetchRequest = request.requestObj.asInstanceOf[FetchRequest] - val dataRead = replicaManager.readMessageSets(fetchRequest) - - // if the fetch request comes from the follower, - // update its corresponding log end offset - if(fetchRequest.isFromFollower) - recordFollowerLogEndOffsets(fetchRequest.replicaId, dataRead.mapValues(_.offset)) - - // check if this fetch request can be satisfied right away - val bytesReadable = dataRead.values.map(_.data.messages.sizeInBytes).sum - val 
errorReadingData = dataRead.values.foldLeft(false)((errorIncurred, dataAndOffset) => - errorIncurred || (dataAndOffset.data.error != ErrorMapping.NoError)) - // send the data immediately if 1) fetch request does not want to wait - // 2) fetch request does not require any data - // 3) has enough data to respond - // 4) some error happens while reading data - if(fetchRequest.maxWait <= 0 || - fetchRequest.numPartitions <= 0 || - bytesReadable >= fetchRequest.minBytes || - errorReadingData) { - debug("Returning fetch response %s for fetch request with correlation id %d to client %s" - .format(dataRead.values.map(_.data.error).mkString(","), fetchRequest.correlationId, fetchRequest.clientId)) - val response = new FetchResponse(fetchRequest.correlationId, dataRead.mapValues(_.data)) - requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response))) - } else { - debug("Putting fetch request with correlation id %d from client %s into purgatory".format(fetchRequest.correlationId, - fetchRequest.clientId)) - // create a list of (topic, partition) pairs to use as keys for this delayed request - val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq.map(new TopicPartitionRequestKey(_)) - val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest.maxWait, fetchRequest, - dataRead.mapValues(_.offset)) - - // add the fetch request for watch if it's not satisfied, otherwise send the response back - val satisfiedByMe = fetchRequestPurgatory.checkAndMaybeWatch(delayedFetch) - if (satisfiedByMe) - fetchRequestPurgatory.respond(delayedFetch) - } - } - private def recordFollowerLogEndOffsets(replicaId: Int, offsets: Map[TopicAndPartition, LogOffsetMetadata]) { - debug("Record follower log end offsets: %s ".format(offsets)) - offsets.foreach { - case (topicAndPartition, offset) => - replicaManager.updateReplicaLEOAndPartitionHW(topicAndPartition.topic, - topicAndPartition.partition, replicaId, offset) + // the callback for sending the response + def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) { + responsePartitionData.foreach { case (topicAndPartition, data) => + // Here we only print warnings for known errors; if it is unknown, it will cause + // an error message in the replica manager already and hence can be ignored here + if (data.error != ErrorMapping.NoError && data.error != ErrorMapping.UnknownCode) { + warn("Fetch request with correlation id %d from client %s on partition %s failed due to %s" + .format(fetchRequest.correlationId, fetchRequest.clientId, + topicAndPartition, ErrorMapping.exceptionNameFor(data.error))) + } - // for producer requests with ack > 1, we need to check - // if they can be unblocked after some follower's log end offsets have moved - replicaManager.unblockDelayedProduceRequests(new TopicPartitionRequestKey(topicAndPartition)) + // record the bytes out metrics only when the response is being sent + BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesOutRate.mark(data.messages.sizeInBytes) + BrokerTopicStats.getBrokerAllTopicsStats().bytesOutRate.mark(data.messages.sizeInBytes) + } + + val response = FetchResponse(fetchRequest.correlationId, responsePartitionData) + requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response))) } + + // call the replica manager to append messages to the replicas + replicaManager.fetchMessages( + fetchRequest.maxWait.toLong, + fetchRequest.replicaId, + fetchRequest.minBytes, + 
fetchRequest.requestInfo, + sendResponseCallback) } /** - * Service the offset request API + * Handle a offset request request */ def handleOffsetRequest(request: RequestChannel.Request) { val offsetRequest = request.requestObj.asInstanceOf[OffsetRequest] @@ -411,7 +291,7 @@ class KafkaApis(val requestChannel: RequestChannel, } } - def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long] = { + private def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long] = { val segsArray = log.logSegments.toArray var offsetTimeArray: Array[(Long, Long)] = null if(segsArray.last.size > 0) @@ -484,7 +364,7 @@ class KafkaApis(val requestChannel: RequestChannel, } /** - * Service the topic metadata request API + * Handle a topic metadata request */ def handleTopicMetadataRequest(request: RequestChannel.Request) { val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest] @@ -496,7 +376,7 @@ class KafkaApis(val requestChannel: RequestChannel, } /* - * Service the Offset fetch API + * Handle an offset fetch request */ def handleOffsetFetchRequest(request: RequestChannel.Request) { val offsetFetchRequest = request.requestObj.asInstanceOf[OffsetFetchRequest] @@ -511,7 +391,7 @@ class KafkaApis(val requestChannel: RequestChannel, } /* - * Service the consumer metadata API + * Handle a consumer metadata request */ def handleConsumerMetadataRequest(request: RequestChannel.Request) { val consumerMetadataRequest = request.requestObj.asInstanceOf[ConsumerMetadataRequest] @@ -536,9 +416,8 @@ class KafkaApis(val requestChannel: RequestChannel, } def close() { - debug("Shutting down.") - fetchRequestPurgatory.shutdown() - producerRequestPurgatory.shutdown() + // TODO currently closing the API is an no-op since the API no longer maintain any modules + // maybe removing the closing call in the end when KafkaAPI becomes a pure stateless layer debug("Shut down complete.") } } diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala index 43eb2a3..7b0e29d 100644 --- a/core/src/main/scala/kafka/server/OffsetManager.scala +++ b/core/src/main/scala/kafka/server/OffsetManager.scala @@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit import com.yammer.metrics.core.Gauge import org.I0Itec.zkclient.ZkClient +import kafka.api.ProducerResponseStatus /** @@ -192,13 +193,88 @@ class OffsetManager(val config: OffsetManagerConfig, offsetsCache.put(key, offsetAndMetadata) } - def putOffsets(group: String, offsets: Map[TopicAndPartition, OffsetAndMetadata]) { - // this method is called _after_ the offsets have been durably appended to the commit log, so there is no need to - // check for current leadership as we do for the offset fetch - trace("Putting offsets %s for group %s in offsets partition %d.".format(offsets, group, partitionFor(group))) - offsets.foreach { case (topicAndPartition, offsetAndMetadata) => - putOffset(GroupTopicPartition(group, topicAndPartition), offsetAndMetadata) + /** + * Store offsets by appending it to the replicated log and then inserting to cache + */ + // TODO: generation id and consumer id is needed by coordinator to do consumer checking in the future + def storeOffsets(groupName: String, + consumerId: String, + generationId: Int, + offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata], + callbackOnComplete: immutable.Map[TopicAndPartition, Short] => Unit) { + + // first filter out partitions with offset metadata size exceeding limit + // TODO: in the future we 
may want to only support atomic commit and hence fail the whole commit + var commitStatus = offsetMetadata.mapValues { offsetAndMetadata => + if (offsetAndMetadata.metadata != null && offsetAndMetadata.metadata.length() > config.maxMetadataSize) + ErrorMapping.OffsetMetadataTooLargeCode + else + ErrorMapping.NoError + } + + val filteredOffsetMetadata = offsetMetadata.filter { case (topicAndPartition, offsetAndMetadata) => + commitStatus.get(topicAndPartition).get == ErrorMapping.NoError + } + + // construct the message set to append + val messages = filteredOffsetMetadata.map { case (topicAndPartition, offsetAndMetadata) => + new Message( + key = OffsetManager.offsetCommitKey(groupName, topicAndPartition.topic, topicAndPartition.partition), + bytes = OffsetManager.offsetCommitValue(offsetAndMetadata) + ) + }.toSeq + + val offsetTopicPartition = TopicAndPartition(OffsetManager.OffsetsTopicName, partitionFor(groupName)) + + val offsetsAndMetadataMessageSet = Map(offsetTopicPartition -> + new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*)) + + // set the callback function to insert offsets into cache after log append completed + def putCacheCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) { + // the append response should only contain the topics partition + if (responseStatus.size != 1 || ! responseStatus.contains(offsetTopicPartition)) + throw new IllegalStateException("Append status %s should only have one partition %s" + .format(responseStatus, offsetTopicPartition)) + + // construct the commit response status and insert + // the offset and metadata to cache iff the append status has no error + val status = responseStatus(offsetTopicPartition) + + if (status.error == ErrorMapping.NoError) { + filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) => + putOffset(GroupTopicPartition(groupName, topicAndPartition), offsetAndMetadata) + } + } else { + debug("Offset commit %s from group %s consumer %s with generation %d failed when appending to log due to %s" + .format(filteredOffsetMetadata, groupName, consumerId, generationId, ErrorMapping.exceptionNameFor(status.error))) + + // update the commit status error code with the corresponding log append error code + val commitErrorCode = + if (status.error == ErrorMapping.UnknownTopicOrPartitionCode) + ErrorMapping.ConsumerCoordinatorNotAvailableCode + else if (status.error == ErrorMapping.NotLeaderForPartitionCode) + ErrorMapping.NotCoordinatorForConsumerCode + else + status.error + + commitStatus = commitStatus.mapValues { case errorCode => + if (errorCode == ErrorMapping.NoError) + commitErrorCode + else + errorCode + } + } + + // finally trigger the callback logic passed from the API layer + callbackOnComplete(commitStatus) } + + // call replica manager to append the offset messages + replicaManager.appendMessages( + config.offsetCommitTimeoutMs.toLong, + config.offsetCommitRequiredAcks, + offsetsAndMetadataMessageSet, + putCacheCallback) } /** diff --git a/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala b/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala deleted file mode 100644 index d4a7d4a..0000000 --- a/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.server - -import kafka.metrics.KafkaMetricsGroup -import kafka.utils.Pool -import kafka.network.{BoundedByteBufferSend, RequestChannel} - -import java.util.concurrent.TimeUnit - -/** - * The purgatory holding delayed producer requests - */ -class ProducerRequestPurgatory(replicaManager: ReplicaManager, offsetManager: OffsetManager, requestChannel: RequestChannel) - extends RequestPurgatory[DelayedProduce](replicaManager.config.brokerId, replicaManager.config.producerPurgatoryPurgeIntervalRequests) { - this.logIdent = "[ProducerRequestPurgatory-%d] ".format(replicaManager.config.brokerId) - - private class DelayedProducerRequestMetrics(keyLabel: String = DelayedRequestKey.globalLabel) extends KafkaMetricsGroup { - val expiredRequestMeter = newMeter(keyLabel + "ExpiresPerSecond", "requests", TimeUnit.SECONDS) - } - - private val producerRequestMetricsForKey = { - val valueFactory = (k: DelayedRequestKey) => new DelayedProducerRequestMetrics(k.keyLabel + "-") - new Pool[DelayedRequestKey, DelayedProducerRequestMetrics](Some(valueFactory)) - } - - private val aggregateProduceRequestMetrics = new DelayedProducerRequestMetrics - - private def recordDelayedProducerKeyExpired(key: DelayedRequestKey) { - val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(key) - List(keyMetrics, aggregateProduceRequestMetrics).foreach(_.expiredRequestMeter.mark()) - } - - /** - * Check if a specified delayed fetch request is satisfied - */ - def checkSatisfied(delayedProduce: DelayedProduce) = delayedProduce.isSatisfied(replicaManager) - - /** - * When a delayed produce request expires answer it with possible time out error codes - */ - def expire(delayedProduce: DelayedProduce) { - debug("Expiring produce request %s.".format(delayedProduce.produce)) - for ((topicPartition, responseStatus) <- delayedProduce.partitionStatus if responseStatus.acksPending) - recordDelayedProducerKeyExpired(new TopicPartitionRequestKey(topicPartition)) - respond(delayedProduce) - } - - // TODO: purgatory should not be responsible for sending back the responses - def respond(delayedProduce: DelayedProduce) { - val response = delayedProduce.respond(offsetManager) - requestChannel.sendResponse(new RequestChannel.Response(delayedProduce.request, new BoundedByteBufferSend(response))) - } -} diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 68758e3..085f076 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -20,11 +20,11 @@ import kafka.api._ import kafka.common._ import kafka.utils._ import kafka.cluster.{Broker, Partition, Replica} -import kafka.log.LogManager +import kafka.log.{LogReadResult, LogAppendResult, LogAppendInfo, LogManager} import kafka.metrics.KafkaMetricsGroup import kafka.controller.KafkaController import kafka.common.TopicAndPartition -import 
kafka.message.MessageSet +import kafka.message.{ByteBufferMessageSet, MessageSet} import java.util.concurrent.atomic.AtomicBoolean import java.io.{IOException, File} @@ -43,10 +43,7 @@ object ReplicaManager { val HighWatermarkFilename = "replication-offset-checkpoint" } -case class PartitionDataAndOffset(data: FetchResponsePartitionData, offset: LogOffsetMetadata) - - -class ReplicaManager(val config: KafkaConfig, +class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient, scheduler: Scheduler, @@ -64,8 +61,9 @@ class ReplicaManager(val config: KafkaConfig, this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: " val stateChangeLogger = KafkaController.stateChangeLogger - var producerRequestPurgatory: ProducerRequestPurgatory = null - var fetchRequestPurgatory: FetchRequestPurgatory = null + val producerRequestPurgatory = new RequestPurgatory[DelayedProduce](config.brokerId, config.producerPurgatoryPurgeIntervalRequests) + val fetchRequestPurgatory = new RequestPurgatory[DelayedFetch](config.brokerId, config.fetchPurgatoryPurgeIntervalRequests) + newGauge( "LeaderCount", @@ -100,37 +98,27 @@ class ReplicaManager(val config: KafkaConfig, } /** - * Initialize the replica manager with the request purgatory + * Try to complete some delayed produce requests with the request key; + * this can be triggered when: * - * TODO: will be removed in 0.9 where we refactor server structure - */ - - def initWithRequestPurgatory(producerRequestPurgatory: ProducerRequestPurgatory, fetchRequestPurgatory: FetchRequestPurgatory) { - this.producerRequestPurgatory = producerRequestPurgatory - this.fetchRequestPurgatory = fetchRequestPurgatory - } - - /** - * Unblock some delayed produce requests with the request key + * 1. The partition HW has changed (for acks = -1). + * 2. A follower replica's fetch operation is received (for acks > 1) */ - def unblockDelayedProduceRequests(key: DelayedRequestKey) { - val satisfied = producerRequestPurgatory.update(key) - debug("Request key %s unblocked %d producer requests." - .format(key.keyLabel, satisfied.size)) - - // send any newly unblocked responses - satisfied.foreach(producerRequestPurgatory.respond(_)) + def tryCompleteDelayedProduce(key: DelayedRequestKey) { + val completed = producerRequestPurgatory.checkAndComplete(key) + debug("Request key %s unblocked %d producer requests.".format(key.keyLabel, completed)) } /** - * Unblock some delayed fetch requests with the request key + * Try to complete some delayed fetch requests with the request key; + * this can be triggered when: + * + * 1. The partition HW has changed; + * 2. 
A new message set is appended to the local log (for follower fetch) */ - def unblockDelayedFetchRequests(key: DelayedRequestKey) { - val satisfied = fetchRequestPurgatory.update(key) - debug("Request key %s unblocked %d fetch requests.".format(key.keyLabel, satisfied.size)) - - // send any newly unblocked responses - satisfied.foreach(fetchRequestPurgatory.respond(_)) + def tryCompleteDelayedFetch(key: DelayedRequestKey) { + val completed = fetchRequestPurgatory.checkAndComplete(key) + debug("Request key %s unblocked %d fetch requests.".format(key.keyLabel, completed)) } def startup() { @@ -237,74 +225,206 @@ class ReplicaManager(val config: KafkaConfig, } /** - * Read from all the offset details given and return a map of - * (topic, partition) -> PartitionData + * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas; + * the callback function will be triggered either when timeout or the required acks are satisfied */ - def readMessageSets(fetchRequest: FetchRequest) = { - val isFetchFromFollower = fetchRequest.isFromFollower - fetchRequest.requestInfo.map - { - case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) => - val partitionDataAndOffsetInfo = - try { - val (fetchInfo, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, fetchRequest.replicaId) - BrokerTopicStats.getBrokerTopicStats(topic).bytesOutRate.mark(fetchInfo.messageSet.sizeInBytes) - BrokerTopicStats.getBrokerAllTopicsStats.bytesOutRate.mark(fetchInfo.messageSet.sizeInBytes) - if (isFetchFromFollower) { - debug("Partition [%s,%d] received fetch request from follower %d" - .format(topic, partition, fetchRequest.replicaId)) - } - new PartitionDataAndOffset(new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, fetchInfo.messageSet), fetchInfo.fetchOffset) - } catch { - // NOTE: Failed fetch requests is not incremented for UnknownTopicOrPartitionException and NotLeaderForPartitionException - // since failed fetch requests metric is supposed to indicate failure of a broker in handling a fetch request - // for a partition it is the leader for - case utpe: UnknownTopicOrPartitionException => - warn("Fetch request with correlation id %d from client %s on partition [%s,%d] failed due to %s".format( - fetchRequest.correlationId, fetchRequest.clientId, topic, partition, utpe.getMessage)) - new PartitionDataAndOffset(new FetchResponsePartitionData(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty), LogOffsetMetadata.UnknownOffsetMetadata) - case nle: NotLeaderForPartitionException => - warn("Fetch request with correlation id %d from client %s on partition [%s,%d] failed due to %s".format( - fetchRequest.correlationId, fetchRequest.clientId, topic, partition, nle.getMessage)) - new PartitionDataAndOffset(new FetchResponsePartitionData(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty), LogOffsetMetadata.UnknownOffsetMetadata) - case t: Throwable => - BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark() - BrokerTopicStats.getBrokerAllTopicsStats.failedFetchRequestRate.mark() - error("Error when processing fetch request for partition [%s,%d] offset %d from %s with correlation id %d. 
Possible cause: %s" - .format(topic, partition, offset, if (isFetchFromFollower) "follower" else "consumer", fetchRequest.correlationId, t.getMessage)) - new PartitionDataAndOffset(new FetchResponsePartitionData(ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty), LogOffsetMetadata.UnknownOffsetMetadata) - } - (TopicAndPartition(topic, partition), partitionDataAndOffsetInfo) + def appendMessages(timeout: Long, + requiredAcks: Short, + messagesPerPartition: Map[TopicAndPartition, MessageSet], + responseCallback: Map[TopicAndPartition, ProducerResponseStatus] => Unit) { + + val sTime = SystemTime.milliseconds + val localProduceResults = appendToLocalLog(messagesPerPartition) + debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime)) + + val produceStatus = localProduceResults.map{ case (topicAndPartition, result) => + topicAndPartition -> + ProducePartitionStatus( + result.info.lastOffset + 1, // required offset + ProducerResponseStatus(result.errorCode, result.info.firstOffset)) // response status + } + + if(requiredAcks == 0 || + requiredAcks == 1 || + messagesPerPartition.size <= 0 || + localProduceResults.values.count(_.error.isDefined) == messagesPerPartition.size) { + // in case of the following we can respond immediately: + // + // 1. required acks = 0 or 1 + // 2. there is no data to append + // 3. all partition appends have failed + val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus) + responseCallback(produceResponseStatus) + } else { + // create delayed produce operation + val produceMetadata = ProduceMetadata(requiredAcks, produceStatus) + val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback) + + // create a list of (topic, partition) pairs to use as keys for this delayed request + val producerRequestKeys = messagesPerPartition.keys.map(new TopicPartitionRequestKey(_)).toSeq + + // try to complete the request immediately, otherwise put it into the purgatory + // this is because while the delayed request is being created, new requests may + // arrive which can make this request completable. 
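
As a rough illustration of this callback-based produce path (the tryCompleteElseWatch call continues just below), a caller could look like the following sketch, for example pasted into a Scala REPL with the patched core jar on the classpath; rm stands for an already constructed ReplicaManager and the topic name is made up, neither is part of this patch:

    // Scala sketch: drive ReplicaManager.appendMessages and receive the
    // per-partition status through the response callback.
    import kafka.common.TopicAndPartition
    import kafka.message.{ByteBufferMessageSet, Message}

    val tp = TopicAndPartition("example-topic", 0)                  // hypothetical topic
    val messages = new ByteBufferMessageSet(new Message("hello".getBytes))
    val acks: Short = -1                                            // wait for all in-sync replicas

    rm.appendMessages(1000L, acks, Map(tp -> messages), produceStatus =>
      produceStatus.foreach { case (partition, status) =>
        println("append to %s finished with error code %d at offset %d"
          .format(partition, status.error, status.offset))
      })

With acks = 0 or 1 the callback fires immediately after the local append; otherwise a DelayedProduce is parked in the purgatory as shown below.
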
+ producerRequestPurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys) + } + } + + /** + * Append the messages to the local replica logs + */ + private def appendToLocalLog(messagesPerPartition: Map[TopicAndPartition, MessageSet]): Map[TopicAndPartition, LogAppendResult] = { + trace("Append [%s] to local log ".format(messagesPerPartition)) + messagesPerPartition.map { case (topicAndPartition, messages) => + try { + val partitionOpt = getPartition(topicAndPartition.topic, topicAndPartition.partition) + val info = partitionOpt match { + case Some(partition) => + partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet]) + case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d" + .format(topicAndPartition, localBrokerId)) + } + + val numAppendedMessages = + if (info.firstOffset == -1L || info.lastOffset == -1L) + 0 + else + info.lastOffset - info.firstOffset + 1 + + // update stats for successfully appended bytes and messages as bytesInRate and messageInRate + BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesInRate.mark(messages.sizeInBytes) + BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes) + BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).messagesInRate.mark(numAppendedMessages) + BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numAppendedMessages) + + trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d" + .format(messages.size, topicAndPartition.topic, topicAndPartition.partition, info.firstOffset, info.lastOffset)) + (topicAndPartition, LogAppendResult(info)) + } catch { + // NOTE: Failed produce requests metric is not incremented for known exceptions + // it is supposed to indicate un-expected failures of a broker in handling a produce request + case e: KafkaStorageException => + fatal("Halting due to unrecoverable I/O error while handling produce request: ", e) + Runtime.getRuntime.halt(1) + (topicAndPartition, null) + case utpe: UnknownTopicOrPartitionException => + (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(utpe))) + case nle: NotLeaderForPartitionException => + (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(nle))) + case e: Throwable => + BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark() + BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark() + error("Error processing append operation on partition %s".format(topicAndPartition), e) + (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e))) + } + } + } + + /** + * Fetch messages from the leader replica, and wait until enough data can be fetched and return; + * the callback function will be triggered either when timeout or required fetch info is satisfied + */ + def fetchMessages(timeout: Long, + replicaId: Int, + fetchMinBytes: Int, + fetchInfo: Map[TopicAndPartition, PartitionFetchInfo], + responseCallback: Map[TopicAndPartition, FetchResponsePartitionData] => Unit) { + + val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId + val fetchOnlyCommitted: Boolean = ! 
Request.isValidBrokerId(replicaId) + + // read from local logs + val logReadResults = readFromLocalLog(fetchOnlyFromLeader, fetchOnlyCommitted, fetchInfo) + + // if the fetch comes from the follower, + // update its corresponding log end offset + if(Request.isValidBrokerId(replicaId)) + updateFollowerLEOs(replicaId, logReadResults.mapValues(_.info.fetchOffset)) + + // check if this fetch request can be satisfied right away + val bytesReadable = logReadResults.values.map(_.info.messageSet.sizeInBytes).sum + val errorReadingData = logReadResults.values.foldLeft(false) ((errorIncurred, readResult) => + errorIncurred || (readResult.errorCode != ErrorMapping.NoError)) + + // respond immediately if 1) fetch request does not want to wait + // 2) fetch request does not require any data + // 3) has enough data to respond + // 4) some error happens while reading data + if(timeout <= 0 || fetchInfo.size <= 0 || bytesReadable >= fetchMinBytes || errorReadingData) { + val fetchPartitionData = logReadResults.mapValues(result => + FetchResponsePartitionData(result.errorCode, result.hw, result.info.messageSet)) + responseCallback(fetchPartitionData) + } else { + // construct the fetch results from the read results + val fetchPartitionStatus = logReadResults.map { case (topicAndPartition, result) => + (topicAndPartition, FetchPartitionStatus(result.info.fetchOffset, fetchInfo.get(topicAndPartition).get)) + } + val fetchMetadata = FetchMetadata(fetchMinBytes, fetchOnlyFromLeader, fetchOnlyCommitted, fetchPartitionStatus) + val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, responseCallback) + + // create a list of (topic, partition) pairs to use as keys for this delayed request + val delayedFetchKeys = fetchPartitionStatus.keys.map(new TopicPartitionRequestKey(_)).toSeq + + // try to complete the request immediately, otherwise put it into the purgatory; + // this is because while the delayed request is being created, new requests may + // arrive which can make this request completable. 
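
The fetch path can be exercised in the same callback style (the tryCompleteElseWatch call continues just below). This is only a sketch under the same assumptions as the produce example above, with rm denoting an already constructed ReplicaManager:

    // Scala sketch: an ordinary consumer fetch through the new callback-based API;
    // an ordinary consumer id means only committed data (up to the HW) is returned.
    import kafka.api.{PartitionFetchInfo, Request}
    import kafka.common.TopicAndPartition

    val tp = TopicAndPartition("example-topic", 0)                  // hypothetical topic
    val fetchInfo = Map(tp -> PartitionFetchInfo(offset = 0L, fetchSize = 4096))

    rm.fetchMessages(500L, Request.OrdinaryConsumerId, 1, fetchInfo, fetchData =>
      fetchData.foreach { case (partition, data) =>
        println("fetched %d bytes from %s up to hw %d"
          .format(data.messages.sizeInBytes, partition, data.hw))
      })
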
+ fetchRequestPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys) } } /** * Read from a single topic/partition at the given offset upto maxSize bytes */ - private def readMessageSet(topic: String, - partition: Int, - offset: Long, - maxSize: Int, - fromReplicaId: Int): (FetchDataInfo, Long) = { - // check if the current broker is the leader for the partitions - val localReplica = if(fromReplicaId == Request.DebuggingConsumerId) - getReplicaOrException(topic, partition) - else - getLeaderReplicaIfLocal(topic, partition) - trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize)) - val maxOffsetOpt = - if (Request.isValidBrokerId(fromReplicaId)) - None - else - Some(localReplica.highWatermark.messageOffset) - val fetchInfo = localReplica.log match { - case Some(log) => - log.read(offset, maxSize, maxOffsetOpt) - case None => - error("Leader for partition [%s,%d] does not have a local log".format(topic, partition)) - FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty) + def readFromLocalLog(fetchOnlyFromLeader: Boolean, + readOnlyCommitted: Boolean, + readPartitionInfo: Map[TopicAndPartition, PartitionFetchInfo]): Map[TopicAndPartition, LogReadResult] = { + + readPartitionInfo.map { case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) => + val partitionDataAndOffsetInfo = + try { + trace("Fetching log segment for topic %s, partition %d, offset %d, size %d".format(topic, partition, offset, fetchSize)) + + // decide whether to only fetch from leader + val localReplica = if (fetchOnlyFromLeader) + getLeaderReplicaIfLocal(topic, partition) + else + getReplicaOrException(topic, partition) + + // decide whether to only fetch committed data (i.e. messages below high watermark) + val maxOffsetOpt = if (readOnlyCommitted) + Some(localReplica.highWatermark.messageOffset) + else + None + + // read on log; note that here we do not record the fetched message count and size + // since it may be re-read in the future; instead we should only record these metrics + // when the responses are sent + val logReadInfo = localReplica.log match { + case Some(log) => + log.read(offset, fetchSize, maxOffsetOpt) + case None => + error("Leader for partition [%s,%d] does not have a local log".format(topic, partition)) + FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty) + } + + LogReadResult(logReadInfo, localReplica.highWatermark.messageOffset, fetchSize, None) + } catch { + // NOTE: Failed fetch requests metric is not incremented for known exceptions since it + // is supposed to indicate un-expected failure of a broker in handling a fetch request + case utpe: UnknownTopicOrPartitionException => + LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, Some(utpe)) + case nle: NotLeaderForPartitionException => + LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, Some(nle)) + case rnae: ReplicaNotAvailableException => + LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, Some(rnae)) + case e: Throwable => + BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark() + BrokerTopicStats.getBrokerAllTopicsStats().failedFetchRequestRate.mark() + error("Error processing fetch operation on partition [%s,%d] offset %d".format(topic, partition, offset)) + LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, 
MessageSet.Empty), -1L, fetchSize, Some(e)) + } + (TopicAndPartition(topic, partition), partitionDataAndOffsetInfo) } - (fetchInfo, localReplica.highWatermark.messageOffset) } def maybeUpdateMetadataCache(updateMetadataRequest: UpdateMetadataRequest, metadataCache: MetadataCache) { @@ -557,23 +677,19 @@ class ReplicaManager(val config: KafkaConfig, allPartitions.values.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs, config.replicaLagMaxMessages)) } - def updateReplicaLEOAndPartitionHW(topic: String, partitionId: Int, replicaId: Int, offset: LogOffsetMetadata) = { - getPartition(topic, partitionId) match { - case Some(partition) => - partition.getReplica(replicaId) match { - case Some(replica) => - replica.logEndOffset = offset - // check if we need to update HW and expand Isr - partition.updateLeaderHWAndMaybeExpandIsr(replicaId) - debug("Recorded follower %d position %d for partition [%s,%d].".format(replicaId, offset.messageOffset, topic, partitionId)) - case None => - throw new NotAssignedReplicaException(("Leader %d failed to record follower %d's position %d since the replica" + - " is not recognized to be one of the assigned replicas %s for partition [%s,%d]").format(localBrokerId, replicaId, - offset.messageOffset, partition.assignedReplicas().map(_.brokerId).mkString(","), topic, partitionId)) - - } - case None => - warn("While recording the follower position, the partition [%s,%d] hasn't been created, skip updating leader HW".format(topic, partitionId)) + private def updateFollowerLEOs(replicaId: Int, offsets: Map[TopicAndPartition, LogOffsetMetadata]) { + debug("Recording follower broker %d log end offsets: %s ".format(replicaId, offsets)) + offsets.foreach { case (topicAndPartition, offset) => + getPartition(topicAndPartition.topic, topicAndPartition.partition) match { + case Some(partition) => + partition.updateReplicaLEO(replicaId, offset) + + // for producer requests with ack > 1, we need to check + // if they can be unblocked after some follower's log end offsets have moved + tryCompleteDelayedProduce(new TopicPartitionRequestKey(topicAndPartition)) + case None => + warn("While recording the replica LEO, the partition %s hasn't been created.".format(topicAndPartition)) + } } } @@ -598,10 +714,13 @@ class ReplicaManager(val config: KafkaConfig, } } - def shutdown() { - info("Shut down") + def shutdown(checkpointHW: Boolean = true) { + info("Shutting down") replicaFetcherManager.shutdown() - checkpointHighWatermarks() + fetchRequestPurgatory.shutdown() + producerRequestPurgatory.shutdown() + if (checkpointHW) + checkpointHighWatermarks() info("Shut down completely") } } diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala index ce06d2c..645f204 100644 --- a/core/src/main/scala/kafka/server/RequestPurgatory.scala +++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala @@ -17,7 +17,6 @@ package kafka.server -import kafka.network._ import kafka.utils._ import kafka.metrics.KafkaMetricsGroup @@ -30,265 +29,243 @@ import com.yammer.metrics.core.Gauge /** - * A request whose processing needs to be delayed for at most the given delayMs - * The associated keys are used for bookeeping, and represent the "trigger" that causes this request to check if it is satisfied, - * for example a key could be a (topic, partition) pair. + * An operation whose processing needs to be delayed for at most the given delayMs. 
For example + * a delayed produce operation could be waiting for specified number of acks; or + * a delayed fetch operation could be waiting for a given number of bytes to accumulate. */ -class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, delayMs: Long) extends DelayedItem[RequestChannel.Request](request, delayMs) { - val satisfied = new AtomicBoolean(false) +abstract class DelayedRequest(delayMs: Long) extends DelayedItem(delayMs) { + private val completed = new AtomicBoolean(false) + + /* + * Force completing the delayed operation, this function can be triggered when + * + * 1. The operation has been verified to be completable inside tryComplete() + * 2. The operation has expired and hence need to be completed right now + * + * Return true iff the operation is completed by the caller + */ + def forceComplete(): Boolean = { + if (completed.compareAndSet(false, true)) { + complete() + true + } else { + false + } + } + + /** + * Check if the delayed operation is already completed + */ + def isCompleted(): Boolean = completed.get() + + /** + * Process for completing a operation; this function needs to be defined in subclasses + */ + def complete(): Unit + + /* + * Try to complete the delayed operation by first checking if the operation + * can be completed by now; and if yes execute the completion logic by calling forceComplete() + * + * Note that concurrent threads can check if an operation can be completed or not, + * but only the first thread will succeed in completing the operation and return + * true, others will still return false + * + * this function needs to be defined in subclasses + */ + def tryComplete(): Boolean } /** - * A helper class for dealing with asynchronous requests with a timeout. A DelayedRequest has a request to delay - * and also a list of keys that can trigger the action. Implementations can add customized logic to control what it means for a given - * request to be satisfied. For example it could be that we are waiting for user-specified number of acks on a given (topic, partition) - * to be able to respond to a request or it could be that we are waiting for a given number of bytes to accumulate on a given request - * to be able to respond to that request (in the simple case we might wait for at least one byte to avoid busy waiting). - * - * For us the key is generally a (topic, partition) pair. - * By calling - * val isSatisfiedByMe = checkAndMaybeWatch(delayedRequest) - * we will check if a request is satisfied already, and if not add the request for watch on all its keys. - * - * It is up to the user to then call - * val satisfied = update(key, request) - * when a request relevant to the given key occurs. This triggers bookeeping logic and returns back any requests satisfied by this - * new request. - * - * An implementation provides extends two helper functions - * def checkSatisfied(request: R, delayed: T): Boolean - * this function returns true if the given request (in combination with whatever previous requests have happened) satisfies the delayed - * request delayed. This method will likely also need to do whatever bookkeeping is necessary. - * - * The second function is - * def expire(delayed: T) - * this function handles delayed requests that have hit their time limit without being satisfied. - * + * A helper purgatory class for bookkeeping delayed operations with a timeout, and expiring timed out operations. 
*/ -abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInterval: Int = 1000) +class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInterval: Int = 1000) extends Logging with KafkaMetricsGroup { /* a list of requests watching each key */ private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers)) - /* the number of requests being watched, duplicates added on different watchers are also counted */ - private val watched = new AtomicInteger(0) - /* background thread expiring requests that have been waiting too long */ - private val expiredRequestReaper = new ExpiredRequestReaper - private val expirationThread = Utils.newThread(name="request-expiration-task", runnable=expiredRequestReaper, daemon=false) + private val expirationReaper = new ExpiredOperationReaper newGauge( "PurgatorySize", new Gauge[Int] { - def value = watched.get() + expiredRequestReaper.numRequests + def value = size() } ) newGauge( - "NumDelayedRequests", + "NumDelayedOperations", new Gauge[Int] { - def value = expiredRequestReaper.unsatisfied.get() + def value = expirationReaper.enqueued } ) - expirationThread.start() + expirationReaper.start() /** - * Try to add the request for watch on all keys. Return true iff the request is - * satisfied and the satisfaction is done by the caller. + * Check if the operation can be completed, if not watch it based on the given watch keys + * + * Note that a delayed operation can be watched on multiple keys. It is possible that + * an operation is completed after it has been added to the watch list for some, but + * not all of the keys. In this case, the operation is considered completed and won't + * be added to the watch list of the remaining keys. The expiration reaper thread will + * remove this operation from any watcher list in which the operation exists. * - * Requests can be watched on only a few of the keys if it is found satisfied when - * trying to add it to each one of the keys. In this case the request is still treated as satisfied - * and hence no longer watched. Those already added elements will be later purged by the expire reaper. + * @param operation the delayed operation to be checked + * @param watchKeys keys for bookkeeping the operation + * @return true iff the delayed operations can be completed */ - def checkAndMaybeWatch(delayedRequest: T): Boolean = { - for(key <- delayedRequest.keys) { - val lst = watchersFor(key) - if(!lst.checkAndMaybeAdd(delayedRequest)) { - if(delayedRequest.satisfied.compareAndSet(false, true)) - return true - else - return false + def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = { + for(key <- watchKeys) { + // if the operation is already completed, stopping adding it to + // any further lists and return false + if (operation.isCompleted()) + return false + val watchers = watchersFor(key) + // if the operation can by completed by myself, stop adding it to + // any further lists and return true immediately + if(operation synchronized operation.tryComplete()) { + return true + } else { + watchers.watch(operation) } } - // if it is indeed watched, add to the expire queue also - expiredRequestReaper.enqueue(delayedRequest) + // if it cannot be completed by now and hence is watched, add to the expire queue also + if (! operation.isCompleted()) { + expirationReaper.enqueue(operation) + } false } /** - * Update any watchers and return a list of newly satisfied requests. 
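
To make the new contract concrete before moving on to checkAndComplete below, here is a minimal, self-contained sketch of a delayed operation and its life cycle; the class name DelayedFlag and the watch key are made up for illustration, and the snippet assumes a REPL session against the patched core jar:

    // Scala sketch: a trivial DelayedRequest that completes once an external flag is set.
    import kafka.server.{DelayedRequest, RequestPurgatory}

    class DelayedFlag(delayMs: Long) extends DelayedRequest(delayMs) {
      @volatile var ready = false                      // the external condition being waited on

      // runs at most once, either via tryComplete() or via the expiration reaper
      override def complete() {
        println("operation completed, ready = " + ready)
      }

      // check the condition and, if met, race to be the thread that completes the operation
      override def tryComplete(): Boolean = if (ready) forceComplete() else false
    }

    val purgatory = new RequestPurgatory[DelayedFlag]()
    val op = new DelayedFlag(30 * 1000L)

    purgatory.tryCompleteElseWatch(op, Seq("some-key"))  // not completable yet, so it is watched

    op.ready = true
    purgatory.checkAndComplete("some-key")               // completes op and returns 1

    purgatory.shutdown()                                 // stop the expiration reaper thread
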
+ * Check if some some delayed requests can be completed with the given watch key, + * and if yes complete them. + * + * @return the number of completed requests during this process */ - def update(key: Any): Seq[T] = { - val w = watchersForKey.get(key) - if(w == null) - Seq.empty + def checkAndComplete(key: Any): Int = { + val watchers = watchersForKey.get(key) + if(watchers == null) + 0 else - w.collectSatisfiedRequests() + watchers.tryCompleteWatched() } - private def watchersFor(key: Any) = watchersForKey.getAndMaybePut(key) - - /** - * Check if this delayed request is already satisfied + /* + * Return the size of the purgatory, which is size of watch lists plus the size of the expire reaper. + * Since an operation may still be in the watch lists even when it has been completed, this number + * may be larger than the number of real operations watched */ - protected def checkSatisfied(request: T): Boolean + protected def size() = watchersForKey.values.map(_.watched).sum + expirationReaper.enqueued - /** - * Handle an expired delayed request + /* + * Return the watch list of the given key */ - protected def expire(delayed: T) + private def watchersFor(key: Any) = watchersForKey.getAndMaybePut(key) /** * Shutdown the expire reaper thread */ def shutdown() { - expiredRequestReaper.shutdown() + expirationReaper.shutdown() } /** - * A linked list of DelayedRequests watching some key with some associated - * bookkeeping logic. + * A linked list of watched delayed operations based on some key */ private class Watchers { private val requests = new util.ArrayList[T] - // potentially add the element to watch if it is not satisfied yet - def checkAndMaybeAdd(t: T): Boolean = { + def watched = requests.size() + + // add the element to watch + def watch(t: T) { synchronized { - // if it is already satisfied, do not add to the watch list - if (t.satisfied.get) - return false - // synchronize on the delayed request to avoid any race condition - // with expire and update threads on client-side. - if(t synchronized checkSatisfied(t)) { - return false - } requests.add(t) - watched.getAndIncrement() - return true } } - // traverse the list and purge satisfied elements - def purgeSatisfied(): Int = { + // traverse the list and try to complete some watched elements + def tryCompleteWatched(): Int = { + var completed = 0 synchronized { val iter = requests.iterator() - var purged = 0 while(iter.hasNext) { val curr = iter.next - if(curr.satisfied.get()) { + if (curr.isCompleted()) { + // another thread has completed this request, just remove it iter.remove() - watched.getAndDecrement() - purged += 1 + } else { + if(curr synchronized curr.tryComplete()) { + iter.remove() + completed += 1 + } } } - purged } + completed } - // traverse the list and try to satisfy watched elements - def collectSatisfiedRequests(): Seq[T] = { - val response = new mutable.ArrayBuffer[T] + // traverse the list and purge elements that are already completed by others + def purgeCompleted(): Int = { + var purged = 0 synchronized { val iter = requests.iterator() - while(iter.hasNext) { + while (iter.hasNext) { val curr = iter.next - if(curr.satisfied.get) { - // another thread has satisfied this request, remove it + if(curr.isCompleted()) { iter.remove() - } else { - // synchronize on curr to avoid any race condition with expire - // on client-side. 
- val satisfied = curr synchronized checkSatisfied(curr) - if(satisfied) { - iter.remove() - watched.getAndDecrement() - val updated = curr.satisfied.compareAndSet(false, true) - if(updated == true) { - response += curr - expiredRequestReaper.satisfyRequest() - } - } + purged += 1 } } } - response + purged } } /** - * Runnable to expire requests that have sat unfullfilled past their deadline + * A background reaper to expire delayed operations that have timed out */ - private class ExpiredRequestReaper extends Runnable with Logging { - this.logIdent = "ExpiredRequestReaper-%d ".format(brokerId) + private class ExpiredOperationReaper extends ShutdownableThread( + "ExpirationReaper-%d".format(brokerId), + false) { + /* The queue storing all delayed operations */ private val delayed = new DelayQueue[T] - private val running = new AtomicBoolean(true) - private val shutdownLatch = new CountDownLatch(1) - - /* The count of elements in the delay queue that are unsatisfied */ - private [kafka] val unsatisfied = new AtomicInteger(0) - - def numRequests = delayed.size() - - /** Main loop for the expiry thread */ - def run() { - while(running.get) { - try { - val curr = pollExpired() - if (curr != null) { - curr synchronized { - expire(curr) - } - } - if (watched.get + numRequests >= purgeInterval) { // see if we need to force a full purge - debug("Beginning purgatory purge") - val purged = purgeSatisfied() - debug("Purged %d requests from delay queue.".format(purged)) - val numPurgedFromWatchers = watchersForKey.values.map(_.purgeSatisfied()).sum - debug("Purged %d requests from watch lists.".format(numPurgedFromWatchers)) - } - } catch { - case e: Exception => - error("Error in long poll expiry thread: ", e) - } - } - shutdownLatch.countDown() - } - /** Add a request to be expired */ + /* + * Return the number of delayed operations kept by the reaper + */ + def enqueued = delayed.size() + + /* + * Add a operation to be expired + */ def enqueue(t: T) { delayed.add(t) - unsatisfied.incrementAndGet() - } - - /** Shutdown the expiry thread*/ - def shutdown() { - debug("Shutting down.") - running.set(false) - shutdownLatch.await() - debug("Shut down complete.") } - /** Record the fact that we satisfied a request in the stats for the expiry queue */ - def satisfyRequest(): Unit = unsatisfied.getAndDecrement() - /** - * Get the next expired event + * Try to get the next expired event and force completing it */ - private def pollExpired(): T = { - while(true) { + private def expireNext(): Boolean = { + while (true) { val curr = delayed.poll(200L, TimeUnit.MILLISECONDS) - if (curr == null) - return null.asInstanceOf[T] - val updated = curr.satisfied.compareAndSet(false, true) - if(updated) { - unsatisfied.getAndDecrement() - return curr + if (curr != null.asInstanceOf[T]) { + // if the operation gets successfully completed, return; + // otherwise try to get the next expired operation since + // this one has been completed by others + if (curr synchronized curr.forceComplete()) { + debug("Expired delayed request %s and return the error codes".format(curr)) + return true + } + } else { + // if there are no expired operations yet, return + return false } } throw new RuntimeException("This should not happen") @@ -302,9 +279,9 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt // purge the delayed queue val iter = delayed.iterator() - while(iter.hasNext) { + while (iter.hasNext) { val curr = iter.next() - if(curr.satisfied.get) { + if (curr.isCompleted()) { iter.remove() purged 
+= 1 } @@ -312,6 +289,18 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt purged } - } + override def doWork() { + // try to get the next expired operation and force completing it + expireNext() + // see if we need to force a full purge + if (size() >= purgeInterval) { + debug("Beginning purgatory purge") + val purged = purgeSatisfied() + debug("Purged %d operations from delay queue.".format(purged)) + val numPurgedFromWatchers = watchersForKey.values.map(_.purgeCompleted()).sum + debug("Purged %d operations from watch lists.".format(numPurgedFromWatchers)) + } + } + } } diff --git a/core/src/main/scala/kafka/utils/DelayedItem.scala b/core/src/main/scala/kafka/utils/DelayedItem.scala index d727649..a4e0dab 100644 --- a/core/src/main/scala/kafka/utils/DelayedItem.scala +++ b/core/src/main/scala/kafka/utils/DelayedItem.scala @@ -20,7 +20,7 @@ package kafka.utils import java.util.concurrent._ import scala.math._ -class DelayedItem[T](val item: T, delay: Long, unit: TimeUnit) extends Delayed with Logging { +class DelayedItem(delay: Long, unit: TimeUnit) extends Delayed with Logging { val createdMs = SystemTime.milliseconds val delayMs = { @@ -29,8 +29,8 @@ class DelayedItem[T](val item: T, delay: Long, unit: TimeUnit) extends Delayed w else given } - def this(item: T, delayMs: Long) = - this(item, delayMs, TimeUnit.MILLISECONDS) + def this(delayMs: Long) = + this(delayMs, TimeUnit.MILLISECONDS) /** * The remaining delay time @@ -41,7 +41,7 @@ class DelayedItem[T](val item: T, delay: Long, unit: TimeUnit) extends Delayed w } def compareTo(d: Delayed): Int = { - val delayed = d.asInstanceOf[DelayedItem[T]] + val delayed = d.asInstanceOf[DelayedItem] val myEnd = createdMs + delayMs val yourEnd = delayed.createdMs + delayed.delayMs diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala index 03a424d..8913fc1 100644 --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -74,6 +74,9 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { fooPartition0Hw = hwmFor(replicaManager, topic, 0) assertEquals(leaderReplicaPartition0.highWatermark.messageOffset, fooPartition0Hw) EasyMock.verify(zkClient) + + // shutdown the replica manager upon test completion + replicaManager.shutdown(false) } def testHighWatermarkPersistenceMultiplePartitions() { @@ -130,6 +133,10 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { topic1Partition0Hw = hwmFor(replicaManager, topic1, 0) assertEquals(10L, topic1Partition0Hw) EasyMock.verify(zkClient) + + // shutdown the replica manager upon test completion + replicaManager.shutdown(false) + } def hwmFor(replicaManager: ReplicaManager, topic: String, partition: Int): Long = { diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala index cd302aa..a703d27 100644 --- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala @@ -36,8 +36,21 @@ class IsrExpirationTest extends JUnit3Suite { }) val topic = "foo" + val time = new MockTime + + var replicaManager: ReplicaManager = null + + override def setUp() { + super.setUp() + replicaManager = new ReplicaManager(configs.head, time, null, null, null, new AtomicBoolean(false)) + } + + override def tearDown() { + 
replicaManager.shutdown(false) + super.tearDown() + } + def testIsrExpirationForStuckFollowers() { - val time = new MockTime val log = getLogWithLogEndOffset(15L, 2) // set logEndOffset for leader to 15L // create one partition and all replicas @@ -61,7 +74,6 @@ class IsrExpirationTest extends JUnit3Suite { } def testIsrExpirationForSlowFollowers() { - val time = new MockTime // create leader replica val log = getLogWithLogEndOffset(15L, 1) // add one partition @@ -82,7 +94,6 @@ class IsrExpirationTest extends JUnit3Suite { private def getPartitionWithAllReplicasInIsr(topic: String, partitionId: Int, time: Time, config: KafkaConfig, localLog: Log): Partition = { val leaderId=config.brokerId - val replicaManager = new ReplicaManager(config, time, null, null, null, new AtomicBoolean(false)) val partition = replicaManager.getOrCreatePartition(topic, partitionId) val leaderReplica = new Replica(leaderId, partition, time, 0, Some(localLog)) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index a9c4ddc..faa9071 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -42,6 +42,9 @@ class ReplicaManagerTest extends JUnit3Suite { val partition = rm.getOrCreatePartition(topic, 1) partition.getOrCreateReplica(1) rm.checkpointHighWatermarks() + + // shutdown the replica manager upon test completion + rm.shutdown(false) } @Test @@ -56,5 +59,8 @@ class ReplicaManagerTest extends JUnit3Suite { val partition = rm.getOrCreatePartition(topic, 1) partition.getOrCreateReplica(1) rm.checkpointHighWatermarks() + + // shutdown the replica manager upon test completion + rm.shutdown(false) } } diff --git a/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala b/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala index 168712d..bf7a150 100644 --- a/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala @@ -17,24 +17,17 @@ package kafka.server -import scala.collection._ import org.junit.Test -import junit.framework.Assert._ -import kafka.message._ -import kafka.api._ -import kafka.utils.TestUtils import org.scalatest.junit.JUnit3Suite - +import junit.framework.Assert._ class RequestPurgatoryTest extends JUnit3Suite { - val producerRequest1 = TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(new Message("hello1".getBytes))) - val producerRequest2 = TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(new Message("hello2".getBytes))) - var purgatory: MockRequestPurgatory = null + var purgatory: RequestPurgatory[MockDelayedRequest] = null override def setUp() { super.setUp() - purgatory = new MockRequestPurgatory() + purgatory = new RequestPurgatory[MockDelayedRequest]() } override def tearDown() { @@ -44,49 +37,56 @@ class RequestPurgatoryTest extends JUnit3Suite { @Test def testRequestSatisfaction() { - val r1 = new DelayedRequest(Array("test1"), null, 100000L) - val r2 = new DelayedRequest(Array("test2"), null, 100000L) - assertEquals("With no waiting requests, nothing should be satisfied", 0, purgatory.update("test1").size) - assertFalse("r1 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r1)) - assertEquals("Still nothing satisfied", 0, purgatory.update("test1").size) - assertFalse("r2 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r2)) - assertEquals("Still nothing satisfied", 0, 
purgatory.update("test2").size) - purgatory.satisfied += r1 - assertEquals("r1 satisfied", mutable.ArrayBuffer(r1), purgatory.update("test1")) - assertEquals("Nothing satisfied", 0, purgatory.update("test1").size) - purgatory.satisfied += r2 - assertEquals("r2 satisfied", mutable.ArrayBuffer(r2), purgatory.update("test2")) - assertEquals("Nothing satisfied", 0, purgatory.update("test2").size) + val r1 = new MockDelayedRequest(100000L) + val r2 = new MockDelayedRequest(100000L) + assertEquals("With no waiting requests, nothing should be satisfied", 0, purgatory.checkAndComplete("test1")) + assertFalse("r1 not satisfied and hence watched", purgatory.tryCompleteElseWatch(r1, Array("test1"))) + assertEquals("Still nothing satisfied", 0, purgatory.checkAndComplete("test1")) + assertFalse("r2 not satisfied and hence watched", purgatory.tryCompleteElseWatch(r2, Array("test2"))) + assertEquals("Still nothing satisfied", 0, purgatory.checkAndComplete("test2")) + r1.completable = true + assertEquals("r1 satisfied", 1, purgatory.checkAndComplete("test1")) + assertEquals("Nothing satisfied", 0, purgatory.checkAndComplete("test1")) + r2.completable = true + assertEquals("r2 satisfied", 1, purgatory.checkAndComplete("test2")) + assertEquals("Nothing satisfied", 0, purgatory.checkAndComplete("test2")) } @Test def testRequestExpiry() { val expiration = 20L - val r1 = new DelayedRequest(Array("test1"), null, expiration) - val r2 = new DelayedRequest(Array("test1"), null, 200000L) + val r1 = new MockDelayedRequest(expiration) + val r2 = new MockDelayedRequest(200000L) val start = System.currentTimeMillis - assertFalse("r1 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r1)) - assertFalse("r2 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r2)) - purgatory.awaitExpiration(r1) + assertFalse("r1 not satisfied and hence watched", purgatory.tryCompleteElseWatch(r1, Array("test1"))) + assertFalse("r2 not satisfied and hence watched", purgatory.tryCompleteElseWatch(r2, Array("test2"))) + r1.awaitExpiration() val elapsed = System.currentTimeMillis - start - assertTrue("r1 expired", purgatory.expired.contains(r1)) - assertTrue("r2 hasn't expired", !purgatory.expired.contains(r2)) + assertTrue("r1 completed due to expiration", r1.isCompleted()) + assertFalse("r2 hasn't completed", r2.isCompleted()) assertTrue("Time for expiration %d should at least %d".format(elapsed, expiration), elapsed >= expiration) } - - class MockRequestPurgatory extends RequestPurgatory[DelayedRequest] { - val satisfied = mutable.Set[DelayedRequest]() - val expired = mutable.Set[DelayedRequest]() - def awaitExpiration(delayed: DelayedRequest) = { - delayed synchronized { - delayed.wait() + + // A mock delayed request that can be completed / expired at will + class MockDelayedRequest(delayMs: Long) extends DelayedRequest(delayMs) { + var completable = false + + def awaitExpiration() { + synchronized { + wait() } } - def checkSatisfied(delayed: DelayedRequest): Boolean = satisfied.contains(delayed) - def expire(delayed: DelayedRequest) { - expired += delayed - delayed synchronized { - delayed.notify() + + override def tryComplete() = { + if (completable) + forceComplete() + else + false + } + + override def complete() { + synchronized { + notify() } } } diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index ab60e9b..9f1ab35 100644 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ 
b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -106,7 +106,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { val newProps = TestUtils.createBrokerConfig(0, port) newProps.setProperty("delete.topic.enable", "true") val newConfig = new KafkaConfig(newProps) - var server = new KafkaServer(newConfig) + val server = new KafkaServer(newConfig) server.startup() server.shutdown() server.awaitShutdown() diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index 09ed8f5..eb59459 100644 --- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -17,18 +17,21 @@ package kafka.server import kafka.api._ -import kafka.cluster.{Partition, Replica} -import kafka.common.{ErrorMapping, TopicAndPartition} +import kafka.utils._ +import kafka.cluster.Replica +import kafka.common.TopicAndPartition import kafka.log.Log import kafka.message.{ByteBufferMessageSet, Message} -import kafka.network.RequestChannel -import kafka.utils.{ZkUtils, Time, TestUtils, MockTime} import scala.Some +import java.util.Collections +import java.util.concurrent.atomic.AtomicBoolean +import collection.JavaConversions._ import org.easymock.EasyMock import org.I0Itec.zkclient.ZkClient import org.scalatest.junit.JUnit3Suite +import junit.framework.Assert._ class SimpleFetchTest extends JUnit3Suite { @@ -37,215 +40,102 @@ class SimpleFetchTest extends JUnit3Suite { override val replicaFetchWaitMaxMs = 100 override val replicaLagMaxMessages = 10L }) - val topic = "foo" - val partitionId = 0 - /** - * The scenario for this test is that there is one topic, "test-topic", one broker "0" that has - * one partition with one follower replica on broker "1". The leader replica on "0" - * has HW of "5" and LEO of "20". The follower on broker "1" has a local replica - * with a HW matching the leader's ("5") and LEO of "15", meaning it's not in-sync - * but is still in ISR (hasn't yet expired from ISR). - * - * When a normal consumer fetches data, it should only see data up to the HW of the leader, - * in this case up an offset of "5". 
- */ - def testNonReplicaSeesHwWhenFetching() { - /* setup */ - val time = new MockTime - val leo = 20L - val hw = 5 - val fetchSize = 100 - val messages = new Message("test-message".getBytes()) + // set the replica manager with the partition + val time = new MockTime + val leaderLEO = 20L + val followerLEO = 15L + val partitionHW = 5 - // create nice mock since we don't particularly care about zkclient calls - val zkClient = EasyMock.createNiceMock(classOf[ZkClient]) - EasyMock.expect(zkClient.exists(ZkUtils.ControllerEpochPath)).andReturn(false) - EasyMock.replay(zkClient) + val fetchSize = 100 + val messagesToHW = new Message("messageToHW".getBytes()) + val messagesToLEO = new Message("messageToLEO".getBytes()) - val log = EasyMock.createMock(classOf[kafka.log.Log]) - EasyMock.expect(log.logEndOffset).andReturn(leo).anyTimes() - EasyMock.expect(log) - EasyMock.expect(log.read(0, fetchSize, Some(hw))).andReturn( - new FetchDataInfo( - new LogOffsetMetadata(0L, 0L, leo.toInt), - new ByteBufferMessageSet(messages) - )).anyTimes() - EasyMock.replay(log) + val topic = "test-topic" + val partitionId = 0 + val topicAndPartition = TopicAndPartition(topic, partitionId) - val logManager = EasyMock.createMock(classOf[kafka.log.LogManager]) - EasyMock.expect(logManager.getLog(TopicAndPartition(topic, partitionId))).andReturn(Some(log)).anyTimes() - EasyMock.replay(logManager) + val fetchInfo = Collections.singletonMap(topicAndPartition, PartitionFetchInfo(0, fetchSize)).toMap - val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager]) - EasyMock.expect(replicaManager.config).andReturn(configs.head) - EasyMock.expect(replicaManager.logManager).andReturn(logManager) - EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager])) - EasyMock.expect(replicaManager.zkClient).andReturn(zkClient) - EasyMock.expect(replicaManager.readMessageSets(EasyMock.anyObject())).andReturn({ - val fetchInfo = log.read(0, fetchSize, Some(hw)) - val partitionData = new FetchResponsePartitionData(ErrorMapping.NoError, hw.toLong, fetchInfo.messageSet) - Map(TopicAndPartition(topic, partitionId) -> new PartitionDataAndOffset(partitionData, fetchInfo.fetchOffset)) - }).anyTimes() - EasyMock.replay(replicaManager) - - val partition = getPartitionWithAllReplicasInISR(topic, partitionId, time, configs.head.brokerId, log, hw, replicaManager) - partition.getReplica(configs(1).brokerId).get.logEndOffset = new LogOffsetMetadata(leo - 5L, 0L, leo.toInt - 5) - - EasyMock.reset(replicaManager) - EasyMock.expect(replicaManager.config).andReturn(configs.head).anyTimes() - EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes() - EasyMock.expect(replicaManager.initWithRequestPurgatory(EasyMock.anyObject(), EasyMock.anyObject())) - EasyMock.expect(replicaManager.readMessageSets(EasyMock.anyObject())).andReturn({ - val fetchInfo = log.read(0, fetchSize, Some(hw)) - val partitionData = new FetchResponsePartitionData(ErrorMapping.NoError, hw.toLong, fetchInfo.messageSet) - Map(TopicAndPartition(topic, partitionId) -> new PartitionDataAndOffset(partitionData, fetchInfo.fetchOffset)) - }).anyTimes() - - EasyMock.replay(replicaManager) - - val offsetManager = EasyMock.createMock(classOf[kafka.server.OffsetManager]) - - val controller = EasyMock.createMock(classOf[kafka.controller.KafkaController]) - - // start a request channel with 2 processors and a queue size of 5 (this is more or less 
arbitrary) - // don't provide replica or leader callbacks since they will not be tested here - val requestChannel = new RequestChannel(2, 5) - val apis = new KafkaApis(requestChannel, replicaManager, offsetManager, zkClient, configs.head.brokerId, configs.head, controller) - - val partitionStateInfo = EasyMock.createNiceMock(classOf[PartitionStateInfo]) - apis.metadataCache.addOrUpdatePartitionInfo(topic, partitionId, partitionStateInfo) - EasyMock.replay(partitionStateInfo) - // This request (from a follower) wants to read up to 2*HW but should only get back up to HW bytes into the log - val goodFetch = new FetchRequestBuilder() - .replicaId(Request.OrdinaryConsumerId) - .addFetch(topic, partitionId, 0, fetchSize) - .build() - val goodFetchBB = TestUtils.createRequestByteBuffer(goodFetch) - - // send the request - apis.handleFetchRequest(new RequestChannel.Request(processor=1, requestKey=5, buffer=goodFetchBB, startTimeMs=1)) - - // make sure the log only reads bytes between 0->HW (5) - EasyMock.verify(log) - } + var replicaManager: ReplicaManager = null - /** - * The scenario for this test is that there is one topic, "test-topic", on broker "0" that has - * one partition with one follower replica on broker "1". The leader replica on "0" - * has HW of "5" and LEO of "20". The follower on broker "1" has a local replica - * with a HW matching the leader's ("5") and LEO of "15", meaning it's not in-sync - * but is still in ISR (hasn't yet expired from ISR). - * - * When the follower from broker "1" fetches data, it should see data upto the log end offset ("20") - */ - def testReplicaSeesLeoWhenFetching() { - /* setup */ - val time = new MockTime - val leo = 20 - val hw = 5 - - val messages = new Message("test-message".getBytes()) - - val followerReplicaId = configs(1).brokerId - val followerLEO = 15 + override def setUp() { + super.setUp() + // create nice mock since we don't particularly care about zkclient calls val zkClient = EasyMock.createNiceMock(classOf[ZkClient]) - EasyMock.expect(zkClient.exists(ZkUtils.ControllerEpochPath)).andReturn(false) EasyMock.replay(zkClient) - val log = EasyMock.createMock(classOf[kafka.log.Log]) - EasyMock.expect(log.logEndOffset).andReturn(leo).anyTimes() - EasyMock.expect(log.read(followerLEO, Integer.MAX_VALUE, None)).andReturn( + // create nice mock since we don't particularly care about scheduler calls + val scheduler = EasyMock.createNiceMock(classOf[KafkaScheduler]) + EasyMock.replay(scheduler) + + // create the log which takes read with either HW max offset or none max offset + val log = EasyMock.createMock(classOf[Log]) + EasyMock.expect(log.logEndOffset).andReturn(leaderLEO).anyTimes() + EasyMock.expect(log.read(0, fetchSize, Some(partitionHW))).andReturn( new FetchDataInfo( - new LogOffsetMetadata(followerLEO, 0L, followerLEO), - new ByteBufferMessageSet(messages) + new LogOffsetMetadata(0L, 0L, 0), + new ByteBufferMessageSet(messagesToHW) + )).anyTimes() + EasyMock.expect(log.read(0, fetchSize, None)).andReturn( + new FetchDataInfo( + new LogOffsetMetadata(0L, 0L, 0), + new ByteBufferMessageSet(messagesToLEO) )).anyTimes() EasyMock.replay(log) + // create the log manager that is aware of this mock log val logManager = EasyMock.createMock(classOf[kafka.log.LogManager]) - EasyMock.expect(logManager.getLog(TopicAndPartition(topic, 0))).andReturn(Some(log)).anyTimes() + EasyMock.expect(logManager.getLog(topicAndPartition)).andReturn(Some(log)).anyTimes() EasyMock.replay(logManager) - val replicaManager = 
EasyMock.createMock(classOf[kafka.server.ReplicaManager]) - EasyMock.expect(replicaManager.config).andReturn(configs.head) - EasyMock.expect(replicaManager.logManager).andReturn(logManager) - EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager])) - EasyMock.expect(replicaManager.zkClient).andReturn(zkClient) - EasyMock.expect(replicaManager.readMessageSets(EasyMock.anyObject())).andReturn({ - val fetchInfo = log.read(followerLEO, Integer.MAX_VALUE, None) - val partitionData = new FetchResponsePartitionData(ErrorMapping.NoError, hw.toLong, fetchInfo.messageSet) - Map(TopicAndPartition(topic, partitionId) -> new PartitionDataAndOffset(partitionData, fetchInfo.fetchOffset)) - }).anyTimes() - EasyMock.replay(replicaManager) - - val partition = getPartitionWithAllReplicasInISR(topic, partitionId, time, configs.head.brokerId, log, hw, replicaManager) - partition.getReplica(followerReplicaId).get.logEndOffset = new LogOffsetMetadata(followerLEO.asInstanceOf[Long], 0L, followerLEO) - - EasyMock.reset(replicaManager) - EasyMock.expect(replicaManager.config).andReturn(configs.head).anyTimes() - EasyMock.expect(replicaManager.updateReplicaLEOAndPartitionHW(topic, partitionId, followerReplicaId, new LogOffsetMetadata(followerLEO.asInstanceOf[Long], 0L, followerLEO))) - EasyMock.expect(replicaManager.getReplica(topic, partitionId, followerReplicaId)).andReturn(partition.inSyncReplicas.find(_.brokerId == configs(1).brokerId)) - EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes() - EasyMock.expect(replicaManager.initWithRequestPurgatory(EasyMock.anyObject(), EasyMock.anyObject())) - EasyMock.expect(replicaManager.readMessageSets(EasyMock.anyObject())).andReturn({ - val fetchInfo = log.read(followerLEO, Integer.MAX_VALUE, None) - val partitionData = new FetchResponsePartitionData(ErrorMapping.NoError, hw.toLong, fetchInfo.messageSet) - Map(TopicAndPartition(topic, partitionId) -> new PartitionDataAndOffset(partitionData, fetchInfo.fetchOffset)) - }).anyTimes() - EasyMock.expect(replicaManager.unblockDelayedProduceRequests(EasyMock.anyObject())).anyTimes() - EasyMock.replay(replicaManager) - - val offsetManager = EasyMock.createMock(classOf[kafka.server.OffsetManager]) - - val controller = EasyMock.createMock(classOf[kafka.controller.KafkaController]) - - val requestChannel = new RequestChannel(2, 5) - val apis = new KafkaApis(requestChannel, replicaManager, offsetManager, zkClient, configs.head.brokerId, configs.head, controller) - val partitionStateInfo = EasyMock.createNiceMock(classOf[PartitionStateInfo]) - apis.metadataCache.addOrUpdatePartitionInfo(topic, partitionId, partitionStateInfo) - EasyMock.replay(partitionStateInfo) - - /** - * This fetch, coming from a replica, requests all data at offset "15". Because the request is coming - * from a follower, the leader should oblige and read beyond the HW. 
- */ - val bigFetch = new FetchRequestBuilder() - .replicaId(followerReplicaId) - .addFetch(topic, partitionId, followerLEO, Integer.MAX_VALUE) - .build() - - val fetchRequestBB = TestUtils.createRequestByteBuffer(bigFetch) - - // send the request - apis.handleFetchRequest(new RequestChannel.Request(processor=0, requestKey=5, buffer=fetchRequestBB, startTimeMs=1)) - - /** - * Make sure the log satisfies the fetch from a follower by reading data beyond the HW, mainly all bytes after - * an offset of 15 - */ - EasyMock.verify(log) - } + // create the replica manager + replicaManager = new ReplicaManager(configs.head, time, zkClient, scheduler, logManager, new AtomicBoolean(false)) + + // add the partition with two replicas, both in ISR + val partition = replicaManager.getOrCreatePartition(topic, partitionId) - private def getPartitionWithAllReplicasInISR(topic: String, partitionId: Int, time: Time, leaderId: Int, - localLog: Log, leaderHW: Long, replicaManager: ReplicaManager): Partition = { - val partition = new Partition(topic, partitionId, time, replicaManager) - val leaderReplica = new Replica(leaderId, partition, time, 0, Some(localLog)) + // create the leader replica with the local log + val leaderReplica = new Replica(configs(0).brokerId, partition, time, 0, Some(log)) + leaderReplica.highWatermark = new LogOffsetMetadata(partitionHW) + partition.leaderReplicaIdOpt = Some(leaderReplica.brokerId) - val allReplicas = getFollowerReplicas(partition, leaderId, time) :+ leaderReplica + // create the follower replica with defined log end offset + val followerReplica= new Replica(configs(1).brokerId, partition, time) + followerReplica.logEndOffset = new LogOffsetMetadata(followerLEO, 0L, followerLEO.toInt) + + // add both of them to ISR + val allReplicas = List(leaderReplica, followerReplica) allReplicas.foreach(partition.addReplicaIfNotExists(_)) - // set in sync replicas for this partition to all the assigned replicas partition.inSyncReplicas = allReplicas.toSet - // set the leader and its hw and the hw update time - partition.leaderReplicaIdOpt = Some(leaderId) - leaderReplica.highWatermark = new LogOffsetMetadata(leaderHW) - partition } - private def getFollowerReplicas(partition: Partition, leaderId: Int, time: Time): Seq[Replica] = { - configs.filter(_.brokerId != leaderId).map { config => - new Replica(config.brokerId, partition, time) - } + override def tearDown() { + replicaManager.shutdown(false) + super.tearDown() } + /** + * The scenario for this test is that there is one topic that has one partition + * with one leader replica on broker "0" and one follower replica on broker "1" + * inside the replica manager's metadata. + * + * The leader replica on "0" has HW of "5" and LEO of "20". The follower on + * broker "1" has a local replica with a HW matching the leader's ("5") and + * LEO of "15", meaning it's not in-sync but is still in ISR (hasn't yet expired from ISR). + * + * When a fetch operation with read committed data turned on is received, the replica manager + * should only return data up to the HW of the partition; when a fetch operation with read + * committed data turned off is received, the replica manager could return data up to the LEO + * of the local leader replica's log. 
+ */ + def testReadFromLog() { + + assertEquals("", messagesToHW, replicaManager.readFromLocalLog(true, true, fetchInfo) + .get(topicAndPartition).get.info.messageSet.head.message) + + assertEquals("", messagesToLEO, replicaManager.readFromLocalLog(true, false, fetchInfo) + .get(topicAndPartition).get.info.messageSet.head.message) + } } -- 1.7.10.2 (Apple Git-33) From 4e365e09671399ecb4cc2a1c487c092904e4431a Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Mon, 1 Sep 2014 18:06:39 -0700 Subject: [PATCH 2/3] Incorporated Jun's comments round two --- core/src/main/scala/kafka/cluster/Partition.scala | 2 +- core/src/main/scala/kafka/server/KafkaApis.scala | 14 ++++----- .../main/scala/kafka/server/ReplicaManager.scala | 4 +-- .../main/scala/kafka/server/RequestPurgatory.scala | 33 ++++++++++---------- .../scala/unit/kafka/server/SimpleFetchTest.scala | 8 ++--- 5 files changed, 30 insertions(+), 31 deletions(-) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 1c62afc..1c0a8f1 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -319,7 +319,7 @@ class Partition(val topic: String, * this function can be triggered when * * 1. Partition ISR changed - * 2. Any replica's LEO changed (e.g. leader LEO changed and the ISR is down to 1) + * 2. Any replica's LEO changed * * Note There is no need to acquire the leaderIsrUpdate lock here * since all callers of this private API acquire that lock diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 32696c1..996ce99 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -131,7 +131,7 @@ class KafkaApis(val requestChannel: RequestChannel, // Here we only print warnings for known errors; if it is unknown, it will cause // an error message in the replica manager already and hence can be ignored here if (errorCode != ErrorMapping.NoError && errorCode != ErrorMapping.UnknownCode) { - warn("Offset commit request with correlation id %d from client %s on partition %s failed due to %s" + debug("Offset commit request with correlation id %d from client %s on partition %s failed due to %s" .format(offsetCommitRequest.correlationId, offsetCommitRequest.clientId, topicAndPartition, ErrorMapping.exceptionNameFor(errorCode))) } @@ -163,7 +163,7 @@ class KafkaApis(val requestChannel: RequestChannel, // Here we only print warnings for known errors; if it is unknown, it will cause // an error message in the replica manager if (status.error != ErrorMapping.NoError && status.error != ErrorMapping.UnknownCode) { - warn("Produce request with correlation id %d from client %s on partition %s failed due to %s" + debug("Produce request with correlation id %d from client %s on partition %s failed due to %s" .format(produceRequest.correlationId, produceRequest.clientId, topicAndPartition, ErrorMapping.exceptionNameFor(status.error))) errorInResponse = true @@ -207,7 +207,7 @@ class KafkaApis(val requestChannel: RequestChannel, // Here we only print warnings for known errors; if it is unknown, it will cause // an error message in the replica manager already and hence can be ignored here if (data.error != ErrorMapping.NoError && data.error != ErrorMapping.UnknownCode) { - warn("Fetch request with correlation id %d from client %s on partition %s failed due to %s" + debug("Fetch request with correlation id %d from client 
%s on partition %s failed due to %s" .format(fetchRequest.correlationId, fetchRequest.clientId, topicAndPartition, ErrorMapping.exceptionNameFor(data.error))) } @@ -231,7 +231,7 @@ class KafkaApis(val requestChannel: RequestChannel, } /** - * Handle a offset request request + * Handle an offset request */ def handleOffsetRequest(request: RequestChannel.Request) { val offsetRequest = request.requestObj.asInstanceOf[OffsetRequest] @@ -263,15 +263,15 @@ class KafkaApis(val requestChannel: RequestChannel, // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages // are typically transient and there is no value in logging the entire stack trace for the same case utpe: UnknownTopicOrPartitionException => - warn("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( + debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( offsetRequest.correlationId, offsetRequest.clientId, topicAndPartition, utpe.getMessage)) (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]), Nil) ) case nle: NotLeaderForPartitionException => - warn("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( + debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( offsetRequest.correlationId, offsetRequest.clientId, topicAndPartition,nle.getMessage)) (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), Nil) ) case e: Throwable => - warn("Error while responding to offset request", e) + error("Error while responding to offset request", e) (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), Nil) ) } }) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 085f076..016e90a 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -396,9 +396,7 @@ class ReplicaManager(val config: KafkaConfig, else None - // read on log; note that here we do not record the fetched message count and size - // since it may be re-read in the future; instead we should only record these metrics - // when the responses are sent + // read on log val logReadInfo = localReplica.log match { case Some(log) => log.read(offset, fetchSize, maxOffsetOpt) diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala index 645f204..b208eec 100644 --- a/core/src/main/scala/kafka/server/RequestPurgatory.scala +++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala @@ -40,7 +40,7 @@ abstract class DelayedRequest(delayMs: Long) extends DelayedItem(delayMs) { * Force completing the delayed operation, this function can be triggered when * * 1. The operation has been verified to be completable inside tryComplete() - * 2. The operation has expired and hence need to be completed right now + * 2. 
The operation has expired and hence needs to be completed right now * * Return true iff the operation is completed by the caller */ @@ -59,7 +59,8 @@ abstract class DelayedRequest(delayMs: Long) extends DelayedItem(delayMs) { def isCompleted(): Boolean = completed.get() /** - * Process for completing a operation; this function needs to be defined in subclasses + * Process for completing an operation; this function needs to be defined in subclasses + * and should only be called once in forceComplete() */ def complete(): Unit @@ -98,7 +99,7 @@ class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInterval: In newGauge( "NumDelayedOperations", new Gauge[Int] { - def value = expirationReaper.enqueued + def value = delayed() } ) @@ -156,11 +157,16 @@ class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInterval: In } /* - * Return the size of the purgatory, which is size of watch lists plus the size of the expire reaper. - * Since an operation may still be in the watch lists even when it has been completed, this number - * may be larger than the number of real operations watched + * Return the size of the purgatory, which is the size of watch lists. + * Since an operation may still be in the watch lists even when it has been + * completed, this number may be larger than the number of real operations watched */ - protected def size() = watchersForKey.values.map(_.watched).sum + expirationReaper.enqueued + protected def size() = watchersForKey.values.map(_.watched).sum + + /* + * Return the number of delayed operations kept by the reaper + */ + protected def delayed() = expirationReaper.delayed.size() /* * Return the watch list of the given key @@ -235,12 +241,7 @@ class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInterval: In false) { /* The queue storing all delayed operations */ - private val delayed = new DelayQueue[T] - - /* - * Return the number of delayed operations kept by the reaper - */ - def enqueued = delayed.size() + val delayed = new DelayQueue[T] /* * Add a operation to be expired @@ -252,7 +253,7 @@ class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInterval: In /** * Try to get the next expired event and force completing it */ - private def expireNext(): Boolean = { + private def expireNext() { while (true) { val curr = delayed.poll(200L, TimeUnit.MILLISECONDS) if (curr != null.asInstanceOf[T]) { @@ -261,11 +262,11 @@ class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInterval: In // this one has been completed by others if (curr synchronized curr.forceComplete()) { debug("Expired delayed request %s and return the error codes".format(curr)) - return true + return } } else { // if there are no expired operations yet, return - return false + return } } throw new RuntimeException("This should not happen") diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index eb59459..ccf5e2e 100644 --- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -132,10 +132,10 @@ class SimpleFetchTest extends JUnit3Suite { */ def testReadFromLog() { - assertEquals("", messagesToHW, replicaManager.readFromLocalLog(true, true, fetchInfo) - .get(topicAndPartition).get.info.messageSet.head.message) + assertEquals("Reading committed data should return messages only up to high watermark", messagesToHW, + replicaManager.readFromLocalLog(true, true, 
fetchInfo).get(topicAndPartition).get.info.messageSet.head.message) - assertEquals("", messagesToLEO, replicaManager.readFromLocalLog(true, false, fetchInfo) - .get(topicAndPartition).get.info.messageSet.head.message) + assertEquals("Reading any data can return messages up to the end of the log", messagesToLEO, + replicaManager.readFromLocalLog(true, false, fetchInfo).get(topicAndPartition).get.info.messageSet.head.message) } } -- 1.7.10.2 (Apple Git-33) From 6a9dea674bc16d570989a023b6aec4aedc44292c Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Tue, 2 Sep 2014 13:33:24 -0700 Subject: [PATCH 3/3] Incorporate Jun's comments round three --- core/src/main/scala/kafka/cluster/Partition.scala | 2 +- .../main/scala/kafka/server/RequestPurgatory.scala | 24 +++++++------------- 2 files changed, 9 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 1c0a8f1..61cca9e 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -279,7 +279,7 @@ class Partition(val topic: String, replicaManager.isrExpandRate.mark() } // check if the HW of the partition can now be incremented - // since the replica maybe now in the ISR and its LEO has just incremented + // since the replica may now be in the ISR and its LEO has just incremented maybeIncrementLeaderHW(leaderReplica) case None => // nothing to do if no longer leader diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala index b208eec..69457da 100644 --- a/core/src/main/scala/kafka/server/RequestPurgatory.scala +++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala @@ -37,7 +37,8 @@ abstract class DelayedRequest(delayMs: Long) extends DelayedItem(delayMs) { private val completed = new AtomicBoolean(false) /* - * Force completing the delayed operation, this function can be triggered when + * Force completing the delayed operation, if not already completed. + * This function can be triggered when * * 1. The operation has been verified to be completable inside tryComplete() * 2. 
The operation has expired and hence needs to be completed right now @@ -60,7 +61,7 @@ abstract class DelayedRequest(delayMs: Long) extends DelayedItem(delayMs) { /** * Process for completing an operation; this function needs to be defined in subclasses - * and should only be called once in forceComplete() + * and will be called exactly once in forceComplete() */ def complete(): Unit @@ -254,22 +255,13 @@ class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInterval: In * Try to get the next expired event and force completing it */ private def expireNext() { - while (true) { - val curr = delayed.poll(200L, TimeUnit.MILLISECONDS) - if (curr != null.asInstanceOf[T]) { - // if the operation gets successfully completed, return; - // otherwise try to get the next expired operation since - // this one has been completed by others - if (curr synchronized curr.forceComplete()) { - debug("Expired delayed request %s and return the error codes".format(curr)) - return - } - } else { - // if there are no expired operations yet, return - return + val curr = delayed.poll(200L, TimeUnit.MILLISECONDS) + if (curr != null.asInstanceOf[T]) { + // try to complete the expired operation if it is not completed yet + if (curr synchronized curr.forceComplete()) { + debug("Expired delayed request %s".format(curr)) } } - throw new RuntimeException("This should not happen") } /** -- 1.7.10.2 (Apple Git-33)
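For reference, a minimal usage sketch of the read path that SimpleFetchTest.testReadFromLog() exercises above. It only restates the calls visible in the test (the replicaManager, fetchInfo and topicAndPartition values are the ones built in the test's setUp()); the boolean arguments are described in comments rather than named, since the patches only show them positionally.

    // Sketch mirroring SimpleFetchTest.testReadFromLog().
    // Second argument true  -> read committed data only, capped at the partition HW
    // Second argument false -> read up to the leader log's end offset (LEO)
    val hwRead  = replicaManager.readFromLocalLog(true, true, fetchInfo)
    val leoRead = replicaManager.readFromLocalLog(true, false, fetchInfo)

    // In the test's setup these contain messagesToHW and messagesToLEO respectively
    val hwMessages  = hwRead.get(topicAndPartition).get.info.messageSet
    val leoMessages = leoRead.get(topicAndPartition).get.info.messageSet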
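The RequestPurgatory changes in patches 2 and 3 pin down a completion contract: complete() is supplied by subclasses and is invoked exactly once, through forceComplete(), whether the operation is found completable or expires in the reaper. Below is a standalone sketch of that contract, under the assumption that forceComplete() is guarded by the AtomicBoolean shown in the diff; the class and member names here are illustrative, not Kafka's API.

    import java.util.concurrent.atomic.AtomicBoolean

    // Illustrative sketch of the "complete exactly once" contract described in the patches.
    abstract class OnceCompletable {
      private val completed = new AtomicBoolean(false)

      // Returns true iff this caller is the one that completed the operation.
      def forceComplete(): Boolean = {
        if (completed.compareAndSet(false, true)) {
          complete()
          true
        } else {
          false
        }
      }

      def isCompleted: Boolean = completed.get()

      // Defined by subclasses; runs exactly once, from forceComplete().
      protected def complete(): Unit
    }

Whichever path calls forceComplete() first (a successful completability check or the expiration reaper) wins the compare-and-set and runs complete(); every later caller gets false back, which is why the reaper can log only the requests it actually expired.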
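Patch 3 also drops the while(true) loop around the reaper's poll. A hedged sketch of the resulting shape, assuming the reaper's run loop re-invokes this method on every iteration (as a ShutdownableThread's doWork() would), so a single poll per call is sufficient; the method and parameter names are illustrative and reuse the OnceCompletable sketch above.

    import java.util.concurrent.{DelayQueue, Delayed, TimeUnit}

    // One expiry step: wait briefly for the next expired operation and force-complete it.
    def expireNext(delayed: DelayQueue[OnceCompletable with Delayed]): Unit = {
      val curr = delayed.poll(200L, TimeUnit.MILLISECONDS)
      if (curr != null) {
        curr.synchronized {
          // only the caller that actually completes the operation logs the expiration
          if (curr.forceComplete())
            println("Expired delayed request " + curr)
        }
      }
    }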