Index: core/src/test/scala/unit/kafka/utils/TestUtils.scala =================================================================== --- core/src/test/scala/unit/kafka/utils/TestUtils.scala (revision 1378595) +++ core/src/test/scala/unit/kafka/utils/TestUtils.scala (working copy) @@ -33,9 +33,7 @@ import kafka.consumer.ConsumerConfig import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.TimeUnit -import kafka.common.ErrorMapping import kafka.api._ -import collection.mutable.{Map, Set} import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder} Index: core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala =================================================================== --- core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala (revision 1378595) +++ core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala (working copy) @@ -29,9 +29,8 @@ import kafka.utils.TestUtils._ import kafka.server.{ReplicaManager, KafkaApis, KafkaConfig} import kafka.common.ErrorMapping -import kafka.api.{TopicMetadata, TopicMetaDataResponse, TopicMetadataRequest} +import kafka.api.{RequestKeys, TopicMetadata, TopicMetaDataResponse, TopicMetadataRequest} - class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { val props = createBrokerConfigs(1) val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1}) @@ -107,6 +106,7 @@ val topicMetadataRequest = new TopicMetadataRequest(List(topic)) val serializedMetadataRequest = ByteBuffer.allocate(topicMetadataRequest.sizeInBytes + 2) + serializedMetadataRequest.putShort(RequestKeys.MetadataKey) topicMetadataRequest.writeTo(serializedMetadataRequest) serializedMetadataRequest.rewind() @@ -114,21 +114,13 @@ val requestChannel = new RequestChannel(2, 5) val apis = new KafkaApis(requestChannel, replicaManager, zkClient, 1) - // mock the receive API to return the request buffer as created above - val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive]) - EasyMock.expect(receivedRequest.buffer).andReturn(serializedMetadataRequest) - EasyMock.replay(receivedRequest) - // call the API (to be tested) to get metadata - apis.handleTopicMetadataRequest(new RequestChannel.Request(processor=0, requestKey=5, request=receivedRequest, start=1)) - val metadataResponse = requestChannel.receiveResponse(0).response.asInstanceOf[BoundedByteBufferSend].buffer + apis.handleTopicMetadataRequest(new RequestChannel.Request(processor=0, requestKey=RequestKeys.MetadataKey, buffer=serializedMetadataRequest, startTimeNs=1)) + val metadataResponse = requestChannel.receiveResponse(0).responseSend.asInstanceOf[BoundedByteBufferSend].buffer // check assertions val topicMetadata = TopicMetaDataResponse.readFrom(metadataResponse).topicsMetadata - // verify the expected calls to log manager occurred in the right order - EasyMock.verify(receivedRequest) - topicMetadata } } \ No newline at end of file Index: core/src/test/scala/unit/kafka/network/SocketServerTest.scala =================================================================== --- core/src/test/scala/unit/kafka/network/SocketServerTest.scala (revision 1378595) +++ core/src/test/scala/unit/kafka/network/SocketServerTest.scala (working copy) @@ -24,6 +24,9 @@ import kafka.utils.TestUtils import java.util.Random import junit.framework.Assert._ +import kafka.producer.SyncProducerConfig +import kafka.api.{TopicData, ProducerRequest} +import java.nio.ByteBuffer class SocketServerTest extends JUnitSuite { @@ -54,9 +57,9 @@ /* A simple request 
handler that just echos back the response */ def processRequest(channel: RequestChannel) { val request = channel.receiveRequest - val id = request.request.buffer.getShort - val send = new BoundedByteBufferSend(request.request.buffer.slice) - channel.sendResponse(new RequestChannel.Response(request.processor, request.requestKey, send, request.start, 15)) + val id = request.buffer.getShort + val send = new BoundedByteBufferSend(request.buffer.slice) + channel.sendResponse(new RequestChannel.Response(request.processor, request, send)) } def connect() = new Socket("localhost", server.port) @@ -69,10 +72,21 @@ @Test def simpleRequest() { val socket = connect() - sendRequest(socket, 0, "hello".getBytes) + val correlationId = SyncProducerConfig.DefaultCorrelationId + val clientId = SyncProducerConfig.DefaultClientId + val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs + val ack = SyncProducerConfig.DefaultRequiredAcks + val emptyRequest = new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Array[TopicData]()) + + val byteBuffer = ByteBuffer.allocate(emptyRequest.sizeInBytes()) + emptyRequest.writeTo(byteBuffer) + byteBuffer.rewind() + val serializedBytes = new Array[Byte](byteBuffer.remaining) + byteBuffer.get(serializedBytes) + + sendRequest(socket, 0, serializedBytes) processRequest(server.requestChannel) - val response = new String(receiveResponse(socket)) - assertEquals("hello", response) + assertEquals(serializedBytes.toSeq, receiveResponse(socket).toSeq) } @Test(expected=classOf[IOException]) Index: core/src/main/scala/kafka/cluster/Replica.scala =================================================================== --- core/src/main/scala/kafka/cluster/Replica.scala (revision 1378595) +++ core/src/main/scala/kafka/cluster/Replica.scala (working copy) @@ -36,7 +36,7 @@ val topic = partition.topic val partitionId = partition.partitionId - def logEndOffset_=(newLogEndOffset: Long) = { + def logEndOffset_=(newLogEndOffset: Long) { if (!isLocal) { logEndOffsetValue.set(newLogEndOffset) logEndOffsetUpdateTimeMsValue.set(time.milliseconds) Index: core/src/main/scala/kafka/cluster/Partition.scala =================================================================== --- core/src/main/scala/kafka/cluster/Partition.scala (revision 1378595) +++ core/src/main/scala/kafka/cluster/Partition.scala (working copy) @@ -45,6 +45,11 @@ private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId) + def isUnderReplicated(): Boolean = { + // TODO: need to pass in replication factor from controller + inSyncReplicas.size < replicaManager.config.defaultReplicationFactor + } + def getOrCreateReplica(replicaId: Int = localBrokerId): Replica = { val replicaOpt = getReplica(replicaId) replicaOpt match { @@ -178,6 +183,7 @@ info("Expanding ISR for topic %s partition %d to %s".format(topic, partitionId, newInSyncReplicas.map(_.brokerId).mkString(","))) // update ISR in ZK and cache updateISR(newInSyncReplicas) + replicaManager.ISRExpandRate.mark() } maybeIncrementLeaderHW(leaderReplica) case None => // nothing to do if no longer leader @@ -236,6 +242,7 @@ updateISR(newInSyncReplicas) // we may need to increment high watermark since ISR could be down to 1 maybeIncrementLeaderHW(leaderReplica) + replicaManager.ISRShrinkRate.mark() } case None => // do nothing if no longer leader } Index: core/src/main/scala/kafka/log/LogStats.scala =================================================================== --- core/src/main/scala/kafka/log/LogStats.scala (revision 1378595) +++ 
core/src/main/scala/kafka/log/LogStats.scala (working copy) @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.log - -import java.util.concurrent.atomic.AtomicLong - -trait LogStatsMBean { - def getName(): String - def getSize(): Long - def getNumberOfSegments: Int - def getCurrentOffset: Long - def getNumAppendedMessages: Long -} - -class LogStats(val log: Log) extends LogStatsMBean { - private val numCumulatedMessages = new AtomicLong(0) - - def getName(): String = log.name - - def getSize(): Long = log.size - - def getNumberOfSegments: Int = log.numberOfSegments - - def getCurrentOffset: Long = log.logEndOffset - - def getNumAppendedMessages: Long = numCumulatedMessages.get - - def recordAppendedMessages(nMessages: Int) = numCumulatedMessages.getAndAdd(nMessages) -} Index: core/src/main/scala/kafka/log/Log.scala =================================================================== --- core/src/main/scala/kafka/log/Log.scala (revision 1378595) +++ core/src/main/scala/kafka/log/Log.scala (working copy) @@ -26,6 +26,8 @@ import kafka.server.BrokerTopicStat import kafka.message.{ByteBufferMessageSet, MessageSet, InvalidMessageException, FileMessageSet} import kafka.common.{KafkaException, InvalidMessageSizeException, OffsetOutOfRangeException} +import kafka.metrics.KafkaMetricsGroup +import com.yammer.metrics.core.Gauge object Log { val FileSuffix = ".kafka" @@ -130,7 +132,7 @@ @threadsafe private[kafka] class Log( val dir: File, val maxSize: Long, val flushInterval: Int, val rollIntervalMs: Long, val needRecovery: Boolean, - time: Time, brokerId: Int = 0) extends Logging { + time: Time, brokerId: Int = 0) extends Logging with KafkaMetricsGroup { this.logIdent = "[Kafka Log on Broker " + brokerId + "], " import kafka.log.Log._ @@ -147,9 +149,19 @@ /* The actual segments of the log */ private[log] val segments: SegmentList[LogSegment] = loadSegments() - private val logStats = new LogStats(this) + newGauge( + name + "-" + "LogSegments", + new Gauge[Int] { + def value() = numberOfSegments + } + ) - Utils.registerMBean(logStats, "kafka:type=kafka.logs." 
+ dir.getName) + newGauge( + name + "-" + "LogEndOffset", + new Gauge[Long] { + def value() = logEndOffset + } + ) /* The name of this log */ def name = dir.getName() @@ -242,9 +254,8 @@ numberOfMessages += 1; } - BrokerTopicStat.getBrokerTopicStat(topicName).recordMessagesIn(numberOfMessages) - BrokerTopicStat.getBrokerAllTopicStat.recordMessagesIn(numberOfMessages) - logStats.recordAppendedMessages(numberOfMessages) + BrokerTopicStat.getBrokerTopicStat(topicName).messageInRate.mark(numberOfMessages) + BrokerTopicStat.getBrokerAllTopicStat.messageInRate.mark(numberOfMessages) // truncate the message set's buffer upto validbytes, before appending it to the on-disk log val validByteBuffer = messages.getBuffer.duplicate() Index: core/src/main/scala/kafka/producer/SyncProducer.scala =================================================================== --- core/src/main/scala/kafka/producer/SyncProducer.scala (revision 1378595) +++ core/src/main/scala/kafka/producer/SyncProducer.scala (working copy) @@ -23,6 +23,8 @@ import kafka.utils._ import java.util.Random import kafka.common.{ErrorMapping, MessageSizeTooLargeException} +import kafka.metrics.KafkaMetricsGroup +import java.util.concurrent.TimeUnit object SyncProducer { val RequestKey: Short = 0 @@ -57,7 +59,7 @@ val buffer = new BoundedByteBufferSend(request).buffer trace("verifying sendbuffer of size " + buffer.limit) val requestTypeId = buffer.getShort() - if(requestTypeId == RequestKeys.Produce) { + if(requestTypeId == RequestKeys.ProduceKey) { val request = ProducerRequest.readFrom(buffer) trace(request.toString) } @@ -92,7 +94,6 @@ sentOnConnection = 0 lastConnectionTime = System.currentTimeMillis } - SyncProducerStats.recordProduceRequest(SystemTime.nanoseconds - startTime) response } } @@ -101,6 +102,7 @@ * Send a message */ def send(producerRequest: ProducerRequest): ProducerResponse = { + ProducerRequestStat.requestSizeHist.update(producerRequest.sizeInBytes) for( topicData <- producerRequest.data ) { for( partitionData <- topicData.partitionDataArray ) { verifyMessageSize(partitionData.messages) @@ -108,7 +110,13 @@ trace("Got message set with " + setSize + " bytes to send") } } - val response = doSend(producerRequest) + val context = ProducerRequestStat.requestTimer.time() + var response: Receive = null + try { + response = doSend(producerRequest) + } finally { + context.stop() + } ProducerResponse.readFrom(response.buffer) } @@ -184,32 +192,7 @@ } } -trait SyncProducerStatsMBean { - def getProduceRequestsPerSecond: Double - def getAvgProduceRequestMs: Double - def getMaxProduceRequestMs: Double - def getNumProduceRequests: Long -} - -@threadsafe -class SyncProducerStats extends SyncProducerStatsMBean { - private val produceRequestStats = new SnapshotStats - - def recordProduceRequest(requestNs: Long) = produceRequestStats.recordRequestMetric(requestNs) - - def getProduceRequestsPerSecond: Double = produceRequestStats.getRequestsPerSecond - - def getAvgProduceRequestMs: Double = produceRequestStats.getAvgMetric / (1000.0 * 1000.0) - - def getMaxProduceRequestMs: Double = produceRequestStats.getMaxMetric / (1000.0 * 1000.0) - - def getNumProduceRequests: Long = produceRequestStats.getNumRequests -} - -object SyncProducerStats extends Logging { - private val kafkaProducerstatsMBeanName = "kafka:type=kafka.KafkaProducerStats" - private val stats = new SyncProducerStats - swallow(Utils.registerMBean(stats, kafkaProducerstatsMBeanName)) - - def recordProduceRequest(requestMs: Long) = stats.recordProduceRequest(requestMs) -} +object 
ProducerRequestStat extends KafkaMetricsGroup { + val requestTimer = newTimer("ProduceRequestTime", TimeUnit.MILLISECONDS, TimeUnit.SECONDS) + val requestSizeHist = newHistogram("ProducerRequestSize") +} \ No newline at end of file Index: core/src/main/scala/kafka/producer/Producer.scala =================================================================== --- core/src/main/scala/kafka/producer/Producer.scala (revision 1378595) +++ core/src/main/scala/kafka/producer/Producer.scala (working copy) @@ -20,8 +20,9 @@ import kafka.utils._ import java.util.concurrent.{TimeUnit, LinkedBlockingQueue} import kafka.serializer.Encoder -import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean} +import java.util.concurrent.atomic.AtomicBoolean import kafka.common.{QueueFullException, InvalidConfigException} +import kafka.metrics.KafkaMetricsGroup class Producer[K,V](config: ProducerConfig, private val eventHandler: EventHandler[K,V]) // for testing only @@ -68,8 +69,10 @@ } private def recordStats(producerData: ProducerData[K,V]*) { - for (data <- producerData) - ProducerTopicStat.getProducerTopicStat(data.getTopic).recordMessagesPerTopic(data.getData.size) + for (data <- producerData) { + ProducerTopicStat.getProducerTopicStat(data.getTopic).messageRate.mark(data.getData.size) + ProducerTopicStat.getProducerAllTopicStat.messageRate.mark(data.getData.size) + } } private def asyncSend(producerData: ProducerData[K,V]*) { @@ -93,7 +96,7 @@ } } if(!added) { - AsyncProducerStats.recordDroppedEvents + AsyncProducerStats.droppedEventRate.mark() error("Event queue is full of unsent messages, could not send event: " + data.toString) throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + data.toString) }else { @@ -118,31 +121,20 @@ } } -trait ProducerTopicStatMBean { - def getMessagesPerTopic: Long -} - @threadsafe -class ProducerTopicStat extends ProducerTopicStatMBean { - private val numCumulatedMessagesPerTopic = new AtomicLong(0) - - def getMessagesPerTopic: Long = numCumulatedMessagesPerTopic.get - - def recordMessagesPerTopic(nMessages: Int) = numCumulatedMessagesPerTopic.getAndAdd(nMessages) +class ProducerTopicStat(name: String) extends KafkaMetricsGroup { + val messageRate = newMeter(name + "-MessagesPerSec", "messages", TimeUnit.SECONDS) + val byteRate = newMeter(name + "-BytesPerSec", "bytes", TimeUnit.SECONDS) } object ProducerTopicStat extends Logging { - private val stats = new Pool[String, ProducerTopicStat] + private val valueFactory = (k: String) => new ProducerTopicStat(k) + private val stats = new Pool[String, ProducerTopicStat](Some(valueFactory)) + private val allTopicStat = new ProducerTopicStat("Total") + def getProducerAllTopicStat(): ProducerTopicStat = allTopicStat + def getProducerTopicStat(topic: String): ProducerTopicStat = { - var stat = stats.get(topic) - if (stat == null) { - stat = new ProducerTopicStat - if (stats.putIfNotExists(topic, stat) == null) - Utils.registerMBean(stat, "kafka.producer.Producer:type=kafka.ProducerTopicStat." 
+ topic)
-      else
-        stat = stats.get(topic)
-    }
-    return stat
+    stats.getAndMaybePut(topic)
  }
}
Index: core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala
===================================================================
--- core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala (revision 1378595)
+++ core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala (working copy)
@@ -17,22 +17,16 @@
 package kafka.producer.async
 
-import java.util.concurrent.atomic.AtomicInteger
-import kafka.utils.Utils
+import kafka.metrics.KafkaMetricsGroup
+import java.util.concurrent.TimeUnit
 
-class AsyncProducerStats extends AsyncProducerStatsMBean {
-  val droppedEvents = new AtomicInteger(0)
+object AsyncProducerStats extends KafkaMetricsGroup {
+  private val DroppedEventBeanName = "DroppedEvent"
+  val droppedEventRate = newMeter(DroppedEventBeanName, "drops", TimeUnit.SECONDS)
+
+  private val ResentEventBeanName = "ResentEvent"
+  val resentEventRate = newMeter(ResentEventBeanName, "resents", TimeUnit.SECONDS)
 
-  def getAsyncProducerDroppedEvents: Int = droppedEvents.get
-
-  def recordDroppedEvents = droppedEvents.getAndAdd(1)
+  private val FailedSendBeanName = "FailedSend"
+  val failedSendRate = newMeter(FailedSendBeanName, "failed sends", TimeUnit.SECONDS)
 }
-
-object AsyncProducerStats {
-  private val stats = new AsyncProducerStats
-  val ProducerMBeanName = "kafka.producer.Producer:type=AsyncProducerStats"
-
-  Utils.registerMBean(stats, ProducerMBeanName)
-
-  def recordDroppedEvents = stats.recordDroppedEvents
-}
Index: core/src/main/scala/kafka/producer/async/AsyncProducerStatsMBean.scala
===================================================================
--- core/src/main/scala/kafka/producer/async/AsyncProducerStatsMBean.scala (revision 1378595)
+++ core/src/main/scala/kafka/producer/async/AsyncProducerStatsMBean.scala (working copy)
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package kafka.producer.async
-
-trait AsyncProducerStatsMBean {
-  def getAsyncProducerDroppedEvents: Int
-}
-
-trait AsyncProducerQueueSizeStatsMBean {
-  def getAsyncProducerQueueSize: Int
-}
Index: core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
===================================================================
--- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (revision 1378595)
+++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (working copy)
@@ -24,7 +24,7 @@
 import kafka.utils.{Utils, Logging}
 import scala.collection.Map
 import scala.collection.mutable.{ListBuffer, HashMap}
-import kafka.api.{TopicMetadata, ProducerRequest, TopicData, PartitionData}
+import kafka.api._
 
 class DefaultEventHandler[K,V](config: ProducerConfig,
@@ -41,6 +41,11 @@
   def handle(events: Seq[ProducerData[K,V]]) {
     lock synchronized {
       val serializedData = serialize(events)
+      serializedData.foreach{
+        pd => val dataSize = pd.data.foldLeft(0)(_ + _.payloadSize)
+        ProducerTopicStat.getProducerTopicStat(pd.topic).byteRate.mark(dataSize)
+        ProducerTopicStat.getProducerAllTopicStat.byteRate.mark(dataSize)
+      }
       var outstandingProduceRequests = serializedData
       var remainingRetries = config.producerRetries + 1
       while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
@@ -51,9 +56,11 @@
         // get topics of the outstanding produce requests and refresh metadata for those
         Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.getTopic)))
         remainingRetries -= 1
+        AsyncProducerStats.resentEventRate.mark()
       }
     }
     if(outstandingProduceRequests.size > 0) {
+      AsyncProducerStats.failedSendRate.mark()
      error("Failed to send the following requests: " + outstandingProduceRequests)
      throw new FailedToSendMessageException("Failed to send messages after " + config.producerRetries + " tries.", null)
     }
Index: core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
===================================================================
--- core/src/main/scala/kafka/producer/async/ProducerSendThread.scala (revision 1378595)
+++ core/src/main/scala/kafka/producer/async/ProducerSendThread.scala (working copy)
@@ -21,16 +21,25 @@
 import java.util.concurrent.{TimeUnit, CountDownLatch, BlockingQueue}
 import collection.mutable.ListBuffer
 import kafka.producer.ProducerData
+import kafka.metrics.KafkaMetricsGroup
+import com.yammer.metrics.core.Gauge
 
 class ProducerSendThread[K,V](val threadName: String,
                               val queue: BlockingQueue[ProducerData[K,V]],
                               val handler: EventHandler[K,V],
                               val queueTime: Long,
-                              val batchSize: Int) extends Thread(threadName) with Logging {
+                              val batchSize: Int) extends Thread(threadName) with Logging with KafkaMetricsGroup {
 
   private val shutdownLatch = new CountDownLatch(1)
   private val shutdownCommand = new ProducerData[K,V](null, null.asInstanceOf[K], null.asInstanceOf[Seq[V]])
 
+  newGauge(
+    "ProducerQueueSize-" + getId,
+    new Gauge[Int] {
+      def value() = queue.size
+    }
+  )
+
   override def run {
     try {
Index: core/src/main/scala/kafka/message/FileMessageSet.scala
===================================================================
--- core/src/main/scala/kafka/message/FileMessageSet.scala (revision 1378595)
+++ core/src/main/scala/kafka/message/FileMessageSet.scala (working copy)
@@ -24,6 +24,8 @@
 import kafka.utils._
 import kafka.common.KafkaException
+import kafka.metrics.KafkaMetricsGroup
+import java.util.concurrent.TimeUnit
 
 /**
  * An on-disk message set. The set can be opened either mutably or immutably.
Mutation attempts @@ -159,10 +161,12 @@ */ def flush() = { checkMutable() - val startTime = SystemTime.milliseconds - channel.force(true) - val elapsedTime = SystemTime.milliseconds - startTime - LogFlushStats.recordFlushRequest(elapsedTime) + val context = LogFlushStats.logFlushTimer.time() + try { + channel.force(true) + } finally { + context.stop() + } } /** @@ -240,38 +244,8 @@ else next } - } -trait LogFlushStatsMBean { - def getFlushesPerSecond: Double - def getAvgFlushMs: Double - def getTotalFlushMs: Long - def getMaxFlushMs: Double - def getNumFlushes: Long +object LogFlushStats extends KafkaMetricsGroup { + val logFlushTimer = newTimer("LogFlush", TimeUnit.MILLISECONDS, TimeUnit.SECONDS) } - -@threadsafe -class LogFlushStats extends LogFlushStatsMBean { - private val flushRequestStats = new SnapshotStats - - def recordFlushRequest(requestMs: Long) = flushRequestStats.recordRequestMetric(requestMs) - - def getFlushesPerSecond: Double = flushRequestStats.getRequestsPerSecond - - def getAvgFlushMs: Double = flushRequestStats.getAvgMetric - - def getTotalFlushMs: Long = flushRequestStats.getTotalMetric - - def getMaxFlushMs: Double = flushRequestStats.getMaxMetric - - def getNumFlushes: Long = flushRequestStats.getNumRequests -} - -object LogFlushStats extends Logging { - private val LogFlushStatsMBeanName = "kafka:type=kafka.LogFlushStats" - private val stats = new LogFlushStats - Utils.registerMBean(stats, LogFlushStatsMBeanName) - - def recordFlushRequest(requestMs: Long) = stats.recordFlushRequest(requestMs) -} Index: core/src/main/scala/kafka/network/SocketServerStats.scala =================================================================== --- core/src/main/scala/kafka/network/SocketServerStats.scala (revision 1378595) +++ core/src/main/scala/kafka/network/SocketServerStats.scala (working copy) @@ -1,90 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.network - -import kafka.utils._ -import kafka.api.RequestKeys - -trait SocketServerStatsMBean { - def getProduceRequestsPerSecond: Double - def getFetchRequestsPerSecond: Double - def getAvgProduceRequestMs: Double - def getMaxProduceRequestMs: Double - def getAvgFetchRequestMs: Double - def getMaxFetchRequestMs: Double - def getBytesReadPerSecond: Double - def getBytesWrittenPerSecond: Double - def getNumFetchRequests: Long - def getNumProduceRequests: Long - def getTotalBytesRead: Long - def getTotalBytesWritten: Long - def getTotalFetchRequestMs: Long - def getTotalProduceRequestMs: Long -} - -@threadsafe -class SocketServerStats(val monitorDurationNs: Long, val time: Time) extends SocketServerStatsMBean { - - def this(monitorDurationNs: Long) = this(monitorDurationNs, SystemTime) - val produceTimeStats = new SnapshotStats(monitorDurationNs) - val fetchTimeStats = new SnapshotStats(monitorDurationNs) - val produceBytesStats = new SnapshotStats(monitorDurationNs) - val fetchBytesStats = new SnapshotStats(monitorDurationNs) - - def recordRequest(requestTypeId: Short, durationNs: Long) { - requestTypeId match { - case r if r == RequestKeys.Produce => - produceTimeStats.recordRequestMetric(durationNs) - case r if r == RequestKeys.Fetch => - fetchTimeStats.recordRequestMetric(durationNs) - case _ => /* not collecting; let go */ - } - } - - def recordBytesWritten(bytes: Int): Unit = fetchBytesStats.recordRequestMetric(bytes) - - def recordBytesRead(bytes: Int): Unit = produceBytesStats.recordRequestMetric(bytes) - - def getProduceRequestsPerSecond: Double = produceTimeStats.getRequestsPerSecond - - def getFetchRequestsPerSecond: Double = fetchTimeStats.getRequestsPerSecond - - def getAvgProduceRequestMs: Double = produceTimeStats.getAvgMetric / (1000.0 * 1000.0) - - def getMaxProduceRequestMs: Double = produceTimeStats.getMaxMetric / (1000.0 * 1000.0) - - def getAvgFetchRequestMs: Double = fetchTimeStats.getAvgMetric / (1000.0 * 1000.0) - - def getMaxFetchRequestMs: Double = fetchTimeStats.getMaxMetric / (1000.0 * 1000.0) - - def getBytesReadPerSecond: Double = produceBytesStats.getAvgMetric - - def getBytesWrittenPerSecond: Double = fetchBytesStats.getAvgMetric - - def getNumFetchRequests: Long = fetchTimeStats.getNumRequests - - def getNumProduceRequests: Long = produceTimeStats.getNumRequests - - def getTotalBytesRead: Long = produceBytesStats.getTotalMetric - - def getTotalBytesWritten: Long = fetchBytesStats.getTotalMetric - - def getTotalFetchRequestMs: Long = fetchTimeStats.getTotalMetric - - def getTotalProduceRequestMs: Long = produceTimeStats.getTotalMetric -} Index: core/src/main/scala/kafka/network/SocketServer.scala =================================================================== --- core/src/main/scala/kafka/network/SocketServer.scala (revision 1378595) +++ core/src/main/scala/kafka/network/SocketServer.scala (working copy) @@ -41,7 +41,6 @@ private val time = SystemTime private val processors = new Array[Processor](numProcessorThreads) private var acceptor: Acceptor = new Acceptor(port, processors) - val stats: SocketServerStats = new SocketServerStats(1000L * 1000L * 1000L * monitoringPeriodSecs) val requestChannel = new RequestChannel(numProcessorThreads, maxQueuedRequests) /** @@ -49,7 +48,7 @@ */ def startup() { for(i <- 0 until numProcessorThreads) { - processors(i) = new Processor(i, time, maxRequestSize, requestChannel, stats) + processors(i) = new Processor(i, time, maxRequestSize, requestChannel) 
Utils.newThread("kafka-processor-%d-%d".format(port, i), processors(i), false).start() } // register the processor threads for notification of responses @@ -187,8 +186,7 @@ private[kafka] class Processor(val id: Int, val time: Time, val maxRequestSize: Int, - val requestChannel: RequestChannel, - val stats: SocketServerStats) extends AbstractServerThread { + val requestChannel: RequestChannel) extends AbstractServerThread { private val newConnections = new ConcurrentLinkedQueue[SocketChannel](); @@ -240,10 +238,10 @@ var curr = requestChannel.receiveResponse(id) while(curr != null) { trace("Socket server received response to send, registering for write: " + curr) - val key = curr.requestKey.asInstanceOf[SelectionKey] + val key = curr.request.requestKey.asInstanceOf[SelectionKey] try { key.interestOps(SelectionKey.OP_WRITE) - key.attach(curr.response) + key.attach(curr) } catch { case e: CancelledKeyException => { debug("Ignoring response for closed socket.") @@ -288,18 +286,17 @@ */ def read(key: SelectionKey) { val socketChannel = channelFor(key) - var request = key.attachment.asInstanceOf[Receive] + var receive = key.attachment.asInstanceOf[Receive] if(key.attachment == null) { - request = new BoundedByteBufferReceive(maxRequestSize) - key.attach(request) + receive = new BoundedByteBufferReceive(maxRequestSize) + key.attach(receive) } - val read = request.readFrom(socketChannel) - stats.recordBytesRead(read) + val read = receive.readFrom(socketChannel) trace(read + " bytes read from " + socketChannel.socket.getRemoteSocketAddress()) if(read < 0) { close(key) - } else if(request.complete) { - val req = RequestChannel.Request(processor = id, requestKey = key, request = request, start = time.nanoseconds) + } else if(receive.complete) { + val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeNs = time.nanoseconds) requestChannel.sendRequest(req) trace("Recieved request, sending for processing by handler: " + req) key.attach(null) @@ -315,13 +312,14 @@ */ def write(key: SelectionKey) { val socketChannel = channelFor(key) - var response = key.attachment().asInstanceOf[Send] - if(response == null) + val response = key.attachment().asInstanceOf[RequestChannel.Response] + val responseSend = response.responseSend + if(responseSend == null) throw new IllegalStateException("Registered for write interest but no response attached to key.") - val written = response.writeTo(socketChannel) - stats.recordBytesWritten(written) + val written = responseSend.writeTo(socketChannel) trace(written + " bytes written to " + socketChannel.socket.getRemoteSocketAddress()) - if(response.complete) { + if(responseSend.complete) { + response.request.endMetricsTracking() key.attach(null) key.interestOps(SelectionKey.OP_READ) } else { Index: core/src/main/scala/kafka/network/RequestChannel.scala =================================================================== --- core/src/main/scala/kafka/network/RequestChannel.scala (revision 1378595) +++ core/src/main/scala/kafka/network/RequestChannel.scala (working copy) @@ -19,24 +19,79 @@ import java.util.concurrent._ import kafka.utils.SystemTime +import kafka.metrics.KafkaMetricsGroup +import com.yammer.metrics.core.Gauge +import java.nio.ByteBuffer +import kafka.api._ -object RequestChannel { - val AllDone = new Request(1, 2, null, 0) - case class Request(processor: Int, requestKey: Any, request: Receive, start: Long) - case class Response(processor: Int, requestKey: Any, response: Send, start: Long, elapsedNs: Long) { +object 
RequestChannel {
+  val AllDone = new Request(1, 2, getShutdownReceive(), 0)
+
+  def getShutdownReceive() = {
+    val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, Array[TopicData]())
+    val byteBuffer = ByteBuffer.allocate(emptyProducerRequest.sizeInBytes + 2)
+    byteBuffer.putShort(RequestKeys.ProduceKey)
+    val serializedEmptyProducerRequest = emptyProducerRequest.writeTo(byteBuffer)
+    byteBuffer.rewind()
+    byteBuffer
+  }
+
+  case class Request(processor: Int, requestKey: Any, buffer: ByteBuffer, startTimeNs: Long) {
+    var dequeueTimeNs = -1L
+    var apiLocalCompleteTimeNs = -1L
+    var responseCompletTimeNs = -1L
+    val requestId = buffer.getShort()
+    val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer)
+    buffer.rewind()
+
+    def endMetricsTracking() {
+      val endTimeNs = SystemTime.nanoseconds
+      val queueTime = (dequeueTimeNs - startTimeNs).max(0L)
+      val apiLocalTime = (apiLocalCompleteTimeNs - dequeueTimeNs).max(0L)
+      val apiRemoteTime = (responseCompletTimeNs - apiLocalCompleteTimeNs).max(0L)
+      val sendTime = (endTimeNs - responseCompletTimeNs).max(0L)
+      val totalTime = endTimeNs - startTimeNs
+      var metricsList = List(RequestMetrics.metricsMap(RequestKeys.nameForKey(requestId)))
+      if (requestId == RequestKeys.FetchKey) {
+        val isFromFollower = requestObj.asInstanceOf[FetchRequest].isFromFollower
+        metricsList ::= ( if (isFromFollower)
+          RequestMetrics.metricsMap(RequestMetrics.followFetchMetricName)
+        else
+          RequestMetrics.metricsMap(RequestMetrics.regularFetchMetricName) )
+      }
+      metricsList.foreach{
+        m => m.requestRate.mark()
+             m.queueTimeHist.update(queueTime)
+             m.localTimeHist.update(apiLocalTime)
+             m.remoteTimeHist.update(apiRemoteTime)
+             m.sendTimeHist.update(sendTime)
+             m.totalTimeHist.update(totalTime)
+      }
+    }
+  }
+
+  case class Response(processor: Int, request: Request, responseSend: Send) {
+    request.responseCompletTimeNs = SystemTime.nanoseconds
+
     def this(request: Request, send: Send) =
-      this(request.processor, request.requestKey, send, request.start, SystemTime.nanoseconds - request.start)
+      this(request.processor, request, send)
   }
 }
 
-class RequestChannel(val numProcessors: Int, val queueSize: Int) {
+class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
   private var responseListeners: List[(Int) => Unit] = Nil
   private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
   private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
   for(i <- 0 until numProcessors)
     responseQueues(i) = new ArrayBlockingQueue[RequestChannel.Response](queueSize)
-
+  newGauge(
+    "RequestQueueSize",
+    new Gauge[Int] {
+      def value() = requestQueue.size
+    }
+  )
+
   /** Send a request to be handled, potentially blocking until there is room in the queue for the request */
   def sendRequest(request: RequestChannel.Request) {
     requestQueue.put(request)
@@ -60,5 +115,22 @@
   def addResponseListener(onResponse: Int => Unit) {
     responseListeners ::= onResponse
   }
+}
+object RequestMetrics {
+  val metricsMap = new scala.collection.mutable.HashMap[String, RequestMetrics]
+  val regularFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "-Regular"
+  val followFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "-Follower"
+  (RequestKeys.keyToNameAndDeserializerMap.values.map(e => e._1)
+    ++ List(regularFetchMetricName, followFetchMetricName)).foreach(name => metricsMap.put(name, new RequestMetrics(name)))
 }
+
+class RequestMetrics(name: String) extends
KafkaMetricsGroup { + val requestRate = newMeter(name + "-RequestsPerSec", "requests", TimeUnit.SECONDS) + val queueTimeHist = newHistogram(name + "-QueueTime") + val localTimeHist = newHistogram(name + "-LocalTime") + val remoteTimeHist = newHistogram(name + "-RemoteTime") + val sendTimeHist = newHistogram(name + "-SendTime") + val totalTimeHist = newHistogram(name + "-TotalTime") +} + Index: core/src/main/scala/kafka/consumer/SimpleConsumer.scala =================================================================== --- core/src/main/scala/kafka/consumer/SimpleConsumer.scala (revision 1378595) +++ core/src/main/scala/kafka/consumer/SimpleConsumer.scala (working copy) @@ -21,6 +21,8 @@ import kafka.network._ import kafka.utils._ import kafka.common.ErrorMapping +import kafka.metrics.KafkaMetricsGroup +import java.util.concurrent.TimeUnit /** * A consumer of kafka messages @@ -91,15 +93,16 @@ * @return a set of fetched messages */ def fetch(request: FetchRequest): FetchResponse = { - val startTime = SystemTime.nanoseconds - val response = sendRequest(request) + val context = ConsumerRequestStat.requestTimer.time() + var response: Receive = null + try { + response = sendRequest(request) + } finally { + context.stop() + } val fetchResponse = FetchResponse.readFrom(response.buffer) val fetchedSize = fetchResponse.sizeInBytes - - val endTime = SystemTime.nanoseconds - SimpleConsumerStats.recordFetchRequest(endTime - startTime) - SimpleConsumerStats.recordConsumptionThroughput(fetchedSize) - + ConsumerRequestStat.requestSizeHist.update(fetchedSize) fetchResponse } @@ -125,39 +128,7 @@ } } -trait SimpleConsumerStatsMBean { - def getFetchRequestsPerSecond: Double - def getAvgFetchRequestMs: Double - def getMaxFetchRequestMs: Double - def getNumFetchRequests: Long - def getConsumerThroughput: Double -} - -@threadsafe -class SimpleConsumerStats extends SimpleConsumerStatsMBean { - private val fetchRequestStats = new SnapshotStats - - def recordFetchRequest(requestNs: Long) = fetchRequestStats.recordRequestMetric(requestNs) - - def recordConsumptionThroughput(data: Long) = fetchRequestStats.recordThroughputMetric(data) - - def getFetchRequestsPerSecond: Double = fetchRequestStats.getRequestsPerSecond - - def getAvgFetchRequestMs: Double = fetchRequestStats.getAvgMetric / (1000.0 * 1000.0) - - def getMaxFetchRequestMs: Double = fetchRequestStats.getMaxMetric / (1000.0 * 1000.0) - - def getNumFetchRequests: Long = fetchRequestStats.getNumRequests - - def getConsumerThroughput: Double = fetchRequestStats.getThroughput -} - -object SimpleConsumerStats extends Logging { - private val simpleConsumerstatsMBeanName = "kafka:type=kafka.SimpleConsumerStats" - private val stats = new SimpleConsumerStats - Utils.registerMBean(stats, simpleConsumerstatsMBeanName) - - def recordFetchRequest(requestMs: Long) = stats.recordFetchRequest(requestMs) - def recordConsumptionThroughput(data: Long) = stats.recordConsumptionThroughput(data) -} - +object ConsumerRequestStat extends KafkaMetricsGroup { + val requestTimer = newTimer("ConsumerRequestTime", TimeUnit.MILLISECONDS, TimeUnit.SECONDS) + val requestSizeHist = newHistogram("ConsumerRequestSize") +} \ No newline at end of file Index: core/src/main/scala/kafka/consumer/ConsumerIterator.scala =================================================================== --- core/src/main/scala/kafka/consumer/ConsumerIterator.scala (revision 1378595) +++ core/src/main/scala/kafka/consumer/ConsumerIterator.scala (working copy) @@ -47,8 +47,8 @@ 
currentTopicInfo.resetConsumeOffset(consumedOffset) val topic = currentTopicInfo.topic trace("Setting %s consumed offset to %d".format(topic, consumedOffset)) - ConsumerTopicStat.getConsumerTopicStat(topic).recordMessagesPerTopic(1) - ConsumerTopicStat.getConsumerAllTopicStat().recordMessagesPerTopic(1) + ConsumerTopicStat.getConsumerTopicStat(topic).messageRate.mark() + ConsumerTopicStat.getConsumerAllTopicStat().messageRate.mark() item } Index: core/src/main/scala/kafka/consumer/ConsumerConnector.scala =================================================================== --- core/src/main/scala/kafka/consumer/ConsumerConnector.scala (revision 1378595) +++ core/src/main/scala/kafka/consumer/ConsumerConnector.scala (working copy) @@ -18,7 +18,7 @@ package kafka.consumer import scala.collection._ -import kafka.utils.{Utils, Logging} +import kafka.utils.Logging import kafka.serializer.{DefaultDecoder, Decoder} /** @@ -64,8 +64,6 @@ } object Consumer extends Logging { - private val consumerStatsMBeanName = "kafka:type=kafka.ConsumerStats" - /** * Create a ConsumerConnector * @@ -74,7 +72,6 @@ */ def create(config: ConsumerConfig): ConsumerConnector = { val consumerConnect = new ZookeeperConsumerConnector(config) - Utils.registerMBean(consumerConnect, consumerStatsMBeanName) consumerConnect } @@ -86,7 +83,6 @@ */ def createJavaConsumerConnector(config: ConsumerConfig): kafka.javaapi.consumer.ConsumerConnector = { val consumerConnect = new kafka.javaapi.consumer.ZookeeperConsumerConnector(config) - Utils.registerMBean(consumerConnect.underlying, consumerStatsMBeanName) consumerConnect } } Index: core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala =================================================================== --- core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (revision 1378595) +++ core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (working copy) @@ -59,8 +59,8 @@ chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get)) val newOffset = fetchedOffset.addAndGet(size) debug("updated fetch offset of ( %s ) to %d".format(this, newOffset)) - ConsumerTopicStat.getConsumerTopicStat(topic).recordBytesPerTopic(size) - ConsumerTopicStat.getConsumerAllTopicStat().recordBytesPerTopic(size) + ConsumerTopicStat.getConsumerTopicStat(topic).byteRate.mark(size) + ConsumerTopicStat.getConsumerAllTopicStat().byteRate.mark(size) } } Index: core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala =================================================================== --- core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala (revision 1378595) +++ core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala (working copy) @@ -17,44 +17,24 @@ package kafka.consumer -import java.util.concurrent.atomic.AtomicLong -import kafka.utils.{Pool, Utils, threadsafe, Logging} +import kafka.utils.{Pool, threadsafe, Logging} +import java.util.concurrent.TimeUnit +import kafka.metrics.KafkaMetricsGroup -trait ConsumerTopicStatMBean { - def getMessagesPerTopic: Long - def getBytesPerTopic: Long -} - @threadsafe -class ConsumerTopicStat extends ConsumerTopicStatMBean { - private val numCumulatedMessagesPerTopic = new AtomicLong(0) - private val numCumulatedBytesPerTopic = new AtomicLong(0) - - def getMessagesPerTopic: Long = numCumulatedMessagesPerTopic.get - - def recordMessagesPerTopic(nMessages: Int) = numCumulatedMessagesPerTopic.getAndAdd(nMessages) - - def getBytesPerTopic: Long = numCumulatedBytesPerTopic.get - - def recordBytesPerTopic(nBytes: Long) = 
numCumulatedBytesPerTopic.getAndAdd(nBytes) +class ConsumerTopicStat(name: String) extends KafkaMetricsGroup { + val messageRate = newMeter(name + "-MessagesPerSec", "messages", TimeUnit.SECONDS) + val byteRate = newMeter(name + "-BytesPerSec", "bytes", TimeUnit.SECONDS) } object ConsumerTopicStat extends Logging { - private val stats = new Pool[String, ConsumerTopicStat] - private val allTopicStat = new ConsumerTopicStat - Utils.registerMBean(allTopicStat, "kafka:type=kafka.ConsumerAllTopicStat") + private val valueFactory = (k: String) => new ConsumerTopicStat(k) + private val stats = new Pool[String, ConsumerTopicStat](Some(valueFactory)) + private val allTopicStat = new ConsumerTopicStat("Total") def getConsumerAllTopicStat(): ConsumerTopicStat = allTopicStat def getConsumerTopicStat(topic: String): ConsumerTopicStat = { - var stat = stats.get(topic) - if (stat == null) { - stat = new ConsumerTopicStat - if (stats.putIfNotExists(topic, stat) == null) - Utils.registerMBean(stat, "kafka:type=kafka.ConsumerTopicStat." + topic) - else - stat = stats.get(topic) - } - return stat + stats.getAndMaybePut(topic) } } Index: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala =================================================================== --- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (revision 1378595) +++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (working copy) @@ -73,21 +73,9 @@ val shutdownCommand: FetchedDataChunk = new FetchedDataChunk(null, null, -1L) } -/** - * JMX interface for monitoring consumer - */ -trait ZookeeperConsumerConnectorMBean { - def getPartOwnerStats: String - def getConsumerGroup: String - def getOffsetLag(topic: String, brokerId: Int, partitionId: Int): Long - def getConsumedOffset(topic: String, brokerId: Int, partitionId: Int): Long - def getLatestOffset(topic: String, brokerId: Int, partitionId: Int): Long -} - private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val enableFetcher: Boolean) // for testing only - extends ConsumerConnector with ZookeeperConsumerConnectorMBean - with Logging { + extends ConsumerConnector with Logging { private val isShuttingDown = new AtomicBoolean(false) private val rebalanceLock = new Object private var fetcher: Option[ConsumerFetcherManager] = None @@ -260,58 +248,6 @@ } } - // for JMX - def getPartOwnerStats(): String = { - val builder = new StringBuilder - for ((topic, infos) <- topicRegistry) { - builder.append("\n" + topic + ": [") - val topicDirs = new ZKGroupTopicDirs(config.groupId, topic) - for(partition <- infos.values) { - builder.append("\n {") - builder.append{partition} - builder.append(",fetch offset:" + partition.getFetchOffset) - builder.append(",consumer offset:" + partition.getConsumeOffset) - builder.append("}") - } - builder.append("\n ]") - } - builder.toString - } - - // for JMX - def getConsumerGroup(): String = config.groupId - - def getOffsetLag(topic: String, brokerId: Int, partitionId: Int): Long = - getLatestOffset(topic, brokerId, partitionId) - getConsumedOffset(topic, brokerId, partitionId) - - def getConsumedOffset(topic: String, brokerId: Int, partitionId: Int): Long = { - val partitionInfos = topicRegistry.get(topic) - if (partitionInfos != null) { - val partitionInfo = partitionInfos.get(partitionId) - if (partitionInfo != null) - return partitionInfo.getConsumeOffset - } - - //otherwise, try to get it from zookeeper - try { - val topicDirs = new ZKGroupTopicDirs(config.groupId, topic) - val znode = 
topicDirs.consumerOffsetDir + "/" + partitionId - val offsetString = readDataMaybeNull(zkClient, znode)._1 - if (offsetString != null) - return offsetString.toLong - else - return -1 - } - catch { - case e => - error("error in getConsumedOffset JMX ", e) - } - return -2 - } - - def getLatestOffset(topic: String, brokerId: Int, partitionId: Int): Long = - earliestOrLatestOffset(topic, brokerId, partitionId, OffsetRequest.LatestTime) - private def earliestOrLatestOffset(topic: String, brokerId: Int, partitionId: Int, earliestOrLatest: Long): Long = { var simpleConsumer: SimpleConsumer = null var producedOffset: Long = -1L Index: core/src/main/scala/kafka/utils/Utils.scala =================================================================== --- core/src/main/scala/kafka/utils/Utils.scala (revision 1378595) +++ core/src/main/scala/kafka/utils/Utils.scala (working copy) @@ -20,13 +20,11 @@ import java.io._ import java.nio._ import java.nio.channels._ -import java.util.concurrent.atomic._ import java.lang.management._ import java.util.zip.CRC32 import javax.management._ import scala.collection._ import scala.collection.mutable -import kafka.message.{NoCompressionCodec, CompressionCodec} import org.I0Itec.zkclient.ZkClient import java.util.{Random, Properties} import joptsimple.{OptionSpec, OptionSet, OptionParser} @@ -686,102 +684,8 @@ for (forever <- Stream.continually(1); t <- coll) yield t stream.iterator } - } -class SnapshotStats(private val monitorDurationNs: Long = 600L * 1000L * 1000L * 1000L) { - private val time: Time = SystemTime - - private val complete = new AtomicReference(new Stats()) - private val current = new AtomicReference(new Stats()) - private val total = new AtomicLong(0) - private val numCumulatedRequests = new AtomicLong(0) - - def recordRequestMetric(requestNs: Long) { - val stats = current.get - stats.add(requestNs) - total.getAndAdd(requestNs) - numCumulatedRequests.getAndAdd(1) - val ageNs = time.nanoseconds - stats.start - // if the current stats are too old it is time to swap - if(ageNs >= monitorDurationNs) { - val swapped = current.compareAndSet(stats, new Stats()) - if(swapped) { - complete.set(stats) - stats.end.set(time.nanoseconds) - } - } - } - - def recordThroughputMetric(data: Long) { - val stats = current.get - stats.addData(data) - val ageNs = time.nanoseconds - stats.start - // if the current stats are too old it is time to swap - if(ageNs >= monitorDurationNs) { - val swapped = current.compareAndSet(stats, new Stats()) - if(swapped) { - complete.set(stats) - stats.end.set(time.nanoseconds) - } - } - } - - def getNumRequests(): Long = numCumulatedRequests.get - - def getRequestsPerSecond: Double = { - val stats = complete.get - stats.numRequests / stats.durationSeconds - } - - def getThroughput: Double = { - val stats = complete.get - stats.totalData / stats.durationSeconds - } - - def getAvgMetric: Double = { - val stats = complete.get - if (stats.numRequests == 0) { - 0 - } - else { - stats.totalRequestMetric / stats.numRequests - } - } - - def getTotalMetric: Long = total.get - - def getMaxMetric: Double = complete.get.maxRequestMetric - - class Stats { - val start = time.nanoseconds - var end = new AtomicLong(-1) - var numRequests = 0 - var totalRequestMetric: Long = 0L - var maxRequestMetric: Long = 0L - var totalData: Long = 0L - private val lock = new Object() - - def addData(data: Long) { - lock synchronized { - totalData += data - } - } - - def add(requestNs: Long) { - lock synchronized { - numRequests +=1 - totalRequestMetric += requestNs - 
maxRequestMetric = scala.math.max(maxRequestMetric, requestNs) - } - } - - def durationSeconds: Double = (end.get - start) / (1000.0 * 1000.0 * 1000.0) - - def durationMs: Double = (end.get - start) / (1000.0 * 1000.0) - } -} - /** * A wrapper that synchronizes JSON in scala, which is not threadsafe. */ Index: core/src/main/scala/kafka/utils/Pool.scala =================================================================== --- core/src/main/scala/kafka/utils/Pool.scala (revision 1378595) +++ core/src/main/scala/kafka/utils/Pool.scala (working copy) @@ -21,12 +21,14 @@ import java.util.concurrent._ import collection.JavaConversions import kafka.common.KafkaException +import java.lang.Object class Pool[K,V](valueFactory: Option[(K) => V] = None) extends Iterable[(K, V)] { private val pool = new ConcurrentHashMap[K, V] - + private val createLock = new Object + def this(m: collection.Map[K, V]) { this() m.foreach(kv => pool.put(kv._1, kv._2)) @@ -52,8 +54,12 @@ throw new KafkaException("Empty value factory in pool.") val curr = pool.get(key) if (curr == null) { - pool.putIfAbsent(key, valueFactory.get(key)) - pool.get(key) + createLock synchronized { + val curr = pool.get(key) + if (curr == null) + pool.put(key, valueFactory.get(key)) + pool.get(key) + } } else curr Index: core/src/main/scala/kafka/server/KafkaServer.scala =================================================================== --- core/src/main/scala/kafka/server/KafkaServer.scala (revision 1378595) +++ core/src/main/scala/kafka/server/KafkaServer.scala (working copy) @@ -18,7 +18,7 @@ package kafka.server import java.io.File -import kafka.network.{SocketServerStats, SocketServer} +import kafka.network.SocketServer import kafka.log.LogManager import kafka.utils._ import java.util.concurrent._ @@ -34,7 +34,6 @@ val CleanShutdownFile = ".kafka_cleanshutdown" private var isShuttingDown = new AtomicBoolean(false) private var shutdownLatch = new CountDownLatch(1) - private val statsMBeanName = "kafka:type=kafka.SocketServerStats" var socketServer: SocketServer = null var requestHandlerPool: KafkaRequestHandlerPool = null var logManager: LogManager = null @@ -82,8 +81,6 @@ socketServer.startup - Utils.registerMBean(socketServer.stats, statsMBeanName) - /* start client */ kafkaZookeeper = new KafkaZooKeeper(config) // starting relevant replicas and leader election for partitions assigned to this broker @@ -123,7 +120,6 @@ replicaManager.shutdown() if (socketServer != null) socketServer.shutdown() - Utils.unregisterMBean(statsMBeanName) if(logManager != null) logManager.shutdown() @@ -144,8 +140,6 @@ def awaitShutdown(): Unit = shutdownLatch.await() def getLogManager(): LogManager = logManager - - def getStats(): SocketServerStats = socketServer.stats } Index: core/src/main/scala/kafka/server/RequestPurgatory.scala =================================================================== --- core/src/main/scala/kafka/server/RequestPurgatory.scala (revision 1378595) +++ core/src/main/scala/kafka/server/RequestPurgatory.scala (working copy) @@ -33,7 +33,6 @@ * for example a key could be a (topic, partition) pair. 
*/ class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, delayMs: Long) extends DelayedItem[RequestChannel.Request](request, delayMs) { - val creationTimeNs = SystemTime.nanoseconds val satisfied = new AtomicBoolean(false) } @@ -67,34 +66,13 @@ /* a list of requests watching each key */ private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers)) - private val numDelayedRequestsBeanName = "NumDelayedRequests" - private val timeToSatisfyHistogramBeanName = "TimeToSatisfyInNs" - private val satisfactionRateBeanName = "SatisfactionRate" - private val expirationRateBeanName = "ExpirationRate" - - override def metricsGroupIdent = "" - - val satisfactionRateMeter = newMeter( - satisfactionRateBeanName, - "requests", - TimeUnit.SECONDS - ) - - val timeToSatisfyHistogram = newHistogram(timeToSatisfyHistogramBeanName, biased = true) - newGauge( - numDelayedRequestsBeanName, + "NumDelayedRequests", new Gauge[Int] { def value() = expiredRequestReaper.unsatisfied.get() } ) - val expirationRateMeter = newMeter( - expirationRateBeanName, - "requests", - TimeUnit.SECONDS - ) - /* background thread expiring requests that have been waiting too long */ private val expiredRequestReaper = new ExpiredRequestReaper private val expirationThread = Utils.daemonThread("request-expiration-task", expiredRequestReaper) @@ -198,10 +176,6 @@ iter.remove() val updated = curr.satisfied.compareAndSet(false, true) if(updated == true) { - val requestNs = SystemTime.nanoseconds - curr.creationTimeNs - satisfactionRateMeter.mark() - timeToSatisfyHistogram.update(requestNs) - response += curr liveCount -= 1 expiredRequestReaper.satisfyRequest() @@ -284,7 +258,6 @@ val curr = delayed.take() val updated = curr.satisfied.compareAndSet(false, true) if(updated) { - expirationRateMeter.mark() unsatisfied.getAndDecrement() for(key <- curr.keys) watchersFor(key).decLiveCount() Index: core/src/main/scala/kafka/server/ReplicaManager.scala =================================================================== --- core/src/main/scala/kafka/server/ReplicaManager.scala (revision 1378595) +++ core/src/main/scala/kafka/server/ReplicaManager.scala (working copy) @@ -24,13 +24,16 @@ import kafka.log.LogManager import kafka.api.{LeaderAndISRRequest, LeaderAndISR} import kafka.common.{UnknownTopicOrPartitionException, LeaderNotAvailableException, ErrorMapping} +import kafka.metrics.KafkaMetricsGroup +import com.yammer.metrics.core.Gauge +import java.util.concurrent.TimeUnit object ReplicaManager { val UnknownLogEndOffset = -1L } class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient, kafkaScheduler: KafkaScheduler, - val logManager: LogManager) extends Logging { + val logManager: LogManager) extends Logging with KafkaMetricsGroup { private val allPartitions = new Pool[(String, Int), Partition] private var leaderPartitions = new mutable.HashSet[Partition]() private val leaderPartitionsLock = new Object @@ -41,6 +44,26 @@ val highWatermarkCheckpoint = new HighwaterMarkCheckpoint(config.logDir) info("Created highwatermark file %s".format(highWatermarkCheckpoint.name)) + newGauge( + "LeaderCount", + new Gauge[Int] { + def value() = leaderPartitions.size + } + ) + newGauge( + "UnderReplicatedPartitions", + new Gauge[Int] { + def value() = { + leaderPartitionsLock synchronized { + leaderPartitions.count(_.isUnderReplicated) + } + } + } + ) + val ISRExpandRate = newMeter("ISRExpand", "expands", TimeUnit.SECONDS) + val ISRShrinkRate = newMeter("ISRShrink", "shrinks", 
TimeUnit.SECONDS) + + def startHighWaterMarksCheckPointThread() = { if(highWatermarkCheckPointThreadStarted.compareAndSet(false, true)) kafkaScheduler.scheduleWithRate(checkpointHighWatermarks, "highwatermark-checkpoint-thread", 0, config.defaultFlushIntervalMs) Index: core/src/main/scala/kafka/server/KafkaController.scala =================================================================== --- core/src/main/scala/kafka/server/KafkaController.scala (revision 1378595) +++ core/src/main/scala/kafka/server/KafkaController.scala (working copy) @@ -29,6 +29,8 @@ import collection.JavaConversions._ import kafka.utils.{ShutdownableThread, ZkUtils, Logging} import java.lang.Object +import com.yammer.metrics.core.Gauge +import kafka.metrics.KafkaMetricsGroup class RequestSendThread(val controllerId: Int, @@ -51,9 +53,9 @@ receive = channel.receive() var response: RequestOrResponse = null request.requestId.get match { - case RequestKeys.LeaderAndISRRequest => + case RequestKeys.LeaderAndISRKey => response = LeaderAndISRResponse.readFrom(receive.buffer) - case RequestKeys.StopReplicaRequest => + case RequestKeys.StopReplicaKey => response = StopReplicaResponse.readFrom(receive.buffer) } trace("got a response %s".format(controllerId, response, toBrokerId)) @@ -144,7 +146,7 @@ } } -class KafkaController(config : KafkaConfig, zkClient: ZkClient) extends Logging { +class KafkaController(config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup { this.logIdent = "[Controller " + config.brokerId + "], " info("startup"); private var isRunning = true @@ -156,6 +158,13 @@ private var allPartitionReplicaAssignment: mutable.Map[(String, Int), Seq[Int]] = null private var allLeaders: mutable.Map[(String, Int), Int] = null + newGauge( + "ActiveControllerCount", + new Gauge[Int] { + def value() = if (isActive) 1 else 0 + } + ) + // Return true if this controller succeeds in the controller competition private def tryToBecomeController(): Boolean = { try { Index: core/src/main/scala/kafka/server/KafkaRequestHandler.scala =================================================================== --- core/src/main/scala/kafka/server/KafkaRequestHandler.scala (revision 1378595) +++ core/src/main/scala/kafka/server/KafkaRequestHandler.scala (working copy) @@ -19,7 +19,8 @@ import kafka.network._ import kafka.utils._ -import java.util.concurrent.atomic.AtomicLong +import kafka.metrics.KafkaMetricsGroup +import java.util.concurrent.TimeUnit /** * A thread that answers kafka requests. 
@@ -30,10 +31,11 @@ def run() { while(true) { val req = requestChannel.receiveRequest() - if(req == RequestChannel.AllDone){ + if(req eq RequestChannel.AllDone){ trace("receives shut down command, shut down".format(brokerId, id)) return } + req.dequeueTimeNs = SystemTime.nanoseconds debug("handles request " + req) apis.handle(req) } @@ -63,62 +65,24 @@ thread.join info("shutted down completely") } - } -trait BrokerTopicStatMBean { - def getMessagesIn: Long - def getBytesIn: Long - def getBytesOut: Long - def getFailedProduceRequest: Long - def getFailedFetchRequest: Long +class BrokerTopicMetrics(name: String) extends KafkaMetricsGroup { + val messageInRate = newMeter(name + "-MessagesInPerSec", "messages", TimeUnit.SECONDS) + val byteInRate = newMeter(name + "-BytesInPerSec", "bytes", TimeUnit.SECONDS) + val byteOutRate = newMeter(name + "-BytesOutPerSec", "bytes", TimeUnit.SECONDS) + val failedProduceRequestRate = newMeter(name + "-FailedProduceRequestsPerSec", "requests", TimeUnit.SECONDS) + val failedFetchRequestRate = newMeter(name + "-FailedFetchRequestsPerSec", "requests", TimeUnit.SECONDS) } -@threadsafe -class BrokerTopicStat extends BrokerTopicStatMBean { - private val numCumulatedMessagesIn = new AtomicLong(0) - private val numCumulatedBytesIn = new AtomicLong(0) - private val numCumulatedBytesOut = new AtomicLong(0) - private val numCumulatedFailedProduceRequests = new AtomicLong(0) - private val numCumulatedFailedFetchRequests = new AtomicLong(0) - - def getMessagesIn: Long = numCumulatedMessagesIn.get - - def recordMessagesIn(nMessages: Int) = numCumulatedMessagesIn.getAndAdd(nMessages) - - def getBytesIn: Long = numCumulatedBytesIn.get - - def recordBytesIn(nBytes: Long) = numCumulatedBytesIn.getAndAdd(nBytes) - - def getBytesOut: Long = numCumulatedBytesOut.get - - def recordBytesOut(nBytes: Long) = numCumulatedBytesOut.getAndAdd(nBytes) - - def recordFailedProduceRequest = numCumulatedFailedProduceRequests.getAndIncrement - - def getFailedProduceRequest = numCumulatedFailedProduceRequests.get() - - def recordFailedFetchRequest = numCumulatedFailedFetchRequests.getAndIncrement - - def getFailedFetchRequest = numCumulatedFailedFetchRequests.get() -} - object BrokerTopicStat extends Logging { - private val stats = new Pool[String, BrokerTopicStat] - private val allTopicStat = new BrokerTopicStat - Utils.registerMBean(allTopicStat, "kafka:type=kafka.BrokerAllTopicStat") + private val valueFactory = (k: String) => new BrokerTopicMetrics(k) + private val stats = new Pool[String, BrokerTopicMetrics](Some(valueFactory)) + private val allTopicStat = new BrokerTopicMetrics("Total") - def getBrokerAllTopicStat(): BrokerTopicStat = allTopicStat + def getBrokerAllTopicStat(): BrokerTopicMetrics = allTopicStat - def getBrokerTopicStat(topic: String): BrokerTopicStat = { - var stat = stats.get(topic) - if (stat == null) { - stat = new BrokerTopicStat - if (stats.putIfNotExists(topic, stat) == null) - Utils.registerMBean(stat, "kafka:type=kafka.BrokerTopicStat." 
+ topic) - else - stat = stats.get(topic) - } - return stat + def getBrokerTopicStat(topic: String): BrokerTopicMetrics = { + stats.getAndMaybePut(topic) } } Index: core/src/main/scala/kafka/server/KafkaApis.scala =================================================================== --- core/src/main/scala/kafka/server/KafkaApis.scala (revision 1378595) +++ core/src/main/scala/kafka/server/KafkaApis.scala (working copy) @@ -40,8 +40,6 @@ val replicaManager: ReplicaManager, val zkClient: ZkClient, brokerId: Int) extends Logging { - - private val metricsGroup = brokerId.toString private val producerRequestPurgatory = new ProducerRequestPurgatory(brokerId) private val fetchRequestPurgatory = new FetchRequestPurgatory(brokerId, requestChannel) private val delayedRequestMetrics = new DelayedRequestMetrics @@ -53,20 +51,20 @@ * Top-level method that handles all requests and multiplexes to the right api */ def handle(request: RequestChannel.Request) { - val apiId = request.request.buffer.getShort() - apiId match { - case RequestKeys.Produce => handleProducerRequest(request) - case RequestKeys.Fetch => handleFetchRequest(request) - case RequestKeys.Offsets => handleOffsetRequest(request) - case RequestKeys.TopicMetadata => handleTopicMetadataRequest(request) - case RequestKeys.LeaderAndISRRequest => handleLeaderAndISRRequest(request) - case RequestKeys.StopReplicaRequest => handleStopReplicaRequest(request) - case _ => throw new KafkaException("No mapping found for handler id " + apiId) + request.requestId match { + case RequestKeys.ProduceKey => handleProducerRequest(request) + case RequestKeys.FetchKey => handleFetchRequest(request) + case RequestKeys.OffsetsKey => handleOffsetRequest(request) + case RequestKeys.MetadataKey => handleTopicMetadataRequest(request) + case RequestKeys.LeaderAndISRKey => handleLeaderAndISRRequest(request) + case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request) + case requestId => throw new KafkaException("No mapping found for handler id " + requestId) } + request.apiLocalCompleteTimeNs = SystemTime.nanoseconds } def handleLeaderAndISRRequest(request: RequestChannel.Request){ - val leaderAndISRRequest = LeaderAndISRRequest.readFrom(request.request.buffer) + val leaderAndISRRequest = request.requestObj.asInstanceOf[LeaderAndISRRequest] if(requestLogger.isTraceEnabled) requestLogger.trace("Handling leader and isr request " + leaderAndISRRequest) trace("Handling leader and isr request " + leaderAndISRRequest) @@ -78,7 +76,7 @@ def handleStopReplicaRequest(request: RequestChannel.Request){ - val stopReplicaRequest = StopReplicaRequest.readFrom(request.request.buffer) + val stopReplicaRequest = request.requestObj.asInstanceOf[StopReplicaRequest] if(requestLogger.isTraceEnabled) requestLogger.trace("Handling stop replica request " + stopReplicaRequest) trace("Handling stop replica request " + stopReplicaRequest) @@ -107,10 +105,7 @@ val topicData = readMessageSets(fetchReq.fetch) val response = new FetchResponse(FetchRequest.CurrentVersion, fetchReq.fetch.correlationId, topicData) - val fromFollower = fetchReq.fetch.replicaId != FetchRequest.NonFollowerId - delayedRequestMetrics.recordDelayedFetchSatisfied( - fromFollower, SystemTime.nanoseconds - fetchReq.creationTimeNs, response) - + val fromFollower = fetchReq.fetch.isFromFollower requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response))) } } @@ -119,7 +114,7 @@ * Handle a produce request */ def handleProducerRequest(request: RequestChannel.Request) { - val 
produceRequest = ProducerRequest.readFrom(request.request.buffer) + val produceRequest = request.requestObj.asInstanceOf[ProducerRequest] val sTime = SystemTime.milliseconds if(requestLogger.isTraceEnabled) requestLogger.trace("Handling producer request " + request.toString) @@ -178,8 +173,8 @@ for(topicData <- request.data) { for(partitionData <- topicData.partitionDataArray) { msgIndex += 1 - BrokerTopicStat.getBrokerTopicStat(topicData.topic).recordBytesIn(partitionData.messages.sizeInBytes) - BrokerTopicStat.getBrokerAllTopicStat.recordBytesIn(partitionData.messages.sizeInBytes) + BrokerTopicStat.getBrokerTopicStat(topicData.topic).byteInRate.mark(partitionData.messages.sizeInBytes) + BrokerTopicStat.getBrokerAllTopicStat.byteInRate.mark(partitionData.messages.sizeInBytes) try { val localReplica = replicaManager.getLeaderReplicaIfLocal(topicData.topic, partitionData.partition) val log = localReplica.log.get @@ -192,8 +187,8 @@ .format(partitionData.messages.sizeInBytes, offsets(msgIndex))) } catch { case e => - BrokerTopicStat.getBrokerTopicStat(topicData.topic).recordFailedProduceRequest - BrokerTopicStat.getBrokerAllTopicStat.recordFailedProduceRequest + BrokerTopicStat.getBrokerTopicStat(topicData.topic).failedProduceRequestRate.mark() + BrokerTopicStat.getBrokerAllTopicStat.failedProduceRequestRate.mark() error("Error processing ProducerRequest on %s:%d".format(topicData.topic, partitionData.partition), e) e match { case _: IOException => @@ -213,7 +208,7 @@ * Handle a fetch request */ def handleFetchRequest(request: RequestChannel.Request) { - val fetchRequest = FetchRequest.readFrom(request.request.buffer) + val fetchRequest = request.requestObj.asInstanceOf[FetchRequest] if(requestLogger.isTraceEnabled) requestLogger.trace("Handling fetch request " + fetchRequest.toString) trace("Handling fetch request " + fetchRequest.toString) @@ -228,7 +223,7 @@ requestChannel.sendResponse(channelResponse) } - if(fetchRequest.replicaId != FetchRequest.NonFollowerId) { + if(fetchRequest.isFromFollower) { maybeUpdatePartitionHW(fetchRequest) // after updating HW, some delayed produce requests may be unblocked var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce] @@ -313,18 +308,18 @@ for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ) { val partitionInfo = readMessageSet(topic, partition, offset, fetchSize) match { case Left(err) => - BrokerTopicStat.getBrokerTopicStat(topic).recordFailedFetchRequest - BrokerTopicStat.getBrokerAllTopicStat.recordFailedFetchRequest + BrokerTopicStat.getBrokerTopicStat(topic).failedFetchRequestRate.mark() + BrokerTopicStat.getBrokerAllTopicStat.failedFetchRequestRate.mark() fetchRequest.replicaId match { case -1 => new PartitionData(partition, err, offset, -1L, MessageSet.Empty) case _ => new PartitionData(partition, err, offset, -1L, MessageSet.Empty) } case Right(messages) => - BrokerTopicStat.getBrokerTopicStat(topic).recordBytesOut(messages.sizeInBytes) - BrokerTopicStat.getBrokerAllTopicStat.recordBytesOut(messages.sizeInBytes) + BrokerTopicStat.getBrokerTopicStat(topic).byteOutRate.mark(messages.sizeInBytes) + BrokerTopicStat.getBrokerAllTopicStat.byteOutRate.mark(messages.sizeInBytes) val leaderReplica = replicaManager.getReplica(topic, partition).get - if (fetchRequest.replicaId != FetchRequest.NonFollowerId) { + if (fetchRequest.isFromFollower) { debug("Leader for topic [%s] partition [%d] received fetch request from follower [%d]" .format(topic, partition, fetchRequest.replicaId)) debug("Leader 
returning %d messages for topic %s partition %d to follower %d" @@ -362,7 +357,7 @@ * Service the offset request API */ def handleOffsetRequest(request: RequestChannel.Request) { - val offsetRequest = OffsetRequest.readFrom(request.request.buffer) + val offsetRequest = request.requestObj.asInstanceOf[OffsetRequest] if(requestLogger.isTraceEnabled) requestLogger.trace("Handling offset request " + offsetRequest.toString) trace("Handling offset request " + offsetRequest.toString) @@ -388,7 +383,7 @@ * Service the topic metadata request API */ def handleTopicMetadataRequest(request: RequestChannel.Request) { - val metadataRequest = TopicMetadataRequest.readFrom(request.request.buffer) + val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest] if(requestLogger.isTraceEnabled) requestLogger.trace("Handling topic metadata request " + metadataRequest.toString()) trace("Handling topic metadata request " + metadataRequest.toString()) @@ -459,10 +454,8 @@ */ class FetchRequestPurgatory(brokerId: Int, requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, Null](brokerId) { - this.logIdent = "[etchRequestPurgatory-%d], ".format(brokerId) + this.logIdent = "[FetchRequestPurgatory-%d], ".format(brokerId) - override def metricsGroupIdent = metricsGroup - /** * A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field */ @@ -475,8 +468,8 @@ def expire(delayed: DelayedFetch) { val topicData = readMessageSets(delayed.fetch) val response = new FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData) - val fromFollower = delayed.fetch.replicaId != FetchRequest.NonFollowerId - delayedRequestMetrics.recordDelayedFetchExpired(fromFollower, response) + val fromFollower = delayed.fetch.isFromFollower + delayedRequestMetrics.recordDelayedFetchExpired(fromFollower) requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response))) } } @@ -546,7 +539,6 @@ val partitionId = followerFetchRequestKey.partition val key = RequestKey(topic, partitionId) val fetchPartitionStatus = partitionStatus(key) - val durationNs = SystemTime.nanoseconds - creationTimeNs trace("Checking producer request satisfaction for %s-%d, acksPending = %b" .format(topic, partitionId, fetchPartitionStatus.acksPending)) if (fetchPartitionStatus.acksPending) { @@ -562,17 +554,12 @@ if (!fetchPartitionStatus.acksPending) { val topicData = produce.data.find(_.topic == topic).get val partitionData = topicData.partitionDataArray.find(_.partition == partitionId).get - delayedRequestMetrics.recordDelayedProducerKeyCaughtUp(key, - durationNs, - partitionData.sizeInBytes) maybeUnblockDelayedFetchRequests(topic, Array(partitionData)) } } // unblocked if there are no partitions with pending acks val satisfied = ! 
partitionStatus.exists(p => p._2.acksPending) - if (satisfied) - delayedRequestMetrics.recordDelayedProduceSatisfied(durationNs) satisfied } @@ -598,8 +585,6 @@ this.logIdent = "[ProducerRequestPurgatory-%d], ".format(brokerId) - override def metricsGroupIdent = metricsGroup - protected def checkSatisfied(followerFetchRequestKey: RequestKey, delayedProduce: DelayedProduce) = delayedProduce.isSatisfied(followerFetchRequestKey) @@ -617,51 +602,15 @@ private class DelayedRequestMetrics { private class DelayedProducerRequestMetrics(keyLabel: String = MetricKey.globalLabel) extends KafkaMetricsGroup { - override def metricsGroupIdent = metricsGroup - val caughtUpFollowerFetchRequestMeter = - newMeter("CaughtUpFollowerFetchRequestsPerSecond-" + keyLabel, "requests", TimeUnit.SECONDS) - val followerCatchUpTimeHistogram = if (keyLabel == MetricKey.globalLabel) - Some(newHistogram("FollowerCatchUpTimeInNs", biased = true)) - else None - - /* - * Note that throughput is updated on individual key satisfaction. - * Therefore, it is an upper bound on throughput since the - * DelayedProducerRequest may get expired. - */ - val throughputMeter = newMeter("Throughput-" + keyLabel, "bytes", TimeUnit.SECONDS) - val expiredRequestMeter = newMeter("ExpiredRequestsPerSecond-" + keyLabel, "requests", TimeUnit.SECONDS) - - val satisfiedRequestMeter = if (keyLabel == MetricKey.globalLabel) - Some(newMeter("SatisfiedRequestsPerSecond", "requests", TimeUnit.SECONDS)) - else None - val satisfactionTimeHistogram = if (keyLabel == MetricKey.globalLabel) - Some(newHistogram("SatisfactionTimeInNs", biased = true)) - else None + val expiredRequestMeter = newMeter("ExpiresPerSecond-" + keyLabel, "requests", TimeUnit.SECONDS) } private class DelayedFetchRequestMetrics(forFollower: Boolean, keyLabel: String = MetricKey.globalLabel) extends KafkaMetricsGroup { - private val metricPrefix = if (forFollower) "Follower" else "NonFollower" + private val metricPrefix = if (forFollower) "Follower" else "Regular" - override def metricsGroupIdent = metricsGroup - val satisfiedRequestMeter = if (keyLabel == MetricKey.globalLabel) - Some(newMeter(metricPrefix + "-SatisfiedRequestsPerSecond", - "requests", TimeUnit.SECONDS)) - else None - - val satisfactionTimeHistogram = if (keyLabel == MetricKey.globalLabel) - Some(newHistogram(metricPrefix + "-SatisfactionTimeInNs", biased = true)) - else None - - val expiredRequestMeter = if (keyLabel == MetricKey.globalLabel) - Some(newMeter(metricPrefix + "-ExpiredRequestsPerSecond", - "requests", TimeUnit.SECONDS)) - else None - - val throughputMeter = newMeter("%s-Throughput-%s".format(metricPrefix, keyLabel), - "bytes", TimeUnit.SECONDS) + val expiredRequestMeter = newMeter(metricPrefix + "-ExpiresPerSecond", "requests", TimeUnit.SECONDS) } private val producerRequestMetricsForKey = { @@ -674,76 +623,17 @@ private val aggregateFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = true) private val aggregateNonFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = false) - private val followerFetchRequestMetricsForKey = { - val valueFactory = (k: MetricKey) => new DelayedFetchRequestMetrics(forFollower = true, k.keyLabel) - new Pool[MetricKey, DelayedFetchRequestMetrics](Some(valueFactory)) - } - - private val nonFollowerFetchRequestMetricsForKey = { - val valueFactory = (k: MetricKey) => new DelayedFetchRequestMetrics(forFollower = false, k.keyLabel) - new Pool[MetricKey, DelayedFetchRequestMetrics](Some(valueFactory)) - } - def 
recordDelayedProducerKeyExpired(key: MetricKey) { val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(key) List(keyMetrics, aggregateProduceRequestMetrics).foreach(_.expiredRequestMeter.mark()) } - - def recordDelayedProducerKeyCaughtUp(key: MetricKey, timeToCatchUpNs: Long, bytes: Int) { - val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(key) - List(keyMetrics, aggregateProduceRequestMetrics).foreach(m => { - m.caughtUpFollowerFetchRequestMeter.mark() - m.followerCatchUpTimeHistogram.foreach(_.update(timeToCatchUpNs)) - m.throughputMeter.mark(bytes) - }) - } - - - def recordDelayedProduceSatisfied(timeToSatisfyNs: Long) { - aggregateProduceRequestMetrics.satisfiedRequestMeter.foreach(_.mark()) - aggregateProduceRequestMetrics.satisfactionTimeHistogram.foreach(_.update(timeToSatisfyNs)) - } - - - private def recordDelayedFetchThroughput(forFollower: Boolean, response: FetchResponse) { + def recordDelayedFetchExpired(forFollower: Boolean) { val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics else aggregateNonFollowerFetchRequestMetrics - metrics.throughputMeter.mark(response.sizeInBytes) - - response.topicMap.foreach(topicAndData => { - val topic = topicAndData._1 - topicAndData._2.partitionDataArray.foreach(partitionData => { - val key = RequestKey(topic, partitionData.partition) - val keyMetrics = if (forFollower) - followerFetchRequestMetricsForKey.getAndMaybePut(key) - else - nonFollowerFetchRequestMetricsForKey.getAndMaybePut(key) - keyMetrics.throughputMeter.mark(partitionData.sizeInBytes) - }) - }) - } - - - def recordDelayedFetchExpired(forFollower: Boolean, response: FetchResponse) { - val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics - else aggregateNonFollowerFetchRequestMetrics - metrics.expiredRequestMeter.foreach(_.mark()) - - recordDelayedFetchThroughput(forFollower, response) + metrics.expiredRequestMeter.mark() } - - - def recordDelayedFetchSatisfied(forFollower: Boolean, timeToSatisfyNs: Long, response: FetchResponse) { - val aggregateMetrics = if (forFollower) aggregateFollowerFetchRequestMetrics - else aggregateNonFollowerFetchRequestMetrics - - aggregateMetrics.satisfactionTimeHistogram.foreach(_.update(timeToSatisfyNs)) - aggregateMetrics.satisfiedRequestMeter.foreach(_.mark()) - - recordDelayedFetchThroughput(forFollower, response) - } } } Index: core/src/main/scala/kafka/server/AbstractFetcherThread.scala =================================================================== --- core/src/main/scala/kafka/server/AbstractFetcherThread.scala (revision 1378595) +++ core/src/main/scala/kafka/server/AbstractFetcherThread.scala (working copy) @@ -23,7 +23,10 @@ import collection.mutable import kafka.message.ByteBufferMessageSet import kafka.api.{FetchResponse, PartitionData, FetchRequestBuilder} -import kafka.utils.ShutdownableThread +import kafka.metrics.KafkaMetricsGroup +import com.yammer.metrics.core.Gauge +import java.util.concurrent.atomic.AtomicLong +import kafka.utils.{Pool, ShutdownableThread} /** @@ -94,6 +97,7 @@ case ErrorMapping.NoError => processPartitionData(topic, currentOffset.get, partitionData) val newOffset = currentOffset.get + partitionData.messages.asInstanceOf[ByteBufferMessageSet].validBytes + FetcherLagMetrics.getFetcherLagMetrics(topic, partitionId).lag = partitionData.hw - newOffset fetchMap.put(key, newOffset) case ErrorMapping.OffsetOutOfRangeCode => val newOffset = handleOffsetOutOfRange(topic, partitionId) @@ -140,4 +144,30 @@ fetchMap.size } } +} + +class FetcherLagMetrics(name: (String, 
Int)) extends KafkaMetricsGroup { + private val lock = new Object + private[this] var lagVal = new AtomicLong(-1L) + newGauge( + name._1 + "-" + name._2 + "-ConsumerLag", + new Gauge[Long] { + def value() = lagVal.get + } + ) + + def lag_=(newLag: Long) { + lagVal.set(newLag) + } + + def lag = lagVal.get +} + +object FetcherLagMetrics { + private val valueFactory = (k: (String, Int)) => new FetcherLagMetrics(k) + private val stats = new Pool[(String, Int), FetcherLagMetrics](Some(valueFactory)) + + def getFetcherLagMetrics(topic: String, partitionId: Int): FetcherLagMetrics = { + stats.getAndMaybePut( (topic, partitionId) ) + } } \ No newline at end of file Index: core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala =================================================================== --- core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala (revision 1378595) +++ core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala (working copy) @@ -27,38 +27,14 @@ trait KafkaMetricsGroup extends Logging { /** - * This method enables the user to form logical sub-groups of this - * KafkaMetricsGroup by inserting a sub-group identifier in the package - * string. - * - * @return The sub-group identifier. - */ - def metricsGroupIdent: String - - /** * Creates a new MetricName object for gauges, meters, etc. created for this - * metrics group. It uses the metricsGroupIdent to create logical sub-groups. - * This is currently specifically of use to classes under kafka, with - * broker-id being the most common metrics grouping strategy. - * + * metrics group. * @param name Descriptive name of the metric. * @return Sanitized metric name object. */ private def metricName(name: String) = { - val ident = metricsGroupIdent val klass = this.getClass - val pkg = { - val actualPkg = if (klass.getPackage == null) "" else klass.getPackage.getName - if (ident.nonEmpty) { - // insert the sub-group identifier after the top-level package - if (actualPkg.contains(".")) - actualPkg.replaceFirst("""\.""", ".%s.".format(ident)) - else - actualPkg + "." 
+ ident - } - else - actualPkg - } + val pkg = if (klass.getPackage == null) "" else klass.getPackage.getName val simpleName = klass.getSimpleName.replaceAll("\\$$", "") new MetricName(pkg, simpleName, name) } Index: core/src/main/scala/kafka/api/StopReplicaRequest.scala =================================================================== --- core/src/main/scala/kafka/api/StopReplicaRequest.scala (revision 1378595) +++ core/src/main/scala/kafka/api/StopReplicaRequest.scala (working copy) @@ -45,7 +45,7 @@ clientId: String, ackTimeoutMs: Int, stopReplicaSet: Set[(String, Int)]) - extends RequestOrResponse(Some(RequestKeys.StopReplicaRequest)) { + extends RequestOrResponse(Some(RequestKeys.StopReplicaKey)) { def this(stopReplicaSet: Set[(String, Int)]) = { this(StopReplicaRequest.CurrentVersion, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout, stopReplicaSet) } Index: core/src/main/scala/kafka/api/OffsetRequest.scala =================================================================== --- core/src/main/scala/kafka/api/OffsetRequest.scala (revision 1378595) +++ core/src/main/scala/kafka/api/OffsetRequest.scala (working copy) @@ -45,7 +45,7 @@ topic: String, partition: Int, time: Long, - maxNumOffsets: Int) extends RequestOrResponse(Some(RequestKeys.Offsets)) { + maxNumOffsets: Int) extends RequestOrResponse(Some(RequestKeys.OffsetsKey)) { def this(topic: String, partition: Int, time: Long, maxNumOffsets: Int) = this(OffsetRequest.CurrentVersion, OffsetRequest.DefaultClientId, topic, partition, time, maxNumOffsets) Index: core/src/main/scala/kafka/api/FetchRequest.scala =================================================================== --- core/src/main/scala/kafka/api/FetchRequest.scala (revision 1378595) +++ core/src/main/scala/kafka/api/FetchRequest.scala (working copy) @@ -105,7 +105,7 @@ replicaId: Int = FetchRequest.DefaultReplicaId, maxWait: Int = FetchRequest.DefaultMaxWait, minBytes: Int = FetchRequest.DefaultMinBytes, - offsetInfo: Seq[OffsetDetail] ) extends RequestOrResponse(Some(RequestKeys.Fetch)) { + offsetInfo: Seq[OffsetDetail] ) extends RequestOrResponse(Some(RequestKeys.FetchKey)) { // ensure that a topic "X" appears in at most one OffsetDetail def validate() { @@ -144,6 +144,8 @@ def sizeInBytes: Int = 2 + 4 + (2 + clientId.length()) + 4 + 4 + 4 + offsetInfo.foldLeft(4)(_ + _.sizeInBytes()) def numPartitions: Int = offsetInfo.foldLeft(0)(_ + _.offsets.size) + + def isFromFollower(): Boolean = replicaId != FetchRequest.NonFollowerId } Index: core/src/main/scala/kafka/api/LeaderAndISRRequest.scala =================================================================== --- core/src/main/scala/kafka/api/LeaderAndISRRequest.scala (revision 1378595) +++ core/src/main/scala/kafka/api/LeaderAndISRRequest.scala (working copy) @@ -94,7 +94,7 @@ isInit: Boolean, ackTimeoutMs: Int, leaderAndISRInfos: Map[(String, Int), LeaderAndISR]) - extends RequestOrResponse(Some(RequestKeys.LeaderAndISRRequest)) { + extends RequestOrResponse(Some(RequestKeys.LeaderAndISRKey)) { def this(isInit: Boolean, leaderAndISRInfos: Map[(String, Int), LeaderAndISR]) = { this(LeaderAndISRRequest.CurrentVersion, LeaderAndISRRequest.DefaultClientId, isInit, LeaderAndISRRequest.DefaultAckTimeout, leaderAndISRInfos) } Index: core/src/main/scala/kafka/api/RequestKeys.scala =================================================================== --- core/src/main/scala/kafka/api/RequestKeys.scala (revision 1378595) +++ core/src/main/scala/kafka/api/RequestKeys.scala (working copy) @@ -17,11 +17,36 @@ 
package kafka.api +import kafka.common.KafkaException +import java.nio.ByteBuffer + object RequestKeys { - val Produce: Short = 0 - val Fetch: Short = 1 - val Offsets: Short = 2 - val TopicMetadata: Short = 3 - val LeaderAndISRRequest: Short = 4 - val StopReplicaRequest: Short = 5 + val ProduceKey: Short = 0 + val FetchKey: Short = 1 + val OffsetsKey: Short = 2 + val MetadataKey: Short = 3 + val LeaderAndISRKey: Short = 4 + val StopReplicaKey: Short = 5 + + val keyToNameAndDeserializerMap : Map[Short, (String, (ByteBuffer) => RequestOrResponse)]= + Map( ProduceKey -> ("Produce", ProducerRequest.readFrom), + FetchKey -> ("Fetch", FetchRequest.readFrom), + OffsetsKey -> ("Offsets", OffsetRequest.readFrom), + MetadataKey -> ("Metadata", TopicMetadataRequest.readFrom), + LeaderAndISRKey -> ("LeaderAndISR", LeaderAndISRRequest.readFrom), + StopReplicaKey -> ("StopReplica", StopReplicaRequest.readFrom) ) + + def nameForKey(key: Short): String = { + keyToNameAndDeserializerMap.get(key) match { + case Some(nameAndSerializer) => nameAndSerializer._1 + case None => throw new KafkaException("Wrong request type %d".format(key)) + } + } + + def deserializerForKey(key: Short): (ByteBuffer) => RequestOrResponse = { + keyToNameAndDeserializerMap.get(key) match { + case Some(nameAndSerializer) => nameAndSerializer._2 + case None => throw new KafkaException("Wrong request type %d".format(key)) + } + } } Index: core/src/main/scala/kafka/api/TopicMetadataRequest.scala =================================================================== --- core/src/main/scala/kafka/api/TopicMetadataRequest.scala (revision 1378595) +++ core/src/main/scala/kafka/api/TopicMetadataRequest.scala (working copy) @@ -78,7 +78,7 @@ val topics: Seq[String], val detailedMetadata: DetailedMetadataRequest = NoSegmentMetadata, val timestamp: Option[Long] = None, val count: Option[Int] = None) - extends RequestOrResponse(Some(RequestKeys.TopicMetadata)){ + extends RequestOrResponse(Some(RequestKeys.MetadataKey)){ def this(topics: Seq[String]) = this(TopicMetadataRequest.CurrentVersion, TopicMetadataRequest.DefaultClientId, topics, NoSegmentMetadata, None, None) Index: core/src/main/scala/kafka/api/ProducerRequest.scala =================================================================== --- core/src/main/scala/kafka/api/ProducerRequest.scala (revision 1378595) +++ core/src/main/scala/kafka/api/ProducerRequest.scala (working copy) @@ -24,7 +24,7 @@ object ProducerRequest { val CurrentVersion: Short = 0 - + def readFrom(buffer: ByteBuffer): ProducerRequest = { val versionId: Short = buffer.getShort val correlationId: Int = buffer.getInt @@ -58,7 +58,7 @@ clientId: String, requiredAcks: Short, ackTimeoutMs: Int, - data: Array[TopicData] ) extends RequestOrResponse(Some(RequestKeys.Produce)) { + data: Array[TopicData] ) extends RequestOrResponse(Some(RequestKeys.ProduceKey)) { def this(correlationId: Int, clientId: String, requiredAcks: Short, ackTimeoutMs: Int, data: Array[TopicData]) = this(ProducerRequest.CurrentVersion, correlationId, clientId, requiredAcks, ackTimeoutMs, data) Index: core/src/main/scala/kafka/javaapi/ProducerRequest.scala =================================================================== --- core/src/main/scala/kafka/javaapi/ProducerRequest.scala (revision 1378595) +++ core/src/main/scala/kafka/javaapi/ProducerRequest.scala (working copy) @@ -24,7 +24,7 @@ val clientId: String, val requiredAcks: Short, val ackTimeoutMs: Int, - val data: Array[TopicData]) extends RequestOrResponse(Some(RequestKeys.Produce)) { + val data: 
Array[TopicData]) extends RequestOrResponse(Some(RequestKeys.ProduceKey)) { val underlying = new kafka.api.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeoutMs, data)
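
For reference, a minimal usage sketch (not part of the patch, all class and metric names below are invented for illustration) of the two APIs this change introduces: the simplified KafkaMetricsGroup trait, whose newGauge/newMeter calls no longer need a metricsGroupIdent, and the RequestKeys lookup table that replaces each handler's own XRequest.readFrom(buffer) call. The trait and RequestKeys signatures are taken from the diff above; everything else is a hypothetical caller.

import java.nio.ByteBuffer
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
import com.yammer.metrics.core.Gauge
import kafka.api.{RequestKeys, RequestOrResponse}
import kafka.metrics.KafkaMetricsGroup

// Hypothetical component exposing metrics through the simplified trait:
// a gauge is sampled on demand by the reporter, a meter is marked by the
// code path being measured (same pattern as ReplicaManager and BrokerTopicMetrics above).
class QueueMetrics extends KafkaMetricsGroup {
  private val queued = new AtomicLong(0)

  newGauge(
    "QueuedRequests",
    new Gauge[Long] {
      def value() = queued.get
    }
  )

  val enqueueRate = newMeter("EnqueuesPerSec", "requests", TimeUnit.SECONDS)

  def enqueue() {
    queued.incrementAndGet()
    enqueueRate.mark()
  }
}

// Hypothetical decode step: the request layer can now look the deserializer up
// from the two-byte key at the head of the buffer instead of hard-coding readFrom calls.
object DecodeExample {
  def decode(buffer: ByteBuffer): RequestOrResponse = {
    val key = buffer.getShort                     // key written first, e.g. RequestKeys.MetadataKey
    RequestKeys.deserializerForKey(key)(buffer)   // remaining bytes parsed by the matching readFrom
  }
}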