From 95cda120fddd3b307ff59d86aa2236033ea54655 Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Tue, 10 Mar 2015 09:32:49 -0700 Subject: [PATCH 1/8] purgatory micro benchmark --- .../other/kafka/TestPurgatoryPerformance.scala | 199 +++++++++++++++++++++ 1 file changed, 199 insertions(+) create mode 100644 core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala diff --git a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala new file mode 100644 index 0000000..28c7ad3 --- /dev/null +++ b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka + +import java.util.Random +import java.util.concurrent.CountDownLatch + +import joptsimple._ +import kafka.server.{DelayedOperationPurgatory, DelayedOperation} +import kafka.utils._ + +/** + * This is a benchmark test of the purgatory. + */ +object TestPurgatoryPerformance { + + def main(args: Array[String]): Unit = { + val parser = new OptionParser + val numRequestsOpt = parser.accepts("num", "The number of requests") + .withRequiredArg + .describedAs("num_requests") + .ofType(classOf[java.lang.Double]) + val requestRateOpt = parser.accepts("rate", "The request rate") + .withRequiredArg + .describedAs("request_per_second") + .ofType(classOf[java.lang.Double]) + val requestDataSizeOpt = parser.accepts("size", "The request data size") + .withRequiredArg + .describedAs("num_bytes") + .ofType(classOf[java.lang.Long]) + val timeoutOpt = parser.accepts("timeout", "The request timeout") + .withRequiredArg + .describedAs("timeout_milliseconds") + .ofType(classOf[java.lang.Long]) + val pct75Opt = parser.accepts("pct75", "75th percentile of request latency (log-normal distribution)") + .withRequiredArg + .describedAs("75th_percentile") + .ofType(classOf[java.lang.Double]) + val pct50Opt = parser.accepts("pct50", "50th percentile of request latency (log-normal distribution)") + .withRequiredArg + .describedAs("50th_percentile") + .ofType(classOf[java.lang.Double]) + val verboseOpt = parser.accepts("verbose", "show additional information") + .withRequiredArg + .describedAs("true|false") + .ofType(classOf[java.lang.Boolean]) + .defaultsTo(true) + + val options = parser.parse(args : _*) + + CommandLineUtils.checkRequiredArgs(parser, options, numRequestsOpt, requestRateOpt, requestDataSizeOpt, pct75Opt, pct50Opt) + + val numRequests = options.valueOf(numRequestsOpt).intValue + val requestRate = options.valueOf(requestRateOpt).doubleValue + val requestDataSize = options.valueOf(requestDataSizeOpt).intValue + val timeout = options.valueOf(timeoutOpt).longValue + val pct75 = options.valueOf(pct75Opt).doubleValue + val pct50 = options.valueOf(pct50Opt).doubleValue + val verbose = options.valueOf(verboseOpt).booleanValue + + val latencySamples = new LatencySamples(1000000, pct75, pct50) + val intervalSamples = new IntervalSamples(1000000, requestRate) + + val purgatory = new DelayedOperationPurgatory[FakeOperation]("fake purgatory") + + val latch = new CountDownLatch(numRequests) + val start = System.currentTimeMillis + var qtime = start + var end = 0L + var due = start + val generator = new Runnable { + def run(): Unit = { + var i = numRequests + while (i > 0) { + i -= 1 + val interval = intervalSamples.next() + val latency = latencySamples.next() + val now = System.currentTimeMillis + qtime = qtime + interval + due = math.max(due, qtime + math.min(latency, timeout)) + + if (qtime > now) Thread.sleep(qtime - now) + + val request = new FakeOperation(timeout, requestDataSize, latency, latch) + purgatory.tryCompleteElseWatch(request, Seq("fakeKey1", "fakeKey2", "fakeKey3")) + } + end = System.currentTimeMillis + } + } + val generatorThread = new Thread(generator) + generatorThread.start() + generatorThread.join() + latch.await() + val done = System.currentTimeMillis + + if (verbose) { + latencySamples.printlnStats() + intervalSamples.printStats() + println(s"# enqueue rate ($numRequests requests):") + println(s"# target actual") + } + + val targetRate = numRequests.toDouble * 1000d / (qtime - start).toDouble + val actualRate = numRequests.toDouble * 1000d / (end - start).toDouble + + println(s"$targetRate $actualRate") + + purgatory.shutdown() + } + + class LogNormalDistribution(mu: Double, sigma: Double) { + val rand = new Random + def next(): Double = { + val n = rand.nextGaussian() * sigma + mu + math.exp(n) + } + } + + class ExponentialDistribution(lambda: Double) { + val rand = new Random + def next(): Double = { + math.log(1d - rand.nextDouble()) / (- lambda) + } + } + + class LatencySamples(sampleSize: Int, pct75: Double, pct50: Double) { + private[this] val rand = new Random + private[this] val samples = { + val normalMean = math.log(pct50) + val normalStDev = (math.log(pct75) - normalMean) / 0.674490d + val dist = new LogNormalDistribution(normalMean, normalStDev) + (0 until sampleSize).map { _ => dist.next().toLong }.toArray + } + def next() = samples(rand.nextInt(sampleSize)) + + def printlnStats(): Unit = { + val p75 = samples.sorted.apply((sampleSize.toDouble * 0.75d).toInt) + val p50 = samples.sorted.apply((sampleSize.toDouble * 0.5d).toInt) + + println(s"# latency samples: pct75 = $p75, pct50 = $p50, min = ${samples.min}, max = ${samples.max}") + } + } + + class IntervalSamples(sampleSize: Int, requestPerSecond: Double) { + private[this] val rand = new Random + private[this] val samples = { + val dist = new ExponentialDistribution(requestPerSecond / 1000d) + var residue = 0.0 + (0 until sampleSize).map { _ => + val interval = dist.next() + residue + val roundedInterval = interval.toLong + residue = interval - roundedInterval.toDouble + roundedInterval + }.toArray + } + + def next() = samples(rand.nextInt(sampleSize)) + + def printStats(): Unit = { + println(s"# interval samples: rate = ${1000d / (samples.map(_.toDouble).sum / sampleSize.toDouble)}, min = ${samples.min}, max = ${samples.max}") + } + } + + class FakeOperation(delayMs: Long, size: Int, latencyMs: Long, latch: CountDownLatch) extends DelayedOperation(delayMs) { + private[this] val data = new Array[Byte](size) + private[this] val completesAt = System.currentTimeMillis + latencyMs + + def onExpiration(): Unit = {} + def onComplete(): Unit = { + latch.countDown() + } + def tryComplete(): Boolean = { + if (System.currentTimeMillis >= completesAt) + forceComplete() + else + false + } + override def isCompleted(): Boolean = { + if (System.currentTimeMillis >= completesAt) forceComplete() + super.isCompleted() + } + } + +} -- 2.3.0 From 61787b15ee324d06794f4a61ee96f94c4fab04e4 Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Mon, 16 Mar 2015 13:17:12 -0700 Subject: [PATCH 2/8] purgatory micro benchmark --- .../other/kafka/TestPurgatoryPerformance.scala | 49 +++++++++++++++------- 1 file changed, 33 insertions(+), 16 deletions(-) diff --git a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala index 28c7ad3..acf47e1 100644 --- a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala +++ b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala @@ -43,6 +43,11 @@ object TestPurgatoryPerformance { .withRequiredArg .describedAs("num_bytes") .ofType(classOf[java.lang.Long]) + val numKeysOpt = parser.accepts("keys", "The number of keys") + .withRequiredArg + .describedAs("num_keys") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(3) val timeoutOpt = parser.accepts("timeout", "The request timeout") .withRequiredArg .describedAs("timeout_milliseconds") @@ -68,6 +73,7 @@ object TestPurgatoryPerformance { val numRequests = options.valueOf(numRequestsOpt).intValue val requestRate = options.valueOf(requestRateOpt).doubleValue val requestDataSize = options.valueOf(requestDataSizeOpt).intValue + val numKeys = options.valueOf(numKeysOpt).intValue val timeout = options.valueOf(timeoutOpt).longValue val pct75 = options.valueOf(pct75Opt).doubleValue val pct50 = options.valueOf(pct50Opt).doubleValue @@ -80,24 +86,23 @@ object TestPurgatoryPerformance { val latch = new CountDownLatch(numRequests) val start = System.currentTimeMillis - var qtime = start - var end = 0L - var due = start + @volatile var requestArrivalTime = start + @volatile var end = 0L val generator = new Runnable { + val keys = (0 until numKeys).map(i => s"fakeKey$i") def run(): Unit = { var i = numRequests while (i > 0) { i -= 1 - val interval = intervalSamples.next() - val latency = latencySamples.next() + val requestArrivalInterval = intervalSamples.next() + val latencyToComplete = latencySamples.next() val now = System.currentTimeMillis - qtime = qtime + interval - due = math.max(due, qtime + math.min(latency, timeout)) + requestArrivalTime = requestArrivalTime + requestArrivalInterval - if (qtime > now) Thread.sleep(qtime - now) + if (requestArrivalTime > now) Thread.sleep(requestArrivalTime - now) - val request = new FakeOperation(timeout, requestDataSize, latency, latch) - purgatory.tryCompleteElseWatch(request, Seq("fakeKey1", "fakeKey2", "fakeKey3")) + val request = new FakeOperation(timeout, requestDataSize, latencyToComplete, latch) + purgatory.tryCompleteElseWatch(request, keys) } end = System.currentTimeMillis } @@ -115,7 +120,7 @@ object TestPurgatoryPerformance { println(s"# target actual") } - val targetRate = numRequests.toDouble * 1000d / (qtime - start).toDouble + val targetRate = numRequests.toDouble * 1000d / (requestArrivalTime - start).toDouble val actualRate = numRequests.toDouble * 1000d / (end - start).toDouble println(s"$targetRate $actualRate") @@ -123,7 +128,10 @@ object TestPurgatoryPerformance { purgatory.shutdown() } - class LogNormalDistribution(mu: Double, sigma: Double) { + // log-normal distribution (http://en.wikipedia.org/wiki/Log-normal_distribution) + // mu: the mean of the underlying normal distribution (not the mean of this log-normal distribution) + // sigma: the standard deviation of the underlying normal distribution (not the stdev of this log-normal distribution) + private class LogNormalDistribution(mu: Double, sigma: Double) { val rand = new Random def next(): Double = { val n = rand.nextGaussian() * sigma + mu @@ -131,14 +139,20 @@ object TestPurgatoryPerformance { } } - class ExponentialDistribution(lambda: Double) { + // exponential distribution (http://en.wikipedia.org/wiki/Exponential_distribution) + // lambda : the rate parameter of the exponential distribution + private class ExponentialDistribution(lambda: Double) { val rand = new Random def next(): Double = { math.log(1d - rand.nextDouble()) / (- lambda) } } - class LatencySamples(sampleSize: Int, pct75: Double, pct50: Double) { + // Samples of Latencies to completion + // They are drawn from a log normal distribution. + // A latency value can never be negative. A log-normal distribution is a convenient why to + // model such a random variable. + private class LatencySamples(sampleSize: Int, pct75: Double, pct50: Double) { private[this] val rand = new Random private[this] val samples = { val normalMean = math.log(pct50) @@ -156,7 +170,10 @@ object TestPurgatoryPerformance { } } - class IntervalSamples(sampleSize: Int, requestPerSecond: Double) { + // Samples of Request arrival intervals + // The request arrival is modeled as a Poisson process. + // So, the internals are drawn from an exponential distribution. + private class IntervalSamples(sampleSize: Int, requestPerSecond: Double) { private[this] val rand = new Random private[this] val samples = { val dist = new ExponentialDistribution(requestPerSecond / 1000d) @@ -176,7 +193,7 @@ object TestPurgatoryPerformance { } } - class FakeOperation(delayMs: Long, size: Int, latencyMs: Long, latch: CountDownLatch) extends DelayedOperation(delayMs) { + private class FakeOperation(delayMs: Long, size: Int, latencyMs: Long, latch: CountDownLatch) extends DelayedOperation(delayMs) { private[this] val data = new Array[Byte](size) private[this] val completesAt = System.currentTimeMillis + latencyMs -- 2.3.0 From 941f6518b3e08351f66501f794acdd992be4c413 Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Mon, 16 Mar 2015 14:12:58 -0700 Subject: [PATCH 3/8] purgatory micro benchmark --- core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala index acf47e1..ec73e2f 100644 --- a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala +++ b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala @@ -116,14 +116,14 @@ object TestPurgatoryPerformance { if (verbose) { latencySamples.printlnStats() intervalSamples.printStats() - println(s"# enqueue rate ($numRequests requests):") - println(s"# target actual") + println("# enqueue rate (%d requests):".format(numRequests)) + println("# target actual") } val targetRate = numRequests.toDouble * 1000d / (requestArrivalTime - start).toDouble val actualRate = numRequests.toDouble * 1000d / (end - start).toDouble - println(s"$targetRate $actualRate") + println("%f %f".format(targetRate, actualRate)) purgatory.shutdown() } -- 2.3.0 From b7550b3049db2b10272cc21e6659841d3dcc308f Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Mon, 16 Mar 2015 14:38:27 -0700 Subject: [PATCH 4/8] purgatory micro benchmark --- .../test/scala/other/kafka/TestPurgatoryPerformance.scala | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala index ec73e2f..8d275cf 100644 --- a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala +++ b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala @@ -89,7 +89,7 @@ object TestPurgatoryPerformance { @volatile var requestArrivalTime = start @volatile var end = 0L val generator = new Runnable { - val keys = (0 until numKeys).map(i => s"fakeKey$i") + val keys = (0 until numKeys).map(i => "fakeKey%d".format(i)) def run(): Unit = { var i = numRequests while (i > 0) { @@ -114,7 +114,7 @@ object TestPurgatoryPerformance { val done = System.currentTimeMillis if (verbose) { - latencySamples.printlnStats() + latencySamples.printStats() intervalSamples.printStats() println("# enqueue rate (%d requests):".format(numRequests)) println("# target actual") @@ -162,11 +162,11 @@ object TestPurgatoryPerformance { } def next() = samples(rand.nextInt(sampleSize)) - def printlnStats(): Unit = { + def printStats(): Unit = { val p75 = samples.sorted.apply((sampleSize.toDouble * 0.75d).toInt) val p50 = samples.sorted.apply((sampleSize.toDouble * 0.5d).toInt) - println(s"# latency samples: pct75 = $p75, pct50 = $p50, min = ${samples.min}, max = ${samples.max}") + println("# latency samples: pct75 = %d, pct50 = %d, min = %d, max = %d".format(p75, p50, samples.min, samples.max)) } } @@ -189,7 +189,10 @@ object TestPurgatoryPerformance { def next() = samples(rand.nextInt(sampleSize)) def printStats(): Unit = { - println(s"# interval samples: rate = ${1000d / (samples.map(_.toDouble).sum / sampleSize.toDouble)}, min = ${samples.min}, max = ${samples.max}") + println( + "# interval samples: rate = %f, min = %d, max = %d" + .format(1000d / (samples.map(_.toDouble).sum / sampleSize.toDouble), samples.min, samples.max) + ) } } -- 2.3.0 From 48e94f215e733b817b64f5473a504a7948d77fbd Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Wed, 18 Mar 2015 13:10:31 -0700 Subject: [PATCH 5/8] cpu and gc --- .../other/kafka/TestPurgatoryPerformance.scala | 25 ++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala index 8d275cf..aeb3218 100644 --- a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala +++ b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala @@ -17,6 +17,7 @@ package kafka +import java.lang.management.ManagementFactory import java.util.Random import java.util.concurrent.CountDownLatch @@ -24,6 +25,9 @@ import joptsimple._ import kafka.server.{DelayedOperationPurgatory, DelayedOperation} import kafka.utils._ +import scala.util.Try +import scala.collection.JavaConversions._ + /** * This is a benchmark test of the purgatory. */ @@ -79,11 +83,22 @@ object TestPurgatoryPerformance { val pct50 = options.valueOf(pct50Opt).doubleValue val verbose = options.valueOf(verboseOpt).booleanValue + val gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans().sortBy(_.getName) + val osMXBean = Try(ManagementFactory.getOperatingSystemMXBean().asInstanceOf[com.sun.management.OperatingSystemMXBean]).toOption + val latencySamples = new LatencySamples(1000000, pct75, pct50) val intervalSamples = new IntervalSamples(1000000, requestRate) val purgatory = new DelayedOperationPurgatory[FakeOperation]("fake purgatory") + val gcNames = gcMXBeans.map(_.getName) + + System.gc() + + val initialCpuTime = osMXBean.map(_.getProcessCpuTime) + val initialGcCounts = gcMXBeans.map(_.getCollectionCount) + val initialGcTimes = gcMXBeans.map(_.getCollectionTime) + val latch = new CountDownLatch(numRequests) val start = System.currentTimeMillis @volatile var requestArrivalTime = start @@ -117,13 +132,19 @@ object TestPurgatoryPerformance { latencySamples.printStats() intervalSamples.printStats() println("# enqueue rate (%d requests):".format(numRequests)) - println("# target actual") + val gcCountHeader = gcNames.map("<" + _ + " count>").mkString(" ") + val gcTimeHeader = gcNames.map("<" + _ + " time>").mkString(" ") + println("# %s %s".format(gcCountHeader, gcTimeHeader)) } val targetRate = numRequests.toDouble * 1000d / (requestArrivalTime - start).toDouble val actualRate = numRequests.toDouble * 1000d / (end - start).toDouble - println("%f %f".format(targetRate, actualRate)) + val cpuTime = osMXBean.map(x => (x.getProcessCpuTime - initialCpuTime.get).toDouble / 1000d) + val gcCounts = (initialGcCounts zip gcMXBeans.map(_.getCollectionCount)).map { case (ini, fin) => fin - ini } + val gcTimes = (initialGcTimes zip gcMXBeans.map(_.getCollectionTime)).map { case (ini, fin) => fin - ini } + + println("%d %f %f %f %s %s".format(done - start, targetRate, actualRate, cpuTime.getOrElse(-1d), gcCounts.mkString(" "), gcTimes.mkString(" "))) purgatory.shutdown() } -- 2.3.0 From 387bc6703cf9eb9d573029da80f967db935d26ff Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Wed, 18 Mar 2015 21:02:12 -0700 Subject: [PATCH 6/8] actually completes requests --- .../other/kafka/TestPurgatoryPerformance.scala | 68 +++++++++++++++++----- 1 file changed, 55 insertions(+), 13 deletions(-) diff --git a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala index aeb3218..9f15fac 100644 --- a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala +++ b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala @@ -19,12 +19,13 @@ package kafka import java.lang.management.ManagementFactory import java.util.Random -import java.util.concurrent.CountDownLatch +import java.util.concurrent._ import joptsimple._ import kafka.server.{DelayedOperationPurgatory, DelayedOperation} import kafka.utils._ +import scala.math._ import scala.util.Try import scala.collection.JavaConversions._ @@ -90,21 +91,16 @@ object TestPurgatoryPerformance { val intervalSamples = new IntervalSamples(1000000, requestRate) val purgatory = new DelayedOperationPurgatory[FakeOperation]("fake purgatory") + val queue = new CompletionQueue() val gcNames = gcMXBeans.map(_.getName) - System.gc() - - val initialCpuTime = osMXBean.map(_.getProcessCpuTime) - val initialGcCounts = gcMXBeans.map(_.getCollectionCount) - val initialGcTimes = gcMXBeans.map(_.getCollectionTime) - val latch = new CountDownLatch(numRequests) val start = System.currentTimeMillis + val keys = (0 until numKeys).map(i => "fakeKey%d".format(i)) @volatile var requestArrivalTime = start @volatile var end = 0L val generator = new Runnable { - val keys = (0 until numKeys).map(i => "fakeKey%d".format(i)) def run(): Unit = { var i = numRequests while (i > 0) { @@ -117,16 +113,19 @@ object TestPurgatoryPerformance { if (requestArrivalTime > now) Thread.sleep(requestArrivalTime - now) val request = new FakeOperation(timeout, requestDataSize, latencyToComplete, latch) + if (latencyToComplete < timeout) queue.add(request) purgatory.tryCompleteElseWatch(request, keys) } end = System.currentTimeMillis } } val generatorThread = new Thread(generator) + generatorThread.start() generatorThread.join() latch.await() val done = System.currentTimeMillis + queue.shutdown() if (verbose) { latencySamples.printStats() @@ -140,9 +139,9 @@ object TestPurgatoryPerformance { val targetRate = numRequests.toDouble * 1000d / (requestArrivalTime - start).toDouble val actualRate = numRequests.toDouble * 1000d / (end - start).toDouble - val cpuTime = osMXBean.map(x => (x.getProcessCpuTime - initialCpuTime.get).toDouble / 1000d) - val gcCounts = (initialGcCounts zip gcMXBeans.map(_.getCollectionCount)).map { case (ini, fin) => fin - ini } - val gcTimes = (initialGcTimes zip gcMXBeans.map(_.getCollectionTime)).map { case (ini, fin) => fin - ini } + val cpuTime = osMXBean.map(x => x.getProcessCpuTime.toDouble / 1000d) + val gcCounts = gcMXBeans.map(_.getCollectionCount) + val gcTimes = gcMXBeans.map(_.getCollectionTime) println("%d %f %f %f %s %s".format(done - start, targetRate, actualRate, cpuTime.getOrElse(-1d), gcCounts.mkString(" "), gcTimes.mkString(" "))) @@ -217,24 +216,67 @@ object TestPurgatoryPerformance { } } - private class FakeOperation(delayMs: Long, size: Int, latencyMs: Long, latch: CountDownLatch) extends DelayedOperation(delayMs) { + private class FakeOperation(delayMs: Long, size: Int, val latencyMs: Long, latch: CountDownLatch) extends DelayedOperation(delayMs) { private[this] val data = new Array[Byte](size) - private[this] val completesAt = System.currentTimeMillis + latencyMs + val completesAt = System.currentTimeMillis + latencyMs def onExpiration(): Unit = {} + def onComplete(): Unit = { latch.countDown() } + def tryComplete(): Boolean = { if (System.currentTimeMillis >= completesAt) forceComplete() else false } + override def isCompleted(): Boolean = { if (System.currentTimeMillis >= completesAt) forceComplete() super.isCompleted() } } + private class CompletionQueue { + + @volatile private[this] var stop = false + private[this] val delayQueue = new DelayQueue[Scheduled]() + private[this] val thread = new Thread(new Runnable { + def run(): Unit = { + while (!stop) { + var scheduled = delayQueue.poll(100, TimeUnit.MILLISECONDS) + while (scheduled != null) { + scheduled.operation.tryComplete() + scheduled = delayQueue.poll() + } + } + } + }) + thread.start() + + def add(operation: FakeOperation): Unit = { + delayQueue.offer(new Scheduled(operation)) + } + + def shutdown() = { + stop = true + } + + private class Scheduled(val operation: FakeOperation) extends Delayed { + def getDelay(unit: TimeUnit): Long = { + unit.convert(max(operation.completesAt - SystemTime.milliseconds, 0), TimeUnit.MILLISECONDS) + } + + def compareTo(d: Delayed): Int = { + + val other = d.asInstanceOf[Scheduled] + + if (operation.completesAt < other.operation.completesAt) -1 + else if (operation.completesAt > other.operation.completesAt) 1 + else 0 + } + } + } } -- 2.3.0 From 2698192b6f4abbb836b2099219d97da70d7f264a Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Thu, 19 Mar 2015 16:27:04 -0700 Subject: [PATCH 7/8] actively completes operations in a separate thread --- core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala index 9f15fac..61ef9e5 100644 --- a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala +++ b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala @@ -95,6 +95,7 @@ object TestPurgatoryPerformance { val gcNames = gcMXBeans.map(_.getName) + val initialCpuTimeNano = osMXBean.map(x => x.getProcessCpuTime) val latch = new CountDownLatch(numRequests) val start = System.currentTimeMillis val keys = (0 until numKeys).map(i => "fakeKey%d".format(i)) @@ -139,11 +140,11 @@ object TestPurgatoryPerformance { val targetRate = numRequests.toDouble * 1000d / (requestArrivalTime - start).toDouble val actualRate = numRequests.toDouble * 1000d / (end - start).toDouble - val cpuTime = osMXBean.map(x => x.getProcessCpuTime.toDouble / 1000d) + val cpuTime = osMXBean.map(x => (x.getProcessCpuTime - initialCpuTimeNano.get) / 1000000L) val gcCounts = gcMXBeans.map(_.getCollectionCount) val gcTimes = gcMXBeans.map(_.getCollectionTime) - println("%d %f %f %f %s %s".format(done - start, targetRate, actualRate, cpuTime.getOrElse(-1d), gcCounts.mkString(" "), gcTimes.mkString(" "))) + println("%d %f %f %d %s %s".format(done - start, targetRate, actualRate, cpuTime.getOrElse(-1L), gcCounts.mkString(" "), gcTimes.mkString(" "))) purgatory.shutdown() } -- 2.3.0 From a475b7e0f34b73bed4edaf5cc5f85a900e5e21f2 Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Tue, 31 Mar 2015 17:28:56 -0700 Subject: [PATCH 8/8] review comments --- .../other/kafka/TestPurgatoryPerformance.scala | 93 ++++++++++------------ 1 file changed, 44 insertions(+), 49 deletions(-) diff --git a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala index 61ef9e5..539e596 100644 --- a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala +++ b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala @@ -26,7 +26,6 @@ import kafka.server.{DelayedOperationPurgatory, DelayedOperation} import kafka.utils._ import scala.math._ -import scala.util.Try import scala.collection.JavaConversions._ /** @@ -37,41 +36,41 @@ object TestPurgatoryPerformance { def main(args: Array[String]): Unit = { val parser = new OptionParser val numRequestsOpt = parser.accepts("num", "The number of requests") - .withRequiredArg - .describedAs("num_requests") - .ofType(classOf[java.lang.Double]) + .withRequiredArg + .describedAs("num_requests") + .ofType(classOf[java.lang.Double]) val requestRateOpt = parser.accepts("rate", "The request rate") - .withRequiredArg - .describedAs("request_per_second") - .ofType(classOf[java.lang.Double]) + .withRequiredArg + .describedAs("request_per_second") + .ofType(classOf[java.lang.Double]) val requestDataSizeOpt = parser.accepts("size", "The request data size") - .withRequiredArg - .describedAs("num_bytes") - .ofType(classOf[java.lang.Long]) + .withRequiredArg + .describedAs("num_bytes") + .ofType(classOf[java.lang.Long]) val numKeysOpt = parser.accepts("keys", "The number of keys") - .withRequiredArg - .describedAs("num_keys") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(3) + .withRequiredArg + .describedAs("num_keys") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(3) val timeoutOpt = parser.accepts("timeout", "The request timeout") - .withRequiredArg - .describedAs("timeout_milliseconds") - .ofType(classOf[java.lang.Long]) + .withRequiredArg + .describedAs("timeout_milliseconds") + .ofType(classOf[java.lang.Long]) val pct75Opt = parser.accepts("pct75", "75th percentile of request latency (log-normal distribution)") - .withRequiredArg - .describedAs("75th_percentile") - .ofType(classOf[java.lang.Double]) + .withRequiredArg + .describedAs("75th_percentile") + .ofType(classOf[java.lang.Double]) val pct50Opt = parser.accepts("pct50", "50th percentile of request latency (log-normal distribution)") - .withRequiredArg - .describedAs("50th_percentile") - .ofType(classOf[java.lang.Double]) + .withRequiredArg + .describedAs("50th_percentile") + .ofType(classOf[java.lang.Double]) val verboseOpt = parser.accepts("verbose", "show additional information") - .withRequiredArg - .describedAs("true|false") - .ofType(classOf[java.lang.Boolean]) - .defaultsTo(true) + .withRequiredArg + .describedAs("true|false") + .ofType(classOf[java.lang.Boolean]) + .defaultsTo(true) - val options = parser.parse(args : _*) + val options = parser.parse(args: _*) CommandLineUtils.checkRequiredArgs(parser, options, numRequestsOpt, requestRateOpt, requestDataSizeOpt, pct75Opt, pct50Opt) @@ -85,8 +84,11 @@ object TestPurgatoryPerformance { val verbose = options.valueOf(verboseOpt).booleanValue val gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans().sortBy(_.getName) - val osMXBean = Try(ManagementFactory.getOperatingSystemMXBean().asInstanceOf[com.sun.management.OperatingSystemMXBean]).toOption - + val osMXBean = try { + Some(ManagementFactory.getOperatingSystemMXBean().asInstanceOf[com.sun.management.OperatingSystemMXBean]) + } catch { + case _: Throwable => None + } val latencySamples = new LatencySamples(1000000, pct75, pct50) val intervalSamples = new IntervalSamples(1000000, requestRate) @@ -133,8 +135,8 @@ object TestPurgatoryPerformance { intervalSamples.printStats() println("# enqueue rate (%d requests):".format(numRequests)) val gcCountHeader = gcNames.map("<" + _ + " count>").mkString(" ") - val gcTimeHeader = gcNames.map("<" + _ + " time>").mkString(" ") - println("# %s %s".format(gcCountHeader, gcTimeHeader)) + val gcTimeHeader = gcNames.map("<" + _ + " time ms>").mkString(" ") + println("# %s %s".format(gcCountHeader, gcTimeHeader)) } val targetRate = numRequests.toDouble * 1000d / (requestArrivalTime - start).toDouble @@ -171,13 +173,13 @@ object TestPurgatoryPerformance { // Samples of Latencies to completion // They are drawn from a log normal distribution. - // A latency value can never be negative. A log-normal distribution is a convenient why to + // A latency value can never be negative. A log-normal distribution is a convenient way to // model such a random variable. private class LatencySamples(sampleSize: Int, pct75: Double, pct50: Double) { private[this] val rand = new Random private[this] val samples = { val normalMean = math.log(pct50) - val normalStDev = (math.log(pct75) - normalMean) / 0.674490d + val normalStDev = (math.log(pct75) - normalMean) / 0.674490d // 0.674490 is 75th percentile point in N(0,1) val dist = new LogNormalDistribution(normalMean, normalStDev) (0 until sampleSize).map { _ => dist.next().toLong }.toArray } @@ -233,28 +235,20 @@ object TestPurgatoryPerformance { else false } - - override def isCompleted(): Boolean = { - if (System.currentTimeMillis >= completesAt) forceComplete() - super.isCompleted() - } } private class CompletionQueue { @volatile private[this] var stop = false private[this] val delayQueue = new DelayQueue[Scheduled]() - private[this] val thread = new Thread(new Runnable { - def run(): Unit = { - while (!stop) { - var scheduled = delayQueue.poll(100, TimeUnit.MILLISECONDS) - while (scheduled != null) { - scheduled.operation.tryComplete() - scheduled = delayQueue.poll() - } + private[this] val thread = new ShutdownableThread("completion thread") { + override def doWork(): Unit = { + val scheduled = delayQueue.poll(100, TimeUnit.MILLISECONDS) + if (scheduled != null) { + scheduled.operation.forceComplete() } } - }) + } thread.start() def add(operation: FakeOperation): Unit = { @@ -262,7 +256,8 @@ object TestPurgatoryPerformance { } def shutdown() = { - stop = true + thread.shutdown() + thread.awaitShutdown() } private class Scheduled(val operation: FakeOperation) extends Delayed { -- 2.3.0