From 84bc159a0da47765d88ac218547c8014aa16de3d Mon Sep 17 00:00:00 2001
From: Sriram Subramanian <srsubram@srsubram-ld.linkedin.biz>
Date: Tue, 12 Mar 2013 10:14:29 -0700
Subject: [PATCH 1/2] Fix synchronization issues in Partition and ReplicaManager

---
 core/src/main/scala/kafka/cluster/Partition.scala  |   20 +++++---
 .../main/scala/kafka/server/ReplicaManager.scala   |   46 ++++++++++++--------
 2 files changed, 40 insertions(+), 26 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 824e394..6e73003 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -67,7 +67,9 @@ class Partition(val topic: String,
   )
 
   def isUnderReplicated(): Boolean = {
-    inSyncReplicas.size < replicationFactor
+    leaderIsrUpdateLock synchronized {
+      inSyncReplicas.size < replicationFactor
+    }
   }
 
   def getOrCreateReplica(replicaId: Int = localBrokerId): Replica = {
@@ -339,12 +341,14 @@ class Partition(val topic: String,
   }
 
   override def toString(): String = {
-    val partitionString = new StringBuilder
-    partitionString.append("Topic: " + topic)
-    partitionString.append("; Partition: " + partitionId)
-    partitionString.append("; Leader: " + leaderReplicaIdOpt)
-    partitionString.append("; AssignedReplicas: " + assignedReplicaMap.keys.mkString(","))
-    partitionString.append("; InSyncReplicas: " + inSyncReplicas.map(_.brokerId).mkString(","))
-    partitionString.toString()
+    leaderIsrUpdateLock synchronized {
+      val partitionString = new StringBuilder
+      partitionString.append("Topic: " + topic)
+      partitionString.append("; Partition: " + partitionId)
+      partitionString.append("; Leader: " + leaderReplicaIdOpt)
+      partitionString.append("; AssignedReplicas: " + assignedReplicaMap.keys.mkString(","))
+      partitionString.append("; InSyncReplicas: " + inSyncReplicas.map(_.brokerId).mkString(","))
+      partitionString.toString()
+    }
   }
 }
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 53e34ec..c10f310 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -57,7 +57,11 @@ class ReplicaManager(val config: KafkaConfig,
   newGauge(
     "LeaderCount",
     new Gauge[Int] {
-      def getValue = leaderPartitions.size
+      def getValue = {
+        leaderPartitionsLock synchronized {
+          leaderPartitions.size
+        }
+      }
     }
   )
   newGauge(
@@ -115,9 +119,9 @@ class ReplicaManager(val config: KafkaConfig,
         //  logManager.deleteLog(topic, partition)
         leaderPartitionsLock synchronized {
           leaderPartitions -= replica.partition
+          allPartitions.remove((topic, partitionId))
+          info("After removing partition [%s,%d], the rest of allReplicas is: [%s]".format(topic, partitionId, allPartitions))
         }
-        allPartitions.remove((topic, partitionId))
-        info("After removing partition [%s,%d], the rest of allReplicas is: [%s]".format(topic, partitionId, allPartitions))
       case None => //do nothing if replica no longer exists
     }
     stateChangeLogger.trace("Broker %d finished handling stop replica [%s,%d]".format(localBrokerId, topic, partitionId))
@@ -143,20 +147,24 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   def getOrCreatePartition(topic: String, partitionId: Int, replicationFactor: Int): Partition = {
-    var partition = allPartitions.get((topic, partitionId))
-    if (partition == null) {
-      allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, partitionId, replicationFactor, time, this))
-      partition = allPartitions.get((topic, partitionId))
+    leaderPartitionsLock synchronized {
+      var partition = allPartitions.get((topic, partitionId))
+      if (partition == null) {
+        allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, partitionId, replicationFactor, time, this))
+        partition = allPartitions.get((topic, partitionId))
+      }
+      partition
     }
