diff --git a/config/server.properties b/config/server.properties
index 9ca0f8d..375fb9a 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -70,10 +70,10 @@ num.partitions=1
 # every N messages (or both). This can be done globally and overridden on a per-topic basis.
 
 # The number of messages to accept before forcing a flush of data to disk
-log.flush.interval=10000
+log.flush.interval=100000000
 
 # The maximum amount of time a message can sit in a log before we force a flush
-log.default.flush.interval.ms=1000
+log.default.flush.interval.ms=1000000
 
 # Per-topic overrides for log.default.flush.interval.ms
 #topic.flush.intervals.ms=topic1:1000, topic2:3000
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 75082e6..dd6ddc2 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -101,7 +101,7 @@ object Log {
 private[kafka] class Log(val dir: File,
                          val maxLogFileSize: Int,
                          val maxMessageSize: Int,
-                         val flushInterval: Int = Int.MaxValue,
+                         val flushInterval: Long = Long.MaxValue,
                          val rollIntervalMs: Long = Long.MaxValue,
                          val needsRecovery: Boolean,
                          val maxIndexSize: Int = (10*1024*1024),
@@ -476,6 +476,7 @@ private[kafka] class Log(val dir: File,
     segments.view.last.flush()
     unflushed.set(0)
     lastflushedTime.set(time.milliseconds)
+    debug("Flush complete")
   }
 }
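Widening `flushInterval` to a `Long` defaulting to `Long.MaxValue` effectively disables count-based flushing unless it is configured explicitly, leaving durability to the time-based flusher and replication. A minimal sketch of the kind of check this setting governs (not the actual `Log` flush code; `FlushPolicy` and `maybeFlush` are illustrative names):

```scala
import java.util.concurrent.atomic.AtomicLong

// Sketch: with flushInterval = Long.MaxValue the count-based branch never
// fires, so the periodic time-based flush is the only flush trigger.
class FlushPolicy(flushInterval: Long) {
  private val unflushed = new AtomicLong(0)

  def maybeFlush(numAppended: Int)(flush: () => Unit) {
    if (unflushed.addAndGet(numAppended) >= flushInterval) {
      flush()            // fsync the active segment, as Log.flush() does above
      unflushed.set(0)   // mirrors unflushed.set(0) in the hunk above
    }
  }
}
```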
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 5b30604..bdfeddb 100644
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -78,7 +78,9 @@ class LogSegment(val messageSet: FileMessageSet,
    * no more than maxSize bytes and will end before maxOffset if a maxOffset is specified.
    */
   def read(startOffset: Long, maxSize: Int, maxOffset: Option[Long]): MessageSet = {
-    if(maxSize <= 0)
+    if(maxSize < 0)
+      throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))
+    if(maxSize == 0)
       return MessageSet.Empty
 
     val startPosition = translateOffset(startOffset)
@@ -95,6 +97,8 @@ class LogSegment(val messageSet: FileMessageSet,
         maxSize
       case Some(offset) => {
         // there is a max offset, translate it to a file position and use that to calculate the max read size
+        if(offset < startOffset)
+          throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset))
         val mapping = translateOffset(offset)
         val endPosition =
           if(mapping == null)
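The guards above tighten the `read` contract: a negative `maxSize` or a `maxOffset` below `startOffset` now fails fast instead of being silently treated as an empty read. Illustrative calls against the new contract (`segment` is assumed to be a `LogSegment` instance):

```scala
val bounded = segment.read(startOffset = 0L, maxSize = 4096, maxOffset = Some(100L)) // normal bounded read
val empty   = segment.read(startOffset = 0L, maxSize = 0,    maxOffset = None)       // still MessageSet.Empty
// These now throw IllegalArgumentException instead of returning Empty:
// segment.read(0L, -1, None)                 // negative max size
// segment.read(50L, 4096, Some(10L))         // maxOffset < startOffset
```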
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index e25cf81..2edfc2b 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -110,16 +110,16 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(props)
   val logIndexIntervalBytes = props.getIntInRange("log.index.interval.bytes", 4096, (0, Int.MaxValue))
 
   /* the number of messages accumulated on a log partition before messages are flushed to disk */
-  val flushInterval = props.getIntInRange("log.flush.interval", 500, (1, Int.MaxValue))
+  val flushInterval = props.getLongInRange("log.flush.interval", Long.MaxValue, (1, Long.MaxValue))
 
   /* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000 */
-  val flushIntervalMap = props.getMap("topic.flush.intervals.ms", _.toInt > 0).mapValues(_.toInt)
+  val flushIntervalMap = props.getMap("topic.flush.intervals.ms", _.toLong > 0).mapValues(_.toLong)
 
   /* the frequency in ms that the log flusher checks whether any log needs to be flushed to disk */
-  val flushSchedulerThreadRate = props.getInt("log.default.flush.scheduler.interval.ms", 3000)
+  val flushSchedulerThreadRate = props.getInt("log.default.flush.scheduler.interval.ms", 5000)
 
   /* the maximum time in ms that a message in any topic is kept in memory before flushed to disk */
-  val defaultFlushIntervalMs = props.getInt("log.default.flush.interval.ms", flushSchedulerThreadRate)
+  val defaultFlushIntervalMs = props.getLong("log.default.flush.interval.ms", Long.MaxValue)
 
   /* enable auto creation of topic on the server */
   val autoCreateTopics = props.getBoolean("auto.create.topics", true)
@@ -135,8 +135,10 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(props)
   /* default replication factors for automatically created topics */
   val defaultReplicationFactor = props.getInt("default.replication.factor", 1)
 
+  /* the maximum amount of time a replica can lag behind and still be considered "in sync" */
   val replicaMaxLagTimeMs = props.getLong("replica.max.lag.time.ms", 10000)
 
+  /* the maximum number of bytes a replica can lag behind the leader before it is considered out of sync */
   val replicaMaxLagBytes = props.getLong("replica.max.lag.bytes", 4000)
 
   /* the socket timeout for network requests */
@@ -158,4 +160,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(props)
    * Increasing this value can increase the degree of I/O parallelism in the follower broker. */
   val numReplicaFetchers = props.getInt("replica.fetchers", 1)
 
+  /* the frequency with which the high watermark is saved out to disk */
+  val highWaterMarkCheckpointIntervalMs = props.getLong("replica.highwatermark.checkpoint.ms", 5000L)
+
 }
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 8461dbe..4291579 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -69,7 +69,7 @@ class ReplicaManager(val config: KafkaConfig,
 
   def startHighWaterMarksCheckPointThread() = {
     if(highWatermarkCheckPointThreadStarted.compareAndSet(false, true))
-      kafkaScheduler.scheduleWithRate(checkpointHighWatermarks, "highwatermark-checkpoint-thread", 0, config.defaultFlushIntervalMs)
+      kafkaScheduler.scheduleWithRate(checkpointHighWatermarks, "highwatermark-checkpoint-thread", 0, config.highWaterMarkCheckpointIntervalMs)
   }
 
   /**
diff --git a/core/src/main/scala/kafka/utils/KafkaScheduler.scala b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
index 4a70e81..5a222b8 100644
--- a/core/src/main/scala/kafka/utils/KafkaScheduler.scala
+++ b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
@@ -34,7 +34,7 @@ class KafkaScheduler(val numThreads: Int) extends Logging {
   }
   private val threadNamesAndIds = new HashMap[String, AtomicInteger]()
 
-  def startup = {
+  def startup() = {
    executor = new ScheduledThreadPoolExecutor(numThreads)
    executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
    executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
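For reference, the changed and added settings can be wired up as below; the values are arbitrary examples, not recommendations, and construction via `new KafkaConfig(props)` follows the pattern used by the test suites later in this patch:

```scala
import java.util.Properties

val props = new Properties()
props.put("log.flush.interval", "100000000")             // count-based flush, now a Long (default Long.MaxValue)
props.put("log.default.flush.interval.ms", "1000000")    // time-based flush, now a Long (default Long.MaxValue)
props.put("topic.flush.intervals.ms", "topic1:1000, topic2:3000")
props.put("replica.highwatermark.checkpoint.ms", "5000") // new setting consumed by ReplicaManager above
// val config = new KafkaConfig(props)                   // as in the test suites below
```

The checkpoint thread previously reused `defaultFlushIntervalMs`; now that the flush default is effectively "never", the high-watermark checkpoint gets its own interval so it keeps running on a sane cadence.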
diff --git a/core/src/test/scala/other/kafka/TestEndToEndLatency.scala b/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
index 1e0e92d..5408db8 100644
--- a/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
+++ b/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
@@ -63,7 +63,7 @@ object TestEndToEndLatency {
       println(i + "\t" + elapsed / 1000.0 / 1000.0)
       totalTime += elapsed
     }
-    println("Avg latency: " + (totalTime / numMessages / 1000.0 / 1000.0)) + "ms"
+    println("Avg latency: " + (totalTime / numMessages / 1000.0 / 1000.0) + "ms")
     producer.close()
     connector.shutdown()
     System.exit(0)
diff --git a/core/src/test/scala/other/kafka/TestFileChannelLocking.scala b/core/src/test/scala/other/kafka/TestFileChannelLocking.scala
new file mode 100644
index 0000000..d8a4b8d
--- /dev/null
+++ b/core/src/test/scala/other/kafka/TestFileChannelLocking.scala
@@ -0,0 +1,49 @@
+package kafka
+
+import java.io._
+import java.nio._
+import java.nio.channels._
+
+object TestFileChannelLocking {
+
+  def main(args: Array[String]) {
+    val fileSize = args(0).toInt
+    val sleepTime = args(1).toLong
+    val interval = args(2).toInt
+    val numFiles = args(3).toInt
+    val paths = (0 until numFiles).map(_ => File.createTempFile("kafka", ".dat"))
+    paths.foreach(_.deleteOnExit())
+    val channels = paths.map { p =>
+      val file = new RandomAccessFile(p, "rw")
+      file.setLength(fileSize)
+      file.getChannel()
+    }
+    val flushThread = new Thread() {
+      override def run() {
+        while(!isInterrupted) {
+          println("flushing")
+          channels.foreach(_.force(false))
+          Thread.sleep(sleepTime)
+        }
+      }
+    }
+    flushThread.setDaemon(true)
+    flushThread.start()
+    var maxTime = -1d
+    val buffer = ByteBuffer.allocate(8)
+    buffer.putLong(1.toLong)
+    for(i <- 0 until fileSize) {
+      buffer.rewind()
+      val start = System.nanoTime
+      channels(i % numFiles).write(buffer)
+      val elapsed = (System.nanoTime - start) / (1000.0 * 1000.0)
+      maxTime = scala.math.max(maxTime, elapsed)
+      if(i % interval == interval - 1) {
+        println(maxTime)
+        maxTime = -1d
+      }
+    }
+  }
+
+}
\ No newline at end of file
diff --git a/core/src/test/scala/other/kafka/TestMmapLocking.scala b/core/src/test/scala/other/kafka/TestMmapLocking.scala
new file mode 100644
index 0000000..799b75b
--- /dev/null
+++ b/core/src/test/scala/other/kafka/TestMmapLocking.scala
@@ -0,0 +1,47 @@
+package kafka
+
+import java.io._
+import java.nio._
+import java.nio.channels._
+
+object TestMmapLocking {
+
+  def main(args: Array[String]) {
+    val fileSize = args(0).toInt
+    val sleepTime = args(1).toLong
+    val interval = args(2).toInt
+    val numFiles = args(3).toInt
+    val paths = (0 until numFiles).map(_ => File.createTempFile("kafka", ".dat"))
+    paths.foreach(_.deleteOnExit())
+    val mmaps = paths.map { p =>
+      val file = new RandomAccessFile(p, "rw")
+      file.setLength(fileSize)
+      file.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, fileSize)
+    }
+
+    val flushThread = new Thread() {
+      override def run() {
+        while(!isInterrupted) {
+          println("flushing")
+          mmaps.foreach(_.force())
+          println("flush done")
+          Thread.sleep(sleepTime)
+        }
+      }
+    }
+    flushThread.setDaemon(true)
+    flushThread.start()
+    var maxTime = -1d
+    for(i <- 0 until fileSize/8) {
+      val start = System.nanoTime
+      mmaps(i % numFiles).putLong(i.toLong)
+      val elapsed = (System.nanoTime - start) / (1000.0 * 1000.0)
+      maxTime = scala.math.max(maxTime, elapsed)
+      if(i % interval == interval - 1) {
+        println(maxTime)
+        maxTime = -1d
+      }
+    }
+  }
+
+}
\ No newline at end of file
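Both harnesses take the same four positional arguments: file size in bytes, flush-thread sleep time in ms, print interval in writes, and number of files. For example, to hammer two 64 MB files while a daemon thread flushes every second, printing the max observed write latency every 1000 writes (values are arbitrary):

```scala
// args: fileSize sleepTimeMs printInterval numFiles
kafka.TestFileChannelLocking.main(Array("67108864", "1000", "1000", "2"))
kafka.TestMmapLocking.main(Array("67108864", "1000", "1000", "2"))
```

The point of the pair is to compare whether a concurrent `force()` on a FileChannel versus a MappedByteBuffer blocks in-flight writes, which is exactly the stall the relaxed flush defaults in this patch are trying to avoid.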
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index 7c41310..e00d143 100644
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -41,9 +41,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with ZooKeeperTestHarness
   val port = TestUtils.choosePort
   val props = TestUtils.createBrokerConfig(0, port)
-  val config = new KafkaConfig(props) {
-    override val flushInterval = 1
-  }
+  val config = new KafkaConfig(props)
   val configs = List(config)
   val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index 58961ad..1b12706 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -33,7 +33,7 @@ import kafka.api.{RequestKeys, TopicMetadata, TopicMetadataResponse, TopicMetadataRequest}
 class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
   val props = createBrokerConfigs(1)
-  val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1})
+  val configs = props.map(p => new KafkaConfig(p))
   var brokers: Seq[Broker] = null
 
   override def setUp() {
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index ab6ef43..7216bc4 100644
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -41,7 +41,6 @@ class LogManagerTest extends JUnit3Suite {
     super.setUp()
     config = new KafkaConfig(TestUtils.createBrokerConfig(0, -1)) {
       override val logFileSize = 1024
-      override val flushInterval = 100
     }
     scheduler.startup
     logManager = new LogManager(config, scheduler, time, maxRollInterval, veryLargeLogFlushInterval, maxLogAge, false)
@@ -114,7 +113,6 @@ class LogManagerTest extends JUnit3Suite {
       override val logFileSize = (10 * (setSize - 1)) // each segment will be 10 messages
      override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Long]
      override val logRetentionHours = retentionHours
-     override val flushInterval = 100
     }
     logManager = new LogManager(config, scheduler, time, maxRollInterval, veryLargeLogFlushInterval, retentionMs, false)
     logManager.startup
@@ -156,8 +154,8 @@ class LogManagerTest extends JUnit3Suite {
     config = new KafkaConfig(props) {
       override val logFileSize = 1024 *1024 *1024
       override val flushSchedulerThreadRate = 50
-      override val flushInterval = Int.MaxValue
-      override val flushIntervalMap = Map("timebasedflush" -> 100)
+      override val flushInterval = Long.MaxValue
+      override val flushIntervalMap = Map("timebasedflush" -> 100L)
     }
     logManager = new LogManager(config, scheduler, time, maxRollInterval, veryLargeLogFlushInterval, maxLogAge, false)
     logManager.startup
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 6ac59ce..3f7c029 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -37,7 +37,7 @@ import kafka.utils._
 class AsyncProducerTest extends JUnit3Suite {
   val props = createBrokerConfigs(1)
-  val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1})
+  val configs = props.map(p => new KafkaConfig(p))
 
   override def setUp() {
     super.setUp()
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index ca5fcb2..560bc44 100644
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -27,9 +27,7 @@ import kafka.utils.{SystemTime, KafkaScheduler, TestUtils, MockTime}
 
 class HighwatermarkPersistenceTest extends JUnit3Suite {
 
-  val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
-    override val defaultFlushIntervalMs = 100
-  })
+  val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_))
   val topic = "foo"
 
   def testHighWatermarkPersistenceSinglePartition() {
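The `timebasedflush` test above relies on the per-topic override map, which `KafkaConfig` now parses to `Map[String, Long]`. Roughly (the real parsing lives in `VerifiableProperties.getMap`; this is only a sketch of the equivalent transformation for the documented `topic1:1000, topic2:3000` format):

```scala
val raw = "topic1:1000, topic2:3000"
val flushIntervalMap: Map[String, Long] =
  raw.split(",")
     .map(_.trim.split(":"))
     .map { case Array(topic, ms) => topic -> ms.toLong }
     .toMap
// Map("topic1" -> 1000L, "topic2" -> 3000L)
```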
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 59efaf4..37ac10c 100644
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -30,7 +30,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
   val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
     override val replicaMaxLagTimeMs = 5000L
     override val replicaMaxLagBytes = 10L
-    override val flushInterval = 10
     override val replicaMinBytes = 20
   })
   val topic = "new-topic"
@@ -155,7 +154,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
   val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
     override val replicaMaxLagTimeMs = 5000L
     override val replicaMaxLagBytes = 10L
-    override val flushInterval = 10
     override val replicaMinBytes = 20
     override val logFileSize = 30
   })
@@ -200,7 +198,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
   val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
     override val replicaMaxLagTimeMs = 5000L
     override val replicaMaxLagBytes = 10L
-    override val flushInterval = 1000
     override val replicaMinBytes = 20
     override val logFileSize = 30
   })
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
index 749c0ae..a5f211c 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
@@ -28,7 +28,7 @@ import junit.framework.Assert._
 class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness {
   val props = createBrokerConfigs(2)
-  val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1})
+  val configs = props.map(p => new KafkaConfig(p))
   var brokers: Seq[KafkaServer] = null
   val topic1 = "foo"
   val topic2 = "bar"
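Tests that previously overrode `flushInterval` to get frequent checkpoints can now override the dedicated setting instead; a hypothetical override in the same style as the suites above (not part of this patch):

```scala
val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
  override val highWaterMarkCheckpointIntervalMs = 100L  // checkpoint every 100 ms instead of the 5 s default
})
```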
diff --git a/perf/src/main/scala/kafka/perf/LogPerformance.scala b/perf/src/main/scala/kafka/perf/LogPerformance.scala
new file mode 100644
index 0000000..6cab09f
--- /dev/null
+++ b/perf/src/main/scala/kafka/perf/LogPerformance.scala
@@ -0,0 +1,339 @@
+package kafka.perf
+
+import java.io.File
+import java.util.{Random, Properties}
+import java.util.concurrent._
+import java.util.concurrent.atomic._
+import com.yammer.metrics.stats.UniformSample
+import scala.collection.JavaConversions._
+import kafka.common._
+import kafka.message._
+import kafka.log._
+import kafka.utils._
+import kafka.server.KafkaConfig
+
+/**
+ * A performance test that interacts directly with the log subsystem without the rest of the broker running
+ * or any remote communication.
+ */
+object LogPerformance {
+
+  def main(args: Array[String]) {
+    val config = new LogPerfConfig(args)
+
+    if(config.help) {
+      config.parser.printHelpOn(System.err)
+      System.exit(0)
+    }
+
+    val dirs = config.dataDirs.map(new File(_))
+    dirs.map(_.mkdirs())
+    val props = new Properties
+    props.put("brokerid", 0.toString)
+    props.put("log.dir", config.dataDirs.mkString(","))
+    props.put("log.flush.interval", config.flushInterval.toString)
+    props.put("log.default.flush.interval.ms", config.flushMs.toString)
+    props.put("log.index.interval.bytes", config.indexInterval.toString)
+
+    val scheduler = new KafkaScheduler(1)
+    scheduler.startup()
+    val logManager = new LogManager(config = new KafkaConfig(props),
+                                    scheduler = scheduler,
+                                    time = SystemTime,
+                                    logRollDefaultIntervalMs = Long.MaxValue,
+                                    logCleanupIntervalMs = 60*1000,
+                                    logCleanupDefaultAgeMs = 24*60*60*1000,
+                                    needRecovery = false)
+    logManager.startup()
+
+    // create metrics
+    val globalMetrics = new Metrics(readBytes = new Metric(4096),
+                                    readTimes = new Metric(4096),
+                                    writeBytes = new Metric(4096),
+                                    writeTimes = new Metric(4096))
+    val snapshotMetrics = new Metrics(readBytes = new Metric(4096, globalMetrics.readBytes),
+                                      readTimes = new Metric(4096, globalMetrics.readTimes),
+                                      writeBytes = new Metric(4096, globalMetrics.writeBytes),
+                                      writeTimes = new Metric(4096, globalMetrics.writeTimes))
+
+    // start writers
+    val writers =
+      for(i <- 0 until config.numWriters)
+        yield new WriterThread(topic = "test",
+                               numPartitions = config.numPartitions,
+                               codec = config.codec,
+                               numMessages = config.numMessages,
+                               messageSize = config.messageSize,
+                               writeSize = config.writerBatchSize,
+                               logManager = logManager,
+                               metrics = snapshotMetrics)
+    writers.foreach(_.start())
+
+    // start readers
+    val readers =
+      for(i <- 0 until config.numReaders)
+        yield new ReaderThread(topic = "test",
+                               numPartitions = config.numPartitions,
+                               readSize = config.readerBatchSize,
+                               logManager = logManager,
+                               metrics = snapshotMetrics)
+    readers.foreach(_.start())
+
+    // start reporter
+    globalMetrics.printHeader()
+    val reporter = new ReporterThread(config.reportingInterval.toLong, snapshotMetrics)
+    reporter.start()
+
+    // await completion of writers
+    writers.foreach(_.join)
+
+    // stop readers
+    readers.foreach(_.shutdown())
+    readers.foreach(_.join)
+
+    // stop reporter
+    reporter.interrupt()
+
+    // print totals
+    globalMetrics.printReport()
+
+    // shutdown
+    scheduler.shutdownNow()
+    logManager.shutdown()
+  }
+
+  abstract class PerfThread(bytes: Metric, times: Metric) extends Thread {
+    val logManager: LogManager
+    val running = new AtomicBoolean(true)
+    var started = SystemTime.nanoseconds
+
+    override def run() {
+      while(running.get) {
+        val start = SystemTime.nanoseconds
+        val b = work()
+        times.record(SystemTime.nanoseconds - start)
+        bytes.record(b)
+      }
+    }
+
+    def reset() {
+      times.reset()
+      bytes.reset()
+    }
+
+    def shutdown() {
+      running.set(false)
+    }
+
+    def work(): Long
+  }
+
+  class ReaderThread(val topic: String,
+                     val numPartitions: Int,
+                     val readSize: Int,
+                     val logManager: LogManager,
+                     val metrics: Metrics) extends PerfThread(metrics.readBytes, metrics.readTimes) {
+    var offset = 0L
+    var part = 0
+
+    def work(): Long = {
+      try {
+        val log = logManager.getOrCreateLog(topic, part)
+        val messages = log.read(offset, Int.MaxValue, Some(offset + readSize))
+        part = (part + 1) % numPartitions
+        offset += 1
+        messages.sizeInBytes
+      } catch {
+        case e: OffsetOutOfRangeException =>
+          offset = 0L
+          0L
+      }
+    }
+  }
+
+  class WriterThread(val topic: String,
+                     val numPartitions: Int,
+                     val codec: CompressionCodec,
+                     val numMessages: Long,
+                     val messageSize: Int,
+                     val writeSize: Int,
+                     val logManager: LogManager,
+                     val metrics: Metrics) extends PerfThread(metrics.writeBytes, metrics.writeTimes) {
+    var part = 0
+    var written = 0L
+    val random = new Random()
+    val messages = new ByteBufferMessageSet(codec, (0 until writeSize).map(_ => makeMessage(messageSize)):_*)
+
+    def work(): Long = {
+      val log = logManager.getOrCreateLog(topic, part)
+      log.append(messages, assignOffsets = true)
+      part = (part + 1) % numPartitions
+      written += writeSize
+      if(written >= numMessages)
+        shutdown()
+      messages.sizeInBytes
+    }
+
+    private def makeMessage(size: Int): Message = {
+      val bytes = new Array[Byte](size)
+      random.nextBytes(bytes)
+      new Message(bytes)
+    }
+  }
+
+  class ReporterThread(val sleepMs: Long,
+                       val metrics: Metrics) extends Thread {
+    override def run() {
+      try {
+        while(!Thread.interrupted()) {
+          Thread.sleep(sleepMs)
+          metrics.printReport()
+          metrics.reset()
+        }
+      } catch {
+        case e: InterruptedException =>
+      }
+    }
+  }
+
+  class Metrics(val readBytes: Metric,
+                val readTimes: Metric,
+                val writeBytes: Metric,
+                val writeTimes: Metric) {
+    def printHeader() {
+      print("%13s %13s %13s ".format("mb/sec_read", "avg_read_ms", "99th_read_ms"))
+      println("%13s %13s %13s".format("mb/sec_write", "avg_write_ms", "99th_write_ms"))
+    }
+
+    def printReport() {
+      println("%13.3f %13.1f %13.1f %13.3f %13.1f %13.1f".format(mbsSec(readBytes),
+                                                                 ns2ms(readTimes.avg),
+                                                                 ns2ms(readTimes.quantile(0.99)),
+                                                                 mbsSec(writeBytes),
+                                                                 ns2ms(writeTimes.avg),
+                                                                 ns2ms(writeTimes.quantile(0.99))))
+    }
+
+    def reset() {
+      readBytes.reset()
+      readTimes.reset()
+      writeBytes.reset()
+      writeTimes.reset()
+    }
+
+    private def ns2ms(n: Double) = n / (1000.0*1000.0)
+
+    private def mbsSec(metric: Metric) = metric.rate(TimeUnit.SECONDS) / (1024*1024.0)
+  }
+
+  class Metric(samples: Int = 4096, val parent: Metric = null) {
+    private var start = SystemTime.nanoseconds
+    private val sampler = new UniformSample(samples)
+    private var _max = Long.MinValue
+    private var _total = 0L
+    private var _count = 0L
+
+    def record(value: Long) {
+      this.synchronized {
+        this._total += value
+        this._count += 1
+        this.sampler.update(value)
+        if(value > _max)
+          this._max = value
+      }
+      if(parent != null)
+        parent.record(value)
+    }
+
+    def reset() {
+      this.synchronized {
+        this.start = SystemTime.nanoseconds
+        this._total = 0L
+        this._count = 0L
+        this._max = Long.MinValue
+        this.sampler.clear()
+      }
+    }
+
+    def elapsed(unit: TimeUnit) = unit.convert(SystemTime.nanoseconds - this.synchronized(start), TimeUnit.NANOSECONDS)
+
+    def rate(unit: TimeUnit) = this.synchronized { _total.toDouble / elapsed(unit) }
+
+    def avg = this.synchronized { _total / _count.toDouble }
+
+    def count = this.synchronized(_count)
+
+    def total = this.synchronized(_total)
+
+    def max = this.synchronized(_max)
+
+    def quantile(q: Double) = this.synchronized { this.sampler.getSnapshot().getValue(q) }
+
+  }
+
+  class LogPerfConfig(args: Array[String]) extends PerfConfig(args) {
+    val dataDirsOpt = parser.accepts("dir", "The log directory.")
+      .withRequiredArg
+      .describedAs("path")
+      .ofType(classOf[String])
+      .defaultsTo(new File(System.getProperty("java.io.tmpdir"),
+                           "kafka-" + new Random().nextInt(10000000)).getAbsolutePath)
+    val numPartitionsOpt = parser.accepts("partitions", "The number of partitions.")
+      .withRequiredArg
+      .describedAs("num_partitions")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1)
+    val readerBatchSizeOpt = parser.accepts("reader-batch-size", "The number of messages to read at once.")
+      .withRequiredArg
+      .describedAs("num_messages")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(200)
+    val writerBatchSizeOpt = parser.accepts("writer-batch-size", "The number of messages to write at once.")
+      .withRequiredArg
+      .describedAs("num_messages")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(200)
+    val numReadersOpt = parser.accepts("readers", "The number of reader threads.")
+      .withRequiredArg
+      .describedAs("num_threads")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1)
+    val numWritersOpt = parser.accepts("writers", "The number of writer threads.")
+      .withRequiredArg
+      .describedAs("num_threads")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1)
+    val flushIntervalOpt = parser.accepts("flush-interval", "The number of messages in a partition between flushes.")
+      .withRequiredArg
+      .describedAs("num_messages")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(Int.MaxValue)
+    val flushMsOpt = parser.accepts("flush-time", "The time between flushes.")
+      .withRequiredArg
+      .describedAs("ms")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(Int.MaxValue)
+    val indexIntervalOpt = parser.accepts("index-interval", "The number of bytes in between index entries.")
+      .withRequiredArg
+      .describedAs("bytes")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(4096)
+
+    val options = parser.parse(args : _*)
+
+    val dataDirs = options.valuesOf(dataDirsOpt)
+    val numMessages = options.valueOf(numMessagesOpt).intValue
+    val messageSize = options.valueOf(messageSizeOpt).intValue
+    val numPartitions = options.valueOf(numPartitionsOpt).intValue
+    val readerBatchSize = options.valueOf(readerBatchSizeOpt).intValue
+    val writerBatchSize = options.valueOf(writerBatchSizeOpt).intValue
+    val numReaders = options.valueOf(numReadersOpt).intValue
+    val numWriters = options.valueOf(numWritersOpt).intValue
+    val reportingInterval = options.valueOf(reportingIntervalOpt).intValue
+    val flushInterval = options.valueOf(flushIntervalOpt).intValue
+    val flushMs = options.valueOf(flushMsOpt).intValue
+    val codec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOpt).intValue)
+    val indexInterval = options.valueOf(indexIntervalOpt).intValue
+    val help = options.has(helpOpt)
+  }
+}
\ No newline at end of file
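The `parent` link in `Metric` is what lets the per-interval snapshot metrics feed the run-total metrics with a single `record()` call; the `ReporterThread` then resets only the snapshots. In use:

```scala
import kafka.perf.LogPerformance

val global   = new LogPerformance.Metric(4096)
val snapshot = new LogPerformance.Metric(4096, parent = global)
snapshot.record(42)   // counted in both the current interval and the run totals
snapshot.reset()      // clears the interval; global keeps accumulating
```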
diff --git a/perf/src/main/scala/kafka/perf/PerfConfig.scala b/perf/src/main/scala/kafka/perf/PerfConfig.scala
index db2c1a1..a96b28e 100644
--- a/perf/src/main/scala/kafka/perf/PerfConfig.scala
+++ b/perf/src/main/scala/kafka/perf/PerfConfig.scala
@@ -45,4 +45,20 @@ class PerfConfig(args: Array[String]) {
   val showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If set, stats are reported for each reporting " +
     "interval as configured by reporting-interval")
   val hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing the header for the stats ")
+  val messageSizeOpt = parser.accepts("message-size", "The size of each message.")
+    .withRequiredArg
+    .describedAs("size")
+    .ofType(classOf[java.lang.Integer])
+    .defaultsTo(100)
+  val batchSizeOpt = parser.accepts("batch-size", "Number of messages to write in a single batch.")
+    .withRequiredArg
+    .describedAs("size")
+    .ofType(classOf[java.lang.Integer])
+    .defaultsTo(200)
+  val compressionCodecOpt = parser.accepts("compression-codec", "If set, messages are sent compressed")
+    .withRequiredArg
+    .describedAs("compression codec")
+    .ofType(classOf[java.lang.Integer])
+    .defaultsTo(0)
+  val helpOpt = parser.accepts("help", "Print usage.")
 }
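Hoisting message-size, batch-size, compression-codec, and help into `PerfConfig` means each tool's config only declares its own flags. A hypothetical new tool's config (`ReadPerfConfig` is not part of this patch) would only need something like:

```scala
// Hypothetical sketch; assumes kafka.message.CompressionCodec is imported.
class ReadPerfConfig(args: Array[String]) extends PerfConfig(args) {
  val options = parser.parse(args: _*)
  val messageSize = options.valueOf(messageSizeOpt).intValue           // inherited option
  val codec = CompressionCodec.getCompressionCodec(
    options.valueOf(compressionCodecOpt).intValue)                     // inherited option
  val help = options.has(helpOpt)                                      // inherited option
}
```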
diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
index b8925ca..f1423c8 100644
--- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
@@ -86,28 +86,13 @@ object ProducerPerformance extends Logging {
     .withRequiredArg()
     .ofType(classOf[java.lang.Integer])
     .defaultsTo(-1)
-  val messageSizeOpt = parser.accepts("message-size", "The size of each message.")
-    .withRequiredArg
-    .describedAs("size")
-    .ofType(classOf[java.lang.Integer])
-    .defaultsTo(100)
   val varyMessageSizeOpt = parser.accepts("vary-message-size", "If set, message size will vary up to the given maximum.")
   val asyncOpt = parser.accepts("async", "If set, messages are sent asynchronously.")
-  val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch.")
-    .withRequiredArg
-    .describedAs("size")
-    .ofType(classOf[java.lang.Integer])
-    .defaultsTo(200)
   val numThreadsOpt = parser.accepts("threads", "Number of sending threads.")
     .withRequiredArg
     .describedAs("count")
     .ofType(classOf[java.lang.Integer])
     .defaultsTo(1)
-  val compressionCodecOption = parser.accepts("compression-codec", "If set, messages are sent compressed")
-    .withRequiredArg
-    .describedAs("compression codec ")
-    .ofType(classOf[java.lang.Integer])
-    .defaultsTo(0)
   val initialMessageIdOpt = parser.accepts("initial-message-id", "If set, messages will be tagged with an " +
     "ID and sent by producer starting from this ID sequentially. Message content will be String type and " +
     "in the form of 'Message:000...1:xxx...'")
@@ -142,7 +127,7 @@ object ProducerPerformance extends Logging {
   var isAsync = options.has(asyncOpt)
   var batchSize = options.valueOf(batchSizeOpt).intValue
   var numThreads = options.valueOf(numThreadsOpt).intValue
-  val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOption).intValue)
+  val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOpt).intValue)
   val initialMessageId = options.valueOf(initialMessageIdOpt).intValue()
   val seqIdMode = options.has(initialMessageIdOpt)
   val produceRequestTimeoutMs = options.valueOf(produceRequestTimeoutMsOpt).intValue()
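With these pieces in place, the new standalone log benchmark can be driven directly. The flags shown below are the ones defined in `LogPerfConfig` above; the message-count and reporting-interval flags come from `PerfConfig` and their names are not visible in this patch, so they are omitted here and the values are arbitrary examples:

```scala
kafka.perf.LogPerformance.main(Array(
  "--dir", "/tmp/kafka-log-perf",
  "--partitions", "4",
  "--writers", "2",
  "--readers", "1",
  "--message-size", "100",
  "--writer-batch-size", "200"))
```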