diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index decba0e..e055d67 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -80,4 +80,4 @@ else
   JAVA="$JAVA_HOME/bin/java"
 fi
 
-$JAVA $KAFKA_OPTS $KAFKA_JMX_OPTS -cp $CLASSPATH $@
+$JAVA $KAFKA_OPTS $KAFKA_JMX_OPTS -cp $CLASSPATH "$@"
diff --git a/config/log4j.properties b/config/log4j.properties
index 5692da0..719d792 100644
--- a/config/log4j.properties
+++ b/config/log4j.properties
@@ -36,6 +36,12 @@ log4j.appender.requestAppender.File=kafka-request.log
 log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
 log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
 
+log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.controllerAppender.File=controller.log
+log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
 # Turn on all our debugging info
 #log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender
 #log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender
@@ -53,5 +59,10 @@ log4j.additivity.kafka.network.RequestChannel$=false
 log4j.logger.kafka.request.logger=TRACE, requestAppender
 log4j.additivity.kafka.request.logger=false
 
-log4j.logger.kafka.controller=TRACE, stateChangeAppender
+log4j.logger.kafka.controller=TRACE, controllerAppender
 log4j.additivity.kafka.controller=false
+
+log4j.logger.stateChangeLogger=TRACE, stateChangeAppender
+log4j.additivity.stateChangeLogger=false
+
+
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index dc4ed8e..a807c1f 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -59,13 +59,13 @@ object FetchRequest {
 }
 
 case class FetchRequest private[kafka] (versionId: Short = FetchRequest.CurrentVersion,
-                                        correlationId: Int = FetchRequest.DefaultCorrelationId,
+                                        override val correlationId: Int = FetchRequest.DefaultCorrelationId,
                                         clientId: String = ConsumerConfig.DefaultClientId,
                                         replicaId: Int = Request.OrdinaryConsumerId,
                                         maxWait: Int = FetchRequest.DefaultMaxWait,
                                         minBytes: Int = FetchRequest.DefaultMinBytes,
                                         requestInfo: Map[TopicAndPartition, PartitionFetchInfo])
