From 454288ce1fe0139ae8e4f9dc3cddce011d67489a Mon Sep 17 00:00:00 2001 From: Joel Koshy Date: Tue, 17 Feb 2015 16:47:37 -0800 Subject: [PATCH] KAFKA-1953; KAFKA-1962; Disambiguate purgatory metrics; restore delayed request metrics --- .../kafka/coordinator/ConsumerCoordinator.scala | 6 ++--- .../scala/kafka/coordinator/DelayedHeartbeat.scala | 4 +++ .../scala/kafka/coordinator/DelayedJoinGroup.scala | 4 +++ .../scala/kafka/coordinator/DelayedRebalance.scala | 4 +++ .../src/main/scala/kafka/server/DelayedFetch.scala | 20 +++++++++++++- .../main/scala/kafka/server/DelayedOperation.scala | 22 +++++++++++---- .../main/scala/kafka/server/DelayedProduce.scala | 31 ++++++++++++++++++++++ .../main/scala/kafka/server/ReplicaManager.scala | 9 ++++--- .../unit/kafka/server/DelayedOperationTest.scala | 6 ++++- 9 files changed, 93 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala index 01cf1d9..21790a5 100644 --- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala @@ -71,9 +71,9 @@ class ConsumerCoordinator(val config: KafkaConfig, latestHeartbeatBucketEndMs = SystemTime.milliseconds // Initialize purgatories for delayed heartbeat, join-group and rebalance operations - heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat](config.brokerId) - joinGroupPurgatory = new DelayedOperationPurgatory[DelayedJoinGroup](config.brokerId) - rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance](config.brokerId) + heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat](purgatoryName = "Heartbeat", brokerId = config.brokerId) + joinGroupPurgatory = new DelayedOperationPurgatory[DelayedJoinGroup](purgatoryName = "JoinGroup", brokerId = config.brokerId) + rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance](purgatoryName = "Rebalance", brokerId = config.brokerId) } diff --git a/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala b/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala index 894d6ed..b1248e9 100644 --- a/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala +++ b/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala @@ -36,6 +36,10 @@ class DelayedHeartbeat(sessionTimeout: Long, throw new IllegalStateException("Delayed heartbeat purgatory should never try to complete any bucket") } + override def onExpiration() { + // TODO + } + /* mark all consumers within the heartbeat as heartbeat timed out */ override def onComplete() { for (registry <- bucket.consumerRegistryList) diff --git a/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala b/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala index 445bfa1..df60cbc 100644 --- a/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala +++ b/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala @@ -35,6 +35,10 @@ class DelayedJoinGroup(sessionTimeout: Long, forceComplete() } + override def onExpiration() { + // TODO + } + /* always assume the partition is already assigned as this delayed operation should never time-out */ override def onComplete() { diff --git a/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala b/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala index b3b3749..8defa2e 100644 --- a/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala +++ b/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala @@ -50,6 +50,10 @@ class DelayedRebalance(sessionTimeout: Long, false } + override def onExpiration() { + // TODO + } + /* mark consumers that have not re-joined group as failed and proceed to rebalance the rest of the group */ override def onComplete() { groupRegistry.memberRegistries.values.foreach(consumerRegistry => diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index dd602ee..de6cf5b 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -17,11 +17,14 @@ package kafka.server +import java.util.concurrent.TimeUnit + import kafka.api.FetchResponsePartitionData import kafka.api.PartitionFetchInfo import kafka.common.UnknownTopicOrPartitionException import kafka.common.NotLeaderForPartitionException import kafka.common.TopicAndPartition +import kafka.metrics.KafkaMetricsGroup import scala.collection._ @@ -37,6 +40,7 @@ case class FetchPartitionStatus(startOffsetMetadata: LogOffsetMetadata, fetchInf case class FetchMetadata(fetchMinBytes: Int, fetchOnlyLeader: Boolean, fetchOnlyCommitted: Boolean, + isFromFollower: Boolean, fetchPartitionStatus: Map[TopicAndPartition, FetchPartitionStatus]) { override def toString = "[minBytes: " + fetchMinBytes + ", " + @@ -109,6 +113,13 @@ class DelayedFetch(delayMs: Long, false } + override def onExpiration() { + if (fetchMetadata.isFromFollower) + DelayedFetchMetrics.followerExpiredRequestMeter.mark() + else + DelayedFetchMetrics.consumerExpiredRequestMeter.mark() + } + /** * Upon completion, read whatever data is available and pass to the complete callback */ @@ -122,4 +133,11 @@ class DelayedFetch(delayMs: Long, responseCallback(fetchPartitionData) } -} \ No newline at end of file +} + +object DelayedFetchMetrics extends KafkaMetricsGroup { + private val FetcherTypeKey = "fetcherType" + val followerExpiredRequestMeter = newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, tags = Map(FetcherTypeKey -> "follower")) + val consumerExpiredRequestMeter = newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, tags = Map(FetcherTypeKey -> "consumer")) +} + diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala index fc06b01..e60dc2f 100644 --- a/core/src/main/scala/kafka/server/DelayedOperation.scala +++ b/core/src/main/scala/kafka/server/DelayedOperation.scala @@ -71,6 +71,11 @@ abstract class DelayedOperation(delayMs: Long) extends DelayedItem(delayMs) { def isCompleted(): Boolean = completed.get() /** + * Call-back to execute when a delayed operation expires, but before completion. + */ + def onExpiration(): Unit + + /** * Process for completing an operation; This function needs to be defined * in subclasses and will be called exactly once in forceComplete() */ @@ -89,7 +94,7 @@ abstract class DelayedOperation(delayMs: Long) extends DelayedItem(delayMs) { /** * A helper purgatory class for bookkeeping delayed operations with a timeout, and expiring timed out operations. */ -class DelayedOperationPurgatory[T <: DelayedOperation](brokerId: Int = 0, purgeInterval: Int = 1000) +class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, brokerId: Int = 0, purgeInterval: Int = 1000) extends Logging with KafkaMetricsGroup { /* a list of operation watching keys */ @@ -98,18 +103,22 @@ class DelayedOperationPurgatory[T <: DelayedOperation](brokerId: Int = 0, purgeI /* background thread expiring operations that have timed out */ private val expirationReaper = new ExpiredOperationReaper + private val metricsTags = Map("delayedOperation" -> purgatoryName) + newGauge( "PurgatorySize", new Gauge[Int] { def value = watched() - } + }, + metricsTags ) newGauge( "NumDelayedOperations", new Gauge[Int] { def value = delayed() - } + }, + metricsTags ) expirationReaper.start() @@ -271,8 +280,11 @@ class DelayedOperationPurgatory[T <: DelayedOperation](brokerId: Int = 0, purgeI val curr = delayedQueue.poll(200L, TimeUnit.MILLISECONDS) if (curr != null.asInstanceOf[T]) { // if there is an expired operation, try to force complete it - if (curr synchronized curr.forceComplete()) { - debug("Force complete expired delayed operation %s".format(curr)) + curr synchronized { + curr.onExpiration() + if (curr.forceComplete()) { + debug("Force complete expired delayed operation %s".format(curr)) + } } } } diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala index c229088..4d763bf 100644 --- a/core/src/main/scala/kafka/server/DelayedProduce.scala +++ b/core/src/main/scala/kafka/server/DelayedProduce.scala @@ -18,9 +18,14 @@ package kafka.server +import java.util.concurrent.TimeUnit + +import com.yammer.metrics.core.Meter import kafka.api.ProducerResponseStatus import kafka.common.ErrorMapping import kafka.common.TopicAndPartition +import kafka.metrics.KafkaMetricsGroup +import kafka.utils.Pool import scala.collection._ @@ -110,6 +115,14 @@ class DelayedProduce(delayMs: Long, false } + override def onExpiration() { + produceMetadata.produceStatus.foreach { case (topicPartition, status) => + if (status.acksPending) { + DelayedProduceMetrics.recordExpiration(topicPartition) + } + } + } + /** * Upon completion, return the current response status along with the error code per partition */ @@ -118,3 +131,21 @@ class DelayedProduce(delayMs: Long, responseCallback(responseStatus) } } + +object DelayedProduceMetrics extends KafkaMetricsGroup { + + private val aggregateExpirationMeter = newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS) + + private val partitionExpirationMeterFactory = (key: TopicAndPartition) => + newMeter("ExpiresPerSec", + "requests", + TimeUnit.SECONDS, + tags = Map("topic" -> key.topic, "partition" -> key.partition.toString)) + private val partitionExpirationMeters = new Pool[TopicAndPartition, Meter](valueFactory = Some(partitionExpirationMeterFactory)) + + def recordExpiration(partition: TopicAndPartition) { + aggregateExpirationMeter.mark() + partitionExpirationMeters.getAndMaybePut(partition).mark() + } +} + diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index ce36cc7..f97d207 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -82,8 +82,10 @@ class ReplicaManager(val config: KafkaConfig, this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: " val stateChangeLogger = KafkaController.stateChangeLogger - val delayedProducePurgatory = new DelayedOperationPurgatory[DelayedProduce](config.brokerId, config.producerPurgatoryPurgeIntervalRequests) - val delayedFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch](config.brokerId, config.fetchPurgatoryPurgeIntervalRequests) + val delayedProducePurgatory = new DelayedOperationPurgatory[DelayedProduce]( + purgatoryName = "Produce", config.brokerId, config.producerPurgatoryPurgeIntervalRequests) + val delayedFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch]( + purgatoryName = "Fetch", config.brokerId, config.fetchPurgatoryPurgeIntervalRequests) newGauge( @@ -385,6 +387,7 @@ class ReplicaManager(val config: KafkaConfig, fetchInfo: Map[TopicAndPartition, PartitionFetchInfo], responseCallback: Map[TopicAndPartition, FetchResponsePartitionData] => Unit) { + val isFromFollower = replicaId >= 0 val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId val fetchOnlyCommitted: Boolean = ! Request.isValidBrokerId(replicaId) @@ -414,7 +417,7 @@ class ReplicaManager(val config: KafkaConfig, val fetchPartitionStatus = logReadResults.map { case (topicAndPartition, result) => (topicAndPartition, FetchPartitionStatus(result.info.fetchOffset, fetchInfo.get(topicAndPartition).get)) } - val fetchMetadata = FetchMetadata(fetchMinBytes, fetchOnlyFromLeader, fetchOnlyCommitted, fetchPartitionStatus) + val fetchMetadata = FetchMetadata(fetchMinBytes, fetchOnlyFromLeader, fetchOnlyCommitted, isFromFollower, fetchPartitionStatus) val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, responseCallback) // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala index 93f52d3..7a37617 100644 --- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala +++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala @@ -28,7 +28,7 @@ class DelayedOperationTest extends JUnit3Suite { override def setUp() { super.setUp() - purgatory = new DelayedOperationPurgatory[MockDelayedOperation](0, 5) + purgatory = new DelayedOperationPurgatory[MockDelayedOperation](purgatoryName = "mock", 0, 5) } override def tearDown() { @@ -114,6 +114,10 @@ class DelayedOperationTest extends JUnit3Suite { false } + override def onExpiration() { + + } + override def onComplete() { synchronized { notify() -- 2.1.2