From b204fae29c9e85174a6d5b8a3a49b663ede28d8e Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Wed, 11 Mar 2015 18:35:53 -0700 Subject: [PATCH 1/6] Patch for KAFKA-1546 --- core/src/main/scala/kafka/cluster/Partition.scala | 45 +++++++------- core/src/main/scala/kafka/cluster/Replica.scala | 18 +++++- core/src/main/scala/kafka/log/Log.scala | 2 +- core/src/main/scala/kafka/server/KafkaConfig.scala | 10 +--- .../main/scala/kafka/server/ReplicaManager.scala | 47 ++++++++++----- .../unit/kafka/server/ISRExpirationTest.scala | 70 ++++++++++++++++++---- .../kafka/server/KafkaConfigConfigDefTest.scala | 2 - .../scala/unit/kafka/server/LogRecoveryTest.scala | 1 - .../scala/unit/kafka/server/SimpleFetchTest.scala | 9 ++- 9 files changed, 139 insertions(+), 65 deletions(-) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index c4bf48a..69b8bfc 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -22,7 +22,7 @@ import kafka.utils.Utils.{inReadLock,inWriteLock} import kafka.admin.AdminUtils import kafka.api.{PartitionStateInfo, LeaderAndIsr} import kafka.log.LogConfig -import kafka.server.{TopicPartitionOperationKey, LogOffsetMetadata, OffsetManager, ReplicaManager} +import kafka.server._ import kafka.metrics.KafkaMetricsGroup import kafka.controller.KafkaController import kafka.message.ByteBufferMessageSet @@ -32,6 +32,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock import scala.collection.immutable.Set import com.yammer.metrics.core.Gauge +import scala.Some +import kafka.server.TopicPartitionOperationKey +import kafka.common.TopicAndPartition /** @@ -51,6 +54,10 @@ class Partition(val topic: String, @volatile private var leaderEpoch: Int = LeaderAndIsr.initialLeaderEpoch - 1 @volatile var leaderReplicaIdOpt: Option[Int] = None @volatile var inSyncReplicas: Set[Replica] = Set.empty[Replica] + + // What is the lag begin time for each replica + @volatile var lagBegin : Map[Replica, Int] = Map() + /* Epoch of the controller that last changed the leader. This needs to be initialized correctly upon broker startup. * One way of doing that is through the controller's start replica state change command. When a new broker starts up * the controller sends it a start replica command containing the leader for each partition that the broker hosts. 
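For readers following the series: these hunks begin replacing the message-count check (replica.lag.max.messages) with a purely time-based notion of follower lag. Below is a minimal Scala sketch of that idea, for orientation only; FollowerLagTracker and lagStartMs are hypothetical names and this is not the patch's actual code.

import java.util.concurrent.atomic.AtomicLong

// Sketch only: a follower is out of sync once it has failed to read up to the
// leader's log end offset (LEO) for longer than replica.lag.time.max.ms,
// regardless of how many messages it is behind.
class FollowerLagTracker(nowMs: () => Long) {
  private val lagStartMs = new AtomicLong(-1L) // -1 means "caught up"

  // called on every follower fetch, with the leader LEO snapshotted before the read
  def onFetch(fetchedUpTo: Long, leaderLeoBeforeRead: Long): Unit = {
    if (fetchedUpTo >= leaderLeoBeforeRead) lagStartMs.set(-1L)  // caught up again
    else lagStartMs.compareAndSet(-1L, nowMs())                  // start the clock only once
  }

  def isOutOfSync(maxLagMs: Long): Boolean = {
    val begin = lagStartMs.get()
    begin > 0 && (nowMs() - begin) > maxLagMs
  }
}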
@@ -182,7 +189,7 @@ class Partition(val topic: String, val newLeaderReplica = getReplica().get newLeaderReplica.convertHWToLocalOffsetMetadata() // reset log end offset for remote replicas - assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = LogOffsetMetadata.UnknownOffsetMetadata) + assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.updateLogReadResult(LogReadResult.UnknownLogReadResult)) // we may need to increment high watermark since ISR could be down to 1 maybeIncrementLeaderHW(newLeaderReplica) if (topic == OffsetManager.OffsetsTopicName) @@ -234,21 +241,20 @@ class Partition(val topic: String, /** * Update the log end offset of a certain replica of this partition */ - def updateReplicaLEO(replicaId: Int, offset: LogOffsetMetadata) { + def updateReplicaLEO(replicaId: Int, logReadResult: LogReadResult) { getReplica(replicaId) match { case Some(replica) => - replica.logEndOffset = offset - + replica.updateLogReadResult(logReadResult) // check if we need to expand ISR to include this replica // if it is not in the ISR yet maybeExpandIsr(replicaId) debug("Recorded replica %d log end offset (LEO) position %d for partition %s." - .format(replicaId, offset.messageOffset, TopicAndPartition(topic, partitionId))) + .format(replicaId, logReadResult.info.fetchOffset.messageOffset, TopicAndPartition(topic, partitionId))) case None => throw new NotAssignedReplicaException(("Leader %d failed to record follower %d's position %d since the replica" + " is not recognized to be one of the assigned replicas %s for partition [%s,%d]").format(localBrokerId, replicaId, - offset.messageOffset, assignedReplicas().map(_.brokerId).mkString(","), topic, partitionId)) + logReadResult.info.fetchOffset, assignedReplicas().map(_.brokerId).mkString(","), topic, partitionId)) } } @@ -263,22 +269,17 @@ class Partition(val topic: String, leaderReplicaIfLocal() match { case Some(leaderReplica) => val replica = getReplica(replicaId).get - val leaderHW = leaderReplica.highWatermark - // For a replica to get added back to ISR, it has to satisfy 3 conditions- - // 1. It is not already in the ISR - // 2. It is part of the assigned replica list. See KAFKA-1097 - // 3. It's log end offset >= leader's high watermark - if (!inSyncReplicas.contains(replica) && - assignedReplicas.map(_.brokerId).contains(replicaId) && - replica.logEndOffset.offsetDiff(leaderHW) >= 0) { - // expand ISR + if(! 
inSyncReplicas.contains(replica) && + assignedReplicas.map(_.brokerId).contains(replicaId) && + replica.lagBeginTimeMs == -1) { val newInSyncReplicas = inSyncReplicas + replica info("Expanding ISR for partition [%s,%d] from %s to %s" - .format(topic, partitionId, inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(","))) + .format(topic, partitionId, inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(","))) // update ISR in ZK and cache updateIsr(newInSyncReplicas) replicaManager.isrExpandRate.mark() } + // check if the HW of the partition can now be incremented // since the replica maybe now be in the ISR and its LEO has just incremented maybeIncrementLeaderHW(leaderReplica) @@ -353,11 +354,11 @@ class Partition(val topic: String, } } - def maybeShrinkIsr(replicaMaxLagTimeMs: Long, replicaMaxLagMessages: Long) { + def maybeShrinkIsr(replicaMaxLagTimeMs: Long) { inWriteLock(leaderIsrUpdateLock) { leaderReplicaIfLocal() match { case Some(leaderReplica) => - val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs, replicaMaxLagMessages) + val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs) if(outOfSyncReplicas.size > 0) { val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas assert(newInSyncReplicas.size > 0) @@ -374,7 +375,7 @@ class Partition(val topic: String, } } - def getOutOfSyncReplicas(leaderReplica: Replica, keepInSyncTimeMs: Long, keepInSyncMessages: Long): Set[Replica] = { + def getOutOfSyncReplicas(leaderReplica: Replica, keepInSyncTimeMs: Long): Set[Replica] = { /** * there are two cases that need to be handled here - * 1. Stuck followers: If the leo of the replica hasn't been updated for keepInSyncTimeMs ms, @@ -389,9 +390,7 @@ class Partition(val topic: String, if(stuckReplicas.size > 0) debug("Stuck replicas for partition [%s,%d] are %s".format(topic, partitionId, stuckReplicas.map(_.brokerId).mkString(","))) // Case 2 above - val slowReplicas = candidateReplicas.filter(r => - r.logEndOffset.messageOffset >= 0 && - leaderLogEndOffset.messageOffset - r.logEndOffset.messageOffset > keepInSyncMessages) + val slowReplicas = candidateReplicas.filter(r => (r.lagBeginTimeMs > 0 && (time.milliseconds - r.lagBeginTimeMs) > keepInSyncTimeMs)) if(slowReplicas.size > 0) debug("Slow replicas for partition [%s,%d] are %s".format(topic, partitionId, slowReplicas.map(_.brokerId).mkString(","))) stuckReplicas ++ slowReplicas diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala index bd13c20..b794607 100644 --- a/core/src/main/scala/kafka/cluster/Replica.scala +++ b/core/src/main/scala/kafka/cluster/Replica.scala @@ -19,7 +19,7 @@ package kafka.cluster import kafka.log.Log import kafka.utils.{SystemTime, Time, Logging} -import kafka.server.LogOffsetMetadata +import kafka.server.{LogReadResult, LogOffsetMetadata} import kafka.common.KafkaException import java.util.concurrent.atomic.AtomicLong @@ -47,7 +47,21 @@ class Replica(val brokerId: Int, } } - def logEndOffset_=(newLogEndOffset: LogOffsetMetadata) { + private[this] val lagBeginValue = new AtomicLong(-1) + + def lagBeginTimeMs = lagBeginValue.get() + + def updateLogReadResult(logReadResult : LogReadResult) { + logEndOffset = logReadResult.info.fetchOffset + val readToEndOfLog = logReadResult.initialLogEndOffset.messageOffset - logReadResult.info.fetchOffset.messageOffset <= 0 + if(! 
readToEndOfLog) { + lagBeginValue.compareAndSet(-1, time.milliseconds) + } else { + lagBeginValue.set(-1) + } + } + + private def logEndOffset_=(newLogEndOffset: LogOffsetMetadata) { if (isLocal) { throw new KafkaException("Should not set log end offset on partition [%s,%d]'s local replica %d".format(topic, partitionId, brokerId)) } else { diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 06b8ecc..3b14af3 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -826,7 +826,7 @@ object Log { nf.setGroupingUsed(false) nf.format(offset) } - + /** * Construct a log file name in the given dir with the given base offset * @param dir The directory in which the log will reside diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 46d21c7..9d8668d 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -87,7 +87,6 @@ object Defaults { val ControllerMessageQueueSize = Int.MaxValue val DefaultReplicationFactor = 1 val ReplicaLagTimeMaxMs = 10000L - val ReplicaLagMaxMessages = 4000 val ReplicaSocketTimeoutMs = ConsumerConfig.SocketTimeout val ReplicaSocketReceiveBufferBytes = ConsumerConfig.SocketBufferSize val ReplicaFetchMaxBytes = ConsumerConfig.FetchSize @@ -194,7 +193,6 @@ object KafkaConfig { val ControllerMessageQueueSizeProp = "controller.message.queue.size" val DefaultReplicationFactorProp = "default.replication.factor" val ReplicaLagTimeMaxMsProp = "replica.lag.time.max.ms" - val ReplicaLagMaxMessagesProp = "replica.lag.max.messages" val ReplicaSocketTimeoutMsProp = "replica.socket.timeout.ms" val ReplicaSocketReceiveBufferBytesProp = "replica.socket.receive.buffer.bytes" val ReplicaFetchMaxBytesProp = "replica.fetch.max.bytes" @@ -303,8 +301,8 @@ object KafkaConfig { val ControllerSocketTimeoutMsDoc = "The socket timeout for controller-to-broker channels" val ControllerMessageQueueSizeDoc = "The buffer size for controller-to-broker-channels" val DefaultReplicationFactorDoc = "default replication factors for automatically created topics" - val ReplicaLagTimeMaxMsDoc = "If a follower hasn't sent any fetch requests during this time, the leader will remove the follower from isr" - val ReplicaLagMaxMessagesDoc = "If the lag in messages between a leader and a follower exceeds this number, the leader will remove the follower from isr" + val ReplicaLagTimeMaxMsDoc = "If a follower hasn't sent any fetch requests or hasnt consumed upto the leaders log end offset during this time," + + " the leader will remove the follower from isr" val ReplicaSocketTimeoutMsDoc = "The socket timeout for network requests. 
Its value should be at least replica.fetch.wait.max.ms" val ReplicaSocketReceiveBufferBytesDoc = "The socket receive buffer for network requests" val ReplicaFetchMaxBytesDoc = "The number of byes of messages to attempt to fetch" @@ -427,7 +425,6 @@ object KafkaConfig { .define(ControllerMessageQueueSizeProp, INT, Defaults.ControllerMessageQueueSize, MEDIUM, ControllerMessageQueueSizeDoc) .define(DefaultReplicationFactorProp, INT, Defaults.DefaultReplicationFactor, MEDIUM, DefaultReplicationFactorDoc) .define(ReplicaLagTimeMaxMsProp, LONG, Defaults.ReplicaLagTimeMaxMs, HIGH, ReplicaLagTimeMaxMsDoc) - .define(ReplicaLagMaxMessagesProp, LONG, Defaults.ReplicaLagMaxMessages, HIGH, ReplicaLagMaxMessagesDoc) .define(ReplicaSocketTimeoutMsProp, INT, Defaults.ReplicaSocketTimeoutMs, HIGH, ReplicaSocketTimeoutMsDoc) .define(ReplicaSocketReceiveBufferBytesProp, INT, Defaults.ReplicaSocketReceiveBufferBytes, HIGH, ReplicaSocketReceiveBufferBytesDoc) .define(ReplicaFetchMaxBytesProp, INT, Defaults.ReplicaFetchMaxBytes, HIGH, ReplicaFetchMaxBytesDoc) @@ -546,7 +543,6 @@ object KafkaConfig { controllerMessageQueueSize = parsed.get(ControllerMessageQueueSizeProp).asInstanceOf[Int], defaultReplicationFactor = parsed.get(DefaultReplicationFactorProp).asInstanceOf[Int], replicaLagTimeMaxMs = parsed.get(ReplicaLagTimeMaxMsProp).asInstanceOf[Long], - replicaLagMaxMessages = parsed.get(ReplicaLagMaxMessagesProp).asInstanceOf[Long], replicaSocketTimeoutMs = parsed.get(ReplicaSocketTimeoutMsProp).asInstanceOf[Int], replicaSocketReceiveBufferBytes = parsed.get(ReplicaSocketReceiveBufferBytesProp).asInstanceOf[Int], replicaFetchMaxBytes = parsed.get(ReplicaFetchMaxBytesProp).asInstanceOf[Int], @@ -687,7 +683,6 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ val controllerMessageQueueSize: Int = Defaults.ControllerMessageQueueSize, val defaultReplicationFactor: Int = Defaults.DefaultReplicationFactor, val replicaLagTimeMaxMs: Long = Defaults.ReplicaLagTimeMaxMs, - val replicaLagMaxMessages: Long = Defaults.ReplicaLagMaxMessages, val replicaSocketTimeoutMs: Int = Defaults.ReplicaSocketTimeoutMs, val replicaSocketReceiveBufferBytes: Int = Defaults.ReplicaSocketReceiveBufferBytes, val replicaFetchMaxBytes: Int = Defaults.ReplicaFetchMaxBytes, @@ -856,7 +851,6 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ props.put(ControllerMessageQueueSizeProp, controllerMessageQueueSize.toString) props.put(DefaultReplicationFactorProp, defaultReplicationFactor.toString) props.put(ReplicaLagTimeMaxMsProp, replicaLagTimeMaxMs.toString) - props.put(ReplicaLagMaxMessagesProp, replicaLagMaxMessages.toString) props.put(ReplicaSocketTimeoutMsProp, replicaSocketTimeoutMs.toString) props.put(ReplicaSocketReceiveBufferBytesProp, replicaSocketReceiveBufferBytes.toString) props.put(ReplicaFetchMaxBytesProp, replicaFetchMaxBytes.toString) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index c527482..f9d1619 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -52,12 +52,26 @@ case class LogAppendResult(info: LogAppendInfo, error: Option[Throwable] = None) /* * Result metadata of a log read operation on the log + * @param info @FetchDataInfo returned by the @Log read + * @param hw high watermark of the local replica + * @param readSize amount of data that was read from the log i.e. 
size of the fetch + * @param initialLogEndOffset @LogOffsetMetadata before the actual read to the log + * @param error Exception if error encountered while reading from the log */ -case class LogReadResult(info: FetchDataInfo, hw: Long, readSize: Int, error: Option[Throwable] = None) { +case class LogReadResult(info: FetchDataInfo, hw: Long, readSize: Int, initialLogEndOffset : LogOffsetMetadata, error: Option[Throwable] = None) { + def errorCode = error match { case None => ErrorMapping.NoError case Some(e) => ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]) } + + override def toString = { + "Fetch Data: [%s], HW: [%d], readSize: [%d], initialLogEndOffset: [%s], error: [%s]".format(info, hw, readSize, initialLogEndOffset, error) + } +} + +object LogReadResult { + val UnknownLogReadResult = LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, -1, LogOffsetMetadata.UnknownOffsetMetadata) } object ReplicaManager { @@ -406,7 +420,7 @@ class ReplicaManager(val config: KafkaConfig, // if the fetch comes from the follower, // update its corresponding log end offset if(Request.isValidBrokerId(replicaId)) - updateFollowerLEOs(replicaId, logReadResults.mapValues(_.info.fetchOffset)) + updateFollowerLEOs(replicaId, logReadResults) // check if this fetch request can be satisfied right away val bytesReadable = logReadResults.values.map(_.info.messageSet.sizeInBytes).sum @@ -466,7 +480,12 @@ class ReplicaManager(val config: KafkaConfig, else None - // read on log + /* Read the LogOffsetMetadata prior to performing the read from the log. We use the LogOffsetMetadata to determine if a particular replica is in-sync or not. + * Using the log end offset after performing the read can lead to a race condition where data gets appended to the log immediately after the replica has consumed from it + * This can cause a replica to always be out of sync. 
+ */ + + val initialLogEndOffset = localReplica.logEndOffset val logReadInfo = localReplica.log match { case Some(log) => log.read(offset, fetchSize, maxOffsetOpt) @@ -475,23 +494,23 @@ class ReplicaManager(val config: KafkaConfig, FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty) } - LogReadResult(logReadInfo, localReplica.highWatermark.messageOffset, fetchSize, None) + LogReadResult(logReadInfo, localReplica.highWatermark.messageOffset, fetchSize, initialLogEndOffset, None) } catch { // NOTE: Failed fetch requests metric is not incremented for known exceptions since it // is supposed to indicate un-expected failure of a broker in handling a fetch request case utpe: UnknownTopicOrPartitionException => - LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, Some(utpe)) + LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, LogOffsetMetadata.UnknownOffsetMetadata, Some(utpe)) case nle: NotLeaderForPartitionException => - LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, Some(nle)) + LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, LogOffsetMetadata.UnknownOffsetMetadata, Some(nle)) case rnae: ReplicaNotAvailableException => - LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, Some(rnae)) + LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, LogOffsetMetadata.UnknownOffsetMetadata, Some(rnae)) case oor : OffsetOutOfRangeException => - LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, Some(oor)) + LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, LogOffsetMetadata.UnknownOffsetMetadata, Some(oor)) case e: Throwable => BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark() BrokerTopicStats.getBrokerAllTopicsStats().failedFetchRequestRate.mark() error("Error processing fetch operation on partition [%s,%d] offset %d".format(topic, partition, offset)) - LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, Some(e)) + LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, LogOffsetMetadata.UnknownOffsetMetadata, Some(e)) } (TopicAndPartition(topic, partition), partitionDataAndOffsetInfo) } @@ -748,15 +767,15 @@ class ReplicaManager(val config: KafkaConfig, private def maybeShrinkIsr(): Unit = { trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR") - allPartitions.values.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs, config.replicaLagMaxMessages)) + allPartitions.values.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs)) } - private def updateFollowerLEOs(replicaId: Int, offsets: Map[TopicAndPartition, LogOffsetMetadata]) { - debug("Recording follower broker %d log end offsets: %s ".format(replicaId, offsets)) - offsets.foreach { case (topicAndPartition, offset) => + private def updateFollowerLEOs(replicaId: Int, readResults: Map[TopicAndPartition, LogReadResult]) { + debug("Recording follower broker %d log read results: %s ".format(replicaId, readResults)) + readResults.foreach { case (topicAndPartition, readResult) => getPartition(topicAndPartition.topic, 
topicAndPartition.partition) match { case Some(partition) => - partition.updateReplicaLEO(replicaId, offset) + partition.updateReplicaLEO(replicaId, readResult) // for producer requests with ack > 1, we need to check // if they can be unblocked after some follower's log end offsets have moved diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala index 9215235..e195dbe 100644 --- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala @@ -27,18 +27,18 @@ import kafka.log.Log import org.junit.Assert._ import kafka.utils._ import java.util.concurrent.atomic.AtomicBoolean +import kafka.message.MessageSet + class IsrExpirationTest extends JUnit3Suite { var topicPartitionIsr: Map[(String, Int), Seq[Int]] = new HashMap[(String, Int), Seq[Int]]() val replicaLagTimeMaxMs = 100L val replicaFetchWaitMaxMs = 100 - val replicaLagMaxMessages = 10L val overridingProps = new Properties() overridingProps.put(KafkaConfig.ReplicaLagTimeMaxMsProp, replicaLagTimeMaxMs.toString) overridingProps.put(KafkaConfig.ReplicaFetchWaitMaxMsProp, replicaFetchWaitMaxMs.toString) - overridingProps.put(KafkaConfig.ReplicaLagMaxMessagesProp, replicaLagMaxMessages.toString) val configs = TestUtils.createBrokerConfigs(2).map(KafkaConfig.fromProps(_, overridingProps)) val topic = "foo" @@ -56,6 +56,9 @@ class IsrExpirationTest extends JUnit3Suite { super.tearDown() } + /* + * Test the case where a follower is caught up but stops making requests to the leader. Once beyond the configured time limit, it should fall out of ISR + */ def testIsrExpirationForStuckFollowers() { val log = getLogWithLogEndOffset(15L, 2) // set logEndOffset for leader to 15L @@ -65,8 +68,9 @@ class IsrExpirationTest extends JUnit3Suite { val leaderReplica = partition0.getReplica(configs.head.brokerId).get // let the follower catch up to 10 - (partition0.assignedReplicas() - leaderReplica).foreach(r => r.logEndOffset = new LogOffsetMetadata(10L)) - var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs, configs.head.replicaLagMaxMessages) + (partition0.assignedReplicas() - leaderReplica).foreach( + r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(15L), MessageSet.Empty), -1L, -1, new LogOffsetMetadata(15L)))) + var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs) assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId)) // let some time pass @@ -74,26 +78,70 @@ class IsrExpirationTest extends JUnit3Suite { // now follower (broker id 1) has caught up to only 10, while the leader is at 15 AND the follower hasn't // pulled any data for > replicaMaxLagTimeMs ms. So it is stuck - partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs, configs.head.replicaLagMaxMessages) + partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs) + assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId)) + EasyMock.verify(log) + } + + /* + * Test the case where a follower never makes a fetch request. 
It should fall out of ISR because it will be declared stuck + */ + def testIsrExpirationIfNoFetchRequestMade() { + val log = getLogWithLogEndOffset(15L, 1) // set logEndOffset for leader to 15L + + // create one partition and all replicas + val partition0 = getPartitionWithAllReplicasInIsr(topic, 0, time, configs.head, log) + assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId)) + val leaderReplica = partition0.getReplica(configs.head.brokerId).get + + // Let enough time pass for the replica to be considered stuck + time.sleep(150) + + val partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs) assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId)) EasyMock.verify(log) } + /* + * Test the case where a follower continually makes fetch requests but is unable to catch up. It should fall out of the ISR + * However, any time it makes a request to the LogEndOffset it should be back in the ISR + */ def testIsrExpirationForSlowFollowers() { // create leader replica - val log = getLogWithLogEndOffset(15L, 1) + val log = getLogWithLogEndOffset(15L, 4) // add one partition val partition0 = getPartitionWithAllReplicasInIsr(topic, 0, time, configs.head, log) assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId)) val leaderReplica = partition0.getReplica(configs.head.brokerId).get - // set remote replicas leo to something low, like 4 - (partition0.assignedReplicas() - leaderReplica).foreach(r => r.logEndOffset = new LogOffsetMetadata(4L)) - // now follower (broker id 1) has caught up to only 4, while the leader is at 15. Since the gap it larger than - // replicaMaxLagBytes, the follower is out of sync. - val partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs, configs.head.replicaLagMaxMessages) + // Make the remote replica not read to the end of log. It should be not be out of sync for at least 100 ms + (partition0.assignedReplicas() - leaderReplica).foreach( + r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(10L), MessageSet.Empty), -1L, -1, new LogOffsetMetadata(15L)))) + + // Simulate 2 fetch requests spanning more than 100 ms which do not read to the end of the log. + // The replicas will no longer be in ISR. We do 2 fetches because we want to simulate the case where the replica is lagging but is not stuck + var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs) + assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId)) + + time.sleep(75) + + (partition0.assignedReplicas() - leaderReplica).foreach( + r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(11L), MessageSet.Empty), -1L, -1, new LogOffsetMetadata(15L)))) + partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs) + assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId)) + + time.sleep(75) + + // The replicas will no longer be in ISR + partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs) assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId)) + // Now actually make a fetch to the end of the log. 
The replicas should be back in ISR + (partition0.assignedReplicas() - leaderReplica).foreach( + r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(15L), MessageSet.Empty), -1L, -1, new LogOffsetMetadata(15L)))) + partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs) + assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId)) + EasyMock.verify(log) } diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala index 191251d..150c311 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala @@ -114,7 +114,6 @@ class KafkaConfigConfigDefTest extends JUnit3Suite { Assert.assertEquals(expectedConfig.controllerMessageQueueSize, actualConfig.controllerMessageQueueSize) Assert.assertEquals(expectedConfig.defaultReplicationFactor, actualConfig.defaultReplicationFactor) Assert.assertEquals(expectedConfig.replicaLagTimeMaxMs, actualConfig.replicaLagTimeMaxMs) - Assert.assertEquals(expectedConfig.replicaLagMaxMessages, actualConfig.replicaLagMaxMessages) Assert.assertEquals(expectedConfig.replicaSocketTimeoutMs, actualConfig.replicaSocketTimeoutMs) Assert.assertEquals(expectedConfig.replicaSocketReceiveBufferBytes, actualConfig.replicaSocketReceiveBufferBytes) Assert.assertEquals(expectedConfig.replicaFetchMaxBytes, actualConfig.replicaFetchMaxBytes) @@ -311,7 +310,6 @@ class KafkaConfigConfigDefTest extends JUnit3Suite { case KafkaConfig.ControllerMessageQueueSizeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.DefaultReplicationFactorProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.ReplicaLagTimeMaxMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.ReplicaLagMaxMessagesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.ReplicaSocketTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-2") case KafkaConfig.ReplicaSocketReceiveBufferBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.ReplicaFetchMaxBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index 92d6b2c..de255e3 100644 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -40,7 +40,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { val overridingProps = new Properties() overridingProps.put(KafkaConfig.ReplicaLagTimeMaxMsProp, replicaLagTimeMaxMs.toString) - overridingProps.put(KafkaConfig.ReplicaLagMaxMessagesProp, replicaLagMaxMessages.toString) overridingProps.put(KafkaConfig.ReplicaFetchWaitMaxMsProp, replicaFetchWaitMaxMs.toString) overridingProps.put(KafkaConfig.ReplicaFetchMinBytesProp, replicaFetchMinBytes.toString) diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index efb4573..07cfc4c 100644 --- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -21,7 +21,7 @@ import kafka.utils._ 
import kafka.cluster.Replica import kafka.common.TopicAndPartition import kafka.log.Log -import kafka.message.{ByteBufferMessageSet, Message} +import kafka.message.{MessageSet, ByteBufferMessageSet, Message} import scala.Some import java.util.{Properties, Collections} @@ -42,7 +42,6 @@ class SimpleFetchTest extends JUnit3Suite { val overridingProps = new Properties() overridingProps.put(KafkaConfig.ReplicaLagTimeMaxMsProp, replicaLagTimeMaxMs.toString) overridingProps.put(KafkaConfig.ReplicaFetchWaitMaxMsProp, replicaFetchWaitMaxMs.toString) - overridingProps.put(KafkaConfig.ReplicaLagMaxMessagesProp, replicaLagMaxMessages.toString) val configs = TestUtils.createBrokerConfigs(2).map(KafkaConfig.fromProps(_, overridingProps)) @@ -78,6 +77,7 @@ class SimpleFetchTest extends JUnit3Suite { // create the log which takes read with either HW max offset or none max offset val log = EasyMock.createMock(classOf[Log]) EasyMock.expect(log.logEndOffset).andReturn(leaderLEO).anyTimes() + EasyMock.expect(log.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(leaderLEO)).anyTimes() EasyMock.expect(log.read(0, fetchSize, Some(partitionHW))).andReturn( new FetchDataInfo( new LogOffsetMetadata(0L, 0L, 0), @@ -108,7 +108,9 @@ class SimpleFetchTest extends JUnit3Suite { // create the follower replica with defined log end offset val followerReplica= new Replica(configs(1).brokerId, partition, time) - followerReplica.logEndOffset = new LogOffsetMetadata(followerLEO, 0L, followerLEO.toInt) + val leo = new LogOffsetMetadata(followerLEO, 0L, followerLEO.toInt) + followerReplica.updateLogReadResult(new LogReadResult(FetchDataInfo(leo, MessageSet.Empty), -1L, -1, leo)) + //followerReplica.logEndOffset = new LogOffsetMetadata(followerLEO, 0L, followerLEO.toInt) // add both of them to ISR val allReplicas = List(leaderReplica, followerReplica) @@ -140,6 +142,7 @@ class SimpleFetchTest extends JUnit3Suite { def testReadFromLog() { val initialTopicCount = BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count(); val initialAllTopicsCount = BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count(); + LogReadResult.UnknownLogReadResult.toString assertEquals("Reading committed data should return messages only up to high watermark", messagesToHW, replicaManager.readFromLocalLog(true, true, fetchInfo).get(topicAndPartition).get.info.messageSet.head.message) -- 1.7.12.4 From ece0924738b06a25bfedc9421fbbdbaf2c7db49a Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Wed, 11 Mar 2015 18:45:54 -0700 Subject: [PATCH 2/6] PATCH for KAFKA-1546 --- core/src/main/scala/kafka/cluster/Partition.scala | 2 +- core/src/main/scala/kafka/log/Log.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 69b8bfc..36498d0 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -22,7 +22,7 @@ import kafka.utils.Utils.{inReadLock,inWriteLock} import kafka.admin.AdminUtils import kafka.api.{PartitionStateInfo, LeaderAndIsr} import kafka.log.LogConfig -import kafka.server._ +import kafka.server.{ReplicaManager, LogOffsetMetadata, OffsetManager, LogReadResult} import kafka.metrics.KafkaMetricsGroup import kafka.controller.KafkaController import kafka.message.ByteBufferMessageSet diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 3b14af3..06b8ecc 100644 --- 
a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -826,7 +826,7 @@ object Log { nf.setGroupingUsed(false) nf.format(offset) } - + /** * Construct a log file name in the given dir with the given base offset * @param dir The directory in which the log will reside -- 1.7.12.4 From e00aa8969c23d1636ddec8958e26ac7e535f1674 Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Thu, 12 Mar 2015 13:40:50 -0700 Subject: [PATCH 3/6] PATCH for KAFKA-1546 Brief summary of changes: - Added a lagBegin metric inside Replica to track how long it has been since the replica last read up to the LEO - Using lag begin value in the check for ISR expand and shrink - Removed the max lag messages config since it is no longer necessary - Returning the initialLogEndOffset in LogReadResult corresponding to the LEO before actually reading from the log. - Unit test cases to test ISR shrinkage and expansion --- core/src/main/scala/kafka/cluster/Partition.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 36498d0..9562abf 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -55,9 +55,6 @@ class Partition(val topic: String, @volatile var leaderReplicaIdOpt: Option[Int] = None @volatile var inSyncReplicas: Set[Replica] = Set.empty[Replica] - // What is the lag begin time for each replica - @volatile var lagBegin : Map[Replica, Int] = Map() - /* Epoch of the controller that last changed the leader. This needs to be initialized correctly upon broker startup. * One way of doing that is through the controller's start replica state change command. When a new broker starts up * the controller sends it a start replica command containing the leader for each partition that the broker hosts.
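Context for the initialLogEndOffset field carried by LogReadResult: the leader's LEO is captured before serving the fetch, so an append that lands while the read is in flight cannot make an otherwise caught-up follower look lagging. A tiny Scala sketch of that decision under simplified, assumed types follows; FetchOutcome is a hypothetical illustration, not a Kafka class.

// Hypothetical, simplified illustration of the "read to end of log" decision.
case class FetchOutcome(fetchOffset: Long, leoBeforeRead: Long) {
  def readToEndOfLog: Boolean = leoBeforeRead - fetchOffset <= 0
}

// Example: the follower fetched offset 100 and the leader's LEO was 100 when the
// read started; messages appended during the read do not change the outcome.
val outcome = FetchOutcome(fetchOffset = 100L, leoBeforeRead = 100L)
assert(outcome.readToEndOfLog)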
-- 1.7.12.4 From 304b53d2a0c24e4ac552063f14dd3b03338e89d5 Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Mon, 16 Mar 2015 11:31:02 -0700 Subject: [PATCH 4/6] Updated KAFKA-1546 patch to reflect Neha and Jun's comments --- core/src/main/scala/kafka/cluster/Partition.scala | 38 +++++++++++++--------- core/src/main/scala/kafka/cluster/Replica.scala | 18 +++++++--- core/src/main/scala/kafka/log/Log.scala | 2 +- .../main/scala/kafka/server/FetchDataInfo.scala | 2 +- core/src/main/scala/kafka/server/KafkaConfig.scala | 2 +- .../main/scala/kafka/server/ReplicaManager.scala | 24 ++++++++++---- .../unit/kafka/server/ISRExpirationTest.scala | 11 ++++--- .../scala/unit/kafka/server/SimpleFetchTest.scala | 2 -- 8 files changed, 64 insertions(+), 35 deletions(-) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 9562abf..e8cbcad 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -22,7 +22,8 @@ import kafka.utils.Utils.{inReadLock,inWriteLock} import kafka.admin.AdminUtils import kafka.api.{PartitionStateInfo, LeaderAndIsr} import kafka.log.LogConfig -import kafka.server.{ReplicaManager, LogOffsetMetadata, OffsetManager, LogReadResult} +import kafka.server.{TopicPartitionOperationKey, ReplicaManager} +import kafka.server.{LogOffsetMetadata, OffsetManager, LogReadResult} import kafka.metrics.KafkaMetricsGroup import kafka.controller.KafkaController import kafka.message.ByteBufferMessageSet @@ -32,10 +33,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock import scala.collection.immutable.Set import com.yammer.metrics.core.Gauge -import scala.Some -import kafka.server.TopicPartitionOperationKey -import kafka.common.TopicAndPartition - /** * Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR @@ -186,7 +183,8 @@ class Partition(val topic: String, val newLeaderReplica = getReplica().get newLeaderReplica.convertHWToLocalOffsetMetadata() // reset log end offset for remote replicas - assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.updateLogReadResult(LogReadResult.UnknownLogReadResult)) + assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) + r.updateLogReadResult(LogReadResult.UnknownLogReadResult)) // we may need to increment high watermark since ISR could be down to 1 maybeIncrementLeaderHW(newLeaderReplica) if (topic == OffsetManager.OffsetsTopicName) @@ -247,11 +245,18 @@ class Partition(val topic: String, maybeExpandIsr(replicaId) debug("Recorded replica %d log end offset (LEO) position %d for partition %s." 
- .format(replicaId, logReadResult.info.fetchOffset.messageOffset, TopicAndPartition(topic, partitionId))) + .format(replicaId, + logReadResult.info.fetchOffsetMetadata.messageOffset, + TopicAndPartition(topic, partitionId))) case None => throw new NotAssignedReplicaException(("Leader %d failed to record follower %d's position %d since the replica" + - " is not recognized to be one of the assigned replicas %s for partition [%s,%d]").format(localBrokerId, replicaId, - logReadResult.info.fetchOffset, assignedReplicas().map(_.brokerId).mkString(","), topic, partitionId)) + " is not recognized to be one of the assigned replicas %s for partition [%s,%d]") + .format(localBrokerId, + replicaId, + logReadResult.info.fetchOffsetMetadata, + assignedReplicas().map(_.brokerId).mkString(","), + topic, + partitionId)) } } @@ -266,12 +271,14 @@ class Partition(val topic: String, leaderReplicaIfLocal() match { case Some(leaderReplica) => val replica = getReplica(replicaId).get - if(! inSyncReplicas.contains(replica) && + val leaderHW = leaderReplica.highWatermark + if(!inSyncReplicas.contains(replica) && assignedReplicas.map(_.brokerId).contains(replicaId) && - replica.lagBeginTimeMs == -1) { + replica.logEndOffset.offsetDiff(leaderHW) >= 0) { val newInSyncReplicas = inSyncReplicas + replica info("Expanding ISR for partition [%s,%d] from %s to %s" - .format(topic, partitionId, inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(","))) + .format(topic, partitionId, inSyncReplicas.map(_.brokerId).mkString(","), + newInSyncReplicas.map(_.brokerId).mkString(","))) // update ISR in ZK and cache updateIsr(newInSyncReplicas) replicaManager.isrExpandRate.mark() @@ -377,8 +384,8 @@ class Partition(val topic: String, * there are two cases that need to be handled here - * 1. Stuck followers: If the leo of the replica hasn't been updated for keepInSyncTimeMs ms, * the follower is stuck and should be removed from the ISR - * 2. Slow followers: If the leo of the slowest follower is behind the leo of the leader by keepInSyncMessages, the - * follower is not catching up and should be removed from the ISR + * 2. 
Slow followers: If the replica has not read from the LEO of the leader for keepInSyncTimeMs ms, + * follower is deemed slow and should be removed from the ISR **/ val leaderLogEndOffset = leaderReplica.logEndOffset val candidateReplicas = inSyncReplicas - leaderReplica @@ -387,7 +394,8 @@ class Partition(val topic: String, if(stuckReplicas.size > 0) debug("Stuck replicas for partition [%s,%d] are %s".format(topic, partitionId, stuckReplicas.map(_.brokerId).mkString(","))) // Case 2 above - val slowReplicas = candidateReplicas.filter(r => (r.lagBeginTimeMs > 0 && (time.milliseconds - r.lagBeginTimeMs) > keepInSyncTimeMs)) + val slowReplicas = candidateReplicas.filter( + r => (r.lagBeginTimeMs > 0 && (time.milliseconds - r.lagBeginTimeMs) > keepInSyncTimeMs)) if(slowReplicas.size > 0) debug("Slow replicas for partition [%s,%d] are %s".format(topic, partitionId, slowReplicas.map(_.brokerId).mkString(","))) stuckReplicas ++ slowReplicas diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala index b794607..214de8a 100644 --- a/core/src/main/scala/kafka/cluster/Replica.scala +++ b/core/src/main/scala/kafka/cluster/Replica.scala @@ -47,14 +47,24 @@ class Replica(val brokerId: Int, } } - private[this] val lagBeginValue = new AtomicLong(-1) + private[this] val lagBeginValue = new AtomicLong(time.milliseconds) def lagBeginTimeMs = lagBeginValue.get() def updateLogReadResult(logReadResult : LogReadResult) { - logEndOffset = logReadResult.info.fetchOffset - val readToEndOfLog = logReadResult.initialLogEndOffset.messageOffset - logReadResult.info.fetchOffset.messageOffset <= 0 - if(! readToEndOfLog) { + /* If the LogReadResult read from the LogEndOffset, set the lagBeginValue to -1 i.e. not lagging + * otherwise, start the lag counter if it is not -1 + * UnknownLogReadResult means that the request did not read from the end of log + */ + + logEndOffset = logReadResult.info.fetchOffsetMetadata + val readToEndOfLog = + if(logReadResult.equals(LogReadResult.UnknownLogReadResult)) + false + else + logReadResult.initialLogEndOffset.messageOffset - logReadResult.info.fetchOffsetMetadata.messageOffset <= 0 + + if(!readToEndOfLog) { lagBeginValue.compareAndSet(-1, time.milliseconds) } else { lagBeginValue.set(-1) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 06b8ecc..a0745be 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -470,7 +470,7 @@ class Log(val dir: File, def convertToOffsetMetadata(offset: Long): LogOffsetMetadata = { try { val fetchDataInfo = read(offset, 1) - fetchDataInfo.fetchOffset + fetchDataInfo.fetchOffsetMetadata } catch { case e: OffsetOutOfRangeException => LogOffsetMetadata.UnknownOffsetMetadata } diff --git a/core/src/main/scala/kafka/server/FetchDataInfo.scala b/core/src/main/scala/kafka/server/FetchDataInfo.scala index 26f278f..1a8a604 100644 --- a/core/src/main/scala/kafka/server/FetchDataInfo.scala +++ b/core/src/main/scala/kafka/server/FetchDataInfo.scala @@ -19,4 +19,4 @@ package kafka.server import kafka.message.MessageSet -case class FetchDataInfo(fetchOffset: LogOffsetMetadata, messageSet: MessageSet) +case class FetchDataInfo(fetchOffsetMetadata: LogOffsetMetadata, messageSet: MessageSet) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 9d8668d..d867d7a 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ 
b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -301,7 +301,7 @@ object KafkaConfig { val ControllerSocketTimeoutMsDoc = "The socket timeout for controller-to-broker channels" val ControllerMessageQueueSizeDoc = "The buffer size for controller-to-broker-channels" val DefaultReplicationFactorDoc = "default replication factors for automatically created topics" - val ReplicaLagTimeMaxMsDoc = "If a follower hasn't sent any fetch requests or hasnt consumed upto the leaders log end offset during this time," + + val ReplicaLagTimeMaxMsDoc = "If a follower hasn't sent any fetch requests or hasn't consumed up to the leaders log end offset for at least this time," + " the leader will remove the follower from isr" val ReplicaSocketTimeoutMsDoc = "The socket timeout for network requests. Its value should be at least replica.fetch.wait.max.ms" val ReplicaSocketReceiveBufferBytesDoc = "The socket receive buffer for network requests" diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index f9d1619..6203c33 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -58,7 +58,11 @@ case class LogAppendResult(info: LogAppendInfo, error: Option[Throwable] = None) * @param initialLogEndOffset @LogOffsetMetadata before the actual read to the log * @param error Exception if error encountered while reading from the log */ -case class LogReadResult(info: FetchDataInfo, hw: Long, readSize: Int, initialLogEndOffset : LogOffsetMetadata, error: Option[Throwable] = None) { +case class LogReadResult(info: FetchDataInfo, + hw: Long, + readSize: Int, + initialLogEndOffset : LogOffsetMetadata, + error: Option[Throwable] = None) { def errorCode = error match { case None => ErrorMapping.NoError @@ -66,12 +70,17 @@ case class LogReadResult(info: FetchDataInfo, hw: Long, readSize: Int, initialLo } override def toString = { - "Fetch Data: [%s], HW: [%d], readSize: [%d], initialLogEndOffset: [%s], error: [%s]".format(info, hw, readSize, initialLogEndOffset, error) + "Fetch Data: [%s], HW: [%d], readSize: [%d], initialLogEndOffset: [%s], error: [%s]" + .format(info, hw, readSize, initialLogEndOffset, error) } } object LogReadResult { - val UnknownLogReadResult = LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, -1, LogOffsetMetadata.UnknownOffsetMetadata) + val UnknownLogReadResult = LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, + MessageSet.Empty), + -1L, + -1, + LogOffsetMetadata.UnknownOffsetMetadata) } object ReplicaManager { @@ -438,7 +447,7 @@ class ReplicaManager(val config: KafkaConfig, } else { // construct the fetch results from the read results val fetchPartitionStatus = logReadResults.map { case (topicAndPartition, result) => - (topicAndPartition, FetchPartitionStatus(result.info.fetchOffset, fetchInfo.get(topicAndPartition).get)) + (topicAndPartition, FetchPartitionStatus(result.info.fetchOffsetMetadata, fetchInfo.get(topicAndPartition).get)) } val fetchMetadata = FetchMetadata(fetchMinBytes, fetchOnlyFromLeader, fetchOnlyCommitted, isFromFollower, fetchPartitionStatus) val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, responseCallback) @@ -480,11 +489,12 @@ class ReplicaManager(val config: KafkaConfig, else None - /* Read the LogOffsetMetadata prior to performing the read from the log. We use the LogOffsetMetadata to determine if a particular replica is in-sync or not. 
- * Using the log end offset after performing the read can lead to a race condition where data gets appended to the log immediately after the replica has consumed from it + /* Read the LogOffsetMetadata prior to performing the read from the log. + * We use the LogOffsetMetadata to determine if a particular replica is in-sync or not. + * Using the log end offset after performing the read can lead to a race condition + * where data gets appended to the log immediately after the replica has consumed from it * This can cause a replica to always be out of sync. */ - val initialLogEndOffset = localReplica.logEndOffset val logReadInfo = localReplica.log match { case Some(log) => diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala index e195dbe..7861e27 100644 --- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala @@ -67,17 +67,20 @@ class IsrExpirationTest extends JUnit3Suite { assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId)) val leaderReplica = partition0.getReplica(configs.head.brokerId).get - // let the follower catch up to 10 + // let the follower catch up to the Leader logEndOffset (15) (partition0.assignedReplicas() - leaderReplica).foreach( - r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(15L), MessageSet.Empty), -1L, -1, new LogOffsetMetadata(15L)))) + r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(15L), + MessageSet.Empty), + -1L, + -1, + new LogOffsetMetadata(15L)))) var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs) assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId)) // let some time pass time.sleep(150) - // now follower (broker id 1) has caught up to only 10, while the leader is at 15 AND the follower hasn't - // pulled any data for > replicaMaxLagTimeMs ms. So it is stuck + // now follower hasn't pulled any data for > replicaMaxLagTimeMs ms. 
So it is stuck partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs) assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId)) EasyMock.verify(log) diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index 07cfc4c..5cb3cff 100644 --- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -110,7 +110,6 @@ class SimpleFetchTest extends JUnit3Suite { val followerReplica= new Replica(configs(1).brokerId, partition, time) val leo = new LogOffsetMetadata(followerLEO, 0L, followerLEO.toInt) followerReplica.updateLogReadResult(new LogReadResult(FetchDataInfo(leo, MessageSet.Empty), -1L, -1, leo)) - //followerReplica.logEndOffset = new LogOffsetMetadata(followerLEO, 0L, followerLEO.toInt) // add both of them to ISR val allReplicas = List(leaderReplica, followerReplica) @@ -142,7 +141,6 @@ class SimpleFetchTest extends JUnit3Suite { def testReadFromLog() { val initialTopicCount = BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count(); val initialAllTopicsCount = BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count(); - LogReadResult.UnknownLogReadResult.toString assertEquals("Reading committed data should return messages only up to high watermark", messagesToHW, replicaManager.readFromLocalLog(true, true, fetchInfo).get(topicAndPartition).get.info.messageSet.head.message) -- 1.7.12.4 From fe25e6f0f05ffc09ba403b2affbb147c17159dcc Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Tue, 17 Mar 2015 14:45:40 -0700 Subject: [PATCH 5/6] Addressing Joel's comments --- core/src/main/scala/kafka/cluster/Partition.scala | 23 +++++++++++----------- core/src/main/scala/kafka/cluster/Replica.scala | 12 +++++------ .../main/scala/kafka/server/ReplicaManager.scala | 6 +++--- .../unit/kafka/server/ISRExpirationTest.scala | 4 ++-- 4 files changed, 22 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index e8cbcad..2417da1 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -22,8 +22,7 @@ import kafka.utils.Utils.{inReadLock,inWriteLock} import kafka.admin.AdminUtils import kafka.api.{PartitionStateInfo, LeaderAndIsr} import kafka.log.LogConfig -import kafka.server.{TopicPartitionOperationKey, ReplicaManager} -import kafka.server.{LogOffsetMetadata, OffsetManager, LogReadResult} +import kafka.server.{TopicPartitionOperationKey, LogOffsetMetadata, OffsetManager, LogReadResult, ReplicaManager} import kafka.metrics.KafkaMetricsGroup import kafka.controller.KafkaController import kafka.message.ByteBufferMessageSet @@ -183,8 +182,8 @@ class Partition(val topic: String, val newLeaderReplica = getReplica().get newLeaderReplica.convertHWToLocalOffsetMetadata() // reset log end offset for remote replicas - assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) - r.updateLogReadResult(LogReadResult.UnknownLogReadResult)) + assignedReplicas.foreach(r => + if (r.brokerId != localBrokerId) r.updateLogReadResult(LogReadResult.UnknownLogReadResult)) // we may need to increment high watermark since ISR could be down to 1 maybeIncrementLeaderHW(newLeaderReplica) if (topic == OffsetManager.OffsetsTopicName) @@ -379,25 +378,25 @@ class Partition(val topic: String, } } - def 
getOutOfSyncReplicas(leaderReplica: Replica, keepInSyncTimeMs: Long): Set[Replica] = {
+  def getOutOfSyncReplicas(leaderReplica: Replica, maxLagMs: Long): Set[Replica] = {
     /**
      * there are two cases that need to be handled here -
-     * 1. Stuck followers: If the leo of the replica hasn't been updated for keepInSyncTimeMs ms,
+     * 1. Stuck followers: If the leo of the replica hasn't been updated for maxLagMs ms,
      *    the follower is stuck and should be removed from the ISR
-     * 2. Slow followers: If the replica has not read from the LEO of the leader for keepInSyncTimeMs ms,
-     *    follower is deemed slow and should be removed from the ISR
+     * 2. Slow followers: If the replica has not read up to the LEO within the last maxLagMs ms,
+     *    then the follower is lagging and should be removed from the ISR
      **/
     val leaderLogEndOffset = leaderReplica.logEndOffset
     val candidateReplicas = inSyncReplicas - leaderReplica
     // Case 1 above
-    val stuckReplicas = candidateReplicas.filter(r => (time.milliseconds - r.logEndOffsetUpdateTimeMs) > keepInSyncTimeMs)
+    val stuckReplicas = candidateReplicas.filter(r => (time.milliseconds - r.logEndOffsetUpdateTimeMs) > maxLagMs)
     if(stuckReplicas.size > 0)
-      debug("Stuck replicas for partition [%s,%d] are %s".format(topic, partitionId, stuckReplicas.map(_.brokerId).mkString(",")))
+      debug("Stuck replicas for partition %s are %s".format(TopicAndPartition(topic, partitionId), stuckReplicas.map(_.brokerId).mkString(",")))
     // Case 2 above
     val slowReplicas = candidateReplicas.filter(
-      r => (r.lagBeginTimeMs > 0 && (time.milliseconds - r.lagBeginTimeMs) > keepInSyncTimeMs))
+      r => (r.lagBeginTimeMs > 0 && (time.milliseconds - r.lagBeginTimeMs) > maxLagMs))
     if(slowReplicas.size > 0)
-      debug("Slow replicas for partition [%s,%d] are %s".format(topic, partitionId, slowReplicas.map(_.brokerId).mkString(",")))
+      debug("Slow replicas for partition %s are %s".format(TopicAndPartition(topic, partitionId), slowReplicas.map(_.brokerId).mkString(",")))
     stuckReplicas ++ slowReplicas
   }
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index 214de8a..f943310 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -47,12 +47,12 @@ class Replica(val brokerId: Int,
     }
   }
 
-  private[this] val lagBeginValue = new AtomicLong(time.milliseconds)
+  private[this] val lagBeginTimeMsUnderlying = new AtomicLong(time.milliseconds)
 
-  def lagBeginTimeMs = lagBeginValue.get()
+  def lagBeginTimeMs = lagBeginTimeMsUnderlying.get()
 
   def updateLogReadResult(logReadResult : LogReadResult) {
-    /* If the LogReadResult read from the LogEndOffset, set the lagBeginValue to -1 i.e. not lagging
+    /* If the request read up to the log end offset snapshot when the read was initiated, set the lagBeginValue to -1 i.e. not lagging
      * otherwise, start the lag counter if it is not -1
      * UnknownLogReadResult means that the request did not read from the end of log
      */
@@ -62,12 +62,12 @@ class Replica(val brokerId: Int,
       if(logReadResult.equals(LogReadResult.UnknownLogReadResult))
         false
       else
-        logReadResult.initialLogEndOffset.messageOffset - logReadResult.info.fetchOffsetMetadata.messageOffset <= 0
+        logReadResult.logEndOffsetBeforeRead.messageOffset - logReadResult.info.fetchOffsetMetadata.messageOffset <= 0
 
     if(!readToEndOfLog) {
-      lagBeginValue.compareAndSet(-1, time.milliseconds)
+      lagBeginTimeMsUnderlying.compareAndSet(-1, time.milliseconds)
     } else {
-      lagBeginValue.set(-1)
+      lagBeginTimeMsUnderlying.set(-1)
     }
   }
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 6203c33..140307c 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -55,13 +55,13 @@ case class LogAppendResult(info: LogAppendInfo, error: Option[Throwable] = None)
  * @param info @FetchDataInfo returned by the @Log read
  * @param hw high watermark of the local replica
  * @param readSize amount of data that was read from the log i.e. size of the fetch
- * @param initialLogEndOffset @LogOffsetMetadata before the actual read to the log
+ * @param logEndOffsetBeforeRead @LogOffsetMetadata before the actual read to the log
  * @param error Exception if error encountered while reading from the log
  */
 case class LogReadResult(info: FetchDataInfo,
                          hw: Long,
                          readSize: Int,
-                         initialLogEndOffset : LogOffsetMetadata,
+                         logEndOffsetBeforeRead : LogOffsetMetadata,
                          error: Option[Throwable] = None) {
 
   def errorCode = error match {
@@ -71,7 +71,7 @@ case class LogReadResult(info: FetchDataInfo,
 
   override def toString = {
     "Fetch Data: [%s], HW: [%d], readSize: [%d], initialLogEndOffset: [%s], error: [%s]"
-      .format(info, hw, readSize, initialLogEndOffset, error)
+      .format(info, hw, readSize, logEndOffsetBeforeRead, error)
   }
 }
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 7861e27..ef58701 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -118,8 +118,8 @@ class IsrExpirationTest extends JUnit3Suite {
     val leaderReplica = partition0.getReplica(configs.head.brokerId).get
 
     // Make the remote replica not read to the end of log. It should not be out of sync for at least 100 ms
-    (partition0.assignedReplicas() - leaderReplica).foreach(
-      r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(10L), MessageSet.Empty), -1L, -1, new LogOffsetMetadata(15L))))
+    for(replica <- (partition0.assignedReplicas() - leaderReplica))
+      replica.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(10L), MessageSet.Empty), -1L, -1, new LogOffsetMetadata(15L)))
 
     // Simulate 2 fetch requests spanning more than 100 ms which do not read to the end of the log.
     // The replicas will no longer be in ISR. We do 2 fetches because we want to simulate the case where the replica is lagging but is not stuck
-- 
1.7.12.4
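Note on the approach in this first patch: per follower, the leader tracks when the LEO last moved and when a lag began, and removes the follower from the ISR if either timer exceeds maxLagMs. Below is a minimal, self-contained sketch of that rule for readers of the review; FollowerState, onFetch and isOutOfSync are invented names, not the Replica/Partition code being patched.

// Illustrative sketch only; not the patched Kafka classes.
object LagCheckSketch {

  // Per-follower bookkeeping the leader would keep.
  final case class FollowerState(leo: Long,              // last reported log end offset
                                 leoUpdateTimeMs: Long,  // when the LEO last moved
                                 lagBeginTimeMs: Long)   // -1 when caught up, else when the lag started

  // Called when a follower fetch is processed; leaderLeoAtRead is the leader LEO
  // snapshotted before serving the read.
  def onFetch(state: FollowerState, fetchOffset: Long, leaderLeoAtRead: Long, nowMs: Long): FollowerState = {
    val caughtUp = fetchOffset >= leaderLeoAtRead
    state.copy(
      leo = fetchOffset,
      leoUpdateTimeMs = nowMs,
      lagBeginTimeMs = if (caughtUp) -1L
                       else if (state.lagBeginTimeMs == -1L) nowMs   // lag just started
                       else state.lagBeginTimeMs)                    // lag continues
  }

  // Case 1 (stuck): no LEO update for maxLagMs. Case 2 (slow): lagging for maxLagMs.
  def isOutOfSync(state: FollowerState, nowMs: Long, maxLagMs: Long): Boolean =
    (nowMs - state.leoUpdateTimeMs) > maxLagMs ||
    (state.lagBeginTimeMs > 0 && (nowMs - state.lagBeginTimeMs) > maxLagMs)

  def main(args: Array[String]): Unit = {
    var s = FollowerState(leo = 0L, leoUpdateTimeMs = 0L, lagBeginTimeMs = -1L)
    s = onFetch(s, fetchOffset = 10L, leaderLeoAtRead = 15L, nowMs = 50L)   // behind: lag timer starts
    println(isOutOfSync(s, nowMs = 100L, maxLagMs = 100L))                  // false: only 50 ms of lag
    println(isOutOfSync(s, nowMs = 200L, maxLagMs = 100L))                  // true: lagging for 150 ms
  }
}

In the patch itself this state lives on the Replica object (lagBeginTimeMs) and the two conditions are evaluated in Partition.getOutOfSyncReplicas.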
From 4b54956ad874d4f62e0442185b1461ac3eaff422 Mon Sep 17 00:00:00 2001
From: Aditya Auradkar
Date: Thu, 26 Mar 2015 17:43:51 -0700
Subject: [PATCH 6/6] Addressing Jun and Guozhang's comments

---
 core/src/main/scala/kafka/cluster/Partition.scala | 24 ++++++++---------
 core/src/main/scala/kafka/cluster/Replica.scala   | 31 +++++++---------------
 .../main/scala/kafka/server/ReplicaManager.scala  | 31 ++++++++++++----------
 .../unit/kafka/server/ISRExpirationTest.scala     |  8 +++---
 .../scala/unit/kafka/server/SimpleFetchTest.scala |  2 +-
 5 files changed, 43 insertions(+), 53 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 2417da1..e215962 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -235,7 +235,7 @@ class Partition(val topic: String,
   /**
    * Update the log end offset of a certain replica of this partition
    */
-  def updateReplicaLEO(replicaId: Int, logReadResult: LogReadResult) {
+  def updateReplicaLogReadResult(replicaId: Int, logReadResult: LogReadResult) {
     getReplica(replicaId) match {
       case Some(replica) =>
         replica.updateLogReadResult(logReadResult)
@@ -380,24 +380,24 @@ class Partition(val topic: String,
 
   def getOutOfSyncReplicas(leaderReplica: Replica, maxLagMs: Long): Set[Replica] = {
     /**
-     * there are two cases that need to be handled here -
+     * there are two cases that will be handled here -
      * 1. Stuck followers: If the leo of the replica hasn't been updated for maxLagMs ms,
      *    the follower is stuck and should be removed from the ISR
      * 2. Slow followers: If the replica has not read up to the LEO within the last maxLagMs ms,
      *    then the follower is lagging and should be removed from the ISR
+     * Both these cases are handled by checking the lastCaughtUpTimeMs which represents
+     * the last time when the replica was fully caught up. If either of the above conditions
+     * is violated, that replica is considered to be out of sync
+     *
      **/
     val leaderLogEndOffset = leaderReplica.logEndOffset
     val candidateReplicas = inSyncReplicas - leaderReplica
-    // Case 1 above
-    val stuckReplicas = candidateReplicas.filter(r => (time.milliseconds - r.logEndOffsetUpdateTimeMs) > maxLagMs)
-    if(stuckReplicas.size > 0)
-      debug("Stuck replicas for partition %s are %s".format(TopicAndPartition(topic, partitionId), stuckReplicas.map(_.brokerId).mkString(",")))
-    // Case 2 above
-    val slowReplicas = candidateReplicas.filter(
-      r => (r.lagBeginTimeMs > 0 && (time.milliseconds - r.lagBeginTimeMs) > maxLagMs))
-    if(slowReplicas.size > 0)
-      debug("Slow replicas for partition %s are %s".format(TopicAndPartition(topic, partitionId), slowReplicas.map(_.brokerId).mkString(",")))
-    stuckReplicas ++ slowReplicas
+
+    val laggingReplicas = candidateReplicas.filter(r => (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs)
+    if(laggingReplicas.size > 0)
+      debug("Lagging replicas for partition %s are %s".format(TopicAndPartition(topic, partitionId), laggingReplicas.map(_.brokerId).mkString(",")))
+
+    laggingReplicas
   }
 
   def appendMessagesToLeader(messages: ByteBufferMessageSet, requiredAcks: Int = 0) = {
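The rewrite above collapses the stuck and slow cases into a single comparison against lastCaughtUpTimeMs. A rough self-contained sketch of the resulting rule, with the same caveat that the names below are illustrative rather than the patched classes:

// Illustrative sketch only; not the patched Partition/Replica code.
// A single "last caught up" timestamp covers both failure modes: a stuck follower never
// fetches, so the timestamp never refreshes; a slow follower fetches but never reaches
// the leader's LEO, so it never refreshes either.
object LastCaughtUpSketch {

  final case class FollowerState(lastCaughtUpTimeMs: Long)

  def onFetch(state: FollowerState, readUpToLogEnd: Boolean, nowMs: Long): FollowerState =
    if (readUpToLogEnd) state.copy(lastCaughtUpTimeMs = nowMs) else state

  def isOutOfSync(state: FollowerState, nowMs: Long, maxLagMs: Long): Boolean =
    (nowMs - state.lastCaughtUpTimeMs) > maxLagMs

  def main(args: Array[String]): Unit = {
    var s = FollowerState(lastCaughtUpTimeMs = 0L)
    s = onFetch(s, readUpToLogEnd = false, nowMs = 75L)    // fetched, but still behind
    println(isOutOfSync(s, nowMs = 90L, maxLagMs = 100L))  // false: within the allowed lag
    println(isOutOfSync(s, nowMs = 150L, maxLagMs = 100L)) // true: not caught up for 150 ms
    s = onFetch(s, readUpToLogEnd = true, nowMs = 160L)    // caught up again
    println(isOutOfSync(s, nowMs = 200L, maxLagMs = 100L)) // false
  }
}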
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index f943310..740e835 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -34,8 +34,6 @@ class Replica(val brokerId: Int,
   // the log end offset value, kept in all replicas;
   // for local replica it is the log's end offset, for remote replicas its value is only updated by follower fetch
   @volatile private[this] var logEndOffsetMetadata: LogOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata
-  // the time when log offset is updated
-  private[this] val logEndOffsetUpdateTimeMsValue = new AtomicLong(time.milliseconds)
 
   val topic = partition.topic
   val partitionId = partition.partitionId
@@ -47,27 +45,19 @@ class Replica(val brokerId: Int,
     }
   }
 
-  private[this] val lagBeginTimeMsUnderlying = new AtomicLong(time.milliseconds)
+  private[this] val lastCaughtUpTimeMsUnderlying = new AtomicLong(time.milliseconds)
 
-  def lagBeginTimeMs = lagBeginTimeMsUnderlying.get()
+  def lastCaughtUpTimeMs = lastCaughtUpTimeMsUnderlying.get()
 
   def updateLogReadResult(logReadResult : LogReadResult) {
-    /* If the request read up to the log end offset snapshot when the read was initiated, set the lagBeginValue to -1 i.e. not lagging
-     * otherwise, start the lag counter if it is not -1
-     * UnknownLogReadResult means that the request did not read from the end of log
-     */
-    logEndOffset = logReadResult.info.fetchOffsetMetadata
-    val readToEndOfLog =
-      if(logReadResult.equals(LogReadResult.UnknownLogReadResult))
-        false
-      else
-        logReadResult.logEndOffsetBeforeRead.messageOffset - logReadResult.info.fetchOffsetMetadata.messageOffset <= 0
-
-    if(!readToEndOfLog) {
-      lagBeginTimeMsUnderlying.compareAndSet(-1, time.milliseconds)
-    } else {
-      lagBeginTimeMsUnderlying.set(-1)
+
+    /* If the request read up to the log end offset snapshot when the read was initiated,
+     * set the lastCaughtUpTimeMsUnderlying to the current time.
+     * This means that the replica is fully caught up.
+     */
+    if(logReadResult.isReadFromLogEnd) {
+      lastCaughtUpTimeMsUnderlying.set(time.milliseconds)
     }
   }
 
@@ -76,7 +66,6 @@ class Replica(val brokerId: Int,
       throw new KafkaException("Should not set log end offset on partition [%s,%d]'s local replica %d".format(topic, partitionId, brokerId))
     } else {
       logEndOffsetMetadata = newLogEndOffset
-      logEndOffsetUpdateTimeMsValue.set(time.milliseconds)
       trace("Setting log end offset for replica %d for partition [%s,%d] to [%s]"
         .format(brokerId, topic, partitionId, logEndOffsetMetadata))
     }
@@ -88,8 +77,6 @@ class Replica(val brokerId: Int,
     else
       logEndOffsetMetadata
 
-  def logEndOffsetUpdateTimeMs = logEndOffsetUpdateTimeMsValue.get()
-
   def highWatermark_=(newHighWatermark: LogOffsetMetadata) {
     if (isLocal) {
      highWatermarkMetadata = newHighWatermark
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 140307c..a8801b2 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -55,13 +55,14 @@ case class LogAppendResult(info: LogAppendInfo, error: Option[Throwable] = None)
  * @param info @FetchDataInfo returned by the @Log read
  * @param hw high watermark of the local replica
  * @param readSize amount of data that was read from the log i.e. size of the fetch
- * @param logEndOffsetBeforeRead @LogOffsetMetadata before the actual read to the log
+ * @param isReadFromLogEnd true if the request read up to the log end offset snapshot
+ *                         when the read was initiated, false otherwise
  * @param error Exception if error encountered while reading from the log
  */
 case class LogReadResult(info: FetchDataInfo,
                          hw: Long,
                          readSize: Int,
-                         logEndOffsetBeforeRead : LogOffsetMetadata,
+                         isReadFromLogEnd : Boolean,
                          error: Option[Throwable] = None) {
 
   def errorCode = error match {
@@ -70,8 +71,8 @@ case class LogReadResult(info: FetchDataInfo,
   }
 
   override def toString = {
-    "Fetch Data: [%s], HW: [%d], readSize: [%d], initialLogEndOffset: [%s], error: [%s]"
-      .format(info, hw, readSize, logEndOffsetBeforeRead, error)
+    "Fetch Data: [%s], HW: [%d], readSize: [%d], isReadFromLogEnd: [%b], error: [%s]"
+      .format(info, hw, readSize, isReadFromLogEnd, error)
   }
 }
 
@@ -80,7 +81,7 @@ object LogReadResult {
                                          MessageSet.Empty),
                            -1L,
                            -1,
-                           LogOffsetMetadata.UnknownOffsetMetadata)
+                           false)
 }
 
 object ReplicaManager {
@@ -429,7 +430,7 @@ class ReplicaManager(val config: KafkaConfig,
     // if the fetch comes from the follower,
     // update its corresponding log end offset
    if(Request.isValidBrokerId(replicaId))
-      updateFollowerLEOs(replicaId, logReadResults)
+      updateFollowerLogReadResults(replicaId, logReadResults)
 
     // check if this fetch request can be satisfied right away
     val bytesReadable = logReadResults.values.map(_.info.messageSet.sizeInBytes).sum
@@ -504,23 +505,25 @@ class ReplicaManager(val config: KafkaConfig,
             FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty)
           }
 
-          LogReadResult(logReadInfo, localReplica.highWatermark.messageOffset, fetchSize, initialLogEndOffset, None)
+          val readToEndOfLog = initialLogEndOffset.messageOffset - logReadInfo.fetchOffsetMetadata.messageOffset <= 0
+
+          LogReadResult(logReadInfo, localReplica.highWatermark.messageOffset, fetchSize, readToEndOfLog, None)
         } catch {
           // NOTE: Failed fetch requests metric is not incremented for known exceptions since it
          // is supposed to indicate un-expected failure of a broker in handling a fetch request
          case utpe: UnknownTopicOrPartitionException =>
-            LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, LogOffsetMetadata.UnknownOffsetMetadata, Some(utpe))
+            LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, false, Some(utpe))
          case nle: NotLeaderForPartitionException =>
-            LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, LogOffsetMetadata.UnknownOffsetMetadata, Some(nle))
+            LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, false, Some(nle))
          case rnae: ReplicaNotAvailableException =>
-            LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, LogOffsetMetadata.UnknownOffsetMetadata, Some(rnae))
+            LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, false, Some(rnae))
          case oor : OffsetOutOfRangeException =>
-            LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, LogOffsetMetadata.UnknownOffsetMetadata, Some(oor))
+            LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, false, Some(oor))
          case e: Throwable =>
            BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
            BrokerTopicStats.getBrokerAllTopicsStats().failedFetchRequestRate.mark()
            error("Error processing fetch operation on partition [%s,%d] offset %d".format(topic, partition, offset))
-            LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, LogOffsetMetadata.UnknownOffsetMetadata, Some(e))
+            LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, false, Some(e))
        }
        (TopicAndPartition(topic, partition), partitionDataAndOffsetInfo)
      }
@@ -780,12 +783,12 @@ class ReplicaManager(val config: KafkaConfig,
     allPartitions.values.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs))
   }
 
-  private def updateFollowerLEOs(replicaId: Int, readResults: Map[TopicAndPartition, LogReadResult]) {
+  private def updateFollowerLogReadResults(replicaId: Int, readResults: Map[TopicAndPartition, LogReadResult]) {
     debug("Recording follower broker %d log read results: %s ".format(replicaId, readResults))
     readResults.foreach { case (topicAndPartition, readResult) =>
       getPartition(topicAndPartition.topic, topicAndPartition.partition) match {
         case Some(partition) =>
-          partition.updateReplicaLEO(replicaId, readResult)
+          partition.updateReplicaLogReadResult(replicaId, readResult)
 
           // for producer requests with ack > 1, we need to check
           // if they can be unblocked after some follower's log end offsets have moved
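For context on the isReadFromLogEnd flag computed above: the leader compares the fetch offset against its own LEO snapshotted before the read is performed, so a message appended while the read is being served does not, by itself, make the follower look lagging. A hypothetical, self-contained illustration of that comparison (serveFetch and ReadResult are invented names, not the ReplicaManager API):

// Illustrative sketch only; models the log as a Vector of messages.
object ReadFromLogEndSketch {

  final case class ReadResult(data: Vector[String], isReadFromLogEnd: Boolean)

  def serveFetch(log: Vector[String], fetchOffset: Int, maxMessages: Int): ReadResult = {
    val leoBeforeRead = log.size                                   // snapshot taken up front
    val data = log.slice(fetchOffset, fetchOffset + maxMessages)   // the actual read
    // caught up iff the follower asked for everything up to the snapshot
    ReadResult(data, isReadFromLogEnd = leoBeforeRead - fetchOffset <= 0)
  }

  def main(args: Array[String]): Unit = {
    val log = Vector("m0", "m1", "m2")
    println(serveFetch(log, fetchOffset = 1, maxMessages = 10))  // behind: isReadFromLogEnd = false
    println(serveFetch(log, fetchOffset = 3, maxMessages = 10))  // at the end: isReadFromLogEnd = true
  }
}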
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index ef58701..c1d168a 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -73,7 +73,7 @@ class IsrExpirationTest extends JUnit3Suite {
                                                     MessageSet.Empty),
                                       -1L,
                                       -1,
-                                      new LogOffsetMetadata(15L))))
+                                      true)))
 
     var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
     assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId))
@@ -119,7 +119,7 @@ class IsrExpirationTest extends JUnit3Suite {
 
     // Make the remote replica not read to the end of log. It should not be out of sync for at least 100 ms
     for(replica <- (partition0.assignedReplicas() - leaderReplica))
-      replica.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(10L), MessageSet.Empty), -1L, -1, new LogOffsetMetadata(15L)))
+      replica.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(10L), MessageSet.Empty), -1L, -1, false))
 
     // Simulate 2 fetch requests spanning more than 100 ms which do not read to the end of the log.
     // The replicas will no longer be in ISR. We do 2 fetches because we want to simulate the case where the replica is lagging but is not stuck
@@ -129,7 +129,7 @@ class IsrExpirationTest extends JUnit3Suite {
     time.sleep(75)
 
     (partition0.assignedReplicas() - leaderReplica).foreach(
-      r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(11L), MessageSet.Empty), -1L, -1, new LogOffsetMetadata(15L))))
+      r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(11L), MessageSet.Empty), -1L, -1, false)))
 
     partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
     assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId))
@@ -141,7 +141,7 @@ class IsrExpirationTest extends JUnit3Suite {
 
     // Now actually make a fetch to the end of the log. The replicas should be back in ISR
     (partition0.assignedReplicas() - leaderReplica).foreach(
-      r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(15L), MessageSet.Empty), -1L, -1, new LogOffsetMetadata(15L))))
+      r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(15L), MessageSet.Empty), -1L, -1, true)))
 
     partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
     assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId))
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 5cb3cff..519888e 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -109,7 +109,7 @@ class SimpleFetchTest extends JUnit3Suite {
     // create the follower replica with defined log end offset
     val followerReplica= new Replica(configs(1).brokerId, partition, time)
     val leo = new LogOffsetMetadata(followerLEO, 0L, followerLEO.toInt)
-    followerReplica.updateLogReadResult(new LogReadResult(FetchDataInfo(leo, MessageSet.Empty), -1L, -1, leo))
+    followerReplica.updateLogReadResult(new LogReadResult(FetchDataInfo(leo, MessageSet.Empty), -1L, -1, true))
 
     // add both of them to ISR
     val allReplicas = List(leaderReplica, followerReplica)
-- 
1.7.12.4
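As a rough end-to-end illustration of the behaviour the updated IsrExpirationTest exercises (a follower that keeps fetching but stays behind for longer than replica.lag.time.max.ms is treated as out of sync, and recovers once it reads up to the log end), here is a toy timeline. The clock handling and names are invented for the example and are not the test's MockTime or the patched classes.

// Illustrative walk-through only, with maxLagMs = 100 as in the test configuration.
object IsrTimelineSketch {

  def main(args: Array[String]): Unit = {
    val maxLagMs = 100L
    var nowMs = 0L
    var lastCaughtUpTimeMs = 0L                       // follower starts caught up at t=0

    def outOfSync: Boolean = (nowMs - lastCaughtUpTimeMs) > maxLagMs

    // First fetch below the leader's LEO: lagging, so the timestamp is not refreshed.
    nowMs += 75
    println(s"t=$nowMs outOfSync=$outOfSync")         // t=75  false: still within maxLagMs

    // Second lagging fetch; together the two fetches span more than maxLagMs.
    nowMs += 75
    println(s"t=$nowMs outOfSync=$outOfSync")         // t=150 true: not caught up for 150 ms

    // A fetch that reads up to the leader's LEO refreshes the timestamp.
    lastCaughtUpTimeMs = nowMs
    println(s"t=$nowMs outOfSync=$outOfSync")         // t=150 false: back in sync
  }
}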