diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 2faa196..b9ffe00 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -19,6 +19,7 @@ package kafka.log
 
 import scala.collection._
 import scala.math
+import java.util.concurrent.TimeUnit
 import java.nio._
 import java.util.Date
 import java.io.File
@@ -214,7 +215,6 @@ class LogCleaner(val config: CleanerConfig,
    */
   def recordStats(id: Int, name: String, from: Long, to: Long, stats: CleanerStats) {
     this.lastStats = stats
-    cleaner.statsUnderlying.swap
     def mb(bytes: Double) = bytes / (1024*1024)
     val message =
       "%n\tLog cleaner thread %d cleaned log %s (dirty section = [%d, %d])%n".format(id, name, from, to) +
@@ -260,10 +260,9 @@ private[log] class Cleaner(val id: Int,
 
   this.logIdent = "Cleaner " + id + ": "
 
-  /* cleaning stats - one instance for the current (or next) cleaning cycle and one for the last completed cycle */
-  val statsUnderlying = (new CleanerStats(time), new CleanerStats(time))
-  def stats = statsUnderlying._1
-
+  /* stats on this cleaning */
+  val stats = new CleanerStats(time)
+
   /* buffer used for read i/o */
   private var readBuffer = ByteBuffer.allocate(ioBufferSize)
 
@@ -305,7 +304,6 @@ private[log] class Cleaner(val id: Int,
 
     stats.bufferUtilization = offsetMap.utilization
     stats.allDone()
-
     endOffset
   }
 
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index e8ced6a..514941c 100644
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -58,7 +58,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
   private val pausedCleaningCond = lock.newCondition()
 
   /* a gauge for tracking the cleanable ratio of the dirtiest log */
-  @volatile private var dirtiestLogCleanableRatio = 0.0
+  private var dirtiestLogCleanableRatio = 0.0
   newGauge("max-dirty-percent", new Gauge[Int] { def value = (100 * dirtiestLogCleanableRatio).toInt })
 
   /**
@@ -80,7 +80,8 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
           .map(l => LogToClean(l._1, l._2, // create a LogToClean instance for each
                                lastClean.getOrElse(l._1, l._2.logSegments.head.baseOffset)))
           .filter(l => l.totalBytes > 0) // skip any empty logs
-      this.dirtiestLogCleanableRatio = if (!dirtyLogs.isEmpty) dirtyLogs.max.cleanableRatio else 0
+      if(!dirtyLogs.isEmpty)
+        this.dirtiestLogCleanableRatio = dirtyLogs.max.cleanableRatio
       val cleanableLogs = dirtyLogs.filter(l => l.cleanableRatio > l.log.config.minCleanableRatio) // and must meet the minimum threshold for dirty byte ratio
       if(cleanableLogs.isEmpty) {
         None
@@ -125,8 +126,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
           case LogCleaningInProgress =>
             inProgress.put(topicAndPartition, LogCleaningAborted)
           case s =>
-            throw new IllegalStateException("Compaction for partition %s cannot be aborted and paused since it is in %s state."
-                                            .format(topicAndPartition, s))
+            throw new IllegalStateException("Partition %s can't be aborted and paused since it's in %s state".format(topicAndPartition, s))
         }
       }
       while (!isCleaningInState(topicAndPartition, LogCleaningPaused))
@@ -142,19 +142,17 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
     inLock(lock) {
       inProgress.get(topicAndPartition) match {
         case None =>
-          throw new IllegalStateException("Compaction for partition %s cannot be resumed since it is not paused."
-                                          .format(topicAndPartition))
+          throw new IllegalStateException("Partition %s can't be resumed since it was never paused".format(topicAndPartition))
         case Some(state) =>
           state match {
             case LogCleaningPaused =>
               inProgress.remove(topicAndPartition)
             case s =>
-              throw new IllegalStateException("Compaction for partition %s cannot be resumed since it is in %s state."
-                                              .format(topicAndPartition, s))
+              throw new IllegalStateException("Partition %s can't be resumed since it's in %s state".format(topicAndPartition, s))
           }
       }
     }
-    info("Compaction for partition %s is resumed".format(topicAndPartition))
+    info("Cleaning for partition %s is resumed".format(topicAndPartition))
   }
 
   /**
@@ -196,7 +194,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
             inProgress.put(topicAndPartition, LogCleaningPaused)
             pausedCleaningCond.signalAll()
           case s =>
-            throw new IllegalStateException("In-progress partition %s cannot be in %s state.".format(topicAndPartition, s))
+            throw new IllegalStateException("In-progress partition %s can't be in %s state".format(topicAndPartition, s))
         }
       }
     }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 1a4ffce..c5a9aaf 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -29,12 +29,8 @@ import kafka.metrics.KafkaMetricsGroup
 import kafka.common._
 import kafka.utils.{Pool, SystemTime, Logging}
 import kafka.network.RequestChannel.Response
-import kafka.cluster.Broker
 import kafka.controller.KafkaController
-import kafka.utils.Utils.inLock
 import org.I0Itec.zkclient.ZkClient
-import java.util.concurrent.locks.ReentrantReadWriteLock
-import kafka.controller.KafkaController.StateChangeLogger
 
 /**
  * Logic to handle the various Kafka requests
@@ -52,80 +48,9 @@ class KafkaApis(val requestChannel: RequestChannel,
   private val fetchRequestPurgatory = new FetchRequestPurgatory(requestChannel, replicaManager.config.fetchPurgatoryPurgeIntervalRequests)
   private val delayedRequestMetrics = new DelayedRequestMetrics
 
-  /* following 3 data structures are updated by the update metadata request
-   * and is queried by the topic metadata request. */
+  var metadataCache = new MetadataCache
-  private val aliveBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]()
-  private val partitionMetadataLock = new ReentrantReadWriteLock()
 
   this.logIdent = "[KafkaApi-%d] ".format(brokerId)
 
-  class MetadataCache {
-    private val cache: mutable.Map[String, mutable.Map[Int, PartitionStateInfo]] =
-      new mutable.HashMap[String, mutable.Map[Int, PartitionStateInfo]]()
-
-    def addPartitionInfo(topic: String,
-                         partitionId: Int,
-                         stateInfo: PartitionStateInfo) {
-      cache.get(topic) match {
-        case Some(infos) => infos.put(partitionId, stateInfo)
-        case None => {
-          val newInfos: mutable.Map[Int, PartitionStateInfo] = new mutable.HashMap[Int, PartitionStateInfo]
-          cache.put(topic, newInfos)
-          newInfos.put(partitionId, stateInfo)
-        }
-      }
-    }
-
-    def removePartitionInfo(topic: String, partitionId: Int) = {
-      cache.get(topic) match {
-        case Some(infos) => {
-          infos.remove(partitionId)
-          if(infos.isEmpty) {
-            cache.remove(topic)
-          }
-          true
-        }
-        case None => false
-      }
-    }
-
-    def getPartitionInfos(topic: String) = cache(topic)
-
-    def containsTopicAndPartition(topic: String,
-                                  partitionId: Int): Boolean = {
-      cache.get(topic) match {
-        case Some(partitionInfos) => partitionInfos.contains(partitionId)
-        case None => false
-      }
-    }
-
-    def allTopics = cache.keySet
-
-    def removeTopic(topic: String) = cache.remove(topic)
-
-    def containsTopic(topic: String) = cache.contains(topic)
-
-    def updateCache(updateMetadataRequest: UpdateMetadataRequest,
-                    brokerId: Int,
-                    stateChangeLogger: StateChangeLogger) = {
-      updateMetadataRequest.partitionStateInfos.foreach { case(tp, info) =>
-        if (info.leaderIsrAndControllerEpoch.leaderAndIsr.leader == LeaderAndIsr.LeaderDuringDelete) {
-          removePartitionInfo(tp.topic, tp.partition)
-          stateChangeLogger.trace(("Broker %d deleted partition %s from metadata cache in response to UpdateMetadata request " +
-            "sent by controller %d epoch %d with correlation id %d")
-            .format(brokerId, tp, updateMetadataRequest.controllerId,
-              updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
-        } else {
-          addPartitionInfo(tp.topic, tp.partition, info)
-          stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response to UpdateMetadata request " +
-            "sent by controller %d epoch %d with correlation id %d")
-            .format(brokerId, info, tp, updateMetadataRequest.controllerId,
-              updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
-        }
-      }
-    }
-  }
-
   /**
    * Top-level method that handles all requests and multiplexes to the right api
    */
@@ -154,14 +79,6 @@ class KafkaApis(val requestChannel: RequestChannel,
       request.apiLocalCompleteTimeMs = SystemTime.milliseconds
   }
 
-  // ensureTopicExists is only for client facing requests
-  private def ensureTopicExists(topic: String) = {
-    inLock(partitionMetadataLock.readLock()) {
-      if (!metadataCache.containsTopic(topic))
-        throw new UnknownTopicOrPartitionException("Topic " + topic + " either doesn't exist or is in the process of being deleted")
-    }
-  }
-
   def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
     // ensureTopicExists is only for client facing requests
     // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
@@ -191,24 +108,8 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleUpdateMetadataRequest(request: RequestChannel.Request) {
     val updateMetadataRequest = request.requestObj.asInstanceOf[UpdateMetadataRequest]
-    // ensureTopicExists is only for client facing requests
-    // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
-    // stop serving data to clients for the topic being deleted
-    val stateChangeLogger = replicaManager.stateChangeLogger
-    if(updateMetadataRequest.controllerEpoch < replicaManager.controllerEpoch) {
-      val stateControllerEpochErrorMessage = ("Broker %d received update metadata request with correlation id %d from an " +
-        "old controller %d with epoch %d. Latest known controller epoch is %d").format(brokerId,
-        updateMetadataRequest.correlationId, updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch,
-        replicaManager.controllerEpoch)
-      stateChangeLogger.warn(stateControllerEpochErrorMessage)
-      throw new ControllerMovedException(stateControllerEpochErrorMessage)
-    }
-    inLock(partitionMetadataLock.writeLock()) {
-      replicaManager.controllerEpoch = updateMetadataRequest.controllerEpoch
-      // cache the list of alive brokers in the cluster
-      updateMetadataRequest.aliveBrokers.foreach(b => aliveBrokers.put(b.id, b))
-      metadataCache.updateCache(updateMetadataRequest, brokerId, stateChangeLogger)
-    }
+    replicaManager.maybeUpdateMetadataCache(updateMetadataRequest, metadataCache)
+
     val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId)
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(updateMetadataResponse)))
   }
@@ -388,7 +289,6 @@ class KafkaApis(val requestChannel: RequestChannel,
       BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes)
 
       try {
-        ensureTopicExists(topicAndPartition.topic)
         val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
         val info = partitionOpt match {
@@ -491,7 +391,6 @@ class KafkaApis(val requestChannel: RequestChannel,
       case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) =>
         val partitionData =
           try {
-            ensureTopicExists(topic)
             val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, fetchRequest.replicaId)
             BrokerTopicStats.getBrokerTopicStats(topic).bytesOutRate.mark(messages.sizeInBytes)
             BrokerTopicStats.getBrokerAllTopicsStats.bytesOutRate.mark(messages.sizeInBytes)
@@ -562,7 +461,6 @@ class KafkaApis(val requestChannel: RequestChannel,
     val responseMap = offsetRequest.requestInfo.map(elem => {
       val (topicAndPartition, partitionOffsetRequestInfo) = elem
       try {
-        ensureTopicExists(topicAndPartition.topic)
         // ensure leader exists
         val localReplica = if(!offsetRequest.isFromDebuggingClient)
           replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
@@ -658,69 +556,24 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   private def getTopicMetadata(topics: Set[String]): Seq[TopicMetadata] = {
-    val config = replicaManager.config
-
-    // Returning all topics when requested topics are empty
-    val isAllTopics = topics.isEmpty
-    val topicResponses: mutable.ListBuffer[TopicMetadata] = new mutable.ListBuffer[TopicMetadata]
-    val topicsToBeCreated: mutable.ListBuffer[String] = new mutable.ListBuffer[String]
-
-    inLock(partitionMetadataLock.readLock()) {
-      val topicsRequested = if (isAllTopics) metadataCache.allTopics else topics
-      for (topic <- topicsRequested) {
-        if (isAllTopics || metadataCache.containsTopic(topic)) {
-          val partitionStateInfos = metadataCache.getPartitionInfos(topic)
-          val partitionMetadata = partitionStateInfos.map {
-            case (partitionId, partitionState) =>
-              val replicas = partitionState.allReplicas
-              val replicaInfo: Seq[Broker] = replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq
-              var leaderInfo: Option[Broker] = None
-              var isrInfo: Seq[Broker] = Nil
-              val leaderIsrAndEpoch = partitionState.leaderIsrAndControllerEpoch
-              val leader = leaderIsrAndEpoch.leaderAndIsr.leader
-              val isr = leaderIsrAndEpoch.leaderAndIsr.isr
-              debug("topic %s partition %s".format(topic, partitionId) + ";replicas = " + replicas + ", in sync replicas = " + isr + ", leader = " + leader)
-              try {
-                leaderInfo = aliveBrokers.get(leader)
-                if (!leaderInfo.isDefined)
-                  throw new LeaderNotAvailableException("Leader not available for topic %s partition %s".format(topic, partitionId))
-                isrInfo = isr.map(aliveBrokers.getOrElse(_, null)).filter(_ != null)
-                if (replicaInfo.size < replicas.size)
-                  throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
-                    replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
-                if (isrInfo.size < isr.size)
-                  throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
-                    isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
-                new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
-              } catch {
-                case e: Throwable =>
-                  debug("Error while fetching metadata for topic %s partition %s. Possible cause: %s".format(topic, partitionId, e.getMessage))
-                  new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo,
-                    ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
-              }
+    val topicResponses = metadataCache.getTopicMetadata(topics)
+    if (topics.size > 0 && topicResponses.size != topics.size) {
+      val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet
+      val responsesForNonExistentTopics = nonExistentTopics.map { topic =>
+        if (config.autoCreateTopicsEnable) {
+          try {
+            AdminUtils.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
+            info("Auto creation of topic %s with %d partitions and replication factor %d is successful!".format(topic, config.numPartitions, config.defaultReplicationFactor))
+          } catch {
+            case e: TopicExistsException => // let it go, possibly another broker created this topic
           }
-          topicResponses += new TopicMetadata(topic, partitionMetadata.toSeq)
-        } else if (config.autoCreateTopicsEnable || topic == OffsetManager.OffsetsTopicName) {
-          topicsToBeCreated += topic
+          new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.LeaderNotAvailableCode)
         } else {
-          topicResponses += new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
+          new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
         }
       }
+      topicResponses.appendAll(responsesForNonExistentTopics)
     }
-
-    topicResponses.appendAll(topicsToBeCreated.map { topic =>
-      try {
-        if (topic == OffsetManager.OffsetsTopicName)
-          AdminUtils.createTopic(zkClient, topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor, offsetManager.offsetsTopicConfig)
-        else
-          AdminUtils.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
-        info("Auto creation of topic %s with %d partitions and replication factor %d is successful!".format(topic, config.numPartitions, config.defaultReplicationFactor))
-      } catch {
-        case e: TopicExistsException => // let it go, possibly another broker created this topic
-      }
-      new TopicMetadata(topic, Seq.empty[PartitionMetadata],
-        ErrorMapping.LeaderNotAvailableCode)
-    })
-
     topicResponses
   }
 
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
new file mode 100644
index 0000000..a8b7bf7
--- /dev/null
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import scala.collection.{Seq, Set, mutable}
+import kafka.api._
+import kafka.cluster.Broker
+import java.util.concurrent.locks.ReentrantReadWriteLock
+import kafka.utils.Logging
+import kafka.utils.Utils._
+import kafka.common.{ErrorMapping, ReplicaNotAvailableException, LeaderNotAvailableException}
+import kafka.common.TopicAndPartition
+import kafka.controller.KafkaController.StateChangeLogger
+import scala.Some
+
+/**
+ * A cache for the state (e.g., current leader) of each partition. This cache is updated through
+ * UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously.
+ */
+private[server] class MetadataCache extends Logging {
+  private val cache: mutable.Map[String, mutable.Map[Int, PartitionStateInfo]] =
+    new mutable.HashMap[String, mutable.Map[Int, PartitionStateInfo]]()
+  private val aliveBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]()
+  private val partitionMetadataLock = new ReentrantReadWriteLock()
+
+  def getTopicMetadata(topics: Set[String]) = {
+    val isAllTopics = topics.isEmpty
+    val topicsRequested = if(isAllTopics) cache.keySet else topics
+    val topicResponses: mutable.ListBuffer[TopicMetadata] = new mutable.ListBuffer[TopicMetadata]
+    inLock(partitionMetadataLock.readLock()) {
+      for (topic <- topicsRequested) {
+        if (isAllTopics || cache.contains(topic)) {
+          val partitionStateInfos = cache(topic)
+          val partitionMetadata = partitionStateInfos.map {
+            case (partitionId, partitionState) =>
+              val replicas = partitionState.allReplicas
+              val replicaInfo: Seq[Broker] = replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq
+              var leaderInfo: Option[Broker] = None
+              var isrInfo: Seq[Broker] = Nil
+              val leaderIsrAndEpoch = partitionState.leaderIsrAndControllerEpoch
+              val leader = leaderIsrAndEpoch.leaderAndIsr.leader
+              val isr = leaderIsrAndEpoch.leaderAndIsr.isr
+              val topicPartition = TopicAndPartition(topic, partitionId)
+              try {
+                leaderInfo = aliveBrokers.get(leader)
+                if (!leaderInfo.isDefined)
+                  throw new LeaderNotAvailableException("Leader not available for %s".format(topicPartition))
+                isrInfo = isr.map(aliveBrokers.getOrElse(_, null)).filter(_ != null)
+                if (replicaInfo.size < replicas.size)
+                  throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
+                    replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
+                if (isrInfo.size < isr.size)
+                  throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
+                    isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
+                new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
+              } catch {
+                case e: Throwable =>
+                  debug("Error while fetching metadata for %s. Possible cause: %s".format(topicPartition, e.getMessage))
+                  new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo,
+                    ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+              }
+          }
+          topicResponses += new TopicMetadata(topic, partitionMetadata.toSeq)
+        }
+      }
+    }
+    topicResponses
+  }
+
+  def addOrUpdatePartitionInfo(topic: String,
+                               partitionId: Int,
+                               stateInfo: PartitionStateInfo) {
+    inLock(partitionMetadataLock.writeLock()) {
+      cache.get(topic) match {
+        case Some(infos) => infos.put(partitionId, stateInfo)
+        case None => {
+          val newInfos: mutable.Map[Int, PartitionStateInfo] = new mutable.HashMap[Int, PartitionStateInfo]
+          cache.put(topic, newInfos)
+          newInfos.put(partitionId, stateInfo)
+        }
+      }
+    }
+  }
+
+  def getPartitionInfos(topic: String) = {
+    inLock(partitionMetadataLock.readLock()) {
+      cache(topic)
+    }
+  }
+
+  def containsTopicAndPartition(topic: String,
+                                partitionId: Int): Boolean = {
+    inLock(partitionMetadataLock.readLock()) {
+      cache.get(topic) match {
+        case Some(partitionInfos) => partitionInfos.contains(partitionId)
+        case None => false
+      }
+    }
+  }
+
+  def updateCache(updateMetadataRequest: UpdateMetadataRequest,
+                  brokerId: Int,
+                  stateChangeLogger: StateChangeLogger) {
+    inLock(partitionMetadataLock.writeLock()) {
+      updateMetadataRequest.aliveBrokers.foreach(b => aliveBrokers.put(b.id, b))
+      updateMetadataRequest.partitionStateInfos.foreach { case(tp, info) =>
+        if (info.leaderIsrAndControllerEpoch.leaderAndIsr.leader == LeaderAndIsr.LeaderDuringDelete) {
+          removePartitionInfo(tp.topic, tp.partition)
+          stateChangeLogger.trace(("Broker %d deleted partition %s from metadata cache in response to UpdateMetadata request " +
+            "sent by controller %d epoch %d with correlation id %d")
+            .format(brokerId, tp, updateMetadataRequest.controllerId,
+              updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
+        } else {
+          addOrUpdatePartitionInfo(tp.topic, tp.partition, info)
+          stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response to UpdateMetadata request " +
+            "sent by controller %d epoch %d with correlation id %d")
+            .format(brokerId, info, tp, updateMetadataRequest.controllerId,
+              updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
+        }
+      }
+    }
+  }
+
+  private def removePartitionInfo(topic: String, partitionId: Int) = {
+    cache.get(topic) match {
+      case Some(infos) => {
+        infos.remove(partitionId)
+        if(infos.isEmpty) {
+          cache.remove(topic)
+        }
+        true
+      }
+      case None => false
+    }
+  }
+}
+
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 5588f59..11c20ce 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -23,16 +23,14 @@ import kafka.utils._
 import kafka.log.LogManager
 import kafka.metrics.KafkaMetricsGroup
 import kafka.common._
-import kafka.api.{StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest}
+import kafka.api.{UpdateMetadataRequest, StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest}
 import kafka.controller.KafkaController
-import org.apache.log4j.Logger
 import org.I0Itec.zkclient.ZkClient
 import com.yammer.metrics.core.Gauge
 import java.util.concurrent.atomic.AtomicBoolean
 import java.io.{IOException, File}
 import java.util.concurrent.TimeUnit
 
-
 object ReplicaManager {
   val UnknownLogEndOffset = -1L
   val HighWatermarkFilename = "replication-offset-checkpoint"
@@ -205,6 +203,22 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
+  def maybeUpdateMetadataCache(updateMetadataRequest: UpdateMetadataRequest, metadataCache: MetadataCache) {
+    replicaStateChangeLock synchronized {
+      if(updateMetadataRequest.controllerEpoch < controllerEpoch) {
+        val stateControllerEpochErrorMessage = ("Broker %d received update metadata request with correlation id %d from an " +
+          "old controller %d with epoch %d. Latest known controller epoch is %d").format(localBrokerId,
+          updateMetadataRequest.correlationId, updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch,
+          controllerEpoch)
+        stateChangeLogger.warn(stateControllerEpochErrorMessage)
+        throw new ControllerMovedException(stateControllerEpochErrorMessage)
+      } else {
+        metadataCache.updateCache(updateMetadataRequest, localBrokerId, stateChangeLogger)
+        controllerEpoch = updateMetadataRequest.controllerEpoch
+      }
+    }
+  }
+
   def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest,
                              offsetManager: OffsetManager): (collection.Map[(String, Int), Short], Short) = {
     leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) =>
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 17b08e1..b1c4ce9 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -96,7 +96,7 @@ class SimpleFetchTest extends JUnit3Suite {
     val apis = new KafkaApis(requestChannel, replicaManager, offsetManager, zkClient, configs.head.brokerId, configs.head, controller)
     val partitionStateInfo = EasyMock.createNiceMock(classOf[PartitionStateInfo])
-    apis.metadataCache.addPartitionInfo(topic, partitionId, partitionStateInfo)
+    apis.metadataCache.addOrUpdatePartitionInfo(topic, partitionId, partitionStateInfo)
     EasyMock.replay(partitionStateInfo)
     // This request (from a follower) wants to read up to 2*HW but should only get back up to HW bytes into the log
     val goodFetch = new FetchRequestBuilder()
@@ -169,7 +169,7 @@ class SimpleFetchTest extends JUnit3Suite {
     val requestChannel = new RequestChannel(2, 5)
     val apis = new KafkaApis(requestChannel, replicaManager, offsetManager, zkClient, configs.head.brokerId, configs.head, controller)
     val partitionStateInfo = EasyMock.createNiceMock(classOf[PartitionStateInfo])
-    apis.metadataCache.addPartitionInfo(topic, partitionId, partitionStateInfo)
+    apis.metadataCache.addOrUpdatePartitionInfo(topic, partitionId, partitionStateInfo)
     EasyMock.replay(partitionStateInfo)
     /**
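---
Reviewer note (not part of the patch): a minimal standalone sketch of the ordering that the new ReplicaManager.maybeUpdateMetadataCache enforces — requests from a stale controller epoch are rejected under the replica-state lock, and the locally known epoch only advances after the cache has been updated. The types below are illustrative stand-ins, not Kafka's API.

object EpochCheckSketch {
  // Simplified stand-in for kafka.api.UpdateMetadataRequest.
  case class UpdateMetadata(controllerEpoch: Int, correlationId: Int)

  // Simplified stand-in for the broker's metadata cache.
  class Cache {
    def update(req: UpdateMetadata) { /* apply partition state here */ }
  }

  class Replicas {
    private val lock = new Object
    @volatile var controllerEpoch: Int = 0

    // Mirrors the check in maybeUpdateMetadataCache: refuse updates from an
    // old controller; otherwise apply the update and then advance the epoch.
    def maybeUpdate(req: UpdateMetadata, cache: Cache) {
      lock synchronized {
        if (req.controllerEpoch < controllerEpoch)
          throw new IllegalStateException(
            "Update with correlation id %d is from an old controller epoch %d; latest known epoch is %d"
              .format(req.correlationId, req.controllerEpoch, controllerEpoch))
        cache.update(req)
        controllerEpoch = req.controllerEpoch
      }
    }
  }

  def main(args: Array[String]) {
    val replicas = new Replicas
    replicas.maybeUpdate(UpdateMetadata(controllerEpoch = 1, correlationId = 42), new Cache)
    println("controller epoch is now " + replicas.controllerEpoch)
  }
}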
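A second toy sketch, this time of the error-code contract in the reworked getTopicMetadata path: a topic missing from the cache yields LeaderNotAvailable when auto-creation kicks in (the client is expected to retry once a leader is elected) and UnknownTopicOrPartition otherwise. The numeric codes are inlined here to keep the sketch self-contained and are assumed to match ErrorMapping (5 and 3).

object MetadataErrorSketch {
  val LeaderNotAvailableCode: Short = 5
  val UnknownTopicOrPartitionCode: Short = 3

  // For a topic absent from the metadata cache, pick the code the broker
  // returns in the new getTopicMetadata path.
  def codeForMissingTopic(autoCreateTopicsEnable: Boolean): Short =
    if (autoCreateTopicsEnable) LeaderNotAvailableCode // topic just created, no leader yet
    else UnknownTopicOrPartitionCode

  def main(args: Array[String]) {
    assert(codeForMissingTopic(autoCreateTopicsEnable = true) == LeaderNotAvailableCode)
    assert(codeForMissingTopic(autoCreateTopicsEnable = false) == UnknownTopicOrPartitionCode)
    println("error-code contract holds")
  }
}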