diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 0b88f14..0599d02 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -29,7 +29,7 @@ import kafka.controller.KafkaController import org.apache.log4j.Logger import kafka.message.ByteBufferMessageSet import kafka.common.{NotAssignedReplicaException, NotLeaderForPartitionException, ErrorMapping} -import java.io.IOException +import java.io.{File, IOException} import scala.Some import kafka.common.TopicAndPartition @@ -90,7 +90,7 @@ class Partition(val topic: String, if (isReplicaLocal(replicaId)) { val config = LogConfig.fromProps(logManager.defaultConfig.toProps, AdminUtils.fetchTopicConfig(zkClient, topic)) val log = logManager.createLog(TopicAndPartition(topic, partitionId), config) - val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParent) + val checkpoint = replicaManager.highWatermarkCheckpoints(new File(log.dir.getParent)) val offsetMap = checkpoint.read if (!offsetMap.contains(TopicAndPartition(topic, partitionId))) warn("No checkpointed highwatermark is found for partition [%s,%d]".format(topic, partitionId)) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index f16fbe6..015fcd2 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -53,7 +53,7 @@ class ReplicaManager(val config: KafkaConfig, private val replicaStateChangeLock = new Object val replicaFetcherManager = new ReplicaFetcherManager(config, this) private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false) - val highWatermarkCheckpoints = config.logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap + val highWatermarkCheckpoints = config.logDirs.map(dir => (new File(dir), new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap private var hwThreadInitialized = false this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: " val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger) @@ -466,7 +466,7 @@ class ReplicaManager(val config: KafkaConfig, for((dir, reps) <- replicasByDir) { val hwms = reps.map(r => (new TopicAndPartition(r) -> r.highWatermark)).toMap try { - highWatermarkCheckpoints(dir).write(hwms) + highWatermarkCheckpoints(new File(dir)).write(hwms) } catch { case e: IOException => fatal("Error writing to highwatermark file: ", e) diff --git a/core/src/main/scala/kafka/utils/Annotations_2.9+.scala b/core/src/main/scala/kafka/utils/Annotations_2.9+.scala deleted file mode 100644 index ab95ce1..0000000 --- a/core/src/main/scala/kafka/utils/Annotations_2.9+.scala +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.utils - -import scala.annotation.StaticAnnotation - -/* Some helpful annotations */ - -/** - * Indicates that the annotated class is meant to be threadsafe. For an abstract class it is an part of the interface that an implementation - * must respect - */ -class threadsafe extends StaticAnnotation - -/** - * Indicates that the annotated class is not threadsafe - */ -class nonthreadsafe extends StaticAnnotation - -/** - * Indicates that the annotated class is immutable - */ -class immutable extends StaticAnnotation diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala index 02c188a..7811e26 100644 --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -143,7 +143,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { } def hwmFor(replicaManager: ReplicaManager, topic: String, partition: Int): Long = { - replicaManager.highWatermarkCheckpoints(replicaManager.config.logDirs(0)).read.getOrElse(TopicAndPartition(topic, partition), 0L) + replicaManager.highWatermarkCheckpoints(new File(replicaManager.config.logDirs(0))).read.getOrElse(TopicAndPartition(topic, partition), 0L) } } \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala new file mode 100644 index 0000000..d70fbd6 --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import org.scalatest.junit.JUnit3Suite +import org.junit.Test +import kafka.utils.{MockTime, TestUtils} +import java.util.concurrent.atomic.AtomicBoolean +import java.io.File +import org.easymock.EasyMock +import org.I0Itec.zkclient.ZkClient +import kafka.cluster.Replica +import kafka.log.{LogConfig, Log} + +class ReplicaManagerTest extends JUnit3Suite { + @Test + def testHighwaterMarkDirectoryMapping() { + val props = TestUtils.createBrokerConfig(1) + val dir = "/tmp/kafka-logs/" + new File(dir).mkdir() + props.setProperty("log.dirs", dir) + val config = new KafkaConfig(props) + val zkClient = EasyMock.createMock(classOf[ZkClient]) + val rm = new ReplicaManager(config, new MockTime(), zkClient, null, null, new AtomicBoolean(false)) + val partition = rm.getOrCreatePartition("test-topic", 1, 1) + partition.addReplicaIfNotExists(new Replica(1, partition, new MockTime(), 0L, Option(new Log(new File("/tmp/kafka-logs/test-topic-1"), new LogConfig(), 0L, null)))) + rm.checkpointHighWatermarks() + } +}