diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index d5f5a4e..6146c6e 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -220,7 +220,8 @@ class Partition(val topic: String,
           if (!inSyncReplicas.contains(replica) && replica.logEndOffset >= leaderHW) {
             // expand ISR
             val newInSyncReplicas = inSyncReplicas + replica
-            info("Expanding ISR for topic %s partition %d to %s".format(topic, partitionId, newInSyncReplicas.map(_.brokerId).mkString(", ")))
+            info("Expanding ISR for topic %s partition %d from %s to %s"
+                 .format(topic, partitionId, inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(",")))
             // update ISR in ZK and cache
             updateIsr(newInSyncReplicas)
             replicaManager.isrExpandRate.mark()
@@ -315,7 +316,7 @@ class Partition(val topic: String,
   }
 
   private def updateIsr(newIsr: Set[Replica]) {
-    info("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newIsr.mkString(", ")))
+    debug("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newIsr.mkString(",")))
     val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
     // use the epoch of the controller that made the leadership decision, instead of the current controller epoch
     val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
index a504534..b80c0b0 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -109,7 +109,7 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk
 
   def clearCurrentChunk() {
     try {
-      info("Clearing the current data chunk for this consumer iterator")
+      debug("Clearing the current data chunk for this consumer iterator")
       current.set(null)
     }
   }
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index 7a06c24..76f0777 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -53,8 +53,8 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten
         val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
         val newLeaderAndIsr = liveBrokersInIsr.isEmpty match {
           case true =>
-            debug("No broker is ISR is alive, picking the leader from the alive assigned replicas: %s"
-              .format(liveAssignedReplicasToThisPartition.mkString(",")))
+            debug("No broker in ISR is alive for %s. Pick the leader from the alive assigned replicas: %s"
+              .format(topicAndPartition, liveAssignedReplicasToThisPartition.mkString(",")))
             liveAssignedReplicasToThisPartition.isEmpty match {
               case true =>
                 throw new NoReplicaOnlineException(("No replica for partition " +
@@ -63,13 +63,13 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten
               case false =>
                 ControllerStats.uncleanLeaderElectionRate.mark()
                 val newLeader = liveAssignedReplicasToThisPartition.head
-                warn("No broker in ISR is alive, elected leader from the alive replicas is [%s], ".format(newLeader) +
-                  "There's potential data loss")
+                warn("No broker in ISR is alive for %s. Elected leader from broker %s. There's potential data loss."
+                  .format(topicAndPartition, newLeader))
                 new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1)
             }
           case false =>
             val newLeader = liveBrokersInIsr.head
-            debug("Some broker in ISR is alive, selecting the leader from the ISR: " + newLeader)
+            debug("Some broker in ISR is alive for %s. Select %d from ISR to be the leader.".format(topicAndPartition, newLeader))
             new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)
         }
         info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition))
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index ce27a19..c4397b7 100644
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -44,7 +44,7 @@ class FileMessageSet private[kafka](val file: File,
   private val _size = new AtomicInteger(scala.math.min(channel.size().toInt, limit) - start)
 
   if (initChannelPositionToEnd) {
-    info("Creating or reloading log segment %s".format(file.getAbsolutePath))
+    debug("Creating or reloading log segment %s".format(file.getAbsolutePath))
     /* set the file position to the last byte in the file */
     channel.position(channel.size)
   }
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 7d71451..6ac6545 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -127,7 +127,7 @@ private[kafka] class Log(val dir: File,
   /* Calculate the offset of the next message */
   private var nextOffset: AtomicLong = new AtomicLong(segments.view.last.nextOffset())
 
-  debug("Completed load of log %s with log end offset %d".format(name, logEndOffset))
+  info("Completed load of log %s with log end offset %d".format(name, logEndOffset))
 
   newGauge(name + "-" + "NumLogSegments",
            new Gauge[Int] { def getValue = numberOfSegments })
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index e806da9..60ebc52 100644
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -90,7 +90,7 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
   /* the last offset in the index */
   var lastOffset = readLastOffset()
 
-  info("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d"
+  debug("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d"
     .format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset, mmap.position))
 
   /* the maximum number of entries this index can hold */
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 27b16e3..89cb27d 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -135,7 +135,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
         } else {
           // currently, if in async mode, we just log the serialization error. We need to revisit
          // this when doing kafka-496
-          error("Error serializing message ", t)
+          error("Error serializing message for topic %s".format(e.topic), t)
         }
       }
     }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 87ca6b0..7ee81a1 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -205,10 +205,10 @@ class KafkaApis(val requestChannel: RequestChannel,
         Runtime.getRuntime.halt(1)
         null
       case utpe: UnknownTopicOrPartitionException =>
-        warn(utpe.getMessage)
+        warn("Produce request: " + utpe.getMessage)
         new ProduceResult(topicAndPartition, utpe)
       case nle: NotLeaderForPartitionException =>
-        warn(nle.getMessage)
+        warn("Produce request: " + nle.getMessage)
         new ProduceResult(topicAndPartition, nle)
       case e =>
         BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
@@ -291,15 +291,17 @@ class KafkaApis(val requestChannel: RequestChannel,
          // since failed fetch requests metric is supposed to indicate failure of a broker in handling a fetch request
          // for a partition it is the leader for
          case utpe: UnknownTopicOrPartitionException =>
-            warn(utpe.getMessage)
+            warn("Fetch request: " + utpe.getMessage)
            new FetchResponsePartitionData(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty)
          case nle: NotLeaderForPartitionException =>
-            warn(nle.getMessage)
+            warn("Fetch request: " + nle.getMessage)
            new FetchResponsePartitionData(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty)
          case t =>
            BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
            BrokerTopicStats.getBrokerAllTopicsStats.failedFetchRequestRate.mark()
-            error("error when processing request " + (topic, partition, offset, fetchSize), t)
+            error("Error when processing fetch request for topic %s partition %d offset %d from %s with correlation id %d"
+              .format(topic, partition, offset, if (isFetchFromFollower) "follower" else "consumer", fetchRequest.correlationId),
+              t)
            new FetchResponsePartitionData(ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty)
        }
        (TopicAndPartition(topic, partition), partitionData)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 6d849ac..679b800 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -131,7 +131,7 @@ class ReplicaManager(val config: KafkaConfig,
   def stopReplicas(stopReplicaRequest: StopReplicaRequest): (mutable.Map[(String, Int), Short], Short) = {
     val responseMap = new collection.mutable.HashMap[(String, Int), Short]
     if(stopReplicaRequest.controllerEpoch < controllerEpoch) {
-      stateChangeLogger.error("Broker %d received stop replica request from an old controller epoch %d."
+      stateChangeLogger.warn("Broker %d received stop replica request from an old controller epoch %d."
        .format(localBrokerId, stopReplicaRequest.controllerEpoch) +
        " Latest known controller epoch is %d " + controllerEpoch)
      (responseMap, ErrorMapping.StaleControllerEpochCode)
@@ -203,7 +203,7 @@ class ReplicaManager(val config: KafkaConfig,
     val responseMap = new collection.mutable.HashMap[(String, Int), Short]
 
     if(leaderAndISRRequest.controllerEpoch < controllerEpoch) {
-      stateChangeLogger.error("Broker %d received LeaderAndIsr request correlationId %d with an old controllerEpoch %d, latest known controllerEpoch is %d"
+      stateChangeLogger.warn("Broker %d received LeaderAndIsr request correlationId %d with an old controllerEpoch %d. Latest known controllerEpoch is %d"
        .format(localBrokerId, leaderAndISRRequest.controllerEpoch, leaderAndISRRequest.correlationId, controllerEpoch))
      (responseMap, ErrorMapping.StaleControllerEpochCode)
    }else {
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 5673ae2..f0ec960 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -328,7 +328,7 @@ object ZkUtils extends Logging {
   def conditionalUpdatePersistentPath(client: ZkClient, path: String, data: String, expectVersion: Int): (Boolean, Int) = {
     try {
       val stat = client.writeData(path, data, expectVersion)
-      info("Conditional update of zkPath %s with value %s and expected version %d succeeded, returning the new version: %d"
+      debug("Conditional update of zkPath %s with value %s and expected version %d succeeded, returning the new version: %d"
        .format(path, data, expectVersion, stat.getVersion))
      (true, stat.getVersion)
    } catch {
@@ -346,7 +346,7 @@ object ZkUtils extends Logging {
   def conditionalUpdatePersistentPathIfExists(client: ZkClient, path: String, data: String, expectVersion: Int): (Boolean, Int) = {
     try {
       val stat = client.writeData(path, data, expectVersion)
-      info("Conditional update of zkPath %s with value %s and expected version %d succeeded, returning the new version: %d"
+      debug("Conditional update of zkPath %s with value %s and expected version %d succeeded, returning the new version: %d"
        .format(path, data, expectVersion, stat.getVersion))
      (true, stat.getVersion)
    } catch {