diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index 7405c5a..d5de5f3 100644
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -54,7 +54,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
         if (!options.has(jsonFileOpt))
           ZkUtils.getAllPartitions(zkClient)
         else
-          parsePreferredReplicaJsonData(Utils.readFileAsString(options.valueOf(jsonFileOpt)))
+          parsePreferredReplicaElectionData(Utils.readFileAsString(options.valueOf(jsonFileOpt)))
       val preferredReplicaElectionCommand = new PreferredReplicaLeaderElectionCommand(zkClient, partitionsForPreferredReplicaElection)
 
       preferredReplicaElectionCommand.moveLeaderToPreferredReplica()
@@ -69,7 +69,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
     }
   }
 
-  def parsePreferredReplicaJsonData(jsonString: String): immutable.Set[TopicAndPartition] = {
+  def parsePreferredReplicaElectionData(jsonString: String): immutable.Set[TopicAndPartition] = {
     Json.parseFull(jsonString) match {
       case Some(m) =>
         m.asInstanceOf[Map[String, Any]].get("partitions") match {
@@ -101,8 +101,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
       info("Created preferred replica election path with %s".format(jsonData))
     } catch {
       case nee: ZkNodeExistsException =>
-        val partitionsUndergoingPreferredReplicaElection =
-          PreferredReplicaLeaderElectionCommand.parsePreferredReplicaJsonData(ZkUtils.readData(zkClient, zkPath)._1)
+        val partitionsUndergoingPreferredReplicaElection = parsePreferredReplicaElectionData(ZkUtils.readData(zkClient, zkPath)._1)
         throw new AdministrationException("Preferred replica leader election currently in progress for " +
           "%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection))
       case e2 => throw new AdministrationException(e2.toString)
diff --git a/core/src/main/scala/kafka/common/LeaderElectionNotNeededException.scala b/core/src/main/scala/kafka/common/LeaderElectionNotNeededException.scala
new file mode 100644
index 0000000..ca89d25
--- /dev/null
+++ b/core/src/main/scala/kafka/common/LeaderElectionNotNeededException.scala
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.common
+
+/**
+ * This exception is thrown when new leader election is not necessary.
+ */
+class LeaderElectionNotNeededException(message: String, cause: Throwable) extends RuntimeException(message, cause) {
+  def this(message: String) = this(message, null)
+  def this() = this(null, null)
+}
+
+
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 229239c..9d32901 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -386,8 +386,14 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
 
   def onPreferredReplicaElection(partitions: Set[TopicAndPartition]) {
     info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
-    controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
-    partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector)
+    try {
+      controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
+      partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector)
+    } catch {
+      case e => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
+    } finally {
+      removePartitionsFromPreferredReplicaElection(partitions)
+    }
   }
 
   /**
@@ -910,20 +916,15 @@ class PreferredReplicaElectionListener(controller: KafkaController) extends IZkD
    */
   @throws(classOf[Exception])
   def handleDataChange(dataPath: String, data: Object) {
-    debug("Preferred replica election listener fired for path %s. Record partitions to undergo preferred replica election" +
-      " %s".format(dataPath, data.toString))
-    val partitionsForPreferredReplicaElection =
-      PreferredReplicaLeaderElectionCommand.parsePreferredReplicaJsonData(data.toString)
-    val newPartitions = partitionsForPreferredReplicaElection -- controllerContext.partitionsUndergoingPreferredReplicaElection
+    debug("Preferred replica election listener fired for path %s. Record partitions to undergo preferred replica election %s"
+            .format(dataPath, data.toString))
+    val partitionsForPreferredReplicaElection = PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(data.toString)
+
     controllerContext.controllerLock synchronized {
-      try {
-        controller.onPreferredReplicaElection(newPartitions)
-      } catch {
-        case e => error("Error completing preferred replica leader election for partitions %s"
-          .format(partitionsForPreferredReplicaElection.mkString(",")), e)
-      } finally {
-        controller.removePartitionsFromPreferredReplicaElection(newPartitions)
-      }
+      info("These partitions are already undergoing preferred replica election: %s"
+             .format(controllerContext.partitionsUndergoingPreferredReplicaElection.mkString(",")))
+      val newPartitions = partitionsForPreferredReplicaElection -- controllerContext.partitionsUndergoingPreferredReplicaElection
+      controller.onPreferredReplicaElection(newPartitions)
     }
   }
 
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index d295781..7a06c24 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -18,7 +18,7 @@ package kafka.controller
 
 import kafka.api.LeaderAndIsr
 import kafka.utils.Logging
-import kafka.common.{TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
+import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
 
 trait PartitionLeaderSelector {
 
@@ -125,9 +125,9 @@ with Logging {
     val preferredReplica = assignedReplicas.head
     // check if preferred replica is the current leader
     val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
-    if(currentLeader == preferredReplica) {
-      throw new StateChangeFailedException("Preferred replica %d is already the current leader for partition %s"
-        .format(preferredReplica, topicAndPartition))
+    if (currentLeader == preferredReplica) {
+      throw new LeaderElectionNotNeededException("Preferred replica %d is already the current leader for partition %s"
+                                                   .format(preferredReplica, topicAndPartition))
     } else {
       info("Current leader %d for partition %s is not the preferred replica.".format(currentLeader, topicAndPartition) +
         " Trigerring preferred replica leader election")
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 654fa2e..da47ac8 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -20,7 +20,7 @@ import collection._
 import collection.JavaConversions._
 import java.util.concurrent.atomic.AtomicBoolean
 import kafka.api.LeaderAndIsr
-import kafka.common.{TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
+import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
 import kafka.utils.{Logging, ZkUtils}
 import org.I0Itec.zkclient.IZkChildListener
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
@@ -315,6 +315,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition,
         newLeaderIsrAndControllerEpoch, controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition)).size)
     } catch {
+      case lenne: LeaderElectionNotNeededException => // swallow
       case nroe: NoReplicaOnlineException => throw nroe
       case sce =>
         val failMsg = "encountered error while electing leader for partition %s due to: %s.".format(topicAndPartition, sce.getMessage)
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 9a0e250..ce1904b 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -633,22 +633,7 @@ object ZkUtils extends Logging {
     // read the partitions and their new replica list
     val jsonPartitionListOpt = readDataMaybeNull(zkClient, PreferredReplicaLeaderElectionPath)._1
     jsonPartitionListOpt match {
-      case Some(jsonPartitionList) => parsePreferredReplicaElectionData(jsonPartitionList)
-      case None => Set.empty[TopicAndPartition]
-    }
-  }
-
-  def parsePreferredReplicaElectionData(jsonData: String):Set[TopicAndPartition] = {
-    Json.parseFull(jsonData) match {
-      case Some(m) =>
-        val topicAndPartitions = m.asInstanceOf[Array[Map[String, String]]]
-        val partitions = topicAndPartitions.map { p =>
-          val topicPartitionMap = p
-          val topic = topicPartitionMap.get("topic").get
-          val partition = topicPartitionMap.get("partition").get.toInt
-          TopicAndPartition(topic, partition)
-        }
-        Set.empty[TopicAndPartition] ++ partitions
+      case Some(jsonPartitionList) => PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(jsonPartitionList)
       case None => Set.empty[TopicAndPartition]
     }
   }
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 6c80c4c..b0a0e09 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -332,7 +332,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val preferredReplicaElectionZkData = ZkUtils.readData(zkClient,
         ZkUtils.PreferredReplicaLeaderElectionPath)._1
     val partitionsUndergoingPreferredReplicaElection =
-      PreferredReplicaLeaderElectionCommand.parsePreferredReplicaJsonData(preferredReplicaElectionZkData)
+      PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(preferredReplicaElectionZkData)
     assertEquals("Preferred replica election ser-de failed", partitionsForPreferredReplicaElection,
       partitionsUndergoingPreferredReplicaElection)
   }
