diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 23f1c2d..c4d29b5 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -18,6 +18,8 @@ package kafka.log import java.io._ +import java.util.Random +import java.util.concurrent.atomic._ import kafka.utils._ import scala.collection._ import kafka.log.Log._ @@ -26,7 +28,13 @@ import kafka.server.{HighwaterMarkCheckpoint, KafkaConfig} /** - * The guy who creates and hands out logs + * The kafka log subsystem. The log manager is responsible for log creation, retrieval, and cleaning. + * All read and write operations are delegated to the individual log instances. + * + * The log manager maintains logs in one or more directories. New logs are created in each directory in a + * round-robin fashion beginning with a random directory. + * + * A background thread handles log retention by periodically truncating excess log segments. 
*/ @threadsafe private[kafka] class LogManager(val config: KafkaConfig, @@ -34,54 +42,61 @@ private[kafka] class LogManager(val config: KafkaConfig, private val time: Time, val logRollDefaultIntervalMs: Long, val logCleanupIntervalMs: Long, - val logCleanupDefaultAgeMs: Long, - needRecovery: Boolean) extends Logging { + val logCleanupDefaultAgeMs: Long) extends Logging { - val logDir: File = new File(config.logDir) + val CleanShutdownFile = ".kafka_cleanshutdown" + val logDirs: Array[File] = config.logDirs.map(new File(_)).toArray private val logFileSizeMap = config.logFileSizeMap - private val flushInterval = config.flushInterval - private val logCreationLock = new Object + private val logFlushInterval = config.flushInterval private val logFlushIntervals = config.flushIntervalMap + private val logCreationLock = new Object private val logRetentionSizeMap = config.logRetentionSizeMap private val logRetentionMsMap = config.logRetentionHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) // convert hours to ms private val logRollMsMap = config.logRollHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) this.logIdent = "[Log Manager on Broker " + config.brokerId + "] " + private var nextLogCreationIndex = new Random().nextInt(logDirs.length) - /* Initialize a log for each subdirectory of the main log directory */ private val logs = new Pool[String, Pool[Int, Log]]() - if(!logDir.exists()) { - info("No log directory found, creating '" + logDir.getAbsolutePath() + "'") - logDir.mkdirs() - } - if(!logDir.isDirectory() || !logDir.canRead()) - throw new KafkaException(logDir.getAbsolutePath() + " is not a readable log directory.") - val subDirs = logDir.listFiles() - if(subDirs != null) { - for(dir <- subDirs) { - if(dir.getName.equals(HighwaterMarkCheckpoint.highWatermarkFileName)){ - // skip valid metadata file - } - else if(!dir.isDirectory()) { - warn("Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?") - } else { - info("Loading log '" + 
dir.getName() + "'") - val topic = Utils.getTopicPartition(dir.getName)._1 - val rollIntervalMs = logRollMsMap.get(topic).getOrElse(this.logRollDefaultIntervalMs) - val maxLogFileSize = logFileSizeMap.get(topic).getOrElse(config.logFileSize) - val log = new Log(dir, - maxLogFileSize, - config.maxMessageSize, - flushInterval, - rollIntervalMs, - needRecovery, - config.logIndexMaxSizeBytes, - config.logIndexIntervalBytes, - time, - config.brokerId) - val topicPartition = Utils.getTopicPartition(dir.getName) - logs.putIfNotExists(topicPartition._1, new Pool[Int, Log]()) - val parts = logs.get(topicPartition._1) - parts.put(topicPartition._2, log) + + /* initialize a log for each subdirectory of the main log directories */ + for(logDir <- logDirs) { + if(!logDir.exists) { + info("Log directory '" + logDir.getAbsolutePath + "' not found, creating it.") + logDir.mkdirs() + } + + if(!logDir.isDirectory || !logDir.canRead) + throw new KafkaException(logDir.getAbsolutePath + " is not a readable log directory.") + + /* check if this set of logs was shut down cleanly */ + val cleanShutDownFile = new File(logDir, CleanShutdownFile) + val needsRecovery = cleanShutDownFile.exists + cleanShutDownFile.delete + + /* load the logs */ + val subDirs = logDir.listFiles() + if(subDirs != null) { + for(dir <- subDirs) { + if(dir.isDirectory){ + info("Loading log '" + dir.getName + "'") + val topic = Utils.getTopicPartition(dir.getName)._1 + val rollIntervalMs = logRollMsMap.get(topic).getOrElse(this.logRollDefaultIntervalMs) + val maxLogFileSize = logFileSizeMap.get(topic).getOrElse(config.logFileSize) + val log = new Log(dir, + maxLogFileSize, + config.maxMessageSize, + logFlushInterval, + rollIntervalMs, + needsRecovery, + config.logIndexMaxSizeBytes, + config.logIndexIntervalBytes, + time, + config.brokerId) + val topicPartition = Utils.getTopicPartition(dir.getName) + logs.putIfNotExists(topicPartition._1, new Pool[Int, Log]()) + val parts = logs.get(topicPartition._1) + 
parts.put(topicPartition._2, log) + } } } } @@ -107,13 +122,31 @@ private[kafka] class LogManager(val config: KafkaConfig, */ private def createLog(topic: String, partition: Int): Log = { logCreationLock synchronized { - val d = new File(logDir, topic + "-" + partition) - d.mkdirs() + val dir = new File(nextLogDir(), topic + "-" + partition) + dir.mkdirs() val rollIntervalMs = logRollMsMap.get(topic).getOrElse(this.logRollDefaultIntervalMs) val maxLogFileSize = logFileSizeMap.get(topic).getOrElse(config.logFileSize) - new Log(d, maxLogFileSize, config.maxMessageSize, flushInterval, rollIntervalMs, needsRecovery = false, config.logIndexMaxSizeBytes, config.logIndexIntervalBytes, time, config.brokerId) + new Log(dir, + maxLogFileSize, + config.maxMessageSize, + logFlushInterval, + rollIntervalMs, + needsRecovery = false, + config.logIndexMaxSizeBytes, + config.logIndexIntervalBytes, + time, + config.brokerId) } } + + /** + * Choose the next directory in which to create a log and increment the index + */ + private def nextLogDir(): File = { + val logDir = logDirs(nextLogCreationIndex) + nextLogCreationIndex = (nextLogCreationIndex + 1) % logDirs.length + logDir + } def getOffsets(topicAndPartition: TopicAndPartition, timestamp: Long, maxNumOffsets: Int): Seq[Long] = { val log = getLog(topicAndPartition.topic, topicAndPartition.partition) @@ -165,7 +198,9 @@ private[kafka] class LogManager(val config: KafkaConfig, log } - /* Runs through the log removing segments older than a certain age */ + /** + * Runs through the log removing segments older than a certain age + */ private def cleanupExpiredSegments(log: Log): Int = { val startMs = time.milliseconds val topic = Utils.getTopicPartition(log.name)._1 @@ -217,7 +252,11 @@ private[kafka] class LogManager(val config: KafkaConfig, */ def shutdown() { debug("Shutting down.") + // close the logs allLogs.foreach(_.close()) + // mark that the shutdown was clean by creating the clean shutdown marker file + for(logDir <- 
logDirs) + Utils.swallow(new File(logDir, CleanShutdownFile).createNewFile()) debug("Shutdown complete.") } @@ -254,7 +293,6 @@ private[kafka] class LogManager(val config: KafkaConfig, } } - def topics(): Iterable[String] = logs.keys } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 4252c89..fb29e37 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -74,7 +74,8 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro val numPartitions = props.getIntInRange("num.partitions", 1, (1, Int.MaxValue)) /* the directory in which the log data is kept */ - val logDir = props.getString("log.dir") + val logDirs = Utils.getCSVList(props.getString("log.directories", props.getString("log.dir", ""))) + require(logDirs.size > 0) /* the maximum size of a single log file */ val logFileSize = props.getIntInRange("log.file.size", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue)) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index cc83198..cfca402 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -32,7 +32,6 @@ import kafka.controller.KafkaController */ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging { this.logIdent = "[Kafka Server " + config.brokerId + "], " - val CleanShutdownFile = ".kafka_cleanshutdown" private var isShuttingDown = new AtomicBoolean(false) private var shutdownLatch = new CountDownLatch(1) var socketServer: SocketServer = null @@ -53,12 +52,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg info("starting") isShuttingDown = new AtomicBoolean(false) shutdownLatch = new CountDownLatch(1) - var needRecovery = true - val cleanShutDownFile = new File(new File(config.logDir), 
CleanShutdownFile) - if (cleanShutDownFile.exists) { - needRecovery = false - cleanShutDownFile.delete - } /* start scheduler */ kafkaScheduler.startup @@ -69,8 +62,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg time, 1000L * 60 * 60 * config.logRollHours, 1000L * 60 * config.logCleanupIntervalMinutes, - 1000L * 60 * 60 * config.logRetentionHours, - needRecovery) + 1000L * 60 * 60 * config.logRetentionHours) logManager.startup() socketServer = new SocketServer(config.brokerId, @@ -128,9 +120,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg if(kafkaController != null) kafkaController.shutdown() - val cleanShutDownFile = new File(new File(config.logDir), CleanShutdownFile) - debug("creating clean shutdown file " + cleanShutDownFile.getAbsolutePath()) - cleanShutDownFile.createNewFile shutdownLatch.countDown() info("shut down completed") } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 515ba5a..a78f696 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -41,7 +41,7 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient this.logIdent = "Replica Manager on Broker " + config.brokerId + ": " private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false) - val highWatermarkCheckpoint = new HighwaterMarkCheckpoint(config.logDir) + val highWatermarkCheckpoint = new HighwaterMarkCheckpoint(config.logDirs(0)) info("Created highwatermark file %s".format(highWatermarkCheckpoint.name)) newGauge( diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala index 27c73d2..28b87f9 100644 --- a/core/src/main/scala/kafka/utils/Utils.scala +++ b/core/src/main/scala/kafka/utils/Utils.scala @@ -300,22 +300,28 @@ object Utils extends Logging { def rm(file: String): Unit = 
rm(new File(file)) /** + * Recursively delete the list of files/directories and any subfiles (if any exist) + * @param files The sequence of files to be deleted + */ + def rm(files: Seq[String]): Unit = files.map(f => rm(new File(f))) + + /** * Recursively delete the given file/directory and any subfiles (if any exist) * @param file The root file at which to begin deleting */ - def rm(file: File): Unit = { - if(file == null) { - return - } else if(file.isDirectory) { - val files = file.listFiles() - if(files != null) { - for(f <- files) - rm(f) - } - file.delete() - } else { - file.delete() - } + def rm(file: File) { + if(file == null) { + return + } else if(file.isDirectory) { + val files = file.listFiles() + if(files != null) { + for(f <- files) + rm(f) + } + file.delete() + } else { + file.delete() + } } /** diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index b4ee2d9..194dd70 100644 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -40,7 +40,7 @@ trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness { override def tearDown() { servers.map(server => server.shutdown()) - servers.map(server => Utils.rm(server.config.logDir)) + servers.map(server => server.config.logDirs.map(Utils.rm(_))) super.tearDown } } diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index cf304b5..a039c6c 100644 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -46,9 +46,9 @@ class LogManagerTest extends JUnit3Suite { override val flushInterval = 100 } scheduler.startup - logManager = new LogManager(config, scheduler, time, maxRollInterval, veryLargeLogFlushInterval, maxLogAge, false) + logManager = new 
LogManager(config, scheduler, time, maxRollInterval, veryLargeLogFlushInterval, maxLogAge) logManager.startup - logDir = logManager.logDir + logDir = logManager.logDirs(0) } override def tearDown() { @@ -62,7 +62,7 @@ class LogManagerTest extends JUnit3Suite { @Test def testCreateLog() { val log = logManager.getOrCreateLog(name, 0) - val logFile = new File(config.logDir, name + "-0") + val logFile = new File(config.logDirs(0), name + "-0") assertTrue(logFile.exists) log.append(TestUtils.singleMessageSet("test".getBytes())) } @@ -70,7 +70,7 @@ class LogManagerTest extends JUnit3Suite { @Test def testGetLog() { val log = logManager.getLog(name, 0) - val logFile = new File(config.logDir, name + "-0") + val logFile = new File(config.logDirs(0), name + "-0") assertTrue(!logFile.exists) } @@ -118,7 +118,7 @@ class LogManagerTest extends JUnit3Suite { override val logRetentionHours = retentionHours override val flushInterval = 100 } - logManager = new LogManager(config, scheduler, time, maxRollInterval, veryLargeLogFlushInterval, retentionMs, false) + logManager = new LogManager(config, scheduler, time, maxRollInterval, veryLargeLogFlushInterval, retentionMs) logManager.startup // create a log @@ -161,7 +161,7 @@ class LogManagerTest extends JUnit3Suite { override val flushInterval = Int.MaxValue override val flushIntervalMap = Utils.getTopicFlushIntervals("timebasedflush:100") } - logManager = new LogManager(config, scheduler, time, maxRollInterval, veryLargeLogFlushInterval, maxLogAge, false) + logManager = new LogManager(config, scheduler, time, maxRollInterval, veryLargeLogFlushInterval, maxLogAge) logManager.startup val log = logManager.getOrCreateLog(name, 0) for(i <- 0 until 200) { diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index 6f27ffc..b444dd8 100644 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ 
b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -79,8 +79,8 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ server1.awaitShutdown() server2.shutdown server2.awaitShutdown() - Utils.rm(server1.config.logDir) - Utils.rm(server2.config.logDir) + Utils.rm(server1.config.logDirs) + Utils.rm(server2.config.logDirs) super.tearDown() } diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala index 8239b64..3eae29e 100644 --- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala @@ -45,7 +45,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { override def tearDown() { servers.map(server => server.shutdown()) - servers.map(server => Utils.rm(server.config.logDir)) + servers.map(server => Utils.rm(server.config.logDirs)) super.tearDown() } diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index fcbe4e1..e82c6cd 100644 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -45,8 +45,8 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { val message = new Message("hello".getBytes()) var producer: Producer[Int, Message] = null - var hwFile1: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps1.logDir) - var hwFile2: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps2.logDir) + var hwFile1: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps1.logDirs(0)) + var hwFile2: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps2.logDirs(0)) var servers: Seq[KafkaServer] = Seq.empty[KafkaServer] def testHWCheckpointNoFailuresSingleLogSegment { @@ -83,7 +83,7 @@ class LogRecoveryTest extends JUnit3Suite with 
ZooKeeperTestHarness { assertEquals(numMessages, leaderHW) val followerHW = hwFile2.read(topic, 0) assertEquals(numMessages, followerHW) - servers.foreach(server => { server.shutdown(); Utils.rm(server.config.logDir)}) + servers.foreach(server => { server.shutdown(); Utils.rm(server.config.logDirs(0))}) } def testHWCheckpointWithFailuresSingleLogSegment { @@ -148,7 +148,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { producer.close() assertEquals(hw, hwFile1.read(topic, 0)) assertEquals(hw, hwFile2.read(topic, 0)) - servers.foreach(server => Utils.rm(server.config.logDir)) + servers.foreach(server => Utils.rm(server.config.logDirs)) } def testHWCheckpointNoFailuresMultipleLogSegments { @@ -165,8 +165,8 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { server2 = TestUtils.createServer(configs.last) servers ++= List(server1, server2) - hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDir) - hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDir) + hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDirs(0)) + hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDirs(0)) val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000) producerProps.put("producer.request.timeout.ms", "1000") @@ -193,7 +193,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals(hw, leaderHW) val followerHW = hwFile2.read(topic, 0) assertEquals(hw, followerHW) - servers.foreach(server => Utils.rm(server.config.logDir)) + servers.foreach(server => Utils.rm(server.config.logDirs)) } def testHWCheckpointWithFailuresMultipleLogSegments { @@ -210,8 +210,8 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { server2 = TestUtils.createServer(configs.last) servers ++= List(server1, server2) - hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDir) - hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDir) + hwFile1 = new 
HighwaterMarkCheckpoint(server1.config.logDirs(0)) + hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDirs(0)) val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000) producerProps.put("producer.request.timeout.ms", "1000") @@ -263,7 +263,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { producer.close() assertEquals(hw, hwFile1.read(topic, 0)) assertEquals(hw, hwFile2.read(topic, 0)) - servers.foreach(server => Utils.rm(server.config.logDir)) + servers.foreach(server => Utils.rm(server.config.logDirs)) } private def sendMessages(n: Int = 1) { diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index d738010..fae0719 100644 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -56,8 +56,10 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { // do a clean shutdown server.shutdown() - val cleanShutDownFile = new File(new File(config.logDir), server.CleanShutdownFile) - assertTrue(cleanShutDownFile.exists) + for(logDir <- config.logDirs) { + val cleanShutDownFile = new File(logDir, server.logManager.CleanShutdownFile) + assertTrue(cleanShutDownFile.exists) + } producer.close() } @@ -93,7 +95,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { TestUtils.checkEquals(sent2.iterator, fetchedMessage.map(m => m.message).iterator) server.shutdown() - Utils.rm(server.config.logDir) + Utils.rm(server.config.logDirs) producer.close() }