Index: core/src/main/scala/kafka/consumer/TopicCount.scala
===================================================================
--- core/src/main/scala/kafka/consumer/TopicCount.scala	(revision 1368092)
+++ core/src/main/scala/kafka/consumer/TopicCount.scala	(working copy)
@@ -18,12 +18,10 @@
 package kafka.consumer
 
 import scala.collection._
-import scala.util.parsing.json.JSON
 import org.I0Itec.zkclient.ZkClient
 import java.util.regex.Pattern
-import kafka.utils.{ZKGroupDirs, ZkUtils, Logging}
+import kafka.utils.{SyncJSON, ZKGroupDirs, ZkUtils, Logging}
-
 private[kafka] trait TopicCount {
   def getConsumerThreadIdsPerTopic: Map[String, Set[String]]
   def dbString: String
@@ -59,9 +57,6 @@
 
   private val BLACKLIST_PATTERN = Pattern.compile("""!(\p{Digit}+)!(.*)""")
 
-  val myConversionFunc = {input : String => input.toInt}
-  JSON.globalNumberParser = myConversionFunc
-
   def constructTopicCount(group: String,
                           consumerId: String,
                           zkClient: ZkClient) : TopicCount = {
@@ -93,7 +88,7 @@
     else {
       var topMap : Map[String,Int] = null
       try {
-        JSON.parseFull(topicCountString) match {
+        SyncJSON.parseFull(topicCountString) match {
           case Some(m) => topMap = m.asInstanceOf[Map[String,Int]]
           case None => throw new RuntimeException("error constructing TopicCount : " + topicCountString)
         }
Index: core/src/main/scala/kafka/utils/Utils.scala
===================================================================
--- core/src/main/scala/kafka/utils/Utils.scala	(revision 1368092)
+++ core/src/main/scala/kafka/utils/Utils.scala	(working copy)
@@ -31,6 +31,7 @@
 import java.util.{Random, Properties}
 import joptsimple.{OptionSpec, OptionSet, OptionParser}
 import kafka.common.KafkaException
+import util.parsing.json.JSON
 
 /**
@@ -905,4 +906,24 @@
     def durationMs: Double = (end.get - start) / (1000.0 * 1000.0)
   }
+}
+
+/**
+ * A wrapper that synchronizes JSON in scala, which is not threadsafe.
+ */
+object SyncJSON extends Logging {
+  val myConversionFunc = {input : String => input.toInt}
+  JSON.globalNumberParser = myConversionFunc
+  val lock = new Object
+
+  def parseFull(input: String): Option[Any] = {
+    lock synchronized {
+      try {
+        JSON.parseFull(input)
+      } catch {
+        case t =>
+          throw new RuntimeException("can't parse json string: %s".format(input), t)
+      }
+    }
+  }
+}
\ No newline at end of file
Index: core/src/main/scala/kafka/utils/ZkUtils.scala
===================================================================
--- core/src/main/scala/kafka/utils/ZkUtils.scala	(revision 1368092)
+++ core/src/main/scala/kafka/utils/ZkUtils.scala	(working copy)
@@ -24,7 +24,6 @@
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError}
 import org.I0Itec.zkclient.serialize.ZkSerializer
 import scala.collection._
-import util.parsing.json.JSON
 import kafka.api.LeaderAndISR
 import kafka.common.NoEpochForPartitionException
 import org.apache.zookeeper.data.Stat
@@ -78,7 +77,7 @@
     val stat = ret._2
     if(leaderAndISRStr == null) None
     else {
-      JSON.parseFull(leaderAndISRStr) match {
+      SyncJSON.parseFull(leaderAndISRStr) match {
         case Some(m) =>
           val leader = m.asInstanceOf[Map[String, String]].get("leader").get.toInt
           val epoch = m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
@@ -97,7 +96,7 @@
     val leaderAndISR = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndISRPath(topic, partition))._1
     if(leaderAndISR == null) None
     else {
-      JSON.parseFull(leaderAndISR) match {
+      SyncJSON.parseFull(leaderAndISR) match {
         case Some(m) =>
           Some(m.asInstanceOf[Map[String, String]].get("leader").get.toInt)
         case None => None
@@ -113,7 +112,7 @@
   def getEpochForPartition(zkClient: ZkClient, topic: String, partition: Int): Int = {
     val leaderAndISR = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndISRPath(topic, partition))._1
     if(leaderAndISR != null) {
-      val epoch = JSON.parseFull(leaderAndISR) match {
+      val epoch = SyncJSON.parseFull(leaderAndISR) match {
         case None => throw new NoEpochForPartitionException("No epoch, leaderAndISR data for topic %s partition %d is invalid".format(topic, partition))
         case Some(m) =>
           m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
@@ -131,7 +130,7 @@
     val leaderAndISR = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndISRPath(topic, partition))._1
     if(leaderAndISR == null) Seq.empty[Int]
     else {
-      JSON.parseFull(leaderAndISR) match {
+      SyncJSON.parseFull(leaderAndISR) match {
        case Some(m) =>
          val ISRString = m.asInstanceOf[Map[String, String]].get("ISR").get
          Utils.getCSVList(ISRString).map(r => r.toInt)
@@ -148,7 +147,7 @@
     val assignedReplicas = if (jsonPartitionMap == null) {
       Seq.empty[Int]
     } else {
-      JSON.parseFull(jsonPartitionMap) match {
+      SyncJSON.parseFull(jsonPartitionMap) match {
        case Some(m) => m.asInstanceOf[Map[String, List[String]]].get(partition.toString) match {
          case None => Seq.empty[Int]
          case Some(seq) => seq.map(_.toInt)
@@ -165,27 +164,6 @@
     replicas.contains(brokerId.toString)
   }
 
-  def incrementEpochForPartition(client: ZkClient, topic: String, partition: Int, leader: Int): Int = {
-    // read previous epoch, increment it and write it to the leader path and the ISR path.
-    val epoch = try {
-      Some(getEpochForPartition(client, topic, partition))
-    }catch {
-      case e: NoEpochForPartitionException => None
-      case e1 => throw e1
-    }
-
-    val newEpoch = epoch match {
-      case Some(partitionEpoch) =>
-        debug("Existing epoch for topic %s partition %d is %d".format(topic, partition, partitionEpoch))
-        partitionEpoch + 1
-      case None =>
-        // this is the first time leader is elected for this partition. So set epoch to 1
-        debug("First epoch is 1 for topic %s partition %d".format(topic, partition))
-        1
-    }
-    newEpoch
-  }
-
   def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, creator: String, port: Int) {
     val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
     val broker = new Broker(id, creator, host, port)
@@ -424,7 +402,7 @@
     topics.foreach{ topic =>
       val jsonPartitionMap = readDataMaybeNull(zkClient, getTopicPath(topic))._1
       if (jsonPartitionMap != null) {
-        JSON.parseFull(jsonPartitionMap) match {
+        SyncJSON.parseFull(jsonPartitionMap) match {
          case Some(m) =>
            val replicaMap = m.asInstanceOf[Map[String, Seq[String]]]
            for((partition, replicas) <- replicaMap){
@@ -458,7 +436,7 @@
     val partitionMap = if (jsonPartitionMap == null) {
       Map[Int, Seq[Int]]()
     } else {
-      JSON.parseFull(jsonPartitionMap) match {
+      SyncJSON.parseFull(jsonPartitionMap) match {
        case Some(m) =>
          val m1 = m.asInstanceOf[Map[String, Seq[String]]]
          m1.map(p => (p._1.toInt, p._2.map(_.toInt)))
@@ -552,31 +530,8 @@
     if(topics == null) Seq.empty[String]
     else topics
   }
-
-  def incrementEpochForPartition(client: ZkClient, topic: String, partition: Int) = {
-    // read previous epoch, increment it and write it to the leader path and the ISR path.
-    val epoch = try {
-      Some(getEpochForPartition(client, topic, partition))
-    }catch {
-      case e: NoEpochForPartitionException => None
-      case e1 => throw e1
-    }
-    val newEpoch = epoch match {
-      case Some(partitionEpoch) =>
-        debug("Existing epoch for topic %s partition %d is %d".format(topic, partition, partitionEpoch))
-        partitionEpoch + 1
-      case None =>
-        // this is the first time leader is elected for this partition. So set epoch to 1
-        debug("First epoch is 1 for topic %s partition %d".format(topic, partition))
-        LeaderAndISR.initialLeaderEpoch
-    }
-    newEpoch
-  }
 }
-
-
-
 class LeaderExistsOrChangedListener(topic: String, partition: Int, leaderLock: ReentrantLock,
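
For context, a minimal usage sketch of the new wrapper. The demo object below is hypothetical and not part of the patch; it assumes the patched classes are on the classpath and an era-appropriate Scala version in which scala.util.parsing.json is still in the standard library. Every caller now funnels through SyncJSON.parseFull, so concurrent parses serialize on the shared lock instead of racing on the parser's mutable internal state, and globalNumberParser makes numeric values come back as Int rather than Double.

    // Hypothetical demo, not part of the patch: exercises SyncJSON from several threads.
    import kafka.utils.SyncJSON

    object SyncJSONDemo extends App {
      val threads = (1 to 4).map { i =>
        new Thread(new Runnable {
          def run() {
            // Each call takes SyncJSON's internal lock, so the non-threadsafe
            // scala JSON parser never sees interleaved invocations.
            // Prints e.g. Some(Map(partition -> 1)); the value is an Int, not a
            // Double, because SyncJSON sets JSON.globalNumberParser to toInt.
            println(SyncJSON.parseFull("""{"partition": %d}""".format(i)))
          }
        })
      }
      threads.foreach(_.start())
      threads.foreach(_.join())
    }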