From 196a43b563727189590435d0e247a999fe3984aa Mon Sep 17 00:00:00 2001
From: Aditya Auradkar
Date: Wed, 11 Mar 2015 18:35:53 -0700
Subject: [PATCH] Patch for KAFKA-1546

---
 core/src/main/scala/kafka/cluster/Partition.scala  | 45 +++++++-------
 core/src/main/scala/kafka/cluster/Replica.scala    | 18 +++++-
 core/src/main/scala/kafka/log/Log.scala            |  2 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala | 10 +---
 .../main/scala/kafka/server/ReplicaManager.scala   | 47 ++++++++++-----
 .../unit/kafka/server/ISRExpirationTest.scala      | 70 ++++++++++++++++++----
 .../kafka/server/KafkaConfigConfigDefTest.scala    |  2 -
 .../scala/unit/kafka/server/LogRecoveryTest.scala  |  1 -
 .../scala/unit/kafka/server/SimpleFetchTest.scala  |  9 ++-
 9 files changed, 139 insertions(+), 65 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index c4bf48a..69b8bfc 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -22,7 +22,7 @@ import kafka.utils.Utils.{inReadLock,inWriteLock}
 import kafka.admin.AdminUtils
 import kafka.api.{PartitionStateInfo, LeaderAndIsr}
 import kafka.log.LogConfig
-import kafka.server.{TopicPartitionOperationKey, LogOffsetMetadata, OffsetManager, ReplicaManager}
+import kafka.server._
 import kafka.metrics.KafkaMetricsGroup
 import kafka.controller.KafkaController
 import kafka.message.ByteBufferMessageSet
@@ -32,6 +32,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
 import scala.collection.immutable.Set
 import com.yammer.metrics.core.Gauge

+import scala.Some
+import kafka.server.TopicPartitionOperationKey
+import kafka.common.TopicAndPartition

 /**
@@ -51,6 +54,10 @@ class Partition(val topic: String,
   @volatile private var leaderEpoch: Int = LeaderAndIsr.initialLeaderEpoch - 1
   @volatile var leaderReplicaIdOpt: Option[Int] = None
   @volatile var inSyncReplicas: Set[Replica] = Set.empty[Replica]
+
+  // What is the lag begin time for each replica
+  @volatile var lagBegin : Map[Replica, Int] = Map()
+
   /* Epoch of the controller that last changed the leader. This needs to be initialized correctly upon broker startup.
    * One way of doing that is through the controller's start replica state change command. When a new broker starts up
    * the controller sends it a start replica command containing the leader for each partition that the broker hosts.
@@ -182,7 +189,7 @@ class Partition(val topic: String,
         val newLeaderReplica = getReplica().get
         newLeaderReplica.convertHWToLocalOffsetMetadata()
         // reset log end offset for remote replicas
-        assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = LogOffsetMetadata.UnknownOffsetMetadata)
+        assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.updateLogReadResult(LogReadResult.UnknownLogReadResult))
         // we may need to increment high watermark since ISR could be down to 1
         maybeIncrementLeaderHW(newLeaderReplica)
         if (topic == OffsetManager.OffsetsTopicName)
@@ -234,21 +241,20 @@ class Partition(val topic: String,
   /**
    * Update the log end offset of a certain replica of this partition
    */
-  def updateReplicaLEO(replicaId: Int, offset: LogOffsetMetadata) {
+  def updateReplicaLEO(replicaId: Int, logReadResult: LogReadResult) {
     getReplica(replicaId) match {
       case Some(replica) =>
-        replica.logEndOffset = offset
-
+        replica.updateLogReadResult(logReadResult)
         // check if we need to expand ISR to include this replica
        // if it is not in the ISR yet
        maybeExpandIsr(replicaId)

        debug("Recorded replica %d log end offset (LEO) position %d for partition %s."
-          .format(replicaId, offset.messageOffset, TopicAndPartition(topic, partitionId)))
+          .format(replicaId, logReadResult.info.fetchOffset.messageOffset, TopicAndPartition(topic, partitionId)))
       case None =>
         throw new NotAssignedReplicaException(("Leader %d failed to record follower %d's position %d since the replica" +
           " is not recognized to be one of the assigned replicas %s for partition [%s,%d]").format(localBrokerId, replicaId,
-          offset.messageOffset, assignedReplicas().map(_.brokerId).mkString(","), topic, partitionId))
+          logReadResult.info.fetchOffset, assignedReplicas().map(_.brokerId).mkString(","), topic, partitionId))
     }
   }
@@ -263,22 +269,17 @@ class Partition(val topic: String,
     leaderReplicaIfLocal() match {
       case Some(leaderReplica) =>
         val replica = getReplica(replicaId).get
-        val leaderHW = leaderReplica.highWatermark
-        // For a replica to get added back to ISR, it has to satisfy 3 conditions-
-        // 1. It is not already in the ISR
-        // 2. It is part of the assigned replica list. See KAFKA-1097
-        // 3. It's log end offset >= leader's high watermark
-        if (!inSyncReplicas.contains(replica) &&
-          assignedReplicas.map(_.brokerId).contains(replicaId) &&
-          replica.logEndOffset.offsetDiff(leaderHW) >= 0) {
-          // expand ISR
+        if(!inSyncReplicas.contains(replica) &&
+           assignedReplicas.map(_.brokerId).contains(replicaId) &&
+           replica.lagBeginTimeMs == -1) {
           val newInSyncReplicas = inSyncReplicas + replica
           info("Expanding ISR for partition [%s,%d] from %s to %s"
-            .format(topic, partitionId, inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(",")))
+            .format(topic, partitionId, inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(",")))
           // update ISR in ZK and cache
           updateIsr(newInSyncReplicas)
           replicaManager.isrExpandRate.mark()
         }
+
       // check if the HW of the partition can now be incremented
       // since the replica maybe now be in the ISR and its LEO has just incremented
       maybeIncrementLeaderHW(leaderReplica)
@@ -353,11 +354,11 @@ class Partition(val topic: String,
     }
   }

-  def maybeShrinkIsr(replicaMaxLagTimeMs: Long, replicaMaxLagMessages: Long) {
+  def maybeShrinkIsr(replicaMaxLagTimeMs: Long) {
     inWriteLock(leaderIsrUpdateLock) {
       leaderReplicaIfLocal() match {
         case Some(leaderReplica) =>
-          val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs, replicaMaxLagMessages)
+          val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs)
           if(outOfSyncReplicas.size > 0) {
             val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
             assert(newInSyncReplicas.size > 0)
@@ -374,7 +375,7 @@ class Partition(val topic: String,
     }
   }

-  def getOutOfSyncReplicas(leaderReplica: Replica, keepInSyncTimeMs: Long, keepInSyncMessages: Long): Set[Replica] = {
+  def getOutOfSyncReplicas(leaderReplica: Replica, keepInSyncTimeMs: Long): Set[Replica] = {
     /**
      * there are two cases that need to be handled here -
      * 1. Stuck followers: If the leo of the replica hasn't been updated for keepInSyncTimeMs ms,
@@ -389,9 +390,7 @@ class Partition(val topic: String,
     if(stuckReplicas.size > 0)
       debug("Stuck replicas for partition [%s,%d] are %s".format(topic, partitionId, stuckReplicas.map(_.brokerId).mkString(",")))
     // Case 2 above
-    val slowReplicas = candidateReplicas.filter(r =>
-      r.logEndOffset.messageOffset >= 0 &&
-      leaderLogEndOffset.messageOffset - r.logEndOffset.messageOffset > keepInSyncMessages)
+    val slowReplicas = candidateReplicas.filter(r => (r.lagBeginTimeMs > 0 && (time.milliseconds - r.lagBeginTimeMs) > keepInSyncTimeMs))
     if(slowReplicas.size > 0)
       debug("Slow replicas for partition [%s,%d] are %s".format(topic, partitionId, slowReplicas.map(_.brokerId).mkString(",")))
     stuckReplicas ++ slowReplicas

diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index bd13c20..b794607 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -19,7 +19,7 @@ package kafka.cluster
 import kafka.log.Log
 import kafka.utils.{SystemTime, Time, Logging}
-import kafka.server.LogOffsetMetadata
+import kafka.server.{LogReadResult, LogOffsetMetadata}
 import kafka.common.KafkaException
 import java.util.concurrent.atomic.AtomicLong
@@ -47,7 +47,21 @@ class Replica(val brokerId: Int,
     }
   }

-  def logEndOffset_=(newLogEndOffset: LogOffsetMetadata) {
+  private[this] val lagBeginValue = new AtomicLong(-1)
+
+  def lagBeginTimeMs = lagBeginValue.get()
+
+  def updateLogReadResult(logReadResult : LogReadResult) {
+    logEndOffset = logReadResult.info.fetchOffset
+    val readToEndOfLog = logReadResult.initialLogEndOffset.messageOffset - logReadResult.info.fetchOffset.messageOffset <= 0
+    if(!readToEndOfLog) {
+      lagBeginValue.compareAndSet(-1, time.milliseconds)
+    } else {
+      lagBeginValue.set(-1)
+    }
+  }
+
+  private def logEndOffset_=(newLogEndOffset: LogOffsetMetadata) {
     if (isLocal) {
       throw new KafkaException("Should not set log end offset on partition [%s,%d]'s local replica %d".format(topic, partitionId, brokerId))
     } else {

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 06b8ecc..3b14af3 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -826,7 +826,7 @@ object Log {
     nf.setGroupingUsed(false)
     nf.format(offset)
   }
-  
+
   /**
    * Construct a log file name in the given dir with the given base offset
    * @param dir The directory in which the log will reside

diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 48e3362..e56f3d7 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -87,7 +87,6 @@ object Defaults {
   val ControllerMessageQueueSize = Int.MaxValue
   val DefaultReplicationFactor = 1
   val ReplicaLagTimeMaxMs = 10000L
-  val ReplicaLagMaxMessages = 4000
   val ReplicaSocketTimeoutMs = ConsumerConfig.SocketTimeout
   val ReplicaSocketReceiveBufferBytes = ConsumerConfig.SocketBufferSize
   val ReplicaFetchMaxBytes = ConsumerConfig.FetchSize
@@ -193,7 +192,6 @@ object KafkaConfig {
   val ControllerMessageQueueSizeProp = "controller.message.queue.size"
   val DefaultReplicationFactorProp = "default.replication.factor"
   val ReplicaLagTimeMaxMsProp = "replica.lag.time.max.ms"
-  val ReplicaLagMaxMessagesProp = "replica.lag.max.messages"
   val ReplicaSocketTimeoutMsProp = "replica.socket.timeout.ms"
   val ReplicaSocketReceiveBufferBytesProp = "replica.socket.receive.buffer.bytes"
   val ReplicaFetchMaxBytesProp = "replica.fetch.max.bytes"
@@ -301,8 +299,8 @@ object KafkaConfig {
   val ControllerSocketTimeoutMsDoc = "The socket timeout for controller-to-broker channels"
   val ControllerMessageQueueSizeDoc = "The buffer size for controller-to-broker-channels"
   val DefaultReplicationFactorDoc = "default replication factors for automatically created topics"
-  val ReplicaLagTimeMaxMsDoc = "If a follower hasn't sent any fetch requests during this time, the leader will remove the follower from isr"
-  val ReplicaLagMaxMessagesDoc = "If the lag in messages between a leader and a follower exceeds this number, the leader will remove the follower from isr"
+  val ReplicaLagTimeMaxMsDoc = "If a follower hasn't sent any fetch requests or hasn't consumed up to the leader's log end offset during this time," +
+    " the leader will remove the follower from isr"
   val ReplicaSocketTimeoutMsDoc = "The socket timeout for network requests. Its value should be at least replica.fetch.wait.max.ms"
   val ReplicaSocketReceiveBufferBytesDoc = "The socket receive buffer for network requests"
   val ReplicaFetchMaxBytesDoc = "The number of byes of messages to attempt to fetch"
@@ -424,7 +422,6 @@ object KafkaConfig {
       .define(ControllerMessageQueueSizeProp, INT, Defaults.ControllerMessageQueueSize, MEDIUM, ControllerMessageQueueSizeDoc)
       .define(DefaultReplicationFactorProp, INT, Defaults.DefaultReplicationFactor, MEDIUM, DefaultReplicationFactorDoc)
       .define(ReplicaLagTimeMaxMsProp, LONG, Defaults.ReplicaLagTimeMaxMs, HIGH, ReplicaLagTimeMaxMsDoc)
-      .define(ReplicaLagMaxMessagesProp, LONG, Defaults.ReplicaLagMaxMessages, HIGH, ReplicaLagMaxMessagesDoc)
       .define(ReplicaSocketTimeoutMsProp, INT, Defaults.ReplicaSocketTimeoutMs, HIGH, ReplicaSocketTimeoutMsDoc)
       .define(ReplicaSocketReceiveBufferBytesProp, INT, Defaults.ReplicaSocketReceiveBufferBytes, HIGH, ReplicaSocketReceiveBufferBytesDoc)
       .define(ReplicaFetchMaxBytesProp, INT, Defaults.ReplicaFetchMaxBytes, HIGH, ReplicaFetchMaxBytesDoc)
@@ -542,7 +539,6 @@ object KafkaConfig {
       controllerMessageQueueSize = parsed.get(ControllerMessageQueueSizeProp).asInstanceOf[Int],
       defaultReplicationFactor = parsed.get(DefaultReplicationFactorProp).asInstanceOf[Int],
       replicaLagTimeMaxMs = parsed.get(ReplicaLagTimeMaxMsProp).asInstanceOf[Long],
-      replicaLagMaxMessages = parsed.get(ReplicaLagMaxMessagesProp).asInstanceOf[Long],
       replicaSocketTimeoutMs = parsed.get(ReplicaSocketTimeoutMsProp).asInstanceOf[Int],
       replicaSocketReceiveBufferBytes = parsed.get(ReplicaSocketReceiveBufferBytesProp).asInstanceOf[Int],
       replicaFetchMaxBytes = parsed.get(ReplicaFetchMaxBytesProp).asInstanceOf[Int],
@@ -682,7 +678,6 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/
     val controllerMessageQueueSize: Int = Defaults.ControllerMessageQueueSize,
     val defaultReplicationFactor: Int = Defaults.DefaultReplicationFactor,
     val replicaLagTimeMaxMs: Long = Defaults.ReplicaLagTimeMaxMs,
-    val replicaLagMaxMessages: Long = Defaults.ReplicaLagMaxMessages,
     val replicaSocketTimeoutMs: Int = Defaults.ReplicaSocketTimeoutMs,
     val replicaSocketReceiveBufferBytes: Int = Defaults.ReplicaSocketReceiveBufferBytes,
     val replicaFetchMaxBytes: Int = Defaults.ReplicaFetchMaxBytes,
@@ -850,7 +845,6 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/
     props.put(ControllerMessageQueueSizeProp, controllerMessageQueueSize.toString)
     props.put(DefaultReplicationFactorProp, defaultReplicationFactor.toString)
     props.put(ReplicaLagTimeMaxMsProp, replicaLagTimeMaxMs.toString)
-    props.put(ReplicaLagMaxMessagesProp, replicaLagMaxMessages.toString)
     props.put(ReplicaSocketTimeoutMsProp, replicaSocketTimeoutMs.toString)
     props.put(ReplicaSocketReceiveBufferBytesProp, replicaSocketReceiveBufferBytes.toString)
     props.put(ReplicaFetchMaxBytesProp, replicaFetchMaxBytes.toString)

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index c527482..f9d1619 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -52,12 +52,26 @@ case class LogAppendResult(info: LogAppendInfo, error: Option[Throwable] = None)

 /*
  * Result metadata of a log read operation on the log
+ * @param info @FetchDataInfo returned by the @Log read
+ * @param hw high watermark of the local replica
+ * @param readSize amount of data that was read from the log i.e. size of the fetch
+ * @param initialLogEndOffset @LogOffsetMetadata before the actual read to the log
+ * @param error Exception if error encountered while reading from the log
  */
-case class LogReadResult(info: FetchDataInfo, hw: Long, readSize: Int, error: Option[Throwable] = None) {
+case class LogReadResult(info: FetchDataInfo, hw: Long, readSize: Int, initialLogEndOffset : LogOffsetMetadata, error: Option[Throwable] = None) {
+
   def errorCode = error match {
     case None => ErrorMapping.NoError
     case Some(e) => ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
   }
+
+  override def toString = {
+    "Fetch Data: [%s], HW: [%d], readSize: [%d], initialLogEndOffset: [%s], error: [%s]".format(info, hw, readSize, initialLogEndOffset, error)
+  }
+}
+
+object LogReadResult {
+  val UnknownLogReadResult = LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, -1, LogOffsetMetadata.UnknownOffsetMetadata)
 }

 object ReplicaManager {
@@ -406,7 +420,7 @@ class ReplicaManager(val config: KafkaConfig,
     // if the fetch comes from the follower,
     // update its corresponding log end offset
     if(Request.isValidBrokerId(replicaId))
-      updateFollowerLEOs(replicaId, logReadResults.mapValues(_.info.fetchOffset))
+      updateFollowerLEOs(replicaId, logReadResults)

     // check if this fetch request can be satisfied right away
     val bytesReadable = logReadResults.values.map(_.info.messageSet.sizeInBytes).sum
@@ -466,7 +480,12 @@ class ReplicaManager(val config: KafkaConfig,
         else
           None

-        // read on log
+        /* Read the LogOffsetMetadata prior to performing the read from the log. We use the LogOffsetMetadata to determine if a particular replica is in-sync or not.
+         * Using the log end offset after performing the read can lead to a race condition where data gets appended to the log immediately after the replica has consumed from it.
+         * This can cause a replica to always be out of sync.
+         */
+
+        val initialLogEndOffset = localReplica.logEndOffset
         val logReadInfo = localReplica.log match {
           case Some(log) =>
             log.read(offset, fetchSize, maxOffsetOpt)
@@ -475,23 +494,23 @@ class ReplicaManager(val config: KafkaConfig,
             FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty)
         }

-        LogReadResult(logReadInfo, localReplica.highWatermark.messageOffset, fetchSize, None)
+        LogReadResult(logReadInfo, localReplica.highWatermark.messageOffset, fetchSize, initialLogEndOffset, None)
       } catch {
         // NOTE: Failed fetch requests metric is not incremented for known exceptions since it
         // is supposed to indicate un-expected failure of a broker in handling a fetch request
         case utpe: UnknownTopicOrPartitionException =>
-          LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, Some(utpe))
+          LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, LogOffsetMetadata.UnknownOffsetMetadata, Some(utpe))
         case nle: NotLeaderForPartitionException =>
-          LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, Some(nle))
+          LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, LogOffsetMetadata.UnknownOffsetMetadata, Some(nle))
         case rnae: ReplicaNotAvailableException =>
-          LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, Some(rnae))
+          LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, LogOffsetMetadata.UnknownOffsetMetadata, Some(rnae))
         case oor : OffsetOutOfRangeException =>
-          LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, Some(oor))
+          LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, LogOffsetMetadata.UnknownOffsetMetadata, Some(oor))
         case e: Throwable =>
           BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
           BrokerTopicStats.getBrokerAllTopicsStats().failedFetchRequestRate.mark()
           error("Error processing fetch operation on partition [%s,%d] offset %d".format(topic, partition, offset))
-          LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, Some(e))
+          LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, LogOffsetMetadata.UnknownOffsetMetadata, Some(e))
       }
       (TopicAndPartition(topic, partition), partitionDataAndOffsetInfo)
     }
@@ -748,15 +767,15 @@ class ReplicaManager(val config: KafkaConfig,

   private def maybeShrinkIsr(): Unit = {
     trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")
-    allPartitions.values.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs, config.replicaLagMaxMessages))
+    allPartitions.values.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs))
   }

-  private def updateFollowerLEOs(replicaId: Int, offsets: Map[TopicAndPartition, LogOffsetMetadata]) {
-    debug("Recording follower broker %d log end offsets: %s ".format(replicaId, offsets))
-    offsets.foreach { case (topicAndPartition, offset) =>
+  private def updateFollowerLEOs(replicaId: Int, readResults: Map[TopicAndPartition, LogReadResult]) {
+    debug("Recording follower broker %d log read results: %s ".format(replicaId, readResults))
+    readResults.foreach { case (topicAndPartition, readResult) =>
       getPartition(topicAndPartition.topic, topicAndPartition.partition) match {
         case Some(partition) =>
-          partition.updateReplicaLEO(replicaId, offset)
+          partition.updateReplicaLEO(replicaId, readResult)

           // for producer requests with ack > 1, we need to check
           // if they can be unblocked after some follower's log end offsets have moved

diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 9215235..e195dbe 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -27,18 +27,18 @@ import kafka.log.Log
 import org.junit.Assert._
 import kafka.utils._
 import java.util.concurrent.atomic.AtomicBoolean
+import kafka.message.MessageSet
+

 class IsrExpirationTest extends JUnit3Suite {

   var topicPartitionIsr: Map[(String, Int), Seq[Int]] = new HashMap[(String, Int), Seq[Int]]()
   val replicaLagTimeMaxMs = 100L
   val replicaFetchWaitMaxMs = 100
-  val replicaLagMaxMessages = 10L

   val overridingProps = new Properties()
   overridingProps.put(KafkaConfig.ReplicaLagTimeMaxMsProp, replicaLagTimeMaxMs.toString)
   overridingProps.put(KafkaConfig.ReplicaFetchWaitMaxMsProp, replicaFetchWaitMaxMs.toString)
-  overridingProps.put(KafkaConfig.ReplicaLagMaxMessagesProp, replicaLagMaxMessages.toString)
   val configs = TestUtils.createBrokerConfigs(2).map(KafkaConfig.fromProps(_, overridingProps))
   val topic = "foo"
@@ -56,6 +56,9 @@ class IsrExpirationTest extends JUnit3Suite {
     super.tearDown()
   }

+  /*
+   * Test the case where a follower is caught up but stops making requests to the leader. Once beyond the configured time limit, it should fall out of ISR
+   */
   def testIsrExpirationForStuckFollowers() {
     val log = getLogWithLogEndOffset(15L, 2) // set logEndOffset for leader to 15L

@@ -65,8 +68,9 @@ class IsrExpirationTest extends JUnit3Suite {
     val leaderReplica = partition0.getReplica(configs.head.brokerId).get

     // let the follower catch up to 10
-    (partition0.assignedReplicas() - leaderReplica).foreach(r => r.logEndOffset = new LogOffsetMetadata(10L))
-    var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs, configs.head.replicaLagMaxMessages)
+    (partition0.assignedReplicas() - leaderReplica).foreach(
+      r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(15L), MessageSet.Empty), -1L, -1, new LogOffsetMetadata(15L))))
+    var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
     assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId))

     // let some time pass
@@ -74,26 +78,70 @@ class IsrExpirationTest extends JUnit3Suite {

     // now follower (broker id 1) has caught up to only 10, while the leader is at 15 AND the follower hasn't
     // pulled any data for > replicaMaxLagTimeMs ms. So it is stuck
-    partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs, configs.head.replicaLagMaxMessages)
+    partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
+    assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
+    EasyMock.verify(log)
+  }
+
+  /*
+   * Test the case where a follower never makes a fetch request. It should fall out of ISR because it will be declared stuck
+   */
+  def testIsrExpirationIfNoFetchRequestMade() {
+    val log = getLogWithLogEndOffset(15L, 1) // set logEndOffset for leader to 15L
+
+    // create one partition and all replicas
+    val partition0 = getPartitionWithAllReplicasInIsr(topic, 0, time, configs.head, log)
+    assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
+    val leaderReplica = partition0.getReplica(configs.head.brokerId).get
+
+    // Let enough time pass for the replica to be considered stuck
+    time.sleep(150)
+
+    val partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
     assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
     EasyMock.verify(log)
   }

+  /*
+   * Test the case where a follower continually makes fetch requests but is unable to catch up. It should fall out of the ISR.
+   * However, as soon as it fetches up to the leader's log end offset, it should be added back to the ISR.
+   */
   def testIsrExpirationForSlowFollowers() {
     // create leader replica
-    val log = getLogWithLogEndOffset(15L, 1)
+    val log = getLogWithLogEndOffset(15L, 4)
     // add one partition
     val partition0 = getPartitionWithAllReplicasInIsr(topic, 0, time, configs.head, log)
     assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
     val leaderReplica = partition0.getReplica(configs.head.brokerId).get
-    // set remote replicas leo to something low, like 4
-    (partition0.assignedReplicas() - leaderReplica).foreach(r => r.logEndOffset = new LogOffsetMetadata(4L))
-    // now follower (broker id 1) has caught up to only 4, while the leader is at 15. Since the gap it larger than
-    // replicaMaxLagBytes, the follower is out of sync.
-    val partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs, configs.head.replicaLagMaxMessages)
+    // Make the remote replica not read to the end of the log. It should not be out of sync for at least 100 ms
+    (partition0.assignedReplicas() - leaderReplica).foreach(
+      r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(10L), MessageSet.Empty), -1L, -1, new LogOffsetMetadata(15L))))
+
+    // Simulate 2 fetch requests spanning more than 100 ms which do not read to the end of the log.
+    // After that, the replica will no longer be in the ISR. We do 2 fetches because we want to simulate the case where the replica is lagging but is not stuck
+    var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
+    assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId))
+
+    time.sleep(75)
+
+    (partition0.assignedReplicas() - leaderReplica).foreach(
+      r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(11L), MessageSet.Empty), -1L, -1, new LogOffsetMetadata(15L))))
+    partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
+    assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId))
+
+    time.sleep(75)
+
+    // The replicas will no longer be in ISR
+    partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
     assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))

+    // Now actually make a fetch to the end of the log. The replicas should be back in ISR
+    (partition0.assignedReplicas() - leaderReplica).foreach(
+      r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(15L), MessageSet.Empty), -1L, -1, new LogOffsetMetadata(15L))))
+    partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
+    assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId))
+
     EasyMock.verify(log)
   }

diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
index c124c8d..69ce345 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
@@ -114,7 +114,6 @@ class KafkaConfigConfigDefTest extends JUnit3Suite {
     Assert.assertEquals(expectedConfig.controllerMessageQueueSize, actualConfig.controllerMessageQueueSize)
     Assert.assertEquals(expectedConfig.defaultReplicationFactor, actualConfig.defaultReplicationFactor)
     Assert.assertEquals(expectedConfig.replicaLagTimeMaxMs, actualConfig.replicaLagTimeMaxMs)
-    Assert.assertEquals(expectedConfig.replicaLagMaxMessages, actualConfig.replicaLagMaxMessages)
     Assert.assertEquals(expectedConfig.replicaSocketTimeoutMs, actualConfig.replicaSocketTimeoutMs)
     Assert.assertEquals(expectedConfig.replicaSocketReceiveBufferBytes, actualConfig.replicaSocketReceiveBufferBytes)
     Assert.assertEquals(expectedConfig.replicaFetchMaxBytes, actualConfig.replicaFetchMaxBytes)
@@ -310,7 +309,6 @@ class KafkaConfigConfigDefTest extends JUnit3Suite {
       case KafkaConfig.ControllerMessageQueueSizeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
       case KafkaConfig.DefaultReplicationFactorProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
       case KafkaConfig.ReplicaLagTimeMaxMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
-      case KafkaConfig.ReplicaLagMaxMessagesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
       case KafkaConfig.ReplicaSocketTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-2")
       case KafkaConfig.ReplicaSocketReceiveBufferBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
       case KafkaConfig.ReplicaFetchMaxBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")

diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 92d6b2c..de255e3 100644
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -40,7 +40,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {

   val overridingProps = new Properties()
   overridingProps.put(KafkaConfig.ReplicaLagTimeMaxMsProp, replicaLagTimeMaxMs.toString)
-  overridingProps.put(KafkaConfig.ReplicaLagMaxMessagesProp, replicaLagMaxMessages.toString)
   overridingProps.put(KafkaConfig.ReplicaFetchWaitMaxMsProp, replicaFetchWaitMaxMs.toString)
   overridingProps.put(KafkaConfig.ReplicaFetchMinBytesProp, replicaFetchMinBytes.toString)

diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index efb4573..07cfc4c 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -21,7 +21,7 @@ import kafka.utils._
 import kafka.cluster.Replica
 import kafka.common.TopicAndPartition
 import kafka.log.Log
-import kafka.message.{ByteBufferMessageSet, Message}
+import kafka.message.{MessageSet, ByteBufferMessageSet, Message}
 import scala.Some
 import java.util.{Properties, Collections}
@@ -42,7 +42,6 @@ class SimpleFetchTest extends JUnit3Suite {
   val overridingProps = new Properties()
   overridingProps.put(KafkaConfig.ReplicaLagTimeMaxMsProp, replicaLagTimeMaxMs.toString)
   overridingProps.put(KafkaConfig.ReplicaFetchWaitMaxMsProp, replicaFetchWaitMaxMs.toString)
-  overridingProps.put(KafkaConfig.ReplicaLagMaxMessagesProp, replicaLagMaxMessages.toString)

   val configs = TestUtils.createBrokerConfigs(2).map(KafkaConfig.fromProps(_, overridingProps))

@@ -78,6 +77,7 @@ class SimpleFetchTest extends JUnit3Suite {
     // create the log which takes read with either HW max offset or none max offset
     val log = EasyMock.createMock(classOf[Log])
     EasyMock.expect(log.logEndOffset).andReturn(leaderLEO).anyTimes()
+    EasyMock.expect(log.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(leaderLEO)).anyTimes()
     EasyMock.expect(log.read(0, fetchSize, Some(partitionHW))).andReturn(
       new FetchDataInfo(
         new LogOffsetMetadata(0L, 0L, 0),
@@ -108,7 +108,9 @@ class SimpleFetchTest extends JUnit3Suite {

     // create the follower replica with defined log end offset
     val followerReplica= new Replica(configs(1).brokerId, partition, time)
-    followerReplica.logEndOffset = new LogOffsetMetadata(followerLEO, 0L, followerLEO.toInt)
+    val leo = new LogOffsetMetadata(followerLEO, 0L, followerLEO.toInt)
+    followerReplica.updateLogReadResult(new LogReadResult(FetchDataInfo(leo, MessageSet.Empty), -1L, -1, leo))
+    //followerReplica.logEndOffset = new LogOffsetMetadata(followerLEO, 0L, followerLEO.toInt)

     // add both of them to ISR
     val allReplicas = List(leaderReplica, followerReplica)
@@ -140,6 +142,7 @@ class SimpleFetchTest extends JUnit3Suite {
   def testReadFromLog() {
     val initialTopicCount = BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count();
     val initialAllTopicsCount = BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count();
+    LogReadResult.UnknownLogReadResult.toString

     assertEquals("Reading committed data should return messages only up to high watermark", messagesToHW,
       replicaManager.readFromLocalLog(true, true, fetchInfo).get(topicAndPartition).get.info.messageSet.head.message)
-- 
1.7.12.4
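A minimal standalone sketch of the lag rule this patch introduces, not part of the patch itself: the names FollowerLagSketch, FollowerState, onFetch and isOutOfSync are hypothetical, and only the rule they encode is taken from the change above (Replica.updateLogReadResult starts a lag timer when a fetch does not reach the leader's log end offset as it was before the read and clears it when it does; Partition.getOutOfSyncReplicas drops a follower once that timer exceeds replica.lag.time.max.ms).

object FollowerLagSketch {

  // Simplified, immutable model of the per-replica lag state kept in Replica.lagBeginValue.
  final case class FollowerState(lagBeginMs: Long = -1L) {
    // Record a fetch: if the follower read up to the leader's LEO captured before the read,
    // clear the lag timer; otherwise start it (only if it is not already running).
    def onFetch(fetchedToOffset: Long, leaderEndOffsetBeforeRead: Long, nowMs: Long): FollowerState =
      if (fetchedToOffset >= leaderEndOffsetBeforeRead) copy(lagBeginMs = -1L)
      else if (lagBeginMs == -1L) copy(lagBeginMs = nowMs)
      else this

    // A follower is out of sync once it has been lagging for longer than replica.lag.time.max.ms.
    def isOutOfSync(nowMs: Long, replicaLagTimeMaxMs: Long): Boolean =
      lagBeginMs != -1L && (nowMs - lagBeginMs) > replicaLagTimeMaxMs
  }

  def main(args: Array[String]): Unit = {
    val lagTimeMaxMs = 100L                                                                      // mirrors the test's replicaLagTimeMaxMs
    var state = FollowerState()
    state = state.onFetch(fetchedToOffset = 10L, leaderEndOffsetBeforeRead = 15L, nowMs = 0L)    // starts lagging
    println(state.isOutOfSync(nowMs = 50L, lagTimeMaxMs))                                        // false: lagging for only 50 ms
    println(state.isOutOfSync(nowMs = 150L, lagTimeMaxMs))                                       // true: lagging for 150 ms > 100 ms
    state = state.onFetch(fetchedToOffset = 15L, leaderEndOffsetBeforeRead = 15L, nowMs = 160L)  // caught up, timer cleared
    println(state.isOutOfSync(nowMs = 300L, lagTimeMaxMs))                                       // false: back in sync
  }
}

Under this model a follower that is merely slow, but keeps fetching and periodically reaches the leader's log end offset, never accumulates enough lag time to be evicted, which is the behaviour the replica.lag.max.messages setting removed by this patch could not guarantee.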