From 72ac1768ec4fba4307e9dbd924ef3c99de158104 Mon Sep 17 00:00:00 2001
From: Sriram Subramanian <srsubram@srsubram-ld.linkedin.biz>
Date: Fri, 22 Feb 2013 10:57:35 -0800
Subject: [PATCH] Ensure that the highwater mark thread gets scheduled after the first isr request

---
 .../main/scala/kafka/server/ReplicaManager.scala   |    9 +++++++--
 1 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 1044085..4e6c8ea 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -49,6 +49,7 @@ class ReplicaManager(val config: KafkaConfig,
   this.logIdent = "Replica Manager on Broker " + config.brokerId + ": "
   private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
   val highWatermarkCheckpoints = config.logDirs.map(dir => (dir, new HighwaterMarkCheckpoint(dir))).toMap
+  private var hwThreadInitialized = false
 
   newGauge(
     "LeaderCount",
@@ -92,8 +93,6 @@ class ReplicaManager(val config: KafkaConfig,
   def startup() {
     // start ISR expiration thread
     kafkaScheduler.scheduleWithRate(maybeShrinkIsr, "isr-expiration-thread-", 0, config.replicaLagTimeMaxMs)
-    // start high watermark checkpoint thread
-    startHighWaterMarksCheckPointThread()
   }
 
   def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Short  = {
@@ -209,6 +208,12 @@ class ReplicaManager(val config: KafkaConfig,
         responseMap.put(topicAndPartition, errorCode)
       }
       info("Completed leader and isr request %s".format(leaderAndISRRequest))
+      // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
+      // have been completely populated before starting the checkpointing there by avoiding weird race conditions
+      if (!hwThreadInitialized) {
+        startHighWaterMarksCheckPointThread()
+        hwThreadInitialized = true
+      }
       replicaFetcherManager.shutdownIdleFetcherThreads()
       (responseMap, ErrorMapping.NoError)
     }
-- 
1.7.1

