diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 7e8ae29..d2bb6c1 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -130,6 +130,8 @@ class RequestSendThread(val controllerId: Int,
             response = LeaderAndIsrResponse.readFrom(receive.buffer)
           case RequestKeys.StopReplicaKey =>
             response = StopReplicaResponse.readFrom(receive.buffer)
+          case RequestKeys.UpdateMetadataKey =>
+            response = UpdateMetadataResponse.readFrom(receive.buffer)
         }
         stateChangeLogger.trace("Controller %d epoch %d received response correlationId %d for a request sent to broker %d"
                                   .format(controllerId, controllerContext.epoch, response.correlationId, toBrokerId))
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0c5c4d5..2157b81 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -101,21 +101,30 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleUpdateMetadataRequest(request: RequestChannel.Request) {
     val updateMetadataRequest = request.requestObj.asInstanceOf[UpdateMetadataRequest]
+    val stateChangeLogger = replicaManager.stateChangeLogger
     if(updateMetadataRequest.controllerEpoch < replicaManager.controllerEpoch) {
       val stateControllerEpochErrorMessage = ("Broker %d received update metadata request with correlation id %d from an " +
         "old controller %d with epoch %d. Latest known controller epoch is %d").format(brokerId,
         updateMetadataRequest.correlationId, updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch,
         replicaManager.controllerEpoch)
-      replicaManager.stateChangeLogger.warn(stateControllerEpochErrorMessage)
+      stateChangeLogger.warn(stateControllerEpochErrorMessage)
       throw new ControllerMovedException(stateControllerEpochErrorMessage)
     }
+    if(stateChangeLogger.isTraceEnabled)
+      updateMetadataRequest.partitionStateInfos.foreach(p => stateChangeLogger.trace(("Broker %d handling " +
+        "UpdateMetadata request %s correlation id %d received from controller %d epoch %d for partition %s")
+        .format(brokerId, p._2, updateMetadataRequest.correlationId, updateMetadataRequest.controllerId,
+        updateMetadataRequest.controllerEpoch, p._1)))
     partitionMetadataLock synchronized {
       replicaManager.controllerEpoch = updateMetadataRequest.controllerEpoch
       // cache the list of alive brokers in the cluster
       updateMetadataRequest.aliveBrokers.foreach(b => aliveBrokers.put(b.id, b))
       updateMetadataRequest.partitionStateInfos.foreach { partitionState =>
         leaderCache.put(partitionState._1, partitionState._2)
-        debug("Caching leader info %s for partition %s".format(partitionState._2, partitionState._1))
+        if(stateChangeLogger.isTraceEnabled)
+          stateChangeLogger.trace(("Broker %d caching leader info %s for partition %s in response to UpdateMetadata request sent by controller %d" +
+            " epoch %d with correlation id %d").format(brokerId, partitionState._2, partitionState._1,
+            updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
       }
     }
     val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId)
