Index: core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala	(revision 1421054)
+++ core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala	(working copy)
@@ -122,7 +122,8 @@
 
 
     // start another controller
-    val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(2, TestUtils.choosePort()))
+    val controllerId = 2
+    val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(controllerId, TestUtils.choosePort()))
     val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.config.port))
     val controllerChannelManager = new ControllerChannelManager(brokers.toSet, controllerConfig)
     controllerChannelManager.startup()
@@ -131,7 +132,7 @@
     leaderAndIsr.put((topic, partitionId),
       new LeaderIsrAndControllerEpoch(new LeaderAndIsr(brokerId2, List(brokerId1, brokerId2)), 2))
     val partitionStateInfo = leaderAndIsr.mapValues(l => new PartitionStateInfo(l, 1)).toMap
-    val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfo, brokers.toSet, staleControllerEpoch)
+    val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfo, brokers.toSet, controllerId, staleControllerEpoch)
 
     controllerChannelManager.sendRequest(brokerId2, leaderAndIsrRequest, staleControllerEpochCallback)
     TestUtils.waitUntilTrue(() => staleControllerEpochDetected == true, 1000)
Index: core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala	(revision 1421054)
+++ core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala	(working copy)
@@ -89,7 +89,7 @@
     val leaderAndIsr2 = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader2, 1, isr2, 2), 1)
     val map = Map(((topic1, 0), PartitionStateInfo(leaderAndIsr1, 3)),
                   ((topic2, 0), PartitionStateInfo(leaderAndIsr2, 3)))
