diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 63f5bc8..c7652ad 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -117,7 +117,7 @@ object AdminUtils extends Logging { try { Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head) } catch { - case e => throw new LeaderNotAvailableException("Leader not available for topic %s partition %d".format(topic, partition), e) + case e => throw new LeaderNotAvailableException("Leader not available for partition [%s,%d]".format(topic, partition), e) } case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition) } diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 9a29fb2..02d2c44 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -221,7 +221,7 @@ class Partition(val topic: String, if (!inSyncReplicas.contains(replica) && replica.logEndOffset >= leaderHW) { // expand ISR val newInSyncReplicas = inSyncReplicas + replica - info("Expanding ISR for topic %s partition %d from %s to %s" + info("Expanding ISR for partition [%s,%d] from %s to %s" .format(topic, partitionId, inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(","))) // update ISR in ZK and cache updateIsr(newInSyncReplicas) @@ -270,10 +270,10 @@ class Partition(val topic: String, val oldHighWatermark = leaderReplica.highWatermark if(newHighWatermark > oldHighWatermark) { leaderReplica.highWatermark = newHighWatermark - debug("Highwatermark for topic %s partition %d updated to %d".format(topic, partitionId, newHighWatermark)) + debug("Highwatermark for partition [%s,%d] updated to %d".format(topic, partitionId, newHighWatermark)) } else - debug("Old hw for topic %s partition %d is %d. New hw is %d. All leo's are %s" + debug("Old hw for partition [%s,%d] is %d. New hw is %d. All leo's are %s" .format(topic, partitionId, oldHighWatermark, newHighWatermark, allLogEndOffsets.mkString(","))) } @@ -285,7 +285,7 @@ class Partition(val topic: String, if(outOfSyncReplicas.size > 0) { val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas assert(newInSyncReplicas.size > 0) - info("Shrinking ISR for topic %s partition %d from %s to %s".format(topic, partitionId, + info("Shrinking ISR for partition [%s,%d] from %s to %s".format(topic, partitionId, inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(","))) // update ISR in zk and in cache updateIsr(newInSyncReplicas) @@ -310,13 +310,16 @@ class Partition(val topic: String, val candidateReplicas = inSyncReplicas - leaderReplica // Case 1 above val possiblyStuckReplicas = candidateReplicas.filter(r => r.logEndOffset < leaderLogEndOffset) - debug("Possibly stuck replicas for topic %s partition %d are %s".format(topic, partitionId, - possiblyStuckReplicas.map(_.brokerId).mkString(","))) + if(possiblyStuckReplicas.size > 0) + debug("Possibly stuck replicas for partition [%s,%d] are %s".format(topic, partitionId, + possiblyStuckReplicas.map(_.brokerId).mkString(","))) val stuckReplicas = possiblyStuckReplicas.filter(r => r.logEndOffsetUpdateTimeMs < (time.milliseconds - keepInSyncTimeMs)) - debug("Stuck replicas for topic %s partition %d are %s".format(topic, partitionId, stuckReplicas.map(_.brokerId).mkString(","))) + if(stuckReplicas.size > 0) + debug("Stuck replicas for partition [%s,%d] are %s".format(topic, partitionId, stuckReplicas.map(_.brokerId).mkString(","))) // Case 2 above val slowReplicas = candidateReplicas.filter(r => r.logEndOffset >= 0 && (leaderLogEndOffset - r.logEndOffset) > keepInSyncMessages) - debug("Slow replicas for topic %s partition %d are %s".format(topic, partitionId, slowReplicas.map(_.brokerId).mkString(","))) + if(slowReplicas.size > 0) + debug("Slow replicas for partition [%s,%d] are %s".format(topic, partitionId, slowReplicas.map(_.brokerId).mkString(","))) stuckReplicas ++ slowReplicas } @@ -338,7 +341,7 @@ class Partition(val topic: String, } private def updateIsr(newIsr: Set[Replica]) { - debug("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newIsr.mkString(","))) + debug("Updated ISR for partition [%s,%d] to %s".format(topic, partitionId, newIsr.mkString(","))) val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion) // use the epoch of the controller that made the leadership decision, instead of the current controller epoch val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient, diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala index 321ab58..5e659b4 100644 --- a/core/src/main/scala/kafka/cluster/Replica.scala +++ b/core/src/main/scala/kafka/cluster/Replica.scala @@ -40,10 +40,10 @@ class Replica(val brokerId: Int, if (!isLocal) { logEndOffsetValue.set(newLogEndOffset) logEndOffsetUpdateTimeMsValue.set(time.milliseconds) - trace("Setting log end offset for replica %d for topic %s partition %d to %d" + trace("Setting log end offset for replica %d for partition [%s,%d] to %d" .format(brokerId, topic, partitionId, logEndOffsetValue.get())) } else - throw new KafkaException("Shouldn't set logEndOffset for replica %d topic %s partition %d since it's local" + throw new KafkaException("Shouldn't set logEndOffset for replica %d partition [%s,%d] since it's local" .format(brokerId, topic, partitionId)) } @@ -66,11 +66,11 @@ class Replica(val brokerId: Int, def highWatermark_=(newHighWatermark: Long) { if (isLocal) { - trace("Setting hw for replica %d topic %s partition %d on broker %d to %d" + trace("Setting hw for replica %d partition [%s,%d] on broker %d to %d" .format(brokerId, topic, partitionId, brokerId, newHighWatermark)) highWatermarkValue.set(newHighWatermark) } else - throw new KafkaException("Unable to set highwatermark for replica %d topic %s partition %d since it's not local" + throw new KafkaException("Unable to set highwatermark for replica %d partition [%s,%d] since it's not local" .format(brokerId, topic, partitionId)) } @@ -78,7 +78,7 @@ class Replica(val brokerId: Int, if (isLocal) highWatermarkValue.get() else - throw new KafkaException("Unable to get highwatermark for replica %d topic %s partition %d since it's not local" + throw new KafkaException("Unable to get highwatermark for replica %d partition [%s,%d] since it's not local" .format(brokerId, topic, partitionId)) } diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala index 80df1b5..5f9c902 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala @@ -45,7 +45,7 @@ class ConsumerFetcherThread(name: String, def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) { val pti = partitionMap(topicAndPartition) if (pti.getFetchOffset != fetchOffset) - throw new RuntimeException("Offset doesn't match for topic %s partition: %d pti offset: %d fetch offset: %d" + throw new RuntimeException("Offset doesn't match for partition [%s,%d] pti offset: %d fetch offset: %d" .format(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset, fetchOffset)) pti.enqueue(partitionData.messages.asInstanceOf[ByteBufferMessageSet]) } diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 497cfdd..4771d11 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -197,7 +197,7 @@ private[kafka] class LogManager(val config: KafkaConfig, config.logIndexIntervalBytes, time, config.brokerId) - info("Created log for topic %s partition %d in %s.".format(topicAndPartition.topic, topicAndPartition.partition, dataDir.getAbsolutePath)) + info("Created log for partition [%s,%d] in %s.".format(topicAndPartition.topic, topicAndPartition.partition, dataDir.getAbsolutePath)) logs.put(topicAndPartition, log) log } diff --git a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala index 72597ef..82e6e4d 100644 --- a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala +++ b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala @@ -57,10 +57,10 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig, partitionMetadata.map { m => m.leader match { case Some(leader) => - debug("Topic %s partition %d has leader %d".format(topic, m.partitionId, leader.id)) + debug("Partition [%s,%d] has leader %d".format(topic, m.partitionId, leader.id)) new PartitionAndLeader(topic, m.partitionId, Some(leader.id)) case None => - debug("Topic %s partition %d does not have a leader yet".format(topic, m.partitionId)) + debug("Partition [%s,%d] does not have a leader yet".format(topic, m.partitionId)) new PartitionAndLeader(topic, m.partitionId, None) } }.sortWith((s, t) => s.partitionId < t.partitionId) diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 2ac7a17..162c749 100644 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -144,15 +144,15 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke try { val newOffset = handleOffsetOutOfRange(topicAndPartition) partitionMap.put(topicAndPartition, newOffset) - warn("current offset %d for topic %s partition %d out of range; reset offset to %d" + warn("current offset %d for partition [%s,%d] out of range; reset offset to %d" .format(currentOffset.get, topic, partitionId, newOffset)) } catch { case e => - warn("error getting offset for %s %d to broker %d".format(topic, partitionId, sourceBroker.id), e) + warn("error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e) partitionsWithError += topicAndPartition } case _ => - warn("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.id), + warn("error for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), ErrorMapping.exceptionFor(partitionData.error)) partitionsWithError += topicAndPartition } diff --git a/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala b/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala index 5aa0141..30caec1 100644 --- a/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala +++ b/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala @@ -77,8 +77,7 @@ class HighwaterMarkCheckpoint(val path: String) extends Logging { try { hwFile.length() match { case 0 => - warn("No highwatermark file is found. Returning 0 as the highwatermark for topic %s partition %d." - .format(topic, partition)) + warn("No highwatermark file is found. Returning 0 as the highwatermark for partition [%s,%d]".format(topic, partition)) 0L case _ => val hwFileReader = new BufferedReader(new FileReader(hwFile)) @@ -99,7 +98,7 @@ class HighwaterMarkCheckpoint(val path: String) extends Logging { val hwOpt = partitionHighWatermarks.toMap.get(TopicAndPartition(topic, partition)) hwOpt match { case Some(hw) => - debug("Read hw %d for topic %s partition %d from highwatermark checkpoint file".format(hw, topic, partition)) + debug("Read hw %d for partition [%s,%d] from highwatermark checkpoint file".format(hw, topic, partition)) hw case None => warn("No previously checkpointed highwatermark value found for topic %s ".format(topic) + diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 6b6f8f2..bb178d6 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -252,7 +252,8 @@ class KafkaApis(val requestChannel: RequestChannel, val response = new FetchResponse(fetchRequest.correlationId, dataRead) requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response))) } else { - debug("Putting fetch request into purgatory") + debug("Putting fetch request with correlation id %d from client %s into purgatory".format(fetchRequest.correlationId, + fetchRequest.clientId)) // create a list of (topic, partition) pairs to use as keys for this delayed request val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq.map(new RequestKey(_)) val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest, fetchRequest.maxWait, bytesReadable) @@ -285,7 +286,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (!isFetchFromFollower) { new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, messages) } else { - debug("Leader %d for topic %s partition %d received fetch request from follower %d" + debug("Leader %d for partition [%s,%d] received fetch request from follower %d" .format(brokerId, topic, partition, fetchRequest.replicaId)) new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, messages) } @@ -302,7 +303,7 @@ class KafkaApis(val requestChannel: RequestChannel, case t => BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark() BrokerTopicStats.getBrokerAllTopicsStats.failedFetchRequestRate.mark() - error("Error when processing fetch request for topic %s partition %d offset %d from %s with correlation id %d" + error("Error when processing fetch request for partition [%s,%d] offset %d from %s with correlation id %d" .format(topic, partition, offset, if (isFetchFromFollower) "follower" else "consumer", fetchRequest.correlationId), t) new FetchResponsePartitionData(ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty) @@ -334,7 +335,7 @@ class KafkaApis(val requestChannel: RequestChannel, case Some(log) => log.read(offset, maxSize, maxOffsetOpt) case None => - error("Leader for topic %s partition %d on broker %d does not have a local log".format(topic, partition, brokerId)) + error("Leader for partition [%s,%d] on broker %d does not have a local log".format(topic, partition, brokerId)) MessageSet.Empty } (messages, localReplica.highWatermark) diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index b733fa3..74073d0 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -54,7 +54,7 @@ class ReplicaFetcherThread(name:String, .format(replica.brokerId, replica.logEndOffset, messageSet.sizeInBytes)) val followerHighWatermark = replica.logEndOffset.min(partitionData.hw) replica.highWatermark = followerHighWatermark - trace("Follower %d set replica highwatermark for topic %s partition %d to %d" + trace("Follower %d set replica highwatermark for partition [%s,%d] to %d" .format(replica.brokerId, topic, partitionId, followerHighWatermark)) } catch { case e: KafkaStorageException => diff --git a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala index dc6d066..eac9af2 100644 --- a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala +++ b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala @@ -99,7 +99,7 @@ object VerifyConsumerRebalance extends Logging { partitions.foreach { partition => // check if there is a node for [partition] if(!partitionsWithOwners.exists(p => p.equals(partition))) { - error("No owner for topic %s partition %s".format(topic, partition)) + error("No owner for partition [%s,%d]".format(topic, partition)) rebalanceSucceeded = false } // try reading the partition owner path for see if a valid consumer id exists there @@ -109,7 +109,7 @@ object VerifyConsumerRebalance extends Logging { case None => null } if(partitionOwner == null) { - error("No owner for topic %s partition %s".format(topic, partition)) + error("No owner for partition [%s,%d]".format(topic, partition)) rebalanceSucceeded = false } else { @@ -117,12 +117,12 @@ object VerifyConsumerRebalance extends Logging { consumerIdsForTopic match { case Some(consumerIds) => if(!consumerIds.contains(partitionOwner)) { - error("Owner %s for topic %s partition %s is not a valid member of consumer " + - "group %s".format(partitionOwner, topic, partition, group)) + error(("Owner %s for partition [%s,%d] is not a valid member of consumer " + + "group %s").format(partitionOwner, topic, partition, group)) rebalanceSucceeded = false } else - info("Owner of topic %s partition %s is %s".format(topic, partition, partitionOwner)) + info("Owner of partition [%s,%d] is %s".format(topic, partition, partitionOwner)) case None => { error("No consumer ids registered for topic " + topic) rebalanceSucceeded = false diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 7971a09..4f6fcd4 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -101,7 +101,7 @@ object ZkUtils extends Logging { val isr = leaderIsrAndEpochInfo.get("isr").get.asInstanceOf[List[Int]] val controllerEpoch = leaderIsrAndEpochInfo.get("controller_epoch").get.asInstanceOf[Int] val zkPathVersion = stat.getVersion - debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for topic %s and partition %d".format(leader, epoch, + debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for partition [%s,%d]".format(leader, epoch, isr.toString(), zkPathVersion, topic, partition)) Some(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, zkPathVersion), controllerEpoch)) case None => None @@ -131,10 +131,10 @@ object ZkUtils extends Logging { leaderAndIsrOpt match { case Some(leaderAndIsr) => Json.parseFull(leaderAndIsr) match { - case None => throw new NoEpochForPartitionException("No epoch, leaderAndISR data for topic %s partition %d is invalid".format(topic, partition)) + case None => throw new NoEpochForPartitionException("No epoch, leaderAndISR data for partition [%s,%d] is invalid".format(topic, partition)) case Some(m) => m.asInstanceOf[Map[String, Any]].get("leader_epoch").get.asInstanceOf[Int] } - case None => throw new NoEpochForPartitionException("No epoch, ISR path for topic %s partition %d is empty" + case None => throw new NoEpochForPartitionException("No epoch, ISR path for partition [%s,%d] is empty" .format(topic, partition)) } } @@ -177,7 +177,7 @@ object ZkUtils extends Logging { def isPartitionOnBroker(zkClient: ZkClient, topic: String, partition: Int, brokerId: Int): Boolean = { val replicas = getReplicasForPartition(zkClient, topic, partition) - debug("The list of replicas for topic %s, partition %d is %s".format(topic, partition, replicas)) + debug("The list of replicas for partition [%s,%d] is %s".format(topic, partition, replicas)) replicas.contains(brokerId.toString) } diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala index 86d30ad..4d989e4 100644 --- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala @@ -338,7 +338,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val producer: Producer[Int, String] = new Producer[Int, String](new ProducerConfig(props)) val ms = 0.until(numMessages).map(x => header + config.brokerId + "-" + partition + "-" + x) producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*) - debug("Sent %d messages to broker %d for topic %s and partition %d".format(ms.size, config.brokerId, topic, partition)) + debug("Sent %d messages to broker %d for partition [%s,%d]".format(ms.size, config.brokerId, topic, partition)) producer.close() ms.toList } @@ -359,7 +359,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val ms = 0.until(messagesPerNode).map(x => header + config.brokerId + "-" + partition + "-" + x) producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*) messages ++= ms - debug("Sent %d messages to broker %d for topic %s and partition %d".format(ms.size, config.brokerId, topic, partition)) + debug("Sent %d messages to broker %d for partition [%s,%d]".format(ms.size, config.brokerId, topic, partition)) } producer.close() messages diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 68c134e..f9c9e64 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -408,7 +408,7 @@ object TestUtils extends Logging { ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch)) } catch { - case oe => error("Error while electing leader for topic %s partition %d".format(topic, partition), oe) + case oe => error("Error while electing leader for partition [%s,%d]".format(topic, partition), oe) } } } @@ -419,9 +419,9 @@ object TestUtils extends Logging { val leaderExistsOrChanged = leaderLock.newCondition() if(oldLeaderOpt == None) - info("Waiting for leader to be elected for topic %s partition %d".format(topic, partition)) + info("Waiting for leader to be elected for partition [%s,%d]".format(topic, partition)) else - info("Waiting for leader for topic %s partition %d to be changed from old leader %d".format(topic, partition, oldLeaderOpt.get)) + info("Waiting for leader for partition [%s,%d] to be changed from old leader %d".format(topic, partition, oldLeaderOpt.get)) leaderLock.lock() try { @@ -432,10 +432,10 @@ object TestUtils extends Logging { leader match { case Some(l) => if(oldLeaderOpt == None) - info("Leader %d is elected for topic %s partition %d".format(l, topic, partition)) + info("Leader %d is elected for partition [%s,%d]".format(l, topic, partition)) else - info("Leader for topic %s partition %d is changed from %d to %d".format(topic, partition, oldLeaderOpt.get, l)) - case None => error("Timing out after %d ms since leader is not elected for topic %s partition %d" + info("Leader for partition [%s,%d] is changed from %d to %d".format(topic, partition, oldLeaderOpt.get, l)) + case None => error("Timing out after %d ms since leader is not elected for partition [%s,%d]" .format(timeoutMs, topic, partition)) } leader