diff --git a/config/log4j.properties b/config/log4j.properties
index 5692da0..716f3cb 100644
--- a/config/log4j.properties
+++ b/config/log4j.properties
@@ -36,6 +36,12 @@ log4j.appender.requestAppender.File=kafka-request.log
 log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
 log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
 
+log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.controllerAppender.File=controller.log
+log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
 # Turn on all our debugging info
 #log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender
 #log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender
@@ -53,5 +59,10 @@ log4j.additivity.kafka.network.RequestChannel$=false
 log4j.logger.kafka.request.logger=TRACE, requestAppender
 log4j.additivity.kafka.request.logger=false
 
-log4j.logger.kafka.controller=TRACE, stateChangeAppender
+log4j.logger.kafka.controller=TRACE, controllerAppender
 log4j.additivity.kafka.controller=false
+
+log4j.logger.partitionStateChangeLogger=TRACE, stateChangeAppender
+log4j.additivity.partitionStateChangeLogger=false
+
+
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index d146b14..2a4c287 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -93,6 +93,7 @@ object LeaderAndIsrRequest {
     val correlationId = buffer.getInt
     val clientId = readShortString(buffer)
     val ackTimeoutMs = buffer.getInt
+    val controllerId = buffer.getInt
     val controllerEpoch = buffer.getInt
     val partitionStateInfosCount = buffer.getInt
     val partitionStateInfos = new collection.mutable.HashMap[(String, Int), PartitionStateInfo]
@@ -110,7 +111,7 @@ object LeaderAndIsrRequest {
     for (i <- 0 until leadersCount)
       leaders += Broker.readFrom(buffer)
 
-    new LeaderAndIsrRequest(versionId, correlationId, clientId, ackTimeoutMs, partitionStateInfos.toMap, leaders, controllerEpoch)
+    new LeaderAndIsrRequest(versionId, correlationId, clientId, ackTimeoutMs, controllerId, controllerEpoch, partitionStateInfos.toMap, leaders)
   }
 }
 
