diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index f16fbe6..255be06 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -53,7 +53,7 @@ class ReplicaManager(val config: KafkaConfig, private val replicaStateChangeLock = new Object val replicaFetcherManager = new ReplicaFetcherManager(config, this) private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false) - val highWatermarkCheckpoints = config.logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap + val highWatermarkCheckpoints = config.logDirs.map(dir => (new File(dir).getAbsolutePath, new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap private var hwThreadInitialized = false this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: " val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger) diff --git a/core/src/main/scala/kafka/utils/Annotations_2.9+.scala b/core/src/main/scala/kafka/utils/Annotations_2.9+.scala deleted file mode 100644 index ab95ce1..0000000 --- a/core/src/main/scala/kafka/utils/Annotations_2.9+.scala +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.utils - -import scala.annotation.StaticAnnotation - -/* Some helpful annotations */ - -/** - * Indicates that the annotated class is meant to be threadsafe. For an abstract class it is an part of the interface that an implementation - * must respect - */ -class threadsafe extends StaticAnnotation - -/** - * Indicates that the annotated class is not threadsafe - */ -class nonthreadsafe extends StaticAnnotation - -/** - * Indicates that the annotated class is immutable - */ -class immutable extends StaticAnnotation diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala index 02c188a..a78f7cf 100644 --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -143,7 +143,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { } def hwmFor(replicaManager: ReplicaManager, topic: String, partition: Int): Long = { - replicaManager.highWatermarkCheckpoints(replicaManager.config.logDirs(0)).read.getOrElse(TopicAndPartition(topic, partition), 0L) + replicaManager.highWatermarkCheckpoints(new File(replicaManager.config.logDirs(0)).getAbsolutePath).read.getOrElse(TopicAndPartition(topic, partition), 0L) } } \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala new file mode 100644 index 0000000..b5936d4 --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import org.scalatest.junit.JUnit3Suite +import org.junit.Test +import kafka.utils.{MockScheduler, MockTime, TestUtils} +import java.util.concurrent.atomic.AtomicBoolean +import java.io.File +import org.easymock.EasyMock +import org.I0Itec.zkclient.ZkClient +import kafka.cluster.Replica +import kafka.log.{LogManager, LogConfig, Log} + +class ReplicaManagerTest extends JUnit3Suite { + @Test + def testHighwaterMarkDirectoryMapping() { + val props = TestUtils.createBrokerConfig(1) + val dir = "/tmp/kafka-logs/" + new File(dir).mkdir() + props.setProperty("log.dirs", dir) + val config = new KafkaConfig(props) + val zkClient = EasyMock.createMock(classOf[ZkClient]) + val mockLogMgr = EasyMock.createMock(classOf[LogManager]) + val time: MockTime = new MockTime() + val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false)) + val partition = rm.getOrCreatePartition("test-topic", 1, 1) + partition.addReplicaIfNotExists(new Replica(1, partition, time, 0L, Option(new Log(new File("/tmp/kafka-logs/test-topic-1"), new LogConfig(), 0L, null)))) + rm.checkpointHighWatermarks() + } +}