diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 6d6b4ab..c08eab0 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -90,7 +90,7 @@ class Partition(val topic: String, if (isReplicaLocal(replicaId)) { val config = LogConfig.fromProps(logManager.defaultConfig.toProps, AdminUtils.fetchTopicConfig(zkClient, topic)) val log = logManager.createLog(TopicAndPartition(topic, partitionId), config) - val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParent) + val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath) val offsetMap = checkpoint.read if (!offsetMap.contains(TopicAndPartition(topic, partitionId))) warn("No checkpointed highwatermark is found for partition [%s,%d]".format(topic, partitionId)) diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 8ed9b68..ac67f08 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -52,7 +52,7 @@ class LogManager(val logDirs: Array[File], private val logs = new Pool[TopicAndPartition, Log]() createAndValidateLogDirs(logDirs) - private var dirLocks = lockLogDirs(logDirs) + private val dirLocks = lockLogDirs(logDirs) private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap loadLogs(logDirs) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 0f5aaa9..5588f59 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -462,7 +462,7 @@ class ReplicaManager(val config: KafkaConfig, */ def checkpointHighWatermarks() { val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect{case Some(replica) => replica} - val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParent) + val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParentFile.getAbsolutePath) for((dir, reps) <- replicasByDir) { val hwms = reps.map(r => (new TopicAndPartition(r) -> r.highWatermark)).toMap try { diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index b4bee33..be1a1ee 100644 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -201,6 +201,7 @@ class LogManagerTest extends JUnit3Suite { /** * Test that it is not possible to open two log managers using the same data directory */ + @Test def testTwoLogManagersUsingSameDirFails() { try { new LogManager(Array(logDir), Map(), logConfig, cleanerConfig, 1000L, 10000L, 1000L, time.scheduler, time) @@ -209,24 +210,75 @@ class LogManagerTest extends JUnit3Suite { case e: KafkaException => // this is good } } - + /** * Test that recovery points are correctly written out to disk */ + @Test def testCheckpointRecoveryPoints() { - val topicA = TopicAndPartition("test-a", 1) - val topicB = TopicAndPartition("test-b", 1) - val logA = this.logManager.createLog(topicA, logConfig) - val logB = this.logManager.createLog(topicB, logConfig) - for(i <- 0 until 50) - logA.append(TestUtils.singleMessageSet("test".getBytes())) - for(i <- 0 until 100) - logB.append(TestUtils.singleMessageSet("test".getBytes())) - logA.flush() - logB.flush() + verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1), TopicAndPartition("test-b", 1)), logManager) + } + + /** + * Test that recovery points directory checking works with trailing slash + */ + @Test + def testRecoveryDirectoryMappingWithTrailingSlash() { + logManager.shutdown() + logDir = TestUtils.tempDir() + logManager = new LogManager(logDirs = Array(new File(logDir.getAbsolutePath + File.separator)), + topicConfigs = Map(), + defaultConfig = logConfig, + cleanerConfig = cleanerConfig, + flushCheckMs = 1000L, + flushCheckpointMs = 100000L, + retentionCheckMs = 1000L, + scheduler = time.scheduler, + time = time) + logManager.startup + verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)), logManager) + } + + /** + * Test that recovery points directory checking works with relative directory + */ + @Test + def testRecoveryDirectoryMappingWithRelativeDirectory() { + logManager.shutdown() + logDir = new File("data" + File.separator + logDir.getName) + logDir.mkdirs() + logDir.deleteOnExit() + logManager = new LogManager(logDirs = Array(logDir), + topicConfigs = Map(), + defaultConfig = logConfig, + cleanerConfig = cleanerConfig, + flushCheckMs = 1000L, + flushCheckpointMs = 100000L, + retentionCheckMs = 1000L, + scheduler = time.scheduler, + time = time) + logManager.startup + verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)), logManager) + } + + + private def verifyCheckpointRecovery(topicAndPartitions: Seq[TopicAndPartition], + logManager: LogManager) { + val logs = topicAndPartitions.map(this.logManager.createLog(_, logConfig)) + logs.foreach(log => { + for(i <- 0 until 50) + log.append(TestUtils.singleMessageSet("test".getBytes())) + + log.flush() + }) + logManager.checkpointRecoveryPointOffsets() val checkpoints = new OffsetCheckpoint(new File(logDir, logManager.RecoveryPointCheckpointFile)).read() - assertEquals("Recovery point should equal checkpoint", checkpoints(topicA), logA.recoveryPoint) - assertEquals("Recovery point should equal checkpoint", checkpoints(topicB), logB.recoveryPoint) + + topicAndPartitions.zip(logs).foreach { + case(tp, log) => { + assertEquals("Recovery point should equal checkpoint", checkpoints(tp), log.recoveryPoint) + } + } } } diff --git a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala index bbfb01e..4ea0489 100644 --- a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala +++ b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala @@ -6,32 +6,35 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. -*/ + */ package kafka.log4j -import java.util.Properties -import java.io.File import kafka.consumer.SimpleConsumer import kafka.server.{KafkaConfig, KafkaServer} import kafka.utils.{TestUtils, Utils, Logging} -import junit.framework.Assert._ import kafka.api.FetchRequestBuilder import kafka.producer.async.MissingConfigException import kafka.serializer.Encoder import kafka.zk.ZooKeeperTestHarness + +import java.util.Properties +import java.io.File + import org.apache.log4j.spi.LoggingEvent import org.apache.log4j.{PropertyConfigurator, Logger} import org.junit.{After, Before, Test} import org.scalatest.junit.JUnit3Suite +import junit.framework.Assert._ + class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { var logDirZk: File = null @@ -56,7 +59,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with logDirZk = new File(logDirZkPath) config = new KafkaConfig(propsZk) server = TestUtils.createServer(config) - simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64*1024, "") + simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64 * 1024, "") } @After @@ -73,15 +76,15 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with var props = new Properties() props.put("log4j.rootLogger", "INFO") props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender") - props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout") - props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n") + props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout") + props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n") props.put("log4j.appender.KAFKA.Topic", "test-topic") props.put("log4j.logger.kafka.log4j", "INFO, KAFKA") try { PropertyConfigurator.configure(props) fail("Missing properties exception was expected !") - }catch { + } catch { case e: MissingConfigException => } @@ -89,15 +92,15 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with props = new Properties() props.put("log4j.rootLogger", "INFO") props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender") - props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout") - props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n") + props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout") + props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n") props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config))) props.put("log4j.logger.kafka.log4j", "INFO, KAFKA") try { PropertyConfigurator.configure(props) fail("Missing properties exception was expected !") - }catch { + } catch { case e: MissingConfigException => } } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index a71e48d..6b615de 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,29 +17,61 @@ package kafka.server -import org.scalatest.junit.JUnit3Suite -import org.junit.Test import kafka.utils.{MockScheduler, MockTime, TestUtils} +import kafka.log.{CleanerConfig, LogManager, LogConfig} + import java.util.concurrent.atomic.AtomicBoolean import java.io.File + import org.easymock.EasyMock import org.I0Itec.zkclient.ZkClient -import kafka.cluster.Replica -import kafka.log.{LogManager, LogConfig, Log} +import org.scalatest.junit.JUnit3Suite +import org.junit.Test class ReplicaManagerTest extends JUnit3Suite { + + val topic = "test-topic" + @Test def testHighWaterMarkDirectoryMapping() { val props = TestUtils.createBrokerConfig(1) - val topic = "test-topic" val config = new KafkaConfig(props) val zkClient = EasyMock.createMock(classOf[ZkClient]) - val mockLogMgr = EasyMock.createMock(classOf[LogManager]) + val mockLogMgr = createLogManager(config.logDirs.map(new File(_)).toArray) val time: MockTime = new MockTime() val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false)) val partition = rm.getOrCreatePartition(topic, 1, 1) - val logFilename = config.logDirs.head + File.separator + topic + "-1" - partition.addReplicaIfNotExists(new Replica(1, partition, time, 0L, Option(new Log(new File(logFilename), new LogConfig(), 0L, null)))) + partition.getOrCreateReplica(1) + //val logFilename = config.logDirs.head + File.separator + topic + "-1" + //partition.addReplicaIfNotExists(new Replica(1, partition, time, 0L, Option(new Log(new File(logFilename), new LogConfig(), 0L, null)))) rm.checkpointHighWatermarks() } + + @Test + def testHighwaterMarkRelativeDirectoryMapping() { + val props = TestUtils.createBrokerConfig(1) + props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath) + val config = new KafkaConfig(props) + val zkClient = EasyMock.createMock(classOf[ZkClient]) + val mockLogMgr = createLogManager(config.logDirs.map(new File(_)).toArray) + val time: MockTime = new MockTime() + val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false)) + val partition = rm.getOrCreatePartition(topic, 1, 1) + partition.getOrCreateReplica(1) + rm.checkpointHighWatermarks() + } + + private def createLogManager(logDirs: Array[File]): LogManager = { + val time = new MockTime() + return new LogManager(logDirs, + topicConfigs = Map(), + defaultConfig = new LogConfig(), + cleanerConfig = CleanerConfig(enableCleaner = false), + flushCheckMs = 1000L, + flushCheckpointMs = 100000L, + retentionCheckMs = 1000L, + scheduler = time.scheduler, + time = time) + } + } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 53d01aa..a1ad627 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -89,6 +89,16 @@ object TestUtils extends Logging { } /** + * Create a temporary relative directory + */ + def tempRelativeDir(parent: String): File = { + val f = new File(parent, "kafka-" + random.nextInt(1000000)) + f.mkdirs() + f.deleteOnExit() + f + } + + /** * Create a temporary file */ def tempFile(): File = {