From ffeba15d6629f0fe518a1062d480bb7f958b9d56 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Fri, 31 Oct 2014 09:50:54 -0700 Subject: [PATCH 1/2] v1 --- core/src/main/scala/kafka/cluster/Partition.scala | 12 +- .../src/main/scala/kafka/server/DelayedFetch.scala | 10 +- .../main/scala/kafka/server/DelayedOperation.scala | 316 ++++++++++++++++++++ .../scala/kafka/server/DelayedOperationKey.scala | 38 +++ .../main/scala/kafka/server/DelayedProduce.scala | 14 +- .../main/scala/kafka/server/ReplicaManager.scala | 42 +-- .../main/scala/kafka/server/RequestPurgatory.scala | 317 --------------------- .../unit/kafka/server/DelayedOperationTest.scala | 124 ++++++++ .../unit/kafka/server/RequestPurgatoryTest.scala | 124 -------- system_test/metrics.json | 4 +- 10 files changed, 519 insertions(+), 482 deletions(-) create mode 100644 core/src/main/scala/kafka/server/DelayedOperation.scala create mode 100644 core/src/main/scala/kafka/server/DelayedOperationKey.scala delete mode 100644 core/src/main/scala/kafka/server/RequestPurgatory.scala create mode 100644 core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala delete mode 100644 core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index b9fde2a..8df7fe4 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -17,18 +17,18 @@ package kafka.cluster import kafka.common._ -import kafka.admin.AdminUtils import kafka.utils._ +import kafka.utils.Utils.{inReadLock,inWriteLock} +import kafka.admin.AdminUtils import kafka.api.{PartitionStateInfo, LeaderAndIsr} import kafka.log.LogConfig -import kafka.server.{LogOffsetMetadata, OffsetManager, ReplicaManager} +import kafka.server.{TopicPartitionOperationKey, LogOffsetMetadata, OffsetManager, ReplicaManager} import kafka.metrics.KafkaMetricsGroup import kafka.controller.KafkaController 
import kafka.message.ByteBufferMessageSet import java.io.IOException import java.util.concurrent.locks.ReentrantReadWriteLock -import kafka.utils.Utils.{inReadLock,inWriteLock} import scala.collection.immutable.Set import com.yammer.metrics.core.Gauge @@ -343,8 +343,8 @@ class Partition(val topic: String, if(oldHighWatermark.precedes(newHighWatermark)) { leaderReplica.highWatermark = newHighWatermark debug("High watermark for partition [%s,%d] updated to %s".format(topic, partitionId, newHighWatermark)) - // some delayed requests may be unblocked after HW changed - val requestKey = new TopicAndPartition(this.topic, this.partitionId) + // some delayed operations may be unblocked after HW changed + val requestKey = new TopicPartitionOperationKey(this.topic, this.partitionId) replicaManager.tryCompleteDelayedFetch(requestKey) replicaManager.tryCompleteDelayedProduce(requestKey) } else { @@ -414,7 +414,7 @@ class Partition(val topic: String, val info = log.append(messages, assignOffsets = true) // probably unblock some follower fetch requests since log end offset has been updated - replicaManager.tryCompleteDelayedFetch(new TopicAndPartition(this.topic, this.partitionId)) + replicaManager.tryCompleteDelayedFetch(new TopicPartitionOperationKey(this.topic, this.partitionId)) // we may need to increment high watermark since ISR could be down to 1 maybeIncrementLeaderHW(leaderReplica) info diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index 1e2e56f..dd602ee 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -32,7 +32,7 @@ case class FetchPartitionStatus(startOffsetMetadata: LogOffsetMetadata, fetchInf } /** - * The fetch metadata maintained by the delayed produce request + * The fetch metadata maintained by the delayed fetch operation */ case class FetchMetadata(fetchMinBytes: Int, fetchOnlyLeader: Boolean, @@ -45,17 +45,17 @@ 
case class FetchMetadata(fetchMinBytes: Int, "partitionStatus: " + fetchPartitionStatus + "]" } /** - * A delayed fetch request that can be created by the replica manager and watched - * in the fetch request purgatory + * A delayed fetch operation that can be created by the replica manager and watched + * in the fetch operation purgatory */ class DelayedFetch(delayMs: Long, fetchMetadata: FetchMetadata, replicaManager: ReplicaManager, responseCallback: Map[TopicAndPartition, FetchResponsePartitionData] => Unit) - extends DelayedRequest(delayMs) { + extends DelayedOperation(delayMs) { /** - * The request can be completed if: + * The operation can be completed if: * * Case A: This broker is no longer the leader for some partitions it tries to fetch * Case B: This broker does not know of some partitions it tries to fetch diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala new file mode 100644 index 0000000..fc06b01 --- /dev/null +++ b/core/src/main/scala/kafka/server/DelayedOperation.scala @@ -0,0 +1,316 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server + +import kafka.utils._ +import kafka.metrics.KafkaMetricsGroup + +import java.util +import java.util.concurrent._ +import java.util.concurrent.atomic._ +import scala.collection._ + +import com.yammer.metrics.core.Gauge + + +/** + * An operation whose processing needs to be delayed for at most the given delayMs. For example + * a delayed produce operation could be waiting for specified number of acks; or + * a delayed fetch operation could be waiting for a given number of bytes to accumulate. + * + * The logic upon completing a delayed operation is defined in onComplete() and will be called exactly once. + * Once an operation is completed, isCompleted() will return true. onComplete() can be triggered by either + * forceComplete(), which forces calling onComplete() after delayMs if the operation is not yet completed, + * or tryComplete(), which first checks if the operation can be completed or not now, and if yes calls + * forceComplete(). + * + * A subclass of DelayedOperation needs to provide an implementation of both onComplete() and tryComplete(). + */ +abstract class DelayedOperation(delayMs: Long) extends DelayedItem(delayMs) { + private val completed = new AtomicBoolean(false) + + /* + * Force completing the delayed operation, if not already completed. + * This function can be triggered when + * + * 1. The operation has been verified to be completable inside tryComplete() + * 2. 
The operation has expired and hence needs to be completed right now + * + * Return true iff the operation is completed by the caller: note that + * concurrent threads can try to complete the same operation, but only + * the first thread will succeed in completing the operation and return + * true, others will still return false + */ + def forceComplete(): Boolean = { + if (completed.compareAndSet(false, true)) { + onComplete() + true + } else { + false + } + } + + /** + * Check if the delayed operation is already completed + */ + def isCompleted(): Boolean = completed.get() + + /** + * Process for completing an operation; This function needs to be defined + * in subclasses and will be called exactly once in forceComplete() + */ + def onComplete(): Unit + + /* + * Try to complete the delayed operation by first checking if the operation + * can be completed by now. If yes execute the completion logic by calling + * forceComplete() and return true iff forceComplete returns true; otherwise return false + * + * This function needs to be defined in subclasses + */ + def tryComplete(): Boolean +} + +/** + * A helper purgatory class for bookkeeping delayed operations with a timeout, and expiring timed out operations. 
+ */ +class DelayedOperationPurgatory[T <: DelayedOperation](brokerId: Int = 0, purgeInterval: Int = 1000) + extends Logging with KafkaMetricsGroup { + + /* a list of operation watching keys */ + private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers)) + + /* background thread expiring operations that have timed out */ + private val expirationReaper = new ExpiredOperationReaper + + newGauge( + "PurgatorySize", + new Gauge[Int] { + def value = watched() + } + ) + + newGauge( + "NumDelayedOperations", + new Gauge[Int] { + def value = delayed() + } + ) + + expirationReaper.start() + + /** + * Check if the operation can be completed, if not watch it based on the given watch keys + * + * Note that a delayed operation can be watched on multiple keys. It is possible that + * an operation is completed after it has been added to the watch list for some, but + * not all of the keys. In this case, the operation is considered completed and won't + * be added to the watch list of the remaining keys. The expiration reaper thread will + * remove this operation from any watcher list in which the operation exists. + * + * @param operation the delayed operation to be checked + * @param watchKeys keys for bookkeeping the operation + * @return true iff the delayed operations can be completed by the caller + */ + def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = { + for(key <- watchKeys) { + // if the operation is already completed, stopping adding it to + // any further lists and return false + if (operation.isCompleted()) + return false + val watchers = watchersFor(key) + // if the operation can by completed by myself, stop adding it to + // any further lists and return true immediately + if(operation synchronized operation.tryComplete()) { + return true + } else { + watchers.watch(operation) + } + } + + // if it cannot be completed by now and hence is watched, add to the expire queue also + if (! 
operation.isCompleted()) { + expirationReaper.enqueue(operation) + } + + false + } + + /** + * Check if some some delayed operations can be completed with the given watch key, + * and if yes complete them. + * + * @return the number of completed operations during this process + */ + def checkAndComplete(key: Any): Int = { + val watchers = watchersForKey.get(key) + if(watchers == null) + 0 + else + watchers.tryCompleteWatched() + } + + /** + * Return the total size of watch lists the purgatory. Since an operation may be watched + * on multiple lists, and some of its watched entries may still be in the watch lists + * even when it has been completed, this number may be larger than the number of real operations watched + */ + def watched() = watchersForKey.values.map(_.watched).sum + + /** + * Return the number of delayed operations in the expiry queue + */ + def delayed() = expirationReaper.delayed + + /* + * Return the watch list of the given key + */ + private def watchersFor(key: Any) = watchersForKey.getAndMaybePut(key) + + /** + * Shutdown the expire reaper thread + */ + def shutdown() { + expirationReaper.shutdown() + } + + /** + * A linked list of watched delayed operations based on some key + */ + private class Watchers { + private val operations = new util.LinkedList[T] + + def watched = operations.size() + + // add the element to watch + def watch(t: T) { + synchronized { + operations.add(t) + } + } + + // traverse the list and try to complete some watched elements + def tryCompleteWatched(): Int = { + var completed = 0 + synchronized { + val iter = operations.iterator() + while(iter.hasNext) { + val curr = iter.next + if (curr.isCompleted()) { + // another thread has completed this operation, just remove it + iter.remove() + } else { + if(curr synchronized curr.tryComplete()) { + iter.remove() + completed += 1 + } + } + } + } + completed + } + + // traverse the list and purge elements that are already completed by others + def purgeCompleted(): Int = { + 
var purged = 0 + synchronized { + val iter = operations.iterator() + while (iter.hasNext) { + val curr = iter.next + if(curr.isCompleted()) { + iter.remove() + purged += 1 + } + } + } + purged + } + } + + /** + * A background reaper to expire delayed operations that have timed out + */ + private class ExpiredOperationReaper extends ShutdownableThread( + "ExpirationReaper-%d".format(brokerId), + false) { + + /* The queue storing all delayed operations */ + private val delayedQueue = new DelayQueue[T] + + /* + * Return the number of delayed operations kept by the reaper + */ + def delayed() = delayedQueue.size() + + /* + * Add an operation to be expired + */ + def enqueue(t: T) { + delayedQueue.add(t) + } + + /** + * Try to get the next expired event and force completing it + */ + private def expireNext() { + val curr = delayedQueue.poll(200L, TimeUnit.MILLISECONDS) + if (curr != null.asInstanceOf[T]) { + // if there is an expired operation, try to force complete it + if (curr synchronized curr.forceComplete()) { + debug("Force complete expired delayed operation %s".format(curr)) + } + } + } + + /** + * Delete all satisfied events from the delay queue and the watcher lists + */ + private def purgeCompleted(): Int = { + var purged = 0 + + // purge the delayed queue + val iter = delayedQueue.iterator() + while (iter.hasNext) { + val curr = iter.next() + if (curr.isCompleted()) { + iter.remove() + purged += 1 + } + } + + purged + } + + override def doWork() { + // try to get the next expired operation and force completing it + expireNext() + // see if we need to purge the watch lists + if (DelayedOperationPurgatory.this.watched() >= purgeInterval) { + debug("Begin purging watch lists") + val purged = watchersForKey.values.map(_.purgeCompleted()).sum + debug("Purged %d elements from watch lists.".format(purged)) + } + // see if we need to purge the delayed operation queue + if (delayed() >= purgeInterval) { + debug("Begin purging delayed queue") + val purged = 
purgeCompleted() + debug("Purged %d operations from delayed queue.".format(purged)) + } + } + } +} diff --git a/core/src/main/scala/kafka/server/DelayedOperationKey.scala b/core/src/main/scala/kafka/server/DelayedOperationKey.scala new file mode 100644 index 0000000..fb7e9ed --- /dev/null +++ b/core/src/main/scala/kafka/server/DelayedOperationKey.scala @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server + +import kafka.common.TopicAndPartition + +/** + * Keys used for delayed operation metrics recording + */ +trait DelayedOperationKey { + def keyLabel: String +} + +object DelayedOperationKey { + val globalLabel = "All" +} + +case class TopicPartitionOperationKey(topic: String, partition: Int) extends DelayedOperationKey { + + def this(topicAndPartition: TopicAndPartition) = this(topicAndPartition.topic, topicAndPartition.partition) + + override def keyLabel = "%s-%d".format(topic, partition) +} diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala index 1603066..c229088 100644 --- a/core/src/main/scala/kafka/server/DelayedProduce.scala +++ b/core/src/main/scala/kafka/server/DelayedProduce.scala @@ -32,7 +32,7 @@ case class ProducePartitionStatus(requiredOffset: Long, responseStatus: Producer } /** - * The produce metadata maintained by the delayed produce request + * The produce metadata maintained by the delayed produce operation */ case class ProduceMetadata(produceRequiredAcks: Short, produceStatus: Map[TopicAndPartition, ProducePartitionStatus]) { @@ -42,14 +42,14 @@ case class ProduceMetadata(produceRequiredAcks: Short, } /** - * A delayed produce request that can be created by the replica manager and watched - * in the produce request purgatory + * A delayed produce operation that can be created by the replica manager and watched + * in the produce operation purgatory */ class DelayedProduce(delayMs: Long, produceMetadata: ProduceMetadata, replicaManager: ReplicaManager, responseCallback: Map[TopicAndPartition, ProducerResponseStatus] => Unit) - extends DelayedRequest(delayMs) { + extends DelayedOperation(delayMs) { // first update the acks pending variable according to the error code produceMetadata.produceStatus.foreach { case (topicAndPartition, status) => @@ -65,13 +65,13 @@ class DelayedProduce(delayMs: Long, } /** - * The delayed produce request can be 
completed if every partition + * The delayed produce operation can be completed if every partition * it produces to is satisfied by one of the following: * * Case A: This broker is no longer the leader: set an error in response * Case B: This broker is the leader: * B.1 - If there was a local error thrown while checking if at least requiredAcks - * replicas have caught up to this request: set an error in response + * replicas have caught up to this operation: set an error in response * B.2 - Otherwise, set the response with no error. */ override def tryComplete(): Boolean = { @@ -117,4 +117,4 @@ class DelayedProduce(delayMs: Long, val responseStatus = produceMetadata.produceStatus.mapValues(status => status.responseStatus) responseCallback(responseStatus) } -} \ No newline at end of file +} diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index b3566b0..e58fbb9 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -80,8 +80,8 @@ class ReplicaManager(val config: KafkaConfig, this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: " val stateChangeLogger = KafkaController.stateChangeLogger - val producerRequestPurgatory = new RequestPurgatory[DelayedProduce](config.brokerId, config.producerPurgatoryPurgeIntervalRequests) - val fetchRequestPurgatory = new RequestPurgatory[DelayedFetch](config.brokerId, config.fetchPurgatoryPurgeIntervalRequests) + val delayedProducePurgatory = new DelayedOperationPurgatory[DelayedProduce](config.brokerId, config.producerPurgatoryPurgeIntervalRequests) + val delayedFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch](config.brokerId, config.fetchPurgatoryPurgeIntervalRequests) newGauge( @@ -123,9 +123,9 @@ class ReplicaManager(val config: KafkaConfig, * 1. The partition HW has changed (for acks = -1) * 2. 
A follower replica's fetch operation is received (for acks > 1) */ - def tryCompleteDelayedProduce(key: TopicAndPartition) { - val completed = producerRequestPurgatory.checkAndComplete(key) - debug("Request key %s unblocked %d producer requests.".format(key, completed)) + def tryCompleteDelayedProduce(key: DelayedOperationKey) { + val completed = delayedProducePurgatory.checkAndComplete(key) + debug("Request key %s unblocked %d producer requests.".format(key.keyLabel, completed)) } /** @@ -135,9 +135,9 @@ class ReplicaManager(val config: KafkaConfig, * 1. The partition HW has changed (for regular fetch) * 2. A new message set is appended to the local log (for follower fetch) */ - def tryCompleteDelayedFetch(key: TopicAndPartition) { - val completed = fetchRequestPurgatory.checkAndComplete(key) - debug("Request key %s unblocked %d fetch requests.".format(key, completed)) + def tryCompleteDelayedFetch(key: DelayedOperationKey) { + val completed = delayedFetchPurgatory.checkAndComplete(key) + debug("Request key %s unblocked %d fetch requests.".format(key.keyLabel, completed)) } def startup() { @@ -280,13 +280,13 @@ class ReplicaManager(val config: KafkaConfig, val produceMetadata = ProduceMetadata(requiredAcks, produceStatus) val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback) - // create a list of (topic, partition) pairs to use as keys for this delayed request - val producerRequestKeys = messagesPerPartition.keys.toSeq + // create a list of (topic, partition) pairs to use as keys for this delayed produce operation + val producerRequestKeys = messagesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq // try to complete the request immediately, otherwise put it into the purgatory - // this is because while the delayed request is being created, new requests may - // arrive which can make this request completable. 
- producerRequestPurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys) + // this is because while the delayed produce operation is being created, new + // requests may arrive and hence make this operation completable. + delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys) } } @@ -392,13 +392,13 @@ class ReplicaManager(val config: KafkaConfig, val fetchMetadata = FetchMetadata(fetchMinBytes, fetchOnlyFromLeader, fetchOnlyCommitted, fetchPartitionStatus) val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, responseCallback) - // create a list of (topic, partition) pairs to use as keys for this delayed request - val delayedFetchKeys = fetchPartitionStatus.keys.toSeq + // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation + val delayedFetchKeys = fetchPartitionStatus.keys.map(new TopicPartitionOperationKey(_)).toSeq // try to complete the request immediately, otherwise put it into the purgatory; - // this is because while the delayed request is being created, new requests may - // arrive which can make this request completable. - fetchRequestPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys) + // this is because while the delayed fetch operation is being created, new requests + // may arrive and hence make this operation completable. 
+ delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys) } } @@ -718,7 +718,7 @@ class ReplicaManager(val config: KafkaConfig, // for producer requests with ack > 1, we need to check // if they can be unblocked after some follower's log end offsets have moved - tryCompleteDelayedProduce(topicAndPartition) + tryCompleteDelayedProduce(new TopicPartitionOperationKey(topicAndPartition)) case None => warn("While recording the replica LEO, the partition %s hasn't been created.".format(topicAndPartition)) } @@ -749,8 +749,8 @@ class ReplicaManager(val config: KafkaConfig, def shutdown(checkpointHW: Boolean = true) { info("Shutting down") replicaFetcherManager.shutdown() - fetchRequestPurgatory.shutdown() - producerRequestPurgatory.shutdown() + delayedFetchPurgatory.shutdown() + delayedProducePurgatory.shutdown() if (checkpointHW) checkpointHighWatermarks() info("Shut down completely") diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala deleted file mode 100644 index 323b12e..0000000 --- a/core/src/main/scala/kafka/server/RequestPurgatory.scala +++ /dev/null @@ -1,317 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.server - -import kafka.utils._ -import kafka.metrics.KafkaMetricsGroup - -import java.util -import java.util.concurrent._ -import java.util.concurrent.atomic._ -import scala.collection._ - -import com.yammer.metrics.core.Gauge - - -/** - * An operation whose processing needs to be delayed for at most the given delayMs. For example - * a delayed produce operation could be waiting for specified number of acks; or - * a delayed fetch operation could be waiting for a given number of bytes to accumulate. - * - * The logic upon completing a delayed operation is defined in onComplete() and will be called exactly once. - * Once an operation is completed, isCompleted() will return true. onComplete() can be triggered by either - * forceComplete(), which forces calling onComplete() after delayMs if the operation is not yet completed, - * or tryComplete(), which first checks if the operation can be completed or not now, and if yes calls - * forceComplete(). - * - * A subclass of DelayedRequest needs to provide an implementation of both onComplete() and tryComplete(). - */ -abstract class DelayedRequest(delayMs: Long) extends DelayedItem(delayMs) { - private val completed = new AtomicBoolean(false) - - /* - * Force completing the delayed operation, if not already completed. - * This function can be triggered when - * - * 1. The operation has been verified to be completable inside tryComplete() - * 2. 
The operation has expired and hence needs to be completed right now - * - * Return true iff the operation is completed by the caller - */ - def forceComplete(): Boolean = { - if (completed.compareAndSet(false, true)) { - onComplete() - true - } else { - false - } - } - - /** - * Check if the delayed operation is already completed - */ - def isCompleted(): Boolean = completed.get() - - /** - * Process for completing an operation; This function needs to be defined in subclasses - * and will be called exactly once in forceComplete() - */ - def onComplete(): Unit - - /* - * Try to complete the delayed operation by first checking if the operation - * can be completed by now. If yes execute the completion logic by calling - * forceComplete() and return true iff forceComplete returns true; otherwise return false - * - * Note that concurrent threads can check if an operation can be completed or not, - * but only the first thread will succeed in completing the operation and return - * true, others will still return false - * - * this function needs to be defined in subclasses - */ - def tryComplete(): Boolean -} - -/** - * A helper purgatory class for bookkeeping delayed operations with a timeout, and expiring timed out operations. 
- */ -class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInterval: Int = 1000) - extends Logging with KafkaMetricsGroup { - - /* a list of requests watching each key */ - private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers)) - - /* background thread expiring requests that have been waiting too long */ - private val expirationReaper = new ExpiredOperationReaper - - newGauge( - "PurgatorySize", - new Gauge[Int] { - def value = watched() - } - ) - - newGauge( - "NumDelayedRequests", - new Gauge[Int] { - def value = delayed() - } - ) - - expirationReaper.start() - - /** - * Check if the operation can be completed, if not watch it based on the given watch keys - * - * Note that a delayed operation can be watched on multiple keys. It is possible that - * an operation is completed after it has been added to the watch list for some, but - * not all of the keys. In this case, the operation is considered completed and won't - * be added to the watch list of the remaining keys. The expiration reaper thread will - * remove this operation from any watcher list in which the operation exists. - * - * @param operation the delayed operation to be checked - * @param watchKeys keys for bookkeeping the operation - * @return true iff the delayed operations can be completed by the caller - */ - def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = { - for(key <- watchKeys) { - // if the operation is already completed, stopping adding it to - // any further lists and return false - if (operation.isCompleted()) - return false - val watchers = watchersFor(key) - // if the operation can by completed by myself, stop adding it to - // any further lists and return true immediately - if(operation synchronized operation.tryComplete()) { - return true - } else { - watchers.watch(operation) - } - } - - // if it cannot be completed by now and hence is watched, add to the expire queue also - if (! 
operation.isCompleted()) { - expirationReaper.enqueue(operation) - } - - false - } - - /** - * Check if some some delayed requests can be completed with the given watch key, - * and if yes complete them. - * - * @return the number of completed requests during this process - */ - def checkAndComplete(key: Any): Int = { - val watchers = watchersForKey.get(key) - if(watchers == null) - 0 - else - watchers.tryCompleteWatched() - } - - /** - * Return the total size of watch lists the purgatory. Since an operation may be watched - * on multiple lists, and some of its watched entries may still be in the watch lists - * even when it has been completed, this number may be larger than the number of real operations watched - */ - def watched() = watchersForKey.values.map(_.watched).sum - - /** - * Return the number of delayed operations in the expiry queue - */ - def delayed() = expirationReaper.delayed - - /* - * Return the watch list of the given key - */ - private def watchersFor(key: Any) = watchersForKey.getAndMaybePut(key) - - /** - * Shutdown the expire reaper thread - */ - def shutdown() { - expirationReaper.shutdown() - } - - /** - * A linked list of watched delayed operations based on some key - */ - private class Watchers { - private val requests = new util.LinkedList[T] - - def watched = requests.size() - - // add the element to watch - def watch(t: T) { - synchronized { - requests.add(t) - } - } - - // traverse the list and try to complete some watched elements - def tryCompleteWatched(): Int = { - var completed = 0 - synchronized { - val iter = requests.iterator() - while(iter.hasNext) { - val curr = iter.next - if (curr.isCompleted()) { - // another thread has completed this request, just remove it - iter.remove() - } else { - if(curr synchronized curr.tryComplete()) { - iter.remove() - completed += 1 - } - } - } - } - completed - } - - // traverse the list and purge elements that are already completed by others - def purgeCompleted(): Int = { - var purged = 0 
- synchronized { - val iter = requests.iterator() - while (iter.hasNext) { - val curr = iter.next - if(curr.isCompleted()) { - iter.remove() - purged += 1 - } - } - } - purged - } - } - - /** - * A background reaper to expire delayed operations that have timed out - */ - private class ExpiredOperationReaper extends ShutdownableThread( - "ExpirationReaper-%d".format(brokerId), - false) { - - /* The queue storing all delayed operations */ - private val delayedQueue = new DelayQueue[T] - - /* - * Return the number of delayed operations kept by the reaper - */ - def delayed() = delayedQueue.size() - - /* - * Add an operation to be expired - */ - def enqueue(t: T) { - delayedQueue.add(t) - } - - /** - * Try to get the next expired event and force completing it - */ - private def expireNext() { - val curr = delayedQueue.poll(200L, TimeUnit.MILLISECONDS) - if (curr != null.asInstanceOf[T]) { - // if there is an expired operation, try to force complete it - if (curr synchronized curr.forceComplete()) { - debug("Force complete expired delayed operation %s".format(curr)) - } - } - } - - /** - * Delete all satisfied events from the delay queue and the watcher lists - */ - private def purgeCompleted(): Int = { - var purged = 0 - - // purge the delayed queue - val iter = delayedQueue.iterator() - while (iter.hasNext) { - val curr = iter.next() - if (curr.isCompleted()) { - iter.remove() - purged += 1 - } - } - - purged - } - - override def doWork() { - // try to get the next expired operation and force completing it - expireNext() - // see if we need to purge the watch lists - if (RequestPurgatory.this.watched() >= purgeInterval) { - debug("Begin purging watch lists") - val purged = watchersForKey.values.map(_.purgeCompleted()).sum - debug("Purged %d elements from watch lists.".format(purged)) - } - // see if we need to purge the delayed request queue - if (delayed() >= purgeInterval) { - debug("Begin purging delayed queue") - val purged = purgeCompleted() - debug("Purged %d 
operations from delayed queue.".format(purged)) - } - } - } -} diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala new file mode 100644 index 0000000..93f52d3 --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package kafka.server
+
+import org.junit.Test
+import org.scalatest.junit.JUnit3Suite
+import junit.framework.Assert._
+import kafka.utils.TestUtils
+/** Tests DelayedOperationPurgatory: completion through watch keys, expiration by the reaper, and purging. */
+class DelayedOperationTest extends JUnit3Suite {
+
+  var purgatory: DelayedOperationPurgatory[MockDelayedOperation] = null
+
+  override def setUp() {
+    super.setUp()
+    purgatory = new DelayedOperationPurgatory[MockDelayedOperation](0, 5)
+  }
+
+  override def tearDown() {
+    purgatory.shutdown()
+    super.tearDown()
+  }
+
+  @Test
+  def testRequestSatisfaction() {
+    val r1 = new MockDelayedOperation(100000L)
+    val r2 = new MockDelayedOperation(100000L)
+    assertEquals("With no waiting requests, nothing should be satisfied", 0, purgatory.checkAndComplete("test1"))
+    assertFalse("r1 not satisfied and hence watched", purgatory.tryCompleteElseWatch(r1, Array("test1")))
+    assertEquals("Still nothing satisfied", 0, purgatory.checkAndComplete("test1"))
+    assertFalse("r2 not satisfied and hence watched", purgatory.tryCompleteElseWatch(r2, Array("test2")))
+    assertEquals("Still nothing satisfied", 0, purgatory.checkAndComplete("test2"))
+    r1.completable = true
+    assertEquals("r1 satisfied", 1, purgatory.checkAndComplete("test1"))
+    assertEquals("Nothing satisfied", 0, purgatory.checkAndComplete("test1"))
+    r2.completable = true
+    assertEquals("r2 satisfied", 1, purgatory.checkAndComplete("test2"))
+    assertEquals("Nothing satisfied", 0, purgatory.checkAndComplete("test2"))
+  }
+
+  @Test
+  def testRequestExpiry() {
+    val expiration = 20L
+    val r1 = new MockDelayedOperation(expiration)
+    val r2 = new MockDelayedOperation(200000L)
+    val start = System.currentTimeMillis
+    assertFalse("r1 not satisfied and hence watched", purgatory.tryCompleteElseWatch(r1, Array("test1")))
+    assertFalse("r2 not satisfied and hence watched", purgatory.tryCompleteElseWatch(r2, Array("test2")))
+    r1.awaitExpiration()
+    val elapsed = System.currentTimeMillis - start
+    assertTrue("r1 completed due to expiration", r1.isCompleted())
+    assertFalse("r2 hasn't completed", r2.isCompleted())
+    assertTrue("Time for expiration %d should at least %d".format(elapsed, expiration), elapsed >= expiration)
+  }
+
+  @Test
+  def testRequestPurge() {
+    val r1 = new MockDelayedOperation(100000L)
+    val r2 = new MockDelayedOperation(100000L)
+    purgatory.tryCompleteElseWatch(r1, Array("test1"))
+    purgatory.tryCompleteElseWatch(r2, Array("test1", "test2"))
+    purgatory.tryCompleteElseWatch(r1, Array("test2", "test3"))
+
+    assertEquals("Purgatory should have 5 watched elements", 5, purgatory.watched())
+    assertEquals("Purgatory should have 3 total delayed operations", 3, purgatory.delayed())
+
+    // complete one of the operations, it should
+    // eventually be purged from the watch list with purge interval 5
+    r2.completable = true
+    r2.tryComplete()
+    TestUtils.waitUntilTrue(() => purgatory.watched() == 3,
+      "Purgatory should have 3 watched elements instead of " + purgatory.watched(), 1000L)
+    TestUtils.waitUntilTrue(() => purgatory.delayed() == 3,
+      "Purgatory should still have 3 total delayed operations instead of " + purgatory.delayed(), 1000L)
+
+    // add two more requests, then the satisfied request should be purged from the delayed queue with purge interval 5
+    purgatory.tryCompleteElseWatch(r1, Array("test1"))
+    purgatory.tryCompleteElseWatch(r1, Array("test1"))
+
+    TestUtils.waitUntilTrue(() => purgatory.watched() == 5,
+      "Purgatory should have 5 watched elements instead of " + purgatory.watched(), 1000L)
+    TestUtils.waitUntilTrue(() => purgatory.delayed() == 4,
+      "Purgatory should have 4 total delayed operations instead of " + purgatory.delayed(), 1000L)
+  }
+
+  class MockDelayedOperation(delayMs: Long) extends DelayedOperation(delayMs) {
+    var completable = false
+    // Blocks until this operation is completed; tolerates a notify() that fires before the wait and spurious wakeups.
+    def awaitExpiration() {
+      synchronized {
+        while (!isCompleted()) wait(10L) // timed re-check instead of a bare wait(), so a missed notify cannot hang the test
+      }
+    }
+
+    override def tryComplete() = {
+      if (completable)
+        forceComplete()
+      else
+        false
+    }
+
+    override def onComplete() {
+      synchronized {
+        notify()
+      }
+    }
+  }
+
+}
\ No newline at end of file
diff --git
a/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala b/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala deleted file mode 100644 index a7720d5..0000000 --- a/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala +++ /dev/null @@ -1,124 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.server - -import org.junit.Test -import org.scalatest.junit.JUnit3Suite -import junit.framework.Assert._ -import kafka.utils.TestUtils - -class RequestPurgatoryTest extends JUnit3Suite { - - var purgatory: RequestPurgatory[MockDelayedRequest] = null - - override def setUp() { - super.setUp() - purgatory = new RequestPurgatory[MockDelayedRequest](0, 5) - } - - override def tearDown() { - purgatory.shutdown() - super.tearDown() - } - - @Test - def testRequestSatisfaction() { - val r1 = new MockDelayedRequest(100000L) - val r2 = new MockDelayedRequest(100000L) - assertEquals("With no waiting requests, nothing should be satisfied", 0, purgatory.checkAndComplete("test1")) - assertFalse("r1 not satisfied and hence watched", purgatory.tryCompleteElseWatch(r1, Array("test1"))) - assertEquals("Still nothing satisfied", 0, purgatory.checkAndComplete("test1")) - assertFalse("r2 not satisfied and hence watched", purgatory.tryCompleteElseWatch(r2, Array("test2"))) - assertEquals("Still nothing satisfied", 0, purgatory.checkAndComplete("test2")) - r1.completable = true - assertEquals("r1 satisfied", 1, purgatory.checkAndComplete("test1")) - assertEquals("Nothing satisfied", 0, purgatory.checkAndComplete("test1")) - r2.completable = true - assertEquals("r2 satisfied", 1, purgatory.checkAndComplete("test2")) - assertEquals("Nothing satisfied", 0, purgatory.checkAndComplete("test2")) - } - - @Test - def testRequestExpiry() { - val expiration = 20L - val r1 = new MockDelayedRequest(expiration) - val r2 = new MockDelayedRequest(200000L) - val start = System.currentTimeMillis - assertFalse("r1 not satisfied and hence watched", purgatory.tryCompleteElseWatch(r1, Array("test1"))) - assertFalse("r2 not satisfied and hence watched", purgatory.tryCompleteElseWatch(r2, Array("test2"))) - r1.awaitExpiration() - val elapsed = System.currentTimeMillis - start - assertTrue("r1 completed due to expiration", r1.isCompleted()) - assertFalse("r2 hasn't completed", 
r2.isCompleted()) - assertTrue("Time for expiration %d should at least %d".format(elapsed, expiration), elapsed >= expiration) - } - - @Test - def testRequestPurge() { - val r1 = new MockDelayedRequest(100000L) - val r2 = new MockDelayedRequest(100000L) - purgatory.tryCompleteElseWatch(r1, Array("test1")) - purgatory.tryCompleteElseWatch(r2, Array("test1", "test2")) - purgatory.tryCompleteElseWatch(r1, Array("test2", "test3")) - - assertEquals("Purgatory should have 5 watched elements", 5, purgatory.watched()) - assertEquals("Purgatory should have 3 total delayed requests", 3, purgatory.delayed()) - - // complete one of the operations, it should - // eventually be purged from the watch list with purge interval 5 - r2.completable = true - r2.tryComplete() - TestUtils.waitUntilTrue(() => purgatory.watched() == 3, - "Purgatory should have 3 watched elements instead of " + purgatory.watched(), 1000L) - TestUtils.waitUntilTrue(() => purgatory.delayed() == 3, - "Purgatory should still have 3 total delayed requests instead of " + purgatory.delayed(), 1000L) - - // add two more requests, then the satisfied request should be purged from the delayed queue with purge interval 5 - purgatory.tryCompleteElseWatch(r1, Array("test1")) - purgatory.tryCompleteElseWatch(r1, Array("test1")) - - TestUtils.waitUntilTrue(() => purgatory.watched() == 5, - "Purgatory should have 5 watched elements instead of " + purgatory.watched(), 1000L) - TestUtils.waitUntilTrue(() => purgatory.delayed() == 4, - "Purgatory should have 4 total delayed requests instead of " + purgatory.delayed(), 1000L) - } - - class MockDelayedRequest(delayMs: Long) extends DelayedRequest(delayMs) { - var completable = false - - def awaitExpiration() { - synchronized { - wait() - } - } - - override def tryComplete() = { - if (completable) - forceComplete() - else - false - } - - override def onComplete() { - synchronized { - notify() - } - } - } - -} \ No newline at end of file diff --git a/system_test/metrics.json 
b/system_test/metrics.json index cd3fc14..30dabe5 100644 --- a/system_test/metrics.json +++ b/system_test/metrics.json @@ -78,13 +78,13 @@ { "graph_name": "ProducePurgatoryQueueSize", "y_label": "size", - "bean_name": "kafka.server:type=ProducerRequestPurgatory,name=NumDelayedRequests", + "bean_name": "kafka.server:type=ProducerRequestPurgatory,name=NumDelayedOperations", "attributes": "Value" }, { "graph_name": "FetchPurgatoryQueueSize", "y_label": "size", - "bean_name": "kafka.server:type=FetchRequestPurgatory,name=NumDelayedRequests", + "bean_name": "kafka.server:type=FetchRequestPurgatory,name=NumDelayedOperations", "attributes": "Value" }, { -- 1.7.12.4 From cec25f5e262c23a5b2f3665bd5ac5d30aff2f71b Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Fri, 31 Oct 2014 14:54:25 -0700 Subject: [PATCH 2/2] minor --- core/src/main/scala/kafka/cluster/Partition.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 8df7fe4..b230e9a 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -232,7 +232,7 @@ class Partition(val topic: String, /** * Update the log end offset of a certain replica of this partition */ - def updateReplicaLEO(replicaId: Int, offset: LogOffsetMetadata) = { + def updateReplicaLEO(replicaId: Int, offset: LogOffsetMetadata) { getReplica(replicaId) match { case Some(replica) => replica.logEndOffset = offset -- 1.7.12.4