diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 02ccc17..13f48ba 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -310,19 +310,15 @@ class Partition(val topic: String,
   def getOutOfSyncReplicas(leaderReplica: Replica, keepInSyncTimeMs: Long, keepInSyncMessages: Long): Set[Replica] = {
     /**
      * there are two cases that need to be handled here -
-     * 1. Stuck followers: If the leo of the replica is less than the leo of leader and the leo hasn't been updated
-     *    for keepInSyncTimeMs ms, the follower is stuck and should be removed from the ISR
+     * 1. Stuck followers: If the leo of the replica hasn't been updated for keepInSyncTimeMs ms,
+     *    the follower is stuck and should be removed from the ISR
      * 2. Slow followers: If the leo of the slowest follower is behind the leo of the leader by keepInSyncMessages, the
      *    follower is not catching up and should be removed from the ISR
      **/
     val leaderLogEndOffset = leaderReplica.logEndOffset
     val candidateReplicas = inSyncReplicas - leaderReplica
     // Case 1 above
-    val possiblyStuckReplicas = candidateReplicas.filter(r => r.logEndOffset < leaderLogEndOffset)
-    if(possiblyStuckReplicas.size > 0)
-      debug("Possibly stuck replicas for partition [%s,%d] are %s".format(topic, partitionId,
-        possiblyStuckReplicas.map(_.brokerId).mkString(",")))
-    val stuckReplicas = possiblyStuckReplicas.filter(r => r.logEndOffsetUpdateTimeMs < (time.milliseconds - keepInSyncTimeMs))
+    val stuckReplicas = candidateReplicas.filter(r => (time.milliseconds - r.logEndOffsetUpdateTimeMs) > keepInSyncTimeMs)
     if(stuckReplicas.size > 0)
       debug("Stuck replicas for partition [%s,%d] are %s".format(topic, partitionId, stuckReplicas.map(_.brokerId).mkString(",")))
     // Case 2 above
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index beca460..4c121e4 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -238,8 +238,9 @@ class ControllerBrokerRequestBatch(controllerContext: ControllerContext, sendReq
       val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId, controllerEpoch, correlationId, clientId)
       for (p <- partitionStateInfos) {
         val typeOfRequest = if (broker == p._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower"
-        stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request with correlationId %d to broker %d " +
-                                 "for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest, correlationId, broker,
+        stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s with correlationId %d to broker %d " +
+                                 "for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest,
+                                                                 p._2.leaderIsrAndControllerEpoch, correlationId, broker,
                                  p._1._1, p._1._2))
       }
       sendRequest(broker, leaderAndIsrRequest, null)
@@ -250,8 +251,9 @@ class ControllerBrokerRequestBatch(controllerContext: ControllerContext, sendReq
       val partitionStateInfos = m._2.toMap
       val updateMetadataRequest = new UpdateMetadataRequest(controllerId, controllerEpoch, correlationId, clientId,
                                                             partitionStateInfos, controllerContext.liveOrShuttingDownBrokers)
-      partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request with " +
-        "correlationId %d to broker %d for partition %s").format(controllerId, controllerEpoch, correlationId, broker, p._1)))
+      partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s with " +
+        "correlationId %d to broker %d for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch,
+        correlationId, broker, p._1)))
       sendRequest(broker, updateMetadataRequest, null)
     }
     updateMetadataRequestMap.clear()
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index c52225a..ad4ee53 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -171,24 +171,17 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
           val leaderAndIsrIsEmpty: Boolean =
             controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
               case Some(currLeaderIsrAndControllerEpoch) =>
-                if (currLeaderIsrAndControllerEpoch.leaderAndIsr.isr.contains(replicaId))
-                  controller.removeReplicaFromIsr(topic, partition, replicaId) match {
-                    case Some(updatedLeaderIsrAndControllerEpoch) =>
-                      // send the shrunk ISR state change request only to the leader
-                      brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader),
-                        topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)
-                      replicaState.put((topic, partition, replicaId), OfflineReplica)
-                      stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OfflineReplica"
-                        .format(controllerId, controller.epoch, replicaId, topicAndPartition))
-                      false
-                    case None =>
-                      true
-                  }
-                else {
-                  replicaState.put((topic, partition, replicaId), OfflineReplica)
-                  stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OfflineReplica"
-                    .format(controllerId, controller.epoch, replicaId, topicAndPartition))
-                  false
+                controller.removeReplicaFromIsr(topic, partition, replicaId) match {
+                  case Some(updatedLeaderIsrAndControllerEpoch) =>
+                    // send the shrunk ISR state change request only to the leader
+                    brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader),
+                      topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)
+                    replicaState.put((topic, partition, replicaId), OfflineReplica)
+                    stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OfflineReplica"
+                      .format(controllerId, controller.epoch, replicaId, topicAndPartition))
+                    false
+                  case None =>
+                    true
                 }
               case None =>
                 true
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 8f9db10..a7e5b73 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -198,8 +198,11 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   /* the number of bytes of messages to attempt to fetch */
   val replicaFetchMaxBytes = props.getIntInRange("replica.fetch.max.bytes", ConsumerConfig.FetchSize, (messageMaxBytes, Int.MaxValue))

-  /* max wait time for each fetcher request issued by follower replicas*/
+  /* max wait time for each fetcher request issued by follower replicas. This value should always be less than
+   * replica.lag.time.max.ms to prevent frequent shrinking of the ISR for low throughput topics */
   val replicaFetchWaitMaxMs = props.getInt("replica.fetch.wait.max.ms", 500)
+  require(replicaFetchWaitMaxMs <= replicaLagTimeMaxMs, "replica.fetch.wait.max.ms should always be less than or equal to replica.lag.time.max.ms" +
+    " to prevent frequent changes in ISR")

   /* minimum bytes expected for each fetch response. If not enough bytes, wait up to replicaMaxWaitTimeMs */
   val replicaFetchMinBytes = props.getInt("replica.fetch.min.bytes", 1)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index f9c7c29..b0a3962 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -204,17 +204,19 @@ class ReplicaManager(val config: KafkaConfig,
   }

   def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): (collection.Map[(String, Int), Short], Short) = {
-    leaderAndISRRequest.partitionStateInfos.foreach{ case ((topic, partition), stateInfo) =>
-      stateChangeLogger.trace("Broker %d handling LeaderAndIsr request correlation id %d received from controller %d epoch %d for partition [%s,%d]"
-                                .format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId,
-                                        leaderAndISRRequest.controllerEpoch, topic, partition))}
-    info("Handling LeaderAndIsr request %s".format(leaderAndISRRequest))
-
+    leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) =>
+      stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]"
+                                .format(localBrokerId, stateInfo.leaderIsrAndControllerEpoch, leaderAndISRRequest.correlationId,
+                                        leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch, topic, partition))
+    }
     replicaStateChangeLock synchronized {
       val responseMap = new collection.mutable.HashMap[(String, Int), Short]
       if(leaderAndISRRequest.controllerEpoch < controllerEpoch) {
-        stateChangeLogger.warn("Broker %d received LeaderAndIsr request correlation id %d with an old controller epoch %d. Latest known controller epoch is %d"
-                                 .format(localBrokerId, leaderAndISRRequest.controllerEpoch, leaderAndISRRequest.correlationId, controllerEpoch))
+        leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) =>
+          stateChangeLogger.warn(("Broker %d received LeaderAndIsr request correlation id %d with an old controller epoch %d." +
+            " Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.correlationId,
+            leaderAndISRRequest.controllerEpoch, controllerEpoch))
+        }
         (responseMap, ErrorMapping.StaleControllerEpochCode)
       } else {
         val controllerId = leaderAndISRRequest.controllerId
@@ -234,7 +236,7 @@ class ReplicaManager(val config: KafkaConfig,
             // Otherwise record the error code in response
             stateChangeLogger.warn(("Broker %d received invalid LeaderAndIsr request with correlation id %d from " +
               "controller %d epoch %d with an older leader epoch %d for partition [%s,%d], current leader epoch is %d")
-              .format(localBrokerId, correlationId, controllerId, partitionStateInfo.leaderIsrAndControllerEpoch.controllerEpoch,
+              .format(localBrokerId, correlationId, controllerId, leaderAndISRRequest.controllerEpoch,
               partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch, topic, partition.partitionId, partitionLeaderEpoch))
             responseMap.put((topic, partitionId), ErrorMapping.StaleLeaderEpochCode)
           }
@@ -247,7 +249,6 @@ class ReplicaManager(val config: KafkaConfig,
         if (!partitionsTobeLeader.isEmpty) makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, leaderAndISRRequest.correlationId, responseMap)
         if (!partitionsTobeFollower.isEmpty) makeFollowers(controllerId, controllerEpoch, partitionsTobeFollower, leaderAndISRRequest.leaders, leaderAndISRRequest.correlationId, responseMap)
-        info("Handled leader and isr request %s".format(leaderAndISRRequest))
         // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
         // have been completely populated before starting the checkpointing there by avoiding weird race conditions
         if (!hwThreadInitialized) {
@@ -274,10 +275,10 @@
   private def makeLeaders(controllerId: Int, epoch: Int,
                           partitionState: Map[Partition, PartitionStateInfo],
                           correlationId: Int, responseMap: mutable.Map[(String, Int), Short]) = {
-    stateChangeLogger.trace(("Broker %d received LeaderAndIsr request correlationId %d from controller %d epoch %d " +
-      "starting the become-leader transition for partitions %s")
-      .format(localBrokerId, correlationId, controllerId, epoch,
-        partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(",")))
+    partitionState.foreach(state =>
+      stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
+        "starting the become-leader transition for partition %s")
+        .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId))))

     for (partition <- partitionState.keys)
       responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError)
@@ -285,9 +286,11 @@
     try {
       // First stop fetchers for all the partitions
       replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_)))
-      stateChangeLogger.trace("Broker %d stopped fetchers for partitions %s as per becoming-follower request from controller %d epoch %d"
-        .format(localBrokerId, partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(","), controllerId, correlationId))
-
+      partitionState.foreach { state =>
+        stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-leader request from controller " +
+          "%d epoch %d with correlation id %d for partition %s")
+          .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(state._1.topic, state._1.partitionId)))
+      }
       // Update the partition information to be the leader
       partitionState.foreach{ case (partition, partitionStateInfo) =>
         partition.makeLeader(controllerId, partitionStateInfo, correlationId)}
@@ -298,17 +301,21 @@
       }
     } catch {
       case e: Throwable =>
-        val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d " +
-          "epoch %d").format(localBrokerId, correlationId, controllerId, epoch)
-        stateChangeLogger.error(errorMsg, e)
+        partitionState.foreach { state =>
+          val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d " +
+            "epoch %d for partition %s").format(localBrokerId, correlationId, controllerId, epoch,
+            TopicAndPartition(state._1.topic, state._1.partitionId))
+          stateChangeLogger.error(errorMsg, e)
+        }
        // Re-throw the exception for it to be caught in KafkaApis
        throw e
    }

-    stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " +
-      "for the become-leader transition for partitions %s")
-      .format(localBrokerId, correlationId, controllerId, epoch,
-        partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(",")))
+    partitionState.foreach { state =>
+      stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " +
+        "for the become-leader transition for partition %s")
+        .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)))
+    }
   }

   /*
@@ -329,10 +336,10 @@
   private def makeFollowers(controllerId: Int, epoch: Int, partitionState: Map[Partition, PartitionStateInfo],
                             leaders: Set[Broker], correlationId: Int,
                             responseMap: mutable.Map[(String, Int), Short]) {
-    stateChangeLogger.trace(("Broker %d received LeaderAndIsr request correlationId %d from controller %d epoch %d " +
-      "starting the become-follower transition for partitions %s")
-      .format(localBrokerId, correlationId, controllerId, epoch,
-        partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(",")))
+    partitionState.foreach(state =>
+      stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
+        "starting the become-follower transition for partition %s")
+        .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId))))

     for (partition <- partitionState.keys)
       responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError)
@@ -346,15 +353,20 @@
         partition.makeFollower(controllerId, leaderIsrAndControllerEpoch, leaders, correlationId)}

       replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_)))
-      stateChangeLogger.trace("Broker %d stopped fetchers for partitions %s as per becoming-follower request from controller %d epoch %d"
-        .format(localBrokerId, partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(","), controllerId, correlationId))
+      partitionState.foreach { state =>
+        stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-follower request from controller " +
+          "%d epoch %d with correlation id %d for partition %s")
+          .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(state._1.topic, state._1.partitionId)))
+      }

       logManager.truncateTo(partitionState.map{ case(partition, leaderISRAndControllerEpoch) =>
         new TopicAndPartition(partition) -> partition.getOrCreateReplica().highWatermark
       })
-      stateChangeLogger.trace("Broker %d truncated logs and checkpoint recovery boundaries for partitions %s as per becoming-follower request from controller %d epoch %d"
-        .format(localBrokerId, partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(","), controllerId, correlationId))
-
+      partitionState.foreach { state =>
+        stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition %s as part of " +
+          "become-follower request with correlation id %d from controller %d epoch %d").format(localBrokerId,
+          TopicAndPartition(state._1.topic, state._1.partitionId), correlationId, controllerId, epoch))
+      }
       if (!isShuttingDown.get()) {
         val partitionAndOffsets = mutable.Map[TopicAndPartition, BrokerAndInitialOffset]()
         partitionState.foreach {
@@ -366,30 +378,34 @@
                 BrokerAndInitialOffset(leaderBroker, partition.getReplica().get.logEndOffset))
             case None =>
               stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d " +
-                "controller %d epoch %d for topic-partition %s since the designated leader %d " +
-                "cannot be found in live or shutting down brokers %s")
-                .format(localBrokerId, correlationId, controllerId, epoch, partition, leader, leaders))
+                "controller %d epoch %d for partition %s since the designated leader %d " +
+                "cannot be found in live or shutting down brokers %s").format(localBrokerId,
+                correlationId, controllerId, epoch, partition, leader, leaders.mkString(",")))
           }
         }
         replicaFetcherManager.addFetcherForPartitions(partitionAndOffsets)
       }
       else {
-        stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d from " +
-          "controller %d epoch %d since it is shutting down")
-          .format(localBrokerId, correlationId, controllerId, epoch))
+        partitionState.foreach { state =>
+          stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d from " +
+            "controller %d epoch %d for partition %s since it is shutting down").format(localBrokerId, correlationId,
+            controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)))
+        }
       }
     } catch {
       case e: Throwable =>
-        val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d " +
+        val errorMsg = ("Error on broker %d while processing LeaderAndIsr request with correlationId %d received from controller %d " +
          "epoch %d").format(localBrokerId, correlationId, controllerId, epoch)
        stateChangeLogger.error(errorMsg, e)
        // Re-throw the exception for it to be caught in KafkaApis
        throw e
    }

-    stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " +
-      "for the become-follower transition for partitions %s")
-      .format(localBrokerId, correlationId, controllerId, epoch, partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(",")))
+    partitionState.foreach { state =>
+      stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " +
+        "for the become-follower transition for partition %s")
+        .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)))
+    }
   }

   private def maybeShrinkIsr(): Unit = {
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index c30069e..958f53b 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -153,6 +153,11 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
         Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted;
     }, 1000)
     val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
+    val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionToBeReassigned)
+    // in sync replicas should not have any replica that is not in the new assigned replicas
+    val phantomInSyncReplicas = inSyncReplicas.toSet -- assignedReplicas.toSet
+    assertTrue("All in sync replicas %s must be in the assigned replica list %s".format(inSyncReplicas, assignedReplicas),
+      phantomInSyncReplicas.size == 0)
     assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
     servers.foreach(_.shutdown())
   }
@@ -179,6 +184,11 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     }, 1000)
     val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
     assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
+    val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionToBeReassigned)
+    // in sync replicas should not have any replica that is not in the new assigned replicas
+    val phantomInSyncReplicas = inSyncReplicas.toSet -- assignedReplicas.toSet
+    assertTrue("All in sync replicas %s must be in the assigned replica list %s".format(inSyncReplicas, assignedReplicas),
+      phantomInSyncReplicas.size == 0)
     // leader should be 2
     servers.foreach(_.shutdown())
   }
@@ -205,7 +215,11 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     }, 2000)
     val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
     assertEquals("Partition should have been reassigned to 2, 3", newReplicas, assignedReplicas)
-    // leader should be 2
+    val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionToBeReassigned)
+    // in sync replicas should not have any replica that is not in the new assigned replicas
+    val phantomInSyncReplicas = inSyncReplicas.toSet -- assignedReplicas.toSet
+    assertTrue("All in sync replicas %s must be in the assigned replica list %s".format(inSyncReplicas, assignedReplicas),
+      phantomInSyncReplicas.size == 0)
     servers.foreach(_.shutdown())
   }
@@ -222,7 +236,6 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
     val reassignedPartitions = ZkUtils.getPartitionsBeingReassigned(zkClient)
     assertFalse("Partition should not be reassigned", reassignedPartitions.contains(topicAndPartition))
-    // leader should be 2
     servers.foreach(_.shutdown())
   }
@@ -244,6 +257,11 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     TestUtils.waitUntilTrue(checkIfReassignPartitionPathExists, 1000)
     val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
     assertEquals("Partition should have been reassigned to 0, 1", newReplicas, assignedReplicas)
+    val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionToBeReassigned)
+    // in sync replicas should not have any replica that is not in the new assigned replicas
+    val phantomInSyncReplicas = inSyncReplicas.toSet -- assignedReplicas.toSet
+    assertTrue("All in sync replicas %s must be in the assigned replica list %s".format(inSyncReplicas, assignedReplicas),
+      phantomInSyncReplicas.size == 0)
     servers.foreach(_.shutdown())
   }
@@ -326,7 +344,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
       servers.foreach(_.shutdown())
     }
   }
-  
+
  /**
   * This test creates a topic with a few config overrides and checks that the configs are applied to the new topic
   * then changes the config and checks that the new values take effect.
   */
@@ -336,14 +354,14 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
    val partitions = 3
    val topic = "my-topic"
    val server = TestUtils.createServer(new KafkaConfig(TestUtils.createBrokerConfig(0)))
-    
+
    def makeConfig(messageSize: Int, retentionMs: Long) = {
      var props = new Properties()
      props.setProperty(LogConfig.MaxMessageBytesProp, messageSize.toString)
      props.setProperty(LogConfig.RententionMsProp, retentionMs.toString)
      props
    }
-    
+
    def checkConfig(messageSize: Int, retentionMs: Long) {
      TestUtils.retry(10000) {
        for(part <- 0 until partitions) {
@@ -354,14 +372,14 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
        }
      }
    }
-    
+
    try {
      // create a topic with a few config overrides and check that they are applied
      val maxMessageSize = 1024
      val retentionMs = 1000*1000
      AdminUtils.createTopic(server.zkClient, topic, partitions, 1, makeConfig(maxMessageSize, retentionMs))
      checkConfig(maxMessageSize, retentionMs)
-      
+
      // now double the config values for the topic and check that it is applied
      AdminUtils.changeTopicConfig(server.zkClient, topic, makeConfig(2*maxMessageSize, 2 * retentionMs))
      checkConfig(2*maxMessageSize, 2 * retentionMs)
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 7026432..2cd3a3f 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -31,6 +31,7 @@ class IsrExpirationTest extends JUnit3Suite {
   var topicPartitionIsr: Map[(String, Int), Seq[Int]] = new HashMap[(String, Int), Seq[Int]]()
   val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
     override val replicaLagTimeMaxMs = 100L
+    override val replicaFetchWaitMaxMs = 100
     override val replicaLagMaxMessages = 10L
   })
   val topic = "foo"
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 34e39e7..17a99f1 100644
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -32,6 +32,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
   val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
     override val replicaLagTimeMaxMs = 5000L
     override val replicaLagMaxMessages = 10L
+    override val replicaFetchWaitMaxMs = 1000
     override val replicaFetchMinBytes = 20
   })
   val topic = "new-topic"
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index bab436d..03e6266 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -34,6 +34,7 @@ class SimpleFetchTest extends JUnit3Suite {
   val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
     override val replicaLagTimeMaxMs = 100L
+    override val replicaFetchWaitMaxMs = 100
     override val replicaLagMaxMessages = 10L
   })
   val topic = "foo"
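
Note on the behavior this patch introduces: a follower is now dropped from the ISR when its log end offset has not been updated for replica.lag.time.max.ms, whether or not it currently matches the leader's log end offset, or when it trails the leader by more than replica.lag.max.messages. This is also why the new KafkaConfig check requires replica.fetch.wait.max.ms to be at most replica.lag.time.max.ms: a fully caught-up follower only refreshes its offset timestamp about once per fetch wait interval, so a longer wait could be mistaken for a stuck replica. The sketch below is illustrative only and is not code from the patch; ReplicaView and the parameter names are simplified stand-ins for the real Replica and KafkaConfig members.

// Illustrative sketch only (not part of the patch): the new stuck/slow follower rule.
// ReplicaView is a simplified stand-in for kafka.cluster.Replica.
object IsrShrinkSketch {
  case class ReplicaView(brokerId: Int, logEndOffset: Long, logEndOffsetUpdateTimeMs: Long)

  def outOfSyncReplicas(leader: ReplicaView,
                        followers: Set[ReplicaView],
                        nowMs: Long,
                        keepInSyncTimeMs: Long,    // plays the role of replica.lag.time.max.ms
                        keepInSyncMessages: Long   // plays the role of replica.lag.max.messages
                       ): Set[ReplicaView] = {
    // Case 1 (stuck): no log-end-offset update for keepInSyncTimeMs, even if the
    // follower's offset happens to equal the leader's right now.
    val stuck = followers.filter(r => nowMs - r.logEndOffsetUpdateTimeMs > keepInSyncTimeMs)
    // Case 2 (slow): more than keepInSyncMessages behind the leader's log end offset.
    val slow = followers.filter(r => leader.logEndOffset - r.logEndOffset > keepInSyncMessages)
    stuck ++ slow
  }

  def main(args: Array[String]): Unit = {
    val now = 10000L
    val leader  = ReplicaView(brokerId = 0, logEndOffset = 100, logEndOffsetUpdateTimeMs = now)
    val healthy = ReplicaView(1, 100, now - 200)   // caught up and fetching: stays in the ISR
    val stuckF  = ReplicaView(2, 100, now - 6000)  // caught up once, then silent too long: stuck
    val slowF   = ReplicaView(3, 80, now - 100)    // fetching regularly but 20 messages behind: slow
    val out = outOfSyncReplicas(leader, Set(healthy, stuckF, slowF),
                                nowMs = now, keepInSyncTimeMs = 5000L, keepInSyncMessages = 10L)
    println(out.map(_.brokerId))  // prints the broker ids 2 and 3
  }
}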