-    new LeaderAndIsrRequest(map.toMap, collection.immutable.Set[Broker](), 1)
+    new LeaderAndIsrRequest(map.toMap, collection.immutable.Set[Broker](), 0, 1)
   }
 
   def createTestLeaderAndIsrResponse() : LeaderAndIsrResponse = {
Index: core/src/main/scala/kafka/cluster/Partition.scala
===================================================================
--- core/src/main/scala/kafka/cluster/Partition.scala	(revision 1421054)
+++ core/src/main/scala/kafka/cluster/Partition.scala	(working copy)
@@ -25,6 +25,7 @@
 import kafka.metrics.KafkaMetricsGroup
 import kafka.common.ErrorMapping
 import kafka.controller.{LeaderIsrAndControllerEpoch, KafkaController}
+import kafka.tools.PartitionStateChangeLogger
 
 
 /**
@@ -51,7 +52,9 @@
    * In addition to the leader, the controller can also send the epoch of the controller that elected the leader for
    * each partition. */
   private var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
+
   this.logIdent = "Partition [%s, %d] on broker %d: ".format(topic, partitionId, localBrokerId)
+  private val partitionStateChangeLogger = new PartitionStateChangeLogger(this.logIdent)
 
   private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
 
@@ -127,12 +130,12 @@
   def makeLeader(topic: String, partitionId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Boolean = {
     leaderIsrUpdateLock synchronized {
       val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
-      if (leaderEpoch >= leaderAndIsr.leaderEpoch){
-        info("Current leader epoch [%d] is larger or equal to the requested leader epoch [%d], discard the become leader request"
+      if (leaderEpoch >= leaderAndIsr.leaderEpoch) {
+        partitionStateChangeLogger.info("Current leader epoch [%d] is larger or equal to the requested leader epoch [%d], discard the become leader request"
           .format(leaderEpoch, leaderAndIsr.leaderEpoch))
         return false
       }
-      trace("Started to become leader at the request %s".format(leaderAndIsr.toString()))
+      partitionStateChangeLogger.trace("Started to become leader at the request %s".format(leaderAndIsr.toString()))
       // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
       // to maintain the decision maker controller's epoch in the zookeeper path
       controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
@@ -163,17 +166,17 @@
                    liveBrokers: Set[Broker]): Boolean = {
     leaderIsrUpdateLock synchronized {
       val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
-      if (leaderEpoch >= leaderAndIsr.leaderEpoch){
-        info("Current leader epoch [%d] is larger or equal to the requested leader epoch [%d], discard the become follower request"
+      if (leaderEpoch >= leaderAndIsr.leaderEpoch) {
+        partitionStateChangeLogger.info("Current leader epoch [%d] is larger or equal to the requested leader epoch [%d], discard the become follower request"
           .format(leaderEpoch, leaderAndIsr.leaderEpoch))
         return false
       }
-      trace("Started to become follower at the request %s".format(leaderAndIsr.toString()))
+      partitionStateChangeLogger.trace("Started to become follower at the request %s".format(leaderAndIsr.toString()))
       // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
       // to maintain the decision maker controller's epoch in the zookeeper path
       controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
       val newLeaderBrokerId: Int = leaderAndIsr.leader
-      info("Starting the follower state transition to follow leader %d for topic %s partition %d"
+      partitionStateChangeLogger.info("Starting the follower state transition to follow leader %d for topic %s partition %d"
         .format(newLeaderBrokerId, topic, partitionId))
       liveBrokers.find(_.id == newLeaderBrokerId) match {
         case Some(leaderBroker) =>
@@ -189,8 +192,8 @@
           // start fetcher thread to current leader
           replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker)
         case None => // leader went down
-          warn("Aborting become follower state change on %d since leader %d for ".format(localBrokerId, newLeaderBrokerId) +
-          " topic %s partition %d became unavailble during the state change operation".format(topic, partitionId))
+          partitionStateChangeLogger.warn("Aborting become follower state change on %d since leader %d for ".format(localBrokerId, newLeaderBrokerId) +
+            " topic %s partition %d became unavailable during the state change operation".format(topic, partitionId))
       }
       true
     }
Index: core/src/main/scala/kafka/controller/PartitionStateMachine.scala
===================================================================
--- core/src/main/scala/kafka/controller/PartitionStateMachine.scala	(revision 1421054)
+++ core/src/main/scala/kafka/controller/PartitionStateMachine.scala	(working copy)
@@ -24,6 +24,7 @@
 import kafka.utils.{Logging, ZkUtils}
 import org.I0Itec.zkclient.IZkChildListener
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
+import kafka.tools.PartitionStateChangeLogger
 
 /**
  * This class represents the state machine for partitions. It defines the states that a partition can be in, and
@@ -38,14 +39,16 @@
  *                          moves to the OfflinePartition state. Valid previous states are NewPartition/OnlinePartition
  */
 class PartitionStateMachine(controller: KafkaController) extends Logging {
-  this.logIdent = "[Partition state machine on Controller " + controller.config.brokerId + "]: "
   private val controllerContext = controller.controllerContext
   private val zkClient = controllerContext.zkClient
   var partitionState: mutable.Map[TopicAndPartition, PartitionState] = mutable.Map.empty
-  val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest)
+  val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest, controller.config.brokerId)
   val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext)
   private val isShuttingDown = new AtomicBoolean(false)
 
+  this.logIdent = "[Partition state machine on Controller " + controller.config.brokerId + "]: "
+  private val partitionStateChangeLogger = new PartitionStateChangeLogger(this.logIdent)
+
   /**
    * Invoked on successful controller election. First registers a topic change listener since that triggers all
    * state transitions for partitions. Initializes the state of partitions by reading from zookeeper. Then triggers
@@ -125,12 +128,12 @@
       targetState match {
         case NewPartition =>
           // pre: partition did not exist before this
-          // post: partition has been assigned replicas
           assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition)
           assignReplicasToPartitions(topic, partition)
           partitionState.put(topicAndPartition, NewPartition)
-          info("Partition [%s, %d] state changed from NotExists to New with assigned replicas ".format(topic, partition) +
+          partitionStateChangeLogger.info("Partition [%s, %d] state changed from NotExists to New with assigned replicas ".format(topic, partition) +
             "%s".format(controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",")))
+          // post: partition has been assigned replicas
         case OnlinePartition =>
           assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition)
           partitionState(topicAndPartition) match {
@@ -143,27 +146,28 @@
               electLeaderForPartition(topic, partition, leaderSelector)
             case _ => // should never come here since illegal previous states are checked above
           }
-          info("Partition [%s, %d] state changed from %s to OnlinePartition with leader %d".format(topic, partition,
+          partitionStateChangeLogger.info("Partition [%s, %d] state changed from %s to OnlinePartition with leader %d".format(topic, partition,
             partitionState(topicAndPartition), controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader))
           partitionState.put(topicAndPartition, OnlinePartition)
            // post: partition has a leader
         case OfflinePartition =>
-          // pre: partition should be in Online state
+          // pre: partition should be in New or Online state
           assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition), OfflinePartition)
           // should be called when the leader for a partition is no longer alive
-          info("Partition [%s, %d] state changed from Online to Offline".format(topic, partition))
           partitionState.put(topicAndPartition, OfflinePartition)
+          partitionStateChangeLogger.info("Partition [%s, %d] state changed from Online to Offline".format(topic, partition))
           // post: partition has no alive leader
         case NonExistentPartition =>
-          // pre: partition could be in either of the above states
+          // pre: partition should be in Offline state
           assertValidPreviousStates(topicAndPartition, List(OfflinePartition), NonExistentPartition)
-          info("Partition [%s, %d] state changed from Offline to NotExists".format(topic, partition))
           partitionState.put(topicAndPartition, NonExistentPartition)
+          partitionStateChangeLogger.info("Partition [%s, %d] state changed from Offline to NotExists".format(topic, partition))
           // post: partition state is deleted from all brokers and zookeeper
       }
     } catch {
-      case t: Throwable => error("State change for partition [%s, %d] ".format(topic, partition) +
-        "from %s to %s failed".format(currState, targetState), t)
+      case t: Throwable =>
+        partitionStateChangeLogger.error("State change for partition [%s, %d] ".format(topic, partition) + "from %s to %s failed."
+          .format(currState, targetState), t)
     }
   }
 
@@ -218,7 +222,7 @@
    * @param topicAndPartition   The topic/partition whose leader and isr path is to be initialized
    */
   private def initializeLeaderAndIsrForPartition(topicAndPartition: TopicAndPartition) {
-    debug("Initializing leader and isr for partition %s".format(topicAndPartition))
+    partitionStateChangeLogger.debug("Initializing leader and isr for partition %s".format(topicAndPartition))
     val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
     val liveAssignedReplicas = replicaAssignment.filter(r => controllerContext.liveBrokerIds.contains(r))
     liveAssignedReplicas.size match {
@@ -228,7 +232,7 @@
           "[%s], live brokers are [%s]. No assigned replica is alive").format(topicAndPartition,
           replicaAssignment.mkString(","), controllerContext.liveBrokerIds))
       case _ =>
-        debug("Live assigned replicas for partition %s are: [%s]".format(topicAndPartition, liveAssignedReplicas))
+        partitionStateChangeLogger.debug("Live assigned replicas for partition %s are: [%s]".format(topicAndPartition, liveAssignedReplicas))
         // make the first replica in the list of assigned replicas, the leader
         val leader = liveAssignedReplicas.head
         val leaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader, liveAssignedReplicas.toList),
@@ -266,7 +270,7 @@
    */
   def electLeaderForPartition(topic: String, partition: Int, leaderSelector: PartitionLeaderSelector) {
     // handle leader election for the partitions whose leader is no longer alive
-    info("Electing leader for partition [%s, %d]".format(topic, partition))
+    partitionStateChangeLogger.info("Electing leader for partition [%s, %d]".format(topic, partition))
     try {
       var zookeeperPathUpdateSucceeded: Boolean = false
       var newLeaderAndIsr: LeaderAndIsr = null
@@ -292,7 +296,7 @@
       val newLeaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(newLeaderAndIsr, controller.epoch)
       // update the leader cache
       controllerContext.allLeaders.put(TopicAndPartition(topic, partition), newLeaderIsrAndControllerEpoch)
-      info("Elected leader %d for Offline partition [%s, %d]".format(newLeaderAndIsr.leader, topic, partition))
+      partitionStateChangeLogger.info("Elected leader %d for Offline partition [%s, %d]".format(newLeaderAndIsr.leader, topic, partition))
       // store new leader and isr info in cache
       brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition,
         newLeaderIsrAndControllerEpoch, controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition)).size)
@@ -302,7 +306,7 @@
       case sce => throw new StateChangeFailedException(("Error while electing leader for partition " +
         " [%s, %d] due to: %s.").format(topic, partition, sce.getMessage), sce)
     }
-    debug("After leader election, leader cache is updated to %s".format(controllerContext.allLeaders.map(l => (l._1, l._2))))
+    partitionStateChangeLogger.debug("After leader election, leader cache is updated to %s".format(controllerContext.allLeaders.map(l => (l._1, l._2))))
   }
 
   private def registerTopicChangeListener() = {
Index: core/src/main/scala/kafka/controller/KafkaController.scala
===================================================================
--- core/src/main/scala/kafka/controller/KafkaController.scala	(revision 1421054)
+++ core/src/main/scala/kafka/controller/KafkaController.scala	(working copy)
@@ -87,7 +87,7 @@
   private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
   private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
   private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
-  private val brokerRequestBatch = new ControllerBrokerRequestBatch(sendRequest)
+  private val brokerRequestBatch = new ControllerBrokerRequestBatch(sendRequest, config.brokerId)
   registerControllerChangedListener()
 
   newGauge(
Index: core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
===================================================================
--- core/src/main/scala/kafka/controller/ReplicaStateMachine.scala	(revision 1421054)
+++ core/src/main/scala/kafka/controller/ReplicaStateMachine.scala	(working copy)
@@ -22,6 +22,7 @@
 import kafka.common.{TopicAndPartition, StateChangeFailedException}
 import kafka.utils.{ZkUtils, Logging}
 import org.I0Itec.zkclient.IZkChildListener
+import kafka.tools.PartitionStateChangeLogger
 
 /**
  * This class represents the state machine for replicas. It defines the states that a replica can be in, and
@@ -37,13 +38,15 @@
  * 4. NonExistentReplica: If a replica is deleted, it is moved to this state. Valid previous state is OfflineReplica
  */
 class ReplicaStateMachine(controller: KafkaController) extends Logging {
-  this.logIdent = "[Replica state machine on Controller " + controller.config.brokerId + "]: "
   private val controllerContext = controller.controllerContext
   private val zkClient = controllerContext.zkClient
   var replicaState: mutable.Map[(String, Int, Int), ReplicaState] = mutable.Map.empty
-  val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest)
+  val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest, controller.config.brokerId)
   private val isShuttingDown = new AtomicBoolean(false)
 
+  this.logIdent = "[Replica state machine on Controller " + controller.config.brokerId + "]: "
+  private val partitionStateChangeLogger = new PartitionStateChangeLogger(this.logIdent)
+
   /**
    * Invoked on successful controller election. First registers a broker change listener since that triggers all
    * state transitions for replicas. Initializes the state of replicas for all partitions by reading from zookeeper.
@@ -117,17 +120,16 @@
             case None => // new leader request will be sent to this replica when one gets elected
           }
           replicaState.put((topic, partition, replicaId), NewReplica)
-          info("Replica %d for partition [%s, %d] state changed to NewReplica".format(replicaId, topic, partition))
+          partitionStateChangeLogger.info("Replica %d for partition [%s, %d] state changed to NewReplica".format(replicaId, topic, partition))
         case NonExistentReplica =>
           assertValidPreviousStates(topic, partition, replicaId, List(OfflineReplica), targetState)
           // send stop replica command
           brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true)
           // remove this replica from the assigned replicas list for its partition
           val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
-          controllerContext.partitionReplicaAssignment.put(topicAndPartition,
-            currentAssignedReplicas.filterNot(_ == replicaId))
-          info("Replica %d for partition [%s, %d] state changed to NonExistentReplica".format(replicaId, topic, partition))
+          controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId))
           replicaState.remove((topic, partition, replicaId))
+          partitionStateChangeLogger.info("Replica %d for partition [%s, %d] state changed to NonExistentReplica".format(replicaId, topic, partition))
         case OnlineReplica =>
           assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica, OfflineReplica), targetState)
           replicaState((topic, partition, replicaId)) match {
@@ -135,7 +137,7 @@
               // add this replica to the assigned replicas list for its partition
               val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
               controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
-              info("Replica %d for partition [%s, %d] state changed to OnlineReplica".format(replicaId, topic, partition))
+              partitionStateChangeLogger.info("Replica %d for partition [%s, %d] state changed to OnlineReplica".format(replicaId, topic, partition))
             case _ =>
               // check if the leader for this partition is alive or even exists
                 controllerContext.allLeaders.get(topicAndPartition) match {
@@ -146,7 +148,7 @@
                                                                           topic, partition, leaderIsrAndControllerEpoch,
                                                                           replicaAssignment.size)
                       replicaState.put((topic, partition, replicaId), OnlineReplica)
-                      info("Replica %d for partition [%s, %d] state changed to OnlineReplica".format(replicaId, topic, partition))
+                      partitionStateChangeLogger.info("Replica %d for partition [%s, %d] state changed to OnlineReplica".format(replicaId, topic, partition))
                     case false => // ignore partitions whose leader is not alive
                   }
                 case None => // ignore partitions who don't have a leader yet
@@ -166,8 +168,8 @@
                                                                         topic, partition, updatedLeaderIsrAndControllerEpoch,
                                                                         replicaAssignment.size)
                     replicaState.put((topic, partition, replicaId), OfflineReplica)
-                    info("Replica %d for partition [%s, %d] state changed to OfflineReplica".format(replicaId, topic, partition))
-                    info("Removed offline replica %d from ISR for partition [%s, %d]".format(replicaId, topic, partition))
+                    partitionStateChangeLogger.info("Replica %d for partition [%s, %d] state changed to OfflineReplica".format(replicaId, topic, partition))
+                    partitionStateChangeLogger.info("Removed offline replica %d from ISR for partition [%s, %d]".format(replicaId, topic, partition))
                     false
                   case None =>
                     true
@@ -183,8 +185,9 @@
       }
     }
     catch {
-      case t: Throwable => error("Error while changing state of replica %d for partition ".format(replicaId) +
-        "[%s, %d] to %s".format(topic, partition, targetState), t)
+      case t: Throwable =>
+        partitionStateChangeLogger.error("Error while changing state of replica %d for partition "
+          .format(replicaId) + "[%s, %d] to %s".format(topic, partition, targetState), t)
     }
   }
 
Index: core/src/main/scala/kafka/controller/ControllerChannelManager.scala
===================================================================
--- core/src/main/scala/kafka/controller/ControllerChannelManager.scala	(revision 1421054)
+++ core/src/main/scala/kafka/controller/ControllerChannelManager.scala	(working copy)
@@ -24,11 +24,12 @@
 import kafka.server.KafkaConfig
 import collection.mutable
 import kafka.api._
+import kafka.tools.PartitionStateChangeLogger
 
 class ControllerChannelManager private (config: KafkaConfig) extends Logging {
   private val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
   private val brokerLock = new Object
-  this.logIdent = "[Channel manager on controller " + config.brokerId + "], "
+  this.logIdent = "[Channel manager on controller " + config.brokerId + "]: "
 
   def this(allBrokers: Set[Broker], config : KafkaConfig) {
     this(config)
@@ -143,12 +144,14 @@
   }
 }
 
-class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (RequestOrResponse) => Unit) => Unit)
+class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (RequestOrResponse) => Unit) => Unit, controllerId: Int)
   extends  Logging {
   val leaderAndIsrRequestMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), PartitionStateInfo]]
   val stopReplicaRequestMap = new mutable.HashMap[Int, Seq[(String, Int)]]
   val stopAndDeleteReplicaRequestMap = new mutable.HashMap[Int, Seq[(String, Int)]]
 
+  private val partitionStateChangeLogger = new PartitionStateChangeLogger("[Controller " + controllerId + "]: ")
+
   def newBatch() {
     // raise error if the previous batch is not empty
     if(leaderAndIsrRequestMap.size > 0 || stopReplicaRequestMap.size > 0)
@@ -190,8 +193,9 @@
       val partitionStateInfos = m._2.toMap
       val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
       val leaders = liveBrokers.filter(b => leaderIds.contains(b.id))
-      val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerEpoch)
-      debug("The leaderAndIsr request sent to broker %d is %s".format(broker, leaderAndIsrRequest))
+      val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId, controllerEpoch)
+      partitionStateInfos.foreach(p => partitionStateChangeLogger.debug(("Controller %d, epoch %d sending " +
+        "LeaderAndIsr request to broker %d for partition [%s, %d]").format(controllerId, controllerEpoch, broker, p._1._1, p._1._2)))
       sendRequest(broker, leaderAndIsrRequest, null)
     }
     leaderAndIsrRequestMap.clear()
Index: core/src/main/scala/kafka/tools/PartitionStateChangeLogMerger.scala
===================================================================
--- core/src/main/scala/kafka/tools/PartitionStateChangeLogMerger.scala	(revision 0)
+++ core/src/main/scala/kafka/tools/PartitionStateChangeLogMerger.scala	(working copy)
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import joptsimple._
+import scala.util.matching.Regex
+import collection.mutable.ListBuffer
+import collection.mutable
+import java.util.Date
+import java.text.SimpleDateFormat
+import kafka.utils.Logging
+import collection.immutable.TreeMap
+import org.apache.log4j.Logger
+
+class PartitionStateChangeLogger(ident: String) extends Logging {
+  override lazy val logger = Logger.getLogger("partitionStateChangeLogger")
+  logIdent = ident
+}
+
+object PartitionStateChangeLogMerger extends Logging {
+
+  implicit object dateBasedOrdering extends Ordering[Date] {
+    def compare(a: Date, b: Date) = a.compareTo(b)
+  }
+
+  def main(args: Array[String]) {
+
+    val parser = new OptionParser
+    val fileNameOpt = parser.accepts("file", "REQUIRED: File name.")
+                           .withRequiredArg
+                           .describedAs("file")
+                           .ofType(classOf[String])
+
+    val options = parser.parse(args : _*)
+
+    if(!options.has(fileNameOpt)) {
+      System.err.println("Missing required argument \"" + fileNameOpt + "\"")
+      parser.printHelpOn(System.err)
+      System.exit(1)
+    }
+
+    val rgxDate = new Regex("[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}")
+    val rgxTopicPartition = new Regex("\\[[A-Za-z0-9._-]+, [0-9]+\\]")
+    val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS")
+    var partitionsMergedLog = new mutable.HashMap[String, TreeMap[Date, ListBuffer[String]]]
+
+    val files = options.valuesOf(fileNameOpt).iterator
+
+    while (files.hasNext) {
+      val file = files.next
+      for (line <- io.Source.fromFile(file).getLines) {
+        rgxTopicPartition.findFirstIn(line) match {
+          case Some(topicPartition) =>
+            rgxDate.findFirstIn(line) match {
+              case Some(dt) =>
+                val date = dateFormat.parse(dt)
+                partitionsMergedLog.get(topicPartition) match {
+                  case Some(sortedDateLog) =>
+                    sortedDateLog.get(date) match {
+                      case Some(lines) => lines.append(line)
+                      case None => partitionsMergedLog(topicPartition) = sortedDateLog + (date -> ListBuffer[String](line))
+                    }
+                  case None => partitionsMergedLog += topicPartition -> TreeMap(date -> ListBuffer[String](line))
+                }
+              case None => // do nothing
+            }
+          case None => // do nothing
+        }
+      }
+    }
+
+    if (partitionsMergedLog.size > 0) {
+      System.err.println("Merging state change logs...")
+      System.err.println("-----------------------------------------------------------")
+    }
+    for (partitionLog <- partitionsMergedLog) {
+      for (dateLog <- partitionLog._2) {
+        for (line <- dateLog._2) {
+          System.err.println(line)
+        }
+      }
+      System.err.println("-----------------------------------------------------------")
+    }
+  }
+
+}
Index: core/src/main/scala/kafka/server/ReplicaManager.scala
===================================================================
--- core/src/main/scala/kafka/server/ReplicaManager.scala	(revision 1421054)
+++ core/src/main/scala/kafka/server/ReplicaManager.scala	(working copy)
@@ -29,6 +29,7 @@
 import kafka.common._
 import kafka.api.{StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest}
 import kafka.controller.KafkaController
+import kafka.tools.PartitionStateChangeLogger
 
 
 object ReplicaManager {
@@ -46,10 +47,12 @@
   private var leaderPartitions = new mutable.HashSet[Partition]()
   private val leaderPartitionsLock = new Object
   val replicaFetcherManager = new ReplicaFetcherManager(config, this)
-  this.logIdent = "Replica Manager on Broker " + config.brokerId + ": "
   private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
   val highWatermarkCheckpoints = config.logDirs.map(dir => (dir, new HighwaterMarkCheckpoint(dir))).toMap
 
+  this.logIdent = "[Replica Manager on Broker " + config.brokerId + "]: "
+  private val partitionStateChangeLogger = new PartitionStateChangeLogger(this.logIdent)
+
   newGauge(
     "LeaderCount",
     new Gauge[Int] {
@@ -95,7 +98,7 @@
   }
 
   def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Short  = {
-    trace("Handling stop replica for partition [%s, %d]".format(topic, partitionId))
+    partitionStateChangeLogger.trace("Handling stop replica for partition [%s, %d]".format(topic, partitionId))
     val errorCode = ErrorMapping.NoError
     getReplica(topic, partitionId) match {
       case Some(replica) =>
@@ -110,7 +113,7 @@
         info("After removing partition (%s, %d), the rest of allReplicas is: [%s]".format(topic, partitionId, allPartitions))
       case None => //do nothing if replica no longer exists
     }
-    trace("Finish handling stop replica [%s, %d]".format(topic, partitionId))
+    partitionStateChangeLogger.trace("Finished handling stop replica [%s, %d]".format(topic, partitionId))
     errorCode
   }
 
@@ -187,6 +190,7 @@
         " Latest known controller epoch is %d " + controllerEpoch)
       (responseMap, ErrorMapping.StaleControllerEpochCode)
     }else {
+      val controllerId = leaderAndISRRequest.controllerId
       controllerEpoch = leaderAndISRRequest.controllerEpoch
       for((topicAndPartition, partitionStateInfo) <- leaderAndISRRequest.partitionStateInfos) {
         var errorCode = ErrorMapping.NoError
@@ -196,9 +200,9 @@
         val requestedLeaderId = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
         try {
           if(requestedLeaderId == config.brokerId)
-            makeLeader(topic, partitionId, partitionStateInfo)
+            makeLeader(controllerId, topic, partitionId, partitionStateInfo)
           else
-            makeFollower(topic, partitionId, partitionStateInfo, leaderAndISRRequest.leaders)
+            makeFollower(controllerId, topic, partitionId, partitionStateInfo, leaderAndISRRequest.leaders)
         } catch {
           case e =>
             error("Error processing leaderAndISR request %s".format(leaderAndISRRequest), e)
@@ -212,9 +216,10 @@
     }
   }
 
-  private def makeLeader(topic: String, partitionId: Int, partitionStateInfo: PartitionStateInfo) = {
+  private def makeLeader(controllerId: Int, topic: String, partitionId: Int, partitionStateInfo: PartitionStateInfo) = {
     val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
-    info("Becoming Leader for topic [%s] partition [%d]".format(topic, partitionId))
+    partitionStateChangeLogger.info("Received LeaderAndIsr request from controller %d, leader epoch %d, becoming Leader for partition [%s, %d]"
+      .format(controllerId, leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch, topic, partitionId))
     val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
     if (partition.makeLeader(topic, partitionId, leaderIsrAndControllerEpoch)) {
       // also add this partition to the list of partitions for which the leader is the current broker
@@ -222,15 +227,16 @@
         leaderPartitions += partition
       } 
     }
-    info("Completed the leader state transition for topic %s partition %d".format(topic, partitionId))
+    partitionStateChangeLogger.info("Completed the leader state transition for partition [%s, %d]".format(topic, partitionId))
   }
 
-  private def makeFollower(topic: String, partitionId: Int, partitionStateInfo: PartitionStateInfo,
+  private def makeFollower(controllerId: Int, topic: String, partitionId: Int, partitionStateInfo: PartitionStateInfo,
                            liveBrokers: Set[Broker]) {
     val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
     val leaderBrokerId: Int = leaderIsrAndControllerEpoch.leaderAndIsr.leader
-    info("Starting the follower state transition to follow leader %d for topic %s partition %d"
-                 .format(leaderBrokerId, topic, partitionId))
+    partitionStateChangeLogger.info(("Received LeaderAndIsr request from controller %d, leader epoch %d, " +
+      "starting the follower state transition to follow leader %d for partition [%s, %d]")
+      .format(controllerId, leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch, leaderBrokerId, topic, partitionId))
 
     val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
     if (partition.makeFollower(topic, partitionId, leaderIsrAndControllerEpoch, liveBrokers)) {
@@ -239,6 +245,7 @@
         leaderPartitions -= partition
       }
     }
+    partitionStateChangeLogger.info("Completed the follower state transition for partition [%s, %d]".format(topic, partitionId))
   }
 
   private def maybeShrinkIsr(): Unit = {
Index: core/src/main/scala/kafka/common/TopicAndPartition.scala
===================================================================
--- core/src/main/scala/kafka/common/TopicAndPartition.scala	(revision 1421054)
+++ core/src/main/scala/kafka/common/TopicAndPartition.scala	(working copy)
@@ -26,6 +26,6 @@
 
   def asTuple = (topic, partition)
 
-  override def toString = "[%s,%d]".format(topic, partition)
+  override def toString = "[%s, %d]".format(topic, partition)
 }
 
Index: core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
===================================================================
--- core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala	(revision 1421054)
+++ core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala	(working copy)
@@ -90,6 +90,7 @@
     val correlationId = buffer.getInt
     val clientId = readShortString(buffer)
     val ackTimeoutMs = buffer.getInt
+    val controllerId = buffer.getInt
     val controllerEpoch = buffer.getInt
     val partitionStateInfosCount = buffer.getInt
     val partitionStateInfos = new collection.mutable.HashMap[(String, Int), PartitionStateInfo]
@@ -107,7 +108,7 @@
     for (i <- 0 until leadersCount)
       leaders += Broker.readFrom(buffer)
 
-    new LeaderAndIsrRequest(versionId, correlationId, clientId, ackTimeoutMs, partitionStateInfos.toMap, leaders, controllerEpoch)
+    new LeaderAndIsrRequest(versionId, correlationId, clientId, ackTimeoutMs, partitionStateInfos.toMap, leaders, controllerId, controllerEpoch)
   }
 }
 
@@ -117,12 +118,13 @@
                                 ackTimeoutMs: Int,
                                 partitionStateInfos: Map[(String, Int), PartitionStateInfo],
                                 leaders: Set[Broker],
+                                controllerId: Int,
                                 controllerEpoch: Int)
         extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey)) {
 
-  def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], liveBrokers: Set[Broker], controllerEpoch: Int) = {
+  def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], liveBrokers: Set[Broker], controllerId: Int, controllerEpoch: Int) = {
     this(LeaderAndIsrRequest.CurrentVersion, 0, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout,
-      partitionStateInfos, liveBrokers, controllerEpoch)
+      partitionStateInfos, liveBrokers, controllerId, controllerEpoch)
   }
 
   def writeTo(buffer: ByteBuffer) {
@@ -130,6 +132,7 @@
     buffer.putInt(correlationId)
     writeShortString(buffer, clientId)
     buffer.putInt(ackTimeoutMs)
+    buffer.putInt(controllerId)
     buffer.putInt(controllerEpoch)
     buffer.putInt(partitionStateInfos.size)
     for((key, value) <- partitionStateInfos){
@@ -146,6 +149,7 @@
       2 /* version id */ +
       4 /* correlation id */ + 
       (2 + clientId.length) /* client id */ +
       4 /* ack timeout */ +
+      4 /* controller id */ +
       4 /* controller epoch */ +
       4 /* number of partitions */
Index: config/log4j.properties
===================================================================
--- config/log4j.properties	(revision 1421054)
+++ config/log4j.properties	(working copy)
@@ -23,6 +23,12 @@
 #log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
 #log4j.appender.fileAppender.layout.ConversionPattern= %-4r [%t] %-5p %c %x - %m%n
 
+log4j.logger.partitionStateChangeLogger=TRACE, partitionStateChangeAppender
+log4j.additivity.partitionStateChangeLogger=false
+log4j.appender.partitionStateChangeAppender=org.apache.log4j.FileAppender
+log4j.appender.partitionStateChangeAppender.File=kafka_partition_state_change.log
+log4j.appender.partitionStateChangeAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.partitionStateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
 
 # Turn on all our debugging info
 log4j.logger.kafka.perf=DEBUG
