diff --git a/config/log4j.properties b/config/log4j.properties
index 1ab8507..baa698b 100644
--- a/config/log4j.properties
+++ b/config/log4j.properties
@@ -73,8 +73,6 @@ log4j.additivity.kafka.controller=false
 log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender
 log4j.additivity.kafka.log.LogCleaner=false
-log4j.logger.kafka.log.Cleaner=INFO, cleanerAppender
-log4j.additivity.kafka.log.Cleaner=false
 
 log4j.logger.state.change.logger=TRACE, stateChangeAppender
 log4j.additivity.state.change.logger=false
 
diff --git a/config/server.properties b/config/server.properties
index 2ffe0eb..c9e923a 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -40,7 +40,7 @@ port=9092
 num.network.threads=2
 
 # The number of threads doing disk I/O
-num.io.threads=2
+num.io.threads=8
 
 # The send buffer (SO_SNDBUF) used by the socket server
 socket.send.buffer.bytes=1048576
@@ -100,6 +100,10 @@ log.segment.bytes=536870912
 # to the retention policies
 log.retention.check.interval.ms=60000
 
+# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
+# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
+log.cleaner.enable=false
+
 ############################# Zookeeper #############################
 
 # Zookeeper connection string (see zookeeper docs for details).
@@ -111,6 +115,3 @@ zookeeper.connect=localhost:2181
 
 # Timeout in ms for connecting to zookeeper
 zookeeper.connection.timeout.ms=1000000
-
-
-log.cleanup.policy=delete
diff --git a/core/src/main/scala/kafka/log/CleanerConfig.scala b/core/src/main/scala/kafka/log/CleanerConfig.scala
index fa946ad..ade8386 100644
--- a/core/src/main/scala/kafka/log/CleanerConfig.scala
+++ b/core/src/main/scala/kafka/log/CleanerConfig.scala
@@ -35,7 +35,7 @@ case class CleanerConfig(val numThreads: Int = 1,
                          val ioBufferSize: Int = 1024*1024,
                          val maxMessageSize: Int = 32*1024*1024,
                          val maxIoBytesPerSecond: Double = Double.MaxValue,
-                         val backOffMs: Long = 60 * 1000,
+                         val backOffMs: Long = 15 * 1000,
                          val enableCleaner: Boolean = true,
                          val hashAlgorithm: String = "MD5") {
 }
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 6404647..312204c 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -131,6 +131,9 @@ class LogCleaner(val config: CleanerConfig,
    */
   private class CleanerThread(threadId: Int)
     extends ShutdownableThread(name = "kafka-log-cleaner-thread-" + threadId, isInterruptible = false) {
+    
+    override val loggerName = classOf[LogCleaner].getName
+    
     if(config.dedupeBufferSize / config.numThreads > Int.MaxValue)
       warn("Cannot use more than 2G of cleaner buffer space per cleaner thread, ignoring excess buffer space...")
 
@@ -185,7 +188,7 @@ class LogCleaner(val config: CleanerConfig,
     def logStats(id: Int, name: String, from: Long, to: Long, stats: CleanerStats) {
       def mb(bytes: Double) = bytes / (1024*1024)
       val message = 
-        "%n\tLog cleaner %d cleaned log %s (dirty section = [%d, %d])%n".format(id, name, from, to) + 
+        "%n\tLog cleaner thread %d cleaned log %s (dirty section = [%d, %d])%n".format(id, name, from, to) + 
         "\t%,.1f MB of log processed in %,.1f seconds (%,.1f MB/sec).%n".format(mb(stats.bytesRead), 
                                                                                 stats.elapsedSecs, 
                                                                                 mb(stats.bytesRead/stats.elapsedSecs)) + 
@@ -222,6 +225,8 @@ private[log] class Cleaner(val id: Int,
                            throttler: Throttler,
                            time: Time,
                            checkDone: (TopicAndPartition) => Unit) extends Logging {
+  
+  override val loggerName = classOf[LogCleaner].getName
   
   this.logIdent = "Cleaner " + id + ": "
 
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 1612c8d..43e5c1f 100644
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -40,6 +40,9 @@ private[log] case object LogCleaningPaused extends LogCleaningState
  * requested to be resumed.
  */
 private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[TopicAndPartition, Log]) extends Logging {
+  
+  override val loggerName = classOf[LogCleaner].getName
+  
   /* the offset checkpoints holding the last cleaned point for each log */
   private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, "cleaner-offset-checkpoint")))).toMap
 
@@ -65,7 +68,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
   def grabFilthiestLog(): Option[LogToClean] = {
     inLock(lock) {
       val lastClean = allCleanerCheckpoints()
-      val cleanableLogs = logs.filter(l => l._2.config.dedupe)                                   // skip any logs marked for delete rather than dedupe
+      val cleanableLogs = logs.filter(l => l._2.config.compact)                                  // skip any logs marked for delete rather than dedupe
                               .filterNot(l => inProgress.contains(l._1))                         // skip any logs already in-progress
                               .map(l => LogToClean(l._1, l._2, lastClean.getOrElse(l._1, 0)))    // create a LogToClean instance for each
       val dirtyLogs = cleanableLogs.filter(l => l.totalBytes > 0)                                // must have some bytes
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 0b32aee..18c86fe 100644
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -34,7 +34,7 @@ import kafka.common._
  * @param fileDeleteDelayMs The time to wait before deleting a file from the filesystem
  * @param deleteRetentionMs The time to retain delete markers in the log. Only applicable for logs that are being compacted.
  * @param minCleanableRatio The ratio of bytes that are available for cleaning to the bytes already cleaned
- * @param dedupe Should old segments in this log be deleted or deduplicated?
+ * @param compact Should old segments in this log be deleted or deduplicated?
  */
 case class LogConfig(val segmentSize: Int = 1024*1024,
                      val segmentMs: Long = Long.MaxValue,
@@ -48,7 +48,7 @@ case class LogConfig(val segmentSize: Int = 1024*1024,
                      val fileDeleteDelayMs: Long = 60*1000,
                      val deleteRetentionMs: Long = 24 * 60 * 60 * 1000L,
                      val minCleanableRatio: Double = 0.5,
-                     val dedupe: Boolean = false) {
+                     val compact: Boolean = false) {
 
   def toProps: Properties = {
     val props = new Properties()
@@ -65,7 +65,7 @@ case class LogConfig(val segmentSize: Int = 1024*1024,
     props.put(DeleteRetentionMsProp, deleteRetentionMs.toString)
     props.put(FileDeleteDelayMsProp, fileDeleteDelayMs.toString)
     props.put(MinCleanableDirtyRatioProp, minCleanableRatio.toString)
-    props.put(CleanupPolicyProp, if(dedupe) "dedupe" else "delete")
+    props.put(CleanupPolicyProp, if(compact) "compact" else "delete")
     props
   }
 
@@ -117,7 +117,7 @@ object LogConfig {
                   fileDeleteDelayMs = props.getProperty(FileDeleteDelayMsProp).toInt,
                   deleteRetentionMs = props.getProperty(DeleteRetentionMsProp).toLong,
                   minCleanableRatio = props.getProperty(MinCleanableDirtyRatioProp).toDouble,
-                  dedupe = props.getProperty(CleanupPolicyProp).trim.toLowerCase == "dedupe")
+                  compact = props.getProperty(CleanupPolicyProp).trim.toLowerCase != "delete")
   }
 
   /**
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 10062af..bcd2bb7 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -351,7 +351,7 @@ class LogManager(val logDirs: Array[File],
     debug("Beginning log cleanup...")
     var total = 0
     val startMs = time.milliseconds
-    for(log <- allLogs; if !log.config.dedupe) {
+    for(log <- allLogs; if !log.config.compact) {
       debug("Garbage collecting '" + log.name + "'")
       total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
     }
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 3c3aafc..4d10ef3 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -137,7 +137,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   val logCleanerDedupeBufferLoadFactor = props.getDouble("log.cleaner.io.buffer.load.factor", 0.9d)
 
   /* the amount of time to sleep when there are no logs to clean */
-  val logCleanerBackoffMs = props.getLongInRange("log.cleaner.backoff.ms", 30*1000, (0L, Long.MaxValue))
+  val logCleanerBackoffMs = props.getLongInRange("log.cleaner.backoff.ms", 15*1000, (0L, Long.MaxValue))
 
   /* the minimum ratio of dirty log to total log for a log to eligible for cleaning */
   val logCleanerMinCleanRatio = props.getDouble("log.cleaner.min.cleanable.ratio", 0.5)
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 5e34f95..0c43f83 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -260,7 +260,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
                                          deleteRetentionMs = config.logCleanerDeleteRetentionMs,
                                          fileDeleteDelayMs = config.logDeleteDelayMs,
                                          minCleanableRatio = config.logCleanerMinCleanRatio,
-                                         dedupe = config.logCleanupPolicy.trim.toLowerCase == "dedupe")
+                                         compact = config.logCleanupPolicy.trim.toLowerCase == "compact")
     val defaultProps = defaultLogConfig.toProps
     val configs = AdminUtils.fetchAllTopicConfigs(zkClient).mapValues(LogConfig.fromProps(defaultProps, _)) // read the log configurations from zookeeper
diff --git a/core/src/test/scala/other/kafka/TestLogCleaning.scala b/core/src/test/scala/other/kafka/TestLogCleaning.scala
index 22b16e5..d20d132 100644
--- a/core/src/test/scala/other/kafka/TestLogCleaning.scala
+++ b/core/src/test/scala/other/kafka/TestLogCleaning.scala
@@ -243,11 +243,11 @@ object TestLogCleaning {
                   percentDeletes: Int): File = {
     val producerProps = new Properties
     producerProps.setProperty("producer.type", "async")
-    producerProps.setProperty("broker.list", brokerUrl)
+    producerProps.setProperty("metadata.broker.list", brokerUrl)
     producerProps.setProperty("serializer.class", classOf[StringEncoder].getName)
     producerProps.setProperty("key.serializer.class", classOf[StringEncoder].getName)
     producerProps.setProperty("queue.enqueue.timeout.ms", "-1")
-    producerProps.setProperty("batch.size", 1000.toString)
+    producerProps.setProperty("batch.num.messages", 1000.toString)
     val producer = new Producer[String, String](new ProducerConfig(producerProps))
     val rand = new Random(1)
     val keyCount = (messages / dups).toInt
@@ -275,8 +275,9 @@ object TestLogCleaning {
   def makeConsumer(zkUrl: String, topics: Array[String]): ZookeeperConsumerConnector = {
     val consumerProps = new Properties
     consumerProps.setProperty("group.id", "log-cleaner-test-" + new Random().nextInt(Int.MaxValue))
-    consumerProps.setProperty("zk.connect", zkUrl)
-    consumerProps.setProperty("consumer.timeout.ms", (10*1000).toString)
+    consumerProps.setProperty("zookeeper.connect", zkUrl)
+    consumerProps.setProperty("consumer.timeout.ms", (20*1000).toString)
+    consumerProps.setProperty("auto.offset.reset", "smallest")
     new ZookeeperConsumerConnector(new ConsumerConfig(consumerProps))
   }
 
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index 51cd94b..d10e4f4 100644
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -33,7 +33,7 @@ import kafka.message._
 class CleanerTest extends JUnitSuite {
   
   val dir = TestUtils.tempDir()
-  val logConfig = LogConfig(segmentSize=1024, maxIndexSize=1024, dedupe=true)
+  val logConfig = LogConfig(segmentSize=1024, maxIndexSize=1024, compact=true)
   val time = new MockTime()
   val throttler = new Throttler(desiredRatePerSec = Double.MaxValue, checkIntervalMs = Long.MaxValue, time = time)
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 1de3ef0..9aeb69d 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -101,7 +101,7 @@ class LogCleanerIntegrationTest extends JUnitSuite {
       val dir = new File(logDir, "log-" + i)
       dir.mkdirs()
      val log = new Log(dir = dir,
-                        LogConfig(segmentSize = segmentSize, maxIndexSize = 100*1024, fileDeleteDelayMs = deleteDelay, dedupe = true),
+                        LogConfig(segmentSize = segmentSize, maxIndexSize = 100*1024, fileDeleteDelayMs = deleteDelay, compact = true),
                         recoveryPoint = 0L,
                         scheduler = time.scheduler,
                         time = time)
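
Note on the LogConfig change above: toProps now serializes the policy as "compact", while fromProps treats any cleanup.policy value other than "delete" as enabling compaction, so topics still configured with the legacy "dedupe" string keep compacting after an upgrade. Below is a minimal standalone sketch of that parsing rule, for illustration only (not Kafka code; the object and helper names here are hypothetical):

    object CleanupPolicyCheck {
      // Mirrors the parsing rule in LogConfig.fromProps: only an explicit
      // "delete" policy disables compaction; any other value ("compact",
      // the legacy "dedupe", mixed case, surrounding whitespace) enables it.
      def policyEnablesCompaction(policy: String): Boolean =
        policy.trim.toLowerCase != "delete"

      def main(args: Array[String]): Unit = {
        assert(!policyEnablesCompaction("delete"))
        assert(!policyEnablesCompaction("  Delete "))  // trim and toLowerCase are applied first
        assert(policyEnablesCompaction("compact"))
        assert(policyEnablesCompaction("dedupe"))      // legacy value still maps to compaction
        println("cleanup.policy parsing behaves as described")
      }
    }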