From e5e9cd77d31c3a763fbcce0ebe7b7626c6e0007f Mon Sep 17 00:00:00 2001
From: Sriram Subramanian
Date: Wed, 6 Feb 2013 02:55:23 -0800
Subject: [PATCH] Network layer cleanup

---
 .../main/scala/kafka/network/RequestChannel.scala  |  83 +++-----------
 .../main/scala/kafka/network/SocketServer.scala    |   7 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   | 120 ++++++++++++--------
 .../scala/kafka/server/KafkaRequestHandler.scala   |  34 +++++-
 .../main/scala/kafka/server/RequestPurgatory.scala |   5 +-
 .../unit/kafka/integration/TopicMetadataTest.scala |   2 +-
 .../unit/kafka/network/SocketServerTest.scala      |   2 +-
 .../scala/unit/kafka/server/SimpleFetchTest.scala  |  20 ++--
 8 files changed, 139 insertions(+), 134 deletions(-)

diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 7747ddd..5e1daf3 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -26,66 +26,19 @@ import kafka.common.TopicAndPartition
 import kafka.utils.{Logging, SystemTime}
 import kafka.message.ByteBufferMessageSet
 import java.net._
+import collection.{mutable, BitSet}
 
 object RequestChannel extends Logging {
-  val AllDone = new Request(1, 2, getShutdownReceive(), 0)
-
-  def getShutdownReceive() = {
-    val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, Map[TopicAndPartition, ByteBufferMessageSet]())
-    val byteBuffer = ByteBuffer.allocate(emptyProducerRequest.sizeInBytes + 2)
-    byteBuffer.putShort(RequestKeys.ProduceKey)
-    emptyProducerRequest.writeTo(byteBuffer)
-    byteBuffer.rewind()
-    byteBuffer
-  }
+  val AllDone = new Request(1, 2, null, 0)
 
-  case class Request(processor: Int, requestKey: Any, buffer: ByteBuffer, startTimeMs: Long, remoteAddress: SocketAddress = new InetSocketAddress(0)) {
-    @volatile var dequeueTimeMs = -1L
-    @volatile var apiLocalCompleteTimeMs = -1L
-    @volatile var responseCompleteTimeMs = -1L
-    val requestId = buffer.getShort()
-    val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer)
-    buffer.rewind()
-    trace("Received request : %s".format(requestObj))
-
-    def updateRequestMetrics() {
-      val endTimeMs = SystemTime.milliseconds
-      // In some corner cases, apiLocalCompleteTimeMs may not be set when the request completes since the remote
-      // processing time is really small. In this case, use responseCompleteTimeMs as apiLocalCompleteTimeMs.
-      if (apiLocalCompleteTimeMs < 0)
-        apiLocalCompleteTimeMs = responseCompleteTimeMs
-      val queueTime = (dequeueTimeMs - startTimeMs).max(0L)
-      val apiLocalTime = (apiLocalCompleteTimeMs - dequeueTimeMs).max(0L)
-      val apiRemoteTime = (responseCompleteTimeMs - apiLocalCompleteTimeMs).max(0L)
-      val responseSendTime = (endTimeMs - responseCompleteTimeMs).max(0L)
-      val totalTime = endTimeMs - startTimeMs
-      var metricsList = List(RequestMetrics.metricsMap(RequestKeys.nameForKey(requestId)))
-      if (requestId == RequestKeys.FetchKey) {
-        val isFromFollower = requestObj.asInstanceOf[FetchRequest].isFromFollower
-        metricsList ::= (if (isFromFollower)
-                           RequestMetrics.metricsMap(RequestMetrics.followFetchMetricName)
-                         else
-                           RequestMetrics.metricsMap(RequestMetrics.consumerFetchMetricName))
-      }
-      metricsList.foreach { m =>
-        m.requestRate.mark()
-        m.queueTimeHist.update(queueTime)
-        m.localTimeHist.update(apiLocalTime)
-        m.remoteTimeHist.update(apiRemoteTime)
-        m.responseSendTimeHist.update(responseSendTime)
-        m.totalTimeHist.update(totalTime)
-      }
-      trace("Completed request : %s, totalTime:%d, queueTime:%d, localTime:%d, remoteTime:%d, sendTime:%d"
-        .format(requestObj, totalTime, queueTime, apiLocalTime, apiRemoteTime, responseSendTime))
-    }
-  }
+  case class Request(processor: Int, requestKey: Any, buffer: ByteBuffer, startTimeMs: Long, remoteAddress: SocketAddress = new InetSocketAddress(0))
 
-  case class Response(processor: Int, request: Request, responseSend: Send) {
-    request.responseCompleteTimeMs = SystemTime.milliseconds
+  case class Response(processor: Int, responseSend: Send, requestStartTime: Long, requestKey: Any) {
+    val responseCompleteTimeMs = SystemTime.milliseconds
 
     def this(request: Request, send: Send) =
-      this(request.processor, request, send)
+      this(request.processor, send, request.startTimeMs, request.requestKey)
   }
 }
@@ -132,24 +85,14 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
   }
 }
 
-object RequestMetrics {
-  val metricsMap = new scala.collection.mutable.HashMap[String, RequestMetrics]
-  val consumerFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "-Consumer"
-  val followFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "-Follower"
-  (RequestKeys.keyToNameAndDeserializerMap.values.map(e => e._1)
-    ++ List(consumerFetchMetricName, followFetchMetricName)).foreach(name => metricsMap.put(name, new RequestMetrics(name)))
-}
-
-class RequestMetrics(name: String) extends KafkaMetricsGroup {
-  val requestRate = newMeter(name + "-RequestsPerSec", "requests", TimeUnit.SECONDS)
+object NetworkMetrics extends KafkaMetricsGroup {
   // time a request spent in a request queue
-  val queueTimeHist = newHistogram(name + "-QueueTimeMs")
-  // time a request takes to be processed at the local broker
-  val localTimeHist = newHistogram(name + "-LocalTimeMs")
-  // time a request takes to wait on remote brokers (only relevant to fetch and produce requests)
-  val remoteTimeHist = newHistogram(name + "-RemoteTimeMs")
+  val queueTimeHist = newHistogram("QueueTimeMs")
   // time to send the response to the requester
-  val responseSendTimeHist = newHistogram(name + "-ResponseSendTimeMs")
-  val totalTimeHist = newHistogram(name + "-TotalTimeMs")
+  val responseSendTimeHist = newHistogram("ResponseSendTimeMs")
+
+  // total time taken to process the request
+  val totalTimeHist = newHistogram("TotalTimeMs")
 }
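
NOTE: The RequestChannel hunks above remove the eager deserialization and per-request
timing from Request and split the old RequestMetrics in two: queue, send, and total
times are now recorded once, globally, by NetworkMetrics in the network layer, while
the per-API rate/local/remote metrics move to KafkaRequestHandler.scala later in this
patch. A minimal sketch of the timing decomposition the deleted updateRequestMetrics
computed, and which the new code preserves across the two layers; the Timings wrapper
is illustrative only, not part of the patch:

    object TimingModel {
      // One request's life, as five timestamps stamped along the way:
      // total = queue + local + remote + send.
      case class Timings(startMs: Long,          // Processor reads the request
                         dequeueMs: Long,        // handler thread picks it up
                         localDoneMs: Long,      // KafkaApis.handle returns
                         responseReadyMs: Long,  // Response is constructed
                         sendDoneMs: Long) {     // Send completes on the socket
        def queueTime  = dequeueMs - startMs            // NetworkMetrics.queueTimeHist
        def localTime  = localDoneMs - dequeueMs        // RequestMetrics.localTimeHist
        def remoteTime = responseReadyMs - localDoneMs  // RequestMetrics.remoteTimeHist
        def sendTime   = sendDoneMs - responseReadyMs   // NetworkMetrics.responseSendTimeHist
        def totalTime  = sendDoneMs - startMs           // NetworkMetrics.totalTimeHist
      }
    }
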
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 8f0053a..9504cf8 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -260,7 +260,7 @@ private[kafka] class Processor(val id: Int,
     var curr = requestChannel.receiveResponse(id)
     while(curr != null) {
       trace("Socket server received response to send, registering for write: " + curr)
-      val key = curr.request.requestKey.asInstanceOf[SelectionKey]
+      val key = curr.requestKey.asInstanceOf[SelectionKey]
       try {
         key.interestOps(SelectionKey.OP_WRITE)
         key.attach(curr)
@@ -342,7 +342,10 @@ private[kafka] class Processor(val id: Int,
     val written = responseSend.writeTo(socketChannel)
     trace(written + " bytes written to " + socketChannel.socket.getRemoteSocketAddress())
     if(responseSend.complete) {
-      response.request.updateRequestMetrics()
+      val endTime = SystemTime.milliseconds
+      NetworkMetrics.responseSendTimeHist.update(endTime - response.responseCompleteTimeMs)
+      NetworkMetrics.totalTimeHist.update(endTime - response.requestStartTime)
+      // TODO log times here
       key.attach(null)
       key.interestOps(SelectionKey.OP_READ)
     } else {
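
NOTE: With updateRequestMetrics gone, the Processor closes out timing on its own:
Response stamps responseCompleteTimeMs at construction (in the API layer) and carries
requestStartTime forward from the original Request, so send completion needs no
reference back to the Request itself. A stand-alone sketch of that accounting, with a
stand-in type for the two timestamps the patch's Response carries:

    object SendAccounting {
      // Stand-in for the timestamps on RequestChannel.Response after this patch.
      case class ResponseTimes(requestStartTime: Long, responseCompleteTimeMs: Long)

      // Invoked once the Send reports complete; mirrors the new Processor code.
      def onSendComplete(t: ResponseTimes, nowMs: Long): (Long, Long) = {
        val sendTime  = nowMs - t.responseCompleteTimeMs // -> responseSendTimeHist
        val totalTime = nowMs - t.requestStartTime       // -> totalTimeHist
        (sendTime, totalTime)
      }
    }
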
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6df077b..6d0baef 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -53,29 +53,35 @@ class KafkaApis(val requestChannel: RequestChannel,
    * Top-level method that handles all requests and multiplexes to the right api
    */
   def handle(request: RequestChannel.Request) {
+    val handleStartTime = SystemTime.milliseconds
+    val requestId = request.buffer.getShort()
+    val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(request.buffer)
     try{
       if(requestLogger.isTraceEnabled)
-        requestLogger.trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress)
-      request.requestId match {
-        case RequestKeys.ProduceKey => handleProducerRequest(request)
-        case RequestKeys.FetchKey => handleFetchRequest(request)
-        case RequestKeys.OffsetsKey => handleOffsetRequest(request)
-        case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
-        case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
-        case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
+        requestLogger.trace("Handling request: " + requestObj + " from client: " + request.remoteAddress)
+      requestObj.requestId.get match {
+        case RequestKeys.ProduceKey => handleProducerRequest(requestObj, request)
+        case RequestKeys.FetchKey => handleFetchRequest(requestObj, request)
+        case RequestKeys.OffsetsKey => handleOffsetRequest(requestObj, request)
+        case RequestKeys.MetadataKey => handleTopicMetadataRequest(requestObj, request)
+        case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(requestObj, request)
+        case RequestKeys.StopReplicaKey => handleStopReplicaRequest(requestObj, request)
         case requestId => throw new KafkaException("No mapping found for handler id " + requestId)
       }
     } catch {
       case e: Throwable =>
-        request.requestObj.handleError(e, requestChannel, request)
-        error("error when handling request %s".format(request.requestObj), e)
-    } finally
-      request.apiLocalCompleteTimeMs = SystemTime.milliseconds
+        requestObj.handleError(e, requestChannel, request)
+        error("error when handling request %s".format(requestObj), e)
+    } finally {
+      val localTime = SystemTime.milliseconds - handleStartTime
+      // TODO log localtime
+      RequestMetrics.getMetricsList(requestObj).foreach(m => m.localTimeHist.update(localTime))
+    }
   }
 
-  def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
-    val leaderAndIsrRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest]
+  def handleLeaderAndIsrRequest(requestObj: RequestOrResponse, request: RequestChannel.Request) {
     try {
+      val leaderAndIsrRequest = requestObj.asInstanceOf[LeaderAndIsrRequest]
       val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest)
       val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, response, error)
       requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndIsrResponse)))
@@ -87,8 +93,8 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
-  def handleStopReplicaRequest(request: RequestChannel.Request) {
-    val stopReplicaRequest = request.requestObj.asInstanceOf[StopReplicaRequest]
+  def handleStopReplicaRequest(requestObj: RequestOrResponse, request: RequestChannel.Request) {
+    val stopReplicaRequest = requestObj.asInstanceOf[StopReplicaRequest]
     val (response, error) = replicaManager.stopReplicas(stopReplicaRequest)
     val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.correlationId, response.toMap, error)
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse)))
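
NOTE: Deserialization moves off the network thread here: Request no longer parses
its buffer in its constructor; instead KafkaApis.handle reads the two-byte API key
and invokes the matching deserializer on the handler thread. A sketch of that
dispatch convention with a stand-in deserializer table (RequestKeys.deserializerForKey
is the real lookup; the string payloads below are placeholders):

    import java.nio.ByteBuffer

    object Dispatch {
      // Stand-ins: the real deserializers return RequestOrResponse subtypes.
      val deserializers: Map[Short, ByteBuffer => String] = Map(
        (0: Short) -> (b => s"ProduceRequest(${b.remaining} body bytes)"),
        (1: Short) -> (b => s"FetchRequest(${b.remaining} body bytes)"))

      def decode(buffer: ByteBuffer): String = {
        val key = buffer.getShort()  // consumes the 2-byte API key first
        deserializers(key)(buffer)   // body parsed here, on the handler thread
      }
    }
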
@@ -99,30 +105,34 @@ class KafkaApis(val requestChannel: RequestChannel,
    * Check if a partitionData from a produce request can unblock any
    * DelayedFetch requests.
    */
-  def maybeUnblockDelayedFetchRequests(topic: String, partition: Int, messages: MessageSet) {
-    val satisfied = fetchRequestPurgatory.update(RequestKey(topic, partition), messages)
+  def maybeUnblockDelayedFetchRequests(topic: String, partition: Int, messageSizeInBytes: Int) {
+    val satisfied = fetchRequestPurgatory.update(RequestKey(topic, partition), messageSizeInBytes)
     trace("Producer request to (%s-%d) unblocked %d fetch requests.".format(topic, partition, satisfied.size))
 
     // send any newly unblocked responses
     for(fetchReq <- satisfied) {
       val topicData = readMessageSets(fetchReq.fetch)
       val response = FetchResponse(fetchReq.fetch.correlationId, topicData)
-      requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response)))
+      val remoteTime = SystemTime.milliseconds - fetchReq.createdMs
+      // TODO log remote time
+      RequestMetrics.getMetricsList(fetchReq.fetch).foreach(m => m.remoteTimeHist.update(remoteTime))
+      val request = fetchReq.item
+      requestChannel.sendResponse(new RequestChannel.Response(request.processor, new FetchResponseSend(response), request.startTime, request.requestKey))
     }
   }
 
   /**
    * Handle a produce request
    */
-  def handleProducerRequest(request: RequestChannel.Request) {
-    val produceRequest = request.requestObj.asInstanceOf[ProducerRequest]
+  def handleProducerRequest(requestObj: RequestOrResponse, request: RequestChannel.Request) {
+    val produceRequest = requestObj.asInstanceOf[ProducerRequest]
     val sTime = SystemTime.milliseconds
     val localProduceResults = appendToLocalLog(produceRequest)
     debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
     val numPartitionsInError = localProduceResults.count(_.error.isDefined)
     produceRequest.data.foreach(partitionAndData =>
-      maybeUnblockDelayedFetchRequests(partitionAndData._1.topic, partitionAndData._1.partition, partitionAndData._2))
+      maybeUnblockDelayedFetchRequests(partitionAndData._1.topic, partitionAndData._1.partition, partitionAndData._2.sizeInBytes))
 
     val allPartitionHaveReplicationFactorOne =
       !produceRequest.data.keySet.exists(
@@ -140,11 +150,14 @@ class KafkaApis(val requestChannel: RequestChannel,
       val producerRequestKeys = produceRequest.data.keys.map(
         topicAndPartition => new RequestKey(topicAndPartition)).toSeq
       val statuses = localProduceResults.map(r => r.key -> ProducerResponseStatus(r.errorCode, r.end + 1)).toMap
-      val delayedProduce = new DelayedProduce(producerRequestKeys,
-                                              request,
+      val delayedProduce = new DelayedProduce(producerRequestKeys,
                                               statuses,
-                                              produceRequest,
-                                              produceRequest.ackTimeoutMs.toLong)
+                                              produceRequest.ackTimeoutMs.toLong,
+                                              produceRequest.data.map(r => r._1 -> r._2.sizeInBytes).toMap,
+                                              new TrimmedDelayRequest(request.startTimeMs, request.processor, request.requestKey),
+                                              produceRequest.correlationId,
+                                              produceRequest.requiredAcks
+      )
       producerRequestPurgatory.watch(delayedProduce)
 
       /*
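
NOTE: The produce/fetch handoff above now passes message sizes instead of the message
payloads, so the fetch purgatory can evaluate min_bytes without holding MessageSet
references alive. A toy model of that satisfaction logic (it mirrors checkSatisfied
further down; the names here are stand-ins, not patch code):

    import java.util.concurrent.atomic.AtomicLong

    object MinBytes {
      class PendingFetch(minBytes: Int, initialBytes: Long) {
        val bytesAccumulated = new AtomicLong(initialBytes)
        // true once enough bytes have arrived to satisfy the fetch
        def onAppend(messageSizeInBytes: Int): Boolean =
          bytesAccumulated.addAndGet(messageSizeInBytes) >= minBytes
      }

      def main(args: Array[String]): Unit = {
        // e.g. a fetch waiting on 100 bytes is satisfied by two 60-byte appends
        val f = new PendingFetch(100, 0)
        assert(!f.onAppend(60))
        assert(f.onAppend(60))
      }
    }
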
@@ -209,8 +222,8 @@ class KafkaApis(val requestChannel: RequestChannel,
   /**
    * Handle a fetch request
    */
-  def handleFetchRequest(request: RequestChannel.Request) {
-    val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
+  def handleFetchRequest(requestObj: RequestOrResponse, request: RequestChannel.Request) {
+    val fetchRequest = requestObj.asInstanceOf[FetchRequest]
     if(fetchRequest.isFromFollower) {
       maybeUpdatePartitionHw(fetchRequest)
       // after updating HW, some delayed produce requests may be unblocked
@@ -238,7 +251,11 @@ class KafkaApis(val requestChannel: RequestChannel,
       debug("Putting fetch request into purgatory")
       // create a list of (topic, partition) pairs to use as keys for this delayed request
       val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq.map(new RequestKey(_))
-      val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest, fetchRequest.maxWait, bytesReadable)
+      val delayedFetch = new DelayedFetch(delayedFetchKeys,
+                                          new TrimmedDelayRequest(request.startTimeMs, request.processor, request.requestKey),
+                                          fetchRequest,
+                                          fetchRequest.maxWait,
+                                          bytesReadable)
       fetchRequestPurgatory.watch(delayedFetch)
     }
   }
@@ -315,8 +332,8 @@ class KafkaApis(val requestChannel: RequestChannel,
   /**
    * Service the offset request API
    */
-  def handleOffsetRequest(request: RequestChannel.Request) {
-    val offsetRequest = request.requestObj.asInstanceOf[OffsetRequest]
+  def handleOffsetRequest(requestObj: RequestOrResponse, request: RequestChannel.Request) {
+    val offsetRequest = requestObj.asInstanceOf[OffsetRequest]
     val responseMap = offsetRequest.requestInfo.map(elem => {
       val (topicAndPartition, partitionOffsetRequestInfo) = elem
       try {
@@ -351,8 +368,8 @@ class KafkaApis(val requestChannel: RequestChannel,
   /**
    * Service the topic metadata request API
    */
-  def handleTopicMetadataRequest(request: RequestChannel.Request) {
-    val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest]
+  def handleTopicMetadataRequest(requestObj: RequestOrResponse, request: RequestChannel.Request) {
+    val metadataRequest = requestObj.asInstanceOf[TopicMetadataRequest]
     val topicsMetadata = new mutable.ArrayBuffer[TopicMetadata]()
     val config = replicaManager.config
     val uniqueTopics = {
@@ -426,7 +443,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   /**
    * A delayed fetch request
    */
-  class DelayedFetch(keys: Seq[RequestKey], request: RequestChannel.Request, val fetch: FetchRequest, delayMs: Long, initialSize: Long)
+  class DelayedFetch(keys: Seq[RequestKey], request: TrimmedDelayRequest, val fetch: FetchRequest, delayMs: Long, initialSize: Long)
     extends DelayedRequest(keys, request, delayMs) {
     val bytesAccumulated = new AtomicLong(initialSize)
   }
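
NOTE: Handlers now receive the deserialized requestObj alongside the raw Request, and
anything parked in purgatory keeps only a TrimmedDelayRequest (defined in
RequestPurgatory.scala below). The point is memory: a full Request pins its entire
request ByteBuffer for as long as a delayed operation waits. Stand-in shapes of the
patch's types, reduced to the relevant fields, to make that concrete:

    import java.nio.ByteBuffer

    object Trimming {
      case class Request(processor: Int, requestKey: Any, buffer: ByteBuffer, startTimeMs: Long)
      case class TrimmedDelayRequest(startTime: Long, processor: Int, requestKey: Any)

      def trim(r: Request): TrimmedDelayRequest =
        TrimmedDelayRequest(r.startTimeMs, r.processor, r.requestKey) // buffer dropped here
    }
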
@@ -435,14 +452,14 @@ class KafkaApis(val requestChannel: RequestChannel,
    * A holding pen for fetch requests waiting to be satisfied
    */
   class FetchRequestPurgatory(requestChannel: RequestChannel, purgeInterval: Int)
-    extends RequestPurgatory[DelayedFetch, MessageSet](brokerId, purgeInterval) {
+    extends RequestPurgatory[DelayedFetch, Int](brokerId, purgeInterval) {
     this.logIdent = "[FetchRequestPurgatory-%d] ".format(brokerId)
 
     /**
      * A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field
      */
-    def checkSatisfied(messages: MessageSet, delayedFetch: DelayedFetch): Boolean = {
-      val accumulatedSize = delayedFetch.bytesAccumulated.addAndGet(messages.sizeInBytes)
+    def checkSatisfied(messageSizeInBytes: Int, delayedFetch: DelayedFetch): Boolean = {
+      val accumulatedSize = delayedFetch.bytesAccumulated.addAndGet(messageSizeInBytes)
       accumulatedSize >= delayedFetch.fetch.minBytes
     }
 
@@ -456,7 +473,11 @@ class KafkaApis(val requestChannel: RequestChannel,
         val response = FetchResponse(delayed.fetch.correlationId, topicData)
         val fromFollower = delayed.fetch.isFromFollower
         delayedRequestMetrics.recordDelayedFetchExpired(fromFollower)
-        requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response)))
+        val remoteTime = SystemTime.milliseconds - delayed.createdMs
+        RequestMetrics.getMetricsList(delayed.fetch).foreach(m => m.remoteTimeHist.update(remoteTime))
+        // TODO log remote time
+        val request = delayed.item
+        requestChannel.sendResponse(new RequestChannel.Response(request.processor, new FetchResponseSend(response), request.startTime, request.requestKey))
       }
       catch {
         case e1: LeaderNotAvailableException =>
@@ -468,10 +489,13 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   class DelayedProduce(keys: Seq[RequestKey],
-                       request: RequestChannel.Request,
                        initialErrorsAndOffsets: Map[TopicAndPartition, ProducerResponseStatus],
-                       val produce: ProducerRequest,
-                       delayMs: Long)
+                       delayMs: Long,
+                       topicPartitionMessageSizeMap: Map[TopicAndPartition, Int],
+                       request: TrimmedDelayRequest,
+                       correlationId: Int,
+                       requiredAcks: Short
+                      )
     extends DelayedRequest(keys, request, delayMs) with Logging {
 
     /**
@@ -503,10 +527,12 @@ class KafkaApis(val requestChannel: RequestChannel,
         (status._1, ProducerResponseStatus(pstat.error, pstat.requiredOffset))
       })
 
-      val response = ProducerResponse(produce.correlationId, finalErrorsAndOffsets)
-
+      val response = ProducerResponse(correlationId, finalErrorsAndOffsets)
+      val remoteTime = SystemTime.milliseconds - createdMs
+      RequestMetrics.metricsMap(RequestKeys.nameForKey(RequestKeys.ProduceKey)).remoteTimeHist.update(remoteTime)
+      // TODO log remote time
      requestChannel.sendResponse(new RequestChannel.Response(
-        request, new BoundedByteBufferSend(response)))
+        request.processor, new BoundedByteBufferSend(response), request.startTime, request.requestKey))
     }
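
NOTE: Remote time is now derived from the purgatory itself: every delayed request
records createdMs when it is parked, so when a delayed produce or fetch is satisfied
or expires, remote time is simply "now minus creation". A minimal version of that
arithmetic, with a stand-in for the delayed wrapper:

    object RemoteTime {
      class DelayedOp(val createdMs: Long)

      def remoteTimeMs(op: DelayedOp, nowMs: Long): Long = nowMs - op.createdMs

      // e.g. a produce parked at t=100 whose follower acks arrive at t=350
      // records 250 ms in remoteTimeHist.
    }
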
@@ -531,7 +557,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         val partitionOpt = replicaManager.getPartition(topic, partitionId)
         val (hasEnough, errorCode) = partitionOpt match {
           case Some(partition) =>
-            partition.checkEnoughReplicasReachOffset(fetchPartitionStatus.requiredOffset, produce.requiredAcks)
+            partition.checkEnoughReplicasReachOffset(fetchPartitionStatus.requiredOffset, requiredAcks)
           case None =>
             (false, ErrorMapping.UnknownTopicOrPartitionCode)
         }
@@ -543,8 +569,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         fetchPartitionStatus.error = ErrorMapping.NoError
       }
       if (!fetchPartitionStatus.acksPending) {
-        val messages = produce.data(followerFetchRequestKey.topicAndPartition)
-        maybeUnblockDelayedFetchRequests(topic, partitionId, messages)
+        val messageSizeInBytes = topicPartitionMessageSizeMap(followerFetchRequestKey.topicAndPartition)
+        maybeUnblockDelayedFetchRequests(topic, partitionId, messageSizeInBytes)
       }
     }
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index 69ca058..9f9856e 100644
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -21,6 +21,8 @@ import kafka.network._
 import kafka.utils._
 import kafka.metrics.KafkaMetricsGroup
 import java.util.concurrent.TimeUnit
+import kafka.api.{FetchRequest, RequestOrResponse, RequestKeys}
+import scala.collection._
 
 /**
  * A thread that answers kafka requests.
@@ -36,7 +38,8 @@ class KafkaRequestHandler(id: Int, brokerId: Int, val requestChannel: RequestCha
           trace("receives shut down command, shut down".format(brokerId, id))
           return
         }
-        req.dequeueTimeMs = SystemTime.milliseconds
+
+        NetworkMetrics.queueTimeHist.update(SystemTime.milliseconds - req.startTimeMs)
         debug("handles request " + req)
         apis.handle(req)
       } catch {
@@ -90,3 +93,32 @@ object BrokerTopicStats extends Logging {
     stats.getAndMaybePut(topic + "-")
   }
 }
+
+object RequestMetrics {
+  val metricsMap = new mutable.HashMap[String, RequestMetrics]
+  val consumerFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "-Consumer"
+  val followFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "-Follower"
+  (RequestKeys.keyToNameAndDeserializerMap.values.map(e => e._1)
+    ++ List(consumerFetchMetricName, followFetchMetricName)).foreach(name => metricsMap.put(name, new RequestMetrics(name)))
+
+  def getMetricsList(request: RequestOrResponse): List[RequestMetrics] = {
+    var metricsList = List(RequestMetrics.metricsMap(RequestKeys.nameForKey(request.requestId.get)))
+    if (request.requestId.get == RequestKeys.FetchKey) {
+      val isFromFollower = request.asInstanceOf[FetchRequest].isFromFollower
+      metricsList ::= (if (isFromFollower)
+                         RequestMetrics.metricsMap(RequestMetrics.followFetchMetricName)
+                       else
+                         RequestMetrics.metricsMap(RequestMetrics.consumerFetchMetricName))
+    }
+    metricsList
+  }
+}
+
+class RequestMetrics(name: String) extends KafkaMetricsGroup {
+  val requestRate = newMeter(name + "-RequestsPerSec", "requests", TimeUnit.SECONDS)
+  // time a request takes to be processed at the local broker
+  val localTimeHist = newHistogram(name + "-LocalTimeMs")
+  // time a request takes to wait on remote brokers (only relevant to fetch and produce requests)
+  val remoteTimeHist = newHistogram(name + "-RemoteTimeMs")
+}
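
NOTE: getMetricsList reproduces the old double-tagging of fetches: every fetch is
recorded under the generic Fetch metric and additionally under either the Follower
or the Consumer variant. A self-contained sketch of that selection with a stand-in
metric type (the real map keys come from RequestKeys.nameForKey):

    import scala.collection.mutable

    object FetchTagging {
      case class Metric(name: String)
      val metricsMap = mutable.HashMap(
        "Fetch"          -> Metric("Fetch"),
        "Fetch-Follower" -> Metric("Fetch-Follower"),
        "Fetch-Consumer" -> Metric("Fetch-Consumer"))

      def metricsFor(apiName: String, isFromFollower: Boolean): List[Metric] = {
        var list = List(metricsMap(apiName))
        if (apiName == "Fetch")
          list ::= metricsMap(if (isFromFollower) "Fetch-Follower" else "Fetch-Consumer")
        list
      }
      // metricsFor("Fetch", isFromFollower = true)
      //   => List(Metric(Fetch-Follower), Metric(Fetch))
    }
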
diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala
index afe9e22..843e1f4 100644
--- a/core/src/main/scala/kafka/server/RequestPurgatory.scala
+++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala
@@ -20,7 +20,6 @@ package kafka.server
 import scala.collection._
 import java.util.concurrent._
 import java.util.concurrent.atomic._
-import kafka.network._
 import kafka.utils._
 import kafka.metrics.KafkaMetricsGroup
 import java.util
@@ -32,7 +31,7 @@ import com.yammer.metrics.core.Gauge
  * The associated keys are used for bookkeeping, and represent the "trigger" that causes this request to check if it is satisfied,
  * for example a key could be a (topic, partition) pair.
  */
-class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, delayMs: Long) extends DelayedItem[RequestChannel.Request](request, delayMs) {
+class DelayedRequest(val keys: Seq[Any], request: TrimmedDelayRequest, delayMs: Long) extends DelayedItem[TrimmedDelayRequest](request, delayMs) {
   val satisfied = new AtomicBoolean(false)
 }
 
@@ -283,3 +282,5 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
   }
 }
+
+case class TrimmedDelayRequest(startTime: Long, processor: Int, requestKey: Any)
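
NOTE: DelayedRequest now wraps a TrimmedDelayRequest in DelayedItem, and the call
sites above rely on two members of that wrapper: item (the wrapped value) and
createdMs. A sketch of that shape; the real kafka.utils.DelayedItem also implements
java.util.concurrent.Delayed, and isExpired below is an assumed convenience for
illustration, not its actual API:

    object Wrapper {
      class DelayedItem[T](val item: T, delayMs: Long) {
        val createdMs = System.currentTimeMillis
        // assumed helper; shows how delayMs and createdMs interact
        def isExpired(nowMs: Long): Boolean = nowMs >= createdMs + delayMs
      }
    }
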
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index 6db63ba..16181b1 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -129,7 +129,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
     val apis = new KafkaApis(requestChannel, replicaManager, zkClient, 1)
 
     // call the API (to be tested) to get metadata
-    apis.handleTopicMetadataRequest(new RequestChannel.Request
+    apis.handleTopicMetadataRequest(request, new RequestChannel.Request
       (processor=0, requestKey=RequestKeys.MetadataKey, buffer=serializedMetadataRequest, startTimeMs=1))
     val metadataResponse = requestChannel.receiveResponse(0).responseSend.asInstanceOf[BoundedByteBufferSend].buffer
 
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 7395cbc..3a69b0c 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -62,7 +62,7 @@ class SocketServerTest extends JUnitSuite {
     val request = channel.receiveRequest
     val id = request.buffer.getShort
     val send = new BoundedByteBufferSend(request.buffer.slice)
-    channel.sendResponse(new RequestChannel.Response(request.processor, request, send))
+    channel.sendResponse(new RequestChannel.Response(request, send))
   }
 
   def connect() = new Socket("localhost", server.port)
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 1557047..3b214fa 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -98,7 +98,7 @@ class SimpleFetchTest extends JUnit3Suite {
     val goodFetchBB = TestUtils.createRequestByteBuffer(goodFetch)
 
     // send the request
-    apis.handleFetchRequest(new RequestChannel.Request(processor=1, requestKey=5, buffer=goodFetchBB, startTimeMs=1))
+    apis.handleFetchRequest(goodFetch, new RequestChannel.Request(processor=1, requestKey=5, buffer=goodFetchBB, startTimeMs=1))
 
     // make sure the log only reads bytes between 0->HW (5)
     EasyMock.verify(log)
@@ -119,10 +119,10 @@ class SimpleFetchTest extends JUnit3Suite {
     EasyMock.replay(replicaManager)
     EasyMock.replay(logManager)
 
-    apis.handleOffsetRequest(new RequestChannel.Request(processor = 0,
-                                                        requestKey = 5,
-                                                        buffer = offsetRequestBB,
-                                                        startTimeMs = 1))
+    apis.handleOffsetRequest(offsetRequest, new RequestChannel.Request(processor = 0,
+                                                                       requestKey = 5,
+                                                                       buffer = offsetRequestBB,
+                                                                       startTimeMs = 1))
     val offsetResponseBuffer = requestChannel.receiveResponse(0).responseSend.asInstanceOf[BoundedByteBufferSend].buffer
     val offsetResponse = OffsetResponse.readFrom(offsetResponseBuffer)
     EasyMock.verify(replicaManager)
@@ -196,7 +196,7 @@ class SimpleFetchTest extends JUnit3Suite {
     val fetchRequestBB = TestUtils.createRequestByteBuffer(bigFetch)
 
     // send the request
-    apis.handleFetchRequest(new RequestChannel.Request(processor=0, requestKey=5, buffer=fetchRequestBB, startTimeMs=1))
+    apis.handleFetchRequest(bigFetch, new RequestChannel.Request(processor=0, requestKey=5, buffer=fetchRequestBB, startTimeMs=1))
 
     /**
      * Make sure the log satisfies the fetch from a follower by reading data beyond the HW, mainly all bytes after
@@ -221,10 +221,10 @@ class SimpleFetchTest extends JUnit3Suite {
     EasyMock.replay(replicaManager)
     EasyMock.replay(logManager)
 
-    apis.handleOffsetRequest(new RequestChannel.Request(processor = 1,
-                                                        requestKey = 5,
-                                                        buffer = offsetRequestBB,
-                                                        startTimeMs = 1))
+    apis.handleOffsetRequest(offsetRequest, new RequestChannel.Request(processor = 1,
+                                                                       requestKey = 5,
+                                                                       buffer = offsetRequestBB,
+                                                                       startTimeMs = 1))
     val offsetResponseBuffer = requestChannel.receiveResponse(1).responseSend.asInstanceOf[BoundedByteBufferSend].buffer
     val offsetResponse = OffsetResponse.readFrom(offsetResponseBuffer)
     EasyMock.verify(replicaManager)
-- 
1.7.10.2 (Apple Git-33)