diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 1087a2e..810952e 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -56,7 +56,7 @@ class Partition(val topic: String,
    * each partition. */
   private var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
   this.logIdent = "Partition [%s,%d] on broker %d: ".format(topic, partitionId, localBrokerId)
-  private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
+  private val stateChangeLogger = KafkaController.stateChangeLogger
 
   private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
 
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 8ab8ab6..f17d976 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -114,7 +114,7 @@ class RequestSendThread(val controllerId: Int,
                         val channel: BlockingChannel)
   extends ShutdownableThread("Controller-%d-to-broker-%d-send-thread".format(controllerId, toBroker.id)) {
   private val lock = new Object()
-  private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
+  private val stateChangeLogger = KafkaController.stateChangeLogger
   connectToBroker(toBroker, channel)
 
   override def doWork(): Unit = {
@@ -188,7 +188,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
   val leaderAndIsrRequestMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), PartitionStateInfo]]
   val stopReplicaRequestMap = new mutable.HashMap[Int, Seq[StopReplicaRequestInfo]]
   val updateMetadataRequestMap = new mutable.HashMap[Int, mutable.HashMap[TopicAndPartition, PartitionStateInfo]]
-  private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
+  private val stateChangeLogger = KafkaController.stateChangeLogger
 
   def newBatch() {
     // raise error if the previous batch is not empty
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 2867ef1..d142f8c 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -125,10 +125,12 @@ trait KafkaControllerMBean {
 
 object KafkaController extends Logging {
   val MBeanName = "kafka.controller:type=KafkaController,name=ControllerOps"
-  val stateChangeLogger = "state.change.logger"
+  val stateChangeLogger = new StateChangeLogger("state.change.logger")
   val InitialControllerEpoch = 1
   val InitialControllerEpochZkVersion = 1
 
+  case class StateChangeLogger(override val loggerName: String) extends Logging
+
   def parseControllerId(controllerInfoString: String): Int = {
     try {
       Json.parseFull(controllerInfoString) match {
@@ -154,7 +156,7 @@ object KafkaController extends Logging {
 class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup with KafkaControllerMBean {
   this.logIdent = "[Controller " + config.brokerId + "]: "
   private var isRunning = true
-  private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
+  private val stateChangeLogger = KafkaController.stateChangeLogger
   val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs)
   val partitionStateMachine = new PartitionStateMachine(this)
   val replicaStateMachine = new ReplicaStateMachine(this)
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index c3e8d05..6457b56 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -50,7 +50,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   private val hasStarted = new AtomicBoolean(false)
   private val noOpPartitionLeaderSelector = new NoOpLeaderSelector(controllerContext)
   this.logIdent = "[Partition state machine on Controller " + controllerId + "]: "
-  private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
+  private val stateChangeLogger = KafkaController.stateChangeLogger
   private var topicChangeListener: TopicChangeListener = null
   private var deleteTopicsListener: DeleteTopicsListener = null
   private var addPartitionsListener: mutable.Map[String, AddPartitionsListener] = mutable.Map.empty
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 5e016d5..4da43c4 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -52,7 +52,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
   val brokerRequestBatch = new ControllerBrokerRequestBatch(controller)
   private val hasStarted = new AtomicBoolean(false)
   this.logIdent = "[Replica state machine on controller " + controller.config.brokerId + "]: "
-  private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
+  private val stateChangeLogger = KafkaController.stateChangeLogger
 
   /**
    * Invoked on successful controller election. First registers a broker change listener since that triggers all
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index a6ec970..826831f 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -31,6 +31,9 @@ import org.apache.log4j.Logger
 
 object RequestChannel extends Logging {
   val AllDone = new Request(1, 2, getShutdownReceive(), 0)
+  val requestLogger = new RequestLogger("kafka.request.logger")
+
+  case class RequestLogger(override val loggerName: String) extends Logging
 
   def getShutdownReceive() = {
     val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]())
@@ -49,7 +52,7 @@ object RequestChannel extends Logging {
     val requestId = buffer.getShort()
     val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer)
     buffer = null
-    private val requestLogger = Logger.getLogger("kafka.request.logger")
+    private val requestLogger = RequestChannel.requestLogger
     trace("Processor %d received request : %s".format(processor, requestObj))
 
     def updateRequestMetrics() {
@@ -81,10 +84,10 @@ object RequestChannel extends Logging {
       m.responseSendTimeHist.update(responseSendTime)
       m.totalTimeHist.update(totalTime)
     }
-    if(requestLogger.isTraceEnabled)
+    if(requestLogger.logger.isTraceEnabled)
       requestLogger.trace("Completed request:%s from client %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d"
         .format(requestObj.describe(true), remoteAddress, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime))
-    else if(requestLogger.isDebugEnabled) {
+    else {
       requestLogger.debug("Completed request:%s from client %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d"
         .format(requestObj.describe(false), remoteAddress, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime))
     }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index ae2df20..0f137c5 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -138,10 +138,9 @@ class KafkaApis(val requestChannel: RequestChannel,
     updateMetadataRequest.aliveBrokers.foreach(b => aliveBrokers.put(b.id, b))
     updateMetadataRequest.partitionStateInfos.foreach { partitionState =>
       metadataCache.put(partitionState._1, partitionState._2)
-      if(stateChangeLogger.isTraceEnabled)
-        stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response to UpdateMetadata request " +
-          "sent by controller %d epoch %d with correlation id %d").format(brokerId, partitionState._2, partitionState._1,
-          updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
+      stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response to UpdateMetadata request " +
+        "sent by controller %d epoch %d with correlation id %d").format(brokerId, partitionState._2, partitionState._1,
+        updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
     }
     // remove the topics that don't exist in the UpdateMetadata request since those are the topics that are
     // currently being deleted by the controller
@@ -155,10 +154,9 @@ class KafkaApis(val requestChannel: RequestChannel,
     }.keySet
     partitionsToBeDeleted.foreach { partition =>
       metadataCache.remove(partition)
-      if(stateChangeLogger.isTraceEnabled)
-        stateChangeLogger.trace(("Broker %d deleted partition %s from metadata cache in response to UpdateMetadata request " +
-          "sent by controller %d epoch %d with correlation id %d").format(brokerId, partition,
-          updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
+      stateChangeLogger.trace(("Broker %d deleted partition %s from metadata cache in response to UpdateMetadata request " +
+        "sent by controller %d epoch %d with correlation id %d").format(brokerId, partition,
+        updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
     }
   }
   val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 0fe881d..7df56ce 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -56,7 +56,7 @@ class ReplicaManager(val config: KafkaConfig,
  val highWatermarkCheckpoints = config.logDirs.map(dir => (new File(dir).getAbsolutePath, new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap
   private var hwThreadInitialized = false
   this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: "
-  val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
+  val stateChangeLogger = KafkaController.stateChangeLogger
 
   newGauge(
     "LeaderCount",
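
Note for reviewers: with this patch every call site shares one wrapper instance instead of building its own logger through Logger.getLogger, and the explicit isTraceEnabled/isDebugEnabled guards in KafkaApis.scala and RequestChannel.scala become redundant because kafka.utils.Logging checks the level inside each helper before the message is evaluated. The one guard that remains, requestLogger.logger.isTraceEnabled, reaches through the trait to the underlying log4j Logger so the code can still choose between the detailed trace output and the shorter debug output. Below is a minimal sketch of the shape this relies on; the trait body is illustrative only, not the actual kafka.utils.Logging source, and StateChangeLoggerSketch is a hypothetical driver.

import org.apache.log4j.Logger

// Simplified stand-in for kafka.utils.Logging: the logger is resolved
// lazily from an overridable loggerName, and each helper takes its
// message by name so the format call only runs when the level is on.
trait Logging {
  val loggerName: String = this.getClass.getName
  lazy val logger: Logger = Logger.getLogger(loggerName)

  def trace(msg: => String): Unit =
    if (logger.isTraceEnabled) logger.trace(msg)

  def debug(msg: => String): Unit =
    if (logger.isDebugEnabled) logger.debug(msg)
}

// The wrapper introduced by this patch: overriding loggerName binds
// every holder of the value to the shared "state.change.logger".
case class StateChangeLogger(override val loggerName: String) extends Logging

object StateChangeLoggerSketch {
  val stateChangeLogger = new StateChangeLogger("state.change.logger")

  def main(args: Array[String]): Unit = {
    // No isTraceEnabled guard needed at the call site: the by-name
    // parameter means the string is only formatted when TRACE is
    // actually enabled for state.change.logger.
    stateChangeLogger.trace("Broker %d cached leader info for partition [%s,%d]".format(0, "topic", 0))
  }
}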