diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 810952e..8b9487c 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -88,7 +88,7 @@ class Partition(val topic: String,
       if (isReplicaLocal(replicaId)) {
         val config = LogConfig.fromProps(logManager.defaultConfig.toProps, AdminUtils.fetchTopicConfig(zkClient, topic))
         val log = logManager.createLog(TopicAndPartition(topic, partitionId), config)
-        val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParent)
+        val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
         val offsetMap = checkpoint.read
         if (!offsetMap.contains(TopicAndPartition(topic, partitionId)))
           warn("No checkpointed highwatermark is found for partition [%s,%d]".format(topic, partitionId))
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index bcd2bb7..7cee543 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -52,7 +52,7 @@ class LogManager(val logDirs: Array[File],
   private val logs = new Pool[TopicAndPartition, Log]()
 
   createAndValidateLogDirs(logDirs)
-  private var dirLocks = lockLogDirs(logDirs)
+  private val dirLocks = lockLogDirs(logDirs)
   private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
   loadLogs(logDirs)
 
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 7df56ce..ad4ffe0 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -440,7 +440,7 @@ class ReplicaManager(val config: KafkaConfig,
    */
   def checkpointHighWatermarks() {
     val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect{case Some(replica) => replica}
-    val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParent)
+    val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParentFile.getAbsolutePath)
     for((dir, reps) <- replicasByDir) {
       val hwms = reps.map(r => (new TopicAndPartition(r) -> r.highWatermark)).toMap
       try {
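The three core changes above are one fix: the checkpoint lookup and the checkpoint grouping must spell a log directory exactly the way the checkpoint map's keys spell it. `File.getParent` merely echoes whatever spelling the `File` was constructed with, so a log dir configured as a relative path, or with a trailing separator, yields a key that misses the map and triggers the "No checkpointed highwatermark is found" warning. Normalizing through `getParentFile.getAbsolutePath` makes both sides agree. A minimal standalone sketch of the mismatch (illustrative object name and paths, not part of the patch):

```scala
import java.io.File

// Illustrative only: why raw getParent strings make unreliable map keys.
object ParentKeyDemo extends App {
  // A partition directory under a relatively-configured log dir.
  val dir = new File("data" + File.separator + "test-topic-0")

  // getParent echoes the relative spelling it was built from...
  println(dir.getParent)                      // "data"
  // ...while getParentFile.getAbsolutePath is one canonical form.
  println(dir.getParentFile.getAbsolutePath)  // e.g. "/home/user/data"

  // A checkpoint map keyed by absolute paths misses the relative key:
  val checkpoints = Map(new File("data").getAbsolutePath -> 0L)
  println(checkpoints.contains(dir.getParent))                      // false
  println(checkpoints.contains(dir.getParentFile.getAbsolutePath))  // true
}
```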
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index b4bee33..be1a1ee 100644
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -201,6 +201,7 @@ class LogManagerTest extends JUnit3Suite {
   /**
    * Test that it is not possible to open two log managers using the same data directory
    */
+  @Test
   def testTwoLogManagersUsingSameDirFails() {
     try {
       new LogManager(Array(logDir), Map(), logConfig, cleanerConfig, 1000L, 10000L, 1000L, time.scheduler, time)
@@ -209,24 +210,73 @@ class LogManagerTest extends JUnit3Suite {
       case e: KafkaException => // this is good
     }
   }
-  
+
   /**
    * Test that recovery points are correctly written out to disk
    */
+  @Test
   def testCheckpointRecoveryPoints() {
-    val topicA = TopicAndPartition("test-a", 1)
-    val topicB = TopicAndPartition("test-b", 1)
-    val logA = this.logManager.createLog(topicA, logConfig)
-    val logB = this.logManager.createLog(topicB, logConfig)
-    for(i <- 0 until 50)
-      logA.append(TestUtils.singleMessageSet("test".getBytes()))
-    for(i <- 0 until 100)
-      logB.append(TestUtils.singleMessageSet("test".getBytes()))
-    logA.flush()
-    logB.flush()
+    verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1), TopicAndPartition("test-b", 1)), logManager)
+  }
+
+  /**
+   * Test that recovery points directory checking works with trailing slash
+   */
+  @Test
+  def testRecoveryDirectoryMappingWithTrailingSlash() {
+    logManager.shutdown()
+    logDir = TestUtils.tempDir()
+    logManager = new LogManager(logDirs = Array(new File(logDir.getAbsolutePath + File.separator)),
+                                topicConfigs = Map(),
+                                defaultConfig = logConfig,
+                                cleanerConfig = cleanerConfig,
+                                flushCheckMs = 1000L,
+                                flushCheckpointMs = 100000L,
+                                retentionCheckMs = 1000L,
+                                scheduler = time.scheduler,
+                                time = time)
+    logManager.startup()
+    verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)), logManager)
+  }
+
+  /**
+   * Test that recovery points directory checking works with relative directory
+   */
+  @Test
+  def testRecoveryDirectoryMappingWithRelativeDirectory() {
+    logManager.shutdown()
+    logDir = new File("data" + File.separator + logDir.getName)
+    logDir.mkdirs()
+    logDir.deleteOnExit()
+    logManager = new LogManager(logDirs = Array(logDir),
+                                topicConfigs = Map(),
+                                defaultConfig = logConfig,
+                                cleanerConfig = cleanerConfig,
+                                flushCheckMs = 1000L,
+                                flushCheckpointMs = 100000L,
+                                retentionCheckMs = 1000L,
+                                scheduler = time.scheduler,
+                                time = time)
+    logManager.startup()
+    verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)), logManager)
+  }
+
+  private def verifyCheckpointRecovery(topicAndPartitions: Seq[TopicAndPartition],
+                                       logManager: LogManager) {
+    val logs = topicAndPartitions.map(logManager.createLog(_, logConfig))
+    logs.foreach(log => {
+      for(i <- 0 until 50)
+        log.append(TestUtils.singleMessageSet("test".getBytes()))
+
+      log.flush()
+    })
+
     logManager.checkpointRecoveryPointOffsets()
     val checkpoints = new OffsetCheckpoint(new File(logDir, logManager.RecoveryPointCheckpointFile)).read()
-    assertEquals("Recovery point should equal checkpoint", checkpoints(topicA), logA.recoveryPoint)
-    assertEquals("Recovery point should equal checkpoint", checkpoints(topicB), logB.recoveryPoint)
+
+    topicAndPartitions.zip(logs).foreach {
+      case (tp, log) =>
+        assertEquals("Recovery point should equal checkpoint", checkpoints(tp), log.recoveryPoint)
+    }
   }
 }
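The rewritten tests funnel three spellings of the same setup through one helper: a plain temp dir, a dir with a trailing separator, and a relative dir. The trailing-separator case trips the same key mismatch, because `getParent` drops the separator that the configured string still carries. A short sketch of that case (paths assumed for illustration, not taken from the test run):

```scala
import java.io.File

// Illustrative only: a configured dir string with a trailing separator vs.
// the parent string recovered from a partition directory beneath it.
object TrailingSlashDemo extends App {
  val configured   = "/tmp/kafka-logs" + File.separator // "/tmp/kafka-logs/"
  val partitionDir = new File(configured, "test-a-1")

  // getParent drops the trailing separator, so the raw strings differ...
  println(partitionDir.getParent == configured)         // false
  // ...but both spellings normalize to the same absolute path.
  println(new File(configured).getAbsolutePath ==
          partitionDir.getParentFile.getAbsolutePath)   // true
}
```

(`getCanonicalPath` would normalize even harder, resolving symlinks and `..` segments, but `getAbsolutePath` already makes both spellings agree and does so without touching the filesystem.)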
diff --git a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
index 67497dd..4dcd41a 100644
--- a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
+++ b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
@@ -6,32 +6,35 @@
  * (the "License"); you may not use this file except in compliance with
  * the License. You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package kafka.log4j
 
-import java.util.Properties
-import java.io.File
 import kafka.consumer.SimpleConsumer
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils.{TestUtils, Utils, Logging}
-import junit.framework.Assert._
 import kafka.api.FetchRequestBuilder
 import kafka.producer.async.MissingConfigException
 import kafka.serializer.Encoder
 import kafka.zk.ZooKeeperTestHarness
+
+import java.util.Properties
+import java.io.File
+
 import org.apache.log4j.spi.LoggingEvent
 import org.apache.log4j.{PropertyConfigurator, Logger}
 import org.junit.{After, Before, Test}
 import org.scalatest.junit.JUnit3Suite
+
+import junit.framework.Assert._
+
 class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
 
   var logDirZk: File = null
@@ -72,8 +75,8 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
     var props = new Properties()
     props.put("log4j.rootLogger", "INFO")
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
-    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
-    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
+    props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
+    props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
     props.put("log4j.appender.KAFKA.Topic", "test-topic")
     props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder")
     props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
@@ -82,15 +85,15 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
     try {
       PropertyConfigurator.configure(props)
       fail("Missing properties exception was expected !")
-    }catch {
+    } catch {
       case e: MissingConfigException =>
     }
 
     props = new Properties()
     props.put("log4j.rootLogger", "INFO")
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
-    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
-    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
+    props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
+    props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
     props.put("log4j.appender.KAFKA.Topic", "test-topic")
     props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder")
     props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
@@ -99,15 +102,15 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
     try {
       PropertyConfigurator.configure(props)
       fail("Missing properties exception was expected !")
-    }catch {
+    } catch {
      case e: MissingConfigException =>
     }
 
     props = new Properties()
     props.put("log4j.rootLogger", "INFO")
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
-    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
-    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
+    props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
+    props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
     props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder")
     props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config)))
     props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
@@ -116,15 +119,15 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
     try {
       PropertyConfigurator.configure(props)
       fail("Missing properties exception was expected !")
-    }catch {
+    } catch {
       case e: MissingConfigException =>
     }
 
     props = new Properties()
     props.put("log4j.rootLogger", "INFO")
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
-    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
-    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
+    props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
+    props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
     props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config)))
     props.put("log4j.appender.KAFKA.Topic", "test-topic")
     props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
@@ -132,7 +135,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
     // serializer missing
     try {
       PropertyConfigurator.configure(props)
-    }catch {
+    } catch {
       case e: MissingConfigException => fail("should default to kafka.serializer.StringEncoder")
     }
   }