From 355b413033e1015acf2e7cbca5a2b9b925055667 Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Fri, 8 Aug 2014 17:12:51 -0700
Subject: [PATCH 1/5] step 1

---
 .../kafka/server/DelayedOperationPurgatory.scala   | 285 ++++++++++++++++++
 .../server/FetchDelayedOperationPurgatory.scala    |  69 +++++
 .../scala/kafka/server/FetchRequestPurgatory.scala |  69 -----
 core/src/main/scala/kafka/server/KafkaApis.scala   |   4 +-
 .../server/ProducerDelayedOperationPurgatory.scala |  69 +++++
 .../kafka/server/ProducerRequestPurgatory.scala    |  69 -----
 .../main/scala/kafka/server/ReplicaManager.scala   |   8 +-
 .../main/scala/kafka/server/RequestPurgatory.scala | 317 ---------------------
 core/src/main/scala/kafka/utils/DelayedItem.scala  |   6 +-
 .../server/DelayedOperationPurgatoryTest.scala     |  94 ++++++
 .../unit/kafka/server/RequestPurgatoryTest.scala   |  94 ------
 11 files changed, 526 insertions(+), 558 deletions(-)
 create mode 100644 core/src/main/scala/kafka/server/DelayedOperationPurgatory.scala
 create mode 100644 core/src/main/scala/kafka/server/FetchDelayedOperationPurgatory.scala
 delete mode 100644 core/src/main/scala/kafka/server/FetchRequestPurgatory.scala
 create mode 100644 core/src/main/scala/kafka/server/ProducerDelayedOperationPurgatory.scala
 delete mode 100644 core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala
 delete mode 100644 core/src/main/scala/kafka/server/RequestPurgatory.scala
 create mode 100644 core/src/test/scala/unit/kafka/server/DelayedOperationPurgatoryTest.scala
 delete mode 100644 core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala

diff --git a/core/src/main/scala/kafka/server/DelayedOperationPurgatory.scala b/core/src/main/scala/kafka/server/DelayedOperationPurgatory.scala
new file mode 100644
index 0000000..3998b6d
--- /dev/null
+++ b/core/src/main/scala/kafka/server/DelayedOperationPurgatory.scala
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.network._
+import kafka.utils._
+import kafka.metrics.KafkaMetricsGroup
+
+import java.util
+import java.util.concurrent._
+import java.util.concurrent.atomic._
+import scala.collection._
+
+import com.yammer.metrics.core.Gauge
+
+
+/**
+ * An operation whose processing needs to be delayed for at most the given delayMs;
+ * upon completion, the given callback function will be triggered. For example, a delayed
+ * message append operation could be waiting for a specified number of acks, and a delayed
+ * message fetch operation could be waiting for a given number of bytes to accumulate.
+ */
+abstract class DelayedOperation(delayMs: Long, onComplete: Boolean => Unit) extends DelayedItem(delayMs) {
+  val completed = new AtomicBoolean(false)
+
+  /*
+   * Check if the delayed operation is already completed
+   *
+   * Note that concurrent threads can check if an operation can be completed or not,
+   * but only the first thread will succeed in completing the operation
+   */
+  def tryComplete(): Boolean = completed.compareAndSet(false, true)
+
+  /*
+   * When delayMs has elapsed, expire the delayed operation
+   */
+  def onExpired() = onComplete(false)
+}
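To make the base-class contract concrete before the purgatory itself, here is a minimal hypothetical subclass (illustrative only; the real implementations in this series are DelayedProduce and DelayedFetch). The trigger path races tryComplete() against the expiration reaper, and whichever wins the CAS on `completed` decides whether onComplete observes true or false.

// Illustrative sketch, not part of this patch: an operation that completes
// once an external condition is flagged, and is otherwise expired on timeout.
class DelayedFlag(delayMs: Long, onComplete: Boolean => Unit)
  extends DelayedOperation(delayMs, onComplete) {

  @volatile var conditionMet = false

  // only claim the operation once the condition actually holds
  override def tryComplete(): Boolean =
    conditionMet && completed.compareAndSet(false, true)

  // a caller that observes the trigger condition would typically do:
  def maybeRespond(): Unit = if (tryComplete()) onComplete(true)
}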
+
+/**
+ * A helper purgatory class for bookkeeping delayed operations with a timeout, and expiring timed out operations.
+ */
+abstract class DelayedOperationPurgatory[T <: DelayedOperation](brokerId: Int = 0, purgeInterval: Int = 1000)
+  extends Logging with KafkaMetricsGroup {
+
+  /* a list of operations watching each key */
+  private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers))
+
+  /* the number of operations being watched; duplicates added on different watchers are also counted */
+  private val watched = new AtomicInteger(0)
+
+  /* background thread expiring operations that have been waiting too long */
+  private val expirationReaper = new ExpiredOperationReaper
+
+  newGauge(
+    "PurgatorySize",
+    new Gauge[Int] {
+      def value = size
+    }
+  )
+
+  newGauge(
+    "NumDelayedOperations",
+    new Gauge[Int] {
+      def value = expirationReaper.numOperations
+    }
+  )
+
+  expirationReaper.start()
+
+  /**
+   * Check if the operation can be completed; if not, watch it based on the given watch keys
+   *
+   * Note that a delayed operation can be watched on multiple keys, and hence due to concurrency may be
+   * found completed when trying to watch it on some later keys. In this case the operation is still
+   * treated as completed and hence no longer watched, although it is still in the watch lists of
+   * the earlier keys. Those already watched elements will be purged later by the expiration reaper.
+   *
+   * @param operation the delayed operation to be checked
+   * @param watchKeys keys for bookkeeping the operation
+   * @return true iff the delayed operation can be completed
+   */
+  def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
+    for (key <- watchKeys) {
+      val watchers = watchersFor(key)
+      // if the operation is found completed, stop adding it to any further
+      // watch lists and return true immediately
+      if (!watchers.checkAndMaybeAdd(operation)) {
+        return true
+      }
+    }
+
+    // if it is indeed watched, add it to the expiration queue as well
+    watched.getAndIncrement()
+    expirationReaper.enqueue(operation)
+
+    false
+  }
+
+  /**
+   * Return a list of completed operations with the given watch key.
+   */
+  def getCompleted(key: Any): Seq[T] = {
+    val watchers = watchersForKey.get(key)
+    if (watchers == null)
+      Seq.empty
+    else
+      watchers.collectCompletedOperations()
+  }
+
+  /*
+   * Return the watch list of the given key
+   */
+  private def watchersFor(key: Any) = watchersForKey.getAndMaybePut(key)
+
+  /*
+   * Return the size of the purgatory, which is the size of the watch lists plus the size of the expiration reaper's queue.
+   * Since an operation may still be in the watch lists even when it has been completed,
+   * this number may be larger than the number of real operations being watched.
+   */
+  protected def size() = watchersForKey.values.map(_.numRequests).sum + expirationReaper.numOperations
+
+  /**
+   * Shutdown the expiration reaper thread
+   */
+  def shutdown() {
+    expirationReaper.shutdown()
+  }
+
+  /**
+   * A linked list of watched delayed operations based on some key
+   */
+  private class Watchers {
+    private val requests = new util.ArrayList[T]
+
+    // the number of operations in this watch list, including already-completed ones
+    def numRequests = synchronized { requests.size }
+
+    // potentially add the element to watch if it is not completed yet
+    def checkAndMaybeAdd(t: T): Boolean = {
+      synchronized {
+        // if it is already completed, return false
+        if (t.completed.get())
+          return false
+        // if the operation can be completed, return false; otherwise add it to the watch list
+        if (t.tryComplete()) {
+          return false
+        } else {
+          requests.add(t)
+          return true
+        }
+      }
+    }
+
+    // traverse the list and purge completed elements
+    def purgeSatisfied(): Int = {
+      synchronized {
+        val iter = requests.iterator()
+        var purged = 0
+        while (iter.hasNext) {
+          val curr = iter.next
+          if (curr.completed.get()) {
+            iter.remove()
+            purged += 1
+          }
+        }
+        purged
+      }
+    }
+
+    // traverse the list and try to complete watched elements
+    def collectCompletedOperations(): Seq[T] = {
+      val response = new mutable.ArrayBuffer[T]
+      synchronized {
+        val iter = requests.iterator()
+        while (iter.hasNext) {
+          val curr = iter.next
+          if (curr.completed.get()) {
+            // another thread has completed this operation, just remove it
+            iter.remove()
+          } else {
+            val completed = curr.tryComplete()
+            if (completed) {
+              iter.remove()
+              watched.getAndDecrement()
+              response += curr
+            }
+          }
+        }
+      }
+      response
+    }
+  }
+
+  /**
+   * A background reaper to expire delayed operations that have timed out
+   */
+  private class ExpiredOperationReaper extends ShutdownableThread(
+    "ExpirationReaper-%d".format(brokerId),
+    false) {
+
+    /* The queue storing all delayed operations */
+    private val delayed = new DelayQueue[T]
+
+    /*
+     * Return the number of delayed operations kept by the reaper
+     */
+    def numOperations = delayed.size()
+
+    /*
+     * Add an operation to be expired
+     */
+    def enqueue(t: T) {
+      delayed.add(t)
+    }
+
+    /**
+     * Get the next expired event
+     */
+    private def pollExpired(): T = {
+      while (true) {
+        val curr = delayed.poll(200L, TimeUnit.MILLISECONDS)
+        if (curr == null)
+          return null.asInstanceOf[T]
+        // try to mark the operation as failed (and hence completed); if that succeeds, return it;
+        // otherwise move on to the next expired operation, since this one has been completed by others
+        if (curr.completed.compareAndSet(false, true)) {
+          return curr
+        }
+      }
+      throw new RuntimeException("This should not happen")
+    }
+
+    /**
+     * Delete all completed events from the delay queue and the watch lists
+     */
+    private def purgeSatisfied(): Int = {
+      var purged = 0
+
+      // purge the delay queue
+      val iter = delayed.iterator()
+      while (iter.hasNext) {
+        val curr = iter.next()
+        if (curr.completed.get()) {
+          iter.remove()
+          purged += 1
+        }
+      }
+
+      purged
+    }
+
+    override def doWork() {
+      val curr = pollExpired()
+      if (curr != null) {
+        curr.onExpired()
+      }
+      if (size >= purgeInterval) { // see if we need to force a full purge
+        debug("Beginning purgatory purge")
+        val purged = purgeSatisfied()
+        debug("Purged %d operations from delay queue.".format(purged))
+        val numPurgedFromWatchers = watchersForKey.values.map(_.purgeSatisfied()).sum
+        debug("Purged %d operations from watch lists.".format(numPurgedFromWatchers))
+      }
+    }
+  }
+
+}
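Compared with the checkAndMaybeWatch/update pair of the RequestPurgatory it replaces, the intended call pattern is: attempt completion up front, park the operation on its watch keys otherwise, then harvest completions when a key fires. A sketch of that wiring, reusing the hypothetical DelayedFlag above (assumed usage, not code from this patch):

object PurgatoryUsageSketch extends App {
  // anonymous concrete subclass; the base class has no abstract members here
  val purgatory = new DelayedOperationPurgatory[DelayedFlag]() {}

  val op = new DelayedFlag(1000L, success => println("completed, success = " + success))

  // 1. try to complete immediately, otherwise watch on the trigger key(s)
  if (!purgatory.tryCompleteElseWatch(op, Seq("topicA-0"))) {
    // 2. later, the trigger condition becomes true...
    op.conditionMet = true
    // 3. ...and whoever observed it harvests the now-completable operations
    println("ready to respond to: " + purgatory.getCompleted("topicA-0").size)
  }

  purgatory.shutdown()
}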
diff --git a/core/src/main/scala/kafka/server/FetchDelayedOperationPurgatory.scala b/core/src/main/scala/kafka/server/FetchDelayedOperationPurgatory.scala
new file mode 100644
index 0000000..71c5920
--- /dev/null
+++ b/core/src/main/scala/kafka/server/FetchDelayedOperationPurgatory.scala
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.metrics.KafkaMetricsGroup
+import kafka.network.RequestChannel
+import kafka.api.FetchResponseSend
+
+import java.util.concurrent.TimeUnit
+
+/**
+ * The purgatory holding delayed fetch requests
+ */
+class FetchDelayedOperationPurgatory(replicaManager: ReplicaManager, requestChannel: RequestChannel)
+  extends DelayedOperationPurgatory[DelayedFetch](replicaManager.config.brokerId, replicaManager.config.fetchPurgatoryPurgeIntervalRequests) {
+  this.logIdent = "[FetchRequestPurgatory-%d] ".format(replicaManager.config.brokerId)
+
+  private class DelayedFetchRequestMetrics(forFollower: Boolean) extends KafkaMetricsGroup {
+    private val metricPrefix = if (forFollower) "Follower" else "Consumer"
+
+    val expiredRequestMeter = newMeter(metricPrefix + "ExpiresPerSecond", "requests", TimeUnit.SECONDS)
+  }
+
+  private val aggregateFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = true)
+  private val aggregateNonFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = false)
+
+  private def recordDelayedFetchExpired(forFollower: Boolean) {
+    val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics
+      else aggregateNonFollowerFetchRequestMetrics
+
+    metrics.expiredRequestMeter.mark()
+  }
+
+  /**
+   * Check if a specified delayed fetch request is satisfied
+   */
+  def checkSatisfied(delayedFetch: DelayedFetch): Boolean = delayedFetch.isSatisfied(replicaManager)
+
+  /**
+   * When a delayed fetch request expires, just answer it with whatever data is present
+   */
+  def expire(delayedFetch: DelayedFetch) {
+    debug("Expiring fetch request %s.".format(delayedFetch.fetch))
+    val fromFollower = delayedFetch.fetch.isFromFollower
+    recordDelayedFetchExpired(fromFollower)
+    respond(delayedFetch)
+  }
+
+  // TODO: the purgatory should not be responsible for sending back the responses
+  def respond(delayedFetch: DelayedFetch) {
+    val response = delayedFetch.respond(replicaManager)
+    requestChannel.sendResponse(new RequestChannel.Response(delayedFetch.request, new FetchResponseSend(response)))
+  }
+}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala b/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala
deleted file mode 100644
index ed13188..0000000
--- a/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import kafka.metrics.KafkaMetricsGroup
-import kafka.network.RequestChannel
-import kafka.api.FetchResponseSend
-
-import java.util.concurrent.TimeUnit
-
-/**
- * The purgatory holding delayed fetch requests
- */
-class FetchRequestPurgatory(replicaManager: ReplicaManager, requestChannel: RequestChannel)
-  extends RequestPurgatory[DelayedFetch](replicaManager.config.brokerId, replicaManager.config.fetchPurgatoryPurgeIntervalRequests) {
-  this.logIdent = "[FetchRequestPurgatory-%d] ".format(replicaManager.config.brokerId)
-
-  private class DelayedFetchRequestMetrics(forFollower: Boolean) extends KafkaMetricsGroup {
-    private val metricPrefix = if (forFollower) "Follower" else "Consumer"
-
-    val expiredRequestMeter = newMeter(metricPrefix + "ExpiresPerSecond", "requests", TimeUnit.SECONDS)
-  }
-
-  private val aggregateFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = true)
-  private val aggregateNonFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = false)
-
-  private def recordDelayedFetchExpired(forFollower: Boolean) {
-    val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics
-      else aggregateNonFollowerFetchRequestMetrics
-
-    metrics.expiredRequestMeter.mark()
-  }
-
-  /**
-   * Check if a specified delayed fetch request is satisfied
-   */
-  def checkSatisfied(delayedFetch: DelayedFetch): Boolean = delayedFetch.isSatisfied(replicaManager)
-
-  /**
-   * When a delayed fetch request expires just answer it with whatever data is present
-   */
-  def expire(delayedFetch: DelayedFetch) {
-    debug("Expiring fetch request %s.".format(delayedFetch.fetch))
-    val fromFollower = delayedFetch.fetch.isFromFollower
-    recordDelayedFetchExpired(fromFollower)
-    respond(delayedFetch)
-  }
-
-  // TODO: purgatory should not be responsible for sending back the responses
-  def respond(delayedFetch: DelayedFetch) {
-    val response = delayedFetch.respond(replicaManager)
-    requestChannel.sendResponse(new RequestChannel.Response(delayedFetch.request, new FetchResponseSend(response)))
-  }
-}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index bb94673..089ebc3 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -42,8 +42,8 @@ class KafkaApis(val requestChannel: RequestChannel,
                 val config: KafkaConfig,
                 val controller: KafkaController) extends Logging {
 
-  val producerRequestPurgatory = new ProducerRequestPurgatory(replicaManager, offsetManager, requestChannel)
-  val fetchRequestPurgatory = new FetchRequestPurgatory(replicaManager, requestChannel)
+  val producerRequestPurgatory = new ProducerDelayedOperationPurgatory(replicaManager, offsetManager, requestChannel)
+  val fetchRequestPurgatory = new FetchDelayedOperationPurgatory(replicaManager, requestChannel)
   // TODO: the following line will be removed in 0.9
   replicaManager.initWithRequestPurgatory(producerRequestPurgatory, fetchRequestPurgatory)
   var metadataCache = new MetadataCache
diff --git a/core/src/main/scala/kafka/server/ProducerDelayedOperationPurgatory.scala b/core/src/main/scala/kafka/server/ProducerDelayedOperationPurgatory.scala
new file mode 100644
index 0000000..4b950e1
--- /dev/null
+++ b/core/src/main/scala/kafka/server/ProducerDelayedOperationPurgatory.scala
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.Pool
+import kafka.network.{BoundedByteBufferSend, RequestChannel}
+
+import java.util.concurrent.TimeUnit
+
+/**
+ * The purgatory holding delayed producer requests
+ */
+class ProducerDelayedOperationPurgatory(replicaManager: ReplicaManager, offsetManager: OffsetManager, requestChannel: RequestChannel)
+  extends DelayedOperationPurgatory[DelayedProduce](replicaManager.config.brokerId, replicaManager.config.producerPurgatoryPurgeIntervalRequests) {
+  this.logIdent = "[ProducerRequestPurgatory-%d] ".format(replicaManager.config.brokerId)
+
+  private class DelayedProducerRequestMetrics(keyLabel: String = DelayedRequestKey.globalLabel) extends KafkaMetricsGroup {
+    val expiredRequestMeter = newMeter(keyLabel + "ExpiresPerSecond", "requests", TimeUnit.SECONDS)
+  }
+
+  private val producerRequestMetricsForKey = {
+    val valueFactory = (k: DelayedRequestKey) => new DelayedProducerRequestMetrics(k.keyLabel + "-")
+    new Pool[DelayedRequestKey, DelayedProducerRequestMetrics](Some(valueFactory))
+  }
+
+  private val aggregateProduceRequestMetrics = new DelayedProducerRequestMetrics
+
+  private def recordDelayedProducerKeyExpired(key: DelayedRequestKey) {
+    val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(key)
+    List(keyMetrics, aggregateProduceRequestMetrics).foreach(_.expiredRequestMeter.mark())
+  }
+
+  /**
+   * Check if a specified delayed produce request is satisfied
+   */
+  def checkSatisfied(delayedProduce: DelayedProduce) = delayedProduce.isSatisfied(replicaManager)
+
+  /**
+   * When a delayed produce request expires, answer it with possible timeout error codes
+   */
+  def expire(delayedProduce: DelayedProduce) {
+    debug("Expiring produce request %s.".format(delayedProduce.produce))
+    for ((topicPartition, responseStatus) <- delayedProduce.partitionStatus if responseStatus.acksPending)
+      recordDelayedProducerKeyExpired(new TopicPartitionRequestKey(topicPartition))
+    respond(delayedProduce)
+  }
+
+  // TODO: the purgatory should not be responsible for sending back the responses
+  def respond(delayedProduce: DelayedProduce) {
+    val response = delayedProduce.respond(offsetManager)
+    requestChannel.sendResponse(new RequestChannel.Response(delayedProduce.request, new BoundedByteBufferSend(response)))
+  }
+}
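The per-key meters above rely on kafka.utils.Pool's valueFactory plus getAndMaybePut, which lazily creates the metrics object on first access to a key. A self-contained stand-in for that idiom (SimplePool and MeterStub are hypothetical names, not Kafka classes):

import java.util.concurrent.ConcurrentHashMap

// stand-in for kafka.utils.Pool(Some(valueFactory)).getAndMaybePut(key)
class SimplePool[K, V](valueFactory: K => V) {
  private val map = new ConcurrentHashMap[K, V]()
  def getAndMaybePut(key: K): V = map.computeIfAbsent(key, k => valueFactory(k))
}

case class MeterStub(name: String) { def mark(): Unit = println("mark " + name) }

object PerKeyMetricsSketch extends App {
  val metersForKey = new SimplePool[String, MeterStub](k => MeterStub(k + "-ExpiresPerSecond"))
  metersForKey.getAndMaybePut("topicA-0").mark() // first access creates the meter
  metersForKey.getAndMaybePut("topicA-0").mark() // later accesses reuse it
}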
diff --git a/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala b/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala
deleted file mode 100644
index d4a7d4a..0000000
--- a/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import kafka.metrics.KafkaMetricsGroup
-import kafka.utils.Pool
-import kafka.network.{BoundedByteBufferSend, RequestChannel}
-
-import java.util.concurrent.TimeUnit
-
-/**
- * The purgatory holding delayed producer requests
- */
-class ProducerRequestPurgatory(replicaManager: ReplicaManager, offsetManager: OffsetManager, requestChannel: RequestChannel)
-  extends RequestPurgatory[DelayedProduce](replicaManager.config.brokerId, replicaManager.config.producerPurgatoryPurgeIntervalRequests) {
-  this.logIdent = "[ProducerRequestPurgatory-%d] ".format(replicaManager.config.brokerId)
-
-  private class DelayedProducerRequestMetrics(keyLabel: String = DelayedRequestKey.globalLabel) extends KafkaMetricsGroup {
-    val expiredRequestMeter = newMeter(keyLabel + "ExpiresPerSecond", "requests", TimeUnit.SECONDS)
-  }
-
-  private val producerRequestMetricsForKey = {
-    val valueFactory = (k: DelayedRequestKey) => new DelayedProducerRequestMetrics(k.keyLabel + "-")
-    new Pool[DelayedRequestKey, DelayedProducerRequestMetrics](Some(valueFactory))
-  }
-
-  private val aggregateProduceRequestMetrics = new DelayedProducerRequestMetrics
-
-  private def recordDelayedProducerKeyExpired(key: DelayedRequestKey) {
-    val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(key)
-    List(keyMetrics, aggregateProduceRequestMetrics).foreach(_.expiredRequestMeter.mark())
-  }
-
-  /**
-   * Check if a specified delayed fetch request is satisfied
-   */
-  def checkSatisfied(delayedProduce: DelayedProduce) = delayedProduce.isSatisfied(replicaManager)
-
-  /**
-   * When a delayed produce request expires answer it with possible time out error codes
-   */
-  def expire(delayedProduce: DelayedProduce) {
-    debug("Expiring produce request %s.".format(delayedProduce.produce))
-    for ((topicPartition, responseStatus) <- delayedProduce.partitionStatus if responseStatus.acksPending)
-      recordDelayedProducerKeyExpired(new TopicPartitionRequestKey(topicPartition))
-    respond(delayedProduce)
-  }
-
-  // TODO: purgatory should not be responsible for sending back the responses
-  def respond(delayedProduce: DelayedProduce) {
-    val response = delayedProduce.respond(offsetManager)
-    requestChannel.sendResponse(new RequestChannel.Response(delayedProduce.request, new BoundedByteBufferSend(response)))
-  }
-}
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 68758e3..d6a8356 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -46,7 +46,7 @@ object ReplicaManager {
 
 case class PartitionDataAndOffset(data: FetchResponsePartitionData, offset: LogOffsetMetadata)
 
-class ReplicaManager(val config: KafkaConfig,
+class ReplicaManager(config: KafkaConfig,
                      time: Time,
                      val zkClient: ZkClient,
                      scheduler: Scheduler,
@@ -64,8 +64,8 @@ class ReplicaManager(val config: KafkaConfig,
   this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: "
   val stateChangeLogger = KafkaController.stateChangeLogger
 
-  var producerRequestPurgatory: ProducerRequestPurgatory = null
-  var fetchRequestPurgatory: FetchRequestPurgatory = null
+  var producerRequestPurgatory: ProducerDelayedOperationPurgatory = null
+  var fetchRequestPurgatory: FetchDelayedOperationPurgatory = null
 
   newGauge(
     "LeaderCount",
@@ -105,7 +105,7 @@ class ReplicaManager(val config: KafkaConfig,
    * TODO: will be removed in 0.9 where we refactor server structure
    */
 
-  def initWithRequestPurgatory(producerRequestPurgatory: ProducerRequestPurgatory, fetchRequestPurgatory: FetchRequestPurgatory) {
+  def initWithRequestPurgatory(producerRequestPurgatory: ProducerDelayedOperationPurgatory, fetchRequestPurgatory: FetchDelayedOperationPurgatory) {
     this.producerRequestPurgatory = producerRequestPurgatory
     this.fetchRequestPurgatory = fetchRequestPurgatory
   }
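Before the full deletion below, the correspondence between the old and new APIs is worth spelling out. Roughly, and as a schematic sketch only (OldPurgatoryHooks is a hypothetical name, not a trait from this patch):

// Old RequestPurgatory: the purgatory subclass owns both hooks.
//   checkSatisfied(delayed)  -- polled from update(key) on every trigger
//   expire(delayed)          -- invoked by the reaper thread on timeout
trait OldPurgatoryHooks[T] {
  def checkSatisfied(delayed: T): Boolean
  def expire(delayed: T): Unit
}
// New DelayedOperationPurgatory: the operation carries them itself.
//   delayed.tryComplete() replaces checkSatisfied(delayed); the first CAS wins.
//   delayed.onExpired() replaces expire(delayed) and calls onComplete(false).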
diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala
deleted file mode 100644
index ce06d2c..0000000
--- a/core/src/main/scala/kafka/server/RequestPurgatory.scala
+++ /dev/null
@@ -1,317 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import kafka.network._
-import kafka.utils._
-import kafka.metrics.KafkaMetricsGroup
-
-import java.util
-import java.util.concurrent._
-import java.util.concurrent.atomic._
-import scala.collection._
-
-import com.yammer.metrics.core.Gauge
-
-
-/**
- * A request whose processing needs to be delayed for at most the given delayMs.
- * The associated keys are used for bookkeeping, and represent the "trigger" that causes this request to check if it is satisfied;
- * for example, a key could be a (topic, partition) pair.
- */
-class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, delayMs: Long) extends DelayedItem[RequestChannel.Request](request, delayMs) {
-  val satisfied = new AtomicBoolean(false)
-}
-
-/**
- * A helper class for dealing with asynchronous requests with a timeout. A DelayedRequest has a request to delay
- * and also a list of keys that can trigger the action. Implementations can add customized logic to control what it means for a given
- * request to be satisfied. For example, it could be that we are waiting for a user-specified number of acks on a given (topic, partition)
- * to be able to respond to a request, or it could be that we are waiting for a given number of bytes to accumulate on a given request
- * to be able to respond to that request (in the simple case we might wait for at least one byte to avoid busy waiting).
- *
- * For us the key is generally a (topic, partition) pair.
- * By calling
- *   val isSatisfiedByMe = checkAndMaybeWatch(delayedRequest)
- * we will check if a request is satisfied already, and if not add the request for watch on all its keys.
- *
- * It is up to the user to then call
- *   val satisfied = update(key, request)
- * when a request relevant to the given key occurs. This triggers bookkeeping logic and returns back any requests satisfied by this
- * new request.
- *
- * An implementation extends two helper functions:
- *   def checkSatisfied(request: R, delayed: T): Boolean
- * this function returns true if the given request (in combination with whatever previous requests have happened) satisfies the delayed
- * request delayed. This method will likely also need to do whatever bookkeeping is necessary.
- *
- * The second function is
- *   def expire(delayed: T)
- * this function handles delayed requests that have hit their time limit without being satisfied.
- */
-abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInterval: Int = 1000)
-  extends Logging with KafkaMetricsGroup {
-
-  /* a list of requests watching each key */
-  private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers))
-
-  /* the number of requests being watched, duplicates added on different watchers are also counted */
-  private val watched = new AtomicInteger(0)
-
-  /* background thread expiring requests that have been waiting too long */
-  private val expiredRequestReaper = new ExpiredRequestReaper
-  private val expirationThread = Utils.newThread(name="request-expiration-task", runnable=expiredRequestReaper, daemon=false)
-
-  newGauge(
-    "PurgatorySize",
-    new Gauge[Int] {
-      def value = watched.get() + expiredRequestReaper.numRequests
-    }
-  )
-
-  newGauge(
-    "NumDelayedRequests",
-    new Gauge[Int] {
-      def value = expiredRequestReaper.unsatisfied.get()
-    }
-  )
-
-  expirationThread.start()
-
-  /**
-   * Try to add the request for watch on all keys. Return true iff the request is
-   * satisfied and the satisfaction is done by the caller.
-   *
-   * Requests can end up watched on only a few of their keys if they are found satisfied while
-   * being added to the remaining keys. In this case the request is still treated as satisfied
-   * and hence no longer watched. Those already added elements will be later purged by the expire reaper.
-   */
-  def checkAndMaybeWatch(delayedRequest: T): Boolean = {
-    for(key <- delayedRequest.keys) {
-      val lst = watchersFor(key)
-      if(!lst.checkAndMaybeAdd(delayedRequest)) {
-        if(delayedRequest.satisfied.compareAndSet(false, true))
-          return true
-        else
-          return false
-      }
-    }
-
-    // if it is indeed watched, add to the expire queue also
-    expiredRequestReaper.enqueue(delayedRequest)
-
-    false
-  }
-
-  /**
-   * Update any watchers and return a list of newly satisfied requests.
-   */
-  def update(key: Any): Seq[T] = {
-    val w = watchersForKey.get(key)
-    if(w == null)
-      Seq.empty
-    else
-      w.collectSatisfiedRequests()
-  }
-
-  private def watchersFor(key: Any) = watchersForKey.getAndMaybePut(key)
-
-  /**
-   * Check if this delayed request is already satisfied
-   */
-  protected def checkSatisfied(request: T): Boolean
-
-  /**
-   * Handle an expired delayed request
-   */
-  protected def expire(delayed: T)
-
-  /**
-   * Shutdown the expire reaper thread
-   */
-  def shutdown() {
-    expiredRequestReaper.shutdown()
-  }
-
-  /**
-   * A linked list of DelayedRequests watching some key with some associated
-   * bookkeeping logic.
-   */
-  private class Watchers {
-    private val requests = new util.ArrayList[T]
-
-    // potentially add the element to watch if it is not satisfied yet
-    def checkAndMaybeAdd(t: T): Boolean = {
-      synchronized {
-        // if it is already satisfied, do not add to the watch list
-        if (t.satisfied.get)
-          return false
-        // synchronize on the delayed request to avoid any race condition
-        // with expire and update threads on client-side.
-        if(t synchronized checkSatisfied(t)) {
-          return false
-        }
-        requests.add(t)
-        watched.getAndIncrement()
-        return true
-      }
-    }
-
-    // traverse the list and purge satisfied elements
-    def purgeSatisfied(): Int = {
-      synchronized {
-        val iter = requests.iterator()
-        var purged = 0
-        while(iter.hasNext) {
-          val curr = iter.next
-          if(curr.satisfied.get()) {
-            iter.remove()
-            watched.getAndDecrement()
-            purged += 1
-          }
-        }
-        purged
-      }
-    }
-
-    // traverse the list and try to satisfy watched elements
-    def collectSatisfiedRequests(): Seq[T] = {
-      val response = new mutable.ArrayBuffer[T]
-      synchronized {
-        val iter = requests.iterator()
-        while(iter.hasNext) {
-          val curr = iter.next
-          if(curr.satisfied.get) {
-            // another thread has satisfied this request, remove it
-            iter.remove()
-          } else {
-            // synchronize on curr to avoid any race condition with expire
-            // on client-side.
-            val satisfied = curr synchronized checkSatisfied(curr)
-            if(satisfied) {
-              iter.remove()
-              watched.getAndDecrement()
-              val updated = curr.satisfied.compareAndSet(false, true)
-              if(updated == true) {
-                response += curr
-                expiredRequestReaper.satisfyRequest()
-              }
-            }
-          }
-        }
-      }
-      response
-    }
-  }
-
-  /**
-   * Runnable to expire requests that have sat unfulfilled past their deadline
-   */
-  private class ExpiredRequestReaper extends Runnable with Logging {
-    this.logIdent = "ExpiredRequestReaper-%d ".format(brokerId)
-
-    private val delayed = new DelayQueue[T]
-    private val running = new AtomicBoolean(true)
-    private val shutdownLatch = new CountDownLatch(1)
-
-    /* The count of elements in the delay queue that are unsatisfied */
-    private [kafka] val unsatisfied = new AtomicInteger(0)
-
-    def numRequests = delayed.size()
-
-    /** Main loop for the expiry thread */
-    def run() {
-      while(running.get) {
-        try {
-          val curr = pollExpired()
-          if (curr != null) {
-            curr synchronized {
-              expire(curr)
-            }
-          }
-          if (watched.get + numRequests >= purgeInterval) { // see if we need to force a full purge
-            debug("Beginning purgatory purge")
-            val purged = purgeSatisfied()
-            debug("Purged %d requests from delay queue.".format(purged))
-            val numPurgedFromWatchers = watchersForKey.values.map(_.purgeSatisfied()).sum
-            debug("Purged %d requests from watch lists.".format(numPurgedFromWatchers))
-          }
-        } catch {
-          case e: Exception =>
-            error("Error in long poll expiry thread: ", e)
-        }
-      }
-      shutdownLatch.countDown()
-    }
-
-    /** Add a request to be expired */
-    def enqueue(t: T) {
-      delayed.add(t)
-      unsatisfied.incrementAndGet()
-    }
-
-    /** Shutdown the expiry thread */
-    def shutdown() {
-      debug("Shutting down.")
-      running.set(false)
-      shutdownLatch.await()
-      debug("Shut down complete.")
-    }
-
-    /** Record the fact that we satisfied a request in the stats for the expiry queue */
-    def satisfyRequest(): Unit = unsatisfied.getAndDecrement()
-
-    /**
-     * Get the next expired event
-     */
-    private def pollExpired(): T = {
-      while(true) {
-        val curr = delayed.poll(200L, TimeUnit.MILLISECONDS)
-        if (curr == null)
-          return null.asInstanceOf[T]
-        val updated = curr.satisfied.compareAndSet(false, true)
-        if(updated) {
-          unsatisfied.getAndDecrement()
-          return curr
-        }
-      }
-      throw new RuntimeException("This should not happen")
-    }
-
-    /**
-     * Delete all satisfied events from the delay queue and the watcher lists
-     */
-    private def purgeSatisfied(): Int = {
-      var purged = 0
-
-      // purge the delayed queue
-      val iter = delayed.iterator()
-      while(iter.hasNext) {
-        val curr = iter.next()
-        if(curr.satisfied.get) {
-          iter.remove()
-          purged += 1
-        }
-      }
-
-      purged
-    }
-  }
-
-}
diff --git a/core/src/main/scala/kafka/utils/DelayedItem.scala b/core/src/main/scala/kafka/utils/DelayedItem.scala
index d727649..3d7df84 100644
--- a/core/src/main/scala/kafka/utils/DelayedItem.scala
+++ b/core/src/main/scala/kafka/utils/DelayedItem.scala
@@ -20,7 +20,7 @@ package kafka.utils
 import java.util.concurrent._
 import scala.math._
 
-class DelayedItem[T](val item: T, delay: Long, unit: TimeUnit) extends Delayed with Logging {
+class DelayedItem(delay: Long, unit: TimeUnit) extends Delayed with Logging {
 
   val createdMs = SystemTime.milliseconds
   val delayMs = {
@@ -29,8 +29,8 @@ class DelayedItem[T](val item: T, delay: Long, unit: TimeUnit) extends Delayed w
     else given
   }
 
-  def this(item: T, delayMs: Long) =
-    this(item, delayMs, TimeUnit.MILLISECONDS)
+  def this(delayMs: Long) =
+    this(delayMs, TimeUnit.MILLISECONDS)
 
   /**
    * The remaining delay time
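The hunk above only shows DelayedItem losing its item payload; the methods outside the hunk context implement java.util.concurrent.Delayed, which is what lets the reaper's DelayQueue release items only after their delay elapses. A self-contained sketch of that contract (assumed shape, for illustration):

import java.util.concurrent.{DelayQueue, Delayed, TimeUnit}

// minimal Delayed implementation: getDelay() counts down to zero and
// compareTo() orders the queue by remaining delay
class SketchDelayedItem(delayMs: Long) extends Delayed {
  private val dueMs = System.currentTimeMillis + delayMs
  def getDelay(unit: TimeUnit): Long =
    unit.convert(dueMs - System.currentTimeMillis, TimeUnit.MILLISECONDS)
  def compareTo(other: Delayed): Int =
    java.lang.Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS))
}

object DelayQueueSketch extends App {
  val queue = new DelayQueue[SketchDelayedItem]
  queue.add(new SketchDelayedItem(100))
  // poll(timeout) returns null until the item's delay has elapsed
  println(queue.poll(200L, TimeUnit.MILLISECONDS)) // non-null after ~100 ms
}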
diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationPurgatoryTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationPurgatoryTest.scala
new file mode 100644
index 0000000..c9a5f2e
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/DelayedOperationPurgatoryTest.scala
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import scala.collection._
+import org.junit.Test
+import junit.framework.Assert._
+import kafka.message._
+import kafka.api._
+import kafka.utils.TestUtils
+import org.scalatest.junit.JUnit3Suite
+
+
+class DelayedOperationPurgatoryTest extends JUnit3Suite {
+
+  val producerRequest1 = TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(new Message("hello1".getBytes)))
+  val producerRequest2 = TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(new Message("hello2".getBytes)))
+  var purgatory: MockDelayedOperationPurgatory = null
+
+  override def setUp() {
+    super.setUp()
+    purgatory = new MockDelayedOperationPurgatory()
+  }
+
+  override def tearDown() {
+    purgatory.shutdown()
+    super.tearDown()
+  }
+
+  @Test
+  def testRequestSatisfaction() {
+    val r1 = new DelayedRequest(Array("test1"), null, 100000L)
+    val r2 = new DelayedRequest(Array("test2"), null, 100000L)
+    assertEquals("With no waiting requests, nothing should be satisfied", 0, purgatory.update("test1").size)
+    assertFalse("r1 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r1))
+    assertEquals("Still nothing satisfied", 0, purgatory.update("test1").size)
+    assertFalse("r2 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r2))
+    assertEquals("Still nothing satisfied", 0, purgatory.update("test2").size)
+    purgatory.satisfied += r1
+    assertEquals("r1 satisfied", mutable.ArrayBuffer(r1), purgatory.update("test1"))
+    assertEquals("Nothing satisfied", 0, purgatory.update("test1").size)
+    purgatory.satisfied += r2
+    assertEquals("r2 satisfied", mutable.ArrayBuffer(r2), purgatory.update("test2"))
+    assertEquals("Nothing satisfied", 0, purgatory.update("test2").size)
+  }
+
+  @Test
+  def testRequestExpiry() {
+    val expiration = 20L
+    val r1 = new DelayedRequest(Array("test1"), null, expiration)
+    val r2 = new DelayedRequest(Array("test1"), null, 200000L)
+    val start = System.currentTimeMillis
+    assertFalse("r1 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r1))
+    assertFalse("r2 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r2))
+    purgatory.awaitExpiration(r1)
+    val elapsed = System.currentTimeMillis - start
+    assertTrue("r1 expired", purgatory.expired.contains(r1))
+    assertTrue("r2 hasn't expired", !purgatory.expired.contains(r2))
+    assertTrue("Time for expiration %d should be at least %d".format(elapsed, expiration), elapsed >= expiration)
+  }
+
+  class MockDelayedOperationPurgatory extends DelayedOperationPurgatory[DelayedRequest] {
+    val satisfied = mutable.Set[DelayedRequest]()
+    val expired = mutable.Set[DelayedRequest]()
+    def awaitExpiration(delayed: DelayedRequest) = {
+      delayed synchronized {
+        delayed.wait()
+      }
+    }
+    def checkSatisfied(delayed: DelayedRequest): Boolean = satisfied.contains(delayed)
+    def expire(delayed: DelayedRequest) {
+      expired += delayed
+      delayed synchronized {
+        delayed.notify()
+      }
+    }
+  }
+
+}
\ No newline at end of file
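The awaitExpiration/expire pair in the mock above is a plain wait/notify handshake with the reaper thread; the elapsed-time assertion only holds because awaitExpiration blocks until expire has run. A compressed, self-contained illustration of that synchronization pattern (hypothetical, not tied to the purgatory classes):

object ExpiryHandshakeSketch extends App {
  val lock = new Object
  @volatile var expired = false

  // stand-in for the reaper: "expires" the operation after its delay
  new Thread(() => {
    Thread.sleep(20)
    lock.synchronized { expired = true; lock.notify() } // the expire() side
  }).start()

  lock.synchronized { while (!expired) lock.wait() }    // the awaitExpiration() side
  println("operation expired; elapsed-time assertions may now run")
}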
diff --git a/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala b/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
deleted file mode 100644
index 168712d..0000000
--- a/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import scala.collection._
-import org.junit.Test
-import junit.framework.Assert._
-import kafka.message._
-import kafka.api._
-import kafka.utils.TestUtils
-import org.scalatest.junit.JUnit3Suite
-
-
-class RequestPurgatoryTest extends JUnit3Suite {
-
-  val producerRequest1 = TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(new Message("hello1".getBytes)))
-  val producerRequest2 = TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(new Message("hello2".getBytes)))
-  var purgatory: MockRequestPurgatory = null
-
-  override def setUp() {
-    super.setUp()
-    purgatory = new MockRequestPurgatory()
-  }
-
-  override def tearDown() {
-    purgatory.shutdown()
-    super.tearDown()
-  }
-
-  @Test
-  def testRequestSatisfaction() {
-    val r1 = new DelayedRequest(Array("test1"), null, 100000L)
-    val r2 = new DelayedRequest(Array("test2"), null, 100000L)
-    assertEquals("With no waiting requests, nothing should be satisfied", 0, purgatory.update("test1").size)
-    assertFalse("r1 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r1))
-    assertEquals("Still nothing satisfied", 0, purgatory.update("test1").size)
-    assertFalse("r2 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r2))
-    assertEquals("Still nothing satisfied", 0, purgatory.update("test2").size)
-    purgatory.satisfied += r1
-    assertEquals("r1 satisfied", mutable.ArrayBuffer(r1), purgatory.update("test1"))
-    assertEquals("Nothing satisfied", 0, purgatory.update("test1").size)
-    purgatory.satisfied += r2
-    assertEquals("r2 satisfied", mutable.ArrayBuffer(r2), purgatory.update("test2"))
-    assertEquals("Nothing satisfied", 0, purgatory.update("test2").size)
-  }
-
-  @Test
-  def testRequestExpiry() {
-    val expiration = 20L
-    val r1 = new DelayedRequest(Array("test1"), null, expiration)
-    val r2 = new DelayedRequest(Array("test1"), null, 200000L)
-    val start = System.currentTimeMillis
-    assertFalse("r1 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r1))
-    assertFalse("r2 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r2))
-    purgatory.awaitExpiration(r1)
-    val elapsed = System.currentTimeMillis - start
-    assertTrue("r1 expired", purgatory.expired.contains(r1))
-    assertTrue("r2 hasn't expired", !purgatory.expired.contains(r2))
-    assertTrue("Time for expiration %d should be at least %d".format(elapsed, expiration), elapsed >= expiration)
-  }
-
-  class MockRequestPurgatory extends RequestPurgatory[DelayedRequest] {
-    val satisfied = mutable.Set[DelayedRequest]()
-    val expired = mutable.Set[DelayedRequest]()
-    def awaitExpiration(delayed: DelayedRequest) = {
-      delayed synchronized {
-        delayed.wait()
-      }
-    }
-    def checkSatisfied(delayed: DelayedRequest): Boolean = satisfied.contains(delayed)
-    def expire(delayed: DelayedRequest) {
-      expired += delayed
-      delayed synchronized {
-        delayed.notify()
-      }
-    }
-  }
-
-}
\ No newline at end of file
-- 
1.7.12.4


From 5485d5deb1a9e0c5d00a3509d18fb2550c8829e1 Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Tue, 12 Aug 2014 09:49:17 -0700
Subject: [PATCH 2/5] change name back

---
 .../kafka/server/DelayedOperationPurgatory.scala   | 285 --------------------
 .../server/FetchDelayedOperationPurgatory.scala    |  69 -----
 .../scala/kafka/server/FetchRequestPurgatory.scala |  69 +++++
 core/src/main/scala/kafka/server/KafkaApis.scala   |   4 +-
 .../server/ProducerDelayedOperationPurgatory.scala |  69 -----
 .../kafka/server/ProducerRequestPurgatory.scala    |  69 +++++
 .../main/scala/kafka/server/ReplicaManager.scala   |   6 +-
 .../main/scala/kafka/server/RequestPurgatory.scala | 286 +++++++++++++++++++++
 .../server/DelayedOperationPurgatoryTest.scala     |  94 -------
 .../unit/kafka/server/RequestPurgatoryTest.scala   |  94 ++++++
 10 files changed, 523 insertions(+), 522 deletions(-)
 delete mode 100644 core/src/main/scala/kafka/server/DelayedOperationPurgatory.scala
 delete mode 100644 core/src/main/scala/kafka/server/FetchDelayedOperationPurgatory.scala
 create mode 100644 core/src/main/scala/kafka/server/FetchRequestPurgatory.scala
 delete mode 100644 core/src/main/scala/kafka/server/ProducerDelayedOperationPurgatory.scala
 create mode 100644 core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala
 create mode 100644 core/src/main/scala/kafka/server/RequestPurgatory.scala
 delete mode 100644 core/src/test/scala/unit/kafka/server/DelayedOperationPurgatoryTest.scala
 create mode 100644 core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala

diff --git a/core/src/main/scala/kafka/server/DelayedOperationPurgatory.scala b/core/src/main/scala/kafka/server/DelayedOperationPurgatory.scala
deleted file mode 100644
index 3998b6d..0000000
--- a/core/src/main/scala/kafka/server/DelayedOperationPurgatory.scala
+++ /dev/null
@@ -1,285 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import kafka.network._
-import kafka.utils._
-import kafka.metrics.KafkaMetricsGroup
-
-import java.util
-import java.util.concurrent._
-import java.util.concurrent.atomic._
-import scala.collection._
-
-import com.yammer.metrics.core.Gauge
-
-
-/**
- * An operation whose processing needs to be delayed for at most the given delayMs;
- * upon completion, the given callback function will be triggered. For example, a delayed
- * message append operation could be waiting for a specified number of acks, and a delayed
- * message fetch operation could be waiting for a given number of bytes to accumulate.
- */
-abstract class DelayedOperation(delayMs: Long, onComplete: Boolean => Unit) extends DelayedItem(delayMs) {
-  val completed = new AtomicBoolean(false)
-
-  /*
-   * Check if the delayed operation is already completed
-   *
-   * Note that concurrent threads can check if an operation can be completed or not,
-   * but only the first thread will succeed in completing the operation
-   */
-  def tryComplete(): Boolean = completed.compareAndSet(false, true)
-
-  /*
-   * When delayMs has elapsed, expire the delayed operation
-   */
-  def onExpired() = onComplete(false)
-}
-
-/**
- * A helper purgatory class for bookkeeping delayed operations with a timeout, and expiring timed out operations.
- */
-abstract class DelayedOperationPurgatory[T <: DelayedOperation](brokerId: Int = 0, purgeInterval: Int = 1000)
-  extends Logging with KafkaMetricsGroup {
-
-  /* a list of operations watching each key */
-  private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers))
-
-  /* the number of operations being watched; duplicates added on different watchers are also counted */
-  private val watched = new AtomicInteger(0)
-
-  /* background thread expiring operations that have been waiting too long */
-  private val expirationReaper = new ExpiredOperationReaper
-
-  newGauge(
-    "PurgatorySize",
-    new Gauge[Int] {
-      def value = size
-    }
-  )
-
-  newGauge(
-    "NumDelayedOperations",
-    new Gauge[Int] {
-      def value = expirationReaper.numOperations
-    }
-  )
-
-  expirationReaper.start()
-
-  /**
-   * Check if the operation can be completed; if not, watch it based on the given watch keys
-   *
-   * Note that a delayed operation can be watched on multiple keys, and hence due to concurrency may be
-   * found completed when trying to watch it on some later keys. In this case the operation is still
-   * treated as completed and hence no longer watched, although it is still in the watch lists of
-   * the earlier keys. Those already watched elements will be purged later by the expiration reaper.
-   *
-   * @param operation the delayed operation to be checked
-   * @param watchKeys keys for bookkeeping the operation
-   * @return true iff the delayed operation can be completed
-   */
-  def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
-    for (key <- watchKeys) {
-      val watchers = watchersFor(key)
-      // if the operation is found completed, stop adding it to any further
-      // watch lists and return true immediately
-      if (!watchers.checkAndMaybeAdd(operation)) {
-        return true
-      }
-    }
-
-    // if it is indeed watched, add it to the expiration queue as well
-    watched.getAndIncrement()
-    expirationReaper.enqueue(operation)
-
-    false
-  }
-
-  /**
-   * Return a list of completed operations with the given watch key.
-   */
-  def getCompleted(key: Any): Seq[T] = {
-    val watchers = watchersForKey.get(key)
-    if (watchers == null)
-      Seq.empty
-    else
-      watchers.collectCompletedOperations()
-  }
-
-  /*
-   * Return the watch list of the given key
-   */
-  private def watchersFor(key: Any) = watchersForKey.getAndMaybePut(key)
-
-  /*
-   * Return the size of the purgatory, which is the size of the watch lists plus the size of the expiration reaper's queue.
-   * Since an operation may still be in the watch lists even when it has been completed,
-   * this number may be larger than the number of real operations being watched.
-   */
-  protected def size() = watchersForKey.values.map(_.numRequests).sum + expirationReaper.numOperations
-
-  /**
-   * Shutdown the expiration reaper thread
-   */
-  def shutdown() {
-    expirationReaper.shutdown()
-  }
-
-  /**
-   * A linked list of watched delayed operations based on some key
-   */
-  private class Watchers {
-    private val requests = new util.ArrayList[T]
-
-    // the number of operations in this watch list, including already-completed ones
-    def numRequests = synchronized { requests.size }
-
-    // potentially add the element to watch if it is not completed yet
-    def checkAndMaybeAdd(t: T): Boolean = {
-      synchronized {
-        // if it is already completed, return false
-        if (t.completed.get())
-          return false
-        // if the operation can be completed, return false; otherwise add it to the watch list
-        if (t.tryComplete()) {
-          return false
-        } else {
-          requests.add(t)
-          return true
-        }
-      }
-    }
-
-    // traverse the list and purge completed elements
-    def purgeSatisfied(): Int = {
-      synchronized {
-        val iter = requests.iterator()
-        var purged = 0
-        while (iter.hasNext) {
-          val curr = iter.next
-          if (curr.completed.get()) {
-            iter.remove()
-            purged += 1
-          }
-        }
-        purged
-      }
-    }
-
-    // traverse the list and try to complete watched elements
-    def collectCompletedOperations(): Seq[T] = {
-      val response = new mutable.ArrayBuffer[T]
-      synchronized {
-        val iter = requests.iterator()
-        while (iter.hasNext) {
-          val curr = iter.next
-          if (curr.completed.get()) {
-            // another thread has completed this operation, just remove it
-            iter.remove()
-          } else {
-            val completed = curr.tryComplete()
-            if (completed) {
-              iter.remove()
-              watched.getAndDecrement()
-              response += curr
-            }
-          }
-        }
-      }
-      response
-    }
-  }
-
-  /**
-   * A background reaper to expire delayed operations that have timed out
-   */
-  private class ExpiredOperationReaper extends ShutdownableThread(
-    "ExpirationReaper-%d".format(brokerId),
-    false) {
-
-    /* The queue storing all delayed operations */
-    private val delayed = new DelayQueue[T]
-
-    /*
-     * Return the number of delayed operations kept by the reaper
-     */
-    def numOperations = delayed.size()
-
-    /*
-     * Add an operation to be expired
-     */
-    def enqueue(t: T) {
-      delayed.add(t)
-    }
-
-    /**
-     * Get the next expired event
-     */
-    private def pollExpired(): T = {
-      while (true) {
-        val curr = delayed.poll(200L, TimeUnit.MILLISECONDS)
-        if (curr == null)
-          return null.asInstanceOf[T]
-        // try to mark the operation as failed (and hence completed); if that succeeds, return it;
-        // otherwise move on to the next expired operation, since this one has been completed by others
-        if (curr.completed.compareAndSet(false, true)) {
-          return curr
-        }
-      }
-      throw new RuntimeException("This should not happen")
-    }
-
-    /**
-     * Delete all completed events from the delay queue and the watch lists
-     */
-    private def purgeSatisfied(): Int = {
-      var purged = 0
-
-      // purge the delay queue
-      val iter = delayed.iterator()
-      while (iter.hasNext) {
-        val curr = iter.next()
-        if (curr.completed.get()) {
-          iter.remove()
-          purged += 1
-        }
-      }
-
-      purged
-    }
-
-    override def doWork() {
-      val curr = pollExpired()
-      if (curr != null) {
-        curr.onExpired()
-      }
-      if (size >= purgeInterval) { // see if we need to force a full purge
-        debug("Beginning purgatory purge")
-        val purged = purgeSatisfied()
-        debug("Purged %d operations from delay queue.".format(purged))
-        val numPurgedFromWatchers = watchersForKey.values.map(_.purgeSatisfied()).sum
-        debug("Purged %d operations from watch lists.".format(numPurgedFromWatchers))
-      }
-    }
-  }
-
-}
diff --git a/core/src/main/scala/kafka/server/FetchDelayedOperationPurgatory.scala b/core/src/main/scala/kafka/server/FetchDelayedOperationPurgatory.scala
deleted file mode 100644
index 71c5920..0000000
--- a/core/src/main/scala/kafka/server/FetchDelayedOperationPurgatory.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import kafka.metrics.KafkaMetricsGroup
-import kafka.network.RequestChannel
-import kafka.api.FetchResponseSend
-
-import java.util.concurrent.TimeUnit
-
-/**
- * The purgatory holding delayed fetch requests
- */
-class FetchDelayedOperationPurgatory(replicaManager: ReplicaManager, requestChannel: RequestChannel)
-  extends DelayedOperationPurgatory[DelayedFetch](replicaManager.config.brokerId, replicaManager.config.fetchPurgatoryPurgeIntervalRequests) {
-  this.logIdent = "[FetchRequestPurgatory-%d] ".format(replicaManager.config.brokerId)
-
-  private class DelayedFetchRequestMetrics(forFollower: Boolean) extends KafkaMetricsGroup {
-    private val metricPrefix = if (forFollower) "Follower" else "Consumer"
-
-    val expiredRequestMeter = newMeter(metricPrefix + "ExpiresPerSecond", "requests", TimeUnit.SECONDS)
-  }
-
-  private val aggregateFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = true)
-  private val aggregateNonFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = false)
-
-  private def recordDelayedFetchExpired(forFollower: Boolean) {
-    val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics
-      else aggregateNonFollowerFetchRequestMetrics
-
-    metrics.expiredRequestMeter.mark()
-  }
-
-  /**
-   * Check if a specified delayed fetch request is satisfied
-   */
-  def checkSatisfied(delayedFetch: DelayedFetch): Boolean = delayedFetch.isSatisfied(replicaManager)
-
-  /**
-   * When a delayed fetch request expires, just answer it with whatever data is present
-   */
-  def expire(delayedFetch: DelayedFetch) {
-    debug("Expiring fetch request %s.".format(delayedFetch.fetch))
-    val fromFollower = delayedFetch.fetch.isFromFollower
-    recordDelayedFetchExpired(fromFollower)
-    respond(delayedFetch)
-  }
-
-  // TODO: the purgatory should not be responsible for sending back the responses
-  def respond(delayedFetch: DelayedFetch) {
-    val response = delayedFetch.respond(replicaManager)
-    requestChannel.sendResponse(new RequestChannel.Response(delayedFetch.request, new FetchResponseSend(response)))
-  }
-}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala b/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala
new file mode 100644
index 0000000..ed13188
--- /dev/null
+++ b/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.metrics.KafkaMetricsGroup
+import kafka.network.RequestChannel
+import kafka.api.FetchResponseSend
+
+import java.util.concurrent.TimeUnit
+
+/**
+ * The purgatory holding delayed fetch requests
+ */
+class FetchRequestPurgatory(replicaManager: ReplicaManager, requestChannel: RequestChannel)
+  extends RequestPurgatory[DelayedFetch](replicaManager.config.brokerId, replicaManager.config.fetchPurgatoryPurgeIntervalRequests) {
+  this.logIdent = "[FetchRequestPurgatory-%d] ".format(replicaManager.config.brokerId)
+
+  private class DelayedFetchRequestMetrics(forFollower: Boolean) extends KafkaMetricsGroup {
+    private val metricPrefix = if (forFollower) "Follower" else "Consumer"
+
+    val expiredRequestMeter = newMeter(metricPrefix + "ExpiresPerSecond", "requests", TimeUnit.SECONDS)
+  }
+
+  private val aggregateFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = true)
+  private val aggregateNonFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = false)
+
+  private def recordDelayedFetchExpired(forFollower: Boolean) {
+    val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics
+                  else aggregateNonFollowerFetchRequestMetrics
+
+    metrics.expiredRequestMeter.mark()
+  }
+
+  /**
+   * Check if a specified delayed fetch request is satisfied
+   */
+  def checkSatisfied(delayedFetch: DelayedFetch): Boolean = delayedFetch.isSatisfied(replicaManager)
+
+  /**
+   * When a delayed fetch request expires, just answer it with whatever data is present
+   */
+  def expire(delayedFetch: DelayedFetch) {
+    debug("Expiring fetch request %s.".format(delayedFetch.fetch))
+    val fromFollower = delayedFetch.fetch.isFromFollower
+    recordDelayedFetchExpired(fromFollower)
+    respond(delayedFetch)
+  }
+
+  // TODO: purgatory should not be responsible for sending back the responses
+  def respond(delayedFetch: DelayedFetch) {
+    val response = delayedFetch.respond(replicaManager)
+    requestChannel.sendResponse(new RequestChannel.Response(delayedFetch.request, new FetchResponseSend(response)))
+  }
+}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 089ebc3..bb94673 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -42,8 +42,8 @@ class KafkaApis(val requestChannel: RequestChannel,
                 val config: KafkaConfig,
                 val controller: KafkaController) extends Logging {
 
-  val producerRequestPurgatory = new ProducerDelayedOperationPurgatory(replicaManager, offsetManager, requestChannel)
-  val fetchRequestPurgatory = new FetchDelayedOperationPurgatory(replicaManager, requestChannel)
+  val producerRequestPurgatory = new ProducerRequestPurgatory(replicaManager, offsetManager, requestChannel)
+  val fetchRequestPurgatory = new FetchRequestPurgatory(replicaManager, requestChannel)
   // TODO: the following line will be removed in 0.9
   replicaManager.initWithRequestPurgatory(producerRequestPurgatory, fetchRequestPurgatory)
   var metadataCache = new MetadataCache
diff --git a/core/src/main/scala/kafka/server/ProducerDelayedOperationPurgatory.scala b/core/src/main/scala/kafka/server/ProducerDelayedOperationPurgatory.scala
deleted file mode 100644
index 4b950e1..0000000
--- a/core/src/main/scala/kafka/server/ProducerDelayedOperationPurgatory.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.server - -import kafka.metrics.KafkaMetricsGroup -import kafka.utils.Pool -import kafka.network.{BoundedByteBufferSend, RequestChannel} - -import java.util.concurrent.TimeUnit - -/** - * The purgatory holding delayed producer requests - */ -class ProducerDelayedOperationPurgatory(replicaManager: ReplicaManager, offsetManager: OffsetManager, requestChannel: RequestChannel) - extends DelayedOperationPurgatory[DelayedProduce](replicaManager.config.brokerId, replicaManager.config.producerPurgatoryPurgeIntervalRequests) { - this.logIdent = "[ProducerRequestPurgatory-%d] ".format(replicaManager.config.brokerId) - - private class DelayedProducerRequestMetrics(keyLabel: String = DelayedRequestKey.globalLabel) extends KafkaMetricsGroup { - val expiredRequestMeter = newMeter(keyLabel + "ExpiresPerSecond", "requests", TimeUnit.SECONDS) - } - - private val producerRequestMetricsForKey = { - val valueFactory = (k: DelayedRequestKey) => new DelayedProducerRequestMetrics(k.keyLabel + "-") - new Pool[DelayedRequestKey, DelayedProducerRequestMetrics](Some(valueFactory)) - } - - private val aggregateProduceRequestMetrics = new DelayedProducerRequestMetrics - - private def recordDelayedProducerKeyExpired(key: DelayedRequestKey) { - val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(key) - List(keyMetrics, aggregateProduceRequestMetrics).foreach(_.expiredRequestMeter.mark()) - } - - /** - * Check if a specified delayed fetch request is satisfied - */ - def checkSatisfied(delayedProduce: DelayedProduce) = delayedProduce.isSatisfied(replicaManager) - - /** - * When a delayed produce request expires answer it with possible time out error codes - */ - def expire(delayedProduce: DelayedProduce) { - debug("Expiring produce request %s.".format(delayedProduce.produce)) - for ((topicPartition, responseStatus) <- delayedProduce.partitionStatus if responseStatus.acksPending) - recordDelayedProducerKeyExpired(new TopicPartitionRequestKey(topicPartition)) - respond(delayedProduce) - } - - // TODO: purgatory should not be responsible for sending back the responses - def respond(delayedProduce: DelayedProduce) { - val response = delayedProduce.respond(offsetManager) - requestChannel.sendResponse(new RequestChannel.Response(delayedProduce.request, new BoundedByteBufferSend(response))) - } -} diff --git a/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala b/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala new file mode 100644 index 0000000..d4a7d4a --- /dev/null +++ b/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.Pool
+import kafka.network.{BoundedByteBufferSend, RequestChannel}
+
+import java.util.concurrent.TimeUnit
+
+/**
+ * The purgatory holding delayed producer requests
+ */
+class ProducerRequestPurgatory(replicaManager: ReplicaManager, offsetManager: OffsetManager, requestChannel: RequestChannel)
+  extends RequestPurgatory[DelayedProduce](replicaManager.config.brokerId, replicaManager.config.producerPurgatoryPurgeIntervalRequests) {
+  this.logIdent = "[ProducerRequestPurgatory-%d] ".format(replicaManager.config.brokerId)
+
+  private class DelayedProducerRequestMetrics(keyLabel: String = DelayedRequestKey.globalLabel) extends KafkaMetricsGroup {
+    val expiredRequestMeter = newMeter(keyLabel + "ExpiresPerSecond", "requests", TimeUnit.SECONDS)
+  }
+
+  private val producerRequestMetricsForKey = {
+    val valueFactory = (k: DelayedRequestKey) => new DelayedProducerRequestMetrics(k.keyLabel + "-")
+    new Pool[DelayedRequestKey, DelayedProducerRequestMetrics](Some(valueFactory))
+  }
+
+  private val aggregateProduceRequestMetrics = new DelayedProducerRequestMetrics
+
+  private def recordDelayedProducerKeyExpired(key: DelayedRequestKey) {
+    val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(key)
+    List(keyMetrics, aggregateProduceRequestMetrics).foreach(_.expiredRequestMeter.mark())
+  }
+
+  /**
+   * Check if a specified delayed produce request is satisfied
+   */
+  def checkSatisfied(delayedProduce: DelayedProduce) = delayedProduce.isSatisfied(replicaManager)
+
+  /**
+   * When a delayed produce request expires, answer it with possible timeout error codes
+   */
+  def expire(delayedProduce: DelayedProduce) {
+    debug("Expiring produce request %s.".format(delayedProduce.produce))
+    for ((topicPartition, responseStatus) <- delayedProduce.partitionStatus if responseStatus.acksPending)
+      recordDelayedProducerKeyExpired(new TopicPartitionRequestKey(topicPartition))
+    respond(delayedProduce)
+  }
+
+  // TODO: purgatory should not be responsible for sending back the responses
+  def respond(delayedProduce: DelayedProduce) {
+    val response = delayedProduce.respond(offsetManager)
+    requestChannel.sendResponse(new RequestChannel.Response(delayedProduce.request, new BoundedByteBufferSend(response)))
+  }
+}
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index d6a8356..06e7108 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -64,8 +64,8 @@ class ReplicaManager(config: KafkaConfig,
   this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: "
   val stateChangeLogger = KafkaController.stateChangeLogger
 
-  var producerRequestPurgatory: ProducerDelayedOperationPurgatory = null
-  var fetchRequestPurgatory: FetchDelayedOperationPurgatory = null
+  var producerRequestPurgatory: 
ProducerRequestPurgatory = null + var fetchRequestPurgatory: FetchRequestPurgatory = null newGauge( "LeaderCount", @@ -105,7 +105,7 @@ class ReplicaManager(config: KafkaConfig, * TODO: will be removed in 0.9 where we refactor server structure */ - def initWithRequestPurgatory(producerRequestPurgatory: ProducerDelayedOperationPurgatory, fetchRequestPurgatory: FetchDelayedOperationPurgatory) { + def initWithRequestPurgatory(producerRequestPurgatory: ProducerRequestPurgatory, fetchRequestPurgatory: FetchRequestPurgatory) { this.producerRequestPurgatory = producerRequestPurgatory this.fetchRequestPurgatory = fetchRequestPurgatory } diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala new file mode 100644 index 0000000..dc4ce54 --- /dev/null +++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala @@ -0,0 +1,286 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.utils._ +import kafka.metrics.KafkaMetricsGroup + +import java.util +import java.util.concurrent._ +import java.util.concurrent.atomic._ +import scala.collection._ + +import com.yammer.metrics.core.Gauge + + +/** + * An operation whose processing needs to be delayed for at most the given delayMs; + * upon complete, the given callback function will be triggered. For example a delayed + * message append operation could be waiting for specified number of acks; or a delayed + * message fetch operation could be waiting for a given number of bytes to accumulate. + */ +abstract class DelayedRequest(delayMs: Long, onComplete: Boolean => Unit) extends DelayedItem(delayMs) { + val completed = new AtomicBoolean(false) + + /* + * Check if the delayed operation is already completed + * + * Note that concurrent threads can check if an operation can be completed or not, + * but only the first thread will succeed in completing the operation + */ + def tryComplete(): Boolean = completed.compareAndSet(false, true) + + /* + * When delayMs has elapsed, expire the delayed operation + */ + def onExpired() = onComplete(false) +} + +/** + * A helper purgatory class for bookkeeping delayed operations with a timeout, and expiring timed out operations. 
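+ *
+ * A usage sketch (illustrative names only; the concrete purgatory subclass and
+ * operation type vary per request type):
+ *
+ *   val keys = partitions.map(new TopicPartitionRequestKey(_))
+ *   if (purgatory.tryCompleteElseWatch(operation, keys))
+ *     respond(operation)   // completed right away, nothing is watched
+ *   // otherwise the operation stays watched until it is completed or expired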
+ *
+ */
+abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInterval: Int = 1000)
+  extends Logging with KafkaMetricsGroup {
+
+  /* a list of requests watching each key */
+  private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers))
+
+  /* the number of requests being watched, duplicates added on different watchers are also counted */
+  private val watched = new AtomicInteger(0)
+
+  /* background thread expiring requests that have been waiting too long */
+  private val expirationReaper = new ExpiredOperationReaper
+
+  newGauge(
+    "PurgatorySize",
+    new Gauge[Int] {
+      def value = size()
+    }
+  )
+
+  newGauge(
+    "NumDelayedOperations",
+    new Gauge[Int] {
+      def value = expirationReaper.numOperations
+    }
+  )
+
+  expirationReaper.start()
+
+  /**
+   * Check if the operation can be completed, if not watch it based on the given watch keys
+   *
+   * Note that a delayed operation can be watched on multiple keys, and hence due to concurrency may be
+   * found completed when trying to watch it on some later keys. In this case the operation is still
+   * treated as completed and hence no longer watched although it is still in the watch lists of
+   * the earlier keys. Those already watched elements will be later purged by the expire reaper.
+   *
+   * @param operation the delayed operation to be checked
+   * @param watchKeys keys for bookkeeping the operation
+   * @return true iff the delayed operation can be completed
+   */
+  def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
+    for(key <- watchKeys) {
+      val watchers = watchersFor(key)
+      // if the operation is found completed, stop adding it to any further
+      // lists and return true immediately
+      if(!watchers.checkAndMaybeAdd(operation)) {
+        return true
+      }
+    }
+
+    // if it is indeed watched, add to the expire queue also
+    watched.getAndIncrement()
+    expirationReaper.enqueue(operation)
+
+    false
+  }
+
+  /**
+   * Return a list of completed operations with the given watch key.
+   *
+   * @return the list of completed operations
+   */
+  def getCompleted(key: Any): Seq[T] = {
+    val watchers = watchersForKey.get(key)
+    if(watchers == null)
+      Seq.empty
+    else
+      watchers.collectCompletedOperations()
+  }
+
+  /*
+   * Return the watch list of the given key
+   */
+  private def watchersFor(key: Any) = watchersForKey.getAndMaybePut(key)
+
+  /*
+   * Return the size of the purgatory, which is size of watch lists plus the size of the expire reaper. 
+   * Since an operation may still be in the watch lists even when it has been completed, this number
+   * may be larger than the number of real operations watched
+   */
+  protected def size() = watchersForKey.values.map(_.numRequests).sum + expirationReaper.numOperations
+
+  /**
+   * Shutdown the expire reaper thread
+   */
+  def shutdown() {
+    expirationReaper.shutdown()
+  }
+
+  /**
+   * A linked list of watched delayed operations based on some key
+   */
+  private class Watchers {
+    private val requests = new util.ArrayList[T]
+
+    // return the number of operations currently in this watch list
+    def numRequests = synchronized { requests.size }
+
+    // potentially add the element to watch if it is not satisfied yet
+    def checkAndMaybeAdd(t: T): Boolean = {
+      synchronized {
+        // if it is already satisfied, return false
+        if (t.completed.get())
+          return false
+        // if the operation can be completed, return false; otherwise add to watch list
+        if(t.tryComplete()) {
+          return false
+        } else {
+          requests.add(t)
+          return true
+        }
+      }
+    }
+
+    // traverse the list and purge satisfied elements
+    def purgeSatisfied(): Int = {
+      synchronized {
+        val iter = requests.iterator()
+        var purged = 0
+        while (iter.hasNext) {
+          val curr = iter.next
+          if(curr.completed.get()) {
+            iter.remove()
+            purged += 1
+          }
+        }
+        purged
+      }
+    }
+
+    // traverse the list and try to satisfy watched elements
+    def collectCompletedOperations(): Seq[T] = {
+      val response = new mutable.ArrayBuffer[T]
+      synchronized {
+        val iter = requests.iterator()
+        while(iter.hasNext) {
+          val curr = iter.next
+          if (curr.completed.get()) {
+            // another thread has completed this request, just remove it
+            iter.remove()
+          } else {
+            val completed = curr.tryComplete()
+            if(completed) {
+              iter.remove()
+              watched.getAndDecrement()
+              response += curr
+            }
+          }
+        }
+      }
+      response
+    }
+  }
+
+  /**
+   * A background reaper to expire delayed operations that have timed out
+   */
+  private class ExpiredOperationReaper extends ShutdownableThread(
+    "ExpirationReaper-%d".format(brokerId),
+    false) {
+
+    /* The queue storing all delayed operations */
+    private val delayed = new DelayQueue[T]
+
+    /*
+     * Return the number of delayed operations kept by the reaper
+     */
+    def numOperations = delayed.size()
+
+    /*
+     * Add an operation to be expired
+     */
+    def enqueue(t: T) {
+      delayed.add(t)
+    }
+
+    /**
+     * Get the next expired event
+     */
+    private def pollExpired(): T = {
+      while (true) {
+        val curr = delayed.poll(200L, TimeUnit.MILLISECONDS)
+        if (curr == null)
+          return null.asInstanceOf[T]
+        // try to mark the operation expired (and hence completed); if that succeeds return it,
+        // otherwise try to get the next expired operation since this one has been completed by others
+        if (curr.completed.compareAndSet(false, true)) {
+          return curr
+        }
+      }
+      throw new RuntimeException("This should not happen")
+    }
+
+    /**
+     * Delete all satisfied events from the delay queue and the watcher lists
+     */
+    private def purgeSatisfied(): Int = {
+      var purged = 0
+
+      // purge the delayed queue
+      val iter = delayed.iterator()
+      while (iter.hasNext) {
+        val curr = iter.next()
+        if (curr.completed.get()) {
+          iter.remove()
+          purged += 1
+        }
+      }
+
+      purged
+    }
+
+
+    override def doWork() {
+      val curr = pollExpired()
+      if (curr != null) {
+        curr.onExpired()
+      }
+      if (size() >= purgeInterval) { // see if we need to force a full purge
+        debug("Beginning purgatory purge")
+        val purged = purgeSatisfied()
+        debug("Purged %d 
operations from watch lists.".format(numPurgedFromWatchers)) + } + } + } + +} diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationPurgatoryTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationPurgatoryTest.scala deleted file mode 100644 index c9a5f2e..0000000 --- a/core/src/test/scala/unit/kafka/server/DelayedOperationPurgatoryTest.scala +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.server - -import scala.collection._ -import org.junit.Test -import junit.framework.Assert._ -import kafka.message._ -import kafka.api._ -import kafka.utils.TestUtils -import org.scalatest.junit.JUnit3Suite - - -class DelayedOperationPurgatoryTest extends JUnit3Suite { - - val producerRequest1 = TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(new Message("hello1".getBytes))) - val producerRequest2 = TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(new Message("hello2".getBytes))) - var purgatory: MockDelayedOperationPurgatory = null - - override def setUp() { - super.setUp() - purgatory = new MockDelayedOperationPurgatory() - } - - override def tearDown() { - purgatory.shutdown() - super.tearDown() - } - - @Test - def testRequestSatisfaction() { - val r1 = new DelayedRequest(Array("test1"), null, 100000L) - val r2 = new DelayedRequest(Array("test2"), null, 100000L) - assertEquals("With no waiting requests, nothing should be satisfied", 0, purgatory.update("test1").size) - assertFalse("r1 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r1)) - assertEquals("Still nothing satisfied", 0, purgatory.update("test1").size) - assertFalse("r2 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r2)) - assertEquals("Still nothing satisfied", 0, purgatory.update("test2").size) - purgatory.satisfied += r1 - assertEquals("r1 satisfied", mutable.ArrayBuffer(r1), purgatory.update("test1")) - assertEquals("Nothing satisfied", 0, purgatory.update("test1").size) - purgatory.satisfied += r2 - assertEquals("r2 satisfied", mutable.ArrayBuffer(r2), purgatory.update("test2")) - assertEquals("Nothing satisfied", 0, purgatory.update("test2").size) - } - - @Test - def testRequestExpiry() { - val expiration = 20L - val r1 = new DelayedRequest(Array("test1"), null, expiration) - val r2 = new DelayedRequest(Array("test1"), null, 200000L) - val start = System.currentTimeMillis - assertFalse("r1 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r1)) - assertFalse("r2 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r2)) - purgatory.awaitExpiration(r1) - val elapsed = System.currentTimeMillis - start - assertTrue("r1 expired", purgatory.expired.contains(r1)) - assertTrue("r2 hasn't expired", !purgatory.expired.contains(r2)) - 
assertTrue("Time for expiration %d should at least %d".format(elapsed, expiration), elapsed >= expiration) - } - - class MockDelayedOperationPurgatory extends DelayedOperationPurgatory[DelayedRequest] { - val satisfied = mutable.Set[DelayedRequest]() - val expired = mutable.Set[DelayedRequest]() - def awaitExpiration(delayed: DelayedRequest) = { - delayed synchronized { - delayed.wait() - } - } - def checkSatisfied(delayed: DelayedRequest): Boolean = satisfied.contains(delayed) - def expire(delayed: DelayedRequest) { - expired += delayed - delayed synchronized { - delayed.notify() - } - } - } - -} \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala b/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala new file mode 100644 index 0000000..a4b7f5b --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import scala.collection._ +import org.junit.Test +import junit.framework.Assert._ +import kafka.message._ +import kafka.api._ +import kafka.utils.TestUtils +import org.scalatest.junit.JUnit3Suite + + +class RequestPurgatoryTest extends JUnit3Suite { + + val producerRequest1 = TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(new Message("hello1".getBytes))) + val producerRequest2 = TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(new Message("hello2".getBytes))) + var purgatory: MockDelayedOperationPurgatory = null + + override def setUp() { + super.setUp() + purgatory = new MockDelayedOperationPurgatory() + } + + override def tearDown() { + purgatory.shutdown() + super.tearDown() + } + + @Test + def testRequestSatisfaction() { + val r1 = new DelayedRequest(Array("test1"), null, 100000L) + val r2 = new DelayedRequest(Array("test2"), null, 100000L) + assertEquals("With no waiting requests, nothing should be satisfied", 0, purgatory.update("test1").size) + assertFalse("r1 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r1)) + assertEquals("Still nothing satisfied", 0, purgatory.update("test1").size) + assertFalse("r2 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r2)) + assertEquals("Still nothing satisfied", 0, purgatory.update("test2").size) + purgatory.satisfied += r1 + assertEquals("r1 satisfied", mutable.ArrayBuffer(r1), purgatory.update("test1")) + assertEquals("Nothing satisfied", 0, purgatory.update("test1").size) + purgatory.satisfied += r2 + assertEquals("r2 satisfied", mutable.ArrayBuffer(r2), purgatory.update("test2")) + assertEquals("Nothing satisfied", 0, purgatory.update("test2").size) + } + + @Test + def testRequestExpiry() { + val expiration = 20L + val r1 = new DelayedRequest(Array("test1"), 
null, expiration)
+    val r2 = new DelayedRequest(Array("test1"), null, 200000L)
+    val start = System.currentTimeMillis
+    assertFalse("r1 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r1))
+    assertFalse("r2 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r2))
+    purgatory.awaitExpiration(r1)
+    val elapsed = System.currentTimeMillis - start
+    assertTrue("r1 expired", purgatory.expired.contains(r1))
+    assertTrue("r2 hasn't expired", !purgatory.expired.contains(r2))
+    assertTrue("Time for expiration %d should be at least %d".format(elapsed, expiration), elapsed >= expiration)
+  }
+
+  class MockRequestPurgatory extends RequestPurgatory[DelayedRequest] {
+    val satisfied = mutable.Set[DelayedRequest]()
+    val expired = mutable.Set[DelayedRequest]()
+    def awaitExpiration(delayed: DelayedRequest) = {
+      delayed synchronized {
+        delayed.wait()
+      }
+    }
+    def checkSatisfied(delayed: DelayedRequest): Boolean = satisfied.contains(delayed)
+    def expire(delayed: DelayedRequest) {
+      expired += delayed
+      delayed synchronized {
+        delayed.notify()
+      }
+    }
+  }
+
+}
\ No newline at end of file
--
1.7.12.4


From de78bc4150bd6d04958d32af384abe9df7b00f82 Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Wed, 13 Aug 2014 09:13:06 -0700
Subject: [PATCH 3/5] dummy

---
 .../src/main/scala/kafka/server/DelayedFetch.scala | 39 ++++++++++++----------
 core/src/main/scala/kafka/server/KafkaApis.scala   | 16 +++++++--
 2 files changed, 36 insertions(+), 19 deletions(-)

diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index e0f14e2..d367786 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -37,34 +37,39 @@ import scala.collection.Seq
  *  - should return whatever data is available.
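+ *
+ * For example, a consumer fetch with fetchMinBytes = 1 completes under Case D
+ * as soon as a log append moves the relevant end offset past the fetch start
+ * offset, since the accumulated byte count then meets the minimum.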
*/ -class DelayedFetch(override val keys: Seq[TopicPartitionRequestKey], - override val request: RequestChannel.Request, - override val delayMs: Long, - val fetch: FetchRequest, - private val partitionFetchOffsets: Map[TopicAndPartition, LogOffsetMetadata]) - extends DelayedRequest(keys, request, delayMs) { +case class FetchInfo(fetchMinBytes: Int, + fetchOnlyCommitted: Boolean, + fetchStartOffsets: Map[TopicAndPartition, LogOffsetMetadata]) { - def isSatisfied(replicaManager: ReplicaManager) : Boolean = { + override def toString = "FetchInfo [minBytes: " + fetchMinBytes + "] : " + + "[committedOnly: " + fetchOnlyCommitted + "] : " + "[startOffsets: " + fetchStartOffsets + "]" +} + + +class DelayedFetch(delayMs: Long, onComplete: Boolean => Unit, fetchInfo: FetchInfo, replicaManager: ReplicaManager) + extends DelayedRequest(delayMs, onComplete) { + + override def tryComplete() : Boolean = { var accumulatedSize = 0 - val fromFollower = fetch.isFromFollower - partitionFetchOffsets.foreach { + fetchInfo.fetchStartOffsets.foreach { case (topicAndPartition, fetchOffset) => try { if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) { val replica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition) val endOffset = - if (fromFollower) - replica.logEndOffset - else + if (fetchInfo.fetchOnlyCommitted) replica.highWatermark + else + replica.logEndOffset if (endOffset.offsetOnOlderSegment(fetchOffset)) { // Case C, this can happen when the new follower replica fetching on a truncated leader - debug("Satisfying fetch request %s since it is fetching later segments of partition %s.".format(fetch, topicAndPartition)) + debug("Satisfying %s since it is fetching later segments of partition %s.".format(fetchInfo, topicAndPartition)) return true } else if (fetchOffset.offsetOnOlderSegment(endOffset)) { // Case C, this can happen when the folloer replica is lagging too much - debug("Satisfying fetch request %s immediately since it is fetching older segments.".format(fetch)) + debug("Satisfying %s immediately since it is fetching older segments.".format(fetchInfo)) return true } else if (fetchOffset.precedes(endOffset)) { accumulatedSize += endOffset.positionDiff(fetchOffset) @@ -72,16 +77,16 @@ class DelayedFetch(override val keys: Seq[TopicPartitionRequestKey], } } catch { case utpe: UnknownTopicOrPartitionException => // Case A - debug("Broker no longer know of %s, satisfy %s immediately".format(topicAndPartition, fetch)) + debug("Broker no longer know of %s, satisfy %s immediately".format(topicAndPartition, fetchInfo)) return true case nle: NotLeaderForPartitionException => // Case B - debug("Broker is no longer the leader of %s, satisfy %s immediately".format(topicAndPartition, fetch)) + debug("Broker is no longer the leader of %s, satisfy %s immediately".format(topicAndPartition, fetchInfo)) return true } } // Case D - accumulatedSize >= fetch.minBytes + accumulatedSize >= fetchInfo.fetchMinBytes } def respond(replicaManager: ReplicaManager): FetchResponse = { diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index bb94673..817b01c 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -319,8 +319,20 @@ class KafkaApis(val requestChannel: RequestChannel, fetchRequest.clientId)) // create a list of (topic, partition) pairs to use as keys for this delayed request val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq.map(new 
TopicPartitionRequestKey(_))
-    val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest.maxWait, fetchRequest,
-      dataRead.mapValues(_.offset))
+
+    //val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest.maxWait, fetchRequest,
+    //  dataRead.mapValues(_.offset))
+
+    def callback(succeeded: Boolean) {
+      if (succeeded)
+        requestChannel.sendResponse(new RequestChannel.Response(request,
+          new FetchResponseSend(FetchResponse(fetchRequest.correlationId,
+            replicaManager.readMessageSets(fetchRequest).mapValues(_.data)))))
+    }
+
+    val delayedFetch = new DelayedFetch(
+      fetchRequest.maxWait,
+      callback,
+      FetchInfo(fetchRequest.minBytes, !fetchRequest.isFromFollower, dataRead.mapValues(_.offset)),
+      replicaManager)
+
     // add the fetch request for watch if it's not satisfied, otherwise send the response back
     val satisfiedByMe = fetchRequestPurgatory.checkAndMaybeWatch(delayedFetch)
--
1.7.12.4


From 5768f08695994acbbc0b2a7a071632abb9eeed06 Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Wed, 13 Aug 2014 15:48:13 -0700
Subject: [PATCH 4/5] wip version 1

---
 .../main/scala/kafka/api/ProducerResponse.scala    |   3 +-
 core/src/main/scala/kafka/log/Log.scala            |  26 +-
 .../src/main/scala/kafka/server/DelayedFetch.scala |  25 +-
 .../main/scala/kafka/server/DelayedProduce.scala   | 108 ++++----
 core/src/main/scala/kafka/server/KafkaApis.scala   | 222 +++-------------
 .../main/scala/kafka/server/ReplicaManager.scala   | 291 ++++++++++++++++-----
 .../main/scala/kafka/server/RequestPurgatory.scala |  24 +-
 7 files changed, 375 insertions(+), 324 deletions(-)

diff --git a/core/src/main/scala/kafka/api/ProducerResponse.scala b/core/src/main/scala/kafka/api/ProducerResponse.scala
index a286272..5d1fac4 100644
--- a/core/src/main/scala/kafka/api/ProducerResponse.scala
+++ b/core/src/main/scala/kafka/api/ProducerResponse.scala
@@ -43,8 +43,7 @@ object ProducerResponse {
 
 case class ProducerResponseStatus(var error: Short, offset: Long)
 
-case class ProducerResponse(correlationId: Int,
-                            status: Map[TopicAndPartition, ProducerResponseStatus])
+case class ProducerResponse(correlationId: Int, status: Map[TopicAndPartition, ProducerResponseStatus])
     extends RequestOrResponse() {
 
   /**
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 0ddf97b..002c902 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -31,6 +31,21 @@ import scala.collection.JavaConversions
 
 import com.yammer.metrics.core.Gauge
 
+/**
+ * Struct to hold various quantities we compute about each message set before appending to the log
+ * @param firstOffset The first offset in the message set
+ * @param lastOffset The last offset in the message set
+ * @param shallowCount The number of shallow messages
+ * @param validBytes The number of valid bytes
+ * @param codec The codec used in the message set
+ * @param offsetsMonotonic Are the offsets in this message set monotonically increasing
+ */
+case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec, shallowCount: Int, validBytes: Int, offsetsMonotonic: Boolean)
+
+object LogAppendInfo {
+  val UnknownLogAppendInfo = LogAppendInfo(-1, -1, NoCompressionCodec, -1, -1, false)
+}
+
 /**
  * An append-only log for storing messages.
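+ * Each append computes a LogAppendInfo (above); for instance, appending a
+ * three-message uncompressed set assigned offsets 5..7 would yield
+ * LogAppendInfo(firstOffset = 5, lastOffset = 7, NoCompressionCodec,
+ * shallowCount = 3, validBytes = <bytes in the set>, offsetsMonotonic = true).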
@@ -302,17 +317,6 @@ class Log(val dir: File, } /** - * Struct to hold various quantities we compute about each message set before appending to the log - * @param firstOffset The first offset in the message set - * @param lastOffset The last offset in the message set - * @param shallowCount The number of shallow messages - * @param validBytes The number of valid bytes - * @param codec The codec used in the message set - * @param offsetsMonotonic Are the offsets in this message set monotonically increasing - */ - case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec, shallowCount: Int, validBytes: Int, offsetsMonotonic: Boolean) - - /** * Validate the following: *
    *
  1. each message matches its CRC diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index d367786..4b7542f 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -38,6 +38,7 @@ import scala.collection.Seq */ case class FetchInfo(fetchMinBytes: Int, + fetchOnlyLeader: Boolean, fetchOnlyCommitted: Boolean, fetchStartOffsets: Map[TopicAndPartition, LogOffsetMetadata]) { @@ -47,7 +48,10 @@ case class FetchInfo(fetchMinBytes: Int, } -class DelayedFetch(delayMs: Long, onComplete: Boolean => Unit, fetchInfo: FetchInfo, replicaManager: ReplicaManager) +class DelayedFetch(delayMs: Long, + fetchInfo: FetchInfo, + replicaManager: ReplicaManager, + onComplete: Map[TopicAndPartition, PartitionDataAndOffset] => Unit) extends DelayedRequest(delayMs, onComplete) { override def tryComplete() : Boolean = { @@ -66,11 +70,11 @@ class DelayedFetch(delayMs: Long, onComplete: Boolean => Unit, fetchInfo: FetchI if (endOffset.offsetOnOlderSegment(fetchOffset)) { // Case C, this can happen when the new follower replica fetching on a truncated leader debug("Satisfying %s since it is fetching later segments of partition %s.".format(fetchInfo, topicAndPartition)) - return true + return super.tryComplete() } else if (fetchOffset.offsetOnOlderSegment(endOffset)) { // Case C, this can happen when the folloer replica is lagging too much debug("Satisfying %s immediately since it is fetching older segments.".format(fetchInfo)) - return true + return super.tryComplete() } else if (fetchOffset.precedes(endOffset)) { accumulatedSize += endOffset.positionDiff(fetchOffset) } @@ -78,15 +82,24 @@ class DelayedFetch(delayMs: Long, onComplete: Boolean => Unit, fetchInfo: FetchI } catch { case utpe: UnknownTopicOrPartitionException => // Case A debug("Broker no longer know of %s, satisfy %s immediately".format(topicAndPartition, fetchInfo)) - return true + return super.tryComplete() case nle: NotLeaderForPartitionException => // Case B debug("Broker is no longer the leader of %s, satisfy %s immediately".format(topicAndPartition, fetchInfo)) - return true + return super.tryComplete() } } // Case D - accumulatedSize >= fetchInfo.fetchMinBytes + if (accumulatedSize >= fetchInfo.fetchMinBytes) + super.tryComplete() + else + false + } + + override def onExpired() { + // read whatever data is available and return + val readData = replicaManager.readMessageSets(fetch) + onComplete(readData) } def respond(replicaManager: ReplicaManager): FetchResponse = { diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala index 9481508..99bdf6f 100644 --- a/core/src/main/scala/kafka/server/DelayedProduce.scala +++ b/core/src/main/scala/kafka/server/DelayedProduce.scala @@ -35,81 +35,93 @@ import scala.collection.Seq * B.2 - else, at least requiredAcks replicas should be caught up to this request. 
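+ *
+ * For example, a request with requiredAcks = 2 completes once at least two
+ * replicas (leader included) have caught up to the required offset for every
+ * partition, while requiredAcks = -1 waits on the full in-sync replica set.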
 */
-class DelayedProduce(override val keys: Seq[TopicPartitionRequestKey],
-                     override val request: RequestChannel.Request,
-                     override val delayMs: Long,
-                     val produce: ProducerRequest,
-                     val partitionStatus: Map[TopicAndPartition, DelayedProduceResponseStatus],
-                     val offsetCommitRequestOpt: Option[OffsetCommitRequest] = None)
-  extends DelayedRequest(keys, request, delayMs) with Logging {
+case class ProduceStatus(requiredOffset: Long, responseStatus: ProducerResponseStatus) {
+  @volatile var acksPending = false
 
-  // first update the acks pending variable according to the error code
-  partitionStatus foreach { case (topicAndPartition, delayedStatus) =>
-    if (delayedStatus.responseStatus.error == ErrorMapping.NoError) {
-      // Timeout error state will be cleared when required acks are received
-      delayedStatus.acksPending = true
-      delayedStatus.responseStatus.error = ErrorMapping.RequestTimedOutCode
-    } else {
-      delayedStatus.acksPending = false
-    }
+  override def toString = "acksPending:%b, error: %d, startOffset: %d, requiredOffset: %d"
+    .format(acksPending, responseStatus.error, responseStatus.offset, requiredOffset)
+}
 
-    trace("Initial partition status for %s is %s".format(topicAndPartition, delayedStatus))
-  }
+case class ProduceInfo(produceRequiredAcks: Short,
+                       produceStatus: Map[TopicAndPartition, ProduceStatus]) {
 
-  def respond(offsetManager: OffsetManager): RequestOrResponse = {
-    val responseStatus = partitionStatus.mapValues(status => status.responseStatus)
+  override def toString = "ProduceInfo [requiredAcks: " + produceRequiredAcks + "] : " +
+    "[partitionStatus: " + produceStatus + "]"
+}
 
-    val errorCode = responseStatus.find { case (_, status) =>
-      status.error != ErrorMapping.NoError
-    }.map(_._2.error).getOrElse(ErrorMapping.NoError)
+class DelayedProduce(delayMs: Long,
+                     produceInfo: ProduceInfo,
+                     replicaManager: ReplicaManager,
+                     onComplete: Map[TopicAndPartition, ProducerResponseStatus] => Unit)
+  extends DelayedRequest(delayMs) with Logging {
 
-    if (errorCode == ErrorMapping.NoError) {
-      offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo) )
+  // first update the acks pending variable according to the error code
+  produceInfo.produceStatus foreach { case (topicAndPartition, status) =>
+    if (status.responseStatus.error == ErrorMapping.NoError) {
+      // Timeout error state will be cleared when required acks are received
+      status.acksPending = true
+      status.responseStatus.error = ErrorMapping.RequestTimedOutCode
+    } else {
+      status.acksPending = false
     }
 
-    val response = offsetCommitRequestOpt.map(_.responseFor(errorCode, offsetManager.config.maxMetadataSize))
-                                         .getOrElse(ProducerResponse(produce.correlationId, responseStatus))
-
-    response
+    trace("Initial partition status for %s is %s".format(topicAndPartition, status))
   }
 
-  def isSatisfied(replicaManager: ReplicaManager) = {
+  override def tryComplete(): Boolean = {
     // check for each partition if it still has pending acks
-    partitionStatus.foreach { case (topicAndPartition, fetchPartitionStatus) =>
+    produceInfo.produceStatus.foreach { case (topicAndPartition, status) =>
       trace("Checking producer request satisfaction for %s, acksPending = %b"
-        .format(topicAndPartition, fetchPartitionStatus.acksPending))
+        .format(topicAndPartition, status.acksPending))
       // skip those partitions that have already been satisfied
-      if (fetchPartitionStatus.acksPending) {
+      if (status.acksPending) {
        val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
        val (hasEnough, 
errorCode) = partitionOpt match { case Some(partition) => partition.checkEnoughReplicasReachOffset( - fetchPartitionStatus.requiredOffset, - produce.requiredAcks) + status.requiredOffset, + produceInfo.produceRequiredAcks) case None => (false, ErrorMapping.UnknownTopicOrPartitionCode) } if (errorCode != ErrorMapping.NoError) { - fetchPartitionStatus.acksPending = false - fetchPartitionStatus.responseStatus.error = errorCode + status.acksPending = false + status.responseStatus.error = errorCode } else if (hasEnough) { - fetchPartitionStatus.acksPending = false - fetchPartitionStatus.responseStatus.error = ErrorMapping.NoError + status.acksPending = false + status.responseStatus.error = ErrorMapping.NoError } } } // unblocked if there are no partitions with pending acks - val satisfied = ! partitionStatus.exists(p => p._2.acksPending) - satisfied + if (! produceInfo.produceStatus.values.exists(p => p.acksPending)) + super.tryComplete() + else + false } -} -case class DelayedProduceResponseStatus(val requiredOffset: Long, - val responseStatus: ProducerResponseStatus) { - @volatile var acksPending = false + override def onExpired() { + // return the current response status + val responseStatus = produceInfo.produceStatus.mapValues(status => status.responseStatus) + onComplete(responseStatus) + } + + def respond(offsetManager: OffsetManager): RequestOrResponse = { + val responseStatus = partitionStatus.mapValues(status => status.responseStatus) + + val errorCode = responseStatus.find { case (_, status) => + status.error != ErrorMapping.NoError + }.map(_._2.error).getOrElse(ErrorMapping.NoError) - override def toString = - "acksPending:%b, error: %d, startOffset: %d, requiredOffset: %d".format( - acksPending, responseStatus.error, responseStatus.offset, requiredOffset) + if (errorCode == ErrorMapping.NoError) { + offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo) ) + } + + val response = offsetCommitRequestOpt.map(_.responseFor(errorCode, offsetManager.config.maxMetadataSize)) + .getOrElse(ProducerResponse(produce.correlationId, responseStatus)) + + response + } } + diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 817b01c..949f57e 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -151,207 +151,61 @@ class KafkaApis(val requestChannel: RequestChannel, * Handle a produce request or offset commit request (which is really a specialized producer request) */ def handleProducerOrOffsetCommitRequest(request: RequestChannel.Request) { - val (produceRequest, offsetCommitRequestOpt) = - if (request.requestId == RequestKeys.OffsetCommitKey) { - val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest] - (producerRequestFromOffsetCommit(offsetCommitRequest), Some(offsetCommitRequest)) - } else { - (request.requestObj.asInstanceOf[ProducerRequest], None) - } - - val sTime = SystemTime.milliseconds - val localProduceResults = appendToLocalLog(produceRequest) - debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime)) - - val firstErrorCode = localProduceResults.find(_.errorCode != ErrorMapping.NoError).map(_.errorCode).getOrElse(ErrorMapping.NoError) - - val numPartitionsInError = localProduceResults.count(_.error.isDefined) - if(produceRequest.requiredAcks == 0) { - // no operation needed if producer request.required.acks = 0; however, if there is any exception in handling the request, 
since - // no response is expected by the producer the handler will send a close connection response to the socket server - // to close the socket so that the producer client will know that some exception has happened and will refresh its metadata - if (numPartitionsInError != 0) { - info(("Send the close connection response due to error handling produce request " + - "[clientId = %s, correlationId = %s, topicAndPartition = %s] with Ack=0") - .format(produceRequest.clientId, produceRequest.correlationId, produceRequest.topicPartitionMessageSizeMap.keySet.mkString(","))) - requestChannel.closeConnection(request.processor, request) - } else { - - if (firstErrorCode == ErrorMapping.NoError) - offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo)) - - if (offsetCommitRequestOpt.isDefined) { - val response = offsetCommitRequestOpt.get.responseFor(firstErrorCode, config.offsetMetadataMaxSize) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) - } else + val produceRequest = request.requestObj.asInstanceOf[ProducerRequest] + + // the callback for sending the response + def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) { + val numPartitionsInError = responseStatus.values.count(_.error.isDefined) + + if(produceRequest.requiredAcks == 0) { + // no operation needed if producer request.required.acks = 0; however, if there is any exception in handling the request, since + // no response is expected by the producer the handler will send a close connection response to the socket server + // to close the socket so that the producer client will know that some exception has happened and will refresh its metadata + if (numPartitionsInError != 0) { + info(("Send the close connection response due to error handling produce request " + + "[clientId = %s, correlationId = %s, topicAndPartition = %s] with Ack=0") + .format(produceRequest.clientId, produceRequest.correlationId, produceRequest.topicPartitionMessageSizeMap.keySet.mkString(","))) + requestChannel.closeConnection(request.processor, request) + } else { requestChannel.noOperation(request.processor, request) + } + } else { + val response = ProducerResponse(produceRequest.correlationId, responseStatus) + requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) } - } else if (produceRequest.requiredAcks == 1 || - produceRequest.numPartitions <= 0 || - numPartitionsInError == produceRequest.numPartitions) { - - if (firstErrorCode == ErrorMapping.NoError) { - offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo) ) - } - - val statuses = localProduceResults.map(r => r.key -> ProducerResponseStatus(r.errorCode, r.start)).toMap - val response = offsetCommitRequestOpt.map(_.responseFor(firstErrorCode, config.offsetMetadataMaxSize)) - .getOrElse(ProducerResponse(produceRequest.correlationId, statuses)) - - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) - } else { - // create a list of (topic, partition) pairs to use as keys for this delayed request - val producerRequestKeys = produceRequest.data.keys.map( - topicAndPartition => new TopicPartitionRequestKey(topicAndPartition)).toSeq - val statuses = localProduceResults.map(r => - r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap - val delayedRequest = new DelayedProduce( - producerRequestKeys, - 
request, - produceRequest.ackTimeoutMs.toLong, - produceRequest, - statuses, - offsetCommitRequestOpt) - - // add the produce request for watch if it's not satisfied, otherwise send the response back - val satisfiedByMe = producerRequestPurgatory.checkAndMaybeWatch(delayedRequest) - if (satisfiedByMe) - producerRequestPurgatory.respond(delayedRequest) } + // call the replica manager to append messages to the replicas + replicaManager.appendMessages( + produceRequest.ackTimeoutMs.toLong, + produceRequest.requiredAcks, + produceRequest.data, + sendResponseCallback) + // we do not need the data anymore produceRequest.emptyData() } - case class ProduceResult(key: TopicAndPartition, start: Long, end: Long, error: Option[Throwable] = None) { - def this(key: TopicAndPartition, throwable: Throwable) = - this(key, -1L, -1L, Some(throwable)) - - def errorCode = error match { - case None => ErrorMapping.NoError - case Some(error) => ErrorMapping.codeFor(error.getClass.asInstanceOf[Class[Throwable]]) - } - } - - /** - * Helper method for handling a parsed producer request - */ - private def appendToLocalLog(producerRequest: ProducerRequest): Iterable[ProduceResult] = { - val partitionAndData: Map[TopicAndPartition, MessageSet] = producerRequest.data - trace("Append [%s] to local log ".format(partitionAndData.toString)) - partitionAndData.map {case (topicAndPartition, messages) => - try { - val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition) - val info = partitionOpt match { - case Some(partition) => - partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet]) - case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d" - .format(topicAndPartition, brokerId)) - } - - val numAppendedMessages = if (info.firstOffset == -1L || info.lastOffset == -1L) 0 else (info.lastOffset - info.firstOffset + 1) - - // update stats for successfully appended bytes and messages as bytesInRate and messageInRate - BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesInRate.mark(messages.sizeInBytes) - BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes) - BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).messagesInRate.mark(numAppendedMessages) - BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numAppendedMessages) - - trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d" - .format(messages.size, topicAndPartition.topic, topicAndPartition.partition, info.firstOffset, info.lastOffset)) - ProduceResult(topicAndPartition, info.firstOffset, info.lastOffset) - } catch { - // NOTE: Failed produce requests is not incremented for UnknownTopicOrPartitionException and NotLeaderForPartitionException - // since failed produce requests metric is supposed to indicate failure of a broker in handling a produce request - // for a partition it is the leader for - case e: KafkaStorageException => - fatal("Halting due to unrecoverable I/O error while handling produce request: ", e) - Runtime.getRuntime.halt(1) - null - case utpe: UnknownTopicOrPartitionException => - warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format( - producerRequest.correlationId, producerRequest.clientId, topicAndPartition, utpe.getMessage)) - new ProduceResult(topicAndPartition, utpe) - case nle: NotLeaderForPartitionException => - warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format( 
-            producerRequest.correlationId, producerRequest.clientId, topicAndPartition, nle.getMessage))
-          new ProduceResult(topicAndPartition, nle)
-        case e: Throwable =>
-          BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
-          BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()
-          error("Error processing ProducerRequest with correlation id %d from client %s on partition %s"
-            .format(producerRequest.correlationId, producerRequest.clientId, topicAndPartition), e)
-          new ProduceResult(topicAndPartition, e)
-      }
-    }
-  }
-
   /**
    * Handle a fetch request
    */
   def handleFetchRequest(request: RequestChannel.Request) {
     val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
-    val dataRead = replicaManager.readMessageSets(fetchRequest)
-
-    // if the fetch request comes from the follower,
-    // update its corresponding log end offset
-    if(fetchRequest.isFromFollower)
-      recordFollowerLogEndOffsets(fetchRequest.replicaId, dataRead.mapValues(_.offset))
-
-    // check if this fetch request can be satisfied right away
-    val bytesReadable = dataRead.values.map(_.data.messages.sizeInBytes).sum
-    val errorReadingData = dataRead.values.foldLeft(false)((errorIncurred, dataAndOffset) =>
-      errorIncurred || (dataAndOffset.data.error != ErrorMapping.NoError))
-    // send the data immediately if 1) fetch request does not want to wait
-    //                              2) fetch request does not require any data
-    //                              3) has enough data to respond
-    //                              4) some error happens while reading data
-    if(fetchRequest.maxWait <= 0 ||
-       fetchRequest.numPartitions <= 0 ||
-       bytesReadable >= fetchRequest.minBytes ||
-       errorReadingData) {
-      debug("Returning fetch response %s for fetch request with correlation id %d to client %s"
-        .format(dataRead.values.map(_.data.error).mkString(","), fetchRequest.correlationId, fetchRequest.clientId))
-      val response = new FetchResponse(fetchRequest.correlationId, dataRead.mapValues(_.data))
-      requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response)))
-    } else {
-      debug("Putting fetch request with correlation id %d from client %s into purgatory".format(fetchRequest.correlationId,
-        fetchRequest.clientId))
-      // create a list of (topic, partition) pairs to use as keys for this delayed request
-      val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq.map(new TopicPartitionRequestKey(_))
-
-      //val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest.maxWait, fetchRequest,
-      //  dataRead.mapValues(_.offset))
-
-      def callback(succeeded: Boolean) {
-        if (succeeded)
-          requestChannel.sendResponse(new RequestChannel.Response(request,
-            new FetchResponseSend(FetchResponse(fetchRequest.correlationId,
-              replicaManager.readMessageSets(fetchRequest).mapValues(_.data)))))
-      }
-
-      val delayedFetch = new DelayedFetch(
-        fetchRequest.maxWait,
-        callback,
-        FetchInfo(fetchRequest.minBytes, !fetchRequest.isFromFollower, dataRead.mapValues(_.offset)),
-        replicaManager)
+    // the callback for sending the response
+    def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) {
-
-      // add the fetch request for watch if it's not satisfied, otherwise send the response back
-      val satisfiedByMe = fetchRequestPurgatory.checkAndMaybeWatch(delayedFetch)
-      if (satisfiedByMe)
-        fetchRequestPurgatory.respond(delayedFetch)
+      val response = FetchResponse(fetchRequest.correlationId, responsePartitionData)
+      requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response)))
     }
-  }
-
-  private def recordFollowerLogEndOffsets(replicaId: Int, offsets: 
Map[TopicAndPartition, LogOffsetMetadata]) {
-    debug("Record follower log end offsets: %s ".format(offsets))
-    offsets.foreach {
-      case (topicAndPartition, offset) =>
-        replicaManager.updateReplicaLEOAndPartitionHW(topicAndPartition.topic,
-          topicAndPartition.partition, replicaId, offset)
-        // for producer requests with ack > 1, we need to check
-        // if they can be unblocked after some follower's log end offsets have moved
-        replicaManager.unblockDelayedProduceRequests(new TopicPartitionRequestKey(topicAndPartition))
-    }
+    // call the replica manager to fetch messages from the replicas
+    replicaManager.fetchMessages(
+      fetchRequest.maxWait.toLong,
+      fetchRequest.replicaId,
+      fetchRequest.minBytes,
+      fetchRequest.requestInfo,
+      sendResponseCallback)
   }
 
   /**
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 06e7108..eb25660 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -20,11 +20,11 @@ import kafka.api._
 import kafka.common._
 import kafka.utils._
 import kafka.cluster.{Broker, Partition, Replica}
-import kafka.log.LogManager
+import kafka.log.{LogAppendInfo, LogManager}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.controller.KafkaController
 import kafka.common.TopicAndPartition
-import kafka.message.MessageSet
+import kafka.message.{ByteBufferMessageSet, MessageSet}
 
 import java.util.concurrent.atomic.AtomicBoolean
 import java.io.{IOException, File}
@@ -45,6 +45,19 @@ object ReplicaManager {
 
 case class PartitionDataAndOffset(data: FetchResponsePartitionData, offset: LogOffsetMetadata)
 
+case class LogAppendResult(info: LogAppendInfo, error: Option[Throwable] = None) {
+  def errorCode = error match {
+    case None => ErrorMapping.NoError
+    case Some(e) => ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
+  }
+}
+
+case class LogReadResult(info: FetchDataInfo, hw: Long, error: Option[Throwable] = None) {
+  def errorCode = error match {
+    case None => ErrorMapping.NoError
+    case Some(e) => ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
+  }
+}
 
 class ReplicaManager(config: KafkaConfig,
                      time: Time,
@@ -237,74 +250,228 @@ class ReplicaManager(config: KafkaConfig,
   }
 
   /**
-   * Read from all the offset details given and return a map of
-   * (topic, partition) -> PartitionData
+   * Append messages to the leader replicas of the partitions, and wait for them to be replicated to other replicas;
+   * the callback function will be triggered either when the request times out or the required acks are satisfied
    */
-  def readMessageSets(fetchRequest: FetchRequest) = {
-    val isFetchFromFollower = fetchRequest.isFromFollower
-    fetchRequest.requestInfo.map
-    {
-      case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) =>
-        val partitionDataAndOffsetInfo =
-          try {
-            val (fetchInfo, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, fetchRequest.replicaId)
-            BrokerTopicStats.getBrokerTopicStats(topic).bytesOutRate.mark(fetchInfo.messageSet.sizeInBytes)
-            BrokerTopicStats.getBrokerAllTopicsStats.bytesOutRate.mark(fetchInfo.messageSet.sizeInBytes)
-            if (isFetchFromFollower) {
-              debug("Partition [%s,%d] received fetch request from follower %d"
-                .format(topic, partition, fetchRequest.replicaId))
-            }
-            new PartitionDataAndOffset(new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, fetchInfo.messageSet), fetchInfo.fetchOffset)
-          } catch {
-            // NOTE: Failed fetch requests is not incremented for 

 class ReplicaManager(config: KafkaConfig,
                      time: Time,
@@ -237,74 +250,228 @@ class ReplicaManager(config: KafkaConfig,
   }

   /**
-   * Read from all the offset details given and return a map of
-   * (topic, partition) -> PartitionData
+   * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
+   * the callback function will be triggered either when timeout or the required acks are satisfied
    */
-  def readMessageSets(fetchRequest: FetchRequest) = {
-    val isFetchFromFollower = fetchRequest.isFromFollower
-    fetchRequest.requestInfo.map
-    {
-      case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) =>
-        val partitionDataAndOffsetInfo =
-          try {
-            val (fetchInfo, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, fetchRequest.replicaId)
-            BrokerTopicStats.getBrokerTopicStats(topic).bytesOutRate.mark(fetchInfo.messageSet.sizeInBytes)
-            BrokerTopicStats.getBrokerAllTopicsStats.bytesOutRate.mark(fetchInfo.messageSet.sizeInBytes)
-            if (isFetchFromFollower) {
-              debug("Partition [%s,%d] received fetch request from follower %d"
-                .format(topic, partition, fetchRequest.replicaId))
-            }
-            new PartitionDataAndOffset(new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, fetchInfo.messageSet), fetchInfo.fetchOffset)
-          } catch {
-            // NOTE: Failed fetch requests is not incremented for UnknownTopicOrPartitionException and NotLeaderForPartitionException
-            // since failed fetch requests metric is supposed to indicate failure of a broker in handling a fetch request
-            // for a partition it is the leader for
-            case utpe: UnknownTopicOrPartitionException =>
-              warn("Fetch request with correlation id %d from client %s on partition [%s,%d] failed due to %s".format(
-                fetchRequest.correlationId, fetchRequest.clientId, topic, partition, utpe.getMessage))
-              new PartitionDataAndOffset(new FetchResponsePartitionData(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty), LogOffsetMetadata.UnknownOffsetMetadata)
-            case nle: NotLeaderForPartitionException =>
-              warn("Fetch request with correlation id %d from client %s on partition [%s,%d] failed due to %s".format(
-                fetchRequest.correlationId, fetchRequest.clientId, topic, partition, nle.getMessage))
-              new PartitionDataAndOffset(new FetchResponsePartitionData(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty), LogOffsetMetadata.UnknownOffsetMetadata)
-            case t: Throwable =>
-              BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
-              BrokerTopicStats.getBrokerAllTopicsStats.failedFetchRequestRate.mark()
-              error("Error when processing fetch request for partition [%s,%d] offset %d from %s with correlation id %d. Possible cause: %s"
-                .format(topic, partition, offset, if (isFetchFromFollower) "follower" else "consumer", fetchRequest.correlationId, t.getMessage))
-              new PartitionDataAndOffset(new FetchResponsePartitionData(ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty), LogOffsetMetadata.UnknownOffsetMetadata)
-          }
-        (TopicAndPartition(topic, partition), partitionDataAndOffsetInfo)
-    }
-  }
+  def appendMessages(timeout: Long,
+                     requiredAcks: Short,
+                     messagesPerPartition: Map[TopicAndPartition, MessageSet],
+                     callbackOnComplete: Map[TopicAndPartition, ProducerResponseStatus] => Unit) {
+
+    val sTime = SystemTime.milliseconds
+    val localProduceResults = appendToLocalLog(messagesPerPartition)
+    debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
+
+    val produceStatus = localProduceResults.mapValues(result =>
+      ProduceStatus(
+        result.info.lastOffset + 1,                                        // required offset
+        ProducerResponseStatus(result.errorCode, result.info.firstOffset)) // response status
+    )
+
+    if (requiredAcks == 0) {
+      // if required acks = 0 we can trigger the completion callback immediately
+      val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
+      callbackOnComplete(produceResponseStatus)
+    } else if (requiredAcks == 1 ||
+               messagesPerPartition.size <= 0 ||
+               localProduceResults.values.count(_.error.isDefined) == messagesPerPartition.size) {
+      // if required acks = 1, or there is nothing to produce, or all partition appends
+      // have already failed, we can also trigger the completion callback immediately
+      val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
+      callbackOnComplete(produceResponseStatus)
+    } else {
+      // otherwise create a delayed produce operation and try to watch it in the purgatory
+      val delayedRequest = new DelayedProduce(timeout, ProduceInfo(requiredAcks, produceStatus), this, callbackOnComplete)
+      val producerRequestKeys = messagesPerPartition.keys.map(TopicPartitionRequestKey(_)).toSeq
+
+      val completedByMe = producerRequestPurgatory.tryCompleteElseWatch(delayedRequest, producerRequestKeys)
+      if (completedByMe) {
+        val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
+        callbackOnComplete(produceResponseStatus)
+      }
+    }
+  }
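The branching above boils down to one question: can the produce complete without waiting on followers? Restated as a standalone predicate (parameter names are illustrative, not Kafka's):

    object AckSketch extends App {
      def produceCompletesImmediately(requiredAcks: Short, numPartitions: Int, numFailed: Int): Boolean =
        requiredAcks == 0 ||          // fire-and-forget: no acks requested
        requiredAcks == 1 ||          // leader-only ack: the local append already happened
        numPartitions <= 0 ||         // nothing was produced, nothing to wait for
        numFailed == numPartitions    // every local append failed, waiting cannot help

      assert(!produceCompletesImmediately(requiredAcks = -1, numPartitions = 3, numFailed = 0)) // goes to purgatory
      assert(produceCompletesImmediately(requiredAcks = -1, numPartitions = 3, numFailed = 3))  // completes at once
    }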
+
+  /**
+   * Append the messages to the local replica logs
+   */
+  private def appendToLocalLog(messagesPerPartition: Map[TopicAndPartition, MessageSet]): Map[TopicAndPartition, LogAppendResult] = {
+    trace("Append [%s] to local log".format(messagesPerPartition))
+    messagesPerPartition.map { case (topicAndPartition, messages) =>
+      try {
+        val partitionOpt = getPartition(topicAndPartition.topic, topicAndPartition.partition)
+        val info = partitionOpt match {
+          case Some(partition) =>
+            partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet])
+          case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
+            .format(topicAndPartition, brokerId))
+        }
+
+        val numAppendedMessages =
+          if (info.firstOffset == -1L || info.lastOffset == -1L)
+            0
+          else
+            info.lastOffset - info.firstOffset + 1
+
+        // update stats for successfully appended bytes and messages as bytesInRate and messagesInRate
+        BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesInRate.mark(messages.sizeInBytes)
+        BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes)
+        BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).messagesInRate.mark(numAppendedMessages)
+        BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numAppendedMessages)
+
+        trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d"
+          .format(messages.sizeInBytes, topicAndPartition.topic, topicAndPartition.partition, info.firstOffset, info.lastOffset))
+        (topicAndPartition, LogAppendResult(info))
+      } catch {
+        // NOTE: the failed produce requests metric is not incremented for UnknownTopicOrPartitionException and
+        // NotLeaderForPartitionException since it is supposed to indicate failure of a broker in handling a
+        // produce request for a partition it is the leader for
+        case e: KafkaStorageException =>
+          fatal("Halting due to unrecoverable I/O error while handling produce request: ", e)
+          Runtime.getRuntime.halt(1)
+          (topicAndPartition, null)
+        case utpe: UnknownTopicOrPartitionException =>
+          warn("Produce request on partition %s failed due to %s".format(topicAndPartition, utpe.getMessage))
+          (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(utpe)))
+        case nle: NotLeaderForPartitionException =>
+          warn("Produce request on partition %s failed due to %s".format(topicAndPartition, nle.getMessage))
+          (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(nle)))
+        case e: Throwable =>
+          BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
+          BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()
+          error("Error processing append operation on partition %s".format(topicAndPartition), e)
+          (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e)))
+      }
+    }
+  }
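The key pattern in appendToLocalLog is catching per-partition exceptions into result values, so one bad partition cannot abort the whole batch. A self-contained sketch of that shape, with require standing in for the real append:

    object PerPartitionAppendSketch extends App {
      case class AppendResult(error: Option[Throwable] = None)

      // stand-in for appendToLocalLog: errors become data on the per-partition
      // result instead of propagating out of the loop
      def appendAll(batches: Map[String, Seq[String]]): Map[String, AppendResult] =
        batches.map { case (partition, messages) =>
          try {
            require(messages.nonEmpty, "empty message set for " + partition)
            partition -> AppendResult()
          } catch {
            case e: Throwable => partition -> AppendResult(Some(e))
          }
        }

      appendAll(Map("t-0" -> Seq("m1"), "t-1" -> Seq.empty)).foreach {
        case (partition, result) => println(partition + " -> " + result.error.map(_.getMessage).getOrElse("ok"))
      }
    }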
+
+  /**
+   * Fetch messages from the leader replica;
+   * the callback function will be triggered either when timeout or the required fetch info is satisfied
+   */
+  def fetchMessages(timeout: Long,
+                    replicaId: Int,
+                    fetchMinBytes: Int,
+                    fetchInfo: Map[TopicAndPartition, PartitionFetchInfo],
+                    callbackOnComplete: Map[TopicAndPartition, FetchResponsePartitionData] => Unit) {
+
+    val fetchOnlyLeader: Boolean = replicaId != Request.DebuggingConsumerId
+    val fetchOnlyCommitted: Boolean = !Request.isValidBrokerId(replicaId)
+
+    // read from local logs
+    val fetchResults = readFromLocalLog(fetchOnlyLeader, fetchOnlyCommitted, fetchInfo)
+
+    // if the fetch comes from a follower, update its corresponding log end offset
+    if (Request.isValidBrokerId(replicaId))
+      recordFollowerLogEndOffsets(replicaId, fetchResults.mapValues(_.info.fetchOffset))
+
+    // check if this fetch request can be satisfied right away
+    val bytesReadable = fetchResults.values.map(_.info.messageSet.sizeInBytes).sum
+    val errorReadingData = fetchResults.values.foldLeft(false)((errorIncurred, readResult) =>
+      errorIncurred || (readResult.errorCode != ErrorMapping.NoError))
+    // send the data immediately if 1) fetch request does not want to wait
+    //                              2) fetch request does not require any data
+    //                              3) has enough data to respond
+    //                              4) some error happens while reading data
+    if (timeout <= 0 ||
+        fetchInfo.size <= 0 ||
+        bytesReadable >= fetchMinBytes ||
+        errorReadingData) {
+      val fetchPartitionData = fetchResults.mapValues(result =>
+        FetchResponsePartitionData(result.errorCode, result.hw, result.info.messageSet))
+      callbackOnComplete(fetchPartitionData)
+    } else {
+      // create a delayed fetch operation, using (topic, partition) pairs as watch keys
+      val fetchStartOffsets = fetchResults.mapValues(result => result.info.fetchOffset)
+      val delayedFetch = new DelayedFetch(timeout, FetchInfo(fetchMinBytes, fetchOnlyLeader, fetchOnlyCommitted, fetchStartOffsets), this, callbackOnComplete)
+      val delayedFetchKeys = fetchInfo.keys.map(new TopicPartitionRequestKey(_)).toSeq
+
+      // watch the fetch operation if it cannot be completed right away; otherwise send the response back
+      val completedByMe = fetchRequestPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
+      if (completedByMe) {
+        // read again to return whatever data is now available
+        val fetchPartitionData = readFromLocalLog(fetchOnlyLeader, fetchOnlyCommitted, fetchInfo)
+          .mapValues(result => FetchResponsePartitionData(result.errorCode, result.hw, result.info.messageSet))
+        callbackOnComplete(fetchPartitionData)
+      }
+    }
+  }
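The four immediate-return conditions in fetchMessages, restated as a standalone predicate (names are illustrative, not Kafka's):

    object FetchSketch extends App {
      def fetchCompletesImmediately(timeoutMs: Long, numPartitions: Int, bytesReadable: Long,
                                    minBytes: Int, errorReadingData: Boolean): Boolean =
        timeoutMs <= 0 ||              // 1) the request does not want to wait
        numPartitions <= 0 ||          // 2) the request does not ask for any data
        bytesReadable >= minBytes ||   // 3) enough data has accumulated already
        errorReadingData               // 4) an error occurred while reading

      // a consumer with maxWait=100ms and minBytes=1 blocks until data arrives or the timeout fires
      assert(!fetchCompletesImmediately(timeoutMs = 100, numPartitions = 1, bytesReadable = 0, minBytes = 1, errorReadingData = false))
    }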

   /**
-   * Read from a single topic/partition at the given offset upto maxSize bytes
+   * Read from multiple topic partitions at the given offsets, up to the given per-partition fetch sizes
    */
-  private def readMessageSet(topic: String,
-                             partition: Int,
-                             offset: Long,
-                             maxSize: Int,
-                             fromReplicaId: Int): (FetchDataInfo, Long) = {
-    // check if the current broker is the leader for the partitions
-    val localReplica = if(fromReplicaId == Request.DebuggingConsumerId)
-      getReplicaOrException(topic, partition)
-    else
-      getLeaderReplicaIfLocal(topic, partition)
-    trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
-    val maxOffsetOpt =
-      if (Request.isValidBrokerId(fromReplicaId))
-        None
-      else
-        Some(localReplica.highWatermark.messageOffset)
-    val fetchInfo = localReplica.log match {
-      case Some(log) =>
-        log.read(offset, maxSize, maxOffsetOpt)
-      case None =>
-        error("Leader for partition [%s,%d] does not have a local log".format(topic, partition))
-        FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty)
-    }
-    (fetchInfo, localReplica.highWatermark.messageOffset)
-  }
+  private def readFromLocalLog(readOnlyIfLeader: Boolean,
+                               readOnlyCommitted: Boolean,
+                               readInfo: Map[TopicAndPartition, PartitionFetchInfo]): Map[TopicAndPartition, LogReadResult] = {
+
+    readInfo.map { case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) =>
+      val partitionDataAndOffsetInfo =
+        try {
+          trace("Fetching log segment for topic %s, partition %d, offset %d, size %d"
+            .format(topic, partition, offset, fetchSize))
+
+          // decide whether to only fetch from the leader
+          val localReplica = if (readOnlyIfLeader)
+            getLeaderReplicaIfLocal(topic, partition)
+          else
+            getReplicaOrException(topic, partition)
+
+          // decide whether to only fetch committed data (i.e. messages below the high watermark)
+          val maxOffsetOpt = if (readOnlyCommitted)
+            Some(localReplica.highWatermark.messageOffset)
+          else
+            None
+
+          // read on log
+          val logReadInfo = localReplica.log match {
+            case Some(log) =>
+              log.read(offset, fetchSize, maxOffsetOpt)
+            case None =>
+              error("Leader for partition [%s,%d] does not have a local log".format(topic, partition))
+              FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty)
+          }
+
+          BrokerTopicStats.getBrokerTopicStats(topic).bytesOutRate.mark(logReadInfo.messageSet.sizeInBytes)
+          BrokerTopicStats.getBrokerAllTopicsStats.bytesOutRate.mark(logReadInfo.messageSet.sizeInBytes)
+
+          LogReadResult(logReadInfo, localReplica.highWatermark.messageOffset)
+        } catch {
+          // NOTE: the failed fetch requests metric is not incremented for UnknownTopicOrPartitionException,
+          // NotLeaderForPartitionException and ReplicaNotAvailableException since it is supposed to indicate
+          // failure of a broker in handling a fetch request for a partition it is the leader for
+          case utpe: UnknownTopicOrPartitionException =>
+            warn("Fetch request on partition [%s,%d] failed due to %s".format(topic, partition, utpe.getMessage))
+            LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, Some(utpe))
+          case nle: NotLeaderForPartitionException =>
+            warn("Fetch request on partition [%s,%d] failed due to %s".format(topic, partition, nle.getMessage))
+            LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, Some(nle))
+          case rnae: ReplicaNotAvailableException =>
+            LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, Some(rnae))
+          case t: Throwable =>
+            BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
+            BrokerTopicStats.getBrokerAllTopicsStats.failedFetchRequestRate.mark()
+            error("Error processing fetch operation on partition [%s,%d] offset %d. Possible cause: %s"
+              .format(topic, partition, offset, t.getMessage))
+            LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, Some(t))
+        }
+      (TopicAndPartition(topic, partition), partitionDataAndOffsetInfo)
+    }
+  }
+
+  private def recordFollowerLogEndOffsets(replicaId: Int, offsets: Map[TopicAndPartition, LogOffsetMetadata]) {
+    debug("Record follower log end offsets: %s".format(offsets))
+    offsets.foreach {
+      case (topicAndPartition, offset) =>
+        updateReplicaLEOAndPartitionHW(topicAndPartition.topic, topicAndPartition.partition, replicaId, offset)
+
+        // for producer requests with ack > 1, we need to check
+        // if they can be unblocked after some follower's log end offsets have moved
+        unblockDelayedProduceRequests(new TopicPartitionRequestKey(topicAndPartition))
+    }
+  }

   def maybeUpdateMetadataCache(updateMetadataRequest: UpdateMetadataRequest, metadataCache: MetadataCache) {
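The readOnlyCommitted flag in readFromLocalLog encodes the rule that consumer fetches are capped at the high watermark while follower fetches are not. Sketched in isolation (illustrative names):

    object HighWatermarkSketch extends App {
      def maxReadOffset(readOnlyCommitted: Boolean, highWatermark: Long): Option[Long] =
        if (readOnlyCommitted) Some(highWatermark)  // consumers must stop at the high watermark
        else None                                   // followers may read past it to the log end

      assert(maxReadOffset(readOnlyCommitted = true, highWatermark = 42L) == Some(42L))
      assert(maxReadOffset(readOnlyCommitted = false, highWatermark = 42L) == None)
    }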
Possible cause: %s" + .format(topic, partition, offset, if (isFetchFromFollower) "follower" else "consumer", fetchRequest.correlationId, t.getMessage)) + LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, t) + } + (TopicAndPartition(topic, partition), partitionDataAndOffsetInfo) + } + } + + private def recordFollowerLogEndOffsets(replicaId: Int, offsets: Map[TopicAndPartition, LogOffsetMetadata]) { + debug("Record follower log end offsets: %s ".format(offsets)) + offsets.foreach { + case (topicAndPartition, offset) => + updateReplicaLEOAndPartitionHW(topicAndPartition.topic, topicAndPartition.partition, replicaId, offset) + + // for producer requests with ack > 1, we need to check + // if they can be unblocked after some follower's log end offsets have moved + unblockDelayedProduceRequests(new TopicPartitionRequestKey(topicAndPartition)) } - (fetchInfo, localReplica.highWatermark.messageOffset) } def maybeUpdateMetadataCache(updateMetadataRequest: UpdateMetadataRequest, metadataCache: MetadataCache) { diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala index dc4ce54..cbf32be 100644 --- a/core/src/main/scala/kafka/server/RequestPurgatory.scala +++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala @@ -34,7 +34,7 @@ import com.yammer.metrics.core.Gauge * message append operation could be waiting for specified number of acks; or a delayed * message fetch operation could be waiting for a given number of bytes to accumulate. */ -abstract class DelayedRequest(delayMs: Long, onComplete: Boolean => Unit) extends DelayedItem(delayMs) { +abstract class DelayedRequest(delayMs: Long, onComplete: Any => Unit) extends DelayedItem(delayMs) { val completed = new AtomicBoolean(false) /* @@ -48,7 +48,7 @@ abstract class DelayedRequest(delayMs: Long, onComplete: Boolean => Unit) extend /* * When delayMs has elapsed, expire the delayed operation */ - def onExpired() = onComplete(false) + def onExpired() = onComplete(null) } /** @@ -97,20 +97,25 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt */ def tryCompleteElseWatch(operation: DelayedRequest, watchKeys: Seq[Any]): Boolean = { for(key <- watchKeys) { + // if the operation is already completed, stopping adding it to + // any further lists and return false + if (operation.completed.get()) + return false val watchers = watchersFor(key) - // if the operation is found completed, stop adding it to any further - // lists and return true immediately - if(!watchers.checkAndMaybeAdd(operation)) { + // if the operation is completed by myself, stop adding it to + // any further lists and return true immediately + if(! watchers.checkAndMaybeAdd(operation)) { return true } } // if it is indeed watched, add to the expire queue also - watched.getAndIncrement() - expirationReaper.enqueue(operation) + if (! 

From 00dc80c9b0cfd66b353fb49882a825321b2f1d82 Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Wed, 13 Aug 2014 16:08:14 -0700
Subject: [PATCH 5/5] remove respond() from delayed requests

---
 core/src/main/scala/kafka/server/DelayedFetch.scala   |  5 -----
 core/src/main/scala/kafka/server/DelayedProduce.scala | 17 -----------------
 2 files changed, 22 deletions(-)

diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index 4b7542f..40abebe 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -101,9 +101,4 @@ class DelayedFetch(delayMs: Long,
     val readData = replicaManager.readMessageSets(fetch)
     onComplete(readData)
   }
-
-  def respond(replicaManager: ReplicaManager): FetchResponse = {
-    val topicData = replicaManager.readMessageSets(fetch)
-    FetchResponse(fetch.correlationId, topicData.mapValues(_.data))
-  }
 }
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
index 99bdf6f..1b6ad16 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -106,22 +106,5 @@ class DelayedProduce(delayMs: Long,
     val responseStatus = produceInfo.produceStatus.mapValues(status => status.responseStatus)
     onComplete(responseStatus)
   }
-
-  def respond(offsetManager: OffsetManager): RequestOrResponse = {
-    val responseStatus = partitionStatus.mapValues(status => status.responseStatus)
-
-    val errorCode = responseStatus.find { case (_, status) =>
-      status.error != ErrorMapping.NoError
-    }.map(_._2.error).getOrElse(ErrorMapping.NoError)
-
-    if (errorCode == ErrorMapping.NoError) {
-      offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo))
-    }
-
-    val response = offsetCommitRequestOpt.map(_.responseFor(errorCode, offsetManager.config.maxMetadataSize))
-      .getOrElse(ProducerResponse(produce.correlationId, responseStatus))
-
-    response
-  }
 }
--
1.7.12.4
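With respond() gone, each delayed operation carries its own completion callback and response-building logic, so the purgatory no longer needs to know any response type. A minimal sketch of the resulting shape (illustrative names only, not the actual classes):

    abstract class DelayedOperationSketch[T](onComplete: T => Unit) {
      def buildResult(): T                         // each operation knows its own response type
      def complete(): Unit = onComplete(buildResult())
    }

    class DelayedFetchSketch(respond: String => Unit)
        extends DelayedOperationSketch[String](respond) {
      def buildResult(): String = "fetched-data"
    }

    object CallbackDemo extends App {
      // the operation, not the purgatory, now produces the response
      new DelayedFetchSketch(data => println("responding with " + data)).complete()
    }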