diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 75082e6..bfe0cb0 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -600,5 +600,8 @@ private[kafka] class Log(val dir: File, def getLastFlushedTime():Long = { return lastflushedTime.get } + + override def toString() = "Log(" + this.dir + ")" + } diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 5b30604..bdfeddb 100644 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -78,7 +78,9 @@ class LogSegment(val messageSet: FileMessageSet, * no more than maxSize bytes and will end before maxOffset if a maxOffset is specified. */ def read(startOffset: Long, maxSize: Int, maxOffset: Option[Long]): MessageSet = { - if(maxSize <= 0) + if(maxSize < 0) + throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize)) + if(maxSize == 0) return MessageSet.Empty val startPosition = translateOffset(startOffset) @@ -95,6 +97,8 @@ class LogSegment(val messageSet: FileMessageSet, maxSize case Some(offset) => { // there is a max offset, translate it to a file position and use that to calculate the max read size + if(offset < startOffset) + throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset)) val mapping = translateOffset(offset) val endPosition = if(mapping == null) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 6a5b4de..c51095f 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -159,4 +159,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro * Increasing this value can increase the degree of I/O parallelism in the follower 
broker. */ val numReplicaFetchers = props.getInt("replica.fetchers", 1) + /* the frequency with which the highwater mark is saved out to disk */ + val highWaterMarkCheckpointIntervalMs = props.getLong("replica.highwatermark.checkpoint.ms", 5000L) + } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index d28dcff..0cdc939 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -68,7 +68,7 @@ class ReplicaManager(val config: KafkaConfig, def startHighWaterMarksCheckPointThread() = { if(highWatermarkCheckPointThreadStarted.compareAndSet(false, true)) - kafkaScheduler.scheduleWithRate(checkpointHighWatermarks, "highwatermark-checkpoint-thread", 0, config.defaultFlushIntervalMs) + kafkaScheduler.scheduleWithRate(checkpointHighWatermarks, "highwatermark-checkpoint-thread", 0, config.highWaterMarkCheckpointIntervalMs) } /** diff --git a/core/src/main/scala/kafka/utils/KafkaScheduler.scala b/core/src/main/scala/kafka/utils/KafkaScheduler.scala index 4a70e81..5a222b8 100644 --- a/core/src/main/scala/kafka/utils/KafkaScheduler.scala +++ b/core/src/main/scala/kafka/utils/KafkaScheduler.scala @@ -34,7 +34,7 @@ class KafkaScheduler(val numThreads: Int) extends Logging { } private val threadNamesAndIds = new HashMap[String, AtomicInteger]() - def startup = { + def startup() = { executor = new ScheduledThreadPoolExecutor(numThreads) executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false) executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false) diff --git a/core/src/test/scala/other/kafka/TestEndToEndLatency.scala b/core/src/test/scala/other/kafka/TestEndToEndLatency.scala index 1e0e92d..5408db8 100644 --- a/core/src/test/scala/other/kafka/TestEndToEndLatency.scala +++ b/core/src/test/scala/other/kafka/TestEndToEndLatency.scala @@ -63,7 +63,7 @@ object TestEndToEndLatency { println(i + "\t" + elapsed / 1000.0 / 
1000.0)
       totalTime += elapsed
     }
-    println("Avg latency: " + (totalTime / numMessages / 1000.0 / 1000.0)) + "ms"
+    println("Avg latency: " + (totalTime / numMessages / 1000.0 / 1000.0) + "ms")
     producer.close()
     connector.shutdown()
     System.exit(0)
diff --git a/core/src/test/scala/other/kafka/TestFileChannelLocking.scala b/core/src/test/scala/other/kafka/TestFileChannelLocking.scala
new file mode 100644
index 0000000..b6f018d
--- /dev/null
+++ b/core/src/test/scala/other/kafka/TestFileChannelLocking.scala
@@ -0,0 +1,66 @@
+package kafka
+
+import java.io._
+import java.nio._
+import java.nio.channels._
+
+/**
+ * Manual test that times FileChannel writes while a background thread periodically
+ * forces the same channels to disk, printing the worst write latency (ms) per interval.
+ *
+ * Arguments: file_size (file length in bytes, also the number of 8-byte writes) flush_sleep_ms reporting_interval num_files
+ */
+object TestFileChannelLocking {
+
+  def main(args: Array[String]) {
+    if(args.length < 4) {
+      System.err.println("USAGE: TestFileChannelLocking file_size flush_sleep_ms reporting_interval num_files")
+      System.exit(1)
+    }
+    val fileSize = args(0).toInt
+    val sleepTime = args(1).toLong
+    val interval = args(2).toInt
+    val numFiles = args(3).toInt
+    // both values are used as modulo divisors below, so zero would crash mid-run
+    require(interval > 0 && numFiles > 0, "reporting_interval and num_files must be positive")
+    val paths = (0 until numFiles).map(_ => File.createTempFile("kafka", ".dat"))
+    paths.foreach(_.deleteOnExit)
+    val channels = paths.map{p =>
+      val file = new RandomAccessFile(p, "rw")
+      file.setLength(fileSize)
+      file.getChannel()
+    }
+    // daemon thread: forces every channel, reports how long that took, then sleeps
+    val flushThread = new Thread() {
+      override def run() {
+        while(!isInterrupted) {
+          println("flushing")
+          val start = System.nanoTime
+          channels.foreach(_.force(false))
+          val ellapsed = (System.nanoTime - start)/(1000.0*1000.0)
+          println("flush completed in " + ellapsed + " ms")
+          Thread.sleep(sleepTime)
+        }
+      }
+    }
+    flushThread.setDaemon(true)
+    flushThread.start()
+    var maxTime = -1d
+    val buffer = ByteBuffer.allocate(8)
+    buffer.putLong(1.toLong)
+    for(i <- 0 until (fileSize)) {
+      buffer.rewind()
+      val start = System.nanoTime
+      channels(i % numFiles).write(buffer)
+      val ellapsed = (System.nanoTime - start)/(1000.0*1000.0)
+
+      maxTime = scala.math.max(maxTime, ellapsed)
+      // print and reset the worst latency once every `interval` writes
+      if(i % interval == interval - 1) {
+        println(maxTime)
+        maxTime = -1L
+      }
+    }
+  }
+
+}
\ No newline at end of file
diff --git a/core/src/test/scala/other/kafka/TestFileChannelReadLocking.scala b/core/src/test/scala/other/kafka/TestFileChannelReadLocking.scala
new file mode
100644
index 0000000..e36549f
--- /dev/null
+++ b/core/src/test/scala/other/kafka/TestFileChannelReadLocking.scala
@@ -0,0 +1,76 @@
+package kafka
+
+import java.io._
+import java.nio._
+import java.util.concurrent.atomic._
+import java.util.Random
+import scala.math._
+
+/**
+ * Manual test that times positional reads on a FileChannel while one background thread
+ * keeps appending to the same channel and another forces it to disk every 20 seconds.
+ *
+ * Arguments: reporting_interval (number of reads between printed max latencies, in ms)
+ */
+object TestFileChannelReadLocking {
+
+  def main(args: Array[String]) {
+    val interval = args(0).toInt
+    val file = File.createTempFile("kafka", ".dat")
+    file.deleteOnExit()
+    val channel = new RandomAccessFile(file, "rw").getChannel
+    val flushThread = new Thread() {
+      override def run() {
+        while(!isInterrupted) {
+          println("flushing")
+          val start = System.nanoTime
+          channel.force(false)
+          val ellapsed = (System.nanoTime - start)/(1000.0*1000.0)
+          println("flush completed in " + ellapsed)
+          Thread.sleep(20000)
+        }
+      }
+    }
+    flushThread.setDaemon(true)
+    flushThread.start()
+
+    // bytes appended so far; bounds the positions the reader is allowed to pick
+    val size = new AtomicLong(0)
+    val writeThread = new Thread() {
+      override def run() {
+        val bytes = 64
+        val buffer = ByteBuffer.allocate(bytes)
+        for(i <- 0 until bytes)
+          buffer.put(i.toByte)
+        while(!isInterrupted) {
+          buffer.rewind
+          // publish progress; without this every read below would hit position 0
+          size.addAndGet(channel.write(buffer))
+        }
+      }
+    }
+    writeThread.setDaemon(true)
+    writeThread.start()
+    Thread.sleep(500)
+
+    var maxTime = -1d
+    val buffer = ByteBuffer.allocate(8)
+    val random = new Random
+    for(i <- 0 until Int.MaxValue) {
+      buffer.rewind()
+      // clamp before narrowing: the file can outgrow Int.MaxValue and toInt would wrap
+      val readable = min(size.get, Int.MaxValue.toLong).toInt
+      val position = random.nextInt(max(1, readable - 8))
+      val start = System.nanoTime
+      channel.read(buffer, position)
+      val ellapsed = (System.nanoTime - start)/(1000.0*1000.0)
+
+      maxTime = max(maxTime, ellapsed)
+      if(i % interval == interval - 1) {
+        println(maxTime)
+        maxTime = -1L
+      }
+    }
+  }
+
+}
\ No newline at end of file
diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
index 002fc6d..36d52e7 100644
--- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
+++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
@@ -20,16 +20,22 @@
package kafka import java.io._ import java.nio._ import java.nio.channels._ +import scala.math._ import joptsimple._ object TestLinearWriteSpeed { def main(args: Array[String]): Unit = { val parser = new OptionParser + val dirOpt = parser.accepts("dir", "The directory to write to.") + .withRequiredArg + .describedAs("path") + .ofType(classOf[java.lang.String]) + .defaultsTo(System.getProperty("java.io.tmpdir")) val bytesOpt = parser.accepts("bytes", "REQUIRED: The number of bytes to write.") .withRequiredArg .describedAs("num_bytes") - .ofType(classOf[java.lang.Integer]) + .ofType(classOf[java.lang.Long]) val sizeOpt = parser.accepts("size", "REQUIRED: The size of each write.") .withRequiredArg .describedAs("num_bytes") @@ -39,7 +45,18 @@ object TestLinearWriteSpeed { .describedAs("num_files") .ofType(classOf[java.lang.Integer]) .defaultsTo(1) - + val reportingIntervalOpt = parser.accepts("reporting-interval", "The number of ms between updates.") + .withRequiredArg + .describedAs("ms") + .ofType(classOf[java.lang.Long]) + .defaultsTo(1000) + val maxThroughputOpt = parser.accepts("max-throughput-mb", "The maximum throughput.") + .withRequiredArg + .describedAs("mb") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(Integer.MAX_VALUE) + val mmapOpt = parser.accepts("mmap", "Mmap file.") + val options = parser.parse(args : _*) for(arg <- List(bytesOpt, sizeOpt, filesOpt)) { @@ -50,27 +67,84 @@ object TestLinearWriteSpeed { } } - val bytesToWrite = options.valueOf(bytesOpt).intValue + var bytesToWrite = options.valueOf(bytesOpt).longValue + val mmap = options.has(mmapOpt) val bufferSize = options.valueOf(sizeOpt).intValue val numFiles = options.valueOf(filesOpt).intValue + val reportingInterval = options.valueOf(reportingIntervalOpt).longValue + val dir = options.valueOf(dirOpt) + val maxThroughputBytes = options.valueOf(maxThroughputOpt).intValue * 1024L * 1024L val buffer = ByteBuffer.allocate(bufferSize) while(buffer.hasRemaining) 
buffer.put(123.asInstanceOf[Byte]) - val channels = new Array[FileChannel](numFiles) + val writables = new Array[Writable](numFiles) for(i <- 0 until numFiles) { - val file = File.createTempFile("kafka-test", ".dat") + val file = new File(dir, "kafka-test-" + i + ".dat") file.deleteOnExit() - channels(i) = new RandomAccessFile(file, "rw").getChannel() + val raf = new RandomAccessFile(file, "rw") + raf.setLength(bytesToWrite / numFiles) + if(mmap) + writables(i) = new MmapWritable(raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, raf.length())) + else + writables(i) = new ChannelWritable(raf.getChannel()) } + bytesToWrite = (bytesToWrite / numFiles) * numFiles - val begin = System.currentTimeMillis - for(i <- 0 until bytesToWrite / bufferSize) { + println("%10s\t%10s\t%10s".format("mb_sec", "avg_latency", "max_latency")) + + val beginTest = System.nanoTime + var maxLatency = 0L + var totalLatency = 0L + var count = 0L + var written = 0L + var totalWritten = 0L + var lastReport = beginTest + while(totalWritten + bufferSize < bytesToWrite) { buffer.rewind() - channels(i % numFiles).write(buffer) + val start = System.nanoTime + writables((count % numFiles).toInt.abs).write(buffer) + val ellapsed = System.nanoTime - start + maxLatency = max(ellapsed, maxLatency) + totalLatency += ellapsed + written += bufferSize + count += 1 + totalWritten += bufferSize + if((start - lastReport)/(1000.0*1000.0) > reportingInterval.doubleValue) { + val ellapsedSecs = (start - lastReport) / (1000.0*1000.0*1000.0) + val mb = written / (1024.0*1024.0) + println("%10.3f\t%10.3f\t%10.3f".format(mb / ellapsedSecs, totalLatency / count.toDouble / (1000.0*1000.0), maxLatency / (1000.0 * 1000.0))) + lastReport = start + written = 0 + maxLatency = 0L + totalLatency = 0L + } else if(written > maxThroughputBytes) { + // if we have written enough, just sit out this reporting interval + val lastReportMs = lastReport / (1000*1000) + val now = System.nanoTime / (1000*1000) + val sleepMs = 
lastReportMs + reportingInterval - now
+        if(sleepMs > 0)
+          Thread.sleep(sleepMs)
+      }
+    }
+    val elapsedSecs = (System.nanoTime - beginTest) / (1000.0*1000.0*1000.0)
+    println(bytesToWrite / (1024.0 * 1024.0 * elapsedSecs) + " MB per sec")
+  }
+
+  trait Writable {
+    def write(buffer: ByteBuffer)
+  }
+
+  class MmapWritable(val buffer: ByteBuffer) extends Writable {
+    def write(b: ByteBuffer) {
+      buffer.put(b)
+    }
+  }
+
+  class ChannelWritable(val channel: FileChannel) extends Writable {
+    def write(b: ByteBuffer) {
+      channel.write(b)
     }
-    val elapsedSecs = (System.currentTimeMillis - begin) / 1000.0
-    System.out.println(bytesToWrite / (1024 * 1024 * elapsedSecs) + " MB per sec")
   }
 }
diff --git a/core/src/test/scala/other/kafka/TestMmapLocking.scala b/core/src/test/scala/other/kafka/TestMmapLocking.scala
new file mode 100644
index 0000000..264e54f
--- /dev/null
+++ b/core/src/test/scala/other/kafka/TestMmapLocking.scala
@@ -0,0 +1,65 @@
+package kafka
+
+import java.io._
+import java.nio._
+import java.nio.channels._
+import scala.math._
+
+/**
+ * Manual test that times 8-byte writes into memory-mapped files while a background
+ * thread periodically forces the mappings to disk; runs until the process is killed.
+ *
+ * Arguments: file_size (bytes per mapped file) flush_sleep_ms reporting_interval num_files
+ */
+object TestMmapLocking {
+
+  def main(args: Array[String]) {
+    val fileSize = args(0).toInt
+    val sleepTime = args(1).toLong
+    val interval = args(2).toInt
+    val numFiles = args(3).toInt
+    // every write below is one 8-byte long, so a mapping must be able to hold at least one
+    require(fileSize >= 8, "file size must be at least 8 bytes")
+    val paths = (0 until numFiles).map(_ => File.createTempFile("kafka", ".dat"))
+    paths.foreach(_.deleteOnExit)
+    val mmaps = paths.map{p =>
+      val file = new RandomAccessFile(p, "rw")
+      file.setLength(fileSize)
+      file.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, fileSize)
+    }
+
+    val flushThread = new Thread() {
+      override def run() {
+        while(!isInterrupted) {
+          println("flushing")
+          val start = System.nanoTime
+          mmaps.foreach(_.force())
+          val ellapsed = (System.nanoTime - start)/(1000.0*1000.0)
+          println("flush completed in " + ellapsed + " ms")
+          Thread.sleep(sleepTime)
+        }
+      }
+    }
+    flushThread.setDaemon(true)
+    flushThread.start()
+    var maxTime = -1d
+    // Long counter: an Int would wrap to Int.MinValue, whose abs is still negative
+    var i = 0L
+    while(true) {
+      val mmap = mmaps((i % numFiles).toInt)
+      // putLong advances the position; start over once full instead of overflowing
+      if(mmap.remaining < 8)
+        mmap.rewind()
+      val start = System.nanoTime
+      mmap.putLong(i)
+      val ellapsed = (System.nanoTime - start)/(1000.0*1000.0)
+      maxTime = scala.math.max(maxTime, ellapsed)
+      if(i % interval == interval - 1) {
+        println(maxTime)
+        maxTime = -1L
+      }
+      i += 1
+    }
+  }
+
+}
\ No newline at end of file
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index 7c41310..e00d143 100644
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -41,9 +41,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
   val port = TestUtils.choosePort
   val props = TestUtils.createBrokerConfig(0, port)
-  val config = new KafkaConfig(props) {
-    override val flushInterval = 1
-  }
+  val config = new KafkaConfig(props)
   val configs = List(config)
   val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index 58961ad..1b12706 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -33,7 +33,7 @@ import kafka.api.{RequestKeys, TopicMetadata, TopicMetadataResponse, TopicMetada
 class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
   val props = createBrokerConfigs(1)
-  val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1})
+  val configs = props.map(p => new KafkaConfig(p))
   var brokers: Seq[Broker] = null
   override def setUp() {
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 6ac59ce..3f7c029 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++
b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -37,7 +37,7 @@ import kafka.utils._ class AsyncProducerTest extends JUnit3Suite { val props = createBrokerConfigs(1) - val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1}) + val configs = props.map(p => new KafkaConfig(p)) override def setUp() { super.setUp() diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala index 214c21b..ff3b5c4 100644 --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -28,9 +28,7 @@ import kafka.utils.{SystemTime, KafkaScheduler, TestUtils, MockTime, Utils} class HighwatermarkPersistenceTest extends JUnit3Suite { - val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) { - override val defaultFlushIntervalMs = 100 - }) + val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_)) val topic = "foo" val logManagers = configs.map(config => new LogManager(config, new KafkaScheduler(1), new MockTime)) diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index cce858f..c67c58f 100644 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -30,7 +30,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) { override val replicaMaxLagTimeMs = 5000L override val replicaMaxLagBytes = 10L - override val flushInterval = 10 override val replicaMinBytes = 20 }) val topic = "new-topic" @@ -155,7 +154,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) { override val replicaMaxLagTimeMs 
= 5000L override val replicaMaxLagBytes = 10L - override val flushInterval = 10 override val replicaMinBytes = 20 override val logFileSize = 30 }) @@ -200,7 +198,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) { override val replicaMaxLagTimeMs = 5000L override val replicaMaxLagBytes = 10L - override val flushInterval = 1000 override val replicaMinBytes = 20 override val logFileSize = 30 }) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala index 749c0ae..a5f211c 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala @@ -28,7 +28,7 @@ import junit.framework.Assert._ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness { val props = createBrokerConfigs(2) - val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1}) + val configs = props.map(p => new KafkaConfig(p)) var brokers: Seq[KafkaServer] = null val topic1 = "foo" val topic2 = "bar" diff --git a/perf/src/main/scala/kafka/perf/LogPerformance.scala b/perf/src/main/scala/kafka/perf/LogPerformance.scala index d133bf1..99ef27c 100644 --- a/perf/src/main/scala/kafka/perf/LogPerformance.scala +++ b/perf/src/main/scala/kafka/perf/LogPerformance.scala @@ -6,16 +6,26 @@ import java.util.concurrent._ import java.util.concurrent.atomic._ import com.yammer.metrics.stats.UniformSample import scala.collection.JavaConversions._ +import kafka.common._ import kafka.message._ import kafka.log._ import kafka.utils._ import kafka.server.KafkaConfig +/** + * A performance test that interacts directly with the log subsystem without the rest of the broker running + * or any remote communication. 
+ */ object LogPerformance { def main(args: Array[String]) { val config = new LogPerfConfig(args) + if(config.help) { + config.parser.printHelpOn(System.err) + System.exit(0) + } + val dirs = config.dataDirs.map(new File(_)) dirs.map(_.mkdirs()) val props = new Properties @@ -23,14 +33,15 @@ object LogPerformance { props.put("log.dir", config.dataDirs.mkString(",")) props.put("log.flush.interval", config.flushInterval.toString) props.put("log.default.flush.interval.ms", config.flushMs.toString) - - val logManager = new LogManager(new KafkaConfig(props), - new KafkaScheduler(1), - SystemTime, - Long.MaxValue, - 60*1000, - 24*60*60*1000, - false) + props.put("log.index.interval.bytes", config.indexInterval.toString) + props.put("log.async.flush.on.roll", config.asyncFlush.toString) + println("Using async flusher: " + props.get("log.async.flush.on.roll")) + + val scheduler = new KafkaScheduler(1) + scheduler.startup() + val logManager = new LogManager(config = new KafkaConfig(props), + scheduler = scheduler, + time = SystemTime) logManager.startup() // create metrics @@ -48,7 +59,8 @@ object LogPerformance { for(i <- 0 until config.numWriters) yield new WriterThread(topic = "test", numPartitions = config.numPartitions, - codec = NoCompressionCodec, + codec = config.codec, + numMessages = config.numMessages, messageSize = config.messageSize, writeSize = config.writerBatchSize, logManager = logManager, @@ -66,24 +78,40 @@ object LogPerformance { readers.foreach(_.start()) // start reporter + globalMetrics.printHeader() val reporter = new ReporterThread(config.reportingInterval.toLong, snapshotMetrics) + reporter.start() // await completion of writers writers.foreach(_.join) + // stop readers + readers.foreach(_.shutdown()) + readers.foreach(_.join) + + // stop reporter + reporter.interrupt() + // print totals + globalMetrics.printReport() + + // shutdown + scheduler.shutdownNow() logManager.shutdown() } abstract class PerfThread(bytes: Metric, times: Metric) extends 
Thread { val logManager: LogManager + val running = new AtomicBoolean(true) var started = SystemTime.nanoseconds override def run() { - val start = SystemTime.nanoseconds - val b = work() - times.record(SystemTime.nanoseconds - start) - bytes.record(b) + while(running.get) { + val start = SystemTime.nanoseconds + val b = work() + times.record(SystemTime.nanoseconds - start) + bytes.record(b) + } } def reset() { @@ -91,6 +119,10 @@ object LogPerformance { bytes.reset() } + def shutdown() { + running.set(false) + } + def work(): Long } @@ -99,34 +131,44 @@ object LogPerformance { val readSize: Int, val logManager: LogManager, val metrics: Metrics) extends PerfThread(metrics.readBytes, metrics.readTimes) { - var offset = 0 + var offset = 0L var part = 0 def work(): Long = { - val log = logManager.getOrCreateLog(topic, part) - val messages = log.read(offset, Int.MaxValue, Some(readSize)) - part = (part + 1) % numPartitions - offset += 1 - messages.sizeInBytes + try { + val log = logManager.getOrCreateLog(topic, part) + val messages = log.read(offset, Int.MaxValue, Some(offset + readSize)) + part = (part + 1) % numPartitions + offset += 1 + messages.sizeInBytes + } catch { + case e: OffsetOutOfRangeException => + offset = 0L + 0L + } } } class WriterThread(val topic: String, val numPartitions: Int, val codec: CompressionCodec, + val numMessages: Long, val messageSize: Int, val writeSize: Int, val logManager: LogManager, val metrics: Metrics) extends PerfThread(metrics.writeBytes, metrics.writeTimes) { var part = 0 + var written = 0L val random = new Random() - val messages = new ByteBufferMessageSet(compressionCodec = codec, - messages = (0 until writeSize).map(_ => makeMessage(messageSize)):_*) + val messages = new ByteBufferMessageSet(codec, (0 until writeSize).map(_ => makeMessage(messageSize)):_*) def work(): Long = { val log = logManager.getOrCreateLog(topic, part) log.append(messages, assignOffsets = true) part = (part + 1) % numPartitions + written += writeSize + 
if(written >= numMessages) + shutdown() messages.sizeInBytes } @@ -157,17 +199,17 @@ object LogPerformance { val writeBytes: Metric, val writeTimes: Metric) { def printHeader() { - print("%20s %20s %20s ".format("mb/sec_read", "avg_read_latency", "99th_read_latency")) - println("%20s %20s %20s".format("mb/sec_written", "avg_write_latency", "99th_write_latency")) + print("%13s %13s %13s ".format("mb/sec_read", "avg_read_ms", "99th_read_ms")) + println("%13s %13s %13s".format("mb/sec_write", "avg_write_ms", "99th_write_ms")) } def printReport() { - println("%20.3f %20.1f %20.1f %20.3f %20.1f %20.1f".format(mbsSec(readBytes), - ns2Secs(readTimes.avg), - ns2Secs(readTimes.quantile(0.99)), + println("%13.3f %13.1f %13.1f %13.3f %13.1f %13.1f".format(mbsSec(readBytes), + ns2ms(readTimes.avg), + ns2ms(readTimes.quantile(0.99)), mbsSec(writeBytes), - ns2Secs(writeTimes.avg), - ns2Secs(writeTimes.quantile(0.99)))) + ns2ms(writeTimes.avg), + ns2ms(writeTimes.quantile(0.99)))) } def reset() { @@ -177,9 +219,9 @@ object LogPerformance { writeTimes.reset() } - private def ns2Secs(n: Double) = n / (1000.0*1000.0*1000.0) + private def ns2ms(n: Double) = n / (1000.0*1000.0) - private def mbsSec(metric: Metric) = metric.rate(TimeUnit.SECONDS) / (1024*1024*1024.0) + private def mbsSec(metric: Metric) = metric.rate(TimeUnit.SECONDS) / (1024*1024.0) } class Metric(samples: Int = 4096, val parent: Metric = null) { @@ -211,7 +253,7 @@ object LogPerformance { } } - def ellapsed(unit: TimeUnit) = this synchronized {SystemTime.nanoseconds - start} + def ellapsed(unit: TimeUnit) = unit.convert(SystemTime.nanoseconds - this.synchronized(start), TimeUnit.NANOSECONDS) def rate(unit: TimeUnit) = this synchronized {_total.toDouble / ellapsed(unit)} @@ -237,52 +279,61 @@ object LogPerformance { val numPartitionsOpt = parser.accepts("partitions", "The number of partitions.") .withRequiredArg .describedAs("num_partitions") - .ofType(classOf[Integer]) + .ofType(classOf[java.lang.Integer]) 
.defaultsTo(1) val readerBatchSizeOpt = parser.accepts("reader-batch-size", "The number of messages to write at once.") .withRequiredArg .describedAs("num_messages") - .ofType(classOf[Integer]) + .ofType(classOf[java.lang.Integer]) .defaultsTo(200) val writerBatchSizeOpt = parser.accepts("writer-batch-size", "The number of messages to write at once.") .withRequiredArg .describedAs("num_messages") - .ofType(classOf[Integer]) + .ofType(classOf[java.lang.Integer]) .defaultsTo(200) val numReadersOpt = parser.accepts("readers", "The number of reader threads.") .withRequiredArg .describedAs("num_threads") - .ofType(classOf[Integer]) + .ofType(classOf[java.lang.Integer]) .defaultsTo(1) val numWritersOpt = parser.accepts("writers", "The number of writer threads.") .withRequiredArg .describedAs("num_threads") - .ofType(classOf[Integer]) + .ofType(classOf[java.lang.Integer]) .defaultsTo(1) val flushIntervalOpt = parser.accepts("flush-interval", "The number of messages in a partition between flushes.") .withRequiredArg .describedAs("num_messages") - .ofType(classOf[Integer]) + .ofType(classOf[java.lang.Integer]) .defaultsTo(Int.MaxValue) val flushMsOpt = parser.accepts("flush-time", "The time between flushes.") .withRequiredArg .describedAs("ms") - .ofType(classOf[Long]) - .defaultsTo(Long.MaxValue) + .ofType(classOf[java.lang.Integer]) + .defaultsTo(Int.MaxValue) + val indexIntervalOpt = parser.accepts("index-interval", "The number of bytes in between index entries.") + .withRequiredArg + .describedAs("bytes") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(4096) + val asyncFlushOpt = parser.accepts("async-flush", "Use async flusher.") val options = parser.parse(args : _*) var dataDirs = options.valuesOf(dataDirsOpt) - val numMessages = options.valueOf(numMessagesOpt).longValue - val messageSize = options.valueOf(messageSizeOpt) - val numPartitions = options.valueOf(numPartitionsOpt) - val readerBatchSize = options.valueOf(readerBatchSizeOpt) - val writerBatchSize = 
options.valueOf(writerBatchSizeOpt) - val numReaders = options.valueOf(numReadersOpt) - val numWriters = options.valueOf(numWritersOpt) - val reportingInterval = options.valueOf(reportingIntervalOpt) - val flushInterval = options.valueOf(flushIntervalOpt) - val flushMs = options.valueOf(flushMsOpt) + val numMessages = options.valueOf(numMessagesOpt).intValue + val messageSize = options.valueOf(messageSizeOpt).intValue + val numPartitions = options.valueOf(numPartitionsOpt).intValue + val readerBatchSize = options.valueOf(readerBatchSizeOpt).intValue + val writerBatchSize = options.valueOf(writerBatchSizeOpt).intValue + val numReaders = options.valueOf(numReadersOpt).intValue + val numWriters = options.valueOf(numWritersOpt).intValue + val reportingInterval = options.valueOf(reportingIntervalOpt).intValue + val flushInterval = options.valueOf(flushIntervalOpt).intValue + val flushMs = options.valueOf(flushMsOpt).intValue + val codec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOpt).intValue) + val indexInterval = options.valueOf(indexIntervalOpt).intValue + val asyncFlush = options.has(asyncFlushOpt) + val help = options.has(helpOpt) } - } \ No newline at end of file diff --git a/perf/src/main/scala/kafka/perf/PerfConfig.scala b/perf/src/main/scala/kafka/perf/PerfConfig.scala index 711f65c..5dabeb5 100644 --- a/perf/src/main/scala/kafka/perf/PerfConfig.scala +++ b/perf/src/main/scala/kafka/perf/PerfConfig.scala @@ -51,9 +51,10 @@ class PerfConfig(args: Array[String]) { .describedAs("size") .ofType(classOf[java.lang.Integer]) .defaultsTo(200) - val compressionCodecOption = parser.accepts("compression-codec", "If set, messages are sent compressed") + val compressionCodecOpt = parser.accepts("compression-codec", "If set, messages are sent compressed") .withRequiredArg .describedAs("compression codec ") .ofType(classOf[java.lang.Integer]) .defaultsTo(0) + val helpOpt = parser.accepts("help", "Print usage.") } diff --git 
a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala index ff76547..81ea19c 100644 --- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala +++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala @@ -93,18 +93,8 @@ object ProducerPerformance extends Logging { .withRequiredArg() .ofType(classOf[java.lang.Integer]) .defaultsTo(-1) - val messageSizeOpt = parser.accepts("message-size", "The size of each message.") - .withRequiredArg - .describedAs("size") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(100) val varyMessageSizeOpt = parser.accepts("vary-message-size", "If set, message size will vary up to the given maximum.") val syncOpt = parser.accepts("sync", "If set, messages are sent synchronously.") - val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch.") - .withRequiredArg - .describedAs("batch size") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(200) val numThreadsOpt = parser.accepts("threads", "Number of sending threads.") .withRequiredArg .describedAs("number of threads") @@ -167,10 +157,10 @@ object ProducerPerformance extends Logging { var isSync = options.has(syncOpt) var batchSize = options.valueOf(batchSizeOpt).intValue var numThreads = options.valueOf(numThreadsOpt).intValue - val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOption).intValue) + val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOpt).intValue) val seqIdMode = options.has(initialMessageIdOpt) var initialMessageId: Int = 0 - if (seqIdMode) + if(seqIdMode) initialMessageId = options.valueOf(initialMessageIdOpt).intValue() val producerRequestTimeoutMs = options.valueOf(producerRequestTimeoutMsOpt).intValue() val producerRequestRequiredAcks = options.valueOf(producerRequestRequiredAcksOpt).intValue()