Index: core/src/main/scala/kafka/cluster/Partition.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/cluster/Partition.scala	(date 1406400865000)
+++ core/src/main/scala/kafka/cluster/Partition.scala	(date 1406403778000)
@@ -18,8 +18,7 @@
 
 import kafka.common._
 import kafka.admin.AdminUtils
-import kafka.utils.{ReplicationUtils, Pool, Time, Logging}
-import kafka.utils.Utils.inLock
+import kafka.utils._
 import kafka.api.{PartitionStateInfo, LeaderAndIsr}
 import kafka.log.LogConfig
 import kafka.server.{OffsetManager, ReplicaManager}
@@ -29,7 +28,7 @@
 
 import java.io.IOException
 import java.util.concurrent.locks.ReentrantReadWriteLock
-import scala.Some
+import kafka.utils.Utils.{inReadLock,inWriteLock}
 import scala.collection._
 
 import com.yammer.metrics.core.Gauge
@@ -73,7 +72,7 @@
   )
 
   def isUnderReplicated(): Boolean = {
-    inLock(leaderIsrUpdateLock.readLock()) {
+    inReadLock(leaderIsrUpdateLock) {
       leaderReplicaIfLocal() match {
         case Some(_) =>
           inSyncReplicas.size < assignedReplicas.size
@@ -115,7 +114,7 @@
   }
 
   def leaderReplicaIfLocal(): Option[Replica] = {
-    inLock(leaderIsrUpdateLock.readLock()) {
+    inReadLock(leaderIsrUpdateLock) {
       leaderReplicaIdOpt match {
         case Some(leaderReplicaId) =>
           if (leaderReplicaId == localBrokerId)
@@ -141,7 +140,7 @@
 
   def delete() {
     // need to hold the lock to prevent appendMessagesToLeader() from hitting I/O exceptions due to log being deleted
-    inLock(leaderIsrUpdateLock.writeLock()) {
+    inWriteLock(leaderIsrUpdateLock) {
       assignedReplicaMap.clear()
       inSyncReplicas = Set.empty[Replica]
       leaderReplicaIdOpt = None
@@ -156,7 +155,7 @@
   }
 
   def getLeaderEpoch(): Int = {
-    inLock(leaderIsrUpdateLock.readLock()) {
+    inReadLock(leaderIsrUpdateLock) {
       return this.leaderEpoch
     }
   }
@@ -168,7 +167,7 @@
   def makeLeader(controllerId: Int,
                  partitionStateInfo: PartitionStateInfo, correlationId: Int,
                  offsetManager: OffsetManager): Boolean = {
-    inLock(leaderIsrUpdateLock.writeLock()) {
+    inWriteLock(leaderIsrUpdateLock) {
       val allReplicas = partitionStateInfo.allReplicas
       val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
       val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
@@ -201,7 +200,7 @@
   def makeFollower(controllerId: Int,
                    partitionStateInfo: PartitionStateInfo, correlationId: Int,
                    offsetManager: OffsetManager): Boolean = {
-    inLock(leaderIsrUpdateLock.writeLock()) {
+    inWriteLock(leaderIsrUpdateLock) {
       val allReplicas = partitionStateInfo.allReplicas
       val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
       val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
@@ -235,7 +234,7 @@
   }
 
   def updateLeaderHWAndMaybeExpandIsr(replicaId: Int, offset: Long) {
-    inLock(leaderIsrUpdateLock.writeLock()) {
+    inWriteLock(leaderIsrUpdateLock) {
       debug("Recording follower %d position %d for partition [%s,%d].".format(replicaId, offset, topic, partitionId))
       val replicaOpt = getReplica(replicaId)
       if(!replicaOpt.isDefined) {
@@ -271,7 +270,7 @@
   }
 
   def checkEnoughReplicasReachOffset(requiredOffset: Long, requiredAcks: Int): (Boolean, Short) = {
-    inLock(leaderIsrUpdateLock.readLock()) {
+    inReadLock(leaderIsrUpdateLock) {
       leaderReplicaIfLocal() match {
         case Some(_) =>
           val numAcks = inSyncReplicas.count(r => {
@@ -315,7 +314,7 @@
   }
 
   def maybeShrinkIsr(replicaMaxLagTimeMs: Long, replicaMaxLagMessages: Long) {
-    inLock(leaderIsrUpdateLock.writeLock()) {
+    inWriteLock(leaderIsrUpdateLock) {
       leaderReplicaIfLocal() match {
         case Some(leaderReplica) =>
           val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs, replicaMaxLagMessages)
@@ -357,7 +356,7 @@
   }
 
   def appendMessagesToLeader(messages: ByteBufferMessageSet) = {
-    inLock(leaderIsrUpdateLock.readLock()) {
+    inReadLock(leaderIsrUpdateLock) {
       val leaderReplicaOpt = leaderReplicaIfLocal()
       leaderReplicaOpt match {
         case Some(leaderReplica) =>
@@ -400,7 +399,7 @@
   }
 
   override def toString(): String = {
-    inLock(leaderIsrUpdateLock.readLock()) {
+    inReadLock(leaderIsrUpdateLock) {
       val partitionString = new StringBuilder
       partitionString.append("Topic: " + topic)
       partitionString.append("; Partition: " + partitionId)
Index: core/src/main/scala/kafka/server/MetadataCache.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/server/MetadataCache.scala	(date 1406400865000)
+++ core/src/main/scala/kafka/server/MetadataCache.scala	(date 1406403778000)
@@ -25,7 +25,6 @@
 import kafka.common.{ErrorMapping, ReplicaNotAvailableException, LeaderNotAvailableException}
 import kafka.common.TopicAndPartition
 import kafka.controller.KafkaController.StateChangeLogger
-import scala.Some
 
 /**
  * A cache for the state (e.g., current leader) of each partition. This cache is updated through
@@ -34,14 +33,14 @@
 private[server] class MetadataCache {
   private val cache: mutable.Map[String, mutable.Map[Int, PartitionStateInfo]] =
     new mutable.HashMap[String, mutable.Map[Int, PartitionStateInfo]]()
-  private val aliveBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]()
+  private var aliveBrokers: Map[Int, Broker] = Map()
   private val partitionMetadataLock = new ReentrantReadWriteLock()
 
   def getTopicMetadata(topics: Set[String]) = {
     val isAllTopics = topics.isEmpty
     val topicsRequested = if(isAllTopics) cache.keySet else topics
     val topicResponses: mutable.ListBuffer[TopicMetadata] = new mutable.ListBuffer[TopicMetadata]
-    inLock(partitionMetadataLock.readLock()) {
+    inReadLock(partitionMetadataLock) {
       for (topic <- topicsRequested) {
         if (isAllTopics || cache.contains(topic)) {
           val partitionStateInfos = cache(topic)
@@ -82,15 +81,15 @@
   }
 
   def getAliveBrokers = {
-    inLock(partitionMetadataLock.readLock()) {
-      aliveBrokers.values.toList
+    inReadLock(partitionMetadataLock) {
+      aliveBrokers.values.toSeq
     }
   }
 
   def addOrUpdatePartitionInfo(topic: String,
                                partitionId: Int,
                                stateInfo: PartitionStateInfo) {
-    inLock(partitionMetadataLock.writeLock()) {
+    inWriteLock(partitionMetadataLock) {
       cache.get(topic) match {
         case Some(infos) => infos.put(partitionId, stateInfo)
         case None => {
@@ -103,7 +102,7 @@
   }
 
   def getPartitionInfo(topic: String, partitionId: Int): Option[PartitionStateInfo] = {
-    inLock(partitionMetadataLock.readLock()) {
+    inReadLock(partitionMetadataLock) {
       cache.get(topic) match {
         case Some(partitionInfos) => partitionInfos.get(partitionId)
         case None => None
@@ -114,8 +113,8 @@
   def updateCache(updateMetadataRequest: UpdateMetadataRequest,
                   brokerId: Int,
                   stateChangeLogger: StateChangeLogger) {
-    inLock(partitionMetadataLock.writeLock()) {
-      updateMetadataRequest.aliveBrokers.foreach(b => aliveBrokers.put(b.id, b))
+    inWriteLock(partitionMetadataLock) {
+      aliveBrokers = updateMetadataRequest.aliveBrokers.map(b => (b.id, b)).toMap
       updateMetadataRequest.partitionStateInfos.foreach { case(tp, info) =>
         if (info.leaderIsrAndControllerEpoch.leaderAndIsr.leader == LeaderAndIsr.LeaderDuringDelete) {
           removePartitionInfo(tp.topic, tp.partition)
Index: core/src/main/scala/kafka/utils/Utils.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/utils/Utils.scala	(date 1406400865000)
+++ core/src/main/scala/kafka/utils/Utils.scala	(date 1406403778000)
@@ -21,7 +21,7 @@
 import java.nio._
 import charset.Charset
 import java.nio.channels._
-import java.util.concurrent.locks.Lock
+import java.util.concurrent.locks.{ReadWriteLock, Lock}
 import java.lang.management._
 import javax.management._
 import scala.collection._
@@ -540,13 +540,18 @@
   def inLock[T](lock: Lock)(fun: => T): T = {
     lock.lock()
     try {
-      return fun
+      fun
     } finally {
       lock.unlock()
     }
   }
+
+  def inReadLock[T](lock: ReadWriteLock)(fun: => T): T = inLock[T](lock.readLock)(fun)
+
+  def inWriteLock[T](lock: ReadWriteLock)(fun: => T): T = inLock[T](lock.writeLock)(fun)
+
 
-  //JSON strings need to be escaped based on ECMA-404 standard http://json.org
+  //JSON strings need to be escaped based on ECMA-404 standard http://json.org
   def JSONEscapeString (s : String) : String = {
     s.map {
       case '"' => "\\\""
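
Note for reviewers: the essence of the patch is the pair of ReadWriteLock convenience wrappers added to kafka.utils.Utils and the copy-on-write style they enable in MetadataCache. Below is a minimal, self-contained Scala sketch of that pattern, not the patched files themselves; the object and class names (LockUtils, CacheSketch) are hypothetical and chosen for illustration only.

// A minimal sketch of the locking helpers this patch adds to kafka.utils.Utils,
// together with a toy copy-on-write cache in the style of the MetadataCache
// change. LockUtils and CacheSketch are hypothetical names; only
// inLock/inReadLock/inWriteLock mirror the patch itself.
import java.util.concurrent.locks.{Lock, ReadWriteLock, ReentrantReadWriteLock}

object LockUtils {
  // Run `fun` while holding `lock`; the lock is always released, even on exception.
  // The patch also drops the redundant `return`, so the block's last expression
  // is the result, which is the idiomatic Scala form.
  def inLock[T](lock: Lock)(fun: => T): T = {
    lock.lock()
    try {
      fun
    } finally {
      lock.unlock()
    }
  }

  // Thin wrappers so call sites name the intent (read vs. write) instead of
  // spelling out lock.readLock()/lock.writeLock() at every use.
  def inReadLock[T](lock: ReadWriteLock)(fun: => T): T = inLock[T](lock.readLock)(fun)

  def inWriteLock[T](lock: ReadWriteLock)(fun: => T): T = inLock[T](lock.writeLock)(fun)
}

// Toy cache in the style of the MetadataCache change: many concurrent readers
// share the read lock, while a writer swaps in a whole new immutable map under
// the write lock, so readers never observe a partially updated map.
class CacheSketch {
  import LockUtils._

  private val lock = new ReentrantReadWriteLock()
  private var entries: Map[Int, String] = Map()

  def get(id: Int): Option[String] = inReadLock(lock) {
    entries.get(id)
  }

  def replaceAll(fresh: Map[Int, String]): Unit = inWriteLock(lock) {
    entries = fresh
  }
}

Switching aliveBrokers from a mutable map to an immutable Map replaced wholesale also fixes a subtle staleness issue visible in the diff: the old foreach/put loop only ever added or overwrote entries, so brokers that had since gone away were never removed, whereas assigning a freshly built map leaves the cache holding exactly the brokers the UpdateMetadataRequest reports as alive.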