@@ -118,15 +119,16 @@ case class LeaderAndIsrRequest (versionId: Short,
                                 correlationId: Int,
                                 clientId: String,
                                 ackTimeoutMs: Int,
+                                controllerId: Int,
+                                controllerEpoch: Int,
                                 partitionStateInfos: Map[(String, Int), PartitionStateInfo],
-                                leaders: Set[Broker],
-                                controllerEpoch: Int)
+                                leaders: Set[Broker])
         extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey)) {
 
-  def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], liveBrokers: Set[Broker],
+  def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], liveBrokers: Set[Broker], controllerId: Int,
            controllerEpoch: Int, correlationId: Int) = {
     this(LeaderAndIsrRequest.CurrentVersion, correlationId, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout,
-      partitionStateInfos, liveBrokers, controllerEpoch)
+      controllerId, controllerEpoch, partitionStateInfos, liveBrokers)
   }
 
   def writeTo(buffer: ByteBuffer) {
@@ -134,6 +136,7 @@ case class LeaderAndIsrRequest (versionId: Short,
     buffer.putInt(correlationId)
     writeShortString(buffer, clientId)
     buffer.putInt(ackTimeoutMs)
+    buffer.putInt(controllerId)
     buffer.putInt(controllerEpoch)
     buffer.putInt(partitionStateInfos.size)
     for((key, value) <- partitionStateInfos){
@@ -151,6 +154,7 @@ case class LeaderAndIsrRequest (versionId: Short,
       4 /* correlation id */ + 
       (2 + clientId.length) /* client id */ +
       4 /* ack timeout */ +
+      4 /* controller id */ +
       4 /* controller epoch */ +
       4 /* number of partitions */
     for((key, value) <- partitionStateInfos)
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 469ac79..ca33dc1 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -52,6 +52,7 @@ class Partition(val topic: String,
    * each partition. */
   private var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
   this.logIdent = "Partition [%s, %d] on broker %d: ".format(topic, partitionId, localBrokerId)
+  private val partitionStateChangeLogger = new PartitionStateChangeLogger(this.logIdent)
 
   private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
 
@@ -128,11 +129,10 @@ class Partition(val topic: String,
     leaderIsrUpdateLock synchronized {
       val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
       if (leaderEpoch >= leaderAndIsr.leaderEpoch){
-        info("Current leader epoch [%d] is larger or equal to the requested leader epoch [%d], discard the become leader request"
+        partitionStateChangeLogger.info("Current leader epoch [%d] is larger or equal to the requested leader epoch [%d], discarding the become leader request"
           .format(leaderEpoch, leaderAndIsr.leaderEpoch))
         return false
       }
-      trace("Started to become leader at the request %s".format(leaderAndIsr.toString()))
       // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
       // to maintain the decision maker controller's epoch in the zookeeper path
       controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
@@ -164,17 +164,14 @@ class Partition(val topic: String,
     leaderIsrUpdateLock synchronized {
       val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
       if (leaderEpoch >= leaderAndIsr.leaderEpoch){
-        info("Current leader epoch [%d] is larger or equal to the requested leader epoch [%d], discard the become follower request"
+        partitionStateChangeLogger.info("Current leader epoch [%d] is larger or equal to the requested leader epoch [%d], discarding the become follower request"
           .format(leaderEpoch, leaderAndIsr.leaderEpoch))
         return false
       }
-      trace("Started to become follower at the request %s".format(leaderAndIsr.toString()))
       // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
       // to maintain the decision maker controller's epoch in the zookeeper path
       controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
       val newLeaderBrokerId: Int = leaderAndIsr.leader
-      info("Starting the follower state transition to follow leader %d for topic %s partition %d"
-        .format(newLeaderBrokerId, topic, partitionId))
       liveBrokers.find(_.id == newLeaderBrokerId) match {
         case Some(leaderBroker) =>
           // stop fetcher thread to previous leader
@@ -189,8 +186,8 @@ class Partition(val topic: String,
           // start fetcher thread to current leader
           replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker)
         case None => // leader went down
-          warn("Aborting become follower state change on %d since leader %d for ".format(localBrokerId, newLeaderBrokerId) +
-          " topic %s partition %d became unavailble during the state change operation".format(topic, partitionId))
+          partitionStateChangeLogger.warn("Aborted the become follower state change since leader %d for".format(newLeaderBrokerId) +
+          " partition [%s, %d] became unavailable during the state change operation".format(topic, partitionId))
       }
       true
     }
diff --git a/core/src/main/scala/kafka/common/TopicAndPartition.scala b/core/src/main/scala/kafka/common/TopicAndPartition.scala
index 63596b7..4f00234 100644
--- a/core/src/main/scala/kafka/common/TopicAndPartition.scala
+++ b/core/src/main/scala/kafka/common/TopicAndPartition.scala
@@ -26,6 +26,6 @@ case class TopicAndPartition(topic: String, partition: Int) {
 
   def asTuple = (topic, partition)
 
-  override def toString = "[%s,%d]".format(topic, partition)
+  override def toString = "[%s, %d]".format(topic, partition)
 }
 
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index e2ca1d6..6b6ebaa 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -17,7 +17,7 @@
 package kafka.controller
 
 import kafka.network.{Receive, BlockingChannel}
-import kafka.utils.{Logging, ShutdownableThread}
+import kafka.utils.{PartitionStateChangeLogger, Logging, ShutdownableThread}
 import collection.mutable.HashMap
 import kafka.cluster.Broker
 import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}
@@ -28,7 +28,7 @@ import kafka.api._
 class ControllerChannelManager private (config: KafkaConfig) extends Logging {
   private val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
   private val brokerLock = new Object
-  this.logIdent = "[Channel manager on controller " + config.brokerId + "], "
+  this.logIdent = "[Channel manager on controller " + config.brokerId + "]: "
 
   def this(allBrokers: Set[Broker], config : KafkaConfig) {
     this(config)
@@ -143,11 +143,12 @@ class RequestSendThread(val controllerId: Int,
   }
 }
 
-class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (RequestOrResponse) => Unit) => Unit)
+class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (RequestOrResponse) => Unit) => Unit, controllerId: Int)
   extends  Logging {
   val leaderAndIsrRequestMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), PartitionStateInfo]]
   val stopReplicaRequestMap = new mutable.HashMap[Int, Seq[(String, Int)]]
   val stopAndDeleteReplicaRequestMap = new mutable.HashMap[Int, Seq[(String, Int)]]
+  private val partitionStateChangeLogger = new PartitionStateChangeLogger(this.logIdent)
 
   def newBatch() {
     // raise error if the previous batch is not empty
@@ -162,10 +163,8 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques
   def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int,
                                        leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, replicationFactor: Int) {
     brokerIds.foreach { brokerId =>
-      leaderAndIsrRequestMap.getOrElseUpdate(brokerId,
-                                             new mutable.HashMap[(String, Int), PartitionStateInfo])
-      leaderAndIsrRequestMap(brokerId).put((topic, partition),
-                                           PartitionStateInfo(leaderIsrAndControllerEpoch, replicationFactor))
+      leaderAndIsrRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[(String, Int), PartitionStateInfo])
+      leaderAndIsrRequestMap(brokerId).put((topic, partition), PartitionStateInfo(leaderIsrAndControllerEpoch, replicationFactor))
     }
   }
 
@@ -190,8 +189,9 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques
       val partitionStateInfos = m._2.toMap
       val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
       val leaders = liveBrokers.filter(b => leaderIds.contains(b.id))
-      val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerEpoch, correlationId)
-      debug("The leaderAndIsr request sent to broker %d is %s".format(broker, leaderAndIsrRequest))
+      val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId, controllerEpoch, correlationId)
+      partitionStateInfos.foreach(p => partitionStateChangeLogger.debug(("Controller %d, epoch %d sending " +
+        "LeaderAndIsr request to broker %d for partition [%s, %d]").format(controllerId, controllerEpoch, broker, p._1._1, p._1._2)))
       sendRequest(broker, leaderAndIsrRequest, null)
     }
     leaderAndIsrRequestMap.clear()
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 4d253da..197c840 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -89,7 +89,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
   private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
   private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
