diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 0b88f14..7bddf18 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -90,7 +90,7 @@ class Partition(val topic: String, if (isReplicaLocal(replicaId)) { val config = LogConfig.fromProps(logManager.defaultConfig.toProps, AdminUtils.fetchTopicConfig(zkClient, topic)) val log = logManager.createLog(TopicAndPartition(topic, partitionId), config) - val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParent) + val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath) val offsetMap = checkpoint.read if (!offsetMap.contains(TopicAndPartition(topic, partitionId))) warn("No checkpointed highwatermark is found for partition [%s,%d]".format(topic, partitionId)) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 255be06..963e707 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -462,7 +462,7 @@ class ReplicaManager(val config: KafkaConfig, */ def checkpointHighWatermarks() { val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect{case Some(replica) => replica} - val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParent) + val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParentFile.getAbsolutePath) for((dir, reps) <- replicasByDir) { val hwms = reps.map(r => (new TopicAndPartition(r) -> r.highWatermark)).toMap try { diff --git a/core/src/main/scala/kafka/utils/Annotations_2.9+.scala b/core/src/main/scala/kafka/utils/Annotations_2.9+.scala deleted file mode 100644 index ab95ce1..0000000 --- a/core/src/main/scala/kafka/utils/Annotations_2.9+.scala +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.utils - -import scala.annotation.StaticAnnotation - -/* Some helpful annotations */ - -/** - * Indicates that the annotated class is meant to be threadsafe. For an abstract class it is an part of the interface that an implementation - * must respect - */ -class threadsafe extends StaticAnnotation - -/** - * Indicates that the annotated class is not threadsafe - */ -class nonthreadsafe extends StaticAnnotation - -/** - * Indicates that the annotated class is immutable - */ -class immutable extends StaticAnnotation diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index b5936d4..77eadce 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -24,23 +24,54 @@ import java.util.concurrent.atomic.AtomicBoolean import java.io.File import org.easymock.EasyMock import org.I0Itec.zkclient.ZkClient -import kafka.cluster.Replica -import kafka.log.{LogManager, LogConfig, Log} +import kafka.log.{CleanerConfig, LogManager, LogConfig} class ReplicaManagerTest extends JUnit3Suite { @Test def testHighwaterMarkDirectoryMapping() { val props = TestUtils.createBrokerConfig(1) - val dir = "/tmp/kafka-logs/" - new File(dir).mkdir() - props.setProperty("log.dirs", dir) + val dir = new File("/tmp/kafka-logs/") + dir.mkdir() + props.setProperty("log.dirs", "/tmp/kafka-logs/") val config = new KafkaConfig(props) val zkClient = EasyMock.createMock(classOf[ZkClient]) - val mockLogMgr = EasyMock.createMock(classOf[LogManager]) + val mockLogMgr = createMockLogManager(dir) val time: MockTime = new MockTime() val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false)) val partition = rm.getOrCreatePartition("test-topic", 1, 1) - partition.addReplicaIfNotExists(new Replica(1, partition, time, 0L, Option(new Log(new File("/tmp/kafka-logs/test-topic-1"), new LogConfig(), 0L, null)))) + partition.getOrCreateReplica(1) rm.checkpointHighWatermarks() + + } + + @Test + def testHighwaterMarkRelativeDirectoryMapping() { + val props = TestUtils.createBrokerConfig(1) + val dir = new File("data/kafka-logs/") + dir.mkdir() + props.setProperty("log.dirs", "data/kafka-logs") + val config = new KafkaConfig(props) + val zkClient = EasyMock.createMock(classOf[ZkClient]) + val mockLogMgr = createMockLogManager(dir) + + val time: MockTime = new MockTime() + val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false)) + val partition = rm.getOrCreatePartition("test-topic", 1, 1) + partition.getOrCreateReplica(1) + rm.checkpointHighWatermarks() + dir.delete() + } + + def createMockLogManager(logDir: File): LogManager = { + val time = new MockTime() + return new LogManager(logDirs = Array(logDir), + topicConfigs = Map(), + defaultConfig = new LogConfig(), + cleanerConfig = CleanerConfig(enableCleaner = false), + flushCheckMs = 1000L, + flushCheckpointMs = 100000L, + retentionCheckMs = 1000L, + scheduler = time.scheduler, + time = time) } } diff --git a/gradle.properties b/gradle.properties index 4827769..236e243 100644 --- a/gradle.properties +++ b/gradle.properties @@ -15,7 +15,7 @@ group=org.apache.kafka version=0.8.1 -scalaVersion=2.8.0 +scalaVersion=2.9.2 task=build mavenUrl= diff --git a/scala.gradle b/scala.gradle index ebd21b8..c0bf77b 100644 --- a/scala.gradle +++ b/scala.gradle @@ -1,5 +1,5 @@ if (!hasProperty('scalaVersion')) { - ext.scalaVersion = '2.8.0' + ext.scalaVersion = '2.9.2' } -ext.defaultScalaVersion = '2.8.0' +ext.defaultScalaVersion = '2.9.2' ext.baseScalaVersion = (scalaVersion.startsWith('2.10')) ? '2.10' : scalaVersion