diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index f4bf3b9..63f5bc8 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -136,7 +136,7 @@ object AdminUtils extends Logging {
         new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
       } catch {
         case e =>
-          error("Error while fetching metadata for partition [%s,%d]".format(topic, partition), e)
+          debug("Error while fetching metadata for partition [%s,%d]".format(topic, partition), e)
           new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo,
             ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
       }
diff --git a/core/src/main/scala/kafka/admin/ListTopicCommand.scala b/core/src/main/scala/kafka/admin/ListTopicCommand.scala
index baeb099..095469b 100644
--- a/core/src/main/scala/kafka/admin/ListTopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/ListTopicCommand.scala
@@ -82,9 +82,8 @@ object ListTopicCommand {
       case ErrorMapping.UnknownTopicOrPartitionCode =>
         println("topic " + topic + " doesn't exist!")
       case _ =>
-        println("topic: " + topic)
         for (part <- topicMetaData.partitionsMetadata)
-          println(part.toString)
+          println("topic: " + topic + "\t" + part.toString)
     }
   }
 }
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index 3b7ee24..68e64d6 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -79,6 +79,13 @@ case class PartitionStateInfo(val leaderIsrAndControllerEpoch: LeaderIsrAndContr
     4 /* replication factor */
     size
   }
+
+  override def toString(): String = {
+    val partitionStateInfo = new StringBuilder
+    partitionStateInfo.append("(LeaderAndIsrInfo:" + leaderIsrAndControllerEpoch.toString)
+    partitionStateInfo.append(",ReplicationFactor:" + replicationFactor + ")")
+    partitionStateInfo.toString()
+  }
 }
 
 object LeaderAndIsrRequest {
@@ -121,13 +128,13 @@ case class LeaderAndIsrRequest (versionId: Short,
                                 controllerId: Int,
                                 controllerEpoch: Int,
                                 partitionStateInfos: Map[(String, Int), PartitionStateInfo],
-                                leaders: Set[Broker])
+                                aliveLeaders: Set[Broker])
   extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey), correlationId) {
 
-  def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], liveBrokers: Set[Broker], controllerId: Int,
+  def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], aliveLeaders: Set[Broker], controllerId: Int,
           controllerEpoch: Int, correlationId: Int, clientId: String) = {
     this(LeaderAndIsrRequest.CurrentVersion, correlationId, clientId, LeaderAndIsrRequest.DefaultAckTimeout,
-         controllerId, controllerEpoch, partitionStateInfos, liveBrokers)
+         controllerId, controllerEpoch, partitionStateInfos, aliveLeaders)
   }
 
   def writeTo(buffer: ByteBuffer) {
@@ -143,8 +150,8 @@ case class LeaderAndIsrRequest (versionId: Short,
       buffer.putInt(key._2)
       value.writeTo(buffer)
     }
-    buffer.putInt(leaders.size)
-    leaders.foreach(_.writeTo(buffer))
+    buffer.putInt(aliveLeaders.size)
+    aliveLeaders.foreach(_.writeTo(buffer))
   }
 
   def sizeInBytes(): Int = {
@@ -159,22 +166,22 @@ case class LeaderAndIsrRequest (versionId: Short,
     for((key, value) <- partitionStateInfos)
       size += (2 + key._1.length) /* topic */ + 4 /* partition */ + value.sizeInBytes /* partition state info */
     size += 4 /* number of leader brokers */
-    for(broker <- leaders)
+    for(broker <- aliveLeaders)
       size += broker.sizeInBytes /* broker info */
     size
   }
 
   override def toString(): String = {
     val leaderAndIsrRequest = new StringBuilder
-    leaderAndIsrRequest.append("Name: " + this.getClass.getSimpleName)
-    leaderAndIsrRequest.append("; Version: " + versionId)
-    leaderAndIsrRequest.append("; Controller: " + controllerId)
-    leaderAndIsrRequest.append("; ControllerEpoch: " + controllerEpoch)
-    leaderAndIsrRequest.append("; CorrelationId: " + correlationId)
-    leaderAndIsrRequest.append("; ClientId: " + clientId)
-    leaderAndIsrRequest.append("; AckTimeoutMs: " + ackTimeoutMs + " ms")
-    leaderAndIsrRequest.append("; PartitionStateInfo: " + partitionStateInfos.mkString(","))
-    leaderAndIsrRequest.append("; Leaders: " + leaders.mkString(","))
+    leaderAndIsrRequest.append("Name:" + this.getClass.getSimpleName)
+    leaderAndIsrRequest.append(";Version:" + versionId)
+    leaderAndIsrRequest.append(";Controller:" + controllerId)
+    leaderAndIsrRequest.append(";ControllerEpoch:" + controllerEpoch)
+    leaderAndIsrRequest.append(";CorrelationId:" + correlationId)
+    leaderAndIsrRequest.append(";ClientId:" + clientId)
+    leaderAndIsrRequest.append(";AckTimeoutMs:" + ackTimeoutMs + " ms")
+    leaderAndIsrRequest.append(";PartitionState:" + partitionStateInfos.mkString(","))
+    leaderAndIsrRequest.append(";Leaders:" + aliveLeaders.mkString(","))
     leaderAndIsrRequest.toString()
   }
diff --git a/core/src/main/scala/kafka/api/TopicMetadata.scala b/core/src/main/scala/kafka/api/TopicMetadata.scala
index f822678..a0d68c5 100644
--- a/core/src/main/scala/kafka/api/TopicMetadata.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadata.scala
@@ -115,6 +115,7 @@ case class PartitionMetadata(partitionId: Int,
     partitionMetadataString.append("\tleader: " + (if(leader.isDefined) formatBroker(leader.get) else "none"))
     partitionMetadataString.append("\treplicas: " + replicas.map(formatBroker).mkString(","))
     partitionMetadataString.append("\tisr: " + isr.map(formatBroker).mkString(","))
+    partitionMetadataString.append("\tisUnderReplicated: %s".format(if(isr.size < replicas.size) "true" else "false"))
     partitionMetadataString.toString()
   }
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 2ca7ee6..1b1bc0e 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -166,7 +166,7 @@ class Partition(val topic: String,
    *  4. start a fetcher to the new leader
    */
   def makeFollower(controllerId: Int, topic: String, partitionId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
-                   liveBrokers: Set[Broker], correlationId: Int): Boolean = {
+                   aliveLeaders: Set[Broker], correlationId: Int): Boolean = {
     leaderIsrUpdateLock synchronized {
       val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
       if (leaderEpoch >= leaderAndIsr.leaderEpoch) {
@@ -180,7 +180,7 @@ class Partition(val topic: String,
       // to maintain the decision maker controller's epoch in the zookeeper path
       controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
       val newLeaderBrokerId: Int = leaderAndIsr.leader
-      liveBrokers.find(_.id == newLeaderBrokerId) match {
+      aliveLeaders.find(_.id == newLeaderBrokerId) match {
         case Some(leaderBroker) =>
           // stop fetcher thread to previous leader
           replicaFetcherManager.removeFetcher(topic, partitionId)
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index f7a7bd4..3164f78 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -190,8 +190,8 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques
       val broker = m._1
       val partitionStateInfos = m._2.toMap
       val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
-      val leaders = liveBrokers.filter(b => leaderIds.contains(b.id))
-      val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId, controllerEpoch, correlationId,
+      val aliveLeaders = liveBrokers.filter(b => leaderIds.contains(b.id))
+      val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, aliveLeaders, controllerId, controllerEpoch, correlationId,
         clientId)
       for (p <- partitionStateInfos) {
         val typeOfRequest = if (broker == p._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower"
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 74614d8..65def03 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -238,8 +238,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     partitionStateMachine.registerListeners()
     replicaStateMachine.registerListeners()
     initializeControllerContext()
-    partitionStateMachine.startup()
     replicaStateMachine.startup()
+    partitionStateMachine.startup()
     Utils.registerMBean(this, KafkaController.MBeanName)
     info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
     initializeAndMaybeTriggerPartitionReassignment()
@@ -523,16 +523,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   private def updateLeaderAndIsrCache() {
     val leaderAndIsrInfo = ZkUtils.getPartitionLeaderAndIsrForTopics(zkClient, controllerContext.allTopics.toSeq)
-    for((topicPartition, leaderIsrAndControllerEpoch) <- leaderAndIsrInfo) {
-      // If the leader specified in the leaderAndIsr is no longer alive, there is no need to recover it
-      controllerContext.liveBrokerIds.contains(leaderIsrAndControllerEpoch.leaderAndIsr.leader) match {
-        case true =>
-          controllerContext.partitionLeadershipInfo.put(topicPartition, leaderIsrAndControllerEpoch)
-        case false =>
-          debug("While refreshing controller's leader and isr cache, leader %d for ".format(leaderIsrAndControllerEpoch.leaderAndIsr.leader) +
-            "partition %s is dead, just ignore it".format(topicPartition))
-      }
-    }
+    for((topicPartition, leaderIsrAndControllerEpoch) <- leaderAndIsrInfo)
+      controllerContext.partitionLeadershipInfo.put(topicPartition, leaderIsrAndControllerEpoch)
   }
 
   private def areReplicasInIsr(topic: String, partition: Int, replicas: Seq[Int]): Boolean = {
@@ -982,7 +974,16 @@ case class ReassignedPartitionsContext(var newReplicas: Seq[Int] = Seq.empty,
 
 case class PartitionAndReplica(topic: String, partition: Int, replica: Int)
 
-case class LeaderIsrAndControllerEpoch(val leaderAndIsr: LeaderAndIsr, controllerEpoch: Int)
+case class LeaderIsrAndControllerEpoch(val leaderAndIsr: LeaderAndIsr, controllerEpoch: Int) {
+  override def toString(): String = {
+    val leaderAndIsrInfo = new StringBuilder
+    leaderAndIsrInfo.append("(Leader:" + leaderAndIsr.leader)
+    leaderAndIsrInfo.append(",ISR:" + leaderAndIsr.isr.mkString(","))
+    leaderAndIsrInfo.append(",LeaderEpoch:" + leaderAndIsr.leaderEpoch)
+    leaderAndIsrInfo.append(",ControllerEpoch:" + controllerEpoch + ")")
+    leaderAndIsrInfo.toString()
+  }
+}
 
 object ControllerStats extends KafkaMetricsGroup {
   val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS)
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 2e40629..213db6e 100644
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -135,7 +135,7 @@ class LogSegment(val messageSet: FileMessageSet,
 
   /**
    * Calculate the offset that would be used for the next message to be append to this segment.
-   * Not that this is expensive.
+   * Note that this is expensive.
    */
   def nextOffset(): Long = {
     val ms = read(index.lastOffset, messageSet.sizeInBytes, None)
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index e8702e2..be872dc 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -46,7 +46,7 @@ abstract class AbstractFetcherManager(protected val name: String, numFetchers: I
         fetcherThread.start
       }
       fetcherThread.addPartition(topic, partitionId, initialOffset)
-      info("adding fetcher on topic %s, partion %d, initOffset %d to broker %d with fetcherId %d"
+      info("adding fetcher on topic %s, partition %d, initOffset %d to broker %d with fetcherId %d"
         .format(topic, partitionId, initialOffset, sourceBroker.id, key.fetcherId))
     }
   }
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index d4f15c1..b733fa3 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -83,6 +83,8 @@ class ReplicaFetcherThread(name:String,
     val leaderEndOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.LatestTime, brokerConfig.brokerId)
     if (leaderEndOffset < log.logEndOffset) {
       log.truncateTo(leaderEndOffset)
+      warn("Replica %d for partition %s reset its fetch offset to current leader %d's latest offset %d"
+        .format(brokerConfig.brokerId, topicAndPartition, sourceBroker.id, leaderEndOffset))
       leaderEndOffset
     } else {
       /**
@@ -93,6 +95,8 @@ class ReplicaFetcherThread(name:String,
        */
       val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, brokerConfig.brokerId)
       log.truncateAndStartWithNewOffset(leaderStartOffset)
+      warn("Replica %d for partition %s reset its fetch offset to current leader %d's start offset %d"
+        .format(brokerConfig.brokerId, topicAndPartition, sourceBroker.id, leaderStartOffset))
       leaderStartOffset
     }
   }
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 68e712c..6d849ac 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -219,7 +219,7 @@ class ReplicaManager(val config: KafkaConfig,
           if(requestedLeaderId == config.brokerId)
             makeLeader(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.correlationId)
           else
-            makeFollower(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.leaders,
+            makeFollower(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.aliveLeaders,
                          leaderAndISRRequest.correlationId)
         } catch {
           case e =>
@@ -264,15 +264,14 @@ class ReplicaManager(val config: KafkaConfig,
   private def makeFollower(controllerId: Int, epoch: Int, topic: String, partitionId: Int,
-                           partitionStateInfo: PartitionStateInfo, liveBrokers: Set[Broker], correlationId: Int) {
+                           partitionStateInfo: PartitionStateInfo, aliveLeaders: Set[Broker], correlationId: Int) {
     val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
-    val leaderBrokerId: Int = leaderIsrAndControllerEpoch.leaderAndIsr.leader
     stateChangeLogger.trace(("Broker %d received LeaderAndIsr request correlationId %d from controller %d epoch %d " +
                              "starting the become-follower transition for partition [%s,%d]")
                               .format(localBrokerId, correlationId, controllerId, epoch, topic, partitionId))
     val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
-    if (partition.makeFollower(controllerId, topic, partitionId, leaderIsrAndControllerEpoch, liveBrokers, correlationId)) {
+    if (partition.makeFollower(controllerId, topic, partitionId, leaderIsrAndControllerEpoch, aliveLeaders, correlationId)) {
       // remove this replica's partition from the ISR expiration queue
       leaderPartitionsLock synchronized {
         leaderPartitions -= partition