-  private val brokerRequestBatch = new ControllerBrokerRequestBatch(sendRequest)
+  private val brokerRequestBatch = new ControllerBrokerRequestBatch(sendRequest, config.brokerId)
   registerControllerChangedListener()
 
   newGauge(
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 4078604..0fb5ae7 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -21,7 +21,7 @@ import collection.JavaConversions._
 import java.util.concurrent.atomic.AtomicBoolean
 import kafka.api.LeaderAndIsr
 import kafka.common.{TopicAndPartition, StateChangeFailedException, PartitionOfflineException}
-import kafka.utils.{Logging, ZkUtils}
+import kafka.utils.{PartitionStateChangeLogger, Logging, ZkUtils}
 import org.I0Itec.zkclient.IZkChildListener
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 
@@ -38,14 +38,16 @@ import org.I0Itec.zkclient.exception.ZkNodeExistsException
  *                          moves to the OfflinePartition state. Valid previous states are NewPartition/OnlinePartition
  */
 class PartitionStateMachine(controller: KafkaController) extends Logging {
-  this.logIdent = "[Partition state machine on Controller " + controller.config.brokerId + "]: "
   private val controllerContext = controller.controllerContext
+  private val controllerId = controller.config.brokerId
   private val zkClient = controllerContext.zkClient
   var partitionState: mutable.Map[TopicAndPartition, PartitionState] = mutable.Map.empty
-  val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest)
+  val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest, controllerId)
   val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext)
   private val isShuttingDown = new AtomicBoolean(false)
 
