diff --git a/build.gradle b/build.gradle
index 3db5f67..0d30b8e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -147,6 +147,7 @@ project(':core') {
     compile 'org.apache.zookeeper:zookeeper:3.3.4'
     compile 'com.101tec:zkclient:0.3'
     compile 'com.yammer.metrics:metrics-core:2.2.0'
+    compile 'com.yammer.metrics:metrics-annotation:2.2.0'
     compile 'net.sf.jopt-simple:jopt-simple:3.2'
     compile 'org.xerial.snappy:snappy-java:1.0.5'
@@ -180,7 +181,7 @@ project(':core') {
   }

   tasks.create(name: "releaseTarGz", dependsOn: configurations.archives.artifacts, type: Tar) {
-    into "kafka_${baseScalaVersion}-${version}"
+    into "."
     compression = Compression.GZIP
     from(project.file("../bin")) { into "bin/" }
     from(project.file("../config")) { into "config/" }
diff --git a/config/log4j.properties b/config/log4j.properties
index baa698b..1ab8507 100644
--- a/config/log4j.properties
+++ b/config/log4j.properties
@@ -73,6 +73,8 @@ log4j.additivity.kafka.controller=false

 log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender
 log4j.additivity.kafka.log.LogCleaner=false
+log4j.logger.kafka.log.Cleaner=INFO, cleanerAppender
+log4j.additivity.kafka.log.Cleaner=false

 log4j.logger.state.change.logger=TRACE, stateChangeAppender
 log4j.additivity.state.change.logger=false
diff --git a/config/server.properties b/config/server.properties
index c9e923a..2ffe0eb 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -40,7 +40,7 @@ port=9092
 num.network.threads=2

 # The number of threads doing disk I/O
-num.io.threads=8
+num.io.threads=2

 # The send buffer (SO_SNDBUF) used by the socket server
 socket.send.buffer.bytes=1048576
@@ -100,10 +100,6 @@ log.segment.bytes=536870912
 # to the retention policies
 log.retention.check.interval.ms=60000

-# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
-# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
-log.cleaner.enable=false
-
 ############################# Zookeeper #############################

 # Zookeeper connection string (see zookeeper docs for details).
@@ -115,3 +111,6 @@ zookeeper.connect=localhost:2181

 # Timeout in ms for connecting to zookeeper
 zookeeper.connection.timeout.ms=1000000
+
+
+log.cleanup.policy=delete
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 789c69d..fc8d686 100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -34,9 +34,9 @@ object TopicCommand {
     val opts = new TopicCommandOptions(args)

     // should have exactly one action
-    val actions = Seq(opts.createOpt, opts.listOpt, opts.alterOpt, opts.describeOpt).count(opts.options.has _)
+    val actions = Seq(opts.createOpt, opts.deleteOpt, opts.listOpt, opts.alterOpt, opts.describeOpt).count(opts.options.has _)
     if(actions != 1) {
-      System.err.println("Command must include exactly one action: --list, --describe, --create or --alter")
+      System.err.println("Command must include exactly one action: --list, --describe, --create, --delete, or --alter")
       opts.parser.printHelpOn(System.err)
       System.exit(1)
     }
@@ -50,6 +50,8 @@ object TopicCommand {
         createTopic(zkClient, opts)
       else if(opts.options.has(opts.alterOpt))
         alterTopic(zkClient, opts)
+      else if(opts.options.has(opts.deleteOpt))
+        deleteTopic(zkClient, opts)
       else if(opts.options.has(opts.listOpt))
         listTopics(zkClient, opts)
       else if(opts.options.has(opts.describeOpt))
@@ -112,6 +114,14 @@ object TopicCommand {
     }
   }

+  def deleteTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
+    val topics = getTopics(zkClient, opts)
+    topics.foreach { topic =>
+      AdminUtils.deleteTopic(zkClient, topic)
+      println("Topic \"%s\" queued for deletion.".format(topic))
+    }
+  }
+
   def listTopics(zkClient: ZkClient, opts: TopicCommandOptions) {
     val topics = getTopics(zkClient, opts)
     for(topic <- topics)
@@ -206,9 +216,10 @@ object TopicCommand {
     val listOpt = parser.accepts("list", "List all available topics.")
     val createOpt = parser.accepts("create", "Create a new topic.")
     val alterOpt = parser.accepts("alter", "Alter the configuration for the topic.")
+    val deleteOpt = parser.accepts("delete", "Delete the topic.")
     val describeOpt = parser.accepts("describe", "List details for the given topics.")
     val helpOpt = parser.accepts("help", "Print usage information.")
-    val topicOpt = parser.accepts("topic", "The topic to be create, alter or describe. Can also accept a regular " +
+    val topicOpt = parser.accepts("topic", "The topic to be create, alter, delete, or describe. Can also accept a regular " +
                                            "expression except for --create option")
                          .withRequiredArg
                          .describedAs("topic")
@@ -244,7 +255,7 @@ object TopicCommand {

     val options = parser.parse(args : _*)

-    val allTopicLevelOpts: Set[OptionSpec[_]] = Set(alterOpt, createOpt, describeOpt, listOpt)
+    val allTopicLevelOpts: Set[OptionSpec[_]] = Set(alterOpt, createOpt, deleteOpt, describeOpt, listOpt)

     def checkArgs() {
       // check required args
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 810952e..5c5f549 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -56,7 +56,7 @@ class Partition(val topic: String,
   * each partition. */
  private var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
  this.logIdent = "Partition [%s,%d] on broker %d: ".format(topic, partitionId, localBrokerId)
- private val stateChangeLogger = KafkaController.stateChangeLogger
+ private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)

  private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
@@ -88,7 +88,7 @@ class Partition(val topic: String,
         if (isReplicaLocal(replicaId)) {
           val config = LogConfig.fromProps(logManager.defaultConfig.toProps, AdminUtils.fetchTopicConfig(zkClient, topic))
           val log = logManager.createLog(TopicAndPartition(topic, partitionId), config)
-          val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParent)
+          val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
           val offsetMap = checkpoint.read
           if (!offsetMap.contains(TopicAndPartition(topic, partitionId)))
             warn("No checkpointed highwatermark is found for partition [%s,%d]".format(topic, partitionId))
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index f17d976..8ab8ab6 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -114,7 +114,7 @@ class RequestSendThread(val controllerId: Int,
                         val channel: BlockingChannel)
   extends ShutdownableThread("Controller-%d-to-broker-%d-send-thread".format(controllerId, toBroker.id)) {
   private val lock = new Object()
-  private val stateChangeLogger = KafkaController.stateChangeLogger
+  private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
   connectToBroker(toBroker, channel)

   override def doWork(): Unit = {
@@ -188,7 +188,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
   val leaderAndIsrRequestMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), PartitionStateInfo]]
   val stopReplicaRequestMap = new mutable.HashMap[Int, Seq[StopReplicaRequestInfo]]
   val updateMetadataRequestMap = new mutable.HashMap[Int, mutable.HashMap[TopicAndPartition, PartitionStateInfo]]
-  private val stateChangeLogger = KafkaController.stateChangeLogger
+  private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)

   def newBatch() {
     // raise error if the previous batch is not empty
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index d6c0321..f12ffc2 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -35,9 +35,9 @@ import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
 import java.util.concurrent.atomic.AtomicInteger
 import org.apache.log4j.Logger
-import java.util.concurrent.locks.ReentrantLock
 import scala.Some
 import kafka.common.TopicAndPartition
+import java.util.concurrent.locks.ReentrantLock

 class ControllerContext(val zkClient: ZkClient,
                         val zkSessionTimeout: Int) {
@@ -125,12 +125,10 @@ trait KafkaControllerMBean {

 object KafkaController extends Logging {
   val MBeanName = "kafka.controller:type=KafkaController,name=ControllerOps"
-  val stateChangeLogger = new StateChangeLogger("state.change.logger")
+  val stateChangeLogger = "state.change.logger"
   val InitialControllerEpoch = 1
   val InitialControllerEpochZkVersion = 1

-  case class StateChangeLogger(override val loggerName: String) extends Logging
-
   def parseControllerId(controllerInfoString: String): Int = {
     try {
       Json.parseFull(controllerInfoString) match {
@@ -156,7 +154,7 @@ object KafkaController extends Logging {
 class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup with KafkaControllerMBean {
   this.logIdent = "[Controller " + config.brokerId + "]: "
   private var isRunning = true
-  private val stateChangeLogger = KafkaController.stateChangeLogger
+  private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
   val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs)
   val partitionStateMachine = new PartitionStateMachine(this)
   val replicaStateMachine = new ReplicaStateMachine(this)
@@ -337,21 +335,15 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
    */
   def onControllerResignation() {
     inLock(controllerContext.controllerLock) {
+      if (config.autoLeaderRebalanceEnable)
+        autoRebalanceScheduler.shutdown()
+      deleteTopicManager.shutdown()
       Utils.unregisterMBean(KafkaController.MBeanName)
-
-      if(deleteTopicManager != null)
-        deleteTopicManager.shutdown()
-
       partitionStateMachine.shutdown()
       replicaStateMachine.shutdown()
-
-      if(config.autoLeaderRebalanceEnable)
-        autoRebalanceScheduler.shutdown()
-
       if(controllerContext.controllerChannelManager != null) {
         controllerContext.controllerChannelManager.shutdown()
         controllerContext.controllerChannelManager = null
-        info("Controller shutdown complete")
       }
     }
   }
@@ -648,7 +640,15 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   def shutdown() = {
     inLock(controllerContext.controllerLock) {
       isRunning = false
-      onControllerResignation()
+      partitionStateMachine.shutdown()
+      replicaStateMachine.shutdown()
+      if (config.autoLeaderRebalanceEnable)
+        autoRebalanceScheduler.shutdown()
+      if(controllerContext.controllerChannelManager != null) {
+        controllerContext.controllerChannelManager.shutdown()
+        controllerContext.controllerChannelManager = null
+        info("Controller shutdown complete")
+      }
     }
   }

diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 6457b56..c69077e 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -50,7 +50,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   private val hasStarted = new AtomicBoolean(false)
   private val noOpPartitionLeaderSelector = new NoOpLeaderSelector(controllerContext)
   this.logIdent = "[Partition state machine on Controller " + controllerId + "]: "
-  private val stateChangeLogger = KafkaController.stateChangeLogger
+  private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
   private var topicChangeListener: TopicChangeListener = null
   private var deleteTopicsListener: DeleteTopicsListener = null
   private var addPartitionsListener: mutable.Map[String, AddPartitionsListener] = mutable.Map.empty
@@ -72,8 +72,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   // register topic and partition change listeners
   def registerListeners() {
     registerTopicChangeListener()
-    if(controller.config.deleteTopicEnable)
-      registerDeleteTopicListener()
+    registerDeleteTopicListener()
   }

   /**
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 4da43c4..5e016d5 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -52,7 +52,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
   val brokerRequestBatch = new ControllerBrokerRequestBatch(controller)
   private val hasStarted = new AtomicBoolean(false)
   this.logIdent = "[Replica state machine on controller " + controller.config.brokerId + "]: "
-  private val stateChangeLogger = KafkaController.stateChangeLogger
+  private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)

   /**
    * Invoked on successful controller election. First registers a broker change listener since that triggers all
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index 40c4c57..58f1c42 100644
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -22,8 +22,6 @@ import kafka.utils.Utils._
 import collection.Set
 import kafka.common.{ErrorMapping, TopicAndPartition}
 import kafka.api.{StopReplicaResponse, RequestOrResponse}
-import java.util.concurrent.locks.ReentrantLock
-import java.util.concurrent.atomic.AtomicBoolean

 /**
  * This manages the state machine for topic deletion.
@@ -73,34 +71,28 @@ class TopicDeletionManager(controller: KafkaController,
   val partitionStateMachine = controller.partitionStateMachine
   val replicaStateMachine = controller.replicaStateMachine
   var topicsToBeDeleted: mutable.Set[String] = mutable.Set.empty[String] ++ initialTopicsToBeDeleted
-  val deleteLock = new ReentrantLock()
   var topicsIneligibleForDeletion: mutable.Set[String] = mutable.Set.empty[String] ++
     (initialTopicsIneligibleForDeletion & initialTopicsToBeDeleted)
-  val deleteTopicsCond = deleteLock.newCondition()
-  var deleteTopicStateChanged: AtomicBoolean = new AtomicBoolean(false)
+  val deleteTopicsCond = controllerContext.controllerLock.newCondition()
+  var deleteTopicStateChanged: Boolean = false
   var deleteTopicsThread: DeleteTopicsThread = null
-  val isDeleteTopicEnabled = controller.config.deleteTopicEnable

   /**
    * Invoked at the end of new controller initiation
   */
   def start() {
-    if(isDeleteTopicEnabled) {
-      deleteTopicsThread = new DeleteTopicsThread()
-      deleteTopicStateChanged.set(true)
-      deleteTopicsThread.start()
-    }
+    deleteTopicsThread = new DeleteTopicsThread()
+    deleteTopicStateChanged = true
+    deleteTopicsThread.start()
   }

   /**
    * Invoked when the current controller resigns. At this time, all state for topic deletion should be cleared
   */
   def shutdown() {
-    if(isDeleteTopicEnabled) {
-      deleteTopicsThread.shutdown()
-      topicsToBeDeleted.clear()
-      topicsIneligibleForDeletion.clear()
-    }
+    deleteTopicsThread.shutdown()
+    topicsToBeDeleted.clear()
+    topicsIneligibleForDeletion.clear()
   }

   /**
@@ -110,10 +102,8 @@ class TopicDeletionManager(controller: KafkaController,
    * @param topics Topics that should be deleted
    */
   def enqueueTopicsForDeletion(topics: Set[String]) {
-    if(isDeleteTopicEnabled) {
-      topicsToBeDeleted ++= topics
-      resumeTopicDeletionThread()
-    }
+    topicsToBeDeleted ++= topics
+    resumeTopicDeletionThread()
   }

   /**
@@ -125,12 +115,10 @@ class TopicDeletionManager(controller: KafkaController,
    * @param topics Topics for which deletion can be resumed
    */
   def resumeDeletionForTopics(topics: Set[String] = Set.empty) {
-    if(isDeleteTopicEnabled) {
-      val topicsToResumeDeletion = topics & topicsToBeDeleted
-      if(topicsToResumeDeletion.size > 0) {
-        topicsIneligibleForDeletion --= topicsToResumeDeletion
-        resumeTopicDeletionThread()
-      }
+    val topicsToResumeDeletion = topics & topicsToBeDeleted
+    if(topicsToResumeDeletion.size > 0) {
+      topicsIneligibleForDeletion --= topicsToResumeDeletion
+      resumeTopicDeletionThread()
     }
   }

@@ -143,16 +131,14 @@ class TopicDeletionManager(controller: KafkaController,
    * @param replicas Replicas for which deletion has failed
    */
   def failReplicaDeletion(replicas: Set[PartitionAndReplica]) {
-    if(isDeleteTopicEnabled) {
-      val replicasThatFailedToDelete = replicas.filter(r => isTopicQueuedUpForDeletion(r.topic))
-      if(replicasThatFailedToDelete.size > 0) {
-        val topics = replicasThatFailedToDelete.map(_.topic)
-        debug("Deletion failed for replicas %s. Halting deletion for topics %s"
-          .format(replicasThatFailedToDelete.mkString(","), topics))
-        controller.replicaStateMachine.handleStateChanges(replicasThatFailedToDelete, ReplicaDeletionIneligible)
-        markTopicIneligibleForDeletion(topics)
-        resumeTopicDeletionThread()
-      }
+    val replicasThatFailedToDelete = replicas.filter(r => isTopicQueuedUpForDeletion(r.topic))
+    if(replicasThatFailedToDelete.size > 0) {
+      val topics = replicasThatFailedToDelete.map(_.topic)
+      debug("Deletion failed for replicas %s. Halting deletion for topics %s"
+        .format(replicasThatFailedToDelete.mkString(","), topics))
+      controller.replicaStateMachine.handleStateChanges(replicasThatFailedToDelete, ReplicaDeletionIneligible)
+      markTopicIneligibleForDeletion(topics)
+      resumeTopicDeletionThread()
     }
   }

@@ -164,33 +150,22 @@ class TopicDeletionManager(controller: KafkaController,
    * @param topics Topics that should be marked ineligible for deletion. No op if the topic is was not previously queued up for deletion
    */
   def markTopicIneligibleForDeletion(topics: Set[String]) {
-    if(isDeleteTopicEnabled) {
-      val newTopicsToHaltDeletion = topicsToBeDeleted & topics
-      topicsIneligibleForDeletion ++= newTopicsToHaltDeletion
-      if(newTopicsToHaltDeletion.size > 0)
-        info("Halted deletion of topics %s".format(newTopicsToHaltDeletion.mkString(",")))
-    }
+    val newTopicsToHaltDeletion = topicsToBeDeleted & topics
+    topicsIneligibleForDeletion ++= newTopicsToHaltDeletion
+    if(newTopicsToHaltDeletion.size > 0)
+      info("Halted deletion of topics %s".format(newTopicsToHaltDeletion.mkString(",")))
   }

   def isTopicIneligibleForDeletion(topic: String): Boolean = {
-    if(isDeleteTopicEnabled) {
-      topicsIneligibleForDeletion.contains(topic)
-    } else
-      true
+    topicsIneligibleForDeletion.contains(topic)
   }

   def isTopicDeletionInProgress(topic: String): Boolean = {
-    if(isDeleteTopicEnabled) {
-      controller.replicaStateMachine.isAtLeastOneReplicaInDeletionStartedState(topic)
-    } else
-      false
+    controller.replicaStateMachine.isAtLeastOneReplicaInDeletionStartedState(topic)
   }

   def isTopicQueuedUpForDeletion(topic: String): Boolean = {
-    if(isDeleteTopicEnabled) {
-      topicsToBeDeleted.contains(topic)
-    } else
-      false
+    topicsToBeDeleted.contains(topic)
   }

   /**
@@ -198,22 +173,19 @@ class TopicDeletionManager(controller: KafkaController,
    * controllerLock should be acquired before invoking this API
    */
   private def awaitTopicDeletionNotification() {
-    inLock(deleteLock) {
-      while(!deleteTopicStateChanged.compareAndSet(true, false)) {
-        info("Waiting for signal to start or continue topic deletion")
-        deleteTopicsCond.await()
-      }
+    while(!deleteTopicStateChanged) {
+      info("Waiting for signal to start or continue topic deletion")
+      deleteTopicsCond.await()
     }
+    deleteTopicStateChanged = false
   }

   /**
    * Signals the delete-topic-thread to process topic deletion
    */
   private def resumeTopicDeletionThread() {
-    deleteTopicStateChanged.set(true)
-    inLock(deleteLock) {
-      deleteTopicsCond.signal()
-    }
+    deleteTopicStateChanged = true
+    deleteTopicsCond.signal()
   }

   /**
@@ -358,9 +330,8 @@ class TopicDeletionManager(controller: KafkaController,
   class DeleteTopicsThread() extends ShutdownableThread("delete-topics-thread") {
     val zkClient = controllerContext.zkClient
     override def doWork() {
-      awaitTopicDeletionNotification()
-
       inLock(controllerContext.controllerLock) {
+        awaitTopicDeletionNotification()
         val topicsQueuedForDeletion = Set.empty[String] ++ topicsToBeDeleted
         if(topicsQueuedForDeletion.size > 0)
           info("Handling deletion for topics " + topicsQueuedForDeletion.mkString(","))
diff --git a/core/src/main/scala/kafka/log/CleanerConfig.scala b/core/src/main/scala/kafka/log/CleanerConfig.scala
index ade8386..fa946ad 100644
--- a/core/src/main/scala/kafka/log/CleanerConfig.scala
+++ b/core/src/main/scala/kafka/log/CleanerConfig.scala
@@ -35,7 +35,7 @@ case class CleanerConfig(val numThreads: Int = 1,
                          val ioBufferSize: Int = 1024*1024,
                          val maxMessageSize: Int = 32*1024*1024,
                          val maxIoBytesPerSecond: Double = Double.MaxValue,
-                         val backOffMs: Long = 15 * 1000,
+                         val backOffMs: Long = 60 * 1000,
                          val enableCleaner: Boolean = true,
                          val hashAlgorithm: String = "MD5") {
 }
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 312204c..6404647 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -131,9 +131,6 @@ class LogCleaner(val config: CleanerConfig,
    */
   private class CleanerThread(threadId: Int)
     extends ShutdownableThread(name = "kafka-log-cleaner-thread-" + threadId, isInterruptible = false) {
-
-    override val loggerName = classOf[LogCleaner].getName
-
     if(config.dedupeBufferSize / config.numThreads > Int.MaxValue)
       warn("Cannot use more than 2G of cleaner buffer space per cleaner thread, ignoring excess buffer space...")
@@ -188,7 +185,7 @@ class LogCleaner(val config: CleanerConfig,
     def logStats(id: Int, name: String, from: Long, to: Long, stats: CleanerStats) {
       def mb(bytes: Double) = bytes / (1024*1024)
       val message =
-        "%n\tLog cleaner thread %d cleaned log %s (dirty section = [%d, %d])%n".format(id, name, from, to) +
+        "%n\tLog cleaner %d cleaned log %s (dirty section = [%d, %d])%n".format(id, name, from, to) +
         "\t%,.1f MB of log processed in %,.1f seconds (%,.1f MB/sec).%n".format(mb(stats.bytesRead),
                                                                                 stats.elapsedSecs,
                                                                                 mb(stats.bytesRead/stats.elapsedSecs)) +
@@ -225,8 +222,6 @@ private[log] class Cleaner(val id: Int,
                            throttler: Throttler,
                            time: Time,
                            checkDone: (TopicAndPartition) => Unit) extends Logging {
-
-  override val loggerName = classOf[LogCleaner].getName

   this.logIdent = "Cleaner " + id + ": "
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 79e9d55..1612c8d 100644
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -40,9 +40,6 @@ private[log] case object LogCleaningPaused extends LogCleaningState
  * requested to be resumed.
  */
 private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[TopicAndPartition, Log]) extends Logging {
-
-  override val loggerName = classOf[LogCleaner].getName
-
   /* the offset checkpoints holding the last cleaned point for each log */
   private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, "cleaner-offset-checkpoint")))).toMap

@@ -68,11 +65,10 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
   def grabFilthiestLog(): Option[LogToClean] = {
     inLock(lock) {
       val lastClean = allCleanerCheckpoints()
-      val cleanableLogs = logs.filter(l => l._2.config.compact)          // skip any logs marked for delete rather than dedupe
-                              .filterNot(l => inProgress.contains(l._1)) // skip any logs already in-progress
-                              .map(l => LogToClean(l._1, l._2,           // create a LogToClean instance for each
-                                   lastClean.getOrElse(l._1, l._2.logSegments.head.baseOffset)))
-      val dirtyLogs = cleanableLogs.filter(l => l.totalBytes > 0)        // must have some bytes
+      val cleanableLogs = logs.filter(l => l._2.config.dedupe)                               // skip any logs marked for delete rather than dedupe
+                              .filterNot(l => inProgress.contains(l._1))                     // skip any logs already in-progress
+                              .map(l => LogToClean(l._1, l._2, lastClean.getOrElse(l._1, 0))) // create a LogToClean instance for each
+      val dirtyLogs = cleanableLogs.filter(l => l.totalBytes > 0)                            // must have some bytes
                                    .filter(l => l.cleanableRatio > l.log.config.minCleanableRatio) // and must meet the minimum threshold for dirty byte ratio
       if(dirtyLogs.isEmpty) {
         None
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 18c86fe..0b32aee 100644
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -34,7 +34,7 @@ import kafka.common._
  * @param fileDeleteDelayMs The time to wait before deleting a file from the filesystem
  * @param deleteRetentionMs The time to retain delete markers in the log. Only applicable for logs that are being compacted.
  * @param minCleanableRatio The ratio of bytes that are available for cleaning to the bytes already cleaned
- * @param compact Should old segments in this log be deleted or deduplicated?
+ * @param dedupe Should old segments in this log be deleted or deduplicated?
  */
 case class LogConfig(val segmentSize: Int = 1024*1024,
                      val segmentMs: Long = Long.MaxValue,
@@ -48,7 +48,7 @@ case class LogConfig(val segmentSize: Int = 1024*1024,
                      val fileDeleteDelayMs: Long = 60*1000,
                      val deleteRetentionMs: Long = 24 * 60 * 60 * 1000L,
                      val minCleanableRatio: Double = 0.5,
-                     val compact: Boolean = false) {
+                     val dedupe: Boolean = false) {

   def toProps: Properties = {
     val props = new Properties()
@@ -65,7 +65,7 @@ case class LogConfig(val segmentSize: Int = 1024*1024,
     props.put(DeleteRetentionMsProp, deleteRetentionMs.toString)
     props.put(FileDeleteDelayMsProp, fileDeleteDelayMs.toString)
     props.put(MinCleanableDirtyRatioProp, minCleanableRatio.toString)
-    props.put(CleanupPolicyProp, if(compact) "compact" else "delete")
+    props.put(CleanupPolicyProp, if(dedupe) "dedupe" else "delete")
     props
   }
@@ -117,7 +117,7 @@ object LogConfig {
                   fileDeleteDelayMs = props.getProperty(FileDeleteDelayMsProp).toInt,
                   deleteRetentionMs = props.getProperty(DeleteRetentionMsProp).toLong,
                   minCleanableRatio = props.getProperty(MinCleanableDirtyRatioProp).toDouble,
-                  compact = props.getProperty(CleanupPolicyProp).trim.toLowerCase != "delete")
+                  dedupe = props.getProperty(CleanupPolicyProp).trim.toLowerCase == "dedupe")
   }

   /**
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index bcd2bb7..e7a4f9c 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -52,7 +52,7 @@ class LogManager(val logDirs: Array[File],
   private val logs = new Pool[TopicAndPartition, Log]()

   createAndValidateLogDirs(logDirs)
-  private var dirLocks = lockLogDirs(logDirs)
+  private val dirLocks = lockLogDirs(logDirs)
   private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
   loadLogs(logDirs)
@@ -351,7 +351,7 @@ class LogManager(val logDirs: Array[File],
     debug("Beginning log cleanup...")
     var total = 0
     val startMs = time.milliseconds
-    for(log <- allLogs; if !log.config.compact) {
+    for(log <- allLogs; if !log.config.dedupe) {
       debug("Garbage collecting '" + log.name + "'")
       total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
     }
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 826831f..a6ec970 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -31,9 +31,6 @@ import org.apache.log4j.Logger

 object RequestChannel extends Logging {
   val AllDone = new Request(1, 2, getShutdownReceive(), 0)
-  val requestLogger = new RequestLogger("kafka.request.logger")
-
-  case class RequestLogger(override val loggerName: String) extends Logging

   def getShutdownReceive() = {
     val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]())
@@ -52,7 +49,7 @@ object RequestChannel extends Logging {
     val requestId = buffer.getShort()
     val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer)
     buffer = null
-    private val requestLogger = RequestChannel.requestLogger
+    private val requestLogger = Logger.getLogger("kafka.request.logger")
     trace("Processor %d received request : %s".format(processor, requestObj))

     def updateRequestMetrics() {
@@ -84,10 +81,10 @@ object RequestChannel extends Logging {
         m.responseSendTimeHist.update(responseSendTime)
         m.totalTimeHist.update(totalTime)
       }
-      if(requestLogger.logger.isTraceEnabled)
+      if(requestLogger.isTraceEnabled)
         requestLogger.trace("Completed request:%s from client %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d"
           .format(requestObj.describe(true), remoteAddress, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime))
-      else {
+      else if(requestLogger.isDebugEnabled) {
         requestLogger.debug("Completed request:%s from client %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d"
           .format(requestObj.describe(false), remoteAddress, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime))
       }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0f137c5..ae2df20 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -138,9 +138,10 @@ class KafkaApis(val requestChannel: RequestChannel,
       updateMetadataRequest.aliveBrokers.foreach(b => aliveBrokers.put(b.id, b))
       updateMetadataRequest.partitionStateInfos.foreach { partitionState =>
         metadataCache.put(partitionState._1, partitionState._2)
-        stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response to UpdateMetadata request " +
-          "sent by controller %d epoch %d with correlation id %d").format(brokerId, partitionState._2, partitionState._1,
-          updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
+        if(stateChangeLogger.isTraceEnabled)
+          stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response to UpdateMetadata request " +
+            "sent by controller %d epoch %d with correlation id %d").format(brokerId, partitionState._2, partitionState._1,
+            updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
       }
       // remove the topics that don't exist in the UpdateMetadata request since those are the topics that are
       // currently being deleted by the controller
@@ -154,9 +155,10 @@ class KafkaApis(val requestChannel: RequestChannel,
       }.keySet
       partitionsToBeDeleted.foreach { partition =>
         metadataCache.remove(partition)
-        stateChangeLogger.trace(("Broker %d deleted partition %s from metadata cache in response to UpdateMetadata request " +
-          "sent by controller %d epoch %d with correlation id %d").format(brokerId, partition,
-          updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
+        if(stateChangeLogger.isTraceEnabled)
+          stateChangeLogger.trace(("Broker %d deleted partition %s from metadata cache in response to UpdateMetadata request " +
+            "sent by controller %d epoch %d with correlation id %d").format(brokerId, partition,
+            updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
       }
     }
     val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 0a288f9..3c3aafc 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -116,7 +116,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   /* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */
   val logCleanupIntervalMs = props.getLongInRange("log.retention.check.interval.ms", 5*60*1000, (1, Long.MaxValue))

-  /* the default cleanup policy for segments beyond the retention window, must be either "delete" or "compact" */
+  /* the default cleanup policy for segments beyond the retention window, must be either "delete" or "dedupe" */
   val logCleanupPolicy = props.getString("log.cleanup.policy", "delete")

   /* the number of background threads to use for log cleaning */
@@ -137,7 +137,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   val logCleanerDedupeBufferLoadFactor = props.getDouble("log.cleaner.io.buffer.load.factor", 0.9d)

   /* the amount of time to sleep when there are no logs to clean */
-  val logCleanerBackoffMs = props.getLongInRange("log.cleaner.backoff.ms", 15*1000, (0L, Long.MaxValue))
+  val logCleanerBackoffMs = props.getLongInRange("log.cleaner.backoff.ms", 30*1000, (0L, Long.MaxValue))

   /* the minimum ratio of dirty log to total log for a log to eligible for cleaning */
   val logCleanerMinCleanRatio = props.getDouble("log.cleaner.min.cleanable.ratio", 0.5)
@@ -248,7 +248,4 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro

   /* the maximum size for a metadata entry associated with an offset commit */
   val offsetMetadataMaxSize = props.getInt("offset.metadata.max.bytes", 1024)
-
-  /* Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off */
-  val deleteTopicEnable = props.getBoolean("delete.topic.enable", false)
 }
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 0c43f83..5e34f95 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -260,7 +260,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
                                          deleteRetentionMs = config.logCleanerDeleteRetentionMs,
                                          fileDeleteDelayMs = config.logDeleteDelayMs,
                                          minCleanableRatio = config.logCleanerMinCleanRatio,
-                                         compact = config.logCleanupPolicy.trim.toLowerCase == "compact")
+                                         dedupe = config.logCleanupPolicy.trim.toLowerCase == "dedupe")
     val defaultProps = defaultLogConfig.toProps
     val configs = AdminUtils.fetchAllTopicConfigs(zkClient).mapValues(LogConfig.fromProps(defaultProps, _))
     // read the log configurations from zookeeper
diff --git a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
index 7af2f43..19f61a9 100644
--- a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
@@ -90,7 +90,7 @@ class OffsetCheckpoint(val file: File) extends Logging {
               val topic = pieces(0)
               val partition = pieces(1).toInt
               val offset = pieces(2).toLong
-              offsets += (TopicAndPartition(topic, partition) -> offset)
+              offsets += (TopicAndPartition(pieces(0), partition) -> offset)
               line = reader.readLine()
             }
             if(offsets.size != expectedSize)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 7df56ce..8e57635 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -53,10 +53,10 @@ class ReplicaManager(val config: KafkaConfig,
   private val replicaStateChangeLock = new Object
   val replicaFetcherManager = new ReplicaFetcherManager(config, this)
   private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
-  val highWatermarkCheckpoints = config.logDirs.map(dir => (new File(dir).getAbsolutePath, new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap
+  val highWatermarkCheckpoints = config.logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap
   private var hwThreadInitialized = false
   this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: "
-  val stateChangeLogger = KafkaController.stateChangeLogger
+  val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)

   newGauge(
     "LeaderCount",
@@ -440,7 +440,7 @@ class ReplicaManager(val config: KafkaConfig,
    */
   def checkpointHighWatermarks() {
     val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect{case Some(replica) => replica}
-    val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParent)
+    val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParentFile.getAbsolutePath)
     for((dir, reps) <- replicasByDir) {
       val hwms = reps.map(r => (new TopicAndPartition(r) -> r.highWatermark)).toMap
       try {
diff --git a/core/src/test/scala/other/kafka/TestLogCleaning.scala b/core/src/test/scala/other/kafka/TestLogCleaning.scala
index d20d132..22b16e5 100644
--- a/core/src/test/scala/other/kafka/TestLogCleaning.scala
+++ b/core/src/test/scala/other/kafka/TestLogCleaning.scala
@@ -243,11 +243,11 @@ object TestLogCleaning {
                    percentDeletes: Int): File = {
     val producerProps = new Properties
     producerProps.setProperty("producer.type", "async")
-    producerProps.setProperty("metadata.broker.list", brokerUrl)
+    producerProps.setProperty("broker.list", brokerUrl)
     producerProps.setProperty("serializer.class", classOf[StringEncoder].getName)
     producerProps.setProperty("key.serializer.class", classOf[StringEncoder].getName)
     producerProps.setProperty("queue.enqueue.timeout.ms", "-1")
-    producerProps.setProperty("batch.num.messages", 1000.toString)
+    producerProps.setProperty("batch.size", 1000.toString)
     val producer = new Producer[String, String](new ProducerConfig(producerProps))
     val rand = new Random(1)
     val keyCount = (messages / dups).toInt
@@ -275,9 +275,8 @@ object TestLogCleaning {

   def makeConsumer(zkUrl: String, topics: Array[String]): ZookeeperConsumerConnector = {
     val consumerProps = new Properties
     consumerProps.setProperty("group.id", "log-cleaner-test-" + new Random().nextInt(Int.MaxValue))
-    consumerProps.setProperty("zookeeper.connect", zkUrl)
-    consumerProps.setProperty("consumer.timeout.ms", (20*1000).toString)
-    consumerProps.setProperty("auto.offset.reset", "smallest")
+    consumerProps.setProperty("zk.connect", zkUrl)
+    consumerProps.setProperty("consumer.timeout.ms", (10*1000).toString)
     new ZookeeperConsumerConnector(new ConsumerConfig(consumerProps))
   }
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index ca94b9d..dbe078c 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -219,10 +219,8 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
     val topic = "test"
     val topicAndPartition = TopicAndPartition(topic, 0)
-    val brokerConfigs = TestUtils.createBrokerConfigs(4)
-    brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
     // create brokers
-    val allServers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b)))
+    val allServers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b)))
     val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId))
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
@@ -261,10 +259,8 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
     val topic = "test"
     val topicAndPartition = TopicAndPartition(topic, 0)
-    val brokerConfigs = TestUtils.createBrokerConfigs(4)
-    brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
     // create brokers
-    val allServers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b)))
+    val allServers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b)))
     val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId))
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
@@ -432,10 +428,8 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
   private def createTestTopicAndCluster(topic: String): Seq[KafkaServer] = {
     val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
     val topicAndPartition = TopicAndPartition(topic, 0)
-    val brokerConfigs = TestUtils.createBrokerConfigs(3)
-    brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
     // create brokers
-    val servers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b)))
+    val servers = TestUtils.createBrokerConfigs(3).map(b => TestUtils.createServer(new KafkaConfig(b)))
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
     // wait until replica log is created on every broker
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index d10e4f4..51cd94b 100644
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -33,7 +33,7 @@ import kafka.message._
 class CleanerTest extends JUnitSuite {

   val dir = TestUtils.tempDir()
-  val logConfig = LogConfig(segmentSize=1024, maxIndexSize=1024, compact=true)
+  val logConfig = LogConfig(segmentSize=1024, maxIndexSize=1024, dedupe=true)
   val time = new MockTime()
   val throttler = new Throttler(desiredRatePerSec = Double.MaxValue, checkIntervalMs = Long.MaxValue, time = time)
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 5bfa764..1de3ef0 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -92,7 +92,7 @@ class LogCleanerIntegrationTest extends JUnitSuite {
   def makeCleaner(parts: Int,
                   minDirtyMessages: Int = 0,
                   numThreads: Int = 1,
-                  defaultPolicy: String = "compact",
+                  defaultPolicy: String = "dedupe",
                   policyOverrides: Map[String, String] = Map()): LogCleaner = {

     // create partitions and add them to the pool
@@ -101,7 +101,7 @@ class LogCleanerIntegrationTest extends JUnitSuite {
       val dir = new File(logDir, "log-" + i)
       dir.mkdirs()
       val log = new Log(dir = dir,
-                        LogConfig(segmentSize = segmentSize, maxIndexSize = 100*1024, fileDeleteDelayMs = deleteDelay, compact = true),
+                        LogConfig(segmentSize = segmentSize, maxIndexSize = 100*1024, fileDeleteDelayMs = deleteDelay, dedupe = true),
                         recoveryPoint = 0L,
                         scheduler = time.scheduler,
                         time = time)
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index b4bee33..be1a1ee 100644
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -201,6 +201,7 @@ class LogManagerTest extends JUnit3Suite {
   /**
    * Test that it is not possible to open two log managers using the same data directory
    */
+  @Test
  def testTwoLogManagersUsingSameDirFails() {
    try {
      new LogManager(Array(logDir), Map(), logConfig, cleanerConfig, 1000L, 10000L, 1000L, time.scheduler, time)
@@ -209,24 +210,75 @@ class LogManagerTest extends JUnit3Suite {
      case e: KafkaException => // this is good
    }
  }
-
+
  /**
   * Test that recovery points are correctly written out to disk
   */
+  @Test
  def testCheckpointRecoveryPoints() {
-    val topicA = TopicAndPartition("test-a", 1)
-    val topicB = TopicAndPartition("test-b", 1)
-    val logA = this.logManager.createLog(topicA, logConfig)
-    val logB = this.logManager.createLog(topicB, logConfig)
-    for(i <- 0 until 50)
-      logA.append(TestUtils.singleMessageSet("test".getBytes()))
-    for(i <- 0 until 100)
-      logB.append(TestUtils.singleMessageSet("test".getBytes()))
-    logA.flush()
-    logB.flush()
+    verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1), TopicAndPartition("test-b", 1)), logManager)
+  }
+
+  /**
+   * Test that recovery points directory checking works with trailing slash
+   */
+  @Test
+  def testRecoveryDirectoryMappingWithTrailingSlash() {
+    logManager.shutdown()
+    logDir = TestUtils.tempDir()
+    logManager = new LogManager(logDirs = Array(new File(logDir.getAbsolutePath + File.separator)),
+                                topicConfigs = Map(),
+                                defaultConfig = logConfig,
+                                cleanerConfig = cleanerConfig,
+                                flushCheckMs = 1000L,
+                                flushCheckpointMs = 100000L,
+                                retentionCheckMs = 1000L,
+                                scheduler = time.scheduler,
+                                time = time)
+    logManager.startup
+    verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)), logManager)
+  }
+
+  /**
+   * Test that recovery points directory checking works with relative directory
+   */
+  @Test
+  def testRecoveryDirectoryMappingWithRelativeDirectory() {
+    logManager.shutdown()
+    logDir = new File("data" + File.separator + logDir.getName)
+    logDir.mkdirs()
+    logDir.deleteOnExit()
+    logManager = new LogManager(logDirs = Array(logDir),
+                                topicConfigs = Map(),
+                                defaultConfig = logConfig,
+                                cleanerConfig = cleanerConfig,
+                                flushCheckMs = 1000L,
+                                flushCheckpointMs = 100000L,
+                                retentionCheckMs = 1000L,
+                                scheduler = time.scheduler,
+                                time = time)
+    logManager.startup
+    verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)), logManager)
+  }
+
+
+  private def verifyCheckpointRecovery(topicAndPartitions: Seq[TopicAndPartition],
+                                       logManager: LogManager) {
+    val logs = topicAndPartitions.map(this.logManager.createLog(_, logConfig))
+    logs.foreach(log => {
+      for(i <- 0 until 50)
+        log.append(TestUtils.singleMessageSet("test".getBytes()))
+
+      log.flush()
+    })
+
    logManager.checkpointRecoveryPointOffsets()
    val checkpoints = new OffsetCheckpoint(new File(logDir, logManager.RecoveryPointCheckpointFile)).read()
-    assertEquals("Recovery point should equal checkpoint", checkpoints(topicA), logA.recoveryPoint)
-    assertEquals("Recovery point should equal checkpoint", checkpoints(topicB), logB.recoveryPoint)
+
+    topicAndPartitions.zip(logs).foreach {
+      case(tp, log) => {
+        assertEquals("Recovery point should equal checkpoint", checkpoints(tp), log.recoveryPoint)
+      }
+    }
  }
 }
diff --git a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
index 67497dd..49fc240 100644
--- a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
+++ b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
@@ -6,32 +6,35 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */

 package kafka.log4j

-import java.util.Properties
-import java.io.File
 import kafka.consumer.SimpleConsumer
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils.{TestUtils, Utils, Logging}
-import junit.framework.Assert._
 import kafka.api.FetchRequestBuilder
 import kafka.producer.async.MissingConfigException
 import kafka.serializer.Encoder
 import kafka.zk.ZooKeeperTestHarness
+
+import java.util.Properties
+import java.io.File
+
 import org.apache.log4j.spi.LoggingEvent
 import org.apache.log4j.{PropertyConfigurator, Logger}
 import org.junit.{After, Before, Test}
 import org.scalatest.junit.JUnit3Suite
+
+import junit.framework.Assert._
+
 class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {

   var logDirZk: File = null
@@ -56,7 +59,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
     logDirZk = new File(logDirZkPath)
     config = new KafkaConfig(propsZk)
     server = TestUtils.createServer(config);
-    simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64*1024, "")
+    simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64 * 1024, "")
   }

   @After
@@ -72,8 +75,8 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
     var props = new Properties()
     props.put("log4j.rootLogger", "INFO")
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
-    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
-    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
+    props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
+    props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
     props.put("log4j.appender.KAFKA.Topic", "test-topic")
     props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder")
     props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
@@ -82,15 +85,15 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
     try {
       PropertyConfigurator.configure(props)
       fail("Missing properties exception was expected !")
-    }catch {
+    } catch {
       case e: MissingConfigException =>
     }

     props = new Properties()
     props.put("log4j.rootLogger", "INFO")
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
-    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
-    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
+    props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
+    props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
     props.put("log4j.appender.KAFKA.Topic", "test-topic")
     props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder")
     props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
@@ -99,15 +102,15 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
     try {
       PropertyConfigurator.configure(props)
       fail("Missing properties exception was expected !")
-    }catch {
+    } catch {
       case e: MissingConfigException =>
     }

     props = new Properties()
     props.put("log4j.rootLogger", "INFO")
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
-    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
-    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
+    props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
+    props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
     props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder")
     props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config)))
     props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