-    partition
   }
 
   def getPartition(topic: String, partitionId: Int): Option[Partition] = {
-    val partition = allPartitions.get((topic, partitionId))
-    if (partition == null)
-      None
-    else
-      Some(partition)
+    leaderPartitionsLock synchronized {
+      val partition = allPartitions.get((topic, partitionId))
+      if (partition == null)
+        None
+      else
+        Some(partition)
+    }
   }
 
   def getReplicaOrException(topic: String, partition: Int): Replica = {
@@ -297,11 +305,13 @@ class ReplicaManager(val config: KafkaConfig,
    * Flushes the highwatermark value for all partitions to the highwatermark file
    */
   def checkpointHighWatermarks() {
-    val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect{case Some(replica) => replica}
-    val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParent)
-    for((dir, reps) <- replicasByDir) {
-      val hwms = reps.map(r => (TopicAndPartition(r.topic, r.partitionId) -> r.highWatermark)).toMap
-      highWatermarkCheckpoints(dir).write(hwms)
+    leaderPartitionsLock synchronized {
+      val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect{case Some(replica) => replica}
+      val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParent)
+      for((dir, reps) <- replicasByDir) {
+        val hwms = reps.map(r => (TopicAndPartition(r.topic, r.partitionId) -> r.highWatermark)).toMap
+        highWatermarkCheckpoints(dir).write(hwms)
+      }
     }
   }
 
-- 
1.7.1


From 4e42e7881d1beecfab3e9bef21da4cd74bd388ef Mon Sep 17 00:00:00 2001
From: Sriram Subramanian <srsubram@srsubram-ld.linkedin.biz>
Date: Tue, 12 Mar 2013 11:10:36 -0700
Subject: [PATCH 2/2] Remove leaderPartitionsLock synchronization around allPartitions

---
 .../main/scala/kafka/server/ReplicaManager.scala   |   40 ++++++++-----------
 1 files changed, 17 insertions(+), 23 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index c10f310..68e712c 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -119,9 +119,9 @@ class ReplicaManager(val config: KafkaConfig,
         //  logManager.deleteLog(topic, partition)
         leaderPartitionsLock synchronized {
           leaderPartitions -= replica.partition
-          allPartitions.remove((topic, partitionId))
-          info("After removing partition [%s,%d], the rest of allReplicas is: [%s]".format(topic, partitionId, allPartitions))
         }
+        allPartitions.remove((topic, partitionId))
+        info("After removing partition [%s,%d], the rest of allReplicas is: [%s]".format(topic, partitionId, allPartitions))
       case None => //do nothing if replica no longer exists
     }
     stateChangeLogger.trace("Broker %d finished handling stop replica [%s,%d]".format(localBrokerId, topic, partitionId))
@@ -147,24 +147,20 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   def getOrCreatePartition(topic: String, partitionId: Int, replicationFactor: Int): Partition = {
-    leaderPartitionsLock synchronized {
-      var partition = allPartitions.get((topic, partitionId))
-      if (partition == null) {
-        allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, partitionId, replicationFactor, time, this))
-        partition = allPartitions.get((topic, partitionId))
-      }
-      partition
+    var partition = allPartitions.get((topic, partitionId))
+    if (partition == null) {
+      allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, partitionId, replicationFactor, time, this))
+      partition = allPartitions.get((topic, partitionId))
     }
+    partition
   }
 
   def getPartition(topic: String, partitionId: Int): Option[Partition] = {
-    leaderPartitionsLock synchronized {
-      val partition = allPartitions.get((topic, partitionId))
-      if (partition == null)
-        None
-      else
-        Some(partition)
-    }
+    val partition = allPartitions.get((topic, partitionId))
+    if (partition == null)
+      None
+    else
+      Some(partition)
   }
 
   def getReplicaOrException(topic: String, partition: Int): Replica = {
@@ -305,13 +301,11 @@ class ReplicaManager(val config: KafkaConfig,
    * Flushes the highwatermark value for all partitions to the highwatermark file
    */
   def checkpointHighWatermarks() {
-    leaderPartitionsLock synchronized {
-      val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect{case Some(replica) => replica}
-      val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParent)
-      for((dir, reps) <- replicasByDir) {
-        val hwms = reps.map(r => (TopicAndPartition(r.topic, r.partitionId) -> r.highWatermark)).toMap
-        highWatermarkCheckpoints(dir).write(hwms)
-      }
+    val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect{case Some(replica) => replica}
+    val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParent)
+    for((dir, reps) <- replicasByDir) {
+      val hwms = reps.map(r => (TopicAndPartition(r.topic, r.partitionId) -> r.highWatermark)).toMap
+      highWatermarkCheckpoints(dir).write(hwms)
     }
   }
 
-- 
1.7.1

