diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 59c9bc3..55f6ecf 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -86,6 +86,7 @@ object LogReadResult {
 
 object ReplicaManager {
   val HighWatermarkFilename = "replication-offset-checkpoint"
+  case class DirAndOffset(dir: String, offset: Long)
 }
 
 class ReplicaManager(val config: KafkaConfig,
@@ -111,6 +112,8 @@ class ReplicaManager(val config: KafkaConfig,
   val delayedFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch](
     purgatoryName = "Fetch", config.brokerId, config.fetchPurgatoryPurgeIntervalRequests)
 
+  var allHighwaterMarks = Map.empty[TopicAndPartition, ReplicaManager.DirAndOffset]
+
   newGauge(
     "LeaderCount",
     new Gauge[Int] {
@@ -169,6 +172,9 @@ class ReplicaManager(val config: KafkaConfig,
 
   def startup() {
     // start ISR expiration thread
+    for ((dir, checkpoint) <- highWatermarkCheckpoints) {
+      allHighwaterMarks ++= checkpoint.read().map({case (t, offset) => (t, ReplicaManager.DirAndOffset(dir, offset))})
+    }
     scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs, unit = TimeUnit.MILLISECONDS)
   }
 
@@ -180,8 +186,10 @@ class ReplicaManager(val config: KafkaConfig,
       case Some(partition) =>
         if(deletePartition) {
           val removedPartition = allPartitions.remove((topic, partitionId))
-          if (removedPartition != null)
+          if (removedPartition != null) {
             removedPartition.delete() // this will delete the local log
+            allHighwaterMarks -= TopicAndPartition(topic, partitionId)
+          }
         }
       case None =>
         // Delete log and corresponding folders in case replica manager doesn't hold them anymore.
@@ -190,7 +198,8 @@ class ReplicaManager(val config: KafkaConfig,
         val topicAndPartition = TopicAndPartition(topic, partitionId)
 
         if(logManager.getLog(topicAndPartition).isDefined) {
-          logManager.deleteLog(topicAndPartition)
+          logManager.deleteLog(topicAndPartition)
+          allHighwaterMarks -= topicAndPartition
         }
       }
       stateChangeLogger.trace("Broker %d ignoring stop replica (delete=%s) for partition [%s,%d] as replica doesn't exist on broker"
@@ -805,11 +814,24 @@ class ReplicaManager(val config: KafkaConfig,
   // Flushes the highwatermark value for all partitions to the highwatermark file
   def checkpointHighWatermarks() {
     val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect{case Some(replica) => replica}
-    val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParentFile.getAbsolutePath)
-    for((dir, reps) <- replicasByDir) {
-      val hwms = reps.map(r => (new TopicAndPartition(r) -> r.highWatermark.messageOffset)).toMap
+
+    allHighwaterMarks ++= replicas.filter(_.log.isDefined).map({
+      case replica =>
+        val dir = replica.log.get.dir.getParentFile.getAbsolutePath
+        val topicAndPartition = TopicAndPartition(replica.topic, replica.partitionId)
+        val hw = replica.highWatermark.messageOffset
+
+        topicAndPartition -> ReplicaManager.DirAndOffset(dir, hw)
+    })
+
+    val hwmsByDir = allHighwaterMarks.groupBy({
+      case (topicAndPartition, ReplicaManager.DirAndOffset(dir, offset)) => dir
+    })
+
+    for ((dir, hwms) <- hwmsByDir) {
+      val save = hwms.map({case (topicAndPartition, ReplicaManager.DirAndOffset(_, offset)) => topicAndPartition -> offset})
       try {
-        highWatermarkCheckpoints(dir).write(hwms)
+        highWatermarkCheckpoints(dir).write(save)
       } catch {
         case e: IOException =>
           fatal("Error writing to highwatermark file: ", e)