@@ -116,7 +119,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
     try {
       PropertyConfigurator.configure(props)
       fail("Missing properties exception was expected !")
-    }catch {
+    } catch {
       case e: MissingConfigException =>
     }

@@ -132,7 +135,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
     // serializer missing
     try {
       PropertyConfigurator.configure(props)
-    }catch {
+    } catch {
       case e: MissingConfigException => fail("should default to kafka.serializer.StringEncoder")
     }
   }
@@ -156,7 +159,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
   }

   private def getLog4jConfig: Properties = {
-    var props = new Properties()
+    val props = new Properties()
     props.put("log4j.rootLogger", "INFO")
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
     props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index a78f7cf..02c188a 100644
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -143,7 +143,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
   }

   def hwmFor(replicaManager: ReplicaManager, topic: String, partition: Int): Long = {
-    replicaManager.highWatermarkCheckpoints(new File(replicaManager.config.logDirs(0)).getAbsolutePath).read.getOrElse(TopicAndPartition(topic, partition), 0L)
+    replicaManager.highWatermarkCheckpoints(replicaManager.config.logDirs(0)).read.getOrElse(TopicAndPartition(topic, partition), 0L)
   }

 }
\ No newline at end of file
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
deleted file mode 100644
index b5936d4..0000000
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import org.scalatest.junit.JUnit3Suite
-import org.junit.Test
-import kafka.utils.{MockScheduler, MockTime, TestUtils}
-import java.util.concurrent.atomic.AtomicBoolean
-import java.io.File
-import org.easymock.EasyMock
-import org.I0Itec.zkclient.ZkClient
-import kafka.cluster.Replica
-import kafka.log.{LogManager, LogConfig, Log}
-
-class ReplicaManagerTest extends JUnit3Suite {
-  @Test
-  def testHighwaterMarkDirectoryMapping() {
-    val props = TestUtils.createBrokerConfig(1)
-    val dir = "/tmp/kafka-logs/"
-    new File(dir).mkdir()
-    props.setProperty("log.dirs", dir)
-    val config = new KafkaConfig(props)
-    val zkClient = EasyMock.createMock(classOf[ZkClient])
-    val mockLogMgr = EasyMock.createMock(classOf[LogManager])
-    val time: MockTime = new MockTime()
-    val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false))
-    val partition = rm.getOrCreatePartition("test-topic", 1, 1)
-    partition.addReplicaIfNotExists(new Replica(1, partition, time, 0L, Option(new Log(new File("/tmp/kafka-logs/test-topic-1"), new LogConfig(), 0L, null))))
-    rm.checkpointHighWatermarks()
-  }
-}
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index c7e058f..20fe93e 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -96,25 +96,5 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
     producer.close()
     server.shutdown()
     Utils.rm(server.config.logDirs)
-    verifyNonDaemonThreadsStatus
-  }
-
-  @Test
-  def testCleanShutdownWithDeleteTopicEnabled() {
-    val newProps = TestUtils.createBrokerConfig(0, port)
-    newProps.setProperty("delete.topic.enable", "true")
-    val newConfig = new KafkaConfig(newProps)
-    var server = new KafkaServer(newConfig)
-    server.startup()
-    server.shutdown()
-    server.awaitShutdown()
-    Utils.rm(server.config.logDirs)
-    verifyNonDaemonThreadsStatus
-  }
-
-  def verifyNonDaemonThreadsStatus() {
-    assertEquals(0, Thread.getAllStackTraces.keySet().toArray
-      .map(_.asInstanceOf[Thread])
-      .count(t => !t.isDaemon && t.isAlive && t.getClass.getCanonicalName.toLowerCase.startsWith("kafka")))
   }
 }