Index: system_test/metrics.json =================================================================== --- system_test/metrics.json (revision 1383739) +++ system_test/metrics.json (working copy) @@ -4,136 +4,118 @@ "role": "broker", "graphs": [ { - "graph_name": "SocketServerThroughput", - "y_label": "bytes-read-per-second,bytes-written-per-second", - "bean_name": "kafka:type=kafka.SocketServerStats", - "attributes": "BytesReadPerSecond,BytesWrittenPerSecond" + "graph_name": "Produce-Request-Rate", + "y_label": "requests-per-sec", + "bean_name": "kafka.network:type=RequestMetrics,name=Produce-RequestsPerSec", + "attributes": "OneMinuteRate" }, { - "graph_name": "FetchRequestPurgatoryNumDelayedRequests", - "y_label": "num-delayed-requests", - "bean_name": "kafka.server:type=FetchRequestPurgatory,name=NumDelayedRequests", - "attributes": "Value" + "graph_name": "Produce-Request-Time", + "y_label": "ns,ns", + "bean_name": "kafka.network:type=RequestMetrics,name=Produce-TotalTimeNs", + "attributes": "Mean,99thPercentile" }, { - "graph_name": "MeanFetchRequestPurgatorySatisfactionRate", - "y_label": "mean-request-satisfaction-rate", - "bean_name": "kafka.server:type=FetchRequestPurgatory,name=SatisfactionRate", - "attributes": "MeanRate" + "graph_name": "Produce-Request-Remote-Time", + "y_label": "ns,ns", + "bean_name": "kafka.network:type=RequestMetrics,name=Produce-RemoteTimeNs", + "attributes": "Mean,99thPercentile" }, { - "graph_name": "FetchRequestPurgatoryTimeToSatisfy", - "y_label": "mean-time-to-satisfy-ns,95th-percentile-time-to-satisfy-ns,99th-percentile-time-to-satisfy-ns,999th-percentile-time-to-satisfy-ns", - "bean_name": "kafka.server:type=FetchRequestPurgatory,name=TimeToSatisfyInNs", - "attributes": "Mean,95thPercentile,99thPercentile,999thPercentile" + "graph_name": "Fetch-Consumer-Request-Rate", + "y_label": "requests-per-sec", + "bean_name": "kafka.network:type=RequestMetrics,name=Fetch-Consumer-RequestsPerSec", + "attributes": "OneMinuteRate" }, { - "graph_name": "FetchRequestPurgatoryExpirationRate", - "y_label": "expiration-rate", - "bean_name": "kafka.server:type=FetchRequestPurgatory,name=ExpirationRate", - "attributes": "MeanRate" + "graph_name": "Fetch-Consumer-Request-Time", + "y_label": "ns,ns", + "bean_name": "kafka.network:type=RequestMetrics,name=Fetch-Consumer-TotalTimeNs", + "attributes": "Mean,99thPercentile" }, - { - "graph_name": "ProducerRequestPurgatoryNumDelayedRequests", - "y_label": "num-delayed-requests", - "bean_name": "kafka.server:type=ProducerRequestPurgatory,name=NumDelayedRequests", - "attributes": "Value" + { + "graph_name": "Fetch-Consumer-Request-Remote-Time", + "y_label": "ns,ns", + "bean_name": "kafka.network:type=RequestMetrics,name=Fetch-Consumer-RemoteTimeNs", + "attributes": "Mean,99thPercentile" }, - { - "graph_name": "MeanProducerRequestPurgatorySatisfactionRate", - "y_label": "mean-request-satisfaction-rate", - "bean_name": "kafka.server:type=ProducerRequestPurgatory,name=SatisfactionRate", - "attributes": "MeanRate" + { + "graph_name": "Fetch-Follower-Request-Rate", + "y_label": "requests-per-sec", + "bean_name": "kafka.network:type=RequestMetrics,name=Fetch-Follower-RequestsPerSec", + "attributes": "OneMinuteRate" }, - { - "graph_name": "ProducerRequestPurgatoryExpirationRate", - "y_label": "expiration-rate", - "bean_name": "kafka.server:type=ProducerRequestPurgatory,name=ExpirationRate", - "attributes": "MeanRate" + { + "graph_name": "Fetch-Follower-Request-Time", + "y_label": "ns,ns", + "bean_name": 
"kafka.network:type=RequestMetrics,name=Fetch-Follower-TotalTimeNs", + "attributes": "Mean,99thPercentile" }, - { - "graph_name": "DelayedProducerRequests-CaughtUpFollowerFetchRequestsPerSecond", - "y_label": "mean-caught-up-follower-fetch-requests-per-second", - "bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=CaughtUpFollowerRequestsPerSecond-all", - "attributes": "MeanRate" + { + "graph_name": "Fetch-Follower-Request-Remote-Time", + "y_label": "ns,ns", + "bean_name": "kafka.network:type=RequestMetrics,name=Fetch-Follower-RemoteTimeNs", + "attributes": "Mean,99thPercentile" }, - { - "graph_name": "DelayedProducerRequests-ExpiredRequestRate", - "y_label": "mean-expired-request-rate", - "bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=ExpiredRequestsPerSecond-all", - "attributes": "MeanRate" + { + "graph_name": "ProducePurgatoryExpirationRate", + "y_label": "expirations-per-sec", + "bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=AllExpiresPerSecond", + "attributes": "OneMinuteRate" }, - { - "graph_name": "DelayedProducerRequests-FollowerCatchUpLatency", - "y_label": "mean-follower-catchup-time-ns,95th-percentile-follower-catchup-time-ns,99th-percentile-follower-catchup-time-ns,999th-percentile-follower-catchup-time-ns", - "bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=FollowerCatchUpTimeInNs", - "attributes": "Mean,95thPercentile,99thPercentile,999thPercentile" + { + "graph_name": "FetchConsumerPurgatoryExpirationRate", + "y_label": "expirations-per-sec", + "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=ConsumerExpiresPerSecond", + "attributes": "OneMinuteRate" }, { - "graph_name": "DelayedProducerRequests-SatisfactionTimeInNs", - "y_label": "mean-time-to-satisfy-ns,95th-percentile-time-to-satisfy-ns,99th-percentile-time-to-satisfy-ns,999th-percentile-time-to-satisfy-ns", - "bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=SatisfactionTimeInNs", - "attributes": "Mean,95thPercentile,99thPercentile,999thPercentile" + "graph_name": "FetchFollowerPurgatoryExpirationRate", + "y_label": "expirations-per-sec", + "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=FollowerExpiresPerSecond", + "attributes": "OneMinuteRate" }, { - "graph_name": "DelayedProducerRequests-SatisfiedRequestsPerSecond", - "y_label": "mean-satisfaction-requests-per-second", - "bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=SatisfiedRequestsPerSecond", - "attributes": "MeanRate" + "graph_name": "ProducePurgatoryQueueSize", + "y_label": "size", + "bean_name": "kafka.server:type=ProducerRequestPurgatory,name=NumDelayedRequests", + "attributes": "Value" }, { - "graph_name": "DelayedProducerRequests-Throughput-all", - "y_label": "mean-purgatory-throughput-all", - "bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=Throughput-all", - "attributes": "MeanRate" + "graph_name": "FetchPurgatoryQueueSize", + "y_label": "size", + "bean_name": "kafka.server:type=FetchRequestPurgatory,name=NumDelayedRequests", + "attributes": "Value" }, { - "graph_name": "DelayedFetchRequests-Follower-ExpiredRequestRate", - "y_label": "mean-expired-request-rate", - "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=Follower-ExpiredRequestsPerSecond", - "attributes": "MeanRate" + "graph_name": "ControllerLeaderElectionRateAndTime", + "y_label": "elections-per-sec,ms,ms", + "bean_name": "kafka.server:type=ControllerStat,name=LeaderElectionRateAndTimeMs", + "attributes": 
"OneMinuteRate,Mean,99thPercentile" }, { - "graph_name": "DelayedFetchRequests-Follower-SatisfactionTimeInNs", - "y_label": "mean-time-to-satisfy-ns,95th-percentile-time-to-satisfy-ns,99th-percentile-time-to-satisfy-ns,999th-percentile-time-to-satisfy-ns", - "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=Follower-SatisfactionTimeInNs", - "attributes": "Mean,95thPercentile,99thPercentile,999thPercentile" + "graph_name": "LogFlushRateAndTime", + "y_label": "flushes-per-sec,ms,ms", + "bean_name": "kafka.message:type=LogFlushStats,name=LogFlushRateAndTimeMs", + "attributes": "OneMinuteRate,Mean,99thPercentile" }, { - "graph_name": "DelayedProducerRequests-Follower-SatisfiedRequestsPerSecond", - "y_label": "mean-satisfaction-requests-per-second", - "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=Follower-SatisfiedRequestsPerSecond", - "attributes": "MeanRate" + "graph_name": "AllBytesOutRate", + "y_label": "bytes-per-sec", + "bean_name": "kafka.server:type=BrokerTopicMetrics,name=AllTopicsBytesOutPerSec", + "attributes": "OneMinuteRate" }, { - "graph_name": "DelayedFetchRequests-Follower-Throughput-all", - "y_label": "mean-purgatory-throughput-all", - "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=Follower-Throughput-all", - "attributes": "MeanRate" + "graph_name": "AllBytesInRate", + "y_label": "bytes-per-sec", + "bean_name": "kafka.server:type=BrokerTopicMetrics,name=AllTopicsBytesInPerSec", + "attributes": "OneMinuteRate" }, { - "graph_name": "DelayedFetchRequests-NonFollower-ExpiredRequestRate", - "y_label": "mean-expired-request-rate", - "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=NonFollower-ExpiredRequestsPerSecond", - "attributes": "MeanRate" - }, - { - "graph_name": "DelayedFetchRequests-NonFollower-SatisfactionTimeInNs", - "y_label": "mean-time-to-satisfy-ns,95th-percentile-time-to-satisfy-ns,99th-percentile-time-to-satisfy-ns,999th-percentile-time-to-satisfy-ns", - "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=NonFollower-SatisfactionTimeInNs", - "attributes": "Mean,95thPercentile,99thPercentile,999thPercentile" - }, - { - "graph_name": "DelayedFetchRequests-NonFollower-SatisfiedRequestsPerSecond", - "y_label": "mean-satisfaction-requests-per-second", - "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=NonFollower-SatisfiedRequestsPerSecond", - "attributes": "MeanRate" - }, - { - "graph_name": "DelayedFetchRequests-NonFollower-Throughput-all", - "y_label": "mean-purgatory-throughput-all", - "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=NonFollower-Throughput-all", - "attributes": "MeanRate" + "graph_name": "AllMessagesInRate", + "y_label": "messages-per-sec", + "bean_name": "kafka.server:type=BrokerTopicMetrics,name=AllTopicsMessagesInPerSec", + "attributes": "OneMinuteRate" } ] }, @@ -141,10 +123,16 @@ "role": "producer_performance", "graphs": [ { - "graph_name": "ProducerStats", - "y_label": "avg-producer-latency-ms,max-producer-latency-ms,produce-request-throughput", - "bean_name": "kafka:type=kafka.KafkaProducerStats", - "attributes": "AvgProduceRequestMs,MaxProduceRequestMs,ProduceRequestsPerSecond" + "graph_name": "ProduceRequestRateAndTime", + "y_label": "requests-per-sec,ms,ms", + "bean_name": "kafka.producer:type=ProducerRequestStat,name=ProduceRequestRateAndTimeMs", + "attributes": "OneMinuteRate,Mean,99thPercentile" + }, + { + "graph_name": "ProduceRequestSize", + "y_label": "bytes,bytes", + "bean_name": 
"kafka.producer:type=ProducerRequestStat,name=ProducerRequestSize", + "attributes": "Mean,99thPercentile" } ] }, @@ -152,10 +140,22 @@ "role": "console_consumer", "graphs": [ { - "graph_name": "SimpleConsumerRequestStats", - "y_label": "simple-consumer-throughput,simple-consumer-throughput-bytes,simple-consumer-latency-ms", - "bean_name": "kafka:type=kafka.SimpleConsumerStats", - "attributes": "FetchRequestsPerSecond,ConsumerThroughput,AvgFetchRequestMs" + "graph_name": "FetchRequestRateAndTime", + "y_label": "requests-per-sec,ms,ms", + "bean_name": "kafka.consumer:type=FetchRequestAndResponseStat,name=FetchRequestRateAndTimeMs", + "attributes": "OneMinuteRate,Mean,99thPercentile" + }, + { + "graph_name": "FetchResponseSize", + "y_label": "bytes,bytes", + "bean_name": "kafka.consumer:type=FetchRequestAndResponseStat,name=FetchResponseSize", + "attributes": "Mean,99thPercentile" + }, + { + "graph_name": "ConsumedMessageRate", + "y_label": "messages-per-sec", + "bean_name": "kafka.consumer:type=ConsumerTopicStat,name=AllTopicsMessagesPerSec", + "attributes": "OneMinuteRate" } ] }, Index: core/src/test/scala/unit/kafka/utils/TestUtils.scala =================================================================== --- core/src/test/scala/unit/kafka/utils/TestUtils.scala (revision 1383739) +++ core/src/test/scala/unit/kafka/utils/TestUtils.scala (working copy) @@ -468,6 +468,14 @@ } } } + + def createRequestByteBuffer(request: RequestOrResponse): ByteBuffer = { + val byteBuffer = ByteBuffer.allocate(request.sizeInBytes + 2) + byteBuffer.putShort(request.requestId.get) + request.writeTo(byteBuffer) + byteBuffer.rewind() + byteBuffer + } } object TestZKUtils { Index: core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala =================================================================== --- core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala (revision 1383739) +++ core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala (working copy) @@ -29,9 +29,8 @@ import kafka.utils.TestUtils._ import kafka.server.{ReplicaManager, KafkaApis, KafkaConfig} import kafka.common.ErrorMapping -import kafka.api.{TopicMetadata, TopicMetaDataResponse, TopicMetadataRequest} +import kafka.api.{RequestKeys, TopicMetadata, TopicMetaDataResponse, TopicMetadataRequest} - class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { val props = createBrokerConfigs(1) val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1}) @@ -106,29 +105,20 @@ // create a topic metadata request val topicMetadataRequest = new TopicMetadataRequest(List(topic)) - val serializedMetadataRequest = ByteBuffer.allocate(topicMetadataRequest.sizeInBytes + 2) - topicMetadataRequest.writeTo(serializedMetadataRequest) - serializedMetadataRequest.rewind() + val serializedMetadataRequest = TestUtils.createRequestByteBuffer(topicMetadataRequest) // create the kafka request handler val requestChannel = new RequestChannel(2, 5) val apis = new KafkaApis(requestChannel, replicaManager, zkClient, 1) - // mock the receive API to return the request buffer as created above - val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive]) - EasyMock.expect(receivedRequest.buffer).andReturn(serializedMetadataRequest) - EasyMock.replay(receivedRequest) - // call the API (to be tested) to get metadata - apis.handleTopicMetadataRequest(new RequestChannel.Request(processor=0, requestKey=5, request=receivedRequest, start=1)) - val metadataResponse = 
requestChannel.receiveResponse(0).response.asInstanceOf[BoundedByteBufferSend].buffer + apis.handleTopicMetadataRequest(new RequestChannel.Request + (processor=0, requestKey=RequestKeys.MetadataKey, buffer=serializedMetadataRequest, startTimeNs=1)) + val metadataResponse = requestChannel.receiveResponse(0).responseSend.asInstanceOf[BoundedByteBufferSend].buffer // check assertions val topicMetadata = TopicMetaDataResponse.readFrom(metadataResponse).topicsMetadata - // verify the expected calls to log manager occurred in the right order - EasyMock.verify(receivedRequest) - topicMetadata } } \ No newline at end of file Index: core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala =================================================================== --- core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala (revision 1383739) +++ core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala (working copy) @@ -16,16 +16,15 @@ */ package kafka.server -import java.nio.ByteBuffer -import kafka.api.{FetchRequest, FetchRequestBuilder} import kafka.cluster.{Partition, Replica} import kafka.log.Log import kafka.message.{ByteBufferMessageSet, Message} -import kafka.network.{BoundedByteBufferReceive, RequestChannel} +import kafka.network.RequestChannel import kafka.utils.{Time, TestUtils, MockTime} import org.easymock.EasyMock import org.I0Itec.zkclient.ZkClient import org.scalatest.junit.JUnit3Suite +import kafka.api.{FetchRequest, FetchRequestBuilder} class SimpleFetchTest extends JUnit3Suite { @@ -92,16 +91,10 @@ .replicaId(FetchRequest.NonFollowerId) .addFetch(topic, partitionId, 0, hw*2) .build() - val goodFetchBB = ByteBuffer.allocate(goodFetch.sizeInBytes) - goodFetch.writeTo(goodFetchBB) - goodFetchBB.rewind() + val goodFetchBB = TestUtils.createRequestByteBuffer(goodFetch) - val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive]) - EasyMock.expect(receivedRequest.buffer).andReturn(goodFetchBB) - EasyMock.replay(receivedRequest) - // send the request - apis.handleFetchRequest(new RequestChannel.Request(processor=1, requestKey=5, request=receivedRequest, start=1)) + apis.handleFetchRequest(new RequestChannel.Request(processor=1, requestKey=5, buffer=goodFetchBB, startTimeNs=1)) // make sure the log only reads bytes between 0->HW (5) EasyMock.verify(log) @@ -170,16 +163,10 @@ .addFetch(topic, partitionId, followerLEO, Integer.MAX_VALUE) .build() - val fetchRequest = ByteBuffer.allocate(bigFetch.sizeInBytes) - bigFetch.writeTo(fetchRequest) - fetchRequest.rewind() + val fetchRequestBB = TestUtils.createRequestByteBuffer(bigFetch) - val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive]) - EasyMock.expect(receivedRequest.buffer).andReturn(fetchRequest) - EasyMock.replay(receivedRequest) - // send the request - apis.handleFetchRequest(new RequestChannel.Request(processor=0, requestKey=5, request=receivedRequest, start=1)) + apis.handleFetchRequest(new RequestChannel.Request(processor=0, requestKey=5, buffer=fetchRequestBB, startTimeNs=1)) /** * Make sure the log satisfies the fetch from a follower by reading data beyond the HW, mainly all bytes after Index: core/src/test/scala/unit/kafka/network/SocketServerTest.scala =================================================================== --- core/src/test/scala/unit/kafka/network/SocketServerTest.scala (revision 1383739) +++ core/src/test/scala/unit/kafka/network/SocketServerTest.scala (working copy) @@ -24,6 +24,9 @@ import kafka.utils.TestUtils import java.util.Random import junit.framework.Assert._ 
+import kafka.producer.SyncProducerConfig +import kafka.api.{TopicData, ProducerRequest} +import java.nio.ByteBuffer class SocketServerTest extends JUnitSuite { @@ -54,9 +57,9 @@ /* A simple request handler that just echos back the response */ def processRequest(channel: RequestChannel) { val request = channel.receiveRequest - val id = request.request.buffer.getShort - val send = new BoundedByteBufferSend(request.request.buffer.slice) - channel.sendResponse(new RequestChannel.Response(request.processor, request.requestKey, send, request.start, 15)) + val id = request.buffer.getShort + val send = new BoundedByteBufferSend(request.buffer.slice) + channel.sendResponse(new RequestChannel.Response(request.processor, request, send)) } def connect() = new Socket("localhost", server.port) @@ -69,10 +72,21 @@ @Test def simpleRequest() { val socket = connect() - sendRequest(socket, 0, "hello".getBytes) + val correlationId = SyncProducerConfig.DefaultCorrelationId + val clientId = SyncProducerConfig.DefaultClientId + val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs + val ack = SyncProducerConfig.DefaultRequiredAcks + val emptyRequest = new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Array[TopicData]()) + + val byteBuffer = ByteBuffer.allocate(emptyRequest.sizeInBytes()) + emptyRequest.writeTo(byteBuffer) + byteBuffer.rewind() + val serializedBytes = new Array[Byte](byteBuffer.remaining) + byteBuffer.get(serializedBytes) + + sendRequest(socket, 0, serializedBytes) processRequest(server.requestChannel) - val response = new String(receiveResponse(socket)) - assertEquals("hello", response) + assertEquals(serializedBytes.toSeq, receiveResponse(socket).toSeq) } @Test(expected=classOf[IOException]) Index: core/src/main/scala/kafka/cluster/Replica.scala =================================================================== --- core/src/main/scala/kafka/cluster/Replica.scala (revision 1383739) +++ core/src/main/scala/kafka/cluster/Replica.scala (working copy) @@ -36,7 +36,7 @@ val topic = partition.topic val partitionId = partition.partitionId - def logEndOffset_=(newLogEndOffset: Long) = { + def logEndOffset_=(newLogEndOffset: Long) { if (!isLocal) { logEndOffsetValue.set(newLogEndOffset) logEndOffsetUpdateTimeMsValue.set(time.milliseconds) Index: core/src/main/scala/kafka/cluster/Partition.scala =================================================================== --- core/src/main/scala/kafka/cluster/Partition.scala (revision 1383739) +++ core/src/main/scala/kafka/cluster/Partition.scala (working copy) @@ -22,6 +22,8 @@ import kafka.api.LeaderAndIsr import kafka.server.ReplicaManager import kafka.common.ErrorMapping +import com.yammer.metrics.core.Gauge +import kafka.metrics.KafkaMetricsGroup /** * Data structure that represents a topic partition. 
The leader maintains the AR, ISR, CUR, RAR @@ -29,7 +31,7 @@ class Partition(val topic: String, val partitionId: Int, time: Time, - val replicaManager: ReplicaManager) extends Logging { + val replicaManager: ReplicaManager) extends Logging with KafkaMetricsGroup { private val localBrokerId = replicaManager.config.brokerId private val logManager = replicaManager.logManager private val replicaFetcherManager = replicaManager.replicaFetcherManager @@ -45,6 +47,20 @@ private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId) + newGauge( + topic + "-" + partitionId + "UnderReplicated", + new Gauge[Int] { + def value() = { + if (isUnderReplicated) 1 else 0 + } + } + ) + + def isUnderReplicated(): Boolean = { + // TODO: need to pass in replication factor from controller + inSyncReplicas.size < replicaManager.config.defaultReplicationFactor + } + def getOrCreateReplica(replicaId: Int = localBrokerId): Replica = { val replicaOpt = getReplica(replicaId) replicaOpt match { @@ -182,6 +198,7 @@ info("Expanding ISR for topic %s partition %d to %s".format(topic, partitionId, newInSyncReplicas.map(_.brokerId).mkString(","))) // update ISR in ZK and cache updateISR(newInSyncReplicas) + replicaManager.isrExpandRate.mark() } maybeIncrementLeaderHW(leaderReplica) case None => // nothing to do if no longer leader @@ -240,6 +257,7 @@ updateISR(newInSyncReplicas) // we may need to increment high watermark since ISR could be down to 1 maybeIncrementLeaderHW(leaderReplica) + replicaManager.isrShrinkRate.mark() } case None => // do nothing if no longer leader } Index: core/src/main/scala/kafka/log/LogStats.scala =================================================================== --- core/src/main/scala/kafka/log/LogStats.scala (revision 1383739) +++ core/src/main/scala/kafka/log/LogStats.scala (working copy) @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.log - -import java.util.concurrent.atomic.AtomicLong - -trait LogStatsMBean { - def getName(): String - def getSize(): Long - def getNumberOfSegments: Int - def getCurrentOffset: Long - def getNumAppendedMessages: Long -} - -class LogStats(val log: Log) extends LogStatsMBean { - private val numCumulatedMessages = new AtomicLong(0) - - def getName(): String = log.name - - def getSize(): Long = log.size - - def getNumberOfSegments: Int = log.numberOfSegments - - def getCurrentOffset: Long = log.logEndOffset - - def getNumAppendedMessages: Long = numCumulatedMessages.get - - def recordAppendedMessages(nMessages: Int) = numCumulatedMessages.getAndAdd(nMessages) -} Index: core/src/main/scala/kafka/log/Log.scala =================================================================== --- core/src/main/scala/kafka/log/Log.scala (revision 1383739) +++ core/src/main/scala/kafka/log/Log.scala (working copy) @@ -26,6 +26,8 @@ import kafka.server.BrokerTopicStat import kafka.message.{ByteBufferMessageSet, MessageSet, InvalidMessageException, FileMessageSet} import kafka.common.{KafkaException, InvalidMessageSizeException, OffsetOutOfRangeException} +import kafka.metrics.KafkaMetricsGroup +import com.yammer.metrics.core.Gauge object Log { val FileSuffix = ".kafka" @@ -130,7 +132,7 @@ @threadsafe private[kafka] class Log( val dir: File, val maxLogFileSize: Long, val maxMessageSize: Int, val flushInterval: Int, val rollIntervalMs: Long, val needRecovery: Boolean, time: Time, - brokerId: Int = 0) extends Logging { + brokerId: Int = 0) extends Logging with KafkaMetricsGroup { this.logIdent = "[Kafka Log on Broker " + brokerId + "], " import kafka.log.Log._ @@ -147,9 +149,19 @@ /* The actual segments of the log */ private[log] val segments: SegmentList[LogSegment] = loadSegments() - private val logStats = new LogStats(this) + newGauge( + name + "-" + "NumLogSegments", + new Gauge[Int] { + def value() = numberOfSegments + } + ) - Utils.registerMBean(logStats, "kafka:type=kafka.logs." 
+ dir.getName) + newGauge( + name + "-" + "LogEndOffset", + new Gauge[Long] { + def value() = logEndOffset + } + ) /* The name of this log */ def name = dir.getName() @@ -243,9 +255,8 @@ numberOfMessages += 1; } - BrokerTopicStat.getBrokerTopicStat(topicName).recordMessagesIn(numberOfMessages) - BrokerTopicStat.getBrokerAllTopicStat.recordMessagesIn(numberOfMessages) - logStats.recordAppendedMessages(numberOfMessages) + BrokerTopicStat.getBrokerTopicStat(topicName).messagesInRate.mark(numberOfMessages) + BrokerTopicStat.getBrokerAllTopicStat.messagesInRate.mark(numberOfMessages) // truncate the message set's buffer upto validbytes, before appending it to the on-disk log val validByteBuffer = messages.buffer.duplicate() Index: core/src/main/scala/kafka/producer/SyncProducer.scala =================================================================== --- core/src/main/scala/kafka/producer/SyncProducer.scala (revision 1383739) +++ core/src/main/scala/kafka/producer/SyncProducer.scala (working copy) @@ -18,11 +18,12 @@ package kafka.producer import kafka.api._ -import kafka.message.MessageSet import kafka.network.{BlockingChannel, BoundedByteBufferSend, Receive} import kafka.utils._ import java.util.Random -import kafka.common.{ErrorMapping, MessageSizeTooLargeException} +import kafka.common.ErrorMapping +import java.util.concurrent.TimeUnit +import kafka.metrics.{KafkaTimer, KafkaMetricsGroup} object SyncProducer { val RequestKey: Short = 0 @@ -57,7 +58,7 @@ val buffer = new BoundedByteBufferSend(request).buffer trace("verifying sendbuffer of size " + buffer.limit) val requestTypeId = buffer.getShort() - if(requestTypeId == RequestKeys.Produce) { + if(requestTypeId == RequestKeys.ProduceKey) { val request = ProducerRequest.readFrom(buffer) trace(request.toString) } @@ -92,7 +93,6 @@ sentOnConnection = 0 lastConnectionTime = System.currentTimeMillis } - SyncProducerStats.recordProduceRequest(SystemTime.nanoseconds - startTime) response } } @@ -101,7 +101,11 @@ * Send a message */ def send(producerRequest: ProducerRequest): ProducerResponse = { - val response = doSend(producerRequest) + ProducerRequestStat.requestSizeHist.update(producerRequest.sizeInBytes) + var response: Receive = null + ProducerRequestStat.requestTimer.time { + response = doSend(producerRequest) + } ProducerResponse.readFrom(response.buffer) } @@ -171,34 +175,7 @@ } } -trait SyncProducerStatsMBean { - def getProduceRequestsPerSecond: Double - def getAvgProduceRequestMs: Double - def getMaxProduceRequestMs: Double - def getNumProduceRequests: Long -} - -@threadsafe -class SyncProducerStats(monitoringDurationNs: Long) extends SyncProducerStatsMBean { - private val produceRequestStats = new SnapshotStats(monitoringDurationNs) - - def recordProduceRequest(requestNs: Long) = produceRequestStats.recordRequestMetric(requestNs) - - def getProduceRequestsPerSecond: Double = produceRequestStats.getRequestsPerSecond - - def getAvgProduceRequestMs: Double = produceRequestStats.getAvgMetric / (1000.0 * 1000.0) - - def getMaxProduceRequestMs: Double = produceRequestStats.getMaxMetric / (1000.0 * 1000.0) - - def getNumProduceRequests: Long = produceRequestStats.getNumRequests -} - -object SyncProducerStats extends Logging { - private val kafkaProducerstatsMBeanName = "kafka:type=kafka.KafkaProducerStats" - private val stats = new SyncProducerStats(1L * 1000 * 1000 * 1000) - swallow(Utils.registerMBean(stats, kafkaProducerstatsMBeanName)) - - def recordProduceRequest(requestMs: Long) = { - stats.recordProduceRequest(requestMs) - } -} +object 
ProducerRequestStat extends KafkaMetricsGroup { + val requestTimer = new KafkaTimer(newTimer("ProduceRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) + val requestSizeHist = newHistogram("ProducerRequestSize") +} \ No newline at end of file Index: core/src/main/scala/kafka/producer/Producer.scala =================================================================== --- core/src/main/scala/kafka/producer/Producer.scala (revision 1383739) +++ core/src/main/scala/kafka/producer/Producer.scala (working copy) @@ -20,8 +20,9 @@ import kafka.utils._ import java.util.concurrent.{TimeUnit, LinkedBlockingQueue} import kafka.serializer.Encoder -import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean} +import java.util.concurrent.atomic.AtomicBoolean import kafka.common.{QueueFullException, InvalidConfigException} +import kafka.metrics.KafkaMetricsGroup class Producer[K,V](config: ProducerConfig, private val eventHandler: EventHandler[K,V]) // for testing only @@ -68,8 +69,10 @@ } private def recordStats(producerData: ProducerData[K,V]*) { - for (data <- producerData) - ProducerTopicStat.getProducerTopicStat(data.getTopic).recordMessagesPerTopic(data.getData.size) + for (data <- producerData) { + ProducerTopicStat.getProducerTopicStat(data.getTopic).messageRate.mark(data.getData.size) + ProducerTopicStat.getProducerAllTopicStat.messageRate.mark(data.getData.size) + } } private def asyncSend(producerData: ProducerData[K,V]*) { @@ -93,7 +96,7 @@ } } if(!added) { - AsyncProducerStats.recordDroppedEvents + AsyncProducerStats.droppedMessageRate.mark() error("Event queue is full of unsent messages, could not send event: " + data.toString) throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + data.toString) }else { @@ -118,31 +121,27 @@ } } -trait ProducerTopicStatMBean { - def getMessagesPerTopic: Long -} - @threadsafe -class ProducerTopicStat extends ProducerTopicStatMBean { - private val numCumulatedMessagesPerTopic = new AtomicLong(0) - - def getMessagesPerTopic: Long = numCumulatedMessagesPerTopic.get - - def recordMessagesPerTopic(nMessages: Int) = numCumulatedMessagesPerTopic.getAndAdd(nMessages) +class ProducerTopicStat(name: String) extends KafkaMetricsGroup { + val messageRate = newMeter(name + "MessagesPerSec", "messages", TimeUnit.SECONDS) + val byteRate = newMeter(name + "BytesPerSec", "bytes", TimeUnit.SECONDS) } -object ProducerTopicStat extends Logging { - private val stats = new Pool[String, ProducerTopicStat] +object ProducerTopicStat { + private val valueFactory = (k: String) => new ProducerTopicStat(k) + private val stats = new Pool[String, ProducerTopicStat](Some(valueFactory)) + private val allTopicStat = new ProducerTopicStat("AllTopics") + def getProducerAllTopicStat(): ProducerTopicStat = allTopicStat + def getProducerTopicStat(topic: String): ProducerTopicStat = { - var stat = stats.get(topic) - if (stat == null) { - stat = new ProducerTopicStat - if (stats.putIfNotExists(topic, stat) == null) - Utils.registerMBean(stat, "kafka.producer.Producer:type=kafka.ProducerTopicStat." 
+ topic) - else - stat = stats.get(topic) - } - return stat + stats.getAndMaybePut(topic + "-") } } + +object ProducerStats extends KafkaMetricsGroup { + val serializationErrorRate = newMeter("SerializationErrorsPerSec", "errors", TimeUnit.SECONDS) + val resendRate = newMeter( "ResendsPerSec", "resents", TimeUnit.SECONDS) + val failedSendRate = newMeter("FailedSendsPerSec", "failed sends", TimeUnit.SECONDS) +} + Index: core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala =================================================================== --- core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala (revision 1383739) +++ core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala (working copy) @@ -17,22 +17,9 @@ package kafka.producer.async -import java.util.concurrent.atomic.AtomicInteger -import kafka.utils.Utils +import kafka.metrics.KafkaMetricsGroup +import java.util.concurrent.TimeUnit -class AsyncProducerStats extends AsyncProducerStatsMBean { - val droppedEvents = new AtomicInteger(0) - - def getAsyncProducerDroppedEvents: Int = droppedEvents.get - - def recordDroppedEvents = droppedEvents.getAndAdd(1) +object AsyncProducerStats extends KafkaMetricsGroup { + val droppedMessageRate = newMeter("DroppedMessagesPerSec", "drops", TimeUnit.SECONDS) } - -object AsyncProducerStats { - private val stats = new AsyncProducerStats - val ProducerMBeanName = "kafka.producer.Producer:type=AsyncProducerStats" - - Utils.registerMBean(stats, ProducerMBeanName) - - def recordDroppedEvents = stats.recordDroppedEvents -} Index: core/src/main/scala/kafka/producer/async/AsyncProducerStatsMBean.scala =================================================================== --- core/src/main/scala/kafka/producer/async/AsyncProducerStatsMBean.scala (revision 1383739) +++ core/src/main/scala/kafka/producer/async/AsyncProducerStatsMBean.scala (working copy) @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
-*/ - -package kafka.producer.async - -trait AsyncProducerStatsMBean { - def getAsyncProducerDroppedEvents: Int -} - -trait AsyncProducerQueueSizeStatsMBean { - def getAsyncProducerQueueSize: Int -} Index: core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala =================================================================== --- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (revision 1383739) +++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (working copy) @@ -24,7 +24,7 @@ import kafka.utils.{Utils, Logging} import scala.collection.Map import scala.collection.mutable.{ListBuffer, HashMap} -import kafka.api.{TopicMetadata, ProducerRequest, TopicData, PartitionData} +import kafka.api._ class DefaultEventHandler[K,V](config: ProducerConfig, @@ -33,6 +33,10 @@ private val producerPool: ProducerPool, private val topicPartitionInfos: HashMap[String, TopicMetadata] = new HashMap[String, TopicMetadata]) extends EventHandler[K,V] with Logging { + val isSync = config.producerType match { + case "sync" => true + case _ => false + } val brokerPartitionInfo = new BrokerPartitionInfo(config, producerPool, topicPartitionInfos) @@ -41,6 +45,11 @@ def handle(events: Seq[ProducerData[K,V]]) { lock synchronized { val serializedData = serialize(events) + serializedData.foreach{ + pd => val dataSize = pd.data.foldLeft(0)(_ + _.payloadSize) + ProducerTopicStat.getProducerTopicStat(pd.topic).byteRate.mark(dataSize) + ProducerTopicStat.getProducerAllTopicStat.byteRate.mark(dataSize) + } var outstandingProduceRequests = serializedData var remainingRetries = config.producerRetries + 1 while (remainingRetries > 0 && outstandingProduceRequests.size > 0) { @@ -51,9 +60,11 @@ // get topics of the outstanding produce requests and refresh metadata for those Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.getTopic))) remainingRetries -= 1 + ProducerStats.resendRate.mark() } } if(outstandingProduceRequests.size > 0) { + ProducerStats.failedSendRate.mark() error("Failed to send the following requests: " + outstandingProduceRequests) throw new FailedToSendMessageException("Failed to send messages after " + config.producerRetries + " tries.", null) } @@ -90,7 +101,25 @@ } def serialize(events: Seq[ProducerData[K,V]]): Seq[ProducerData[K,Message]] = { - events.map(e => new ProducerData[K,Message](e.getTopic, e.getKey, e.getData.map(m => encoder.toMessage(m)))) + val serializedProducerData = new ListBuffer[ProducerData[K,Message]] + events.foreach {e => + val serializedMessages = new ListBuffer[Message] + for (d <- e.getData) { + try { + serializedMessages += encoder.toMessage(d) + } catch { + case t => + ProducerStats.serializationErrorRate.mark() + if (isSync) + throw t + else + error("Error serializing message " + t) + } + } + if (serializedMessages.size > 0) + serializedProducerData += new ProducerData[K,Message](e.getTopic, e.getKey, serializedMessages) + } + serializedProducerData } def partitionAndCollate(events: Seq[ProducerData[K,Message]]): Option[Map[Int, Map[(String, Int), Seq[ProducerData[K,Message]]]]] = { Index: core/src/main/scala/kafka/producer/async/ProducerSendThread.scala =================================================================== --- core/src/main/scala/kafka/producer/async/ProducerSendThread.scala (revision 1383739) +++ core/src/main/scala/kafka/producer/async/ProducerSendThread.scala (working copy) @@ -21,16 +21,25 @@ import java.util.concurrent.{TimeUnit, CountDownLatch, BlockingQueue} import 
collection.mutable.ListBuffer import kafka.producer.ProducerData +import kafka.metrics.KafkaMetricsGroup +import com.yammer.metrics.core.Gauge class ProducerSendThread[K,V](val threadName: String, val queue: BlockingQueue[ProducerData[K,V]], val handler: EventHandler[K,V], val queueTime: Long, - val batchSize: Int) extends Thread(threadName) with Logging { + val batchSize: Int) extends Thread(threadName) with Logging with KafkaMetricsGroup { private val shutdownLatch = new CountDownLatch(1) private val shutdownCommand = new ProducerData[K,V](null, null.asInstanceOf[K], null.asInstanceOf[Seq[V]]) + newGauge( + "ProducerQueueSize-" + getId, + new Gauge[Int] { + def value() = queue.size + } + ) + override def run { try { Index: core/src/main/scala/kafka/message/FileMessageSet.scala =================================================================== --- core/src/main/scala/kafka/message/FileMessageSet.scala (revision 1383739) +++ core/src/main/scala/kafka/message/FileMessageSet.scala (working copy) @@ -24,6 +24,8 @@ import kafka.utils._ import kafka.common.KafkaException +import java.util.concurrent.TimeUnit +import kafka.metrics.{KafkaTimer, KafkaMetricsGroup} /** * An on-disk message set. The set can be opened either mutably or immutably. Mutation attempts @@ -157,10 +159,9 @@ */ def flush() = { checkMutable() - val startTime = SystemTime.milliseconds - channel.force(true) - val elapsedTime = SystemTime.milliseconds - startTime - LogFlushStats.recordFlushRequest(elapsedTime) + LogFlushStats.logFlushTimer.time { + channel.force(true) + } } /** @@ -238,38 +239,8 @@ else next } - } -trait LogFlushStatsMBean { - def getFlushesPerSecond: Double - def getAvgFlushMs: Double - def getTotalFlushMs: Long - def getMaxFlushMs: Double - def getNumFlushes: Long +object LogFlushStats extends KafkaMetricsGroup { + val logFlushTimer = new KafkaTimer(newTimer("LogFlushRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) } - -@threadsafe -class LogFlushStats(monitorDurationNs: Long) extends LogFlushStatsMBean { - private val flushRequestStats = new SnapshotStats(monitorDurationNs) - - def recordFlushRequest(requestMs: Long) = flushRequestStats.recordRequestMetric(requestMs) - - def getFlushesPerSecond: Double = flushRequestStats.getRequestsPerSecond - - def getAvgFlushMs: Double = flushRequestStats.getAvgMetric - - def getTotalFlushMs: Long = flushRequestStats.getTotalMetric - - def getMaxFlushMs: Double = flushRequestStats.getMaxMetric - - def getNumFlushes: Long = flushRequestStats.getNumRequests -} - -object LogFlushStats extends Logging { - private val LogFlushStatsMBeanName = "kafka:type=kafka.LogFlushStats" - private val stats = new LogFlushStats(1L * 1000 * 1000 * 1000) - Utils.registerMBean(stats, LogFlushStatsMBeanName) - - def recordFlushRequest(requestMs: Long) = stats.recordFlushRequest(requestMs) -} Index: core/src/main/scala/kafka/network/SocketServerStats.scala =================================================================== --- core/src/main/scala/kafka/network/SocketServerStats.scala (revision 1383739) +++ core/src/main/scala/kafka/network/SocketServerStats.scala (working copy) @@ -1,90 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.network - -import kafka.utils._ -import kafka.api.RequestKeys - -trait SocketServerStatsMBean { - def getProduceRequestsPerSecond: Double - def getFetchRequestsPerSecond: Double - def getAvgProduceRequestMs: Double - def getMaxProduceRequestMs: Double - def getAvgFetchRequestMs: Double - def getMaxFetchRequestMs: Double - def getBytesReadPerSecond: Double - def getBytesWrittenPerSecond: Double - def getNumFetchRequests: Long - def getNumProduceRequests: Long - def getTotalBytesRead: Long - def getTotalBytesWritten: Long - def getTotalFetchRequestMs: Long - def getTotalProduceRequestMs: Long -} - -@threadsafe -class SocketServerStats(val monitorDurationNs: Long, val time: Time) extends SocketServerStatsMBean { - - def this(monitorDurationNs: Long) = this(monitorDurationNs, SystemTime) - val produceTimeStats = new SnapshotStats(monitorDurationNs) - val fetchTimeStats = new SnapshotStats(monitorDurationNs) - val produceBytesStats = new SnapshotStats(monitorDurationNs) - val fetchBytesStats = new SnapshotStats(monitorDurationNs) - - def recordRequest(requestTypeId: Short, durationNs: Long) { - requestTypeId match { - case r if r == RequestKeys.Produce => - produceTimeStats.recordRequestMetric(durationNs) - case r if r == RequestKeys.Fetch => - fetchTimeStats.recordRequestMetric(durationNs) - case _ => /* not collecting; let go */ - } - } - - def recordBytesWritten(bytes: Int): Unit = fetchBytesStats.recordRequestMetric(bytes) - - def recordBytesRead(bytes: Int): Unit = produceBytesStats.recordRequestMetric(bytes) - - def getProduceRequestsPerSecond: Double = produceTimeStats.getRequestsPerSecond - - def getFetchRequestsPerSecond: Double = fetchTimeStats.getRequestsPerSecond - - def getAvgProduceRequestMs: Double = produceTimeStats.getAvgMetric / (1000.0 * 1000.0) - - def getMaxProduceRequestMs: Double = produceTimeStats.getMaxMetric / (1000.0 * 1000.0) - - def getAvgFetchRequestMs: Double = fetchTimeStats.getAvgMetric / (1000.0 * 1000.0) - - def getMaxFetchRequestMs: Double = fetchTimeStats.getMaxMetric / (1000.0 * 1000.0) - - def getBytesReadPerSecond: Double = produceBytesStats.getAvgMetric - - def getBytesWrittenPerSecond: Double = fetchBytesStats.getAvgMetric - - def getNumFetchRequests: Long = fetchTimeStats.getNumRequests - - def getNumProduceRequests: Long = produceTimeStats.getNumRequests - - def getTotalBytesRead: Long = produceBytesStats.getTotalMetric - - def getTotalBytesWritten: Long = fetchBytesStats.getTotalMetric - - def getTotalFetchRequestMs: Long = fetchTimeStats.getTotalMetric - - def getTotalProduceRequestMs: Long = produceTimeStats.getTotalMetric -} Index: core/src/main/scala/kafka/network/SocketServer.scala =================================================================== --- core/src/main/scala/kafka/network/SocketServer.scala (revision 1383739) +++ core/src/main/scala/kafka/network/SocketServer.scala (working copy) @@ -41,7 +41,6 @@ private val time = SystemTime private val processors = new Array[Processor](numProcessorThreads) private var acceptor: Acceptor = new Acceptor(port, processors) - val stats: SocketServerStats = new 
SocketServerStats(1000L * 1000L * 1000L * monitoringPeriodSecs) val requestChannel = new RequestChannel(numProcessorThreads, maxQueuedRequests) /** @@ -49,7 +48,7 @@ */ def startup() { for(i <- 0 until numProcessorThreads) { - processors(i) = new Processor(i, time, maxRequestSize, requestChannel, stats) + processors(i) = new Processor(i, time, maxRequestSize, requestChannel) Utils.newThread("kafka-processor-%d-%d".format(port, i), processors(i), false).start() } // register the processor threads for notification of responses @@ -187,8 +186,7 @@ private[kafka] class Processor(val id: Int, val time: Time, val maxRequestSize: Int, - val requestChannel: RequestChannel, - val stats: SocketServerStats) extends AbstractServerThread { + val requestChannel: RequestChannel) extends AbstractServerThread { private val newConnections = new ConcurrentLinkedQueue[SocketChannel](); @@ -240,10 +238,10 @@ var curr = requestChannel.receiveResponse(id) while(curr != null) { trace("Socket server received response to send, registering for write: " + curr) - val key = curr.requestKey.asInstanceOf[SelectionKey] + val key = curr.request.requestKey.asInstanceOf[SelectionKey] try { key.interestOps(SelectionKey.OP_WRITE) - key.attach(curr.response) + key.attach(curr) } catch { case e: CancelledKeyException => { debug("Ignoring response for closed socket.") @@ -288,18 +286,17 @@ */ def read(key: SelectionKey) { val socketChannel = channelFor(key) - var request = key.attachment.asInstanceOf[Receive] + var receive = key.attachment.asInstanceOf[Receive] if(key.attachment == null) { - request = new BoundedByteBufferReceive(maxRequestSize) - key.attach(request) + receive = new BoundedByteBufferReceive(maxRequestSize) + key.attach(receive) } - val read = request.readFrom(socketChannel) - stats.recordBytesRead(read) + val read = receive.readFrom(socketChannel) trace(read + " bytes read from " + socketChannel.socket.getRemoteSocketAddress()) if(read < 0) { close(key) - } else if(request.complete) { - val req = RequestChannel.Request(processor = id, requestKey = key, request = request, start = time.nanoseconds) + } else if(receive.complete) { + val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeNs = time.nanoseconds) requestChannel.sendRequest(req) trace("Recieved request, sending for processing by handler: " + req) key.attach(null) @@ -315,13 +312,14 @@ */ def write(key: SelectionKey) { val socketChannel = channelFor(key) - var response = key.attachment().asInstanceOf[Send] - if(response == null) + val response = key.attachment().asInstanceOf[RequestChannel.Response] + val responseSend = response.responseSend + if(responseSend == null) throw new IllegalStateException("Registered for write interest but no response attached to key.") - val written = response.writeTo(socketChannel) - stats.recordBytesWritten(written) + val written = responseSend.writeTo(socketChannel) trace(written + " bytes written to " + socketChannel.socket.getRemoteSocketAddress()) - if(response.complete) { + if(responseSend.complete) { + response.request.updateRequestMetrics() key.attach(null) key.interestOps(SelectionKey.OP_READ) } else { Index: core/src/main/scala/kafka/network/RequestChannel.scala =================================================================== --- core/src/main/scala/kafka/network/RequestChannel.scala (revision 1383739) +++ core/src/main/scala/kafka/network/RequestChannel.scala (working copy) @@ -19,24 +19,79 @@ import java.util.concurrent._ import kafka.utils.SystemTime +import 
kafka.metrics.KafkaMetricsGroup +import com.yammer.metrics.core.Gauge +import java.nio.ByteBuffer +import kafka.api._ -object RequestChannel { - val AllDone = new Request(1, 2, null, 0) - case class Request(processor: Int, requestKey: Any, request: Receive, start: Long) - case class Response(processor: Int, requestKey: Any, response: Send, start: Long, elapsedNs: Long) { +object RequestChannel { + val AllDone = new Request(1, 2, getShutdownReceive(), 0) + + def getShutdownReceive() = { + val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, Array[TopicData]()) + val byteBuffer = ByteBuffer.allocate(emptyProducerRequest.sizeInBytes + 2) + byteBuffer.putShort(RequestKeys.ProduceKey) + emptyProducerRequest.writeTo(byteBuffer) + byteBuffer.rewind() + byteBuffer + } + + case class Request(processor: Int, requestKey: Any, buffer: ByteBuffer, startTimeNs: Long) { + var dequeueTimeNs = -1L + var apiLocalCompleteTimeNs = -1L + var responseCompleteTimeNs = -1L + val requestId = buffer.getShort() + val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer) + buffer.rewind() + + def updateRequestMetrics() { + val endTimeNs = SystemTime.nanoseconds + val queueTime = (dequeueTimeNs - startTimeNs).max(0L) + val apiLocalTime = (apiLocalCompleteTimeNs - dequeueTimeNs).max(0L) + val apiRemoteTime = (responseCompleteTimeNs - apiLocalCompleteTimeNs).max(0L) + val responseSendTime = (endTimeNs - responseCompleteTimeNs).max(0L) + val totalTime = endTimeNs - startTimeNs + var metricsList = List(RequestMetrics.metricsMap(RequestKeys.nameForKey(requestId))) + if (requestId == RequestKeys.FetchKey) { + val isFromFollower = requestObj.asInstanceOf[FetchRequest].isFromFollower + metricsList ::= ( if (isFromFollower) + RequestMetrics.metricsMap(RequestMetrics.followFetchMetricName) + else + RequestMetrics.metricsMap(RequestMetrics.consumerFetchMetricName) ) + } + metricsList.foreach{ + m => m.requestRate.mark() + m.queueTimeHist.update(queueTime) + m.localTimeHist.update(apiLocalTime) + m.remoteTimeHist.update(apiRemoteTime) + m.responseSendTimeHist.update(responseSendTime) + m.totalTimeHist.update(totalTime) + } + } + } + + case class Response(processor: Int, request: Request, responseSend: Send) { + request.responseCompleteTimeNs = SystemTime.nanoseconds + def this(request: Request, send: Send) = - this(request.processor, request.requestKey, send, request.start, SystemTime.nanoseconds - request.start) + this(request.processor, request, send) } } -class RequestChannel(val numProcessors: Int, val queueSize: Int) { +class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup { private var responseListeners: List[(Int) => Unit] = Nil private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize) private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors) for(i <- 0 until numProcessors) responseQueues(i) = new ArrayBlockingQueue[RequestChannel.Response](queueSize) - + newGauge( + "RequestQueueSize", + new Gauge[Int] { + def value() = requestQueue.size + } + ) + /** Send a request to be handled, potentially blocking until there is room in the queue for the request */ def sendRequest(request: RequestChannel.Request) { requestQueue.put(request) @@ -60,5 +115,26 @@ def addResponseListener(onResponse: Int => Unit) { responseListeners ::= onResponse } +} +object RequestMetrics { + val metricsMap = new scala.collection.mutable.HashMap[String, RequestMetrics] + val consumerFetchMetricName = 
RequestKeys.nameForKey(RequestKeys.FetchKey) + "-Consumer" + val followFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "-Follower" + (RequestKeys.keyToNameAndDeserializerMap.values.map(e => e._1) + ++ List(consumerFetchMetricName, followFetchMetricName)).foreach(name => metricsMap.put(name, new RequestMetrics(name))) } + +class RequestMetrics(name: String) extends KafkaMetricsGroup { + val requestRate = newMeter(name + "-RequestsPerSec", "requests", TimeUnit.SECONDS) + // time a request spent in a request queue + val queueTimeHist = newHistogram(name + "-QueueTimeNs") + // time a request takes to be processed at the local broker + val localTimeHist = newHistogram(name + "-LocalTimeNs") + // time a request takes to wait on remote brokers (only relevant to fetch and produce requests) + val remoteTimeHist = newHistogram(name + "-RemoteTimeNs") + // time to send the response to the requester + val responseSendTimeHist = newHistogram(name + "-ResponseSendTimeNs") + val totalTimeHist = newHistogram(name + "-TotalTimeNs") +} + Index: core/src/main/scala/kafka/consumer/SimpleConsumer.scala =================================================================== --- core/src/main/scala/kafka/consumer/SimpleConsumer.scala (revision 1383739) +++ core/src/main/scala/kafka/consumer/SimpleConsumer.scala (working copy) @@ -21,6 +21,8 @@ import kafka.network._ import kafka.utils._ import kafka.common.ErrorMapping +import java.util.concurrent.TimeUnit +import kafka.metrics.{KafkaTimer, KafkaMetricsGroup} /** * A consumer of kafka messages @@ -91,15 +93,13 @@ * @return a set of fetched messages */ def fetch(request: FetchRequest): FetchResponse = { - val startTime = SystemTime.nanoseconds - val response = sendRequest(request) + var response: Receive = null + FetchRequestAndResponseStat.requestTimer.time { + response = sendRequest(request) + } val fetchResponse = FetchResponse.readFrom(response.buffer) val fetchedSize = fetchResponse.sizeInBytes - - val endTime = SystemTime.nanoseconds - SimpleConsumerStats.recordFetchRequest(endTime - startTime) - SimpleConsumerStats.recordConsumptionThroughput(fetchedSize) - + FetchRequestAndResponseStat.respondSizeHist.update(fetchedSize) fetchResponse } @@ -125,39 +125,7 @@ } } -trait SimpleConsumerStatsMBean { - def getFetchRequestsPerSecond: Double - def getAvgFetchRequestMs: Double - def getMaxFetchRequestMs: Double - def getNumFetchRequests: Long - def getConsumerThroughput: Double -} - -@threadsafe -class SimpleConsumerStats(monitoringDurationNs: Long) extends SimpleConsumerStatsMBean { - private val fetchRequestStats = new SnapshotStats(monitoringDurationNs) - - def recordFetchRequest(requestNs: Long) = fetchRequestStats.recordRequestMetric(requestNs) - - def recordConsumptionThroughput(data: Long) = fetchRequestStats.recordThroughputMetric(data) - - def getFetchRequestsPerSecond: Double = fetchRequestStats.getRequestsPerSecond - - def getAvgFetchRequestMs: Double = fetchRequestStats.getAvgMetric / (1000.0 * 1000.0) - - def getMaxFetchRequestMs: Double = fetchRequestStats.getMaxMetric / (1000.0 * 1000.0) - - def getNumFetchRequests: Long = fetchRequestStats.getNumRequests - - def getConsumerThroughput: Double = fetchRequestStats.getThroughput -} - -object SimpleConsumerStats extends Logging { - private val simpleConsumerstatsMBeanName = "kafka:type=kafka.SimpleConsumerStats" - private val stats = new SimpleConsumerStats(1 * 1000L * 1000L * 1000L) - Utils.registerMBean(stats, simpleConsumerstatsMBeanName) - - def recordFetchRequest(requestMs: Long) = 
stats.recordFetchRequest(requestMs) - def recordConsumptionThroughput(data: Long) = stats.recordConsumptionThroughput(data) -} - +object FetchRequestAndResponseStat extends KafkaMetricsGroup { + val requestTimer = new KafkaTimer(newTimer("FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) + val respondSizeHist = newHistogram("FetchResponseSize") +} \ No newline at end of file Index: core/src/main/scala/kafka/consumer/ConsumerIterator.scala =================================================================== --- core/src/main/scala/kafka/consumer/ConsumerIterator.scala (revision 1383739) +++ core/src/main/scala/kafka/consumer/ConsumerIterator.scala (working copy) @@ -47,8 +47,8 @@ currentTopicInfo.resetConsumeOffset(consumedOffset) val topic = currentTopicInfo.topic trace("Setting %s consumed offset to %d".format(topic, consumedOffset)) - ConsumerTopicStat.getConsumerTopicStat(topic).recordMessagesPerTopic(1) - ConsumerTopicStat.getConsumerAllTopicStat().recordMessagesPerTopic(1) + ConsumerTopicStat.getConsumerTopicStat(topic).messageRate.mark() + ConsumerTopicStat.getConsumerAllTopicStat().messageRate.mark() item } Index: core/src/main/scala/kafka/consumer/ConsumerConnector.scala =================================================================== --- core/src/main/scala/kafka/consumer/ConsumerConnector.scala (revision 1383739) +++ core/src/main/scala/kafka/consumer/ConsumerConnector.scala (working copy) @@ -18,7 +18,7 @@ package kafka.consumer import scala.collection._ -import kafka.utils.{Utils, Logging} +import kafka.utils.Logging import kafka.serializer.{DefaultDecoder, Decoder} /** @@ -64,8 +64,6 @@ } object Consumer extends Logging { - private val consumerStatsMBeanName = "kafka:type=kafka.ConsumerStats" - /** * Create a ConsumerConnector * @@ -74,7 +72,6 @@ */ def create(config: ConsumerConfig): ConsumerConnector = { val consumerConnect = new ZookeeperConsumerConnector(config) - Utils.registerMBean(consumerConnect, consumerStatsMBeanName) consumerConnect } @@ -86,7 +83,6 @@ */ def createJavaConsumerConnector(config: ConsumerConfig): kafka.javaapi.consumer.ConsumerConnector = { val consumerConnect = new kafka.javaapi.consumer.ZookeeperConsumerConnector(config) - Utils.registerMBean(consumerConnect.underlying, consumerStatsMBeanName) consumerConnect } } Index: core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala =================================================================== --- core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (revision 1383739) +++ core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (working copy) @@ -21,7 +21,6 @@ import java.util.concurrent.atomic._ import kafka.message._ import kafka.utils.Logging -import kafka.common.ErrorMapping private[consumer] class PartitionTopicInfo(val topic: String, val brokerId: Int, @@ -59,8 +58,8 @@ chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get)) val newOffset = fetchedOffset.addAndGet(size) debug("updated fetch offset of ( %s ) to %d".format(this, newOffset)) - ConsumerTopicStat.getConsumerTopicStat(topic).recordBytesPerTopic(size) - ConsumerTopicStat.getConsumerAllTopicStat().recordBytesPerTopic(size) + ConsumerTopicStat.getConsumerTopicStat(topic).byteRate.mark(size) + ConsumerTopicStat.getConsumerAllTopicStat().byteRate.mark(size) } } Index: core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala =================================================================== --- core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala (revision 1383739) +++ 
core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala (working copy) @@ -17,44 +17,24 @@ package kafka.consumer -import java.util.concurrent.atomic.AtomicLong -import kafka.utils.{Pool, Utils, threadsafe, Logging} +import kafka.utils.{Pool, threadsafe, Logging} +import java.util.concurrent.TimeUnit +import kafka.metrics.KafkaMetricsGroup -trait ConsumerTopicStatMBean { - def getMessagesPerTopic: Long - def getBytesPerTopic: Long -} - @threadsafe -class ConsumerTopicStat extends ConsumerTopicStatMBean { - private val numCumulatedMessagesPerTopic = new AtomicLong(0) - private val numCumulatedBytesPerTopic = new AtomicLong(0) - - def getMessagesPerTopic: Long = numCumulatedMessagesPerTopic.get - - def recordMessagesPerTopic(nMessages: Int) = numCumulatedMessagesPerTopic.getAndAdd(nMessages) - - def getBytesPerTopic: Long = numCumulatedBytesPerTopic.get - - def recordBytesPerTopic(nBytes: Long) = numCumulatedBytesPerTopic.getAndAdd(nBytes) +class ConsumerTopicStat(name: String) extends KafkaMetricsGroup { + val messageRate = newMeter(name + "MessagesPerSec", "messages", TimeUnit.SECONDS) + val byteRate = newMeter(name + "BytesPerSec", "bytes", TimeUnit.SECONDS) } object ConsumerTopicStat extends Logging { - private val stats = new Pool[String, ConsumerTopicStat] - private val allTopicStat = new ConsumerTopicStat - Utils.registerMBean(allTopicStat, "kafka:type=kafka.ConsumerAllTopicStat") + private val valueFactory = (k: String) => new ConsumerTopicStat(k) + private val stats = new Pool[String, ConsumerTopicStat](Some(valueFactory)) + private val allTopicStat = new ConsumerTopicStat("AllTopics") def getConsumerAllTopicStat(): ConsumerTopicStat = allTopicStat def getConsumerTopicStat(topic: String): ConsumerTopicStat = { - var stat = stats.get(topic) - if (stat == null) { - stat = new ConsumerTopicStat - if (stats.putIfNotExists(topic, stat) == null) - Utils.registerMBean(stat, "kafka:type=kafka.ConsumerTopicStat." 
+ topic) - else - stat = stats.get(topic) - } - return stat + stats.getAndMaybePut(topic + "-") } } Index: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala =================================================================== --- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (revision 1383739) +++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (working copy) @@ -32,6 +32,8 @@ import kafka.serializer.Decoder import kafka.utils.ZkUtils._ import kafka.common.{KafkaException, NoBrokersForPartitionException, ConsumerRebalanceFailedException, InvalidConfigException} +import com.yammer.metrics.core.Gauge +import kafka.metrics.KafkaMetricsGroup /** @@ -73,21 +75,9 @@ val shutdownCommand: FetchedDataChunk = new FetchedDataChunk(null, null, -1L) } -/** - * JMX interface for monitoring consumer - */ -trait ZookeeperConsumerConnectorMBean { - def getPartOwnerStats: String - def getConsumerGroup: String - def getOffsetLag(topic: String, brokerId: Int, partitionId: Int): Long - def getConsumedOffset(topic: String, brokerId: Int, partitionId: Int): Long - def getLatestOffset(topic: String, brokerId: Int, partitionId: Int): Long -} - private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val enableFetcher: Boolean) // for testing only - extends ConsumerConnector with ZookeeperConsumerConnectorMBean - with Logging { + extends ConsumerConnector with Logging with KafkaMetricsGroup { private val isShuttingDown = new AtomicBoolean(false) private val rebalanceLock = new Object private var fetcher: Option[ConsumerFetcherManager] = None @@ -260,58 +250,6 @@ } } - // for JMX - def getPartOwnerStats(): String = { - val builder = new StringBuilder - for ((topic, infos) <- topicRegistry) { - builder.append("\n" + topic + ": [") - val topicDirs = new ZKGroupTopicDirs(config.groupId, topic) - for(partition <- infos.values) { - builder.append("\n {") - builder.append{partition} - builder.append(",fetch offset:" + partition.getFetchOffset) - builder.append(",consumer offset:" + partition.getConsumeOffset) - builder.append("}") - } - builder.append("\n ]") - } - builder.toString - } - - // for JMX - def getConsumerGroup(): String = config.groupId - - def getOffsetLag(topic: String, brokerId: Int, partitionId: Int): Long = - getLatestOffset(topic, brokerId, partitionId) - getConsumedOffset(topic, brokerId, partitionId) - - def getConsumedOffset(topic: String, brokerId: Int, partitionId: Int): Long = { - val partitionInfos = topicRegistry.get(topic) - if (partitionInfos != null) { - val partitionInfo = partitionInfos.get(partitionId) - if (partitionInfo != null) - return partitionInfo.getConsumeOffset - } - - // otherwise, try to get it from zookeeper - try { - val topicDirs = new ZKGroupTopicDirs(config.groupId, topic) - val znode = topicDirs.consumerOffsetDir + "/" + partitionId - val offsetString = readDataMaybeNull(zkClient, znode)._1 - offsetString match { - case Some(offset) => offset.toLong - case None => -1L - } - } - catch { - case e => - error("error in getConsumedOffset JMX ", e) - -2L - } - } - - def getLatestOffset(topic: String, brokerId: Int, partitionId: Int): Long = - earliestOrLatestOffset(topic, brokerId, partitionId, OffsetRequest.LatestTime) - private def earliestOrLatestOffset(topic: String, brokerId: Int, partitionId: Int, earliestOrLatest: Long): Long = { var simpleConsumer: SimpleConsumer = null var producedOffset: Long = -1L @@ -728,6 +666,12 @@ val topicThreadId = e._1 val q = e._2._1 
topicThreadIdAndQueues.put(topicThreadId, q) + newGauge( + config.groupId + "-" + topicThreadId._1 + "-" + topicThreadId._2 + "-FetchQueueSize", + new Gauge[Int] { + def value() = q.size + } + ) }) val groupedByTopic = threadQueueStreamPairs.groupBy(_._1._1) Index: core/src/main/scala/kafka/utils/Utils.scala =================================================================== --- core/src/main/scala/kafka/utils/Utils.scala (revision 1383739) +++ core/src/main/scala/kafka/utils/Utils.scala (working copy) @@ -20,7 +20,6 @@ import java.io._ import java.nio._ import java.nio.channels._ -import java.util.concurrent.atomic._ import java.lang.management._ import java.util.zip.CRC32 import javax.management._ @@ -685,102 +684,8 @@ for (forever <- Stream.continually(1); t <- coll) yield t stream.iterator } - } -class SnapshotStats(private val monitorDurationNs: Long = 600L * 1000L * 1000L * 1000L) { - private val time: Time = SystemTime - - private val complete = new AtomicReference(new Stats()) - private val current = new AtomicReference(new Stats()) - private val total = new AtomicLong(0) - private val numCumulatedRequests = new AtomicLong(0) - - def recordRequestMetric(requestNs: Long) { - val stats = current.get - stats.add(requestNs) - total.getAndAdd(requestNs) - numCumulatedRequests.getAndAdd(1) - val ageNs = time.nanoseconds - stats.start - // if the current stats are too old it is time to swap - if(ageNs >= monitorDurationNs) { - val swapped = current.compareAndSet(stats, new Stats()) - if(swapped) { - complete.set(stats) - stats.end.set(time.nanoseconds) - } - } - } - - def recordThroughputMetric(data: Long) { - val stats = current.get - stats.addData(data) - val ageNs = time.nanoseconds - stats.start - // if the current stats are too old it is time to swap - if(ageNs >= monitorDurationNs) { - val swapped = current.compareAndSet(stats, new Stats()) - if(swapped) { - complete.set(stats) - stats.end.set(time.nanoseconds) - } - } - } - - def getNumRequests(): Long = numCumulatedRequests.get - - def getRequestsPerSecond: Double = { - val stats = complete.get - stats.numRequests / stats.durationSeconds - } - - def getThroughput: Double = { - val stats = complete.get - stats.totalData / stats.durationSeconds - } - - def getAvgMetric: Double = { - val stats = complete.get - if (stats.numRequests == 0) { - 0 - } - else { - stats.totalRequestMetric / stats.numRequests - } - } - - def getTotalMetric: Long = total.get - - def getMaxMetric: Double = complete.get.maxRequestMetric - - class Stats { - val start = time.nanoseconds - var end = new AtomicLong(-1) - var numRequests = 0 - var totalRequestMetric: Long = 0L - var maxRequestMetric: Long = 0L - var totalData: Long = 0L - private val lock = new Object() - - def addData(data: Long) { - lock synchronized { - totalData += data - } - } - - def add(requestNs: Long) { - lock synchronized { - numRequests +=1 - totalRequestMetric += requestNs - maxRequestMetric = scala.math.max(maxRequestMetric, requestNs) - } - } - - def durationSeconds: Double = (end.get - start) / (1000.0 * 1000.0 * 1000.0) - - def durationMs: Double = (end.get - start) / (1000.0 * 1000.0) - } -} - /** * A wrapper that synchronizes JSON in scala, which is not threadsafe. 
*/ Index: core/src/main/scala/kafka/utils/Pool.scala =================================================================== --- core/src/main/scala/kafka/utils/Pool.scala (revision 1383739) +++ core/src/main/scala/kafka/utils/Pool.scala (working copy) @@ -21,12 +21,14 @@ import java.util.concurrent._ import collection.JavaConversions import kafka.common.KafkaException +import java.lang.Object class Pool[K,V](valueFactory: Option[(K) => V] = None) extends Iterable[(K, V)] { private val pool = new ConcurrentHashMap[K, V] - + private val createLock = new Object + def this(m: collection.Map[K, V]) { this() m.foreach(kv => pool.put(kv._1, kv._2)) @@ -52,8 +54,12 @@ throw new KafkaException("Empty value factory in pool.") val curr = pool.get(key) if (curr == null) { - pool.putIfAbsent(key, valueFactory.get(key)) - pool.get(key) + createLock synchronized { + val curr = pool.get(key) + if (curr == null) + pool.put(key, valueFactory.get(key)) + pool.get(key) + } } else curr Index: core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala =================================================================== --- core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala (revision 1383739) +++ core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala (working copy) @@ -27,38 +27,14 @@ trait KafkaMetricsGroup extends Logging { /** - * This method enables the user to form logical sub-groups of this - * KafkaMetricsGroup by inserting a sub-group identifier in the package - * string. - * - * @return The sub-group identifier. - */ - def metricsGroupIdent: String = "" - - /** * Creates a new MetricName object for gauges, meters, etc. created for this - * metrics group. It uses the metricsGroupIdent to create logical sub-groups. - * This is currently specifically of use to classes under kafka, with - * broker-id being the most common metrics grouping strategy. - * + * metrics group. * @param name Descriptive name of the metric. * @return Sanitized metric name object. */ private def metricName(name: String) = { - val ident = metricsGroupIdent val klass = this.getClass - val pkg = { - val actualPkg = if (klass.getPackage == null) "" else klass.getPackage.getName - if (ident.nonEmpty) { - // insert the sub-group identifier after the top-level package - if (actualPkg.contains(".")) - actualPkg.replaceFirst("""\.""", ".%s.".format(ident)) - else - actualPkg + "." 
+ ident - } - else - actualPkg - } + val pkg = if (klass.getPackage == null) "" else klass.getPackage.getName val simpleName = klass.getSimpleName.replaceAll("\\$$", "") new MetricName(pkg, simpleName, name) } Index: core/src/main/scala/kafka/server/KafkaServer.scala =================================================================== --- core/src/main/scala/kafka/server/KafkaServer.scala (revision 1383739) +++ core/src/main/scala/kafka/server/KafkaServer.scala (working copy) @@ -18,7 +18,7 @@ package kafka.server import java.io.File -import kafka.network.{SocketServerStats, SocketServer} +import kafka.network.SocketServer import kafka.log.LogManager import kafka.utils._ import java.util.concurrent._ @@ -34,7 +34,6 @@ val CleanShutdownFile = ".kafka_cleanshutdown" private var isShuttingDown = new AtomicBoolean(false) private var shutdownLatch = new CountDownLatch(1) - private val statsMBeanName = "kafka:type=kafka.SocketServerStats" var socketServer: SocketServer = null var requestHandlerPool: KafkaRequestHandlerPool = null var logManager: LogManager = null @@ -82,8 +81,6 @@ socketServer.startup - Utils.registerMBean(socketServer.stats, statsMBeanName) - /* start client */ kafkaZookeeper = new KafkaZooKeeper(config) // starting relevant replicas and leader election for partitions assigned to this broker @@ -123,7 +120,6 @@ replicaManager.shutdown() if (socketServer != null) socketServer.shutdown() - Utils.unregisterMBean(statsMBeanName) if(logManager != null) logManager.shutdown() @@ -144,8 +140,6 @@ def awaitShutdown(): Unit = shutdownLatch.await() def getLogManager(): LogManager = logManager - - def getStats(): SocketServerStats = socketServer.stats } Index: core/src/main/scala/kafka/server/RequestPurgatory.scala =================================================================== --- core/src/main/scala/kafka/server/RequestPurgatory.scala (revision 1383739) +++ core/src/main/scala/kafka/server/RequestPurgatory.scala (working copy) @@ -33,7 +33,6 @@ * for example a key could be a (topic, partition) pair. 
*/ class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, delayMs: Long) extends DelayedItem[RequestChannel.Request](request, delayMs) { - val creationTimeNs = SystemTime.nanoseconds val satisfied = new AtomicBoolean(false) } @@ -67,32 +66,13 @@ /* a list of requests watching each key */ private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers)) - private val numDelayedRequestsBeanName = "NumDelayedRequests" - private val timeToSatisfyHistogramBeanName = "TimeToSatisfyInNs" - private val satisfactionRateBeanName = "SatisfactionRate" - private val expirationRateBeanName = "ExpirationRate" - - val satisfactionRateMeter = newMeter( - satisfactionRateBeanName, - "requests", - TimeUnit.SECONDS - ) - - val timeToSatisfyHistogram = newHistogram(timeToSatisfyHistogramBeanName, biased = true) - newGauge( - numDelayedRequestsBeanName, + "NumDelayedRequests", new Gauge[Int] { def value() = expiredRequestReaper.unsatisfied.get() } ) - val expirationRateMeter = newMeter( - expirationRateBeanName, - "requests", - TimeUnit.SECONDS - ) - /* background thread expiring requests that have been waiting too long */ private val expiredRequestReaper = new ExpiredRequestReaper private val expirationThread = Utils.daemonThread("request-expiration-task", expiredRequestReaper) @@ -196,10 +176,6 @@ iter.remove() val updated = curr.satisfied.compareAndSet(false, true) if(updated == true) { - val requestNs = SystemTime.nanoseconds - curr.creationTimeNs - satisfactionRateMeter.mark() - timeToSatisfyHistogram.update(requestNs) - response += curr liveCount -= 1 expiredRequestReaper.satisfyRequest() @@ -282,7 +258,6 @@ val curr = delayed.take() val updated = curr.satisfied.compareAndSet(false, true) if(updated) { - expirationRateMeter.mark() unsatisfied.getAndDecrement() for(key <- curr.keys) watchersFor(key).decLiveCount() Index: core/src/main/scala/kafka/server/ReplicaManager.scala =================================================================== --- core/src/main/scala/kafka/server/ReplicaManager.scala (revision 1383739) +++ core/src/main/scala/kafka/server/ReplicaManager.scala (working copy) @@ -24,13 +24,16 @@ import kafka.log.LogManager import kafka.api.{LeaderAndIsrRequest, LeaderAndIsr} import kafka.common.{UnknownTopicOrPartitionException, LeaderNotAvailableException, ErrorMapping} +import kafka.metrics.KafkaMetricsGroup +import com.yammer.metrics.core.Gauge +import java.util.concurrent.TimeUnit object ReplicaManager { val UnknownLogEndOffset = -1L } class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient, kafkaScheduler: KafkaScheduler, - val logManager: LogManager) extends Logging { + val logManager: LogManager) extends Logging with KafkaMetricsGroup { private val allPartitions = new Pool[(String, Int), Partition] private var leaderPartitions = new mutable.HashSet[Partition]() private val leaderPartitionsLock = new Object @@ -41,6 +44,26 @@ val highWatermarkCheckpoint = new HighwaterMarkCheckpoint(config.logDir) info("Created highwatermark file %s".format(highWatermarkCheckpoint.name)) + newGauge( + "LeaderCount", + new Gauge[Int] { + def value() = leaderPartitions.size + } + ) + newGauge( + "UnderReplicatedPartitions", + new Gauge[Int] { + def value() = { + leaderPartitionsLock synchronized { + leaderPartitions.count(_.isUnderReplicated) + } + } + } + ) + val isrExpandRate = newMeter("ISRExpand", "expands", TimeUnit.SECONDS) + val isrShrinkRate = newMeter("ISRShrink", "shrinks", TimeUnit.SECONDS) + + def 
startHighWaterMarksCheckPointThread() = { if(highWatermarkCheckPointThreadStarted.compareAndSet(false, true)) kafkaScheduler.scheduleWithRate(checkpointHighWatermarks, "highwatermark-checkpoint-thread", 0, config.defaultFlushIntervalMs) Index: core/src/main/scala/kafka/server/KafkaController.scala =================================================================== --- core/src/main/scala/kafka/server/KafkaController.scala (revision 1383739) +++ core/src/main/scala/kafka/server/KafkaController.scala (working copy) @@ -28,8 +28,10 @@ import collection.JavaConversions._ import kafka.utils.{ShutdownableThread, ZkUtils, Logging} import java.lang.Object -import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue} -import kafka.common.{KafkaException, PartitionOfflineException} +import com.yammer.metrics.core.Gauge +import java.util.concurrent.{TimeUnit, LinkedBlockingQueue, BlockingQueue} +import kafka.common.{PartitionOfflineException, KafkaException} +import kafka.metrics.{KafkaTimer, KafkaMetricsGroup} class RequestSendThread(val controllerId: Int, @@ -52,9 +54,9 @@ receive = channel.receive() var response: RequestOrResponse = null request.requestId.get match { - case RequestKeys.LeaderAndISRRequest => + case RequestKeys.LeaderAndIsrKey => response = LeaderAndISRResponse.readFrom(receive.buffer) - case RequestKeys.StopReplicaRequest => + case RequestKeys.StopReplicaKey => response = StopReplicaResponse.readFrom(receive.buffer) } trace("got a response %s".format(controllerId, response, toBrokerId)) @@ -144,7 +146,7 @@ messageQueue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)], requestSendThread: RequestSendThread) -class KafkaController(config : KafkaConfig, zkClient: ZkClient) extends Logging { +class KafkaController(config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup{ this.logIdent = "[Controller " + config.brokerId + "], " private var isRunning = true private val controllerLock = new Object @@ -155,6 +157,13 @@ private var allPartitionReplicaAssignment: mutable.Map[(String, Int), Seq[Int]] = null private var allLeaders: mutable.Map[(String, Int), Int] = null + newGauge( + "ActiveControllerCount", + new Gauge[Int] { + def value() = if (isActive) 1 else 0 + } + ) + // Return true if this controller succeeds in the controller leader election private def tryToBecomeController(): Boolean = { val controllerStatus = @@ -369,6 +378,7 @@ } else{ warn("during initializing leader of parition (%s, %d), assigned replicas are [%s], live brokers are [%s], no assigned replica is alive".format(topicPartition._1, topicPartition._2, replicaAssignment.mkString(","), liveBrokerIds)) + ControllerStat.offlinePartitionRate.mark() } } @@ -479,10 +489,13 @@ debug("No broker is ISR is alive, picking the leader from the alive assigned replicas: %s" .format(liveAssignedReplicasToThisPartition.mkString(","))) liveAssignedReplicasToThisPartition.isEmpty match { - case true => throw new PartitionOfflineException(("No replica for partition " + - "([%s, %d]) is alive. Live brokers are: [%s],".format(topic, partition, liveBrokerIds)) + - " Assigned replicas are: [%s]".format(assignedReplicas)) + case true => + ControllerStat.offlinePartitionRate.mark() + throw new PartitionOfflineException(("No replica for partition " + + "([%s, %d]) is alive. 
Live brokers are: [%s],".format(topic, partition, liveBrokerIds)) + + " Assigned replicas are: [%s]".format(assignedReplicas)) case false => + ControllerStat.uncleanLeaderElectionRate.mark() val newLeader = liveAssignedReplicasToThisPartition.head warn("No broker in ISR is alive, elected leader from the alive replicas is [%s], ".format(newLeader) + "There's potential data loss") @@ -509,18 +522,20 @@ class BrokerChangeListener() extends IZkChildListener with Logging { this.logIdent = "[Controller " + config.brokerId + "], " def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) { - controllerLock synchronized { - val curBrokerIds = currentBrokerList.map(_.toInt).toSet - val newBrokerIds = curBrokerIds -- liveBrokerIds - val newBrokers = newBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get) - val deletedBrokerIds = liveBrokerIds -- curBrokerIds - liveBrokers = curBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get) - liveBrokerIds = liveBrokers.map(_.id) - info("Newly added brokers: %s, deleted brokers: %s, all brokers: %s" - .format(newBrokerIds.mkString(","), deletedBrokerIds.mkString(","), liveBrokerIds.mkString(","))) - newBrokers.foreach(controllerChannelManager.addBroker(_)) - deletedBrokerIds.foreach(controllerChannelManager.removeBroker(_)) - onBrokerChange(newBrokerIds) + ControllerStat.leaderElectionTimer.time { + controllerLock synchronized { + val curBrokerIds = currentBrokerList.map(_.toInt).toSet + val newBrokerIds = curBrokerIds -- liveBrokerIds + val newBrokers = newBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get) + val deletedBrokerIds = liveBrokerIds -- curBrokerIds + liveBrokers = curBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get) + liveBrokerIds = liveBrokers.map(_.id) + info("Newly added brokers: %s, deleted brokers: %s, all brokers: %s" + .format(newBrokerIds.mkString(","), deletedBrokerIds.mkString(","), liveBrokerIds.mkString(","))) + newBrokers.foreach(controllerChannelManager.addBroker(_)) + deletedBrokerIds.foreach(controllerChannelManager.removeBroker(_)) + onBrokerChange(newBrokerIds) + } } } } @@ -591,4 +606,10 @@ } } } -} \ No newline at end of file +} + +object ControllerStat extends KafkaMetricsGroup { + val offlinePartitionRate = newMeter("OfflinePartitionsPerSec", "partitions", TimeUnit.SECONDS) + val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS) + val leaderElectionTimer = new KafkaTimer(newTimer("LeaderElectionRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) +} Index: core/src/main/scala/kafka/server/KafkaRequestHandler.scala =================================================================== --- core/src/main/scala/kafka/server/KafkaRequestHandler.scala (revision 1383739) +++ core/src/main/scala/kafka/server/KafkaRequestHandler.scala (working copy) @@ -19,7 +19,8 @@ import kafka.network._ import kafka.utils._ -import java.util.concurrent.atomic.AtomicLong +import kafka.metrics.KafkaMetricsGroup +import java.util.concurrent.TimeUnit /** * A thread that answers kafka requests. 
@@ -30,10 +31,11 @@ def run() { while(true) { val req = requestChannel.receiveRequest() - if(req == RequestChannel.AllDone){ + if(req eq RequestChannel.AllDone){ trace("receives shut down command, shut down".format(brokerId, id)) return } + req.dequeueTimeNs = SystemTime.nanoseconds debug("handles request " + req) apis.handle(req) } @@ -63,62 +65,24 @@ thread.join info("shutted down completely") } - } -trait BrokerTopicStatMBean { - def getMessagesIn: Long - def getBytesIn: Long - def getBytesOut: Long - def getFailedProduceRequest: Long - def getFailedFetchRequest: Long +class BrokerTopicMetrics(name: String) extends KafkaMetricsGroup { + val messagesInRate = newMeter(name + "MessagesInPerSec", "messages", TimeUnit.SECONDS) + val bytesInRate = newMeter(name + "BytesInPerSec", "bytes", TimeUnit.SECONDS) + val bytesOutRate = newMeter(name + "BytesOutPerSec", "bytes", TimeUnit.SECONDS) + val failedProduceRequestRate = newMeter(name + "FailedProduceRequestsPerSec", "requests", TimeUnit.SECONDS) + val failedFetchRequestRate = newMeter(name + "FailedFetchRequestsPerSec", "requests", TimeUnit.SECONDS) } -@threadsafe -class BrokerTopicStat extends BrokerTopicStatMBean { - private val numCumulatedMessagesIn = new AtomicLong(0) - private val numCumulatedBytesIn = new AtomicLong(0) - private val numCumulatedBytesOut = new AtomicLong(0) - private val numCumulatedFailedProduceRequests = new AtomicLong(0) - private val numCumulatedFailedFetchRequests = new AtomicLong(0) - - def getMessagesIn: Long = numCumulatedMessagesIn.get - - def recordMessagesIn(nMessages: Int) = numCumulatedMessagesIn.getAndAdd(nMessages) - - def getBytesIn: Long = numCumulatedBytesIn.get - - def recordBytesIn(nBytes: Long) = numCumulatedBytesIn.getAndAdd(nBytes) - - def getBytesOut: Long = numCumulatedBytesOut.get - - def recordBytesOut(nBytes: Long) = numCumulatedBytesOut.getAndAdd(nBytes) - - def recordFailedProduceRequest = numCumulatedFailedProduceRequests.getAndIncrement - - def getFailedProduceRequest = numCumulatedFailedProduceRequests.get() - - def recordFailedFetchRequest = numCumulatedFailedFetchRequests.getAndIncrement - - def getFailedFetchRequest = numCumulatedFailedFetchRequests.get() -} - object BrokerTopicStat extends Logging { - private val stats = new Pool[String, BrokerTopicStat] - private val allTopicStat = new BrokerTopicStat - Utils.registerMBean(allTopicStat, "kafka:type=kafka.BrokerAllTopicStat") + private val valueFactory = (k: String) => new BrokerTopicMetrics(k) + private val stats = new Pool[String, BrokerTopicMetrics](Some(valueFactory)) + private val allTopicStat = new BrokerTopicMetrics("AllTopics") - def getBrokerAllTopicStat(): BrokerTopicStat = allTopicStat + def getBrokerAllTopicStat(): BrokerTopicMetrics = allTopicStat - def getBrokerTopicStat(topic: String): BrokerTopicStat = { - var stat = stats.get(topic) - if (stat == null) { - stat = new BrokerTopicStat - if (stats.putIfNotExists(topic, stat) == null) - Utils.registerMBean(stat, "kafka:type=kafka.BrokerTopicStat." 
+ topic) - else - stat = stats.get(topic) - } - return stat + def getBrokerTopicStat(topic: String): BrokerTopicMetrics = { + stats.getAndMaybePut(topic + "-") } } Index: core/src/main/scala/kafka/server/KafkaApis.scala =================================================================== --- core/src/main/scala/kafka/server/KafkaApis.scala (revision 1383739) +++ core/src/main/scala/kafka/server/KafkaApis.scala (working copy) @@ -40,8 +40,6 @@ val replicaManager: ReplicaManager, val zkClient: ZkClient, brokerId: Int) extends Logging { - - private val metricsGroup = brokerId.toString private val producerRequestPurgatory = new ProducerRequestPurgatory(brokerId) private val fetchRequestPurgatory = new FetchRequestPurgatory(brokerId, requestChannel) private val delayedRequestMetrics = new DelayedRequestMetrics @@ -54,20 +52,20 @@ * Top-level method that handles all requests and multiplexes to the right api */ def handle(request: RequestChannel.Request) { - val apiId = request.request.buffer.getShort() - apiId match { - case RequestKeys.Produce => handleProducerRequest(request) - case RequestKeys.Fetch => handleFetchRequest(request) - case RequestKeys.Offsets => handleOffsetRequest(request) - case RequestKeys.TopicMetadata => handleTopicMetadataRequest(request) - case RequestKeys.LeaderAndISRRequest => handleLeaderAndISRRequest(request) - case RequestKeys.StopReplicaRequest => handleStopReplicaRequest(request) - case _ => throw new KafkaException("No mapping found for handler id " + apiId) + request.requestId match { + case RequestKeys.ProduceKey => handleProducerRequest(request) + case RequestKeys.FetchKey => handleFetchRequest(request) + case RequestKeys.OffsetsKey => handleOffsetRequest(request) + case RequestKeys.MetadataKey => handleTopicMetadataRequest(request) + case RequestKeys.LeaderAndIsrKey => handleLeaderAndISRRequest(request) + case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request) + case requestId => throw new KafkaException("No mapping found for handler id " + requestId) } + request.apiLocalCompleteTimeNs = SystemTime.nanoseconds } def handleLeaderAndISRRequest(request: RequestChannel.Request){ - val leaderAndISRRequest = LeaderAndIsrRequest.readFrom(request.request.buffer) + val leaderAndISRRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest] if(requestLogger.isTraceEnabled) requestLogger.trace("Handling leader and isr request " + leaderAndISRRequest) trace("Handling leader and isr request " + leaderAndISRRequest) @@ -79,7 +77,7 @@ def handleStopReplicaRequest(request: RequestChannel.Request){ - val stopReplicaRequest = StopReplicaRequest.readFrom(request.request.buffer) + val stopReplicaRequest = request.requestObj.asInstanceOf[StopReplicaRequest] if(requestLogger.isTraceEnabled) requestLogger.trace("Handling stop replica request " + stopReplicaRequest) trace("Handling stop replica request " + stopReplicaRequest) @@ -107,11 +105,6 @@ for(fetchReq <- satisfied) { val topicData = readMessageSets(fetchReq.fetch) val response = new FetchResponse(FetchRequest.CurrentVersion, fetchReq.fetch.correlationId, topicData) - - val fromFollower = fetchReq.fetch.replicaId != FetchRequest.NonFollowerId - delayedRequestMetrics.recordDelayedFetchSatisfied( - fromFollower, SystemTime.nanoseconds - fetchReq.creationTimeNs, response) - requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response))) } } @@ -120,7 +113,7 @@ * Handle a produce request */ def handleProducerRequest(request: RequestChannel.Request) { - val produceRequest = 
ProducerRequest.readFrom(request.request.buffer) + val produceRequest = request.requestObj.asInstanceOf[ProducerRequest] val sTime = SystemTime.milliseconds if(requestLogger.isTraceEnabled) requestLogger.trace("Handling producer request " + request.toString) @@ -179,8 +172,8 @@ for(topicData <- request.data) { for(partitionData <- topicData.partitionDataArray) { msgIndex += 1 - BrokerTopicStat.getBrokerTopicStat(topicData.topic).recordBytesIn(partitionData.messages.sizeInBytes) - BrokerTopicStat.getBrokerAllTopicStat.recordBytesIn(partitionData.messages.sizeInBytes) + BrokerTopicStat.getBrokerTopicStat(topicData.topic).bytesInRate.mark(partitionData.messages.sizeInBytes) + BrokerTopicStat.getBrokerAllTopicStat.bytesInRate.mark(partitionData.messages.sizeInBytes) try { val localReplica = replicaManager.getLeaderReplicaIfLocal(topicData.topic, partitionData.partition) val log = localReplica.log.get @@ -193,8 +186,8 @@ .format(partitionData.messages.sizeInBytes, offsets(msgIndex))) } catch { case e => - BrokerTopicStat.getBrokerTopicStat(topicData.topic).recordFailedProduceRequest - BrokerTopicStat.getBrokerAllTopicStat.recordFailedProduceRequest + BrokerTopicStat.getBrokerTopicStat(topicData.topic).failedProduceRequestRate.mark() + BrokerTopicStat.getBrokerAllTopicStat.failedProduceRequestRate.mark() error("Error processing ProducerRequest on %s:%d".format(topicData.topic, partitionData.partition), e) e match { case _: IOException => @@ -214,7 +207,7 @@ * Handle a fetch request */ def handleFetchRequest(request: RequestChannel.Request) { - val fetchRequest = FetchRequest.readFrom(request.request.buffer) + val fetchRequest = request.requestObj.asInstanceOf[FetchRequest] if(requestLogger.isTraceEnabled) requestLogger.trace("Handling fetch request " + fetchRequest.toString) trace("Handling fetch request " + fetchRequest.toString) @@ -229,7 +222,7 @@ requestChannel.sendResponse(channelResponse) } - if(fetchRequest.replicaId != FetchRequest.NonFollowerId) { + if(fetchRequest.isFromFollower) { maybeUpdatePartitionHW(fetchRequest) // after updating HW, some delayed produce requests may be unblocked var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce] @@ -272,7 +265,7 @@ debug("Fetching log for topic %s partition %d".format(offsetDetail.topic, offsetDetail.partitions(i))) try { val leader = replicaManager.getLeaderReplicaIfLocal(offsetDetail.topic, offsetDetail.partitions(i)) - val end = if(fetchRequest.replicaId == FetchRequest.NonFollowerId) { + val end = if (!fetchRequest.isFromFollower) { leader.highWatermark } else { leader.logEndOffset @@ -317,12 +310,12 @@ val topic = offsetDetail.topic val (partitions, offsets, fetchSizes) = (offsetDetail.partitions, offsetDetail.offsets, offsetDetail.fetchSizes) for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ) { - val isFetchFromFollower = fetchRequest.replicaId != FetchRequest.NonFollowerId + val isFetchFromFollower = fetchRequest.isFromFollower() val partitionInfo = try { val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, isFetchFromFollower) - BrokerTopicStat.getBrokerTopicStat(topic).recordBytesOut(messages.sizeInBytes) - BrokerTopicStat.getBrokerAllTopicStat.recordBytesOut(messages.sizeInBytes) + BrokerTopicStat.getBrokerTopicStat(topic).bytesOutRate.mark(messages.sizeInBytes) + BrokerTopicStat.getBrokerAllTopicStat.bytesOutRate.mark(messages.sizeInBytes) if (!isFetchFromFollower) { new PartitionData(partition, ErrorMapping.NoError, offset, highWatermark, 
messages) } else { @@ -335,8 +328,8 @@ } catch { case e => - BrokerTopicStat.getBrokerTopicStat(topic).recordFailedFetchRequest - BrokerTopicStat.getBrokerAllTopicStat.recordFailedFetchRequest + BrokerTopicStat.getBrokerTopicStat(topic).failedFetchRequestRate.mark() + BrokerTopicStat.getBrokerAllTopicStat.failedFetchRequestRate.mark() error("error when processing request " + (topic, partition, offset, fetchSize), e) new PartitionData(partition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), offset, -1L, MessageSet.Empty) @@ -375,7 +368,7 @@ * Service the offset request API */ def handleOffsetRequest(request: RequestChannel.Request) { - val offsetRequest = OffsetRequest.readFrom(request.request.buffer) + val offsetRequest = request.requestObj.asInstanceOf[OffsetRequest] if(requestLogger.isTraceEnabled) requestLogger.trace("Handling offset request " + offsetRequest.toString) trace("Handling offset request " + offsetRequest.toString) @@ -402,7 +395,7 @@ * Service the topic metadata request API */ def handleTopicMetadataRequest(request: RequestChannel.Request) { - val metadataRequest = TopicMetadataRequest.readFrom(request.request.buffer) + val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest] if(requestLogger.isTraceEnabled) requestLogger.trace("Handling topic metadata request " + metadataRequest.toString()) trace("Handling topic metadata request " + metadataRequest.toString()) @@ -456,7 +449,7 @@ def keyLabel: String } private [kafka] object MetricKey { - val globalLabel = "all" + val globalLabel = "All" } private [kafka] case class RequestKey(topic: String, partition: Int) @@ -476,7 +469,6 @@ this.logIdent = "[FetchRequestPurgatory-%d], ".format(brokerId) - /** * A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field */ @@ -489,8 +481,8 @@ def expire(delayed: DelayedFetch) { val topicData = readMessageSets(delayed.fetch) val response = new FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData) - val fromFollower = delayed.fetch.replicaId != FetchRequest.NonFollowerId - delayedRequestMetrics.recordDelayedFetchExpired(fromFollower, response) + val fromFollower = delayed.fetch.isFromFollower + delayedRequestMetrics.recordDelayedFetchExpired(fromFollower) requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response))) } } @@ -560,7 +552,6 @@ val partitionId = followerFetchRequestKey.partition val key = RequestKey(topic, partitionId) val fetchPartitionStatus = partitionStatus(key) - val durationNs = SystemTime.nanoseconds - creationTimeNs trace("Checking producer request satisfaction for %s-%d, acksPending = %b" .format(topic, partitionId, fetchPartitionStatus.acksPending)) if (fetchPartitionStatus.acksPending) { @@ -576,17 +567,12 @@ if (!fetchPartitionStatus.acksPending) { val topicData = produce.data.find(_.topic == topic).get val partitionData = topicData.partitionDataArray.find(_.partition == partitionId).get - delayedRequestMetrics.recordDelayedProducerKeyCaughtUp(key, - durationNs, - partitionData.sizeInBytes) maybeUnblockDelayedFetchRequests(topic, Array(partitionData)) } } // unblocked if there are no partitions with pending acks val satisfied = ! 
partitionStatus.exists(p => p._2.acksPending) - if (satisfied) - delayedRequestMetrics.recordDelayedProduceSatisfied(durationNs) satisfied } @@ -629,53 +615,18 @@ private class DelayedRequestMetrics { private class DelayedProducerRequestMetrics(keyLabel: String = MetricKey.globalLabel) extends KafkaMetricsGroup { - val caughtUpFollowerFetchRequestMeter = - newMeter("CaughtUpFollowerFetchRequestsPerSecond-" + keyLabel, "requests", TimeUnit.SECONDS) - val followerCatchUpTimeHistogram = if (keyLabel == MetricKey.globalLabel) - Some(newHistogram("FollowerCatchUpTimeInNs", biased = true)) - else None - - /* - * Note that throughput is updated on individual key satisfaction. - * Therefore, it is an upper bound on throughput since the - * DelayedProducerRequest may get expired. - */ - val throughputMeter = newMeter("Throughput-" + keyLabel, "bytes", TimeUnit.SECONDS) - val expiredRequestMeter = newMeter("ExpiredRequestsPerSecond-" + keyLabel, "requests", TimeUnit.SECONDS) - - val satisfiedRequestMeter = if (keyLabel == MetricKey.globalLabel) - Some(newMeter("SatisfiedRequestsPerSecond", "requests", TimeUnit.SECONDS)) - else None - val satisfactionTimeHistogram = if (keyLabel == MetricKey.globalLabel) - Some(newHistogram("SatisfactionTimeInNs", biased = true)) - else None + val expiredRequestMeter = newMeter(keyLabel + "ExpiresPerSecond", "requests", TimeUnit.SECONDS) } - private class DelayedFetchRequestMetrics(forFollower: Boolean, - keyLabel: String = MetricKey.globalLabel) extends KafkaMetricsGroup { - private val metricPrefix = if (forFollower) "Follower" else "NonFollower" + private class DelayedFetchRequestMetrics(forFollower: Boolean) extends KafkaMetricsGroup { + private val metricPrefix = if (forFollower) "Follower" else "Consumer" - val satisfiedRequestMeter = if (keyLabel == MetricKey.globalLabel) - Some(newMeter(metricPrefix + "-SatisfiedRequestsPerSecond", - "requests", TimeUnit.SECONDS)) - else None - - val satisfactionTimeHistogram = if (keyLabel == MetricKey.globalLabel) - Some(newHistogram(metricPrefix + "-SatisfactionTimeInNs", biased = true)) - else None - - val expiredRequestMeter = if (keyLabel == MetricKey.globalLabel) - Some(newMeter(metricPrefix + "-ExpiredRequestsPerSecond", - "requests", TimeUnit.SECONDS)) - else None - - val throughputMeter = newMeter("%s-Throughput-%s".format(metricPrefix, keyLabel), - "bytes", TimeUnit.SECONDS) + val expiredRequestMeter = newMeter(metricPrefix + "ExpiresPerSecond", "requests", TimeUnit.SECONDS) } private val producerRequestMetricsForKey = { - val valueFactory = (k: MetricKey) => new DelayedProducerRequestMetrics(k.keyLabel) + val valueFactory = (k: MetricKey) => new DelayedProducerRequestMetrics(k.keyLabel + "-") new Pool[MetricKey, DelayedProducerRequestMetrics](Some(valueFactory)) } @@ -684,75 +635,17 @@ private val aggregateFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = true) private val aggregateNonFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = false) - private val followerFetchRequestMetricsForKey = { - val valueFactory = (k: MetricKey) => new DelayedFetchRequestMetrics(forFollower = true, k.keyLabel) - new Pool[MetricKey, DelayedFetchRequestMetrics](Some(valueFactory)) - } - - private val nonFollowerFetchRequestMetricsForKey = { - val valueFactory = (k: MetricKey) => new DelayedFetchRequestMetrics(forFollower = false, k.keyLabel) - new Pool[MetricKey, DelayedFetchRequestMetrics](Some(valueFactory)) - } - def recordDelayedProducerKeyExpired(key: MetricKey) { val keyMetrics = 
producerRequestMetricsForKey.getAndMaybePut(key) List(keyMetrics, aggregateProduceRequestMetrics).foreach(_.expiredRequestMeter.mark()) } - - def recordDelayedProducerKeyCaughtUp(key: MetricKey, timeToCatchUpNs: Long, bytes: Int) { - val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(key) - List(keyMetrics, aggregateProduceRequestMetrics).foreach(m => { - m.caughtUpFollowerFetchRequestMeter.mark() - m.followerCatchUpTimeHistogram.foreach(_.update(timeToCatchUpNs)) - m.throughputMeter.mark(bytes) - }) - } - - - def recordDelayedProduceSatisfied(timeToSatisfyNs: Long) { - aggregateProduceRequestMetrics.satisfiedRequestMeter.foreach(_.mark()) - aggregateProduceRequestMetrics.satisfactionTimeHistogram.foreach(_.update(timeToSatisfyNs)) - } - - private def recordDelayedFetchThroughput(forFollower: Boolean, response: FetchResponse) { + def recordDelayedFetchExpired(forFollower: Boolean) { val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics else aggregateNonFollowerFetchRequestMetrics - metrics.throughputMeter.mark(response.sizeInBytes) - - response.topicMap.foreach(topicAndData => { - val topic = topicAndData._1 - topicAndData._2.partitionDataArray.foreach(partitionData => { - val key = RequestKey(topic, partitionData.partition) - val keyMetrics = if (forFollower) - followerFetchRequestMetricsForKey.getAndMaybePut(key) - else - nonFollowerFetchRequestMetricsForKey.getAndMaybePut(key) - keyMetrics.throughputMeter.mark(partitionData.sizeInBytes) - }) - }) - } - - - def recordDelayedFetchExpired(forFollower: Boolean, response: FetchResponse) { - val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics - else aggregateNonFollowerFetchRequestMetrics - metrics.expiredRequestMeter.foreach(_.mark()) - - recordDelayedFetchThroughput(forFollower, response) + metrics.expiredRequestMeter.mark() } - - - def recordDelayedFetchSatisfied(forFollower: Boolean, timeToSatisfyNs: Long, response: FetchResponse) { - val aggregateMetrics = if (forFollower) aggregateFollowerFetchRequestMetrics - else aggregateNonFollowerFetchRequestMetrics - - aggregateMetrics.satisfactionTimeHistogram.foreach(_.update(timeToSatisfyNs)) - aggregateMetrics.satisfiedRequestMeter.foreach(_.mark()) - - recordDelayedFetchThroughput(forFollower, response) - } } } Index: core/src/main/scala/kafka/server/AbstractFetcherThread.scala =================================================================== --- core/src/main/scala/kafka/server/AbstractFetcherThread.scala (revision 1383739) +++ core/src/main/scala/kafka/server/AbstractFetcherThread.scala (working copy) @@ -23,7 +23,11 @@ import collection.mutable import kafka.message.ByteBufferMessageSet import kafka.api.{FetchResponse, PartitionData, FetchRequestBuilder} -import kafka.utils.ShutdownableThread +import kafka.metrics.KafkaMetricsGroup +import com.yammer.metrics.core.Gauge +import java.util.concurrent.atomic.AtomicLong +import kafka.utils.{Pool, ShutdownableThread} +import java.util.concurrent.TimeUnit /** @@ -35,6 +39,8 @@ private val fetchMap = new mutable.HashMap[Tuple2[String,Int], Long] // a (topic, partitionId) -> offset map private val fetchMapLock = new Object val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize) + val fetcherMetrics = FetcherStat.getFetcherStat(name + "-" + sourceBroker.id) + // callbacks to be defined in subclass // process fetched data @@ -79,6 +85,7 @@ } } } + fetcherMetrics.requestRate.mark() if (response != null) { // process fetched data @@ -93,8 +100,11 @@ 
partitionData.error match { case ErrorMapping.NoError => processPartitionData(topic, currentOffset.get, partitionData) - val newOffset = currentOffset.get + partitionData.messages.asInstanceOf[ByteBufferMessageSet].validBytes + val validBytes = partitionData.messages.asInstanceOf[ByteBufferMessageSet].validBytes + val newOffset = currentOffset.get + validBytes fetchMap.put(key, newOffset) + FetcherLagMetrics.getFetcherLagMetrics(topic, partitionId).lag = partitionData.hw - newOffset + fetcherMetrics.byteRate.mark(validBytes) case ErrorMapping.OffsetOutOfRangeCode => val newOffset = handleOffsetOutOfRange(topic, partitionId) fetchMap.put(key, newOffset) @@ -140,4 +150,43 @@ fetchMap.size } } -} \ No newline at end of file +} + +class FetcherLagMetrics(name: (String, Int)) extends KafkaMetricsGroup { + private[this] var lagVal = new AtomicLong(-1L) + newGauge( + name._1 + "-" + name._2 + "-ConsumerLag", + new Gauge[Long] { + def value() = lagVal.get + } + ) + + def lag_=(newLag: Long) { + lagVal.set(newLag) + } + + def lag = lagVal.get +} + +object FetcherLagMetrics { + private val valueFactory = (k: (String, Int)) => new FetcherLagMetrics(k) + private val stats = new Pool[(String, Int), FetcherLagMetrics](Some(valueFactory)) + + def getFetcherLagMetrics(topic: String, partitionId: Int): FetcherLagMetrics = { + stats.getAndMaybePut( (topic, partitionId) ) + } +} + +class FetcherStat(name: String) extends KafkaMetricsGroup { + val requestRate = newMeter(name + "RequestsPerSec", "requests", TimeUnit.SECONDS) + val byteRate = newMeter(name + "BytesPerSec", "bytes", TimeUnit.SECONDS) +} + +object FetcherStat { + private val valueFactory = (k: String) => new FetcherStat(k) + private val stats = new Pool[String, FetcherStat](Some(valueFactory)) + + def getFetcherStat(name: String): FetcherStat = { + stats.getAndMaybePut(name) + } +} Index: core/src/main/scala/kafka/api/StopReplicaRequest.scala =================================================================== --- core/src/main/scala/kafka/api/StopReplicaRequest.scala (revision 1383739) +++ core/src/main/scala/kafka/api/StopReplicaRequest.scala (working copy) @@ -45,7 +45,7 @@ clientId: String, ackTimeoutMs: Int, stopReplicaSet: Set[(String, Int)]) - extends RequestOrResponse(Some(RequestKeys.StopReplicaRequest)) { + extends RequestOrResponse(Some(RequestKeys.StopReplicaKey)) { def this(stopReplicaSet: Set[(String, Int)]) = { this(StopReplicaRequest.CurrentVersion, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout, stopReplicaSet) } Index: core/src/main/scala/kafka/api/OffsetRequest.scala =================================================================== --- core/src/main/scala/kafka/api/OffsetRequest.scala (revision 1383739) +++ core/src/main/scala/kafka/api/OffsetRequest.scala (working copy) @@ -45,7 +45,7 @@ topic: String, partition: Int, time: Long, - maxNumOffsets: Int) extends RequestOrResponse(Some(RequestKeys.Offsets)) { + maxNumOffsets: Int) extends RequestOrResponse(Some(RequestKeys.OffsetsKey)) { def this(topic: String, partition: Int, time: Long, maxNumOffsets: Int) = this(OffsetRequest.CurrentVersion, OffsetRequest.DefaultClientId, topic, partition, time, maxNumOffsets) Index: core/src/main/scala/kafka/api/FetchRequest.scala =================================================================== --- core/src/main/scala/kafka/api/FetchRequest.scala (revision 1383739) +++ core/src/main/scala/kafka/api/FetchRequest.scala (working copy) @@ -105,7 +105,7 @@ replicaId: Int = FetchRequest.DefaultReplicaId, maxWait: Int = 
FetchRequest.DefaultMaxWait, minBytes: Int = FetchRequest.DefaultMinBytes, - offsetInfo: Seq[OffsetDetail] ) extends RequestOrResponse(Some(RequestKeys.Fetch)) { + offsetInfo: Seq[OffsetDetail] ) extends RequestOrResponse(Some(RequestKeys.FetchKey)) { // ensure that a topic "X" appears in at most one OffsetDetail def validate() { @@ -144,6 +144,8 @@ def sizeInBytes: Int = 2 + 4 + (2 + clientId.length()) + 4 + 4 + 4 + offsetInfo.foldLeft(4)(_ + _.sizeInBytes()) def numPartitions: Int = offsetInfo.foldLeft(0)(_ + _.offsets.size) + + def isFromFollower(): Boolean = replicaId != FetchRequest.NonFollowerId } Index: core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala =================================================================== --- core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala (revision 1383739) +++ core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala (working copy) @@ -94,7 +94,7 @@ isInit: Boolean, ackTimeoutMs: Int, leaderAndISRInfos: Map[(String, Int), LeaderAndIsr]) - extends RequestOrResponse(Some(RequestKeys.LeaderAndISRRequest)) { + extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey)) { def this(isInit: Boolean, leaderAndISRInfos: Map[(String, Int), LeaderAndIsr]) = { this(LeaderAndIsrRequest.CurrentVersion, LeaderAndIsrRequest.DefaultClientId, isInit, LeaderAndIsrRequest.DefaultAckTimeout, leaderAndISRInfos) } Index: core/src/main/scala/kafka/api/RequestKeys.scala =================================================================== --- core/src/main/scala/kafka/api/RequestKeys.scala (revision 1383739) +++ core/src/main/scala/kafka/api/RequestKeys.scala (working copy) @@ -17,11 +17,36 @@ package kafka.api +import kafka.common.KafkaException +import java.nio.ByteBuffer + object RequestKeys { - val Produce: Short = 0 - val Fetch: Short = 1 - val Offsets: Short = 2 - val TopicMetadata: Short = 3 - val LeaderAndISRRequest: Short = 4 - val StopReplicaRequest: Short = 5 + val ProduceKey: Short = 0 + val FetchKey: Short = 1 + val OffsetsKey: Short = 2 + val MetadataKey: Short = 3 + val LeaderAndIsrKey: Short = 4 + val StopReplicaKey: Short = 5 + + val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]= + Map( ProduceKey -> ("Produce", ProducerRequest.readFrom), + FetchKey -> ("Fetch", FetchRequest.readFrom), + OffsetsKey -> ("Offsets", OffsetRequest.readFrom), + MetadataKey -> ("Metadata", TopicMetadataRequest.readFrom), + LeaderAndIsrKey -> ("LeaderAndIsr", LeaderAndIsrRequest.readFrom), + StopReplicaKey -> ("StopReplica", StopReplicaRequest.readFrom) ) + + def nameForKey(key: Short): String = { + keyToNameAndDeserializerMap.get(key) match { + case Some(nameAndSerializer) => nameAndSerializer._1 + case None => throw new KafkaException("Wrong request type %d".format(key)) + } + } + + def deserializerForKey(key: Short): (ByteBuffer) => RequestOrResponse = { + keyToNameAndDeserializerMap.get(key) match { + case Some(nameAndSerializer) => nameAndSerializer._2 + case None => throw new KafkaException("Wrong request type %d".format(key)) + } + } } Index: core/src/main/scala/kafka/api/TopicMetadataRequest.scala =================================================================== --- core/src/main/scala/kafka/api/TopicMetadataRequest.scala (revision 1383739) +++ core/src/main/scala/kafka/api/TopicMetadataRequest.scala (working copy) @@ -78,7 +78,7 @@ val topics: Seq[String], val detailedMetadata: DetailedMetadataRequest = NoSegmentMetadata, val timestamp: Option[Long] = None, val count: Option[Int] = None) - extends 
RequestOrResponse(Some(RequestKeys.TopicMetadata)){ + extends RequestOrResponse(Some(RequestKeys.MetadataKey)){ def this(topics: Seq[String]) = this(TopicMetadataRequest.CurrentVersion, TopicMetadataRequest.DefaultClientId, topics, NoSegmentMetadata, None, None) Index: core/src/main/scala/kafka/api/ProducerRequest.scala =================================================================== --- core/src/main/scala/kafka/api/ProducerRequest.scala (revision 1383739) +++ core/src/main/scala/kafka/api/ProducerRequest.scala (working copy) @@ -24,7 +24,7 @@ object ProducerRequest { val CurrentVersion: Short = 0 - + def readFrom(buffer: ByteBuffer): ProducerRequest = { val versionId: Short = buffer.getShort val correlationId: Int = buffer.getInt @@ -58,7 +58,7 @@ clientId: String, requiredAcks: Short, ackTimeoutMs: Int, - data: Array[TopicData] ) extends RequestOrResponse(Some(RequestKeys.Produce)) { + data: Array[TopicData] ) extends RequestOrResponse(Some(RequestKeys.ProduceKey)) { def this(correlationId: Int, clientId: String, requiredAcks: Short, ackTimeoutMs: Int, data: Array[TopicData]) = this(ProducerRequest.CurrentVersion, correlationId, clientId, requiredAcks, ackTimeoutMs, data) Index: core/src/main/scala/kafka/javaapi/ProducerRequest.scala =================================================================== --- core/src/main/scala/kafka/javaapi/ProducerRequest.scala (revision 1383739) +++ core/src/main/scala/kafka/javaapi/ProducerRequest.scala (working copy) @@ -24,7 +24,7 @@ val clientId: String, val requiredAcks: Short, val ackTimeoutMs: Int, - val data: Array[TopicData]) extends RequestOrResponse(Some(RequestKeys.Produce)) { + val data: Array[TopicData]) extends RequestOrResponse(Some(RequestKeys.ProduceKey)) { val underlying = new kafka.api.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeoutMs, data)
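
For illustration, here is a minimal sketch (not part of the patch) of the per-key metrics pattern the diff converges on in BrokerTopicStat, ConsumerTopicStat, FetcherStat and FetcherLagMetrics: a small KafkaMetricsGroup class whose metric names embed the key, plus a Pool with a value factory so that getAndMaybePut builds and registers the metrics object the first time a key is seen. The names ExampleTopicMetrics and forTopic are hypothetical; newMeter, Pool and getAndMaybePut are the APIs shown in the diff.

    import java.util.concurrent.TimeUnit
    import kafka.metrics.KafkaMetricsGroup
    import kafka.utils.Pool

    // Hypothetical per-topic metrics holder, mirroring BrokerTopicMetrics / ConsumerTopicStat:
    // the key is baked into the metric name, so each topic gets its own meters.
    class ExampleTopicMetrics(name: String) extends KafkaMetricsGroup {
      val messagesInRate = newMeter(name + "MessagesInPerSec", "messages", TimeUnit.SECONDS)
      val bytesInRate = newMeter(name + "BytesInPerSec", "bytes", TimeUnit.SECONDS)
    }

    object ExampleTopicMetrics {
      // Pool with a value factory: getAndMaybePut constructs (and thereby registers)
      // the metrics object lazily, at most once per key.
      private val valueFactory = (k: String) => new ExampleTopicMetrics(k)
      private val stats = new Pool[String, ExampleTopicMetrics](Some(valueFactory))

      def forTopic(topic: String): ExampleTopicMetrics = stats.getAndMaybePut(topic + "-")
    }

    // At a call site (compare PartitionTopicInfo.enqueue and KafkaApis above):
    //   ExampleTopicMetrics.forTopic(topic).bytesInRate.mark(messages.sizeInBytes)

This is also the likely reason Pool.getAndMaybePut now double-checks under createLock instead of relying on putIfAbsent: with the old code two racing threads could both invoke the value factory, and since the factory registers metrics as a side effect, it should run no more than once per key.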
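
Similarly, a minimal sketch (not part of the patch) of the KafkaTimer pattern used by FetchRequestAndResponseStat and ControllerStat above: a single "...RateAndTimeMs" timer carries both a rate and a latency distribution, replacing the separate requests-per-second / avg-ms / max-ms attributes of the old SnapshotStats-based MBeans. ExampleRequestStats, ExampleCaller and timedFetch are hypothetical names; KafkaTimer and newTimer are taken from the diff.

    import java.util.concurrent.TimeUnit
    import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}

    object ExampleRequestStats extends KafkaMetricsGroup {
      // One timer per request type; the resulting bean exposes both rate and latency attributes.
      val requestTimer = new KafkaTimer(newTimer("ExampleRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
    }

    object ExampleCaller {
      // KafkaTimer.time takes the work as a block and records its duration,
      // the same way SimpleConsumer.fetch and the controller's BrokerChangeListener do above.
      def timedFetch(doFetch: () => Unit) {
        ExampleRequestStats.requestTimer.time {
          doFetch()
        }
      }
    }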
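
Finally, a sketch (not part of the patch) of the gauge pattern used for LeaderCount, UnderReplicatedPartitions, ActiveControllerCount, NumDelayedRequests, the per-stream FetchQueueSize and the fetcher ConsumerLag: the gauge holds no state of its own and re-evaluates value() whenever a reporter or JMX reads it, so call sites never need an explicit update. ExampleQueueOwner is a hypothetical class; newGauge and Gauge are the APIs shown in the diff.

    import java.util.concurrent.LinkedBlockingQueue
    import com.yammer.metrics.core.Gauge
    import kafka.metrics.KafkaMetricsGroup

    // Hypothetical component exposing the depth of a queue it owns.
    class ExampleQueueOwner(queue: LinkedBlockingQueue[String]) extends KafkaMetricsGroup {
      newGauge(
        "ExampleQueueSize",
        new Gauge[Int] {
          def value() = queue.size
        }
      )
    }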