diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index 7405c5a..7f42358 100644
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -54,7 +54,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
         if (!options.has(jsonFileOpt))
           ZkUtils.getAllPartitions(zkClient)
         else
-          parsePreferredReplicaJsonData(Utils.readFileAsString(options.valueOf(jsonFileOpt)))
+          ZkUtils.parsePreferredReplicaElectionData(Utils.readFileAsString(options.valueOf(jsonFileOpt)))
       val preferredReplicaElectionCommand = new PreferredReplicaLeaderElectionCommand(zkClient, partitionsForPreferredReplicaElection)
 
       preferredReplicaElectionCommand.moveLeaderToPreferredReplica()
@@ -69,22 +69,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
     }
   }
 
-  def parsePreferredReplicaJsonData(jsonString: String): immutable.Set[TopicAndPartition] = {
-    Json.parseFull(jsonString) match {
-      case Some(m) =>
-        m.asInstanceOf[Map[String, Any]].get("partitions") match {
-          case Some(partitionsList) =>
-            val partitions = partitionsList.asInstanceOf[List[Map[String, Any]]]
-            partitions.map { p =>
-              val topic = p.get("topic").get.asInstanceOf[String]
-              val partition = p.get("partition").get.asInstanceOf[Int]
-              TopicAndPartition(topic, partition)
-            }.toSet
-          case None => throw new AdministrationException("Preferred replica election data is empty")
-        }
-      case None => throw new AdministrationException("Preferred replica election data is empty")
-    }
-  }
+
 
   def writePreferredReplicaElectionData(zkClient: ZkClient,
                                         partitionsUndergoingPreferredReplicaElection: scala.collection.Set[TopicAndPartition]) {
@@ -101,8 +86,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
       info("Created preferred replica election path with %s".format(jsonData))
     } catch {
       case nee: ZkNodeExistsException =>
-        val partitionsUndergoingPreferredReplicaElection =
-          PreferredReplicaLeaderElectionCommand.parsePreferredReplicaJsonData(ZkUtils.readData(zkClient, zkPath)._1)
+        val partitionsUndergoingPreferredReplicaElection = ZkUtils.parsePreferredReplicaElectionData(ZkUtils.readData(zkClient, zkPath)._1)
         throw new AdministrationException("Preferred replica leader election currently in progress for " +
           "%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection))
       case e2 => throw new AdministrationException(e2.toString)
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 229239c..558125b 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -386,8 +386,14 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
 
   def onPreferredReplicaElection(partitions: Set[TopicAndPartition]) {
     info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
-    controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
-    partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector)
+    try {
+      controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
+      partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector)
+    } catch {
+      case e => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
+    } finally {
+      removePartitionsFromPreferredReplicaElection(partitions)
+    }
   }
 
   /**
@@ -910,20 +916,15 @@ class PreferredReplicaElectionListener(controller: KafkaController) extends IZkD
    */
   @throws(classOf[Exception])
   def handleDataChange(dataPath: String, data: Object) {
-    debug("Preferred replica election listener fired for path %s. Record partitions to undergo preferred replica election" +
-      " %s".format(dataPath, data.toString))
-    val partitionsForPreferredReplicaElection =
-      PreferredReplicaLeaderElectionCommand.parsePreferredReplicaJsonData(data.toString)
+    debug("Preferred replica election listener fired for path %s. Record partitions to undergo preferred replica election %s"
+            .format(dataPath, data.toString))
+    info("These partitions are already undergoing preferred replica election: %s"
+           .format(controllerContext.partitionsUndergoingPreferredReplicaElection.mkString(",")))
+
+    val partitionsForPreferredReplicaElection = ZkUtils.parsePreferredReplicaElectionData(data.toString)
     val newPartitions = partitionsForPreferredReplicaElection -- controllerContext.partitionsUndergoingPreferredReplicaElection
     controllerContext.controllerLock synchronized {
-      try {
-        controller.onPreferredReplicaElection(newPartitions)
-      } catch {
-        case e => error("Error completing preferred replica leader election for partitions %s"
-          .format(partitionsForPreferredReplicaElection.mkString(",")), e)
-      } finally {
-        controller.removePartitionsFromPreferredReplicaElection(newPartitions)
-      }
+      controller.onPreferredReplicaElection(newPartitions)
     }
   }
 
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 9a0e250..498e020 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -638,18 +638,20 @@ object ZkUtils extends Logging {
     }
   }
 
-  def parsePreferredReplicaElectionData(jsonData: String):Set[TopicAndPartition] = {
-    Json.parseFull(jsonData) match {
+  def parsePreferredReplicaElectionData(jsonString: String): immutable.Set[TopicAndPartition] = {
+    Json.parseFull(jsonString) match {
       case Some(m) =>
-        val topicAndPartitions = m.asInstanceOf[Array[Map[String, String]]]
-        val partitions = topicAndPartitions.map { p =>
-          val topicPartitionMap = p
-          val topic = topicPartitionMap.get("topic").get
-          val partition = topicPartitionMap.get("partition").get.toInt
-          TopicAndPartition(topic, partition)
+        m.asInstanceOf[Map[String, Any]].get("partitions") match {
+          case Some(partitionsList) =>
+            val partitions = partitionsList.asInstanceOf[List[Map[String, Any]]]
+            partitions.map { p =>
+              val topic = p.get("topic").get.asInstanceOf[String]
+              val partition = p.get("partition").get.asInstanceOf[Int]
+              TopicAndPartition(topic, partition)
+            }.toSet
+          case None => throw new AdministrationException("Preferred replica election data is empty")
         }
-        Set.empty[TopicAndPartition] ++ partitions
-      case None => Set.empty[TopicAndPartition]
+      case None => throw new AdministrationException("Preferred replica election data is empty")
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 6c80c4c..06d7dec 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -331,8 +331,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     // try to read it back and compare with what was written
     val preferredReplicaElectionZkData = ZkUtils.readData(zkClient,
         ZkUtils.PreferredReplicaLeaderElectionPath)._1
-    val partitionsUndergoingPreferredReplicaElection =
-      PreferredReplicaLeaderElectionCommand.parsePreferredReplicaJsonData(preferredReplicaElectionZkData)
+    val partitionsUndergoingPreferredReplicaElection = ZkUtils.parsePreferredReplicaElectionData(preferredReplicaElectionZkData)
     assertEquals("Preferred replica election ser-de failed", partitionsForPreferredReplicaElection,
       partitionsUndergoingPreferredReplicaElection)
   }
