diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 0b88f14..7bddf18 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -90,7 +90,7 @@ class Partition(val topic: String,
     if (isReplicaLocal(replicaId)) {
       val config = LogConfig.fromProps(logManager.defaultConfig.toProps, AdminUtils.fetchTopicConfig(zkClient, topic))
       val log = logManager.createLog(TopicAndPartition(topic, partitionId), config)
-      val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParent)
+      val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
       val offsetMap = checkpoint.read
       if (!offsetMap.contains(TopicAndPartition(topic, partitionId)))
         warn("No checkpointed highwatermark is found for partition [%s,%d]".format(topic, partitionId))
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 255be06..963e707 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -462,7 +462,7 @@ class ReplicaManager(val config: KafkaConfig,
    */
  def checkpointHighWatermarks() {
    val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect{case Some(replica) => replica}
-    val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParent)
+    val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParentFile.getAbsolutePath)
    for((dir, reps) <- replicasByDir) {
      val hwms = reps.map(r => (new TopicAndPartition(r) -> r.highWatermark)).toMap
      try {
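Note on the two hunks above: the high-watermark checkpoint map is looked up by log directory, and `log.dir.getParent` returns whatever string the directory was originally constructed from, which can be relative or spelled differently from the configured `log.dirs` entry. Switching to `getParentFile.getAbsolutePath` makes the lookup key canonical. A minimal standalone sketch of the mismatch (not part of the patch; the object name is made up, and it assumes the checkpoint map is keyed by absolute paths):

    import java.io.File

    object ParentKeySketch extends App {
      val configuredDir = new File("data/kafka-logs")            // relative log.dirs entry
      val checkpointKey = configuredDir.getAbsolutePath          // assumed checkpoint-map key
      val partitionDir = new File(configuredDir, "test-topic-1") // per-partition log directory

      println(partitionDir.getParent)                            // "data/kafka-logs" -- relative, misses the key
      println(partitionDir.getParentFile.getAbsolutePath)        // absolute, matches the key
      assert(partitionDir.getParentFile.getAbsolutePath == checkpointKey)
    }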
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index b4bee33..d95f8f6 100644
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -201,6 +201,7 @@ class LogManagerTest extends JUnit3Suite {
   /**
    * Test that it is not possible to open two log managers using the same data directory
    */
+  @Test
   def testTwoLogManagersUsingSameDirFails() {
     try {
       new LogManager(Array(logDir), Map(), logConfig, cleanerConfig, 1000L, 10000L, 1000L, time.scheduler, time)
@@ -213,6 +214,7 @@ class LogManagerTest extends JUnit3Suite {
   /**
    * Test that recovery points are correctly written out to disk
    */
+  @Test
   def testCheckpointRecoveryPoints() {
     val topicA = TopicAndPartition("test-a", 1)
     val topicB = TopicAndPartition("test-b", 1)
@@ -229,4 +231,60 @@ class LogManagerTest extends JUnit3Suite {
     assertEquals("Recovery point should equal checkpoint", checkpoints(topicA), logA.recoveryPoint)
     assertEquals("Recovery point should equal checkpoint", checkpoints(topicB), logB.recoveryPoint)
   }
+
+  /**
+   * Test that recovery points directory checking works with trailing slash
+   */
+  @Test
+  def testRecoveryDirectoryMappingWithTrailingSlash() {
+    logManager.shutdown()
+    logDir = TestUtils.tempDir()
+    logManager = new LogManager(logDirs = Array(new File(logDir.getAbsolutePath + File.separator)),
+                                topicConfigs = Map(),
+                                defaultConfig = logConfig,
+                                cleanerConfig = cleanerConfig,
+                                flushCheckMs = 1000L,
+                                flushCheckpointMs = 100000L,
+                                retentionCheckMs = 1000L,
+                                scheduler = time.scheduler,
+                                time = time)
+    logManager.startup
+    val topicA = TopicAndPartition("test-a", 1)
+    val logA = this.logManager.createLog(topicA, logConfig)
+    for(i <- 0 until 50)
+      logA.append(TestUtils.singleMessageSet("test".getBytes()))
+    logA.flush()
+    logManager.checkpointRecoveryPointOffsets()
+    val checkpoints = new OffsetCheckpoint(new File(logDir, logManager.RecoveryPointCheckpointFile)).read()
+    assertEquals("Recovery point should equal checkpoint", checkpoints(topicA), logA.recoveryPoint)
+  }
+
+  /**
+   * Test that recovery points directory checking works with relative directory
+   */
+  @Test
+  def testRecoveryDirectoryMappingWithRelativeDirectory() {
+    logManager.shutdown()
+    logDir = new File("data" + File.separator + logDir.getName)
+    logDir.mkdirs()
+    logDir.deleteOnExit()
+    logManager = new LogManager(logDirs = Array(logDir),
+                                topicConfigs = Map(),
+                                defaultConfig = logConfig,
+                                cleanerConfig = cleanerConfig,
+                                flushCheckMs = 1000L,
+                                flushCheckpointMs = 100000L,
+                                retentionCheckMs = 1000L,
+                                scheduler = time.scheduler,
+                                time = time)
+    logManager.startup
+    val topicA = TopicAndPartition("test-a", 1)
+    val logA = this.logManager.createLog(topicA, logConfig)
+    for(i <- 0 until 50)
+      logA.append(TestUtils.singleMessageSet("test".getBytes()))
+    logA.flush()
+    logManager.checkpointRecoveryPointOffsets()
+    val checkpoints = new OffsetCheckpoint(new File(logDir, logManager.RecoveryPointCheckpointFile)).read()
+    assertEquals("Recovery point should equal checkpoint", checkpoints(topicA), logA.recoveryPoint)
+  }
 }
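The two new tests above cover the directory spellings that used to break the checkpoint lookup: a log.dirs entry with a trailing separator and a relative one. The trailing-slash case works because java.io.File normalizes trailing separators away, so getParentFile.getAbsolutePath yields the same key however the directory was spelled. A quick sketch of that normalization (illustrative only, not part of the patch):

    import java.io.File

    object TrailingSlashSketch extends App {
      val raw = "/tmp/kafka-logs" + File.separator   // "/tmp/kafka-logs/"
      val dir = new File(raw)
      println(dir.getPath)                           // "/tmp/kafka-logs" -- trailing separator dropped
      val partitionDir = new File(dir, "test-topic-1")
      assert(partitionDir.getParentFile.getAbsolutePath == dir.getAbsolutePath)
    }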
diff --git a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
index 67497dd..2639309 100644
--- a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
+++ b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
@@ -6,14 +6,14 @@
  * (the "License"); you may not use this file except in compliance with
  * the License. You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */

 package kafka.log4j

@@ -56,7 +56,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
     logDirZk = new File(logDirZkPath)
     config = new KafkaConfig(propsZk)
     server = TestUtils.createServer(config);
-    simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64*1024, "")
+    simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64 * 1024, "")
   }

   @After
@@ -72,8 +72,8 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
     var props = new Properties()
     props.put("log4j.rootLogger", "INFO")
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
-    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
-    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
+    props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
+    props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
     props.put("log4j.appender.KAFKA.Topic", "test-topic")
     props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder")
     props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
@@ -82,15 +82,15 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
     try {
       PropertyConfigurator.configure(props)
       fail("Missing properties exception was expected !")
-    }catch {
+    } catch {
       case e: MissingConfigException =>
     }

     props = new Properties()
     props.put("log4j.rootLogger", "INFO")
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
-    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
-    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
+    props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
+    props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
     props.put("log4j.appender.KAFKA.Topic", "test-topic")
     props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder")
     props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
@@ -99,15 +99,15 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
     try {
       PropertyConfigurator.configure(props)
       fail("Missing properties exception was expected !")
-    }catch {
+    } catch {
       case e: MissingConfigException =>
     }

     props = new Properties()
     props.put("log4j.rootLogger", "INFO")
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
-    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
-    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
+    props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
+    props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
     props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder")
     props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config)))
     props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
@@ -116,15 +116,15 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
     try {
       PropertyConfigurator.configure(props)
       fail("Missing properties exception was expected !")
-    }catch {
+    } catch {
       case e: MissingConfigException =>
     }

     props = new Properties()
     props.put("log4j.rootLogger", "INFO")
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
-    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
-    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
+    props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
+    props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
     props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config)))
     props.put("log4j.appender.KAFKA.Topic", "test-topic")
     props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
@@ -132,7 +132,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
     // serializer missing
     try {
       PropertyConfigurator.configure(props)
-    }catch {
+    } catch {
       case e: MissingConfigException => fail("should default to kafka.serializer.StringEncoder")
     }
   }
@@ -141,26 +141,26 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
   def testLog4jAppends() {
     PropertyConfigurator.configure(getLog4jConfig)

-    for(i <- 1 to 5)
+    for (i <- 1 to 5)
       info("test")

-    val response = simpleConsumerZk.fetch(new FetchRequestBuilder().addFetch("test-topic", 0, 0L, 1024*1024).build())
-    val fetchMessage = response.messageSet("test-topic", 0)
-
-    var count = 0
-    for(message <- fetchMessage) {
-      count = count + 1
-    }
-
-    assertEquals(5, count)
+    assertTrue(TestUtils.waitUntilTrue(() => {
+      var count = 0
+      val response = simpleConsumerZk.fetch(new FetchRequestBuilder().addFetch("test-topic", 0, 0L, 1024 * 1024).build())
+      val fetchMessage = response.messageSet("test-topic", 0)
+      for (message <- fetchMessage) {
+        count = count + 1
+      }
+      count == 5
+    }, 2000))
   }

   private def getLog4jConfig: Properties = {
     var props = new Properties()
     props.put("log4j.rootLogger", "INFO")
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
-    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
-    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
+    props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
+    props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
     props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config)))
     props.put("log4j.appender.KAFKA.Topic", "test-topic")
     props.put("log4j.logger.kafka.log4j", "INFO,KAFKA")
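testLog4jAppends previously fetched once and asserted on the message count, which races against the appender's asynchronous produce path; the rewrite above polls until all five messages are visible or a 2-second timeout expires. For reference, a helper in the spirit of TestUtils.waitUntilTrue looks roughly like this (a sketch under the assumed (condition, timeout-ms) => Boolean shape, not Kafka's actual implementation):

    // Poll a condition until it holds or the timeout elapses.
    def waitUntilTrue(condition: () => Boolean, waitTimeMs: Long): Boolean = {
      val deadline = System.currentTimeMillis + waitTimeMs
      while (System.currentTimeMillis < deadline) {
        if (condition())
          return true
        Thread.sleep(100)
      }
      condition() // one last check at the deadline
    }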
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index b5936d4..02109bf 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -24,23 +24,52 @@ import java.util.concurrent.atomic.AtomicBoolean
 import java.io.File
 import org.easymock.EasyMock
 import org.I0Itec.zkclient.ZkClient
-import kafka.cluster.Replica
-import kafka.log.{LogManager, LogConfig, Log}
+import kafka.log.{CleanerConfig, LogManager, LogConfig}

 class ReplicaManagerTest extends JUnit3Suite {

   @Test
   def testHighwaterMarkDirectoryMapping() {
     val props = TestUtils.createBrokerConfig(1)
-    val dir = "/tmp/kafka-logs/"
-    new File(dir).mkdir()
-    props.setProperty("log.dirs", dir)
+    val dir = TestUtils.tempDir()
+    props.setProperty("log.dirs", dir + File.separator)
     val config = new KafkaConfig(props)
     val zkClient = EasyMock.createMock(classOf[ZkClient])
-    val mockLogMgr = EasyMock.createMock(classOf[LogManager])
+    val mockLogMgr = createLogManager(dir)
     val time: MockTime = new MockTime()
     val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false))
     val partition = rm.getOrCreatePartition("test-topic", 1, 1)
-    partition.addReplicaIfNotExists(new Replica(1, partition, time, 0L,
-      Option(new Log(new File("/tmp/kafka-logs/test-topic-1"), new LogConfig(), 0L, null))))
+    partition.getOrCreateReplica(1)
     rm.checkpointHighWatermarks()
   }
+
+  @Test
+  def testHighwaterMarkRelativeDirectoryMapping() {
+    val props = TestUtils.createBrokerConfig(1)
+    val dir = new File("data/kafka-logs/")
+    dir.mkdirs()
+    dir.deleteOnExit()
+    props.setProperty("log.dirs", "data/kafka-logs")
+    val config = new KafkaConfig(props)
+    val zkClient = EasyMock.createMock(classOf[ZkClient])
+    val mockLogMgr = createLogManager(dir)
+    val time: MockTime = new MockTime()
+    val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false))
+    val partition = rm.getOrCreatePartition("test-topic", 1, 1)
+    partition.getOrCreateReplica(1)
+    rm.checkpointHighWatermarks()
+  }
+
+  private def createLogManager(logDir: File): LogManager = {
+    val time = new MockTime()
+    new LogManager(logDirs = Array(logDir),
+                   topicConfigs = Map(),
+                   defaultConfig = new LogConfig(),
+                   cleanerConfig = CleanerConfig(enableCleaner = false),
+                   flushCheckMs = 1000L,
+                   flushCheckpointMs = 100000L,
+                   retentionCheckMs = 1000L,
+                   scheduler = time.scheduler,
+                   time = time)
+  }
 }
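The ReplicaManagerTest changes replace the EasyMock'd LogManager with a real one over a temp (or relative) directory, so partition.getOrCreateReplica(1) builds a real Log and checkpointHighWatermarks() exercises the actual path-keyed lookup instead of a mock. For context, the map being looked up is built per configured log directory roughly like this (assumed shape, not part of this diff):

    // One high-watermark checkpoint file per configured log directory,
    // keyed by the directory's absolute path -- the same key the new
    // getParentFile.getAbsolutePath call sites derive.
    val highWatermarkCheckpoints = config.logDirs.map(dir =>
      (new File(dir).getAbsolutePath,
       new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap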