diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index d3be2da..b8ff984 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -37,7 +37,6 @@ class Partition(val topic: String, private val localBrokerId = replicaManager.config.brokerId private val logManager = replicaManager.logManager private val replicaFetcherManager = replicaManager.replicaFetcherManager - private val highwaterMarkCheckpoint = replicaManager.highWatermarkCheckpoint private val zkClient = replicaManager.zkClient var leaderReplicaIdOpt: Option[Int] = None var inSyncReplicas: Set[Replica] = Set.empty[Replica] @@ -69,8 +68,8 @@ class Partition(val topic: String, case None => if (isReplicaLocal(replicaId)) { val log = logManager.getOrCreateLog(topic, partitionId) - val localReplica = new Replica(replicaId, this, time, - highwaterMarkCheckpoint.read(topic, partitionId).min(log.logEndOffset), Some(log)) + val offset = replicaManager.highWatermarkCheckpoints(log.dir.getParent).read(topic, partitionId).min(log.logEndOffset) + val localReplica = new Replica(replicaId, this, time, offset, Some(log)) addReplicaIfNotExists(localReplica) } else { diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 7f5e9aa..eaedfd7 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -26,61 +26,73 @@ import kafka.server.{HighwaterMarkCheckpoint, KafkaConfig} /** - * The guy who creates and hands out logs + * The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning. + * All read and write operations are delegated to the individual log instances. + * + * The log manager maintains logs in one or more directories. New logs are created in the data directory + * that currently holds the fewest logs; no attempt is made to move existing logs between directories. + * + * A background thread handles log retention by periodically truncating excess log segments.
*/ @threadsafe private[kafka] class LogManager(val config: KafkaConfig, scheduler: KafkaScheduler, - private val time: Time, - val logRollDefaultIntervalMs: Long, - val logCleanupIntervalMs: Long, - val logCleanupDefaultAgeMs: Long, - needRecovery: Boolean) extends Logging { + private val time: Time) extends Logging { - val logDir: File = new File(config.logDir) + val CleanShutdownFile = ".kafka_cleanshutdown" + val logDirs: Array[File] = config.logDirs.map(new File(_)).toArray private val logFileSizeMap = config.logFileSizeMap - private val flushInterval = config.flushInterval - private val logCreationLock = new Object + private val logFlushInterval = config.flushInterval private val logFlushIntervals = config.flushIntervalMap + private val logCreationLock = new Object private val logRetentionSizeMap = config.logRetentionSizeMap private val logRetentionMsMap = config.logRetentionHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) // convert hours to ms private val logRollMsMap = config.logRollHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) - this.logIdent = "[Log Manager on Broker " + config.brokerId + "] " + private val logRollDefaultIntervalMs = 1000L * 60 * 60 * config.logRollHours + private val logCleanupIntervalMs = 1000L * 60 * config.logCleanupIntervalMinutes + private val logCleanupDefaultAgeMs = 1000L * 60 * 60 * config.logRetentionHours - /* Initialize a log for each subdirectory of the main log directory */ - private val logs = new Pool[String, Pool[Int, Log]]() - if(!logDir.exists()) { - info("No log directory found, creating '" + logDir.getAbsolutePath() + "'") - logDir.mkdirs() - } - if(!logDir.isDirectory() || !logDir.canRead()) - throw new KafkaException(logDir.getAbsolutePath() + " is not a readable log directory.") - val subDirs = logDir.listFiles() - if(subDirs != null) { - for(dir <- subDirs) { - if(dir.getName.equals(HighwaterMarkCheckpoint.highWatermarkFileName)){ - // skip valid metadata file - } - else if(!dir.isDirectory()) { - warn("Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?") - } else { - info("Loading log '" + dir.getName() + "'") - val topicPartition = parseTopicPartitionName(dir.getName) - val rollIntervalMs = logRollMsMap.get(topicPartition.topic).getOrElse(this.logRollDefaultIntervalMs) - val maxLogFileSize = logFileSizeMap.get(topicPartition.topic).getOrElse(config.logFileSize) - val log = new Log(dir, - maxLogFileSize, - config.maxMessageSize, - flushInterval, - rollIntervalMs, - needRecovery, - config.logIndexMaxSizeBytes, - config.logIndexIntervalBytes, - time, - config.brokerId) - logs.putIfNotExists(topicPartition.topic, new Pool[Int, Log]()) - val parts = logs.get(topicPartition.topic) - parts.put(topicPartition.partition, log) + this.logIdent = "[Log Manager on Broker " + config.brokerId + "] " + private val logs = new Pool[TopicAndPartition, Log]() + + /* initialize a log for each subdirectory of the main log directories */ + for(logDir <- logDirs) { + if(!logDir.exists) { + info("Log directory '" + logDir.getAbsolutePath + "' not found, creating it.") + logDir.mkdirs() + } + + if(!logDir.isDirectory || !logDir.canRead) + throw new KafkaException(logDir.getAbsolutePath + " is not a readable log directory.") + + /* check if this set of logs was shut down cleanly */ + val cleanShutDownFile = new File(logDir, CleanShutdownFile) + val needsRecovery = cleanShutDownFile.exists + cleanShutDownFile.delete + + /* load the logs */ + val subDirs = logDir.listFiles() + if(subDirs != null) { + for(dir <- subDirs) { + 
if(dir.isDirectory){ + info("Loading log '" + dir.getName + "'") + val topicPartition = parseTopicPartitionName(dir.getName) + val rollIntervalMs = logRollMsMap.get(topicPartition.topic).getOrElse(this.logRollDefaultIntervalMs) + val maxLogFileSize = logFileSizeMap.get(topicPartition.topic).getOrElse(config.logFileSize) + val log = new Log(dir, + maxLogFileSize, + config.maxMessageSize, + logFlushInterval, + rollIntervalMs, + needsRecovery, + config.logIndexMaxSizeBytes, + config.logIndexIntervalBytes, + time, + config.brokerId) + val previous = logs.put(topicPartition, log) + if(previous != null) + throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath)) + } } } } @@ -100,19 +112,53 @@ private[kafka] class LogManager(val config: KafkaConfig, } } - /** * Create a log for the given topic and the given partition + * If the log already exists, just return the existing log */ - private def createLog(topic: String, partition: Int): Log = { + private def createLogIfNotExists(topicAndPartition: TopicAndPartition): Log = { logCreationLock synchronized { - val d = new File(logDir, topic + "-" + partition) - d.mkdirs() - val rollIntervalMs = logRollMsMap.get(topic).getOrElse(this.logRollDefaultIntervalMs) - val maxLogFileSize = logFileSizeMap.get(topic).getOrElse(config.logFileSize) - new Log(d, maxLogFileSize, config.maxMessageSize, flushInterval, rollIntervalMs, needsRecovery = false, config.logIndexMaxSizeBytes, config.logIndexIntervalBytes, time, config.brokerId) + var log = logs.get(topicAndPartition) + + // check if the log has already been created in another thread + if(log != null) + return log + + // if not, create it + val dataDir = nextLogDir() + val dir = new File(dataDir, topicAndPartition.topic + "-" + topicAndPartition.partition) + dir.mkdirs() + val rollIntervalMs = logRollMsMap.get(topicAndPartition.topic).getOrElse(this.logRollDefaultIntervalMs) + val maxLogFileSize = logFileSizeMap.get(topicAndPartition.topic).getOrElse(config.logFileSize) + log = new Log(dir, + maxLogFileSize, + config.maxMessageSize, + logFlushInterval, + rollIntervalMs, + needsRecovery = false, + config.logIndexMaxSizeBytes, + config.logIndexIntervalBytes, + time, + config.brokerId) + info("Created log for topic %s partition %d in %s.".format(topicAndPartition.topic, topicAndPartition.partition, dataDir.getAbsolutePath)) + logs.put(topicAndPartition, log) + log } } + + /** + * Choose the next directory in which to create a log: pick the data directory with the fewest logs + */ + private def nextLogDir(): File = { + // count the number of logs in each parent directory (including 0 for empty directories) + val logCounts = allLogs.groupBy(_.dir.getParent).mapValues(_.size) + val zeros = logDirs.map(dir => (dir.getPath, 0)).toMap + val dirCounts = (zeros ++ logCounts).toBuffer + + // choose the directory with the fewest logs in it + val leastLoaded = dirCounts.sortBy(_._2).head + new File(leastLoaded._1) + } def getOffsets(topicAndPartition: TopicAndPartition, timestamp: Long, maxNumOffsets: Int): Seq[Long] = { val log = getLog(topicAndPartition.topic, topicAndPartition.partition) @@ -126,45 +172,28 @@ private[kafka] class LogManager(val config: KafkaConfig, * Get the log if it exists */ def getLog(topic: String, partition: Int): Option[Log] = { - val parts = logs.get(topic) - if (parts == null) None - else { - val log = parts.get(partition) - if(log == null) None - else Some(log) - } + val topicAndPartition = TopicAndPartition(topic, partition) +
val log = logs.get(topicAndPartition) + if (log == null) + None + else + Some(log) } /** * Create the log if it does not exist, if it exists just return it */ def getOrCreateLog(topic: String, partition: Int): Log = { - var hasNewTopic = false - var parts = logs.get(topic) - if (parts == null) { - val found = logs.putIfNotExists(topic, new Pool[Int, Log]) - if (found == null) - hasNewTopic = true - parts = logs.get(topic) + val topicAndPartition = TopicAndPartition(topic, partition) + logs.get(topicAndPartition) match { + case null => createLogIfNotExists(topicAndPartition) + case log: Log => log } - var log = parts.get(partition) - if(log == null) { - // check if this broker hosts this partition - log = createLog(topic, partition) - val found = parts.putIfNotExists(partition, log) - if(found != null) { - // there was already somebody there - log.close() - log = found - } - else - info("Created log for '" + topic + "'-" + partition) - } - - log } - /* Runs through the log removing segments older than a certain age */ + /** + * Runs through the log removing segments older than a certain age + */ private def cleanupExpiredSegments(log: Log): Int = { val startMs = time.milliseconds val topic = parseTopicPartitionName(log.name).topic @@ -216,14 +245,18 @@ */ def shutdown() { debug("Shutting down.") + // close the logs allLogs.foreach(_.close()) + // mark that the shutdown was clean by creating the clean shutdown marker file + for(logDir <- logDirs) + Utils.swallow(new File(logDir, CleanShutdownFile).createNewFile()) debug("Shutdown complete.") } /** * Get all the partition logs */ - def allLogs() = logs.values.flatMap(_.values) + def allLogs(): Iterable[Log] = logs.values /** * Flush any log which has exceeded its flush interval and has unwritten messages.
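For context on the placement policy above: nextLogDir() always picks the data directory that currently holds the fewest logs, so new partitions spread evenly across the directories listed in the new log.directories property (e.g. log.directories=/disk1/kafka-logs,/disk2/kafka-logs). The following standalone Scala sketch illustrates that selection logic under assumed, hypothetical directory paths; it is an illustration, not the patched class itself.

import java.io.File

// Simplified sketch of least-loaded directory selection (hypothetical paths, not real broker config).
object LeastLoadedDirSketch {
  // Data directories as they might be parsed from a log.directories setting.
  val logDirs = Seq(new File("/disk1/kafka-logs"), new File("/disk2/kafka-logs"))

  // Pick the directory holding the fewest logs, counting empty directories as zero.
  def nextLogDir(existingLogDirs: Seq[File]): File = {
    val logCounts = existingLogDirs.groupBy(_.getParent).mapValues(_.size)
    val zeros = logDirs.map(dir => (dir.getPath, 0)).toMap
    new File((zeros ++ logCounts).minBy(_._2)._1)
  }

  def main(args: Array[String]): Unit = {
    // Two logs already live in the first directory and none in the second, so the second is chosen next.
    val existing = Seq(new File(logDirs(0), "foo-0"), new File(logDirs(0), "foo-1"))
    println(nextLogDir(existing)) // prints /disk2/kafka-logs
  }
}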
@@ -253,12 +286,11 @@ private[kafka] class LogManager(val config: KafkaConfig, } } - - def topics(): Iterable[String] = logs.keys - private def parseTopicPartitionName(name: String): TopicAndPartition = { val index = name.lastIndexOf('-') TopicAndPartition(name.substring(0,index), name.substring(index+1).toInt) } + def topics(): Iterable[String] = logs.keys.map(_.topic) + } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index e25cf81..6a5b4de 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -21,7 +21,7 @@ import java.util.Properties import kafka.message.Message import kafka.consumer.ConsumerConfig import java.net.InetAddress -import kafka.utils.{VerifiableProperties, ZKConfig} +import kafka.utils.{VerifiableProperties, ZKConfig, Utils} /** * Configuration settings for the kafka server @@ -74,7 +74,8 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro val numPartitions = props.getIntInRange("num.partitions", 1, (1, Int.MaxValue)) /* the directories in which the log data is kept */ - val logDir = props.getString("log.dir") + val logDirs = Utils.parseCsvList(props.getString("log.directories", props.getString("log.dir", ""))) + require(logDirs.size > 0) /* the maximum size of a single log file */ val logFileSize = props.getIntInRange("log.file.size", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue)) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index cc83198..8250a8a 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -32,7 +32,6 @@ import kafka.controller.KafkaController */ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging { this.logIdent = "[Kafka Server " + config.brokerId + "], " - val CleanShutdownFile = ".kafka_cleanshutdown" private var isShuttingDown = new AtomicBoolean(false) private var shutdownLatch = new CountDownLatch(1) var socketServer: SocketServer = null @@ -53,12 +52,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg info("starting") isShuttingDown = new AtomicBoolean(false) shutdownLatch = new CountDownLatch(1) - var needRecovery = true - val cleanShutDownFile = new File(new File(config.logDir), CleanShutdownFile) - if (cleanShutDownFile.exists) { - needRecovery = false - cleanShutDownFile.delete - } /* start scheduler */ kafkaScheduler.startup @@ -66,11 +59,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg /* start log manager */ logManager = new LogManager(config, kafkaScheduler, - time, - 1000L * 60 * 60 * config.logRollHours, - 1000L * 60 * config.logCleanupIntervalMinutes, - 1000L * 60 * 60 * config.logRetentionHours, - needRecovery) + time) logManager.startup() socketServer = new SocketServer(config.brokerId, @@ -128,9 +117,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg if(kafkaController != null) kafkaController.shutdown() - val cleanShutDownFile = new File(new File(config.logDir), CleanShutdownFile) - debug("creating clean shutdown file " + cleanShutDownFile.getAbsolutePath()) - cleanShutDownFile.createNewFile shutdownLatch.countDown() info("shut down completed") } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 
8461dbe..0bc0c14 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -44,8 +44,7 @@ class ReplicaManager(val config: KafkaConfig, val replicaFetcherManager = new ReplicaFetcherManager(config, this) this.logIdent = "Replica Manager on Broker " + config.brokerId + ": " private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false) - val highWatermarkCheckpoint = new HighwaterMarkCheckpoint(config.logDir) - info("Created highwatermark file %s".format(highWatermarkCheckpoint.name)) + val highWatermarkCheckpoints = config.logDirs.map(dir => (dir, new HighwaterMarkCheckpoint(dir))).toMap newGauge( "LeaderCount", @@ -244,22 +243,24 @@ class ReplicaManager(val config: KafkaConfig, * Flushes the highwatermark value for all partitions to the highwatermark file */ def checkpointHighWatermarks() { - val highWaterarksForAllPartitions = allPartitions.map { - partition => - val topic = partition._1._1 - val partitionId = partition._1._2 - val localReplicaOpt = partition._2.getReplica(config.brokerId) - val hw = localReplicaOpt match { - case Some(localReplica) => localReplica.highWatermark - case None => - error("Highwatermark for topic %s partition %d doesn't exist during checkpointing" - .format(topic, partitionId)) - 0L - } - (topic, partitionId) -> hw - }.toMap - highWatermarkCheckpoint.write(highWaterarksForAllPartitions) - trace("Checkpointed high watermark data: %s".format(highWaterarksForAllPartitions)) + // group the partitions by the log directory that holds their data + val partitionsByDir = allPartitions.values.groupBy(p => logManager.getLog(p.topic, p.partitionId).get.dir.getParent()) + // for each log directory, write out the high watermarks for that directory's logs + for((dir, partitions) <- partitionsByDir) { + val highWatermarks = partitions.map { + partition => + val hw = partition.getReplica(config.brokerId) match { + case Some(localReplica) => localReplica.highWatermark + case None => + error("Highwatermark for topic %s partition %d doesn't exist during checkpointing" + .format(partition.topic, partition.partitionId)) + 0L + } + (partition.topic, partition.partitionId) -> hw + }.toMap + highWatermarkCheckpoints(dir).write(highWatermarks) + trace("Checkpointed high watermark data: %s to %s.".format(highWatermarks, dir)) + } } def shutdown() { diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala index 753234e..898a5b2 100644 --- a/core/src/main/scala/kafka/utils/Utils.scala +++ b/core/src/main/scala/kafka/utils/Utils.scala @@ -226,22 +226,28 @@ object Utils extends Logging { def rm(file: String): Unit = rm(new File(file)) /** + * Recursively delete the list of files/directories and any subfiles (if any exist) + * @param files The sequence of files to be deleted + */ + def rm(files: Seq[String]): Unit = files.map(f => rm(new File(f))) + + /** * Recursively delete the given file/directory and any subfiles (if any exist) * @param file The root file at which to begin deleting */ - def rm(file: File): Unit = { - if(file == null) { - return - } else if(file.isDirectory) { - val files = file.listFiles() - if(files != null) { - for(f <- files) - rm(f) - } - file.delete() - } else { - file.delete() - } + def rm(file: File) { + if(file == null) { + return + } else if(file.isDirectory) { + val files = file.listFiles() + if(files != null) { + for(f <- files) + rm(f) + } + file.delete() + } else { + file.delete() + } } /** diff --git
a/core/src/main/scala/kafka/utils/VerifiableProperties.scala b/core/src/main/scala/kafka/utils/VerifiableProperties.scala index 30c2758..d694ba9 100644 --- a/core/src/main/scala/kafka/utils/VerifiableProperties.scala +++ b/core/src/main/scala/kafka/utils/VerifiableProperties.scala @@ -22,6 +22,8 @@ import scala.collection._ class VerifiableProperties(val props: Properties) extends Logging { private val referenceSet = mutable.HashSet[String]() + + def this() = this(new Properties) def containsKey(name: String): Boolean = { props.containsKey(name) @@ -185,4 +187,6 @@ class VerifiableProperties(val props: Properties) extends Logging { info("Property %s is overridden to %s".format(key, props.getProperty(key))) } } + + override def toString(): String = props.toString } diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index b4ee2d9..194dd70 100644 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -40,7 +40,7 @@ trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness { override def tearDown() { servers.map(server => server.shutdown()) - servers.map(server => Utils.rm(server.config.logDir)) + servers.map(server => server.config.logDirs.map(Utils.rm(_))) super.tearDown } } diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index ab6ef43..31a1b49 100644 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -29,10 +29,10 @@ class LogManagerTest extends JUnit3Suite { val time: MockTime = new MockTime() val maxRollInterval = 100 - val maxLogAge = 1000 + val maxLogAgeHours = 10 var logDir: File = null var logManager: LogManager = null - var config:KafkaConfig = null + var config: KafkaConfig = null val name = "kafka" val veryLargeLogFlushInterval = 10000000L val scheduler = new KafkaScheduler(2) @@ -41,12 +41,13 @@ class LogManagerTest extends JUnit3Suite { super.setUp() config = new KafkaConfig(TestUtils.createBrokerConfig(0, -1)) { override val logFileSize = 1024 - override val flushInterval = 100 + override val flushInterval = 10000 + override val logRetentionHours = maxLogAgeHours } scheduler.startup - logManager = new LogManager(config, scheduler, time, maxRollInterval, veryLargeLogFlushInterval, maxLogAge, false) + logManager = new LogManager(config, scheduler, time) logManager.startup - logDir = logManager.logDir + logDir = logManager.logDirs(0) } override def tearDown() { @@ -54,13 +55,14 @@ class LogManagerTest extends JUnit3Suite { if(logManager != null) logManager.shutdown() Utils.rm(logDir) + logManager.logDirs.map(Utils.rm(_)) super.tearDown() } @Test def testCreateLog() { val log = logManager.getOrCreateLog(name, 0) - val logFile = new File(config.logDir, name + "-0") + val logFile = new File(config.logDirs(0), name + "-0") assertTrue(logFile.exists) log.append(TestUtils.singleMessageSet("test".getBytes())) } @@ -68,7 +70,7 @@ class LogManagerTest extends JUnit3Suite { @Test def testGetLog() { val log = logManager.getLog(name, 0) - val logFile = new File(config.logDir, name + "-0") + val logFile = new File(config.logDirs(0), name + "-0") assertTrue(!logFile.exists) } @@ -87,9 +89,9 @@ class LogManagerTest extends JUnit3Suite { // update the last modified time of all log segments val logSegments = 
log.segments.view - logSegments.foreach(s => s.messageSet.file.setLastModified(time.currentMs)) + logSegments.foreach(_.messageSet.file.setLastModified(time.currentMs)) - time.currentMs += maxLogAge + 3000 + time.currentMs += maxLogAgeHours*60*60*1000 + 1 logManager.cleanupLogs() assertEquals("Now there should only be only one segment.", 1, log.numberOfSegments) assertEquals("Should get empty fetch off new log.", 0, log.read(offset+1, 1024).sizeInBytes) @@ -115,8 +117,9 @@ override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Long] override val logRetentionHours = retentionHours override val flushInterval = 100 + override val logRollHours = maxRollInterval } - logManager = new LogManager(config, scheduler, time, maxRollInterval, veryLargeLogFlushInterval, retentionMs, false) + logManager = new LogManager(config, scheduler, time) logManager.startup // create a log @@ -157,17 +160,37 @@ override val logFileSize = 1024 *1024 *1024 override val flushSchedulerThreadRate = 50 override val flushInterval = Int.MaxValue + override val logRollHours = maxRollInterval override val flushIntervalMap = Map("timebasedflush" -> 100) } - logManager = new LogManager(config, scheduler, time, maxRollInterval, veryLargeLogFlushInterval, maxLogAge, false) + logManager = new LogManager(config, scheduler, time) logManager.startup val log = logManager.getOrCreateLog(name, 0) for(i <- 0 until 200) { var set = TestUtils.singleMessageSet("test".getBytes()) log.append(set) } - println("now = " + System.currentTimeMillis + " last flush = " + log.getLastFlushedTime) - assertTrue("The last flush time has to be within defaultflushInterval of current time ", - (System.currentTimeMillis - log.getLastFlushedTime) < 150) + val elapsed = System.currentTimeMillis - log.getLastFlushedTime + assertTrue("The last flush time should be within twice the flush scheduler interval of the current time (was %d)".format(elapsed), + elapsed < 2*config.flushSchedulerThreadRate) + } + + @Test + def testLeastLoadedAssignment() { + // create a log manager with multiple data directories + val props = TestUtils.createBrokerConfig(0, -1) + val dirs = Seq(TestUtils.tempDir().getAbsolutePath, + TestUtils.tempDir().getAbsolutePath, + TestUtils.tempDir().getAbsolutePath) + props.put("log.directories", dirs.mkString(",")) + logManager = new LogManager(new KafkaConfig(props), scheduler, time) + + // verify that logs are always assigned to the least loaded directory + for(partition <- 0 until 20) { + logManager.getOrCreateLog("test", partition) + val counts = logManager.logDirs.map(_.list.size) + assertEquals("We should have created the right number of logs", partition + 1, counts.sum) + assertTrue("Load should balance evenly", counts.max <= counts.min + 1) + } } } diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index 6f27ffc..b444dd8 100644 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -79,8 +79,8 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ server1.awaitShutdown() server2.shutdown server2.awaitShutdown() - Utils.rm(server1.config.logDir) - Utils.rm(server2.config.logDir) + Utils.rm(server1.config.logDirs) + Utils.rm(server2.config.logDirs) super.tearDown() } diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala index ca5fcb2..214c21b 100644 --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -16,14 +16,15 @@ */ package kafka.server -import kafka.log.Log +import kafka.log.{Log, LogManager} import org.I0Itec.zkclient.ZkClient import org.scalatest.junit.JUnit3Suite import org.easymock.EasyMock +import org.junit._ import org.junit.Assert._ import kafka.common.KafkaException import kafka.cluster.Replica -import kafka.utils.{SystemTime, KafkaScheduler, TestUtils, MockTime} +import kafka.utils.{SystemTime, KafkaScheduler, TestUtils, MockTime, Utils} class HighwatermarkPersistenceTest extends JUnit3Suite { @@ -31,30 +32,37 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { override val defaultFlushIntervalMs = 100 }) val topic = "foo" + val logManagers = configs.map(config => new LogManager(config, new KafkaScheduler(1), new MockTime)) + + @After + def teardown() { + for(manager <- logManagers; dir <- manager.logDirs) + Utils.rm(dir) + } def testHighWatermarkPersistenceSinglePartition() { // mock zkclient val zkClient = EasyMock.createMock(classOf[ZkClient]) EasyMock.replay(zkClient) + // create kafka scheduler val scheduler = new KafkaScheduler(2) scheduler.startup // create replica manager - val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, null) + val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, logManagers(0)) replicaManager.startup() replicaManager.checkpointHighWatermarks() - var fooPartition0Hw = replicaManager.highWatermarkCheckpoint.read(topic, 0) + var fooPartition0Hw = hwmFor(replicaManager, topic, 0) assertEquals(0L, fooPartition0Hw) val partition0 = replicaManager.getOrCreatePartition(topic, 0, 1) - // create leader log - val log0 = getMockLog // create leader and follower replicas + val log0 = logManagers(0).getOrCreateLog(topic, 0) val leaderReplicaPartition0 = new Replica(configs.head.brokerId, partition0, SystemTime, 0, Some(log0)) partition0.addReplicaIfNotExists(leaderReplicaPartition0) val followerReplicaPartition0 = new Replica(configs.last.brokerId, partition0, SystemTime) partition0.addReplicaIfNotExists(followerReplicaPartition0) replicaManager.checkpointHighWatermarks() - fooPartition0Hw = replicaManager.highWatermarkCheckpoint.read(topic, 0) + fooPartition0Hw = hwmFor(replicaManager, topic, 0) assertEquals(leaderReplicaPartition0.highWatermark, fooPartition0Hw) try { followerReplicaPartition0.highWatermark @@ -65,10 +73,9 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { // set the highwatermark for local replica partition0.getReplica().get.highWatermark = 5L replicaManager.checkpointHighWatermarks() - fooPartition0Hw = replicaManager.highWatermarkCheckpoint.read(topic, 0) + fooPartition0Hw = hwmFor(replicaManager, topic, 0) assertEquals(leaderReplicaPartition0.highWatermark, fooPartition0Hw) EasyMock.verify(zkClient) - EasyMock.verify(log0) } def testHighWatermarkPersistenceMultiplePartitions() { @@ -81,35 +88,35 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { val scheduler = new KafkaScheduler(2) scheduler.startup // create replica manager - val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, null) + val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, logManagers(0)) replicaManager.startup() 
replicaManager.checkpointHighWatermarks() - var topic1Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic1, 0) + var topic1Partition0Hw = hwmFor(replicaManager, topic1, 0) assertEquals(0L, topic1Partition0Hw) val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0, 1) // create leader log - val topic1Log0 = getMockLog + val topic1Log0 = logManagers(0).getOrCreateLog(topic1, 0) // create a local replica for topic1 val leaderReplicaTopic1Partition0 = new Replica(configs.head.brokerId, topic1Partition0, SystemTime, 0, Some(topic1Log0)) topic1Partition0.addReplicaIfNotExists(leaderReplicaTopic1Partition0) replicaManager.checkpointHighWatermarks() - topic1Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic1, 0) + topic1Partition0Hw = hwmFor(replicaManager, topic1, 0) assertEquals(leaderReplicaTopic1Partition0.highWatermark, topic1Partition0Hw) // set the highwatermark for local replica topic1Partition0.getReplica().get.highWatermark = 5L replicaManager.checkpointHighWatermarks() - topic1Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic1, 0) + topic1Partition0Hw = hwmFor(replicaManager, topic1, 0) assertEquals(5L, leaderReplicaTopic1Partition0.highWatermark) assertEquals(5L, topic1Partition0Hw) // add another partition and set highwatermark val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0, 1) // create leader log - val topic2Log0 = getMockLog + val topic2Log0 = logManagers(0).getOrCreateLog(topic2, 0) // create a local replica for topic2 val leaderReplicaTopic2Partition0 = new Replica(configs.head.brokerId, topic2Partition0, SystemTime, 0, Some(topic2Log0)) topic2Partition0.addReplicaIfNotExists(leaderReplicaTopic2Partition0) replicaManager.checkpointHighWatermarks() - var topic2Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic2, 0) + var topic2Partition0Hw = hwmFor(replicaManager, topic2, 0) assertEquals(leaderReplicaTopic2Partition0.highWatermark, topic2Partition0Hw) // set the highwatermark for local replica topic2Partition0.getReplica().get.highWatermark = 15L @@ -119,19 +126,16 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { assertEquals(10L, leaderReplicaTopic1Partition0.highWatermark) replicaManager.checkpointHighWatermarks() // verify checkpointed hw for topic 2 - topic2Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic2, 0) + topic2Partition0Hw = hwmFor(replicaManager, topic2, 0) assertEquals(15L, topic2Partition0Hw) // verify checkpointed hw for topic 1 - topic1Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic1, 0) + topic1Partition0Hw = hwmFor(replicaManager, topic1, 0) assertEquals(10L, topic1Partition0Hw) EasyMock.verify(zkClient) - EasyMock.verify(topic1Log0) - EasyMock.verify(topic2Log0) } - private def getMockLog: Log = { - val log = EasyMock.createMock(classOf[kafka.log.Log]) - EasyMock.replay(log) - log + def hwmFor(replicaManager: ReplicaManager, topic: String, partition: Int): Long = { + replicaManager.highWatermarkCheckpoints(replicaManager.config.logDirs(0)).read(topic, partition) } + } \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala index 8239b64..3eae29e 100644 --- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala @@ -45,7 +45,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { override def tearDown() { 
servers.map(server => server.shutdown()) - servers.map(server => Utils.rm(server.config.logDir)) + servers.map(server => Utils.rm(server.config.logDirs)) super.tearDown() } diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index 59efaf4..cce858f 100644 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -45,8 +45,8 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { val message = new Message("hello".getBytes()) var producer: Producer[Int, Message] = null - var hwFile1: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps1.logDir) - var hwFile2: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps2.logDir) + var hwFile1: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps1.logDirs(0)) + var hwFile2: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps2.logDirs(0)) var servers: Seq[KafkaServer] = Seq.empty[KafkaServer] def testHWCheckpointNoFailuresSingleLogSegment { @@ -83,7 +83,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals(numMessages, leaderHW) val followerHW = hwFile2.read(topic, 0) assertEquals(numMessages, followerHW) - servers.foreach(server => { server.shutdown(); Utils.rm(server.config.logDir)}) + servers.foreach(server => { server.shutdown(); Utils.rm(server.config.logDirs(0))}) } def testHWCheckpointWithFailuresSingleLogSegment { @@ -148,7 +148,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { producer.close() assertEquals(hw, hwFile1.read(topic, 0)) assertEquals(hw, hwFile2.read(topic, 0)) - servers.foreach(server => Utils.rm(server.config.logDir)) + servers.foreach(server => Utils.rm(server.config.logDirs)) } def testHWCheckpointNoFailuresMultipleLogSegments { @@ -165,8 +165,8 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { server2 = TestUtils.createServer(configs.last) servers ++= List(server1, server2) - hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDir) - hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDir) + hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDirs(0)) + hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDirs(0)) val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000) producerProps.put("producer.request.timeout.ms", "1000") @@ -193,7 +193,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals(hw, leaderHW) val followerHW = hwFile2.read(topic, 0) assertEquals(hw, followerHW) - servers.foreach(server => Utils.rm(server.config.logDir)) + servers.foreach(server => Utils.rm(server.config.logDirs)) } def testHWCheckpointWithFailuresMultipleLogSegments { @@ -210,8 +210,8 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { server2 = TestUtils.createServer(configs.last) servers ++= List(server1, server2) - hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDir) - hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDir) + hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDirs(0)) + hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDirs(0)) val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000) producerProps.put("producer.request.timeout.ms", "1000") @@ -263,7 +263,7 @@ class LogRecoveryTest extends JUnit3Suite with 
ZooKeeperTestHarness { producer.close() assertEquals(hw, hwFile1.read(topic, 0)) assertEquals(hw, hwFile2.read(topic, 0)) - servers.foreach(server => Utils.rm(server.config.logDir)) + servers.foreach(server => Utils.rm(server.config.logDirs)) } private def sendMessages(n: Int = 1) { diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index d738010..fae0719 100644 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -56,8 +56,10 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { // do a clean shutdown server.shutdown() - val cleanShutDownFile = new File(new File(config.logDir), server.CleanShutdownFile) - assertTrue(cleanShutDownFile.exists) + for(logDir <- config.logDirs) { + val cleanShutDownFile = new File(logDir, server.logManager.CleanShutdownFile) + assertTrue(cleanShutDownFile.exists) + } producer.close() } @@ -93,7 +95,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { TestUtils.checkEquals(sent2.iterator, fetchedMessage.map(m => m.message).iterator) server.shutdown() - Utils.rm(server.config.logDir) + Utils.rm(server.config.logDirs) producer.close() } diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index e99aebe..3aae5ce 100644 --- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -74,7 +74,6 @@ class SimpleFetchTest extends JUnit3Suite { EasyMock.expect(replicaManager.config).andReturn(configs.head) EasyMock.expect(replicaManager.logManager).andReturn(logManager) EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager])) - EasyMock.expect(replicaManager.highWatermarkCheckpoint).andReturn(EasyMock.createMock(classOf[HighwaterMarkCheckpoint])) EasyMock.expect(replicaManager.zkClient).andReturn(zkClient) EasyMock.replay(replicaManager) @@ -169,7 +168,6 @@ class SimpleFetchTest extends JUnit3Suite { EasyMock.expect(replicaManager.config).andReturn(configs.head) EasyMock.expect(replicaManager.logManager).andReturn(logManager) EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager])) - EasyMock.expect(replicaManager.highWatermarkCheckpoint).andReturn(EasyMock.createMock(classOf[HighwaterMarkCheckpoint])) EasyMock.expect(replicaManager.zkClient).andReturn(zkClient) EasyMock.replay(replicaManager) diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index a5c663c..8dbd85e 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -483,6 +483,7 @@ object TestUtils extends Logging { byteBuffer.rewind() byteBuffer } + } object TestZKUtils {