diff --git core/src/main/scala/kafka/Kafka.scala core/src/main/scala/kafka/Kafka.scala index 8e2537d..fe2e627 100644 --- core/src/main/scala/kafka/Kafka.scala +++ core/src/main/scala/kafka/Kafka.scala @@ -17,6 +17,8 @@ package kafka + +import metrics.{KafkaMetricsReporterMBean, KafkaMetricsReporter, KafkaMetricsConfig} import server.{KafkaConfig, KafkaServerStartable, KafkaServer} import utils.{Utils, Logging} import org.apache.log4j.jmx.LoggerDynamicMBean @@ -36,6 +38,13 @@ object Kafka extends Logging { try { val props = Utils.loadProps(args(0)) val serverConfig = new KafkaConfig(props) + val metricsConfig = new KafkaMetricsConfig(props) + metricsConfig.reporters.foreach(reporterType => { + val reporter = Utils.getObject[KafkaMetricsReporter](reporterType) + reporter.init(props) + if (reporter.isInstanceOf[KafkaMetricsReporterMBean]) + Utils.registerMBean(reporter, reporter.asInstanceOf[KafkaMetricsReporterMBean].getMBeanName) + }) val kafkaServerStartble = new KafkaServerStartable(serverConfig) diff --git core/src/main/scala/kafka/api/FetchResponse.scala core/src/main/scala/kafka/api/FetchResponse.scala index c475ed7..1d581d4 100644 --- core/src/main/scala/kafka/api/FetchResponse.scala +++ core/src/main/scala/kafka/api/FetchResponse.scala @@ -73,7 +73,6 @@ class PartitionDataSend(val partitionData: PartitionData) extends Send { } } - object TopicData { def readFrom(buffer: ByteBuffer): TopicData = { val topic = Utils.readShortString(buffer, "UTF-8") diff --git core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala new file mode 100644 index 0000000..3c6420c --- /dev/null +++ core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala @@ -0,0 +1,80 @@ +/** + * + * + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.metrics + +import java.util.Properties +import com.yammer.metrics.Metrics +import java.io.File +import com.yammer.metrics.reporting.CsvReporter +import kafka.utils.{nonthreadsafe, Logging, Utils} +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean + +private trait KafkaCSVMetricsReporterMBean extends KafkaMetricsReporterMBean + +private class KafkaCSVMetricsReporter extends KafkaMetricsReporter + with KafkaCSVMetricsReporterMBean + with Logging { + + private var csvDir: File = null + private var underlying: CsvReporter = null + private var running = false + private val initialized = new AtomicBoolean(false) + + + @nonthreadsafe + override def init(props: Properties) { + val metricsConfig = new KafkaMetricsConfig(props) + csvDir = new File(Utils.getString(props, "kafka.csv.metrics.dir", "kafka_metrics")) + if (!csvDir.exists()) + csvDir.mkdirs() + underlying = new CsvReporter(Metrics.defaultRegistry(), csvDir) + if (Utils.getBoolean(props, "kafka.csv.metrics.reporter.enabled", false)) + startReporter(metricsConfig.pollingIntervalSecs) + initialized.set(true) + } + + + override def getMBeanName = "kafka:type=kafka.metrics.KafkaCSVMetricsReporter" + + + override def startReporter(pollingPeriodSecs: Long) { + if (initialized.get()) synchronized { + if (running == false) { + underlying.start(pollingPeriodSecs, TimeUnit.SECONDS) + info("Started Kafka CSV metrics reporter with polling period %d seconds".format(pollingPeriodSecs)) + } + } + } + + + override def stopReporter() { + if (initialized.get()) synchronized { + if (running == true) { + underlying.shutdown() + info("Stopped Kafka CSV metrics reporter") + underlying = new CsvReporter(Metrics.defaultRegistry(), csvDir) + } + } + } + +} + diff --git core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala new file mode 100644 index 0000000..771160c --- /dev/null +++ core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala @@ -0,0 +1,38 @@ +/** + * + * + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.metrics + +import java.util.Properties +import kafka.utils.Utils + +class KafkaMetricsConfig(props: Properties) { + + /** + * Comma-separated list of reporter types. These classes should be on the + * classpath and will be instantiated at run-time. + */ + val reporters = Utils.getCSVList(Utils.getString(props, "kafka.metrics.reporters", "")) + + /** + * The metrics polling interval (in seconds). + */ + val pollingIntervalSecs = Utils.getInt(props, "kafka.metrics.polling.interval.secs", 10) +} diff --git core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala new file mode 100644 index 0000000..2de04cb --- /dev/null +++ core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.metrics + + +import com.yammer.metrics.core.{Gauge, MetricName} +import kafka.utils.Logging +import java.util.concurrent.TimeUnit +import com.yammer.metrics.Metrics + + +trait KafkaMetricsGroup extends Logging { + + /** + * This method enables the user to form logical sub-groups of this + * KafkaMetricsGroup by inserting a sub-group identifier in the package + * string. + * + * @return The sub-group identifier. + */ + def metricsGroupIdent: String + + /** + * Creates a new MetricName object for gauges, meters, etc. created for this + * metrics group. It uses the metricsGroupIdent to create logical sub-groups. + * This is currently specifically of use to classes under kafka, with + * broker-id being the most common metrics grouping strategy. + * + * @param name Descriptive name of the metric. + * @return Sanitized metric name object. + */ + private def metricName(name: String) = { + val ident = metricsGroupIdent + val klass = this.getClass + val pkg = { + val actualPkg = if (klass.getPackage == null) "" else klass.getPackage.getName + if (ident.nonEmpty) { + // insert the sub-group identifier after the top-level package + if (actualPkg.contains(".")) + actualPkg.replaceFirst("""\.""", ".%s.".format(ident)) + else + actualPkg + "." + ident + } + else + actualPkg + } + val simpleName = klass.getSimpleName.replaceAll("\\$$", "") + new MetricName(pkg, simpleName, name) + } + + def newGauge[T](name: String, metric: Gauge[T]) = + Metrics.newGauge(metricName(name), metric) + + def newMeter(name: String, eventType: String, timeUnit: TimeUnit) = + Metrics.newMeter(metricName(name), eventType, timeUnit) + + def newHistogram(name: String, biased: Boolean = false) = Metrics.newHistogram(metricName(name), biased) + + def newTimer(name: String, durationUnit: TimeUnit, rateUnit: TimeUnit) = + Metrics.newTimer(metricName(name), durationUnit, rateUnit) + +} diff --git core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala new file mode 100644 index 0000000..b2ad920 --- /dev/null +++ core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala @@ -0,0 +1,47 @@ +/** + * + * + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.metrics + +import java.util.Properties + +/** + * Base trait for reporter MBeans. If a client wants to expose these JMX + * operations on a custom reporter (that implements + * [[kafka.metrics.KafkaMetricsReporter]]), the custom reporter needs to + * additionally implement an MBean trait that extends this trait so that the + * registered MBean is compliant with the standard MBean convention. + */ +trait KafkaMetricsReporterMBean { + def startReporter(pollingPeriodInSeconds: Long) + def stopReporter() + + /** + * + * @return The name with which the MBean will be registered. + */ + def getMBeanName: String +} + + +trait KafkaMetricsReporter { + def init(props: Properties) +} + diff --git core/src/main/scala/kafka/metrics/KafkaTimer.scala core/src/main/scala/kafka/metrics/KafkaTimer.scala new file mode 100644 index 0000000..f89a14f --- /dev/null +++ core/src/main/scala/kafka/metrics/KafkaTimer.scala @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.metrics + +import com.yammer.metrics.core.Timer + +/** + * A wrapper around metrics timer object that provides a convenient mechanism + * to time code blocks. This pattern was borrowed from the metrics-scala_2.9.1 + * package. + * @param metric The underlying timer object. + */ +class KafkaTimer(metric: Timer) { + + def time[A](f: => A): A = { + val ctx = metric.time + try { + f + } + finally { + ctx.stop() + } + } +} + diff --git core/src/main/scala/kafka/server/KafkaApis.scala core/src/main/scala/kafka/server/KafkaApis.scala index 0c86260..1188d4b 100644 --- core/src/main/scala/kafka/server/KafkaApis.scala +++ core/src/main/scala/kafka/server/KafkaApis.scala @@ -18,21 +18,23 @@ package kafka.server import java.io.IOException -import java.util.concurrent.atomic._ import kafka.admin.{CreateTopicCommand, AdminUtils} import kafka.api._ import kafka.common._ import kafka.log._ import kafka.message._ import kafka.network._ +import kafka.utils.{ZkUtils, Pool, SystemTime, Logging} import org.apache.log4j.Logger import scala.collection._ import mutable.HashMap import scala.math._ import kafka.network.RequestChannel.Response -import kafka.utils.{ZkUtils, SystemTime, Logging} +import java.util.concurrent.TimeUnit +import kafka.metrics.KafkaMetricsGroup import kafka.cluster.Replica + /** * Logic to handle the various Kafka requests */ @@ -44,10 +46,13 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, becomeFollower: (Replica, LeaderAndISR) => Short, brokerId: Int) extends Logging { - private val produceRequestPurgatory = new ProducerRequestPurgatory(brokerId) - private val fetchRequestPurgatory = new FetchRequestPurgatory(brokerId, requestChannel) + private val metricsGroup = brokerId.toString + private val producerRequestPurgatory = new ProducerRequestPurgatory + private val fetchRequestPurgatory = new FetchRequestPurgatory(requestChannel) + private val delayedRequestMetrics = new DelayedRequestMetrics + private val requestLogger = Logger.getLogger("kafka.request.logger") - this.logIdent = "KafkaApi on Broker " + brokerId + ", " + this.logIdent = "KafkaApis-%d ".format(brokerId) /** * Top-level method that handles all requests and multiplexes to the right api @@ -69,7 +74,9 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, def handleLeaderAndISRRequest(request: RequestChannel.Request){ val responseMap = new HashMap[(String, Int), Short] val leaderAndISRRequest = LeaderAndISRRequest.readFrom(request.request.buffer) - info("handling leader and isr request " + leaderAndISRRequest) + if(requestLogger.isTraceEnabled) + requestLogger.trace("Handling leader and isr request " + leaderAndISRRequest) + trace("Handling leader and isr request " + leaderAndISRRequest) for((partitionInfo, leaderAndISR) <- leaderAndISRRequest.leaderAndISRInfos){ var errorCode = ErrorMapping.NoError @@ -78,12 +85,12 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, // If the partition does not exist locally, create it if(replicaManager.getPartition(topic, partition) == None){ - trace("the partition (%s, %d) does not exist locally, check if current broker is in assigned replicas, if so, start the local replica".format(topic, partition)) + trace("The partition (%s, %d) does not exist locally, check if current broker is in assigned replicas, if so, start the local replica".format(topic, partition)) val assignedReplicas = ZkUtils.getReplicasForPartition(kafkaZookeeper.getZookeeperClient, topic, partition) - trace("assigned replicas list for topic [%s] partition [%d] is [%s]".format(topic, partition, assignedReplicas.toString)) + trace("Assigned replicas list for topic [%s] partition [%d] is [%s]".format(topic, partition, assignedReplicas.toString)) if(assignedReplicas.contains(brokerId)) { val replica = addReplicaCbk(topic, partition, assignedReplicas.toSet) - info("starting replica for topic [%s] partition [%d]".format(replica.topic, replica.partition.partitionId)) + info("Starting replica for topic [%s] partition [%d]".format(replica.topic, replica.partition.partitionId)) } } val replica = replicaManager.getReplica(topic, partition).get @@ -91,11 +98,11 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, val requestedLeaderId = leaderAndISR.leader // If the broker is requested to be the leader and it's not current the leader (the leader id is set and not equal to broker id) if(requestedLeaderId == brokerId && (!replica.partition.leaderId().isDefined || replica.partition.leaderId().get != brokerId)){ - info("becoming the leader for partition [%s, %d] at the leader and isr request %s".format(topic, partition, leaderAndISRRequest)) + info("Becoming the leader for partition [%s, %d] at the leader and isr request %s".format(topic, partition, leaderAndISRRequest)) errorCode = becomeLeader(replica, leaderAndISR) } else if (requestedLeaderId != brokerId) { - info("becoming the follower for partition [%s, %d] at the leader and isr request %s".format(topic, partition, leaderAndISRRequest)) + info("Becoming the follower for partition [%s, %d] at the leader and isr request %s".format(topic, partition, leaderAndISRRequest)) errorCode = becomeFollower(replica, leaderAndISR) } @@ -105,7 +112,7 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, if(leaderAndISRRequest.isInit == LeaderAndISRRequest.IsInit){ replicaManager.startHighWaterMarksCheckPointThread val partitionsToRemove = replicaManager.allPartitions.filter(p => !leaderAndISRRequest.leaderAndISRInfos.contains(p._1)).keySet - info("init flag is set in leaderAndISR request, partitions to remove: %s".format(partitionsToRemove)) + info("Init flag is set in leaderAndISR request, partitions to remove: %s".format(partitionsToRemove)) partitionsToRemove.foreach(p => stopReplicaCbk(p._1, p._2)) } @@ -116,6 +123,10 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, def handleStopReplicaRequest(request: RequestChannel.Request){ val stopReplicaRequest = StopReplicaRequest.readFrom(request.request.buffer) + if(requestLogger.isTraceEnabled) + requestLogger.trace("Handling stop replica request " + stopReplicaRequest) + trace("Handling stop replica request " + stopReplicaRequest) + val responseMap = new HashMap[(String, Int), Short] for((topic, partition) <- stopReplicaRequest.stopReplicaSet){ @@ -133,12 +144,17 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, def maybeUnblockDelayedFetchRequests(topic: String, partitionDatas: Array[PartitionData]) { var satisfied = new mutable.ArrayBuffer[DelayedFetch] for(partitionData <- partitionDatas) - satisfied ++= fetchRequestPurgatory.update((topic, partitionData.partition), partitionData) - trace("produce request to %s unblocked %d DelayedFetchRequests.".format(topic, satisfied.size)) + satisfied ++= fetchRequestPurgatory.update(RequestKey(topic, partitionData.partition), null) + trace("Producer request to %s unblocked %d fetch requests.".format(topic, satisfied.size)) // send any newly unblocked responses for(fetchReq <- satisfied) { val topicData = readMessageSets(fetchReq.fetch) val response = new FetchResponse(FetchRequest.CurrentVersion, fetchReq.fetch.correlationId, topicData) + + val fromFollower = fetchReq.fetch.replicaId != FetchRequest.NonFollowerId + delayedRequestMetrics.recordDelayedFetchSatisfied( + fromFollower, SystemTime.nanoseconds - fetchReq.creationTimeNs, response) + requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response))) } } @@ -150,43 +166,45 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, val produceRequest = ProducerRequest.readFrom(request.request.buffer) val sTime = SystemTime.milliseconds if(requestLogger.isTraceEnabled) - requestLogger.trace("producer request %s".format(produceRequest.toString)) - trace("Broker %s received produce request %s".format(brokerId, produceRequest.toString)) + requestLogger.trace("Handling producer request " + request.toString) + trace("Handling producer request " + request.toString) val response = produceToLocalLog(produceRequest) - debug("produce to local log in %d ms".format(SystemTime.milliseconds - sTime)) - + debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime)) + + for (topicData <- produceRequest.data) + maybeUnblockDelayedFetchRequests(topicData.topic, topicData.partitionDataArray) + if (produceRequest.requiredAcks == 0 || produceRequest.requiredAcks == 1 || - produceRequest.data.size <= 0) { + produceRequest.data.size <= 0) requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) - - for (topicData <- produceRequest.data) - maybeUnblockDelayedFetchRequests(topicData.topic, topicData.partitionDataArray) - } else { // create a list of (topic, partition) pairs to use as keys for this delayed request - val topicPartitionPairs = produceRequest.data.flatMap(topicData => { + val producerRequestKeys = produceRequest.data.flatMap(topicData => { val topic = topicData.topic topicData.partitionDataArray.map(partitionData => { - (topic, partitionData.partition) + RequestKey(topic, partitionData.partition) }) }) + val delayedProduce = new DelayedProduce( - topicPartitionPairs, request, + producerRequestKeys, request, response.errors, response.offsets, produceRequest, produceRequest.ackTimeoutMs.toLong) - produceRequestPurgatory.watch(delayedProduce) + producerRequestPurgatory.watch(delayedProduce) + /* * Replica fetch requests may have arrived (and potentially satisfied) - * delayedProduce requests before they even made it to the purgatory. + * delayedProduce requests while they were being added to the purgatory. * Here, we explicitly check if any of them can be satisfied. */ var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce] - topicPartitionPairs.foreach(topicPartition => - satisfiedProduceRequests ++= - produceRequestPurgatory.update(topicPartition, topicPartition)) - debug("%d DelayedProduce requests unblocked after produce to local log.".format(satisfiedProduceRequests.size)) + producerRequestKeys.foreach(key => + satisfiedProduceRequests ++= + producerRequestPurgatory.update(key, key)) + debug(satisfiedProduceRequests.size + + " producer requests unblocked during produce to local log.") satisfiedProduceRequests.foreach(_.respond()) } } @@ -195,10 +213,11 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, * Helper method for handling a parsed producer request */ private def produceToLocalLog(request: ProducerRequest): ProducerResponse = { - trace("produce [%s] to local log ".format(request.toString)) + trace("Produce [%s] to local log ".format(request.toString)) val requestSize = request.topicPartitionCount val errors = new Array[Short](requestSize) val offsets = new Array[Long](requestSize) + var msgIndex = -1 for(topicData <- request.data) { for(partitionData <- topicData.partitionDataArray) { @@ -212,12 +231,13 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, replicaManager.recordLeaderLogEndOffset(topicData.topic, partitionData.partition, log.logEndOffset) offsets(msgIndex) = log.logEndOffset errors(msgIndex) = ErrorMapping.NoError.toShort - trace(partitionData.messages.sizeInBytes + " bytes written to logs.") + trace("%d bytes written to logs, nextAppendOffset = %d" + .format(partitionData.messages.sizeInBytes, offsets(msgIndex))) } catch { case e => BrokerTopicStat.getBrokerTopicStat(topicData.topic).recordFailedProduceRequest BrokerTopicStat.getBrokerAllTopicStat.recordFailedProduceRequest - error("error processing ProducerRequest on " + topicData.topic + ":" + partitionData.partition, e) + error("Error processing ProducerRequest on %s:%d".format(topicData.topic, partitionData.partition), e) e match { case _: IOException => fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e) @@ -229,8 +249,7 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, } } } - val ret = new ProducerResponse(request.versionId, request.correlationId, errors, offsets) - ret + new ProducerResponse(request.versionId, request.correlationId, errors, offsets) } /** @@ -238,7 +257,10 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, */ def handleFetchRequest(request: RequestChannel.Request) { val fetchRequest = FetchRequest.readFrom(request.request.buffer) - trace("handling fetch request: " + fetchRequest.toString) + if(requestLogger.isTraceEnabled) + requestLogger.trace("Handling fetch request " + fetchRequest.toString) + trace("Handling fetch request " + fetchRequest.toString) + // validate the request try { fetchRequest.validate() @@ -255,12 +277,12 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce] fetchRequest.offsetInfo.foreach(topicOffsetInfo => { topicOffsetInfo.partitions.foreach(partition => { - satisfiedProduceRequests ++= produceRequestPurgatory.update( - (topicOffsetInfo.topic, partition), (topicOffsetInfo.topic, partition) - ) + val key = RequestKey(topicOffsetInfo.topic, partition) + satisfiedProduceRequests ++= producerRequestPurgatory.update(key, key) }) }) - debug("replica %d fetch unblocked %d DelayedProduce requests.".format(fetchRequest.replicaId, satisfiedProduceRequests.size)) + debug("Replica %d fetch unblocked %d producer requests." + .format(fetchRequest.replicaId, satisfiedProduceRequests.size)) satisfiedProduceRequests.foreach(_.respond()) } @@ -270,14 +292,15 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, availableBytes >= fetchRequest.minBytes || fetchRequest.numPartitions <= 0) { val topicData = readMessageSets(fetchRequest) - debug("returning fetch response %s for fetch request with correlation id %d".format(topicData.map(_.partitionDataArray.map(_.error).mkString(",")).mkString(","), fetchRequest.correlationId)) + debug("Returning fetch response %s for fetch request with correlation id %d".format( + topicData.map(_.partitionDataArray.map(_.error).mkString(",")).mkString(","), fetchRequest.correlationId)) val response = new FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, topicData) requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response))) } else { - debug("putting fetch request into purgatory") + debug("Putting fetch request into purgatory") // create a list of (topic, partition) pairs to use as keys for this delayed request - val topicPartitionPairs: Seq[Any] = fetchRequest.offsetInfo.flatMap(o => o.partitions.map((o.topic, _))) - val delayedFetch = new DelayedFetch(topicPartitionPairs, request, fetchRequest, fetchRequest.maxWait, availableBytes) + val delayedFetchKeys = fetchRequest.offsetInfo.flatMap(o => o.partitions.map(RequestKey(o.topic, _))) + val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest, fetchRequest.maxWait) fetchRequestPurgatory.watch(delayedFetch) } } @@ -298,16 +321,18 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, totalBytes += math.min(offsetDetail.fetchSizes(i), available) } catch { case e: InvalidPartitionException => - info("invalid partition " + offsetDetail.partitions(i) + "in fetch request from client '" + fetchRequest.clientId + "'") + info("Invalid partition %d in fetch request from client %d." + .format(offsetDetail.partitions(i), fetchRequest.clientId)) } } } + trace(totalBytes + " available bytes for fetch request.") totalBytes } private def maybeUpdatePartitionHW(fetchRequest: FetchRequest) { val offsets = fetchRequest.offsetInfo - debug("act on update partition HW, check offset detail: %s ".format(offsets)) + debug("Act on update partition HW, check offset detail: %s ".format(offsets)) for(offsetDetail <- offsets) { val topic = offsetDetail.topic val (partitions, offsets) = (offsetDetail.partitions, offsetDetail.offsets) @@ -343,17 +368,20 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, BrokerTopicStat.getBrokerTopicStat(topic).recordBytesOut(messages.sizeInBytes) BrokerTopicStat.getBrokerAllTopicStat.recordBytesOut(messages.sizeInBytes) val leaderReplicaOpt = replicaManager.getReplica(topic, partition, brokerId) - assert(leaderReplicaOpt.isDefined, "Leader replica for topic %s partition %d".format(topic, partition) + " must exist on leader broker %d".format(brokerId)) + assert(leaderReplicaOpt.isDefined, "Leader replica for topic %s partition %d must exist on leader broker %d".format(topic, partition, brokerId)) val leaderReplica = leaderReplicaOpt.get fetchRequest.replicaId match { - case FetchRequest.NonFollowerId => // replica id value of -1 signifies a fetch request from an external client, not from one of the replicas + case FetchRequest.NonFollowerId => + // replica id value of -1 signifies a fetch request from an external client, not from one of the replicas new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark(), messages) case _ => // fetch request from a follower val replicaOpt = replicaManager.getReplica(topic, partition, fetchRequest.replicaId) assert(replicaOpt.isDefined, "No replica %d in replica manager on %d".format(fetchRequest.replicaId, brokerId)) val replica = replicaOpt.get - debug("leader [%d] for topic [%s] partition [%d] received fetch request from follower [%d]".format(brokerId, replica.topic, replica.partition.partitionId, fetchRequest.replicaId)) - debug("Leader %d returning %d messages for topic %s partition %d to follower %d".format(brokerId, messages.sizeInBytes, replica.topic, replica.partition.partitionId, fetchRequest.replicaId)) + debug("Leader for topic [%s] partition [%d] received fetch request from follower [%d]" + .format(replica.topic, replica.partition.partitionId, fetchRequest.replicaId)) + debug("Leader returning %d messages for topic %s partition %d to follower %d" + .format(messages.sizeInBytes, replica.topic, replica.partition.partitionId, fetchRequest.replicaId)) new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark(), messages) } } @@ -372,7 +400,7 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, try { // check if the current broker is the leader for the partitions kafkaZookeeper.ensurePartitionLeaderOnThisBroker(topic, partition) - trace("fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize)) + trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize)) val log = logManager.getLog(topic, partition) response = Right(log match { case Some(l) => l.read(offset, maxSize) case None => MessageSet.Empty }) } catch { @@ -389,7 +417,9 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, def handleOffsetRequest(request: RequestChannel.Request) { val offsetRequest = OffsetRequest.readFrom(request.request.buffer) if(requestLogger.isTraceEnabled) - requestLogger.trace("offset request " + offsetRequest.toString) + requestLogger.trace("Handling offset request " + offsetRequest.toString) + trace("Handling offset request " + offsetRequest.toString) + var response: OffsetResponse = null try { kafkaZookeeper.ensurePartitionLeaderOnThisBroker(offsetRequest.topic, offsetRequest.partition) @@ -412,11 +442,14 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, def handleTopicMetadataRequest(request: RequestChannel.Request) { val metadataRequest = TopicMetadataRequest.readFrom(request.request.buffer) if(requestLogger.isTraceEnabled) - requestLogger.trace("topic metadata request " + metadataRequest.toString()) + requestLogger.trace("Handling topic metadata request " + metadataRequest.toString()) + trace("Handling topic metadata request " + metadataRequest.toString()) + val topicsMetadata = new mutable.ArrayBuffer[TopicMetadata]() val zkClient = kafkaZookeeper.getZookeeperClient var errorCode = ErrorMapping.NoError val config = logManager.config + try { val topicMetadataList = AdminUtils.getTopicMetaDataFromZK(metadataRequest.topics, zkClient) metadataRequest.topics.zip(topicMetadataList).foreach( @@ -452,33 +485,43 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, } def close() { - debug("shut down") + debug("Shutting down.") fetchRequestPurgatory.shutdown() - produceRequestPurgatory.shutdown() - debug("shutted down completely") + producerRequestPurgatory.shutdown() + debug("Shut down complete.") } + private [kafka] trait MetricKey { + def keyLabel: String + } + private [kafka] object MetricKey { + val globalLabel = "all" + } + + private [kafka] case class RequestKey(topic: String, partition: Int) + extends MetricKey { + override def keyLabel = "%s-%d".format(topic, partition) + } /** * A delayed fetch request */ - class DelayedFetch(keys: Seq[Any], request: RequestChannel.Request, val fetch: FetchRequest, delayMs: Long, initialSize: Long) extends DelayedRequest(keys, request, delayMs) { - val bytesAccumulated = new AtomicLong(initialSize) - } + class DelayedFetch(keys: Seq[RequestKey], request: RequestChannel.Request, val fetch: FetchRequest, delayMs: Long) + extends DelayedRequest(keys, request, delayMs) /** * A holding pen for fetch requests waiting to be satisfied */ - class FetchRequestPurgatory(brokerId: Int, requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, PartitionData]("Fetch Request Purgatory on Broker " + brokerId + ", ") { + class FetchRequestPurgatory(requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, Null](brokerId) { + + this.logIdent = "FetchRequestPurgatory-%d ".format(brokerId) + + override def metricsGroupIdent = metricsGroup /** * A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field */ - def checkSatisfied(partitionData: PartitionData, delayedFetch: DelayedFetch): Boolean = { - val messageDataSize = partitionData.messages.sizeInBytes - val accumulatedSize = delayedFetch.bytesAccumulated.addAndGet(messageDataSize) - debug("fetch request check, accm size: " + accumulatedSize + " delay fetch min bytes: " + delayedFetch.fetch.minBytes) - accumulatedSize >= delayedFetch.fetch.minBytes - } + def checkSatisfied(n: Null, delayedFetch: DelayedFetch): Boolean = + availableFetchBytes(delayedFetch.fetch) >= delayedFetch.fetch.minBytes /** * When a request expires just answer it with whatever data is present @@ -486,11 +529,13 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, def expire(delayed: DelayedFetch) { val topicData = readMessageSets(delayed.fetch) val response = new FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData) + val fromFollower = delayed.fetch.replicaId != FetchRequest.NonFollowerId + delayedRequestMetrics.recordDelayedFetchExpired(fromFollower, response) requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response))) } } - class DelayedProduce(keys: Seq[Any], + class DelayedProduce(keys: Seq[RequestKey], request: RequestChannel.Request, localErrors: Array[Short], requiredOffsets: Array[Long], @@ -504,7 +549,7 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, * values are effectively synchronized by the ProducerRequestPurgatory's * update method */ - private val partitionStatus = keys.map(key => { + private [kafka] val partitionStatus = keys.map(key => { val keyIndex = keys.indexOf(key) // if there was an error in writing to the local replica's log, then don't // wait for acks on this partition @@ -525,13 +570,13 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, def respond() { val errorsAndOffsets: (List[Short], List[Long]) = ( - keys.foldRight - ((List[Short](), List[Long]())) - ((key: Any, result: (List[Short], List[Long])) => { - val status = partitionStatus(key) - (status.error :: result._1, status.requiredOffset :: result._2) - }) - ) + keys.foldRight + ((List[Short](), List[Long]())) + ((key: RequestKey, result: (List[Short], List[Long])) => { + val status = partitionStatus(key) + (status.error :: result._1, status.requiredOffset :: result._2) + }) + ) val response = new ProducerResponse(produce.versionId, produce.correlationId, errorsAndOffsets._1.toArray, errorsAndOffsets._2.toArray) @@ -550,9 +595,14 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, * As partitions become acknowledged, we may be able to unblock * DelayedFetchRequests that are pending on those partitions. */ - def isSatisfied(followerFetchPartition: (String, Int)) = { - val (topic, partitionId) = followerFetchPartition - val fetchPartitionStatus = partitionStatus(followerFetchPartition) + def isSatisfied(followerFetchRequestKey: RequestKey) = { + val topic = followerFetchRequestKey.topic + val partitionId = followerFetchRequestKey.partition + val key = RequestKey(topic, partitionId) + val fetchPartitionStatus = partitionStatus(key) + val durationNs = SystemTime.nanoseconds - creationTimeNs + trace("Checking producer request satisfaction for %s-%d, acksPending = %b" + .format(topic, partitionId, fetchPartitionStatus.acksPending)) if (fetchPartitionStatus.acksPending) { val leaderReplica = replicaManager.getLeaderReplica(topic, partitionId) leaderReplica match { @@ -560,14 +610,16 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, if (leader.isLocal) { val isr = leader.partition.inSyncReplicas val numAcks = isr.count(r => { - if (!r.isLocal) - r.logEndOffset() >= partitionStatus(followerFetchPartition).requiredOffset + if (!r.isLocal) { + r.logEndOffset() >= partitionStatus(key).requiredOffset + } else true /* also count the local (leader) replica */ }) - trace("Received %d/%d acks for produce request to %s-%d".format( + + trace("Received %d/%d acks for producer request to %s-%d; isr size = %d".format( numAcks, produce.requiredAcks, - topic, partitionId)) + topic, partitionId, isr.size)) if ((produce.requiredAcks < 0 && numAcks >= isr.size) || (produce.requiredAcks > 0 && numAcks >= produce.requiredAcks)) { /* @@ -575,12 +627,16 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, * are fully caught up to the (local) leader's offset * corresponding to this produce request. */ + fetchPartitionStatus.acksPending = false fetchPartitionStatus.error = ErrorMapping.NoError val topicData = produce.data.find(_.topic == topic).get val partitionData = topicData.partitionDataArray.find(_.partition == partitionId).get + delayedRequestMetrics.recordDelayedProducerKeyCaughtUp(key, + durationNs, + partitionData.sizeInBytes) maybeUnblockDelayedFetchRequests( topic, Array(partitionData)) } @@ -597,7 +653,10 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, } // unblocked if there are no partitions with pending acks - ! partitionStatus.exists(p => p._2.acksPending) + val satisfied = ! partitionStatus.exists(p => p._2.acksPending) + if (satisfied) + delayedRequestMetrics.recordDelayedProduceSatisfied(durationNs) + satisfied } class PartitionStatus(var acksPending: Boolean, @@ -618,18 +677,156 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, /** * A holding pen for produce requests waiting to be satisfied. */ - private [kafka] class ProducerRequestPurgatory(brokerId: Int) extends RequestPurgatory[DelayedProduce, (String, Int)]("Producer Request Purgatory on Broker " + brokerId + ", ") { + private [kafka] class ProducerRequestPurgatory extends RequestPurgatory[DelayedProduce, RequestKey](brokerId) { - protected def checkSatisfied(fetchRequestPartition: (String, Int), + + this.logIdent = "ProducerRequestPurgatory-%d ".format(brokerId) + + override def metricsGroupIdent = metricsGroup + + protected def checkSatisfied(followerFetchRequestKey: RequestKey, delayedProduce: DelayedProduce) = - delayedProduce.isSatisfied(fetchRequestPartition) + delayedProduce.isSatisfied(followerFetchRequestKey) /** * Handle an expired delayed request */ protected def expire(delayedProduce: DelayedProduce) { + for (partitionStatus <- delayedProduce.partitionStatus if partitionStatus._2.acksPending) + delayedRequestMetrics.recordDelayedProducerKeyExpired(partitionStatus._1) + delayedProduce.respond() } } + + private class DelayedRequestMetrics { + + + private class DelayedProducerRequestMetrics(keyLabel: String = MetricKey.globalLabel) extends KafkaMetricsGroup { + override def metricsGroupIdent = metricsGroup + val caughtUpFollowerFetchRequestMeter = + newMeter("CaughtUpFollowerFetchRequestsPerSecond-" + keyLabel, "requests", TimeUnit.SECONDS) + val followerCatchUpTimeHistogram = if (keyLabel == MetricKey.globalLabel) + Some(newHistogram("FollowerCatchUpTimeInNs", biased = true)) + else None + + /* + * Note that throughput is updated on individual key satisfaction. + * Therefore, it is an upper bound on throughput since the + * DelayedProducerRequest may get expired. + */ + val throughputMeter = newMeter("Throughput-" + keyLabel, "bytes", TimeUnit.SECONDS) + val expiredRequestMeter = newMeter("ExpiredRequestsPerSecond-" + keyLabel, "requests", TimeUnit.SECONDS) + + val satisfiedRequestMeter = if (keyLabel == MetricKey.globalLabel) + Some(newMeter("SatisfiedRequestsPerSecond", "requests", TimeUnit.SECONDS)) + else None + val satisfactionTimeHistogram = if (keyLabel == MetricKey.globalLabel) + Some(newHistogram("SatisfactionTimeInNs", biased = true)) + else None + } + + + private class DelayedFetchRequestMetrics(forFollower: Boolean, + keyLabel: String = MetricKey.globalLabel) extends KafkaMetricsGroup { + private val metricPrefix = if (forFollower) "Follower" else "NonFollower" + + override def metricsGroupIdent = metricsGroup + val satisfiedRequestMeter = if (keyLabel == MetricKey.globalLabel) + Some(newMeter(metricPrefix + "-SatisfiedRequestsPerSecond", + "requests", TimeUnit.SECONDS)) + else None + + val satisfactionTimeHistogram = if (keyLabel == MetricKey.globalLabel) + Some(newHistogram(metricPrefix + "-SatisfactionTimeInNs", biased = true)) + else None + + val expiredRequestMeter = if (keyLabel == MetricKey.globalLabel) + Some(newMeter(metricPrefix + "-ExpiredRequestsPerSecond", + "requests", TimeUnit.SECONDS)) + else None + + val throughputMeter = newMeter("%s-Throughput-%s".format(metricPrefix, keyLabel), + "bytes", TimeUnit.SECONDS) + } + + + private val producerRequestMetricsForKey = + new Pool[MetricKey, DelayedProducerRequestMetrics] + private val aggregateProduceRequestMetrics = new DelayedProducerRequestMetrics + + private val aggregateFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = true) + private val aggregateNonFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = false) + + private val followerFetchRequestMetricsForKey = + new Pool[MetricKey, DelayedFetchRequestMetrics] + private val nonFollowerFetchRequestMetricsForKey = + new Pool[MetricKey, DelayedFetchRequestMetrics] + + + def recordDelayedProducerKeyExpired(key: MetricKey) { + val keyMetrics = producerRequestMetricsForKey.getAndMaybePut( + key, () => new DelayedProducerRequestMetrics(key.keyLabel)) + List(keyMetrics, aggregateProduceRequestMetrics).foreach(_.expiredRequestMeter.mark()) + } + + + def recordDelayedProducerKeyCaughtUp(key: MetricKey, timeToCatchUpNs: Long, bytes: Int) { + val keyMetrics = producerRequestMetricsForKey.getAndMaybePut( + key, () => new DelayedProducerRequestMetrics(key.keyLabel)) + List(keyMetrics, aggregateProduceRequestMetrics).foreach(m => { + m.caughtUpFollowerFetchRequestMeter.mark() + m.followerCatchUpTimeHistogram.foreach(_.update(timeToCatchUpNs)) + m.throughputMeter.mark(bytes) + }) + } + + + def recordDelayedProduceSatisfied(timeToSatisfyNs: Long) { + aggregateProduceRequestMetrics.satisfiedRequestMeter.foreach(_.mark()) + aggregateProduceRequestMetrics.satisfactionTimeHistogram.foreach(_.update(timeToSatisfyNs)) + } + + + private def recordDelayedFetchThroughput(forFollower: Boolean, response: FetchResponse) { + val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics + else aggregateNonFollowerFetchRequestMetrics + metrics.throughputMeter.mark(response.sizeInBytes) + + response.topicMap.foreach(topicAndData => { + val topic = topicAndData._1 + topicAndData._2.partitionDataArray.foreach(partitionData => { + val key = RequestKey(topic, partitionData.partition) + val makeNewMetrics = () => new DelayedFetchRequestMetrics(forFollower, key.keyLabel) + val keyMetrics = if (forFollower) + followerFetchRequestMetricsForKey.getAndMaybePut(key, makeNewMetrics) + else + nonFollowerFetchRequestMetricsForKey.getAndMaybePut(key, makeNewMetrics) + keyMetrics.throughputMeter.mark(partitionData.sizeInBytes) + }) + }) + } + + + def recordDelayedFetchExpired(forFollower: Boolean, response: FetchResponse) { + val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics + else aggregateNonFollowerFetchRequestMetrics + + metrics.expiredRequestMeter.foreach(_.mark()) + + recordDelayedFetchThroughput(forFollower, response) + } + + + def recordDelayedFetchSatisfied(forFollower: Boolean, timeToSatisfyNs: Long, response: FetchResponse) { + val aggregateMetrics = if (forFollower) aggregateFollowerFetchRequestMetrics + else aggregateNonFollowerFetchRequestMetrics + + aggregateMetrics.satisfactionTimeHistogram.foreach(_.update(timeToSatisfyNs)) + aggregateMetrics.satisfiedRequestMeter.foreach(_.mark()) + + recordDelayedFetchThroughput(forFollower, response) + } + } } diff --git core/src/main/scala/kafka/server/KafkaConfig.scala core/src/main/scala/kafka/server/KafkaConfig.scala index b39a090..d261a51 100644 --- core/src/main/scala/kafka/server/KafkaConfig.scala +++ core/src/main/scala/kafka/server/KafkaConfig.scala @@ -24,6 +24,7 @@ import kafka.consumer.ConsumerConfig import java.net.InetAddress + /** * Configuration settings for the kafka server */ @@ -35,7 +36,7 @@ class KafkaConfig(props: Properties) extends ZKConfig(props) { val hostName: String = Utils.getString(props, "hostname", InetAddress.getLocalHost.getHostAddress) /* the broker id for this server */ - val brokerId: Int = Utils.getInt(props, "brokerid") + val brokerId: Int = Utils.getInt(props, "brokerid", 0) /* the SO_SNDBUFF buffer of the socket sever sockets */ val socketSendBuffer: Int = Utils.getInt(props, "socket.send.buffer", 100*1024) @@ -138,7 +139,7 @@ class KafkaConfig(props: Properties) extends ZKConfig(props) { val replicaMaxWaitTimeMs = Utils.getInt(props, "replica.fetch.wait.time.ms", 500) /** minimum bytes expected for each fetch response. If not enough bytes, wait up to replicaMaxWaitTimeMs */ - val replicaMinBytes = Utils.getInt(props, "replica.fetch.min.bytes", 4086) + val replicaMinBytes = Utils.getInt(props, "replica.fetch.min.bytes", 4096) /* number of fetcher threads used to replicate messages from a source broker. * Increasing this value can increase the degree of I/O parallelism in the follower broker. */ diff --git core/src/main/scala/kafka/server/RequestPurgatory.scala core/src/main/scala/kafka/server/RequestPurgatory.scala index 83efe53..49efce5 100644 --- core/src/main/scala/kafka/server/RequestPurgatory.scala +++ core/src/main/scala/kafka/server/RequestPurgatory.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -23,6 +23,9 @@ import java.util.concurrent._ import java.util.concurrent.atomic._ import kafka.network._ import kafka.utils._ +import com.yammer.metrics.core.Gauge +import kafka.metrics.KafkaMetricsGroup + /** * A request whose processing needs to be delayed for at most the given delayMs @@ -30,6 +33,7 @@ import kafka.utils._ * for example a key could be a (topic, partition) pair. */ class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, delayMs: Long) extends DelayedItem[RequestChannel.Request](request, delayMs) { + val creationTimeNs = SystemTime.nanoseconds val satisfied = new AtomicBoolean(false) } @@ -58,13 +62,41 @@ class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, de * this function handles delayed requests that have hit their time limit without being satisfied. * */ -abstract class RequestPurgatory[T <: DelayedRequest, R](logPrefix: String) extends Logging{ - this.logIdent = logPrefix +abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) extends Logging with KafkaMetricsGroup { + /* a list of requests watching each key */ - private val watchersForKey = new ConcurrentHashMap[Any, Watchers] + private val watchersForKey = new Pool[Any, Watchers] + + private val numDelayedRequestsBeanName = "NumDelayedRequests" + private val timeToSatisfyHistogramBeanName = "TimeToSatisfyInNs" + private val satisfactionRateBeanName = "SatisfactionRate" + private val expirationRateBeanName = "ExpirationRate" + + override def metricsGroupIdent = "" + + val satisfactionRateMeter = newMeter( + satisfactionRateBeanName, + "requests", + TimeUnit.SECONDS + ) + + val timeToSatisfyHistogram = newHistogram(timeToSatisfyHistogramBeanName, biased = true) + + newGauge( + numDelayedRequestsBeanName, + new Gauge[Int] { + def value() = expiredRequestReaper.unsatisfied.get() + } + ) + + val expirationRateMeter = newMeter( + expirationRateBeanName, + "requests", + TimeUnit.SECONDS + ) /* background thread expiring requests that have been waiting too long */ - private val expiredRequestReaper = new ExpiredRequestReaper(logPrefix) + private val expiredRequestReaper = new ExpiredRequestReaper private val expirationThread = Utils.daemonThread("request-expiration-task", expiredRequestReaper) expirationThread.start() @@ -89,16 +121,9 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](logPrefix: String) exten else w.collectSatisfiedRequests(request) } - - private def watchersFor(key: Any): Watchers = { - var lst = watchersForKey.get(key) - if(lst == null) { - watchersForKey.putIfAbsent(key, new Watchers) - lst = watchersForKey.get(key) - } - lst - } - + + private def watchersFor(key: Any) = watchersForKey.getAndMaybePut(key, () => new Watchers) + /** * Check if this request satisfied this delayed request */ @@ -117,7 +142,8 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](logPrefix: String) exten } /** - * A linked list of DelayedRequests watching some key with some associated bookeeping logic + * A linked list of DelayedRequests watching some key with some associated + * bookkeeping logic. */ private class Watchers { @@ -132,10 +158,10 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](logPrefix: String) exten def add(t: T) { synchronized { - requests.add(t) - liveCount += 1 - maybePurge() - } + requests.add(t) + liveCount += 1 + maybePurge() + } } private def maybePurge() { @@ -151,32 +177,39 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](logPrefix: String) exten def decLiveCount() { synchronized { - liveCount -= 1 - } + liveCount -= 1 + } } def collectSatisfiedRequests(request: R): Seq[T] = { val response = new mutable.ArrayBuffer[T] synchronized { - val iter = requests.iterator() - while(iter.hasNext) { - val curr = iter.next - if(curr.satisfied.get) { - // another thread has satisfied this request, remove it - iter.remove() - } else { - if(checkSatisfied(request, curr)) { - iter.remove() - val updated = curr.satisfied.compareAndSet(false, true) - if(updated == true) { - response += curr - liveCount -= 1 - expiredRequestReaper.satisfyRequest() - } - } - } - } - } + val iter = requests.iterator() + while(iter.hasNext) { + val curr = iter.next + if(curr.satisfied.get) { + // another thread has satisfied this request, remove it + iter.remove() + } else { + // synchronize on curr to avoid any race condition with expire + // on client-side. + val satisfied = curr synchronized checkSatisfied(request, curr) + if(satisfied) { + iter.remove() + val updated = curr.satisfied.compareAndSet(false, true) + if(updated == true) { + val requestNs = SystemTime.nanoseconds - curr.creationTimeNs + satisfactionRateMeter.mark() + timeToSatisfyHistogram.update(requestNs) + + response += curr + liveCount -= 1 + expiredRequestReaper.satisfyRequest() + } + } + } + } + } response } } @@ -184,9 +217,8 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](logPrefix: String) exten /** * Runnable to expire requests that have sat unfullfilled past their deadline */ - private class ExpiredRequestReaper(logPrefix: String) extends Runnable with Logging { - this.logIdent = "ExpiredRequestReaper for " + logPrefix - + private class ExpiredRequestReaper extends Runnable with Logging { + this.logIdent = "ExpiredRequestReaper-%d ".format(brokerId) /* a few magic parameters to help do cleanup to avoid accumulating old watchers */ private val CleanupThresholdSize = 100 private val CleanupThresholdPrct = 0.5 @@ -196,14 +228,16 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](logPrefix: String) exten private val shutdownLatch = new CountDownLatch(1) private val needsPurge = new AtomicBoolean(false) /* The count of elements in the delay queue that are unsatisfied */ - private val unsatisfied = new AtomicInteger(0) + private [kafka] val unsatisfied = new AtomicInteger(0) /** Main loop for the expiry thread */ def run() { while(running.get) { try { val curr = pollExpired() - expire(curr) + curr synchronized { + expire(curr) + } } catch { case ie: InterruptedException => if(needsPurge.getAndSet(false)) { @@ -232,11 +266,11 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](logPrefix: String) exten /** Shutdown the expiry thread*/ def shutdown() { - debug("shutting down") + debug("Shutting down.") running.set(false) expirationThread.interrupt() shutdownLatch.await() - debug("shut down completely") + debug("Shut down complete.") } /** Record the fact that we satisfied a request in the stats for the expiry queue */ @@ -250,6 +284,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](logPrefix: String) exten val curr = delayed.take() val updated = curr.satisfied.compareAndSet(false, true) if(updated) { + expirationRateMeter.mark() unsatisfied.getAndDecrement() for(key <- curr.keys) watchersFor(key).decLiveCount() diff --git core/src/main/scala/kafka/utils/Pool.scala core/src/main/scala/kafka/utils/Pool.scala index d62fa77..ff53eee 100644 --- core/src/main/scala/kafka/utils/Pool.scala +++ core/src/main/scala/kafka/utils/Pool.scala @@ -27,14 +27,23 @@ class Pool[K,V] extends Iterable[(K, V)] { def this(m: collection.Map[K, V]) { this() - for((k,v) <- m.elements) - pool.put(k, v) + m.foreach(kv => pool.put(kv._1, kv._2)) } def put(k: K, v: V) = pool.put(k, v) def putIfNotExists(k: K, v: V) = pool.putIfAbsent(k, v) - + + def getAndMaybePut(key: K, createValueIfAbsent:() => V) = { + val curr = pool.get(key) + if (curr == null) { + pool.putIfAbsent(key, createValueIfAbsent()) + pool.get(key) + } + else + curr + } + def contains(id: K) = pool.containsKey(id) def get(key: K): V = pool.get(key) @@ -46,7 +55,7 @@ class Pool[K,V] extends Iterable[(K, V)] { def values: Iterable[V] = JavaConversions.asIterable(new ArrayList[V](pool.values())) - def clear: Unit = pool.clear() + def clear() { pool.clear() } override def size = pool.size diff --git core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala index 7c73f06..d1cd847 100644 --- core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala +++ core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala @@ -76,7 +76,7 @@ class LogCorruptionTest extends JUnit3Suite with ProducerConsumerTestHarness wit fail("shouldn't reach here in SimpleConsumer since log file is corrupted.") fail("shouldn't reach here in SimpleConsumer since log file is corrupted.") } catch { - case e: InvalidMessageSizeException => println("This is good") + case e: InvalidMessageSizeException => "This is good" } // test ZookeeperConsumer diff --git core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala index 66d641a..b99eaa5 100644 --- core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala +++ core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala @@ -103,8 +103,10 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { val logManager = EasyMock.createMock(classOf[LogManager]) val kafkaZookeeper = EasyMock.createMock(classOf[KafkaZooKeeper]) val replicaManager = EasyMock.createMock(classOf[ReplicaManager]) + EasyMock.expect(replicaManager.config).andReturn(configs.head) EasyMock.expect(kafkaZookeeper.getZookeeperClient).andReturn(zkClient) EasyMock.expect(logManager.config).andReturn(configs.head) + EasyMock.replay(replicaManager) EasyMock.replay(logManager) EasyMock.replay(kafkaZookeeper) diff --git core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala new file mode 100644 index 0000000..351e140 --- /dev/null +++ core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala @@ -0,0 +1,64 @@ +package unit.kafka.metrics + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.junit.Test +import org.scalatest.junit.JUnit3Suite +import java.util.concurrent.TimeUnit +import junit.framework.Assert._ +import kafka.metrics.KafkaTimer +import com.yammer.metrics.core.{MetricsRegistry, Clock} + +class KafkaTimerTest extends JUnit3Suite { + + @Test + def testKafkaTimer() { + val clock = new ManualClock + val testRegistry = new MetricsRegistry(clock) + val metric = testRegistry.newTimer(this.getClass, "TestTimer") + + val timer = new KafkaTimer(metric) + timer.time { + clock.addMillis(1000) + } + assertEquals(1, metric.count()) + assertTrue((metric.max() - 1000).abs <= Double.Epsilon) + assertTrue((metric.min() - 1000).abs <= Double.Epsilon) + } + + class ManualClock extends Clock { + + private var ticksInNanos = 0L + + override def tick() = { + ticksInNanos + } + + override def time() = { + TimeUnit.NANOSECONDS.toMillis(ticksInNanos) + } + + def addMillis(millis: Long) { + ticksInNanos += TimeUnit.MILLISECONDS.toNanos(millis) + } + + def addHours(hours: Long) { + ticksInNanos += TimeUnit.HOURS.toNanos(hours) + } + } +} diff --git core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala index 6237432..380c3a7 100644 --- core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala +++ core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala @@ -18,26 +18,28 @@ package kafka.server import scala.collection._ -import org.junit.{After, Before, Test} +import org.junit.Test import junit.framework.Assert._ import kafka.message._ import kafka.api._ import kafka.utils.TestUtils +import org.scalatest.junit.JUnit3Suite -class RequestPurgatoryTest { + +class RequestPurgatoryTest extends JUnit3Suite { val producerRequest1 = TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(new Message("hello1".getBytes))) val producerRequest2 = TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(new Message("hello2".getBytes))) var purgatory: MockRequestPurgatory = null - @Before - def setup() { + override def setUp() { + super.setUp() purgatory = new MockRequestPurgatory() } - @After - def teardown() { + override def tearDown() { purgatory.shutdown() + super.tearDown() } @Test @@ -54,7 +56,7 @@ class RequestPurgatoryTest { assertEquals("Nothing satisfied", 0, purgatory.update("test1", producerRequest2).size) purgatory.satisfied += r2 assertEquals("r2 satisfied", mutable.ArrayBuffer(r2), purgatory.update("test2", producerRequest2)) - assertEquals("Nothing satisfied", 0, purgatory.update("test2", producerRequest2).size) + assertEquals("Nothing satisfied", 0, purgatory.update("test2", producerRequest2).size) } @Test @@ -73,7 +75,7 @@ class RequestPurgatoryTest { assertTrue("Time for expiration was about 20ms", (elapsed - expiration).abs < 10L) } - class MockRequestPurgatory extends RequestPurgatory[DelayedRequest, ProducerRequest]("Mock Request Purgatory") { + class MockRequestPurgatory extends RequestPurgatory[DelayedRequest, ProducerRequest] { val satisfied = mutable.Set[DelayedRequest]() val expired = mutable.Set[DelayedRequest]() def awaitExpiration(delayed: DelayedRequest) = { diff --git project/build/KafkaProject.scala project/build/KafkaProject.scala index 5ad1739..0389c59 100644 --- project/build/KafkaProject.scala +++ project/build/KafkaProject.scala @@ -16,7 +16,7 @@ */ import sbt._ -import scala.xml.{Node, Elem, NodeSeq} +import scala.xml.{Node, Elem} import scala.xml.transform.{RewriteRule, RuleTransformer} class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProject { @@ -60,11 +60,11 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje def zkClientDep = - zkclient - zkclient - 20120522 - compile - + zkclient + zkclient + 20120522 + compile + object ZkClientDepAdder extends RuleTransformer(new RewriteRule() { override def transform(node: Node): Seq[Node] = node match { @@ -251,6 +251,9 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje trait CoreDependencies { val log4j = "log4j" % "log4j" % "1.2.15" val jopt = "net.sf.jopt-simple" % "jopt-simple" % "3.2" + private val metricsVersion = "latest.release" + val metricsCore = "com.yammer.metrics" % "metrics-core" % metricsVersion + val slf4jSimple = "org.slf4j" % "slf4j-simple" % "latest.release" } trait HadoopDependencies { @@ -264,5 +267,4 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje trait CompressionDependencies { val snappy = "org.xerial.snappy" % "snappy-java" % "1.0.4.1" } - }