+  this.logIdent = "[Partition state machine on Controller " + controllerId + "]: "
+  private val partitionStateChangeLogger = new PartitionStateChangeLogger(this.logIdent)
   /**
    * Invoked on successful controller election. First registers a topic change listener since that triggers all
    * state transitions for partitions. Initializes the state of partitions by reading from zookeeper. Then triggers
@@ -126,12 +128,12 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       targetState match {
         case NewPartition =>
           // pre: partition did not exist before this
-          // post: partition has been assigned replicas
           assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition)
           assignReplicasToPartitions(topic, partition)
           partitionState.put(topicAndPartition, NewPartition)
-          info("Partition %s state changed from NotExists to New with assigned replicas ".format(topicAndPartition) +
+          partitionStateChangeLogger.info("Partition %s state changed from NotExists to New with assigned replicas ".format(topicAndPartition) +
             "%s".format(controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",")))
+          // post: partition has been assigned replicas
         case OnlinePartition =>
           assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition)
           partitionState(topicAndPartition) match {
@@ -144,26 +146,26 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
               electLeaderForPartition(topic, partition, leaderSelector)
             case _ => // should never come here since illegal previous states are checked above
           }
-          info("Partition %s state changed from %s to OnlinePartition with leader %d".format(topicAndPartition,
-            partitionState(topicAndPartition), controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader))
           partitionState.put(topicAndPartition, OnlinePartition)
+          partitionStateChangeLogger.info("Partition %s state changed from %s to OnlinePartition with leader %d".format(topicAndPartition,
+            partitionState(topicAndPartition), controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader))
            // post: partition has a leader
         case OfflinePartition =>
-          // pre: partition should be in Online state
+          // pre: partition should be in New or Online state
           assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition), OfflinePartition)
           // should be called when the leader for a partition is no longer alive
-          info("Partition %s state changed from Online to Offline".format(topicAndPartition))
+          partitionStateChangeLogger.info("Partition %s state changed from Online to Offline".format(topicAndPartition))
           partitionState.put(topicAndPartition, OfflinePartition)
           // post: partition has no alive leader
         case NonExistentPartition =>
-          // pre: partition could be in either of the above states
+          // pre: partition should be in Offline state
           assertValidPreviousStates(topicAndPartition, List(OfflinePartition), NonExistentPartition)
-          info("Partition %s state changed from Offline to NotExists".format(topicAndPartition))
+          partitionStateChangeLogger.info("Partition %s state changed from Offline to NotExists".format(topicAndPartition))
           partitionState.put(topicAndPartition, NonExistentPartition)
           // post: partition state is deleted from all brokers and zookeeper
       }
     } catch {
-      case t: Throwable => error("State change for partition %s ".format(topicAndPartition) +
+      case t: Throwable => partitionStateChangeLogger.error("State change for partition %s ".format(topicAndPartition) +
         "from %s to %s failed".format(currState, targetState), t)
     }
   }
@@ -268,7 +270,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   def electLeaderForPartition(topic: String, partition: Int, leaderSelector: PartitionLeaderSelector) {
     val topicAndPartition = TopicAndPartition(topic, partition)
     // handle leader election for the partitions whose leader is no longer alive
-    info("Electing leader for partition %s".format(topicAndPartition))
+    partitionStateChangeLogger.info("Started leader election for partition %s".format(topicAndPartition))
     try {
       var zookeeperPathUpdateSucceeded: Boolean = false
       var newLeaderAndIsr: LeaderAndIsr = null
@@ -294,7 +296,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       val newLeaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(newLeaderAndIsr, controller.epoch)
       // update the leader cache
       controllerContext.allLeaders.put(TopicAndPartition(topic, partition), newLeaderIsrAndControllerEpoch)
-      info("Elected leader %d for Offline partition %s".format(newLeaderAndIsr.leader, topicAndPartition))
+      partitionStateChangeLogger.info("Elected leader %d for Offline partition %s".format(newLeaderAndIsr.leader, topicAndPartition))
       // store new leader and isr info in cache
       brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition,
         newLeaderIsrAndControllerEpoch, controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition)).size)
@@ -362,7 +364,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   }
 
   class PartitionChangeListener(topic: String) extends IZkChildListener with Logging {
-    this.logIdent = "[Controller " + controller.config.brokerId + "], "
+    this.logIdent = "[Controller " + controller.config.brokerId + "]: "
 
     @throws(classOf[Exception])
     def handleChildChange(parentPath : String, children : java.util.List[String]) {
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 20d9c4f..e17e19e 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -20,7 +20,7 @@ import collection._
 import collection.JavaConversions._
 import java.util.concurrent.atomic.AtomicBoolean
 import kafka.common.{TopicAndPartition, StateChangeFailedException}
-import kafka.utils.{ZkUtils, Logging}
+import kafka.utils.{PartitionStateChangeLogger, ZkUtils, Logging}
 import org.I0Itec.zkclient.IZkChildListener
 
 /**
@@ -37,12 +37,13 @@ import org.I0Itec.zkclient.IZkChildListener
  * 4. NonExistentReplica: If a replica is deleted, it is moved to this state. Valid previous state is OfflineReplica
  */
 class ReplicaStateMachine(controller: KafkaController) extends Logging {
-  this.logIdent = "[Replica state machine on Controller " + controller.config.brokerId + "]: "
   private val controllerContext = controller.controllerContext
   private val zkClient = controllerContext.zkClient
   var replicaState: mutable.Map[(String, Int, Int), ReplicaState] = mutable.Map.empty
-  val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest)
+  val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest, controller.config.brokerId)
   private val isShuttingDown = new AtomicBoolean(false)
