diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 75082e6..bfe0cb0 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -600,5 +600,8 @@ private[kafka] class Log(val dir: File, def getLastFlushedTime():Long = { return lastflushedTime.get } + + override def toString() = "Log(" + this.dir + ")" + } diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 5b30604..bdfeddb 100644 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -78,7 +78,9 @@ class LogSegment(val messageSet: FileMessageSet, * no more than maxSize bytes and will end before maxOffset if a maxOffset is specified. */ def read(startOffset: Long, maxSize: Int, maxOffset: Option[Long]): MessageSet = { - if(maxSize <= 0) + if(maxSize < 0) + throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize)) + if(maxSize == 0) return MessageSet.Empty val startPosition = translateOffset(startOffset) @@ -95,6 +97,8 @@ class LogSegment(val messageSet: FileMessageSet, maxSize case Some(offset) => { // there is a max offset, translate it to a file position and use that to calculate the max read size + if(offset < startOffset) + throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset)) val mapping = translateOffset(offset) val endPosition = if(mapping == null) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 6a5b4de..c51095f 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -159,4 +159,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro * Increasing this value can increase the degree of I/O parallelism in the follower broker. */ val numReplicaFetchers = props.getInt("replica.fetchers", 1) + /* the frequency with which the highwater mark is saved out to disk */ + val highWaterMarkCheckpointIntervalMs = props.getLong("replica.highwatermark.checkpoint.ms", 5000L) + } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index d28dcff..0cdc939 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -68,7 +68,7 @@ class ReplicaManager(val config: KafkaConfig, def startHighWaterMarksCheckPointThread() = { if(highWatermarkCheckPointThreadStarted.compareAndSet(false, true)) - kafkaScheduler.scheduleWithRate(checkpointHighWatermarks, "highwatermark-checkpoint-thread", 0, config.defaultFlushIntervalMs) + kafkaScheduler.scheduleWithRate(checkpointHighWatermarks, "highwatermark-checkpoint-thread", 0, config.highWaterMarkCheckpointIntervalMs) } /** diff --git a/core/src/main/scala/kafka/utils/KafkaScheduler.scala b/core/src/main/scala/kafka/utils/KafkaScheduler.scala index 4a70e81..5a222b8 100644 --- a/core/src/main/scala/kafka/utils/KafkaScheduler.scala +++ b/core/src/main/scala/kafka/utils/KafkaScheduler.scala @@ -34,7 +34,7 @@ class KafkaScheduler(val numThreads: Int) extends Logging { } private val threadNamesAndIds = new HashMap[String, AtomicInteger]() - def startup = { + def startup() = { executor = new ScheduledThreadPoolExecutor(numThreads) executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false) executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false) diff --git a/core/src/test/scala/other/kafka/TestEndToEndLatency.scala b/core/src/test/scala/other/kafka/TestEndToEndLatency.scala index 1e0e92d..5408db8 100644 --- a/core/src/test/scala/other/kafka/TestEndToEndLatency.scala +++ b/core/src/test/scala/other/kafka/TestEndToEndLatency.scala @@ -63,7 +63,7 @@ object TestEndToEndLatency { println(i + "\t" + elapsed / 1000.0 / 1000.0) totalTime += elapsed } - println("Avg latency: " + (totalTime / numMessages / 1000.0 / 1000.0)) + "ms" + println("Avg latency: " + (totalTime / numMessages / 1000.0 / 1000.0) + "ms") producer.close() connector.shutdown() System.exit(0) diff --git a/core/src/test/scala/other/kafka/TestFileChannelLocking.scala b/core/src/test/scala/other/kafka/TestFileChannelLocking.scala new file mode 100644 index 0000000..b6f018d --- /dev/null +++ b/core/src/test/scala/other/kafka/TestFileChannelLocking.scala @@ -0,0 +1,52 @@ +package kafka + +import java.io._ +import java.nio._ +import java.nio.channels._ + +object TestFileChannelLocking { + + def main(args: Array[String]) { + val fileSize = args(0).toInt + val sleepTime = args(1).toLong + val interval = args(2).toInt + val numFiles = args(3).toInt + val paths = (0 until numFiles).map(_ => File.createTempFile("kafka", ".dat")) + paths.foreach(_.deleteOnExit) + val channels = paths.map{p => + val file = new RandomAccessFile(p, "rw") + file.setLength(fileSize) + file.getChannel() + } + val flushThread = new Thread() { + override def run() { + while(!isInterrupted) { + println("flushing") + val start = System.nanoTime + channels.foreach(_.force(false)) + val ellapsed = (System.nanoTime - start)/(1000.0*1000.0) + println("flush completed in " + ellapsed + " ms") + Thread.sleep(sleepTime) + } + } + } + flushThread.setDaemon(true) + flushThread.start() + var maxTime = -1d + val buffer = ByteBuffer.allocate(8) + buffer.putLong(1.toLong) + for(i <- 0 until (fileSize)) { + buffer.rewind() + val start = System.nanoTime + channels(i % numFiles).write(buffer) + val ellapsed = (System.nanoTime - start)/(1000.0*1000.0) + + maxTime = scala.math.max(maxTime, ellapsed) + if(i % interval == interval - 1) { + println(maxTime) + maxTime = -1L + } + } + } + +} \ No newline at end of file diff --git a/core/src/test/scala/other/kafka/TestFileChannelReadLocking.scala b/core/src/test/scala/other/kafka/TestFileChannelReadLocking.scala new file mode 100644 index 0000000..e36549f --- /dev/null +++ b/core/src/test/scala/other/kafka/TestFileChannelReadLocking.scala @@ -0,0 +1,66 @@ +package kafka + +import java.io._ +import java.nio._ +import java.util.concurrent.atomic._ +import java.util.Random +import scala.math._ + +object TestFileChannelReadLocking { + + def main(args: Array[String]) { + val interval = args(0).toInt + val file = File.createTempFile("kafka", ".dat") + file.deleteOnExit() + val channel = new RandomAccessFile(file, "rw").getChannel + val flushThread = new Thread() { + override def run() { + while(!isInterrupted) { + println("flushing") + val start = System.nanoTime + channel.force(false) + val ellapsed = (System.nanoTime - start)/(1000.0*1000.0) + println("flush completed in " + ellapsed) + Thread.sleep(20000) + } + } + } + flushThread.setDaemon(true) + flushThread.start() + + val size = new AtomicLong(0) + val writeThread = new Thread() { + override def run() { + val bytes = 64 + val buffer = ByteBuffer.allocate(bytes) + for(i <- 0 until bytes) + buffer.put(i.toByte) + while(!isInterrupted) { + buffer.rewind + channel.write(buffer) + } + } + } + writeThread.setDaemon(true) + writeThread.start() + Thread.sleep(500) + + var maxTime = -1d + val buffer = ByteBuffer.allocate(8) + val random = new Random + for(i <- 0 until Int.MaxValue) { + buffer.rewind() + val position = random.nextInt(max(1, size.get.toInt - 8)) + val start = System.nanoTime + channel.read(buffer, position) + val ellapsed = (System.nanoTime - start)/(1000.0*1000.0) + + maxTime = max(maxTime, ellapsed) + if(i % interval == interval - 1) { + println(maxTime) + maxTime = -1L + } + } + } + +} \ No newline at end of file diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala index 002fc6d..36d52e7 100644 --- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala +++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala @@ -20,16 +20,22 @@ package kafka import java.io._ import java.nio._ import java.nio.channels._ +import scala.math._ import joptsimple._ object TestLinearWriteSpeed { def main(args: Array[String]): Unit = { val parser = new OptionParser + val dirOpt = parser.accepts("dir", "The directory to write to.") + .withRequiredArg + .describedAs("path") + .ofType(classOf[java.lang.String]) + .defaultsTo(System.getProperty("java.io.tmpdir")) val bytesOpt = parser.accepts("bytes", "REQUIRED: The number of bytes to write.") .withRequiredArg .describedAs("num_bytes") - .ofType(classOf[java.lang.Integer]) + .ofType(classOf[java.lang.Long]) val sizeOpt = parser.accepts("size", "REQUIRED: The size of each write.") .withRequiredArg .describedAs("num_bytes") @@ -39,7 +45,18 @@ object TestLinearWriteSpeed { .describedAs("num_files") .ofType(classOf[java.lang.Integer]) .defaultsTo(1) - + val reportingIntervalOpt = parser.accepts("reporting-interval", "The number of ms between updates.") + .withRequiredArg + .describedAs("ms") + .ofType(classOf[java.lang.Long]) + .defaultsTo(1000) + val maxThroughputOpt = parser.accepts("max-throughput-mb", "The maximum throughput.") + .withRequiredArg + .describedAs("mb") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(Integer.MAX_VALUE) + val mmapOpt = parser.accepts("mmap", "Mmap file.") + val options = parser.parse(args : _*) for(arg <- List(bytesOpt, sizeOpt, filesOpt)) { @@ -50,27 +67,84 @@ object TestLinearWriteSpeed { } } - val bytesToWrite = options.valueOf(bytesOpt).intValue + var bytesToWrite = options.valueOf(bytesOpt).longValue + val mmap = options.has(mmapOpt) val bufferSize = options.valueOf(sizeOpt).intValue val numFiles = options.valueOf(filesOpt).intValue + val reportingInterval = options.valueOf(reportingIntervalOpt).longValue + val dir = options.valueOf(dirOpt) + val maxThroughputBytes = options.valueOf(maxThroughputOpt).intValue * 1024L * 1024L val buffer = ByteBuffer.allocate(bufferSize) while(buffer.hasRemaining) buffer.put(123.asInstanceOf[Byte]) - val channels = new Array[FileChannel](numFiles) + val writables = new Array[Writable](numFiles) for(i <- 0 until numFiles) { - val file = File.createTempFile("kafka-test", ".dat") + val file = new File(dir, "kafka-test-" + i + ".dat") file.deleteOnExit() - channels(i) = new RandomAccessFile(file, "rw").getChannel() + val raf = new RandomAccessFile(file, "rw") + raf.setLength(bytesToWrite / numFiles) + if(mmap) + writables(i) = new MmapWritable(raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, raf.length())) + else + writables(i) = new ChannelWritable(raf.getChannel()) } + bytesToWrite = (bytesToWrite / numFiles) * numFiles - val begin = System.currentTimeMillis - for(i <- 0 until bytesToWrite / bufferSize) { + println("%10s\t%10s\t%10s".format("mb_sec", "avg_latency", "max_latency")) + + val beginTest = System.nanoTime + var maxLatency = 0L + var totalLatency = 0L + var count = 0L + var written = 0L + var totalWritten = 0L + var lastReport = beginTest + while(totalWritten + bufferSize < bytesToWrite) { buffer.rewind() - channels(i % numFiles).write(buffer) + val start = System.nanoTime + writables((count % numFiles).toInt.abs).write(buffer) + val ellapsed = System.nanoTime - start + maxLatency = max(ellapsed, maxLatency) + totalLatency += ellapsed + written += bufferSize + count += 1 + totalWritten += bufferSize + if((start - lastReport)/(1000.0*1000.0) > reportingInterval.doubleValue) { + val ellapsedSecs = (start - lastReport) / (1000.0*1000.0*1000.0) + val mb = written / (1024.0*1024.0) + println("%10.3f\t%10.3f\t%10.3f".format(mb / ellapsedSecs, totalLatency / count.toDouble / (1000.0*1000.0), maxLatency / (1000.0 * 1000.0))) + lastReport = start + written = 0 + maxLatency = 0L + totalLatency = 0L + } else if(written > maxThroughputBytes) { + // if we have written enough, just sit out this reporting interval + val lastReportMs = lastReport / (1000*1000) + val now = System.nanoTime / (1000*1000) + val sleepMs = lastReportMs + reportingInterval - now + if(sleepMs > 0) + Thread.sleep(sleepMs) + } + } + val elapsedSecs = (System.nanoTime - beginTest) / (1000.0*1000.0*1000.0) + println(bytesToWrite / (1024.0 * 1024.0 * elapsedSecs) + " MB per sec") + } + + trait Writable { + def write(buffer: ByteBuffer) + } + + class MmapWritable(val buffer: ByteBuffer) extends Writable { + def write(b: ByteBuffer) { + buffer.put(b) + } + } + + class ChannelWritable(val channel: FileChannel) extends Writable { + def write(b: ByteBuffer) { + channel.write(b) } - val elapsedSecs = (System.currentTimeMillis - begin) / 1000.0 - System.out.println(bytesToWrite / (1024 * 1024 * elapsedSecs) + " MB per sec") } } diff --git a/core/src/test/scala/other/kafka/TestMmapLocking.scala b/core/src/test/scala/other/kafka/TestMmapLocking.scala new file mode 100644 index 0000000..264e54f --- /dev/null +++ b/core/src/test/scala/other/kafka/TestMmapLocking.scala @@ -0,0 +1,52 @@ +package kafka + +import java.io._ +import java.nio._ +import java.nio.channels._ +import scala.math._ + +object TestMmapLocking { + + def main(args: Array[String]) { + val fileSize = args(0).toInt + val sleepTime = args(1).toLong + val interval = args(2).toInt + val numFiles = args(3).toInt + val paths = (0 until numFiles).map(_ => File.createTempFile("kafka", ".dat")) + paths.foreach(_.deleteOnExit) + val mmaps = paths.map{p => + val file = new RandomAccessFile(p, "rw") + file.setLength(fileSize) + file.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, fileSize) + } + + val flushThread = new Thread() { + override def run() { + while(!isInterrupted) { + println("flushing") + val start = System.nanoTime + mmaps.foreach(_.force()) + val ellapsed = (System.nanoTime - start)/(1000.0*1000.0) + println("flush completed in " + ellapsed + " ms") + Thread.sleep(sleepTime) + } + } + } + flushThread.setDaemon(true) + flushThread.start() + var maxTime = -1d + var i = 0 + while(true) { + val start = System.nanoTime + mmaps(i.abs % numFiles).putLong(i.toLong) + val ellapsed = (System.nanoTime - start)/(1000.0*1000.0) + maxTime = scala.math.max(maxTime, ellapsed) + if(i % interval == interval - 1) { + println(maxTime) + maxTime = -1L + } + i += 1 + } + } + +} \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala index 7c41310..e00d143 100644 --- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala +++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala @@ -41,9 +41,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with val port = TestUtils.choosePort val props = TestUtils.createBrokerConfig(0, port) - val config = new KafkaConfig(props) { - override val flushInterval = 1 - } + val config = new KafkaConfig(props) val configs = List(config) val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler]) diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala index 58961ad..1b12706 100644 --- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala @@ -33,7 +33,7 @@ import kafka.api.{RequestKeys, TopicMetadata, TopicMetadataResponse, TopicMetada class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { val props = createBrokerConfigs(1) - val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1}) + val configs = props.map(p => new KafkaConfig(p)) var brokers: Seq[Broker] = null override def setUp() { diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala index 6ac59ce..3f7c029 100644 --- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -37,7 +37,7 @@ import kafka.utils._ class AsyncProducerTest extends JUnit3Suite { val props = createBrokerConfigs(1) - val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1}) + val configs = props.map(p => new KafkaConfig(p)) override def setUp() { super.setUp() diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala index 214c21b..ff3b5c4 100644 --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -28,9 +28,7 @@ import kafka.utils.{SystemTime, KafkaScheduler, TestUtils, MockTime, Utils} class HighwatermarkPersistenceTest extends JUnit3Suite { - val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) { - override val defaultFlushIntervalMs = 100 - }) + val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_)) val topic = "foo" val logManagers = configs.map(config => new LogManager(config, new KafkaScheduler(1), new MockTime)) diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index cce858f..c67c58f 100644 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -30,7 +30,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) { override val replicaMaxLagTimeMs = 5000L override val replicaMaxLagBytes = 10L - override val flushInterval = 10 override val replicaMinBytes = 20 }) val topic = "new-topic" @@ -155,7 +154,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) { override val replicaMaxLagTimeMs = 5000L override val replicaMaxLagBytes = 10L - override val flushInterval = 10 override val replicaMinBytes = 20 override val logFileSize = 30 }) @@ -200,7 +198,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) { override val replicaMaxLagTimeMs = 5000L override val replicaMaxLagBytes = 10L - override val flushInterval = 1000 override val replicaMinBytes = 20 override val logFileSize = 30 }) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala index 749c0ae..a5f211c 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala @@ -28,7 +28,7 @@ import junit.framework.Assert._ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness { val props = createBrokerConfigs(2) - val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1}) + val configs = props.map(p => new KafkaConfig(p)) var brokers: Seq[KafkaServer] = null val topic1 = "foo" val topic2 = "bar" diff --git a/perf/src/main/scala/kafka/perf/LogPerformance.scala b/perf/src/main/scala/kafka/perf/LogPerformance.scala new file mode 100644 index 0000000..99ef27c --- /dev/null +++ b/perf/src/main/scala/kafka/perf/LogPerformance.scala @@ -0,0 +1,339 @@ +package kafka.perf + +import java.io.File +import java.util.{Random, Properties} +import java.util.concurrent._ +import java.util.concurrent.atomic._ +import com.yammer.metrics.stats.UniformSample +import scala.collection.JavaConversions._ +import kafka.common._ +import kafka.message._ +import kafka.log._ +import kafka.utils._ +import kafka.server.KafkaConfig + +/** + * A performance test that interacts directly with the log subsystem without the rest of the broker running + * or any remote communication. + */ +object LogPerformance { + + def main(args: Array[String]) { + val config = new LogPerfConfig(args) + + if(config.help) { + config.parser.printHelpOn(System.err) + System.exit(0) + } + + val dirs = config.dataDirs.map(new File(_)) + dirs.map(_.mkdirs()) + val props = new Properties + props.put("brokerid", 0.toString) + props.put("log.dir", config.dataDirs.mkString(",")) + props.put("log.flush.interval", config.flushInterval.toString) + props.put("log.default.flush.interval.ms", config.flushMs.toString) + props.put("log.index.interval.bytes", config.indexInterval.toString) + props.put("log.async.flush.on.roll", config.asyncFlush.toString) + println("Using async flusher: " + props.get("log.async.flush.on.roll")) + + val scheduler = new KafkaScheduler(1) + scheduler.startup() + val logManager = new LogManager(config = new KafkaConfig(props), + scheduler = scheduler, + time = SystemTime) + logManager.startup() + + // create metrics + val globalMetrics = new Metrics(readBytes = new Metric(4096), + readTimes = new Metric(4096), + writeBytes = new Metric(4096), + writeTimes = new Metric(4096)) + val snapshotMetrics = new Metrics(readBytes = new Metric(4096, globalMetrics.readBytes), + readTimes = new Metric(4096, globalMetrics.readTimes), + writeBytes = new Metric(4096, globalMetrics.writeBytes), + writeTimes = new Metric(4096, globalMetrics.writeTimes)) + + // start writers + val writers = + for(i <- 0 until config.numWriters) + yield new WriterThread(topic = "test", + numPartitions = config.numPartitions, + codec = config.codec, + numMessages = config.numMessages, + messageSize = config.messageSize, + writeSize = config.writerBatchSize, + logManager = logManager, + metrics = snapshotMetrics) + writers.foreach(_.start()) + + // start readers + val readers = + for(i <- 0 until config.numReaders) + yield new ReaderThread(topic = "test", + numPartitions = config.numPartitions, + readSize = config.readerBatchSize, + logManager = logManager, + metrics = snapshotMetrics) + readers.foreach(_.start()) + + // start reporter + globalMetrics.printHeader() + val reporter = new ReporterThread(config.reportingInterval.toLong, snapshotMetrics) + reporter.start() + + // await completion of writers + writers.foreach(_.join) + + // stop readers + readers.foreach(_.shutdown()) + readers.foreach(_.join) + + // stop reporter + reporter.interrupt() + + // print totals + globalMetrics.printReport() + + // shutdown + scheduler.shutdownNow() + logManager.shutdown() + } + + abstract class PerfThread(bytes: Metric, times: Metric) extends Thread { + val logManager: LogManager + val running = new AtomicBoolean(true) + var started = SystemTime.nanoseconds + + override def run() { + while(running.get) { + val start = SystemTime.nanoseconds + val b = work() + times.record(SystemTime.nanoseconds - start) + bytes.record(b) + } + } + + def reset() { + times.reset() + bytes.reset() + } + + def shutdown() { + running.set(false) + } + + def work(): Long + } + + class ReaderThread(val topic: String, + val numPartitions: Int, + val readSize: Int, + val logManager: LogManager, + val metrics: Metrics) extends PerfThread(metrics.readBytes, metrics.readTimes) { + var offset = 0L + var part = 0 + + def work(): Long = { + try { + val log = logManager.getOrCreateLog(topic, part) + val messages = log.read(offset, Int.MaxValue, Some(offset + readSize)) + part = (part + 1) % numPartitions + offset += 1 + messages.sizeInBytes + } catch { + case e: OffsetOutOfRangeException => + offset = 0L + 0L + } + } + } + + class WriterThread(val topic: String, + val numPartitions: Int, + val codec: CompressionCodec, + val numMessages: Long, + val messageSize: Int, + val writeSize: Int, + val logManager: LogManager, + val metrics: Metrics) extends PerfThread(metrics.writeBytes, metrics.writeTimes) { + var part = 0 + var written = 0L + val random = new Random() + val messages = new ByteBufferMessageSet(codec, (0 until writeSize).map(_ => makeMessage(messageSize)):_*) + + def work(): Long = { + val log = logManager.getOrCreateLog(topic, part) + log.append(messages, assignOffsets = true) + part = (part + 1) % numPartitions + written += writeSize + if(written >= numMessages) + shutdown() + messages.sizeInBytes + } + + private def makeMessage(size: Int): Message = { + val bytes = new Array[Byte](size) + random.nextBytes(bytes) + new Message(bytes) + } + } + + class ReporterThread(val sleepMs: Long, + val metrics: Metrics) extends Thread { + override def run() { + try { + while(!Thread.interrupted()) { + Thread.sleep(sleepMs) + metrics.printReport() + metrics.reset() + } + } catch { + case e: InterruptedException => + } + } + } + + class Metrics(val readBytes: Metric, + val readTimes: Metric, + val writeBytes: Metric, + val writeTimes: Metric) { + def printHeader() { + print("%13s %13s %13s ".format("mb/sec_read", "avg_read_ms", "99th_read_ms")) + println("%13s %13s %13s".format("mb/sec_write", "avg_write_ms", "99th_write_ms")) + } + + def printReport() { + println("%13.3f %13.1f %13.1f %13.3f %13.1f %13.1f".format(mbsSec(readBytes), + ns2ms(readTimes.avg), + ns2ms(readTimes.quantile(0.99)), + mbsSec(writeBytes), + ns2ms(writeTimes.avg), + ns2ms(writeTimes.quantile(0.99)))) + } + + def reset() { + readBytes.reset() + readTimes.reset() + writeBytes.reset() + writeTimes.reset() + } + + private def ns2ms(n: Double) = n / (1000.0*1000.0) + + private def mbsSec(metric: Metric) = metric.rate(TimeUnit.SECONDS) / (1024*1024.0) + } + + class Metric(samples: Int = 4096, val parent: Metric = null) { + private var start = SystemTime.nanoseconds + private val sampler = new UniformSample(samples) + private var _max = Long.MinValue + private var _total = 0L + private var _count = 0L + + def record(value: Long) { + this.synchronized { + this._total += value + this._count += 1 + this.sampler.update(value) + if(value > _max) + this._max = value + } + if(parent != null) + parent.record(value) + } + + def reset() { + this synchronized { + this.start = SystemTime.nanoseconds + this._total = 0L + this._count = 0L + this._max = Long.MinValue + this.sampler.clear() + } + } + + def ellapsed(unit: TimeUnit) = unit.convert(SystemTime.nanoseconds - this.synchronized(start), TimeUnit.NANOSECONDS) + + def rate(unit: TimeUnit) = this synchronized {_total.toDouble / ellapsed(unit)} + + def avg = this synchronized {_total/_count.toDouble} + + def count = this synchronized _count + + def total = this synchronized _total + + def max = this synchronized _max + + def quantile(q: Double) = this synchronized {this.sampler.getSnapshot().getValue(q)} + + } + + class LogPerfConfig(args: Array[String]) extends PerfConfig(args) { + val dataDirsOpt = parser.accepts("dir", "The log directory.") + .withRequiredArg + .describedAs("path") + .ofType(classOf[String]) + .defaultsTo(new File(System.getProperty("java.io.tmpdir"), + "kafka-" + new Random().nextInt(10000000)).getAbsolutePath) + val numPartitionsOpt = parser.accepts("partitions", "The number of partitions.") + .withRequiredArg + .describedAs("num_partitions") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1) + val readerBatchSizeOpt = parser.accepts("reader-batch-size", "The number of messages to write at once.") + .withRequiredArg + .describedAs("num_messages") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(200) + val writerBatchSizeOpt = parser.accepts("writer-batch-size", "The number of messages to write at once.") + .withRequiredArg + .describedAs("num_messages") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(200) + val numReadersOpt = parser.accepts("readers", "The number of reader threads.") + .withRequiredArg + .describedAs("num_threads") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1) + val numWritersOpt = parser.accepts("writers", "The number of writer threads.") + .withRequiredArg + .describedAs("num_threads") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1) + val flushIntervalOpt = parser.accepts("flush-interval", "The number of messages in a partition between flushes.") + .withRequiredArg + .describedAs("num_messages") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(Int.MaxValue) + val flushMsOpt = parser.accepts("flush-time", "The time between flushes.") + .withRequiredArg + .describedAs("ms") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(Int.MaxValue) + val indexIntervalOpt = parser.accepts("index-interval", "The number of bytes in between index entries.") + .withRequiredArg + .describedAs("bytes") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(4096) + val asyncFlushOpt = parser.accepts("async-flush", "Use async flusher.") + + val options = parser.parse(args : _*) + + var dataDirs = options.valuesOf(dataDirsOpt) + val numMessages = options.valueOf(numMessagesOpt).intValue + val messageSize = options.valueOf(messageSizeOpt).intValue + val numPartitions = options.valueOf(numPartitionsOpt).intValue + val readerBatchSize = options.valueOf(readerBatchSizeOpt).intValue + val writerBatchSize = options.valueOf(writerBatchSizeOpt).intValue + val numReaders = options.valueOf(numReadersOpt).intValue + val numWriters = options.valueOf(numWritersOpt).intValue + val reportingInterval = options.valueOf(reportingIntervalOpt).intValue + val flushInterval = options.valueOf(flushIntervalOpt).intValue + val flushMs = options.valueOf(flushMsOpt).intValue + val codec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOpt).intValue) + val indexInterval = options.valueOf(indexIntervalOpt).intValue + val asyncFlush = options.has(asyncFlushOpt) + val help = options.has(helpOpt) + } +} \ No newline at end of file diff --git a/perf/src/main/scala/kafka/perf/PerfConfig.scala b/perf/src/main/scala/kafka/perf/PerfConfig.scala index 952df97..5dabeb5 100644 --- a/perf/src/main/scala/kafka/perf/PerfConfig.scala +++ b/perf/src/main/scala/kafka/perf/PerfConfig.scala @@ -41,4 +41,20 @@ class PerfConfig(args: Array[String]) { val showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If set, stats are reported for each reporting " + "interval as configured by reporting-interval") val hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing the header for the stats ") + val messageSizeOpt = parser.accepts("message-size", "The size of each message.") + .withRequiredArg + .describedAs("size") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(100) + val batchSizeOpt = parser.accepts("batch-size", "Number of messages to write in a single batch.") + .withRequiredArg + .describedAs("size") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(200) + val compressionCodecOpt = parser.accepts("compression-codec", "If set, messages are sent compressed") + .withRequiredArg + .describedAs("compression codec ") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(0) + val helpOpt = parser.accepts("help", "Print usage.") } diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala index c343d84..81ea19c 100644 --- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala +++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala @@ -93,18 +93,8 @@ object ProducerPerformance extends Logging { .withRequiredArg() .ofType(classOf[java.lang.Integer]) .defaultsTo(-1) - val messageSizeOpt = parser.accepts("message-size", "The size of each message.") - .withRequiredArg - .describedAs("size") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(100) val varyMessageSizeOpt = parser.accepts("vary-message-size", "If set, message size will vary up to the given maximum.") val syncOpt = parser.accepts("sync", "If set, messages are sent synchronously.") - val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch.") - .withRequiredArg - .describedAs("batch size") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(200) val numThreadsOpt = parser.accepts("threads", "Number of sending threads.") .withRequiredArg .describedAs("number of threads") @@ -126,6 +116,20 @@ object ProducerPerformance extends Logging { .describedAs("message send time gap") .ofType(classOf[java.lang.Integer]) .defaultsTo(0) + val produceRequestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The produce request timeout in ms") + .withRequiredArg() + .ofType(classOf[java.lang.Integer]) + .defaultsTo(3000) + val produceRequestRequiredAcksOpt = parser.accepts("request-num-acks", "Number of acks required for producer request " + + "to complete") + .withRequiredArg() + .ofType(classOf[java.lang.Integer]) + .defaultsTo(-1) + val asyncOpt = parser.accepts("async", "If set, messages are sent asynchronously.") + .withRequiredArg + .describedAs("count") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1) val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled") val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" + "set, the csv metrics will be outputed here") @@ -153,10 +157,10 @@ object ProducerPerformance extends Logging { var isSync = options.has(syncOpt) var batchSize = options.valueOf(batchSizeOpt).intValue var numThreads = options.valueOf(numThreadsOpt).intValue - val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOption).intValue) + val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOpt).intValue) val seqIdMode = options.has(initialMessageIdOpt) var initialMessageId: Int = 0 - if (seqIdMode) + if(seqIdMode) initialMessageId = options.valueOf(initialMessageIdOpt).intValue() val producerRequestTimeoutMs = options.valueOf(producerRequestTimeoutMsOpt).intValue() val producerRequestRequiredAcks = options.valueOf(producerRequestRequiredAcksOpt).intValue()