Index: core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala	(revision 1362686)
+++ core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala	(working copy)
@@ -32,7 +32,7 @@
 import org.scalatest.junit.JUnit3Suite
 import scala.collection._
 import kafka.admin.CreateTopicCommand
-import kafka.common.{InvalidPartitionException, NotLeaderForPartitionException, FetchRequestFormatException, OffsetOutOfRangeException}
+import kafka.common.{InvalidPartitionException, FetchRequestFormatException, OffsetOutOfRangeException}
 
 /**
  * End to end tests of the primitive apis against a local server
Index: core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala	(revision 1362686)
+++ core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala	(working copy)
@@ -40,7 +40,6 @@
     // create leader replica
     val log = EasyMock.createMock(classOf[kafka.log.Log])
     EasyMock.expect(log.logEndOffset).andReturn(5L).times(12)
-    EasyMock.expect(log.setHW(5L)).times(1)
     EasyMock.replay(log)
 
     // add one partition
@@ -155,7 +154,7 @@
   private def getPartitionWithAllReplicasInISR(topic: String, partitionId: Int, time: Time, leaderId: Int,
                                                localLog: Log, leaderHW: Long): Partition = {
     val partition = new Partition(topic, partitionId, time)
-    val leaderReplica = new Replica(leaderId, partition, topic, Some(localLog))
+    val leaderReplica = new Replica(leaderId, partition, topic, Some(leaderHW), Some(localLog))
 
     val allReplicas = getFollowerReplicas(partition, leaderId) :+ leaderReplica
     partition.assignedReplicas(Some(allReplicas.toSet))
@@ -170,7 +169,6 @@
   private def getLogWithHW(hw: Long): Log = {
     val log1 = EasyMock.createMock(classOf[kafka.log.Log])
     EasyMock.expect(log1.logEndOffset).andReturn(hw).times(6)
-    EasyMock.expect(log1.setHW(hw)).times(1)
     EasyMock.replay(log1)
     log1
   }
Index: core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala	(revision 1362686)
+++ core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala	(working copy)
@@ -20,7 +20,6 @@
 import kafka.consumer.SimpleConsumer
 import java.util.Properties
 import org.junit.Test
-import org.scalatest.junit.JUnitSuite
 import junit.framework.Assert._
 import kafka.message.{Message, ByteBufferMessageSet}
 import org.scalatest.junit.JUnit3Suite
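
Note on the test hunks above: because the high watermark moves off of Log (see the Replica.scala and Log.scala changes below), mocked logs no longer need a setHW expectation — only logEndOffset is stubbed. A minimal, self-contained sketch of that mocking pattern; SimpleLog is an illustrative stand-in trait, not the real kafka.log.Log:

    import org.easymock.EasyMock

    trait SimpleLog {
      def logEndOffset: Long
    }

    object MockLogExample extends App {
      val log = EasyMock.createMock(classOf[SimpleLog])
      // only the end offset is stubbed; no high-watermark expectation is required anymore
      EasyMock.expect(log.logEndOffset).andReturn(5L).times(2)
      EasyMock.replay(log)
      assert(log.logEndOffset == 5L)
      assert(log.logEndOffset == 5L)
      EasyMock.verify(log) // fails if logEndOffset was not called exactly twice
    }
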
Index: core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala	(revision 0)
+++ core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala	(revision 0)
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.server
+
+import kafka.log.Log
+import org.I0Itec.zkclient.ZkClient
+import org.scalatest.junit.JUnit3Suite
+import kafka.utils.{TestUtils, MockTime}
+import org.easymock.EasyMock
+import org.junit.Assert._
+
+class HighwatermarkPersistenceTest extends JUnit3Suite {
+
+  val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
+    override val defaultFlushIntervalMs = 100
+  })
+  val topic = "foo"
+
+  def testHighWatermarkPersistenceSinglePartition() {
+    // mock zkclient
+    val zkClient = EasyMock.createMock(classOf[ZkClient])
+    EasyMock.replay(zkClient)
+    // create replica manager
+    val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient)
+    replicaManager.startup()
+    // sleep until flush ms
+    Thread.sleep(configs.head.defaultFlushIntervalMs)
+    var fooPartition0Hw = replicaManager.readCheckpointedHighWatermark(topic, 0)
+    assertEquals(0L, fooPartition0Hw)
+    val partition0 = replicaManager.getOrCreatePartition(topic, 0, configs.map(_.brokerId).toSet)
+    // create leader log
+    val log0 = getMockLog
+    // create leader and follower replicas
+    val leaderReplicaPartition0 = replicaManager.addLocalReplica(topic, 0, log0, configs.map(_.brokerId).toSet)
+    val followerReplicaPartition0 = replicaManager.addRemoteReplica(topic, 0, configs.last.brokerId, partition0)
+    // sleep until flush ms
+    Thread.sleep(configs.head.defaultFlushIntervalMs)
+    fooPartition0Hw = replicaManager.readCheckpointedHighWatermark(topic, 0)
+    assertEquals(leaderReplicaPartition0.highWatermark(), fooPartition0Hw)
+    try {
+      followerReplicaPartition0.highWatermark()
+      fail("Should fail with IllegalStateException")
+    } catch {
+      case e: IllegalStateException => // this is ok
+    }
+    // set the leader
+    partition0.leaderId(Some(leaderReplicaPartition0.brokerId))
+    // set the highwatermark for local replica
+    partition0.leaderHW(Some(5L))
+    // sleep until flush interval
+    Thread.sleep(configs.head.defaultFlushIntervalMs)
+    fooPartition0Hw = replicaManager.readCheckpointedHighWatermark(topic, 0)
+    assertEquals(leaderReplicaPartition0.highWatermark(), fooPartition0Hw)
+    EasyMock.verify(zkClient)
+    EasyMock.verify(log0)
+  }
+
+  def testHighWatermarkPersistenceMultiplePartitions() {
+    val topic1 = "foo1"
+    val topic2 = "foo2"
+    // mock zkclient
+    val zkClient = EasyMock.createMock(classOf[ZkClient])
+    EasyMock.replay(zkClient)
+    // create replica manager
+    val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient)
+    replicaManager.startup()
+    // sleep until flush ms
+    Thread.sleep(configs.head.defaultFlushIntervalMs)
+    var topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0)
+    assertEquals(0L, topic1Partition0Hw)
+    val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0, configs.map(_.brokerId).toSet)
+    // create leader log
+    val topic1Log0 = getMockLog
+    // create leader and follower replicas
+    val leaderReplicaTopic1Partition0 = replicaManager.addLocalReplica(topic1, 0, topic1Log0, configs.map(_.brokerId).toSet)
+    // sleep until flush ms
+    Thread.sleep(configs.head.defaultFlushIntervalMs)
+    topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0)
+    assertEquals(leaderReplicaTopic1Partition0.highWatermark(), topic1Partition0Hw)
+    // set the leader
+    topic1Partition0.leaderId(Some(leaderReplicaTopic1Partition0.brokerId))
+    // set the highwatermark for local replica
+    topic1Partition0.leaderHW(Some(5L))
+    // sleep until flush interval
+    Thread.sleep(configs.head.defaultFlushIntervalMs)
+    topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0)
+    assertEquals(5L, leaderReplicaTopic1Partition0.highWatermark())
+    assertEquals(5L, topic1Partition0Hw)
+    // add another partition and set highwatermark
+    val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0, configs.map(_.brokerId).toSet)
+    // create leader log
+    val topic2Log0 = getMockLog
+    // create leader and follower replicas
+    val leaderReplicaTopic2Partition0 = replicaManager.addLocalReplica(topic2, 0, topic2Log0, configs.map(_.brokerId).toSet)
+    // sleep until flush ms
+    Thread.sleep(configs.head.defaultFlushIntervalMs)
+    var topic2Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic2, 0)
+    assertEquals(leaderReplicaTopic2Partition0.highWatermark(), topic2Partition0Hw)
+    // set the leader
+    topic2Partition0.leaderId(Some(leaderReplicaTopic2Partition0.brokerId))
+    // set the highwatermark for local replica
+    topic2Partition0.leaderHW(Some(15L))
+    assertEquals(15L, leaderReplicaTopic2Partition0.highWatermark())
+    // change the highwatermark for topic1
+    topic1Partition0.leaderHW(Some(10L))
+    assertEquals(10L, leaderReplicaTopic1Partition0.highWatermark())
+    // sleep until flush interval
+    Thread.sleep(configs.head.defaultFlushIntervalMs)
+    // verify checkpointed hw for topic 2
+    topic2Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic2, 0)
+    assertEquals(15L, topic2Partition0Hw)
+    // verify checkpointed hw for topic 1
+    topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0)
+    assertEquals(10L, topic1Partition0Hw)
+    EasyMock.verify(zkClient)
+    EasyMock.verify(topic1Log0)
+    EasyMock.verify(topic2Log0)
+  }
+
+  private def getMockLog: Log = {
+    val log = EasyMock.createMock(classOf[kafka.log.Log])
+    EasyMock.replay(log)
+    log
+  }
+}
\ No newline at end of file
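
The sleeps in the new test give the checkpoint thread, which fires every defaultFlushIntervalMs, a chance to run, but sleeping for exactly one interval is inherently racy on a loaded machine. A hedged alternative — a helper I am sketching here, not something the patch adds — is to poll the checkpointed value with a deadline:

    object TestCondition {
      /** Re-evaluates `condition` until it holds or `timeoutMs` elapses; returns the final result. */
      def waitUntilTrue(condition: () => Boolean, timeoutMs: Long, pollMs: Long = 10L): Boolean = {
        val deadline = System.currentTimeMillis + timeoutMs
        while (System.currentTimeMillis < deadline) {
          if (condition()) return true
          Thread.sleep(pollMs)
        }
        condition()
      }
    }

    // hypothetical usage, mirroring one of the assertions above:
    // assertTrue(TestCondition.waitUntilTrue(
    //   () => replicaManager.readCheckpointedHighWatermark(topic, 0) == 5L, timeoutMs = 1000))
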
Index: core/src/main/scala/kafka/cluster/Replica.scala
===================================================================
--- core/src/main/scala/kafka/cluster/Replica.scala	(revision 1362686)
+++ core/src/main/scala/kafka/cluster/Replica.scala	(working copy)
@@ -24,6 +24,7 @@
 class Replica(val brokerId: Int,
               val partition: Partition,
               val topic: String,
+              var hw: Option[Long] = None,
               var log: Option[Log] = None,
               var leoUpdateTime: Long = -1L) extends Logging {
   private var logEndOffset: Long = -1L
@@ -69,7 +70,7 @@
       case true =>
         trace("Setting hw for topic %s partition %d on broker %d to %d".format(topic, partition.partitionId,
           brokerId, highwaterMark))
-        log.get.setHW(highwaterMark)
+        hw = Some(highwaterMark)
         highwaterMark
       case false => throw new IllegalStateException("Unable to set highwatermark for topic %s ".format(topic) +
         "partition %d on broker %d, since there is no local log for this partition"
@@ -78,7 +79,11 @@
     case None =>
       isLocal match {
         case true =>
-          log.get.highwaterMark
+          hw match {
+            case Some(highWatermarkValue) => highWatermarkValue
+            case None => throw new IllegalStateException("HighWatermark does not exist for topic %s ".format(topic) +
+              "partition %d on broker %d but local log exists".format(partition.partitionId, brokerId))
+          }
         case false => throw new IllegalStateException("Unable to get highwatermark for topic %s ".format(topic) +
           "partition %d on broker %d, since there is no local log for this partition"
           .format(partition.partitionId, brokerId))
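
The net effect of the Replica.scala hunks: the high watermark is now cached on the Replica itself instead of being delegated to Log, and reading it is only legal on a local replica that has had it set (or seeded from the checkpoint file). A simplified sketch of those semantics — SimpleReplica is illustrative, not the real kafka.cluster.Replica:

    class SimpleReplica(val brokerId: Int, val isLocal: Boolean, private var hw: Option[Long] = None) {
      /** Passing Some(value) sets the high watermark; passing None reads it back. */
      def highWatermark(newHw: Option[Long] = None): Long = newHw match {
        case Some(value) =>
          if (!isLocal)
            throw new IllegalStateException("cannot set hw: no local log on broker " + brokerId)
          hw = Some(value)
          value
        case None =>
          if (!isLocal)
            throw new IllegalStateException("cannot read hw: no local log on broker " + brokerId)
          hw.getOrElse(throw new IllegalStateException("hw was never set although a local log exists"))
      }
    }

    // usage:
    //   val leader = new SimpleReplica(0, isLocal = true)
    //   leader.highWatermark(Some(5L)) // set -> 5
    //   leader.highWatermark()         // get -> 5
    //   new SimpleReplica(1, isLocal = false).highWatermark() // throws IllegalStateException
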
Index: core/src/main/scala/kafka/log/Log.scala
===================================================================
--- core/src/main/scala/kafka/log/Log.scala	(revision 1362686)
+++ core/src/main/scala/kafka/log/Log.scala	(working copy)
@@ -18,7 +18,7 @@
 package kafka.log
 
 import kafka.api.OffsetRequest
-import java.io.{IOException, RandomAccessFile, File}
+import java.io.{IOException, File}
 import java.util.{Comparator, Collections, ArrayList}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicInteger}
 import kafka.utils._
@@ -120,12 +120,6 @@
 
   /* The actual segments of the log */
   private[log] val segments: SegmentList[LogSegment] = loadSegments()
-
-  /* create the leader highwatermark file handle */
-  private val hwFile = new RandomAccessFile(dir.getAbsolutePath + "/" + hwFileName, "rw")
-
-  private var hw: Long = 0
-
   private val logStats = new LogStats(this)
 
   Utils.registerMBean(logStats, "kafka:type=kafka.logs." + dir.getName)
@@ -206,8 +200,6 @@
         info("Closing log segment " + seg.file.getAbsolutePath)
         seg.messageSet.close()
       }
-      checkpointHW()
-      hwFile.close()
     }
   }
@@ -361,7 +353,6 @@
       segments.view.last.messageSet.flush()
       unflushed.set(0)
       lastflushedTime.set(System.currentTimeMillis)
-      checkpointHW()
     }
   }
 
@@ -433,11 +424,7 @@
     total
   }
 
-  def recoverUptoLastCheckpointedHW() {
-    if(hwFile.length() > 0) {
-      // read the last checkpointed hw from disk
-      hwFile.seek(0)
-      val lastKnownHW = hwFile.readLong()
+  def recoverUptoLastCheckpointedHighWatermark(lastKnownHW: Long) {
     // find the log segment that has this hw
     val segmentToBeTruncated = segments.view.find(segment =>
       lastKnownHW >= segment.start && lastKnownHW < segment.messageSet.getEndOffset())
@@ -452,7 +439,7 @@
     segmentToBeTruncated match {
       case Some(segment) =>
         segment.messageSet.truncateUpto(lastKnownHW)
-        info("Truncated log segment %s to highwatermark %d".format(segment.file.getAbsolutePath, hw))
+        info("Truncated log segment %s to highwatermark %d".format(segment.file.getAbsolutePath, lastKnownHW))
       case None =>
         assert(lastKnownHW <= segments.view.last.messageSet.size,
           "Last checkpointed hw %d cannot be greater than the latest message offset %d in the log %s".
@@ -467,20 +454,8 @@
       if(numSegmentsDeleted != segmentsToBeDeleted.size)
         error("Failed to delete some segments during log recovery")
     }
-    }else
-      info("Unable to recover log upto hw. No previously checkpointed high watermark found for " + name)
   }
 
-  def setHW(latestLeaderHW: Long) {
-    hw = latestLeaderHW
-  }
-
-  def checkpointHW() {
-    hwFile.seek(0)
-    hwFile.writeLong(hw)
-    hwFile.getChannel.force(true)
-  }
-
   def topicName():String = {
     name.substring(0, name.lastIndexOf("-"))
   }
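
With checkpointing removed from Log, recovery is now driven from outside: ReplicaManager reads the checkpointed value and hands it to recoverUptoLastCheckpointedHighWatermark(lastKnownHW), which truncates the segment that straddles that offset and deletes any segments entirely above it. A simplified sketch of that segment selection, using plain offset ranges instead of the real SegmentList:

    case class SegmentRange(start: Long, end: Long) // [start, end) in message offsets

    // the segment whose offset range contains the checkpointed high watermark, if any
    def segmentToTruncate(segments: Seq[SegmentRange], lastKnownHW: Long): Option[SegmentRange] =
      segments.find(s => lastKnownHW >= s.start && lastKnownHW < s.end)

    // segments lying entirely above the high watermark, which would be deleted outright
    def segmentsToDelete(segments: Seq[SegmentRange], lastKnownHW: Long): Seq[SegmentRange] =
      segments.filter(_.start > lastKnownHW)

    // example: with segments [0,100) and [100,250), an hw of 120 truncates the second segment:
    //   segmentToTruncate(Seq(SegmentRange(0, 100), SegmentRange(100, 250)), 120)
    //   // => Some(SegmentRange(100, 250))
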
Index: core/src/main/scala/kafka/producer/ProducerPool.scala
===================================================================
--- core/src/main/scala/kafka/producer/ProducerPool.scala	(revision 1362686)
+++ core/src/main/scala/kafka/producer/ProducerPool.scala	(working copy)
@@ -20,7 +20,7 @@
 import kafka.cluster.Broker
 import java.util.Properties
 import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{ZkUtils, Utils, Logging}
+import kafka.utils.{ZkUtils, Logging}
 import collection.mutable.HashMap
 import java.lang.Object
 import kafka.common.{UnavailableProducerException, NoBrokersForPartitionException}
Index: core/src/main/scala/kafka/server/KafkaServer.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaServer.scala	(revision 1362686)
+++ core/src/main/scala/kafka/server/KafkaServer.scala	(working copy)
@@ -91,6 +91,10 @@
     // starting relevant replicas and leader election for partitions assigned to this broker
     kafkaZookeeper.startup
 
+    // start the replica manager
+    replicaManager.startup()
+
+    // start the controller
     kafkaController.startup()
 
     info("Server started.")
Index: core/src/main/scala/kafka/server/ReplicaManager.scala
===================================================================
--- core/src/main/scala/kafka/server/ReplicaManager.scala	(revision 1362686)
+++ core/src/main/scala/kafka/server/ReplicaManager.scala	(working copy)
@@ -25,6 +25,7 @@
 import java.util.concurrent.locks.ReentrantLock
 import kafka.utils.{KafkaScheduler, ZkUtils, Time, Logging}
 import kafka.common.InvalidPartitionException
+import java.io.{File, RandomAccessFile}
 
 
 class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient) extends Logging {
@@ -33,14 +34,27 @@
   private val leaderReplicaLock = new ReentrantLock()
   private var isrExpirationScheduler = new KafkaScheduler(1, "isr-expiration-thread-", true)
   private val replicaFetcherManager = new ReplicaFetcherManager(config, this)
+  private val highWatermarkCheckpointScheduler = new KafkaScheduler(1, "highwatermark-checkpoint-thread", true)
 
-  // start ISR expiration thread
-  isrExpirationScheduler.startUp
-  isrExpirationScheduler.scheduleWithRate(maybeShrinkISR, 0, config.keepInSyncTimeMs)
+  val logDir = new File(config.logDir)
+  val highWatermarkFileName = "highwatermark"
+  /* create the highwatermark file handle for all partitions */
+  private val hwFile = new RandomAccessFile(logDir.getAbsolutePath + "/" + highWatermarkFileName, "rw")
+  info("Created highwatermark file %s on broker %d".format(logDir.getAbsolutePath + "/" + highWatermarkFileName, config.brokerId))
 
+  def startup() {
+    // start the highwatermark checkpoint thread
+    highWatermarkCheckpointScheduler.startUp
+    highWatermarkCheckpointScheduler.scheduleWithRate(checkpointHighwaterMark, 0, config.defaultFlushIntervalMs)
+    // start ISR expiration thread
+    isrExpirationScheduler.startUp
+    isrExpirationScheduler.scheduleWithRate(maybeShrinkISR, 0, config.keepInSyncTimeMs)
+  }
+
   def addLocalReplica(topic: String, partitionId: Int, log: Log, assignedReplicaIds: Set[Int]): Replica = {
     val partition = getOrCreatePartition(topic, partitionId, assignedReplicaIds)
-    val localReplica = new Replica(config.brokerId, partition, topic, Some(log))
+    val localReplica = new Replica(config.brokerId, partition, topic,
+      Some(readCheckpointedHighWatermark(topic, partitionId)), Some(log))
 
     val replicaOpt = partition.getReplica(config.brokerId)
     replicaOpt match {
@@ -166,7 +180,7 @@
       }
       replica.log match {
         case Some(log) =>  // log is already started
-          log.recoverUptoLastCheckpointedHW()
+          log.recoverUptoLastCheckpointedHighWatermark(readCheckpointedHighWatermark(replica.topic, replica.partition.partitionId))
         case None =>
       }
       // get leader for this replica
@@ -246,8 +260,63 @@
     }
   }
 
+  /**
+   * Flushes the highwatermark value for all partitions to the highwatermark file
+   */
+  def checkpointHighwaterMark() {
+    hwFile.synchronized {
+      hwFile.seek(0)
+      // checkpoint highwatermark for valid replicas, throw error for the rest
+      // write the number of entries in the highwatermark file
+      hwFile.writeInt(allReplicas.size)
+      allReplicas.map(_._2).foreach { partition =>
+        val topic = partition.topic
+        val partitionId = partition.partitionId
+        hwFile.writeUTF(topic)
+        hwFile.writeInt(partitionId)
+        val localReplicaOpt = partition.getReplica(config.brokerId)
+        localReplicaOpt match {
+          case Some(localReplica) =>
+            hwFile.writeLong(localReplica.highWatermark())
+          case None =>
+            hwFile.writeLong(0L)
+            error("Error while checkpointing highwatermark for topic %s partition %d.".format(topic, partitionId) +
+              " Replica metadata doesn't exist in replica manager on broker " + config.brokerId)
+        }
+      }
+      hwFile.getChannel.force(true)
+    }
+  }
+
+  /**
+   * Reads the checkpointed high watermark for the given topic and partition
+   * @return the checkpointed high watermark for (topic, partition); 0 if none exists
+   */
+  def readCheckpointedHighWatermark(topic: String, partition: Int): Long = {
+    hwFile.synchronized {
+      hwFile.length() match {
+        case 0 => 0L
+        case _ =>
+          hwFile.seek(0)
+          val numberOfHighWatermarks = hwFile.readInt()
+          val partitionHighWatermarks =
+            for(i <- 0 until numberOfHighWatermarks) yield ((hwFile.readUTF(), hwFile.readInt()) -> hwFile.readLong())
+          val hwOpt = partitionHighWatermarks.toMap.get((topic, partition))
+          hwOpt match {
+            case Some(hw) => hw
+            case None =>
+              error("No previously checkpointed highwatermark value found for topic %s ".format(topic) +
+                "partition %d on broker %d. Returning 0 as the highwatermark".format(partition, config.brokerId))
+              0L
+          }
+      }
+    }
+  }
+
   def close() {
     isrExpirationScheduler.shutdown()
     replicaFetcherManager.shutdown()
+    highWatermarkCheckpointScheduler.shutdown()
+    checkpointHighwaterMark()
+    hwFile.close()
   }
 }
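
For reference, the on-disk layout that checkpointHighwaterMark() writes and readCheckpointedHighWatermark() parses is: writeInt(number of entries), then for each partition writeUTF(topic), writeInt(partitionId), writeLong(hw). A minimal self-contained round-trip of that layout; the object name and Map-based signature are illustrative, not part of the patch:

    import java.io.RandomAccessFile

    object HighWatermarkFileFormat {
      def write(file: RandomAccessFile, hws: Map[(String, Int), Long]): Unit = {
        file.seek(0)
        file.writeInt(hws.size)                      // number of entries
        for (((topic, partition), hw) <- hws) {
          file.writeUTF(topic)                       // topic name
          file.writeInt(partition)                   // partition id
          file.writeLong(hw)                         // checkpointed high watermark
        }
        file.getChannel.force(true)                  // fsync so the checkpoint survives a crash
      }

      def read(file: RandomAccessFile): Map[(String, Int), Long] = {
        if (file.length() == 0) return Map.empty     // no checkpoint written yet
        file.seek(0)
        val n = file.readInt()
        (0 until n).map { _ =>
          (file.readUTF(), file.readInt()) -> file.readLong()
        }.toMap
      }
    }
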
Index: core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
===================================================================
--- core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java	(revision 1362686)
+++ core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java	(working copy)
@@ -18,13 +18,14 @@
 
 package kafka.javaapi.consumer;
 
-import java.util.List;
-import java.util.Map;
 import kafka.consumer.KafkaStream;
 import kafka.consumer.TopicFilter;
 import kafka.message.Message;
 import kafka.serializer.Decoder;
 
+import java.util.List;
+import java.util.Map;
+
 public interface ConsumerConnector {
   /**
    * Create a list of MessageStreams of type T for each topic.