+  this.logIdent = "[Replica state machine on Controller " + controller.config.brokerId + "]: "
+  private val partitionStateChangeLogger = new PartitionStateChangeLogger(this.logIdent)
 
   /**
    * Invoked on successful controller election. First registers a broker change listener since that triggers all
@@ -117,17 +118,16 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
             case None => // new leader request will be sent to this replica when one gets elected
           }
           replicaState.put((topic, partition, replicaId), NewReplica)
-          info("Replica %d for partition %s state changed to NewReplica".format(replicaId, topicAndPartition))
+          partitionStateChangeLogger.info("Replica %d for partition %s state changed to NewReplica".format(replicaId, topicAndPartition))
         case NonExistentReplica =>
           assertValidPreviousStates(topic, partition, replicaId, List(OfflineReplica), targetState)
           // send stop replica command
           brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true)
           // remove this replica from the assigned replicas list for its partition
           val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
-          controllerContext.partitionReplicaAssignment.put(topicAndPartition,
-            currentAssignedReplicas.filterNot(_ == replicaId))
-          info("Replica %d for partition %s state changed to NonExistentReplica".format(replicaId, topicAndPartition))
+          controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId))
           replicaState.remove((topic, partition, replicaId))
+          partitionStateChangeLogger.info("Replica %d for partition %s state changed to NonExistentReplica".format(replicaId, topicAndPartition))
         case OnlineReplica =>
           assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica, OfflineReplica), targetState)
           replicaState((topic, partition, replicaId)) match {
@@ -135,7 +135,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
               // add this replica to the assigned replicas list for its partition
               val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
               controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
-              info("Replica %d for partition %s state changed to OnlineReplica".format(replicaId, topicAndPartition))
+              partitionStateChangeLogger.info("Replica %d for partition %s state changed to OnlineReplica".format(replicaId, topicAndPartition))
             case _ =>
               // check if the leader for this partition is alive or even exists
                 controllerContext.allLeaders.get(topicAndPartition) match {
@@ -146,7 +146,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
                                                                           topic, partition, leaderIsrAndControllerEpoch,
                                                                           replicaAssignment.size)
                       replicaState.put((topic, partition, replicaId), OnlineReplica)
-                      info("Replica %d for partition %s state changed to OnlineReplica".format(replicaId, topicAndPartition))
+                      partitionStateChangeLogger.info("Replica %d for partition %s state changed to OnlineReplica".format(replicaId, topicAndPartition))
                     case false => // ignore partitions whose leader is not alive
                   }
                 case None => // ignore partitions who don't have a leader yet
@@ -167,8 +167,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
                         topic, partition, updatedLeaderIsrAndControllerEpoch,
                         replicaAssignment.size)
                       replicaState.put((topic, partition, replicaId), OfflineReplica)
-                      info("Replica %d for partition %s state changed to OfflineReplica".format(replicaId, topicAndPartition))
-                      info("Removed offline replica %d from ISR for partition %s".format(replicaId, topicAndPartition))
+                      partitionStateChangeLogger.info("Replica %d for partition %s state changed to OfflineReplica".format(replicaId, topicAndPartition))
                       false
                     case None =>
                       true
@@ -184,7 +183,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
       }
     }
     catch {
-      case t: Throwable => error("Error while changing state of replica %d for partition ".format(replicaId) +
+      case t: Throwable => partitionStateChangeLogger.error("Error while changing state of replica %d for partition ".format(replicaId) +
         "[%s, %d] to %s".format(topic, partition, targetState), t)
     }
   }
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index f7fe0de..f430112 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -46,10 +46,11 @@ class ReplicaManager(val config: KafkaConfig,
   private var leaderPartitions = new mutable.HashSet[Partition]()
   private val leaderPartitionsLock = new Object
   val replicaFetcherManager = new ReplicaFetcherManager(config, this)
-  this.logIdent = "Replica Manager on Broker " + config.brokerId + ": "
   private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
   val highWatermarkCheckpoints = config.logDirs.map(dir => (dir, new HighwaterMarkCheckpoint(dir))).toMap
   private var hwThreadInitialized = false
+  this.logIdent = "[Replica Manager on Broker " + config.brokerId + "]: "
+  private val partitionStateChangeLogger = new PartitionStateChangeLogger(this.logIdent)
 
   newGauge(
     "LeaderCount",
@@ -190,10 +191,11 @@ class ReplicaManager(val config: KafkaConfig,
     info("Handling leader and isr request %s".format(leaderAndISRRequest))
     val responseMap = new collection.mutable.HashMap[(String, Int), Short]
     if(leaderAndISRRequest.controllerEpoch < controllerEpoch) {
-      error("Received leader and isr request from an old controller epoch %d.".format(leaderAndISRRequest.controllerEpoch) +
+      error("Received LeaderAndIsr request from an old controller epoch %d.".format(leaderAndISRRequest.controllerEpoch) +
         " Latest known controller epoch is %d " + controllerEpoch)
       (responseMap, ErrorMapping.StaleControllerEpochCode)
     }else {
+      val controllerId = leaderAndISRRequest.controllerId
       controllerEpoch = leaderAndISRRequest.controllerEpoch
       for((topicAndPartition, partitionStateInfo) <- leaderAndISRRequest.partitionStateInfos) {
         var errorCode = ErrorMapping.NoError
@@ -203,9 +205,9 @@ class ReplicaManager(val config: KafkaConfig,
         val requestedLeaderId = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
         try {
           if(requestedLeaderId == config.brokerId)
-            makeLeader(topic, partitionId, partitionStateInfo)
+            makeLeader(controllerId, topic, partitionId, partitionStateInfo)
           else
-            makeFollower(topic, partitionId, partitionStateInfo, leaderAndISRRequest.leaders)
+            makeFollower(controllerId, topic, partitionId, partitionStateInfo, leaderAndISRRequest.leaders)
         } catch {
           case e =>
             error("Error processing leaderAndISR request %s".format(leaderAndISRRequest), e)
@@ -225,9 +227,10 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
-  private def makeLeader(topic: String, partitionId: Int, partitionStateInfo: PartitionStateInfo) = {
+  private def makeLeader(controllerId: Int, topic: String, partitionId: Int, partitionStateInfo: PartitionStateInfo) = {
     val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
-    info("Becoming Leader for topic [%s] partition [%d]".format(topic, partitionId))
+    partitionStateChangeLogger.info("Received LeaderAndIsr request from controller %d, epoch %d, starting leader state transition for partition [%s, %d]"
+      .format(controllerId, leaderIsrAndControllerEpoch.controllerEpoch, topic, partitionId))
     val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
     if (partition.makeLeader(topic, partitionId, leaderIsrAndControllerEpoch)) {
       // also add this partition to the list of partitions for which the leader is the current broker
@@ -235,15 +238,15 @@ class ReplicaManager(val config: KafkaConfig,
         leaderPartitions += partition
       } 
     }
-    info("Completed the leader state transition for topic %s partition %d".format(topic, partitionId))
+    partitionStateChangeLogger.info("Completed leader state transition for partition [%s, %d]".format(topic, partitionId))
   }
 
-  private def makeFollower(topic: String, partitionId: Int, partitionStateInfo: PartitionStateInfo,
+  private def makeFollower(controllerId: Int, topic: String, partitionId: Int, partitionStateInfo: PartitionStateInfo,
                            liveBrokers: Set[Broker]) {
     val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
     val leaderBrokerId: Int = leaderIsrAndControllerEpoch.leaderAndIsr.leader
-    info("Starting the follower state transition to follow leader %d for topic %s partition %d"
-                 .format(leaderBrokerId, topic, partitionId))
+    partitionStateChangeLogger.info(("Received LeaderAndIsr request from controller %d, leader epoch %d, starting follower state transition to follow leader %d for partition [%s, %d]")
+      .format(controllerId, leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch, leaderBrokerId, topic, partitionId))
 
     val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
     if (partition.makeFollower(topic, partitionId, leaderIsrAndControllerEpoch, liveBrokers)) {
@@ -252,6 +255,7 @@ class ReplicaManager(val config: KafkaConfig,
         leaderPartitions -= partition
       }
     }
+    partitionStateChangeLogger.info("Completed follower state transition for partition [%s, %d]".format(topic, partitionId))
   }
 
   private def maybeShrinkIsr(): Unit = {
diff --git a/core/src/main/scala/kafka/tools/PartitionStateChangeLogMerger.scala b/core/src/main/scala/kafka/tools/PartitionStateChangeLogMerger.scala
new file mode 100644
index 0000000..577935e
--- /dev/null
+++ b/core/src/main/scala/kafka/tools/PartitionStateChangeLogMerger.scala
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import joptsimple._
+import scala.util.matching.Regex
+import collection.mutable.ListBuffer
+import collection.mutable
+import java.util.Date
+import java.text.SimpleDateFormat
+import kafka.utils.Logging
+
+object PartitionStateChangeLogMerger extends Logging {
+
+  implicit object dateBasedOrdering extends Ordering[Date] {
+    def compare(a: Date, b: Date) = a.compareTo(b)
+  }
+
+  def main(args: Array[String]) {
+
+    val parser = new OptionParser
+    val fileNameOpt = parser.accepts("stateChangeLog", "REQUIRED: State Change Log from a server")
+      .withRequiredArg
+      .describedAs("stateChangeLog")
+      .ofType(classOf[String])
+
+    val options = parser.parse(args : _*)
+
+    if(!options.has(fileNameOpt)) {
+      System.err.println("Missing required argument \"" + fileNameOpt + "\"")
+      parser.printHelpOn(System.err)
+      System.exit(1)
+    }
+
+    val rgxDate = new Regex("[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}")
+    val rgxTopicPartition = new Regex("\\[[A-Za-z0-9._-]+, [0-9]+\\]")
+    val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS")
+    var partitionsMergedLog = new mutable.HashMap[String, mutable.HashMap[Date, ListBuffer[String]]]
+
+    val files = options.valuesOf(fileNameOpt).iterator
+
+    while (files.hasNext) {
+      val file = files.next
+      for (line <- io.Source.fromFile(file).getLines) {
+        rgxTopicPartition.findFirstIn(line) match {
+          case Some(topicPartition) =>
+            rgxDate.findFirstIn(line) match {
+              case Some(dt) =>
+                val date = dateFormat.parse(dt)
+                partitionsMergedLog.get(topicPartition) match {
+                  case Some(dateLog) =>
+                    dateLog.get(date) match {
+                      case Some(lines) => lines.append(line)
+                      case None => dateLog += date -> ListBuffer[String](line)
+                    }
+                  case None => partitionsMergedLog += topicPartition -> mutable.HashMap(date -> ListBuffer[String](line))
+                }
+              case None => // do nothing
+            }
+          case None => // do nothing
+        }
+      }
+    }
+
+    if (partitionsMergedLog.size > 0) {
+      System.out.println("Merging state change logs...")
+      System.out.println("-----------------------------------------------------------")
+    }
+    for (partitionLog <- partitionsMergedLog) {
+      for (dateLog <- partitionLog._2.toSeq.sortBy(_._1)) {
+        for (line <- dateLog._2) {
+          System.out.println(line)
+        }
+      }
+      System.out.println("-----------------------------------------------------------")
+    }
+  }
+
+}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/utils/PartitionStateChangeLogger.scala b/core/src/main/scala/kafka/utils/PartitionStateChangeLogger.scala
new file mode 100644
index 0000000..7acc754
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/PartitionStateChangeLogger.scala
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import org.apache.log4j.Logger
+
+class PartitionStateChangeLogger(ident: String) extends Logging {
+  override lazy val logger = Logger.getLogger("partitionStateChangeLogger")
+  logIdent = ident
+}
diff --git a/core/src/main/scala/kafka/utils/Topic.scala b/core/src/main/scala/kafka/utils/Topic.scala
deleted file mode 100644
index e69de29..0000000
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 26f31ec..f3efda3 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -89,7 +89,7 @@ object SerializationTestUtils{
     val leaderAndIsr2 = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader2, 1, isr2, 2), 1)
     val map = Map(((topic1, 0), PartitionStateInfo(leaderAndIsr1, 3)),
                   ((topic2, 0), PartitionStateInfo(leaderAndIsr2, 3)))
-    new LeaderAndIsrRequest(map.toMap, collection.immutable.Set[Broker](), 1, 0)
+    new LeaderAndIsrRequest(map.toMap, collection.immutable.Set[Broker](), 0, 1, 0)
   }
 
   def createTestLeaderAndIsrResponse() : LeaderAndIsrResponse = {
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 129bc56..be82592 100644
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -122,7 +122,8 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
 
 
     // start another controller
-    val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(2, TestUtils.choosePort()))
+    val controllerId = 2
+    val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(controllerId, TestUtils.choosePort()))
     val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.config.port))
     val controllerChannelManager = new ControllerChannelManager(brokers.toSet, controllerConfig)
     controllerChannelManager.startup()
@@ -131,7 +132,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
     leaderAndIsr.put((topic, partitionId),
       new LeaderIsrAndControllerEpoch(new LeaderAndIsr(brokerId2, List(brokerId1, brokerId2)), 2))
     val partitionStateInfo = leaderAndIsr.mapValues(l => new PartitionStateInfo(l, 1)).toMap
-    val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfo, brokers.toSet, staleControllerEpoch, 0)
+    val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfo, brokers.toSet, controllerId, staleControllerEpoch, 0)
 
     controllerChannelManager.sendRequest(brokerId2, leaderAndIsrRequest, staleControllerEpochCallback)
     TestUtils.waitUntilTrue(() => staleControllerEpochDetected == true, 1000)