-        extends RequestOrResponse(Some(RequestKeys.FetchKey)) {
+        extends RequestOrResponse(Some(RequestKeys.FetchKey), correlationId) {
 
   /**
    * Partitions the request info into a map of maps (one for each topic).
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index d146b14..b40522d 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -93,6 +93,7 @@ object LeaderAndIsrRequest {
     val correlationId = buffer.getInt
     val clientId = readShortString(buffer)
     val ackTimeoutMs = buffer.getInt
+    val controllerId = buffer.getInt
     val controllerEpoch = buffer.getInt
     val partitionStateInfosCount = buffer.getInt
     val partitionStateInfos = new collection.mutable.HashMap[(String, Int), PartitionStateInfo]
@@ -110,23 +111,24 @@ object LeaderAndIsrRequest {
     for (i <- 0 until leadersCount)
       leaders += Broker.readFrom(buffer)
 
-    new LeaderAndIsrRequest(versionId, correlationId, clientId, ackTimeoutMs, partitionStateInfos.toMap, leaders, controllerEpoch)
+    new LeaderAndIsrRequest(versionId, correlationId, clientId, ackTimeoutMs, controllerId, controllerEpoch, partitionStateInfos.toMap, leaders)
   }
 }
 
 case class LeaderAndIsrRequest (versionId: Short,
-                                correlationId: Int,
+                                override val correlationId: Int,
                                 clientId: String,
                                 ackTimeoutMs: Int,
+                                controllerId: Int,
+                                controllerEpoch: Int,
                                 partitionStateInfos: Map[(String, Int), PartitionStateInfo],
-                                leaders: Set[Broker],
-                                controllerEpoch: Int)
-        extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey)) {
+                                leaders: Set[Broker])
+    extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey), correlationId) {
 
-  def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], liveBrokers: Set[Broker],
+  def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], liveBrokers: Set[Broker], controllerId: Int,
            controllerEpoch: Int, correlationId: Int) = {
     this(LeaderAndIsrRequest.CurrentVersion, correlationId, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout,
-      partitionStateInfos, liveBrokers, controllerEpoch)
+      controllerId, controllerEpoch, partitionStateInfos, liveBrokers)
   }
 
   def writeTo(buffer: ByteBuffer) {
@@ -134,6 +136,7 @@ case class LeaderAndIsrRequest (versionId: Short,
     buffer.putInt(correlationId)
     writeShortString(buffer, clientId)
     buffer.putInt(ackTimeoutMs)
+    buffer.putInt(controllerId)
     buffer.putInt(controllerEpoch)
     buffer.putInt(partitionStateInfos.size)
     for((key, value) <- partitionStateInfos){
@@ -151,6 +154,7 @@ case class LeaderAndIsrRequest (versionId: Short,
       4 /* correlation id */ + 
       (2 + clientId.length) /* client id */ +
       4 /* ack timeout */ +
+      4 /* controller id */ +
       4 /* controller epoch */ +
       4 /* number of partitions */
     for((key, value) <- partitionStateInfos)
@@ -165,10 +169,11 @@ case class LeaderAndIsrRequest (versionId: Short,
     val leaderAndIsrRequest = new StringBuilder
     leaderAndIsrRequest.append("Name: " + this.getClass.getSimpleName)
     leaderAndIsrRequest.append("; Version: " + versionId)
+    leaderAndIsrRequest.append("; Controller: " + controllerId)
+    leaderAndIsrRequest.append("; ControllerEpoch: " + controllerEpoch)
     leaderAndIsrRequest.append("; CorrelationId: " + correlationId)
     leaderAndIsrRequest.append("; ClientId: " + clientId)
     leaderAndIsrRequest.append("; AckTimeoutMs: " + ackTimeoutMs + " ms")
-    leaderAndIsrRequest.append("; ControllerEpoch: " + controllerEpoch)
     leaderAndIsrRequest.append("; PartitionStateInfo: " + partitionStateInfos.mkString(","))
     leaderAndIsrRequest.append("; Leaders: " + leaders.mkString(","))
     leaderAndIsrRequest.toString()
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala b/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
index dbd85d0..b4cfae8 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
@@ -41,10 +41,10 @@ object LeaderAndIsrResponse {
 }
 
 
-case class LeaderAndIsrResponse(correlationId: Int,
+case class LeaderAndIsrResponse(override val correlationId: Int,
                                 responseMap: Map[(String, Int), Short],
                                 errorCode: Short = ErrorMapping.NoError)
-        extends RequestOrResponse {
+    extends RequestOrResponse(correlationId = correlationId) {
   def sizeInBytes(): Int ={
     var size =
       4 /* correlation id */ + 
diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala
index 6360a98..32ebfd4 100644
--- a/core/src/main/scala/kafka/api/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetRequest.scala
@@ -57,10 +57,10 @@ case class PartitionOffsetRequestInfo(time: Long, maxNumOffsets: Int)
 
 case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequestInfo],
                          versionId: Short = OffsetRequest.CurrentVersion,
-                         correlationId: Int = 0,
+                         override val correlationId: Int = 0,
                          clientId: String = OffsetRequest.DefaultClientId,
                          replicaId: Int = Request.OrdinaryConsumerId)
-        extends RequestOrResponse(Some(RequestKeys.OffsetsKey)) {
+    extends RequestOrResponse(Some(RequestKeys.OffsetsKey), correlationId) {
 
   def this(requestInfo: Map[TopicAndPartition, PartitionOffsetRequestInfo], correlationId: Int, replicaId: Int) = this(requestInfo, OffsetRequest.CurrentVersion, correlationId, OffsetRequest.DefaultClientId, replicaId)
 
diff --git a/core/src/main/scala/kafka/api/OffsetResponse.scala b/core/src/main/scala/kafka/api/OffsetResponse.scala
index 264e200..08dc3cd 100644
--- a/core/src/main/scala/kafka/api/OffsetResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetResponse.scala
@@ -47,9 +47,9 @@ object OffsetResponse {
 case class PartitionOffsetsResponse(error: Short, offsets: Seq[Long])
 
 
-case class OffsetResponse(correlationId: Int,
+case class OffsetResponse(override val correlationId: Int,
                           partitionErrorAndOffsets: Map[TopicAndPartition, PartitionOffsetsResponse])
-        extends RequestOrResponse {
+    extends RequestOrResponse(correlationId = correlationId) {
 
   lazy val offsetsGroupedByTopic = partitionErrorAndOffsets.groupBy(_._1.topic)
 
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index 916fb59..1e05d7e 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -54,12 +54,12 @@ object ProducerRequest {
 }
 
 case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
-                           correlationId: Int,
+                           override val correlationId: Int,
                            clientId: String,
                            requiredAcks: Short,
                            ackTimeoutMs: Int,
                            data: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet])
-    extends RequestOrResponse(Some(RequestKeys.ProduceKey)) {
+    extends RequestOrResponse(Some(RequestKeys.ProduceKey), correlationId) {
 
   /**
    * Partitions the data into a map of maps (one for each topic).
diff --git a/core/src/main/scala/kafka/api/ProducerResponse.scala b/core/src/main/scala/kafka/api/ProducerResponse.scala
index 5bff709..d59c5bb 100644
--- a/core/src/main/scala/kafka/api/ProducerResponse.scala
+++ b/core/src/main/scala/kafka/api/ProducerResponse.scala
@@ -43,8 +43,9 @@ object ProducerResponse {
 
 case class ProducerResponseStatus(error: Short, offset: Long)
 
-case class ProducerResponse(correlationId: Int,
-                            status: Map[TopicAndPartition, ProducerResponseStatus]) extends RequestOrResponse {
+case class ProducerResponse(override val correlationId: Int,
+                            status: Map[TopicAndPartition, ProducerResponseStatus])
+    extends RequestOrResponse(correlationId = correlationId) {
 
   /**
    * Partitions the status map into a map of maps (one for each topic).
diff --git a/core/src/main/scala/kafka/api/RequestOrResponse.scala b/core/src/main/scala/kafka/api/RequestOrResponse.scala
index 3175e1c..b62330b 100644
--- a/core/src/main/scala/kafka/api/RequestOrResponse.scala
+++ b/core/src/main/scala/kafka/api/RequestOrResponse.scala
@@ -27,7 +27,7 @@ object Request {
 }
 
 
-private[kafka] abstract class RequestOrResponse(val requestId: Option[Short] = None) extends Logging{
+private[kafka] abstract class RequestOrResponse(val requestId: Option[Short] = None, val correlationId: Int) extends Logging{
 
   def sizeInBytes: Int
   
diff --git a/core/src/main/scala/kafka/api/StopReplicaRequest.scala b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
index be3c7be..5107488 100644
--- a/core/src/main/scala/kafka/api/StopReplicaRequest.scala
+++ b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
@@ -53,13 +53,13 @@ object StopReplicaRequest extends Logging {
 }
 
 case class StopReplicaRequest(versionId: Short,
-                              correlationId: Int,
+                              override val correlationId: Int,
                               clientId: String,
                               ackTimeoutMs: Int,
                               deletePartitions: Boolean,
                               partitions: Set[(String, Int)],
                               controllerEpoch: Int)
-        extends RequestOrResponse(Some(RequestKeys.StopReplicaKey)) {
+        extends RequestOrResponse(Some(RequestKeys.StopReplicaKey), correlationId) {
 
   def this(deletePartitions: Boolean, partitions: Set[(String, Int)], controllerEpoch: Int, correlationId: Int) = {
     this(StopReplicaRequest.CurrentVersion, correlationId, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout,
diff --git a/core/src/main/scala/kafka/api/StopReplicaResponse.scala b/core/src/main/scala/kafka/api/StopReplicaResponse.scala
index fa66b99..c82eadd 100644
--- a/core/src/main/scala/kafka/api/StopReplicaResponse.scala
+++ b/core/src/main/scala/kafka/api/StopReplicaResponse.scala
@@ -42,9 +42,10 @@ object StopReplicaResponse {
 }
 
 
-case class StopReplicaResponse(val correlationId: Int,
+case class StopReplicaResponse(override val correlationId: Int,
                                val responseMap: Map[(String, Int), Short],
-                               val errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse{
+                               val errorCode: Short = ErrorMapping.NoError)
+    extends RequestOrResponse(correlationId = correlationId) {
   def sizeInBytes(): Int ={
     var size =
       4 /* correlation id */ + 
diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
index 88007b1..7477cfd 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
@@ -47,10 +47,10 @@ object TopicMetadataRequest extends Logging {
 }
 
 case class TopicMetadataRequest(val versionId: Short,
-                                val correlationId: Int,
+                                override val correlationId: Int,
                                 val clientId: String,
                                 val topics: Seq[String])
- extends RequestOrResponse(Some(RequestKeys.MetadataKey)){
+ extends RequestOrResponse(Some(RequestKeys.MetadataKey), correlationId){
 
   def this(topics: Seq[String], correlationId: Int) =
     this(TopicMetadataRequest.CurrentVersion, correlationId, TopicMetadataRequest.DefaultClientId, topics)
diff --git a/core/src/main/scala/kafka/api/TopicMetadataResponse.scala b/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
index af76776..290f263 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
@@ -34,7 +34,8 @@ object TopicMetadataResponse {
 }
 
 case class TopicMetadataResponse(topicsMetadata: Seq[TopicMetadata],
-                                 correlationId: Int) extends RequestOrResponse {
+                                 override val correlationId: Int)
+    extends RequestOrResponse(correlationId = correlationId) {
   val sizeInBytes: Int = {
     val brokers = extractBrokers(topicsMetadata).values
     4 + 4 + brokers.map(_.sizeInBytes).sum + 4 + topicsMetadata.map(_.sizeInBytes).sum
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 469ac79..b3c4b2a 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -51,7 +51,8 @@ class Partition(val topic: String,
    * In addition to the leader, the controller can also send the epoch of the controller that elected the leader for
    * each partition. */
   private var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
-  this.logIdent = "Partition [%s, %d] on broker %d: ".format(topic, partitionId, localBrokerId)
+  this.logIdent = "Partition [%s,%d] on broker %d: ".format(topic, partitionId, localBrokerId)
+  private val stateChangeLogger = new StateChangeLogger(this.logIdent)
 
   private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
 
@@ -124,15 +125,17 @@ class Partition(val topic: String,
    *  3. reset LogEndOffset for remote replicas (there could be old LogEndOffset from the time when this broker was the leader last time)
    *  4. set the new leader and ISR
    */
-  def makeLeader(topic: String, partitionId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Boolean = {
+  def makeLeader(controllerId: Int, topic: String, partitionId: Int,
+                 leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, correlationId: Int): Boolean = {
     leaderIsrUpdateLock synchronized {
       val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
       if (leaderEpoch >= leaderAndIsr.leaderEpoch){
-        info("Current leader epoch [%d] is larger or equal to the requested leader epoch [%d], discard the become leader request"
-          .format(leaderEpoch, leaderAndIsr.leaderEpoch))
+        stateChangeLogger.trace(("Broker %d discarded the become-leader request with correlation id %d from " +
+                                 "controller %d epoch %d for partition [%s,%d] since current leader epoch %d is >= the requested leader epoch %d")
+                                   .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, topic,
+                                           partitionId, leaderEpoch, leaderAndIsr.leaderEpoch))
         return false
       }
-      trace("Started to become leader at the request %s".format(leaderAndIsr.toString()))
       // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
       // to maintain the decision maker controller's epoch in the zookeeper path
       controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
@@ -159,22 +162,21 @@ class Partition(val topic: String,
    *  3. set the leader and set ISR to empty
    *  4. start a fetcher to the new leader
    */
-  def makeFollower(topic: String, partitionId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
-                   liveBrokers: Set[Broker]): Boolean = {
+  def makeFollower(controllerId: Int, topic: String, partitionId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
+                   liveBrokers: Set[Broker], correlationId: Int): Boolean = {
     leaderIsrUpdateLock synchronized {
       val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
-      if (leaderEpoch >= leaderAndIsr.leaderEpoch){
-        info("Current leader epoch [%d] is larger or equal to the requested leader epoch [%d], discard the become follower request"
-          .format(leaderEpoch, leaderAndIsr.leaderEpoch))
+      if (leaderEpoch >= leaderAndIsr.leaderEpoch) {
+        stateChangeLogger.trace(("Broker %d discarded the become-follower request with correlation id %d from " +
+                                 "controller %d epoch %d for partition [%s,%d] since current leader epoch %d is >= the requested leader epoch %d")
+                                   .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, topic,
+                                           partitionId, leaderEpoch, leaderAndIsr.leaderEpoch))
         return false
       }
-      trace("Started to become follower at the request %s".format(leaderAndIsr.toString()))
       // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
       // to maintain the decision maker controller's epoch in the zookeeper path
       controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
       val newLeaderBrokerId: Int = leaderAndIsr.leader
-      info("Starting the follower state transition to follow leader %d for topic %s partition %d"
-        .format(newLeaderBrokerId, topic, partitionId))
       liveBrokers.find(_.id == newLeaderBrokerId) match {
         case Some(leaderBroker) =>
           // stop fetcher thread to previous leader
@@ -189,8 +191,8 @@ class Partition(val topic: String,
           // start fetcher thread to current leader
           replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker)
         case None => // leader went down
-          warn("Aborting become follower state change on %d since leader %d for ".format(localBrokerId, newLeaderBrokerId) +
-          " topic %s partition %d became unavailble during the state change operation".format(topic, partitionId))
+          stateChangeLogger.trace(("Aborted the become-follower state change since leader %d for partition [%s,%d] " +
+                                   "became unavailble during the state change operation").format(newLeaderBrokerId, topic, partitionId))
       }
       true
     }
@@ -198,7 +200,7 @@ class Partition(val topic: String,
 
   def updateLeaderHWAndMaybeExpandIsr(replicaId: Int, offset: Long) {
     leaderIsrUpdateLock synchronized {
-      debug("Recording follower %d position %d for topic %s partition %d.".format(replicaId, offset, topic, partitionId))
+      debug("Recording follower %d position %d for partition [%s,%d].".format(replicaId, offset, topic, partitionId))
       val replica = getOrCreateReplica(replicaId)
       replica.logEndOffset = offset
 
diff --git a/core/src/main/scala/kafka/common/Topic.scala b/core/src/main/scala/kafka/common/Topic.scala
index 5bd8f6b..c1b9f65 100644
--- a/core/src/main/scala/kafka/common/Topic.scala
+++ b/core/src/main/scala/kafka/common/Topic.scala
@@ -20,7 +20,7 @@ package kafka.common
 import util.matching.Regex
 
 object Topic {
-  private val legalChars = "[a-zA-Z0-9\\._\\-]"
+  val legalChars = "[a-zA-Z0-9\\._\\-]"
   private val maxNameLength = 255
   private val rgx = new Regex(legalChars + "+")
 
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index e2ca1d6..4759b0f 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -17,7 +17,7 @@
 package kafka.controller
 
 import kafka.network.{Receive, BlockingChannel}
-import kafka.utils.{Logging, ShutdownableThread}
+import kafka.utils.{StateChangeLogger, Logging, ShutdownableThread}
 import collection.mutable.HashMap
 import kafka.cluster.Broker
 import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}
@@ -28,7 +28,7 @@ import kafka.api._
 class ControllerChannelManager private (config: KafkaConfig) extends Logging {
   private val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
   private val brokerLock = new Object
-  this.logIdent = "[Channel manager on controller " + config.brokerId + "], "
+  this.logIdent = "[Channel manager on controller " + config.brokerId + "]: "
 
   def this(allBrokers: Set[Broker], config : KafkaConfig) {
     this(config)
@@ -110,6 +110,7 @@ class RequestSendThread(val controllerId: Int,
                         val channel: BlockingChannel)
   extends ShutdownableThread("Controller-%d-to-broker-%d-send-thread".format(controllerId, toBrokerId)) {
   private val lock = new Object()
+  private val stateChangeLogger = new StateChangeLogger(this.logIdent)
 
   override def doWork(): Unit = {
     val queueItem = queue.take()
@@ -129,7 +130,8 @@ class RequestSendThread(val controllerId: Int,
           case RequestKeys.StopReplicaKey =>
             response = StopReplicaResponse.readFrom(receive.buffer)
         }
-        trace("Controller %d request to broker %d got a response %s".format(controllerId, toBrokerId, response))
+        stateChangeLogger.trace("Controller %d received response correlationId %d for a request sent to broker %d"
+                                  .format(controllerId, response.correlationId, toBrokerId))
 
         if(callback != null){
           callback(response)
@@ -143,11 +145,12 @@ class RequestSendThread(val controllerId: Int,
   }
 }
 
-class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (RequestOrResponse) => Unit) => Unit)
+class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (RequestOrResponse) => Unit) => Unit, controllerId: Int)
   extends  Logging {
   val leaderAndIsrRequestMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), PartitionStateInfo]]
   val stopReplicaRequestMap = new mutable.HashMap[Int, Seq[(String, Int)]]
   val stopAndDeleteReplicaRequestMap = new mutable.HashMap[Int, Seq[(String, Int)]]
+  private val stateChangeLogger = new StateChangeLogger(this.logIdent)
 
   def newBatch() {
     // raise error if the previous batch is not empty
@@ -162,10 +165,8 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques
   def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int,
                                        leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, replicationFactor: Int) {
     brokerIds.foreach { brokerId =>
-      leaderAndIsrRequestMap.getOrElseUpdate(brokerId,
-                                             new mutable.HashMap[(String, Int), PartitionStateInfo])
-      leaderAndIsrRequestMap(brokerId).put((topic, partition),
-                                           PartitionStateInfo(leaderIsrAndControllerEpoch, replicationFactor))
+      leaderAndIsrRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[(String, Int), PartitionStateInfo])
+      leaderAndIsrRequestMap(brokerId).put((topic, partition), PartitionStateInfo(leaderIsrAndControllerEpoch, replicationFactor))
     }
   }
 
@@ -190,8 +191,13 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques
       val partitionStateInfos = m._2.toMap
       val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
       val leaders = liveBrokers.filter(b => leaderIds.contains(b.id))
-      val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerEpoch, correlationId)
-      debug("The leaderAndIsr request sent to broker %d is %s".format(broker, leaderAndIsrRequest))
+      val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId, controllerEpoch, correlationId)
+      for (p <- partitionStateInfos) {
+        val typeOfRequest = if (broker == p._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower"
+        stateChangeLogger.trace(
+          "Controller %d, epoch %d sending %s LeaderAndIsr request with correlationId %d to broker %d for partition [%s,%d]"
+            .format(controllerId, controllerEpoch, typeOfRequest, correlationId, broker, p._1._1, p._1._2))
+      }
       sendRequest(broker, leaderAndIsrRequest, null)
     }
     leaderAndIsrRequestMap.clear()
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 48eae7e..ff3bf8e 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -89,7 +89,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
   private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
   private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
-  private val brokerRequestBatch = new ControllerBrokerRequestBatch(sendRequest)
+  private val brokerRequestBatch = new ControllerBrokerRequestBatch(sendRequest, config.brokerId)
   registerControllerChangedListener()
 
   newGauge(
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 4078604..d69a184 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -21,7 +21,7 @@ import collection.JavaConversions._
 import java.util.concurrent.atomic.AtomicBoolean
 import kafka.api.LeaderAndIsr
 import kafka.common.{TopicAndPartition, StateChangeFailedException, PartitionOfflineException}
-import kafka.utils.{Logging, ZkUtils}
+import kafka.utils.{StateChangeLogger, Logging, ZkUtils}
 import org.I0Itec.zkclient.IZkChildListener
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 
@@ -38,14 +38,16 @@ import org.I0Itec.zkclient.exception.ZkNodeExistsException
  *                          moves to the OfflinePartition state. Valid previous states are NewPartition/OnlinePartition
  */
 class PartitionStateMachine(controller: KafkaController) extends Logging {
-  this.logIdent = "[Partition state machine on Controller " + controller.config.brokerId + "]: "
   private val controllerContext = controller.controllerContext
+  private val controllerId = controller.config.brokerId
   private val zkClient = controllerContext.zkClient
   var partitionState: mutable.Map[TopicAndPartition, PartitionState] = mutable.Map.empty
-  val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest)
+  val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest, controllerId)
   val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext)
   private val isShuttingDown = new AtomicBoolean(false)
 
+  this.logIdent = "[Partition state machine on Controller " + controllerId + "]: "
+  private val stateChangeLogger = new StateChangeLogger(this.logIdent)
   /**
    * Invoked on successful controller election. First registers a topic change listener since that triggers all
    * state transitions for partitions. Initializes the state of partitions by reading from zookeeper. Then triggers
@@ -126,12 +128,13 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       targetState match {
         case NewPartition =>
           // pre: partition did not exist before this
-          // post: partition has been assigned replicas
           assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition)
           assignReplicasToPartitions(topic, partition)
           partitionState.put(topicAndPartition, NewPartition)
-          info("Partition %s state changed from NotExists to New with assigned replicas ".format(topicAndPartition) +
-            "%s".format(controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",")))
+          val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",")
+          stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from NotExists to New with assigned replicas %s"
+                                    .format(controllerId, controller.epoch, topicAndPartition, assignedReplicas))
+          // post: partition has been assigned replicas
         case OnlinePartition =>
           assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition)
           partitionState(topicAndPartition) match {
@@ -144,27 +147,31 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
               electLeaderForPartition(topic, partition, leaderSelector)
             case _ => // should never come here since illegal previous states are checked above
           }
-          info("Partition %s state changed from %s to OnlinePartition with leader %d".format(topicAndPartition,
-            partitionState(topicAndPartition), controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader))
           partitionState.put(topicAndPartition, OnlinePartition)
+          val leader = controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader
+          stateChangeLogger.trace("Controller %d epoch %d changed partition %s from %s to OnlinePartition with leader %d"
+                                    .format(controllerId, controller.epoch, topicAndPartition, partitionState(topicAndPartition), leader))
            // post: partition has a leader
         case OfflinePartition =>
-          // pre: partition should be in Online state
+          // pre: partition should be in New or Online state
           assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition), OfflinePartition)
           // should be called when the leader for a partition is no longer alive
-          info("Partition %s state changed from Online to Offline".format(topicAndPartition))
+          stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from Online to Offline"
+                                    .format(controllerId, controller.epoch, topicAndPartition))
           partitionState.put(topicAndPartition, OfflinePartition)
           // post: partition has no alive leader
         case NonExistentPartition =>
-          // pre: partition could be in either of the above states
+          // pre: partition should be in Offline state
           assertValidPreviousStates(topicAndPartition, List(OfflinePartition), NonExistentPartition)
-          info("Partition %s state changed from Offline to NotExists".format(topicAndPartition))
+          stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from Offline to NotExists"
+                                    .format(controllerId, controller.epoch, topicAndPartition))
           partitionState.put(topicAndPartition, NonExistentPartition)
           // post: partition state is deleted from all brokers and zookeeper
       }
     } catch {
-      case t: Throwable => error("State change for partition %s ".format(topicAndPartition) +
-        "from %s to %s failed".format(currState, targetState), t)
+      case t: Throwable =>
+        stateChangeLogger.error("Controller %d epoch %d initiated state change for partition %s from %s to %s failed"
+                                  .format(controllerId, controller.epoch, topicAndPartition, currState, targetState), t)
     }
   }
 
@@ -268,7 +275,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   def electLeaderForPartition(topic: String, partition: Int, leaderSelector: PartitionLeaderSelector) {
     val topicAndPartition = TopicAndPartition(topic, partition)
     // handle leader election for the partitions whose leader is no longer alive
-    info("Electing leader for partition %s".format(topicAndPartition))
+    stateChangeLogger.trace("Controller %d epoch %d started leader election for partition %s"
+                              .format(controllerId, controller.epoch, topicAndPartition))
     try {
       var zookeeperPathUpdateSucceeded: Boolean = false
       var newLeaderAndIsr: LeaderAndIsr = null
@@ -294,7 +302,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       val newLeaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(newLeaderAndIsr, controller.epoch)
       // update the leader cache
       controllerContext.allLeaders.put(TopicAndPartition(topic, partition), newLeaderIsrAndControllerEpoch)
-      info("Elected leader %d for Offline partition %s".format(newLeaderAndIsr.leader, topicAndPartition))
+      stateChangeLogger.trace("Controller %d epoch %d elected leader %d for Offline partition %s"
+                                .format(controllerId, controller.epoch, newLeaderAndIsr.leader, topicAndPartition))
       // store new leader and isr info in cache
       brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition,
         newLeaderIsrAndControllerEpoch, controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition)).size)
@@ -362,7 +371,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   }
 
   class PartitionChangeListener(topic: String) extends IZkChildListener with Logging {
-    this.logIdent = "[Controller " + controller.config.brokerId + "], "
+    this.logIdent = "[Controller " + controller.config.brokerId + "]: "
 
     @throws(classOf[Exception])
     def handleChildChange(parentPath : String, children : java.util.List[String]) {
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 20d9c4f..0996b61 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -20,7 +20,7 @@ import collection._
 import collection.JavaConversions._
 import java.util.concurrent.atomic.AtomicBoolean
 import kafka.common.{TopicAndPartition, StateChangeFailedException}
-import kafka.utils.{ZkUtils, Logging}
+import kafka.utils.{StateChangeLogger, ZkUtils, Logging}
 import org.I0Itec.zkclient.IZkChildListener
 
 /**
@@ -37,12 +37,14 @@ import org.I0Itec.zkclient.IZkChildListener
  * 4. NonExistentReplica: If a replica is deleted, it is moved to this state. Valid previous state is OfflineReplica
  */
 class ReplicaStateMachine(controller: KafkaController) extends Logging {
-  this.logIdent = "[Replica state machine on Controller " + controller.config.brokerId + "]: "
   private val controllerContext = controller.controllerContext
+  private val controllerId = controller.config.brokerId
   private val zkClient = controllerContext.zkClient
   var replicaState: mutable.Map[(String, Int, Int), ReplicaState] = mutable.Map.empty
-  val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest)
+  val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest, controller.config.brokerId)
   private val isShuttingDown = new AtomicBoolean(false)
+  this.logIdent = "[Replica state machine on controller " + controller.config.brokerId + "]: "
+  private val stateChangeLogger = new StateChangeLogger(this.logIdent)
 
   /**
    * Invoked on successful controller election. First registers a broker change listener since that triggers all
@@ -117,17 +119,18 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
             case None => // new leader request will be sent to this replica when one gets elected
           }
           replicaState.put((topic, partition, replicaId), NewReplica)
-          info("Replica %d for partition %s state changed to NewReplica".format(replicaId, topicAndPartition))
+          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to NewReplica"
+                                    .format(controllerId, controller.epoch, replicaId, topicAndPartition))
         case NonExistentReplica =>
           assertValidPreviousStates(topic, partition, replicaId, List(OfflineReplica), targetState)
           // send stop replica command
           brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true)
           // remove this replica from the assigned replicas list for its partition
           val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
-          controllerContext.partitionReplicaAssignment.put(topicAndPartition,
-            currentAssignedReplicas.filterNot(_ == replicaId))
-          info("Replica %d for partition %s state changed to NonExistentReplica".format(replicaId, topicAndPartition))
+          controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId))
           replicaState.remove((topic, partition, replicaId))
+          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to NonExistentReplica"
+                                    .format(controllerId, controller.epoch, replicaId, topicAndPartition))
         case OnlineReplica =>
           assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica, OfflineReplica), targetState)
           replicaState((topic, partition, replicaId)) match {
@@ -135,7 +138,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
               // add this replica to the assigned replicas list for its partition
               val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
               controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
-              info("Replica %d for partition %s state changed to OnlineReplica".format(replicaId, topicAndPartition))
+              stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OnlineReplica"
+                                        .format(controllerId, controller.epoch, replicaId, topicAndPartition))
             case _ =>
               // check if the leader for this partition is alive or even exists
                 controllerContext.allLeaders.get(topicAndPartition) match {
@@ -146,7 +150,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
                                                                           topic, partition, leaderIsrAndControllerEpoch,
                                                                           replicaAssignment.size)
                       replicaState.put((topic, partition, replicaId), OnlineReplica)
-                      info("Replica %d for partition %s state changed to OnlineReplica".format(replicaId, topicAndPartition))
+                      stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OnlineReplica"
+                                                .format(controllerId, controller.epoch, replicaId, topicAndPartition))
                     case false => // ignore partitions whose leader is not alive
                   }
                 case None => // ignore partitions who don't have a leader yet
@@ -167,8 +172,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
                         topic, partition, updatedLeaderIsrAndControllerEpoch,
                         replicaAssignment.size)
                       replicaState.put((topic, partition, replicaId), OfflineReplica)
-                      info("Replica %d for partition %s state changed to OfflineReplica".format(replicaId, topicAndPartition))
-                      info("Removed offline replica %d from ISR for partition %s".format(replicaId, topicAndPartition))
+                      stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OfflineReplica"
+                                                .format(controllerId, controller.epoch, replicaId, topicAndPartition))
                       false
                     case None =>
                       true
@@ -184,15 +189,16 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
       }
     }
     catch {
-      case t: Throwable => error("Error while changing state of replica %d for partition ".format(replicaId) +
-        "[%s, %d] to %s".format(topic, partition, targetState), t)
+      case t: Throwable =>
+        stateChangeLogger.error("Controller %d epoch %d initiated state change of replica %d for partition [%s,%d] to %s failed"
+                                  .format(controllerId, controller.epoch, replicaId, topic, partition, targetState), t)
     }
   }
 
   private def assertValidPreviousStates(topic: String, partition: Int, replicaId: Int, fromStates: Seq[ReplicaState],
                                         targetState: ReplicaState) {
     assert(fromStates.contains(replicaState((topic, partition, replicaId))),
-      "Replica %s for partition [%s, %d] should be in the %s states before moving to %s state"
+      "Replica %s for partition [%s,%d] should be in the %s states before moving to %s state"
         .format(replicaId, topic, partition, fromStates.mkString(","), targetState) +
         ". Instead it is in %s state".format(replicaState((topic, partition, replicaId))))
   }
diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
index 3d92569..5f80df7 100644
--- a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
@@ -21,9 +21,11 @@ import java.nio.ByteBuffer
 import scala.collection.JavaConversions
 
 class TopicMetadataRequest(val versionId: Short,
-                           val correlationId: Int,
+                           override val correlationId: Int,
                            val clientId: String,
-                           val topics: java.util.List[String]) extends RequestOrResponse(Some(kafka.api.RequestKeys.MetadataKey)) {
+                           val topics: java.util.List[String])
+    extends RequestOrResponse(Some(kafka.api.RequestKeys.MetadataKey), correlationId) {
+
   val underlying: kafka.api.TopicMetadataRequest =
     new kafka.api.TopicMetadataRequest(versionId, correlationId, clientId, JavaConversions.asBuffer(topics))
 
@@ -36,4 +38,5 @@ class TopicMetadataRequest(val versionId: Short,
   def writeTo(buffer: ByteBuffer) = underlying.writeTo(buffer)
 
   def sizeInBytes: Int = underlying.sizeInBytes()
+
 }
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index f7fe0de..e33c9fa 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -46,10 +46,11 @@ class ReplicaManager(val config: KafkaConfig,
   private var leaderPartitions = new mutable.HashSet[Partition]()
   private val leaderPartitionsLock = new Object
   val replicaFetcherManager = new ReplicaFetcherManager(config, this)
-  this.logIdent = "Replica Manager on Broker " + config.brokerId + ": "
   private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
   val highWatermarkCheckpoints = config.logDirs.map(dir => (dir, new HighwaterMarkCheckpoint(dir))).toMap
   private var hwThreadInitialized = false
+  this.logIdent = "[Replica Manager on Broker " + config.brokerId + "]: "
+  private val stateChangeLogger = new StateChangeLogger(this.logIdent)
 
   newGauge(
     "LeaderCount",
@@ -102,7 +103,7 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Short  = {
-    trace("Handling stop replica for partition [%s, %d]".format(topic, partitionId))
+    stateChangeLogger.trace("Handling stop replica for partition [%s,%d]".format(topic, partitionId))
     val errorCode = ErrorMapping.NoError
     getReplica(topic, partitionId) match {
       case Some(replica) =>
@@ -114,10 +115,10 @@ class ReplicaManager(val config: KafkaConfig,
           leaderPartitions -= replica.partition
         }
         allPartitions.remove((topic, partitionId))
-        info("After removing partition (%s, %d), the rest of allReplicas is: [%s]".format(topic, partitionId, allPartitions))
+        info("After removing partition [%s,%d], the rest of allReplicas is: [%s]".format(topic, partitionId, allPartitions))
       case None => //do nothing if replica no longer exists
     }
-    trace("Finish handling stop replica [%s, %d]".format(topic, partitionId))
+    stateChangeLogger.trace("Finish handling stop replica [%s,%d]".format(topic, partitionId))
     errorCode
   }
 
@@ -160,7 +161,7 @@ class ReplicaManager(val config: KafkaConfig,
     if(replicaOpt.isDefined)
       return replicaOpt.get
     else
-      throw new ReplicaNotAvailableException("Replica %d is not available for partiton [%s, %d] yet".format(config.brokerId, topic, partition))
+      throw new ReplicaNotAvailableException("Replica %d is not available for partiton [%s,%d] yet".format(config.brokerId, topic, partition))
   }
 
   def getLeaderReplicaIfLocal(topic: String, partitionId: Int): Replica =  {
@@ -187,13 +188,19 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): (collection.Map[(String, Int), Short], Short) = {
-    info("Handling leader and isr request %s".format(leaderAndISRRequest))
+    leaderAndISRRequest.partitionStateInfos.foreach(p =>
+      stateChangeLogger.trace("Handling LeaderAndIsr request correlationId %d received from controller %d epoch %d for partition [%s,%d]"
+                                .format(leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch,
+                                        p._1._1, p._1._2)))
+    info("Handling LeaderAndIsr request %s".format(leaderAndISRRequest))
+
     val responseMap = new collection.mutable.HashMap[(String, Int), Short]
     if(leaderAndISRRequest.controllerEpoch < controllerEpoch) {
-      error("Received leader and isr request from an old controller epoch %d.".format(leaderAndISRRequest.controllerEpoch) +
-        " Latest known controller epoch is %d " + controllerEpoch)
+      stateChangeLogger.error("Received LeaderAndIsr request correlationId %d with an old controllerEpoch %d, latest known controllerEpoch is %d"
+                                .format(leaderAndISRRequest.controllerEpoch, leaderAndISRRequest.correlationId, controllerEpoch))
       (responseMap, ErrorMapping.StaleControllerEpochCode)
     }else {
+      val controllerId = leaderAndISRRequest.controllerId
       controllerEpoch = leaderAndISRRequest.controllerEpoch
       for((topicAndPartition, partitionStateInfo) <- leaderAndISRRequest.partitionStateInfos) {
         var errorCode = ErrorMapping.NoError
@@ -203,17 +210,25 @@ class ReplicaManager(val config: KafkaConfig,
         val requestedLeaderId = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
         try {
           if(requestedLeaderId == config.brokerId)
-            makeLeader(topic, partitionId, partitionStateInfo)
+            makeLeader(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.correlationId)
           else
-            makeFollower(topic, partitionId, partitionStateInfo, leaderAndISRRequest.leaders)
+            makeFollower(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.leaders,
+                         leaderAndISRRequest.correlationId)
         } catch {
           case e =>
-            error("Error processing leaderAndISR request %s".format(leaderAndISRRequest), e)
+            stateChangeLogger.error("Error processing LeaderAndIsr request correlationId %d received from controller %d epoch %d for partition %s"
+                                      .format(leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch,
+                                              topicAndPartition),
+                                    e)
             errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
         }
         responseMap.put(topicAndPartition, errorCode)
+        leaderAndISRRequest.partitionStateInfos.foreach(p =>
+          stateChangeLogger.trace("Handled LeaderAndIsr request correlationId %d received from controller %d epoch %d for partition [%s,%d]"
+                                    .format(leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch,
+                                            p._1._1, p._1._2)))
       }
-      info("Completed leader and isr request %s".format(leaderAndISRRequest))
+      info("Handled leader and isr request %s".format(leaderAndISRRequest))
       // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
       // have been completely populated before starting the checkpointing there by avoiding weird race conditions
       if (!hwThreadInitialized) {
@@ -225,33 +240,38 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
-  private def makeLeader(topic: String, partitionId: Int, partitionStateInfo: PartitionStateInfo) = {
+  private def makeLeader(controllerId: Int, epoch:Int, topic: String, partitionId: Int,
+                         partitionStateInfo: PartitionStateInfo, correlationId: Int) = {
     val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
-    info("Becoming Leader for topic [%s] partition [%d]".format(topic, partitionId))
+    stateChangeLogger.trace(("LeaderAndIsr request correlationId %d received from controller %d epoch %d " +
+                             "starting the become-leader transition for partition [%s,%d]")
+                               .format(correlationId, controllerId, epoch, topic, partitionId))
     val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
-    if (partition.makeLeader(topic, partitionId, leaderIsrAndControllerEpoch)) {
+    if (partition.makeLeader(controllerId, topic, partitionId, leaderIsrAndControllerEpoch, correlationId)) {
       // also add this partition to the list of partitions for which the leader is the current broker
       leaderPartitionsLock synchronized {
         leaderPartitions += partition
       } 
     }
-    info("Completed the leader state transition for topic %s partition %d".format(topic, partitionId))
+    stateChangeLogger.trace("Completed become-leader transition for partition [%s,%d]".format(topic, partitionId))
   }
 
-  private def makeFollower(topic: String, partitionId: Int, partitionStateInfo: PartitionStateInfo,
-                           liveBrokers: Set[Broker]) {
+  private def makeFollower(controllerId: Int, epoch: Int, topic: String, partitionId: Int,
+                           partitionStateInfo: PartitionStateInfo, liveBrokers: Set[Broker], correlationId: Int) {
     val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
     val leaderBrokerId: Int = leaderIsrAndControllerEpoch.leaderAndIsr.leader
-    info("Starting the follower state transition to follow leader %d for topic %s partition %d"
-                 .format(leaderBrokerId, topic, partitionId))
+    stateChangeLogger.trace(("LeaderAndIsr request correlationId %d received from controller %d epoch %d " +
+                             "starting the become-follower transition for partition [%s,%d]")
+                               .format(correlationId, controllerId, epoch, topic, partitionId))
 
     val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
-    if (partition.makeFollower(topic, partitionId, leaderIsrAndControllerEpoch, liveBrokers)) {
+    if (partition.makeFollower(controllerId, topic, partitionId, leaderIsrAndControllerEpoch, liveBrokers, correlationId)) {
       // remove this replica's partition from the ISR expiration queue
       leaderPartitionsLock synchronized {
         leaderPartitions -= partition
       }
     }
+    stateChangeLogger.trace("Completed the become-follower transition for partition [%s,%d]".format(topic, partitionId))
   }
 
   private def maybeShrinkIsr(): Unit = {
@@ -266,7 +286,7 @@ class ReplicaManager(val config: KafkaConfig,
     if(partitionOpt.isDefined) {
       partitionOpt.get.updateLeaderHWAndMaybeExpandIsr(replicaId, offset)
     } else {
-      warn("While recording the follower position, the partition [%s, %d] hasn't been created, skip updating leader HW".format(topic, partitionId))
+      warn("While recording the follower position, the partition [%s,%d] hasn't been created, skip updating leader HW".format(topic, partitionId))
     }
   }
 
diff --git a/core/src/main/scala/kafka/tools/stateChangeLogMerger.scala b/core/src/main/scala/kafka/tools/stateChangeLogMerger.scala
new file mode 100644
index 0000000..2d23549
--- /dev/null
+++ b/core/src/main/scala/kafka/tools/stateChangeLogMerger.scala
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import joptsimple._
+import scala.util.matching.Regex
+import collection.mutable
+import java.util.Date
+import java.text.SimpleDateFormat
+import kafka.utils.Logging
+import kafka.common.Topic
+import java.io.{BufferedOutputStream, OutputStream}
+
+/**
+ * A utility that merges the state change logs (possibly obtained from different brokers and over multiple days).
+ *
+ * This utility expects at least one of the following two arguments -
+ * 1. A list of state change log files
+ * 2. A regex to specify state change log file names.
+ *
+ * This utility optionally also accepts the following arguments -
+ * 1. The topic whose state change logs should be merged
+ * 2. A list of partitions whose state change logs should be merged (can be specified only when the topic argument
+ * is explicitly specified)
+ * 3. Start time from when the logs should be merged
+ * 4. End time until when the logs should be merged
+ */
+
+object stateChangeLogMerger extends Logging {
+
+  val dateFormatString = "yyyy-MM-dd HH:mm:ss,SSS"
+  val topicPartitionRegex = new Regex("\\[(" + Topic.legalChars + "+),( )*([0-9]+)\\]")
+  val dateRegex = new Regex("[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}")
+  val dateFormat = new SimpleDateFormat(dateFormatString)
+  var files: List[String] = List()
+  var topic: String = null
+  var partitions: List[Int] = List()
+  var startDate: Date = null
+  var endDate: Date = null
+
+  def main(args: Array[String]) {
+
+    // Parse input arguments.
+    val parser = new OptionParser
+    val filesOpt = parser.accepts("logs", "Comma separated list of state change logs or a regex for the log file names")
+                              .withOptionalArg
+                              .describedAs("file1,file2,...")
+                              .ofType(classOf[String])
+    val regexOpt = parser.accepts("logs-regex", "Regex to match the state change log files to be merged")
+                              .withOptionalArg
+                              .describedAs("for example: /tmp/state-change.log*")
+                              .ofType(classOf[String])
+    val topicOpt = parser.accepts("topic", "The topic whose state change logs should be merged")
+                              .withOptionalArg
+                              .describedAs("topic")
+                              .ofType(classOf[String])
+    val partitionsOpt = parser.accepts("partitions", "Comma separated list of partition ids whose state change logs should be merged")
+                              .withOptionalArg
+                              .describedAs("0,1,2,...")
+                              .ofType(classOf[String])
+    val startTimeOpt = parser.accepts("start-time", "The earliest timestamp of state change log entries to be merged")
+                              .withOptionalArg
+                              .describedAs("start timestamp in the format " + dateFormat)
+                              .ofType(classOf[String])
+                              .defaultsTo("0000-00-00 00:00:00,000")
+    val endTimeOpt = parser.accepts("end-time", "The latest timestamp of state change log entries to be merged")
+                              .withOptionalArg
+                              .describedAs("end timestamp in the format " + dateFormat)
+                              .ofType(classOf[String])
+                              .defaultsTo("9999-12-31 23:59:59,999")
+
+
+    val options = parser.parse(args : _*)
+    if (!options.has(filesOpt) && !options.has(regexOpt)) {
+      System.err.println("Provide at least one of the two arguments \"" + filesOpt + "\" or \"" + regexOpt + "\"")
+      parser.printHelpOn(System.err)
+      System.exit(1)
+    }
+    if (options.has(partitionsOpt) && !options.has(topicOpt)) {
+      System.err.println("\"" + topicOpt + "\" needs to be specified with partition ids")
+      parser.printHelpOn(System.err)
+      System.exit(1)
+    }
+
+    // Populate data structures.
+    if (options.has(filesOpt)) {
+      files :::= options.valueOf(filesOpt).split(",").toList
+    }
+    if (options.has(regexOpt)) {
+      val regex = options.valueOf(regexOpt)
+      val fileNameIndex = regex.lastIndexOf('/') + 1
+      val dirName = if (fileNameIndex == 0) "." else regex.substring(0, fileNameIndex - 1)
+      val fileNameRegex = new Regex(regex.substring(fileNameIndex))
+      files :::= new java.io.File(dirName).listFiles.filter(f => fileNameRegex.findFirstIn(f.getName) != None).map(dirName + "/" + _.getName).toList
+    }
+    if (options.has(topicOpt)) {
+      topic = options.valueOf(topicOpt)
+    }
+    if (options.has(partitionsOpt)) {
+      partitions = options.valueOf(partitionsOpt).split(",").toList.map(_.toInt)
+    }
+    startDate = dateFormat.parse(options.valueOf(startTimeOpt).replace('\"', ' ').trim)
+    endDate = dateFormat.parse(options.valueOf(endTimeOpt).replace('\"', ' ').trim)
+
+    /**
+     * n-way merge from m input files:
+     * 1. Read a line that matches the specified topic/partitions and date range from every input file in a priority queue.
+     * 2. Take the line from the file with the earliest date and add it to a buffered output stream.
+     * 3. Add another line from the file selected in step 2 in the priority queue.
+     * 4. Flush the output buffer at the end. (The buffer will also be automatically flushed every K bytes.)
+     */
+    val pqueue = new mutable.PriorityQueue[LineIterator]()(dateBasedOrdering)
+    val output: OutputStream = new BufferedOutputStream(System.out, 1024*1024)
+    val lineIterators = files.map(io.Source.fromFile(_).getLines)
+    var lines: List[LineIterator] = List()
+
+    for (itr <- lineIterators) {
+      val lineItr = getNextLine(itr)
+      if (!lineItr.isEmpty)
+        lines ::= lineItr
+    }
+    if (!lines.isEmpty) pqueue.enqueue(lines:_*)
+
+    while (!pqueue.isEmpty) {
+      val lineItr = pqueue.dequeue()
+      output.write((lineItr.line + "\n").getBytes)
+      val nextLineItr = getNextLine(lineItr.itr)
+      if (!nextLineItr.isEmpty)
+        pqueue.enqueue(nextLineItr)
+    }
+
+    output.flush()
+  }
+
+  /**
+   * Returns the next line that matches the specified topic/partitions from the file that has the earliest date
+   * from the specified date range.
+   * @param itr Line iterator of a file
+   * @return (line from a file, line iterator for the same file)
+   */
+  def getNextLine(itr: Iterator[String]): LineIterator = {
+    while (itr != null && itr.hasNext) {
+      val nextLine = itr.next
+      dateRegex.findFirstIn(nextLine) match {
+        case Some(d) =>
+          val date = dateFormat.parse(d)
+          if ((date.equals(startDate) || date.after(startDate)) && (date.equals(endDate) || date.before(endDate))) {
+            topicPartitionRegex.findFirstMatchIn(nextLine) match {
+              case Some(matcher) =>
+                if ((topic == null || topic == matcher.group(1)) && (partitions.isEmpty || partitions.contains(matcher.group(3).toInt)))
+                  return new LineIterator(nextLine, itr)
+              case None =>
+            }
+          }
+        case None =>
+      }
+    }
+    new LineIterator()
+  }
+
+  class LineIterator(val line: String, val itr: Iterator[String]) {
+    def this() = this("", null)
+    def isEmpty = (line == "" && itr == null)
+  }
+
+  implicit object dateBasedOrdering extends Ordering[LineIterator] {
+    def compare(first: LineIterator, second: LineIterator) = {
+      val firstDate = dateRegex.findFirstIn(first.line).get
+      val secondDate = dateRegex.findFirstIn(second.line).get
+      secondDate.compareTo(firstDate)
+    }
+  }
+
+}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/utils/StateChangeLogger.scala b/core/src/main/scala/kafka/utils/StateChangeLogger.scala
new file mode 100644
index 0000000..a5b423c
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/StateChangeLogger.scala
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import org.apache.log4j.Logger
+
+class StateChangeLogger(ident: String) extends Logging {
+  override lazy val logger = Logger.getLogger("stateChangeLogger")
+  logIdent = ident
+}
diff --git a/core/src/main/scala/kafka/utils/Topic.scala b/core/src/main/scala/kafka/utils/Topic.scala
deleted file mode 100644
index e69de29..0000000
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index d0c7b90..4c209f1 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -89,7 +89,7 @@ object SerializationTestUtils{
     val leaderAndIsr2 = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader2, 1, isr2, 2), 1)
     val map = Map(((topic1, 0), PartitionStateInfo(leaderAndIsr1, 3)),
                   ((topic2, 0), PartitionStateInfo(leaderAndIsr2, 3)))
-    new LeaderAndIsrRequest(map.toMap, collection.immutable.Set[Broker](), 1, 0)
+    new LeaderAndIsrRequest(map.toMap, collection.immutable.Set[Broker](), 0, 1, 0)
   }
 
   def createTestLeaderAndIsrResponse() : LeaderAndIsrResponse = {
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 129bc56..be82592 100644
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -122,7 +122,8 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
 
 
     // start another controller
-    val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(2, TestUtils.choosePort()))
+    val controllerId = 2
+    val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(controllerId, TestUtils.choosePort()))
     val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.config.port))
     val controllerChannelManager = new ControllerChannelManager(brokers.toSet, controllerConfig)
     controllerChannelManager.startup()
@@ -131,7 +132,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
     leaderAndIsr.put((topic, partitionId),
       new LeaderIsrAndControllerEpoch(new LeaderAndIsr(brokerId2, List(brokerId1, brokerId2)), 2))
     val partitionStateInfo = leaderAndIsr.mapValues(l => new PartitionStateInfo(l, 1)).toMap
-    val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfo, brokers.toSet, staleControllerEpoch, 0)
+    val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfo, brokers.toSet, controllerId, staleControllerEpoch, 0)
 
     controllerChannelManager.sendRequest(brokerId2, leaderAndIsrRequest, staleControllerEpochCallback)
     TestUtils.waitUntilTrue(() => staleControllerEpochDetected == true, 1000)
