diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index d25aae3..3c08dee 100644 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -51,7 +51,7 @@ object TopicCommand { else if(opts.options.has(opts.deleteOpt)) deleteTopic(zkClient, opts) else if(opts.options.has(opts.listOpt)) - listTopics(zkClient, opts) + listTopics(zkClient) else if(opts.options.has(opts.describeOpt)) describeTopic(zkClient, opts) @@ -109,22 +109,9 @@ object TopicCommand { } } - def listTopics(zkClient: ZkClient, opts: TopicCommandOptions) { - if(opts.options.has(opts.topicsWithOverridesOpt)) { - ZkUtils.getAllTopics(zkClient).sorted.foreach { topic => - val configs = AdminUtils.fetchTopicConfig(zkClient, topic) - if(configs.size() != 0) { - val replicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic)) - val numPartitions = replicaAssignment.size - val replicationFactor = replicaAssignment.head._2.size - println("\nTopic:%s\tPartitionCount:%d\tReplicationFactor:%d\tConfigs:%s".format(topic, numPartitions, - replicationFactor, configs.map(kv => kv._1 + "=" + kv._2).mkString(","))) - } - } - } else { - for(topic <- ZkUtils.getAllTopics(zkClient).sorted) - println(topic) - } + def listTopics(zkClient: ZkClient) { + for(topic <- ZkUtils.getAllTopics(zkClient).sorted) + println(topic) } def describeTopic(zkClient: ZkClient, opts: TopicCommandOptions) { @@ -242,8 +229,6 @@ object TopicCommand { "if set when describing topics, only show under replicated partitions") val reportUnavailablePartitionsOpt = parser.accepts("unavailable-partitions", "if set when describing topics, only show partitions whose leader is not available") - val topicsWithOverridesOpt = parser.accepts("topics-with-overrides", - "if set when listing topics, only show topics that have overridden configs") val options = parser.parse(args : _*) diff --git 
a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 13f48ba..02ccc17 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -310,15 +310,19 @@ class Partition(val topic: String, def getOutOfSyncReplicas(leaderReplica: Replica, keepInSyncTimeMs: Long, keepInSyncMessages: Long): Set[Replica] = { /** * there are two cases that need to be handled here - - * 1. Stuck followers: If the leo of the replica hasn't been updated for keepInSyncTimeMs ms, - * the follower is stuck and should be removed from the ISR + * 1. Stuck followers: If the leo of the replica is less than the leo of leader and the leo hasn't been updated + * for keepInSyncTimeMs ms, the follower is stuck and should be removed from the ISR * 2. Slow followers: If the leo of the slowest follower is behind the leo of the leader by keepInSyncMessages, the * follower is not catching up and should be removed from the ISR **/ val leaderLogEndOffset = leaderReplica.logEndOffset val candidateReplicas = inSyncReplicas - leaderReplica // Case 1 above - val stuckReplicas = candidateReplicas.filter(r => (time.milliseconds - r.logEndOffsetUpdateTimeMs) > keepInSyncTimeMs) + val possiblyStuckReplicas = candidateReplicas.filter(r => r.logEndOffset < leaderLogEndOffset) + if(possiblyStuckReplicas.size > 0) + debug("Possibly stuck replicas for partition [%s,%d] are %s".format(topic, partitionId, + possiblyStuckReplicas.map(_.brokerId).mkString(","))) + val stuckReplicas = possiblyStuckReplicas.filter(r => r.logEndOffsetUpdateTimeMs < (time.milliseconds - keepInSyncTimeMs)) if(stuckReplicas.size > 0) debug("Stuck replicas for partition [%s,%d] are %s".format(topic, partitionId, stuckReplicas.map(_.brokerId).mkString(","))) // Case 2 above diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 
4c121e4..beca460 100644 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -238,9 +238,8 @@ class ControllerBrokerRequestBatch(controllerContext: ControllerContext, sendReq val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId, controllerEpoch, correlationId, clientId) for (p <- partitionStateInfos) { val typeOfRequest = if (broker == p._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower" - stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s with correlationId %d to broker %d " + - "for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest, - p._2.leaderIsrAndControllerEpoch, correlationId, broker, + stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request with correlationId %d to broker %d " + + "for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest, correlationId, broker, p._1._1, p._1._2)) } sendRequest(broker, leaderAndIsrRequest, null) @@ -251,9 +250,8 @@ class ControllerBrokerRequestBatch(controllerContext: ControllerContext, sendReq val partitionStateInfos = m._2.toMap val updateMetadataRequest = new UpdateMetadataRequest(controllerId, controllerEpoch, correlationId, clientId, partitionStateInfos, controllerContext.liveOrShuttingDownBrokers) - partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s with " + - "correlationId %d to broker %d for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch, - correlationId, broker, p._1))) + partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request with " + + "correlationId %d to broker %d for partition %s").format(controllerId, controllerEpoch, correlationId, broker, p._1))) sendRequest(broker, 
updateMetadataRequest, null) } updateMetadataRequestMap.clear() diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index fd92c65..3beaf75 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -812,7 +812,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg info("New leader and ISR for partition %s is %s".format(topicAndPartition, newLeaderAndIsr.toString())) updateSucceeded } else { - warn("Cannot remove replica %d from ISR of partition %s since it is not in the ISR. Leader = %d ; ISR = %s" + warn("Cannot remove replica %d from ISR of %s. Leader = %d ; ISR = %s" .format(replicaId, topicAndPartition, leaderAndIsr.leader, leaderAndIsr.isr)) finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(leaderAndIsr, epoch)) controllerContext.partitionLeadershipInfo.put(topicAndPartition, finalLeaderIsrAndControllerEpoch.get) diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index ad4ee53..c52225a 100644 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -171,17 +171,24 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { val leaderAndIsrIsEmpty: Boolean = controllerContext.partitionLeadershipInfo.get(topicAndPartition) match { case Some(currLeaderIsrAndControllerEpoch) => - controller.removeReplicaFromIsr(topic, partition, replicaId) match { - case Some(updatedLeaderIsrAndControllerEpoch) => - // send the shrunk ISR state change request only to the leader - brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader), - topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment) - 
replicaState.put((topic, partition, replicaId), OfflineReplica) - stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OfflineReplica" - .format(controllerId, controller.epoch, replicaId, topicAndPartition)) - false - case None => - true + if (currLeaderIsrAndControllerEpoch.leaderAndIsr.isr.contains(replicaId)) + controller.removeReplicaFromIsr(topic, partition, replicaId) match { + case Some(updatedLeaderIsrAndControllerEpoch) => + // send the shrunk ISR state change request only to the leader + brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader), + topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment) + replicaState.put((topic, partition, replicaId), OfflineReplica) + stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OfflineReplica" + .format(controllerId, controller.epoch, replicaId, topicAndPartition)) + false + case None => + true + } + else { + replicaState.put((topic, partition, replicaId), OfflineReplica) + stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OfflineReplica" + .format(controllerId, controller.epoch, replicaId, topicAndPartition)) + false } case None => true diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 29abc46..c9f92a2 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -519,11 +519,8 @@ class KafkaApis(val requestChannel: RequestChannel, uniqueTopics = { if(metadataRequest.topics.size > 0) metadataRequest.topics.toSet - else { - partitionMetadataLock synchronized { - leaderCache.keySet.map(_.topic) - } - } + else + leaderCache.keySet.map(_.topic) } val topicMetadataList = partitionMetadataLock synchronized { diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala index a7e5b73..8f9db10 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -198,11 +198,8 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* the number of byes of messages to attempt to fetch */ val replicaFetchMaxBytes = props.getIntInRange("replica.fetch.max.bytes", ConsumerConfig.FetchSize, (messageMaxBytes, Int.MaxValue)) - /* max wait time for each fetcher request issued by follower replicas. This value should always be less than the - * replica.lag.time.max.ms at all times to prevent frequent shrinking of ISR for low throughput topics */ + /* max wait time for each fetcher request issued by follower replicas*/ val replicaFetchWaitMaxMs = props.getInt("replica.fetch.wait.max.ms", 500) - require(replicaFetchWaitMaxMs <= replicaLagTimeMaxMs, "replica.fetch.wait.max.ms should always be at least replica.lag.time.max.ms" + - " to prevent frequent changes in ISR") /* minimum bytes expected for each fetch response. 
If not enough bytes, wait up to replicaMaxWaitTimeMs */ val replicaFetchMinBytes = props.getInt("replica.fetch.min.bytes", 1) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index b0a3962..f9c7c29 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -204,19 +204,17 @@ class ReplicaManager(val config: KafkaConfig, } def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): (collection.Map[(String, Int), Short], Short) = { - leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) => - stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]" - .format(localBrokerId, stateInfo.leaderIsrAndControllerEpoch, leaderAndISRRequest.correlationId, - leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch, topic, partition)) - } + leaderAndISRRequest.partitionStateInfos.foreach{ case ((topic, partition), stateInfo) => + stateChangeLogger.trace("Broker %d handling LeaderAndIsr request correlation id %d received from controller %d epoch %d for partition [%s,%d]" + .format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId, + leaderAndISRRequest.controllerEpoch, topic, partition))} + info("Handling LeaderAndIsr request %s".format(leaderAndISRRequest)) + replicaStateChangeLock synchronized { val responseMap = new collection.mutable.HashMap[(String, Int), Short] if(leaderAndISRRequest.controllerEpoch < controllerEpoch) { - leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) => - stateChangeLogger.warn(("Broker %d received LeaderAndIsr request correlation id %d with an old controller epoch %d." 
+ - " Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.correlationId, - leaderAndISRRequest.controllerEpoch, controllerEpoch)) - } + stateChangeLogger.warn("Broker %d received LeaderAndIsr request correlation id %d with an old controller epoch %d. Latest known controller epoch is %d" + .format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch)) (responseMap, ErrorMapping.StaleControllerEpochCode) } else { val controllerId = leaderAndISRRequest.controllerId @@ -236,7 +234,7 @@ class ReplicaManager(val config: KafkaConfig, // Otherwise record the error code in response stateChangeLogger.warn(("Broker %d received invalid LeaderAndIsr request with correlation id %d from " + "controller %d epoch %d with an older leader epoch %d for partition [%s,%d], current leader epoch is %d") - .format(localBrokerId, correlationId, controllerId, leaderAndISRRequest.controllerEpoch, + .format(localBrokerId, correlationId, controllerId, partitionStateInfo.leaderIsrAndControllerEpoch.controllerEpoch, partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch, topic, partition.partitionId, partitionLeaderEpoch)) responseMap.put((topic, partitionId), ErrorMapping.StaleLeaderEpochCode) } @@ -249,6 +247,7 @@ class ReplicaManager(val config: KafkaConfig, if (!partitionsTobeLeader.isEmpty) makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, leaderAndISRRequest.correlationId, responseMap) if (!partitionsTobeFollower.isEmpty) makeFollowers(controllerId, controllerEpoch, partitionsTobeFollower, leaderAndISRRequest.leaders, leaderAndISRRequest.correlationId, responseMap) + info("Handled leader and isr request %s".format(leaderAndISRRequest)) // we initialize highwatermark thread after the first leaderisrrequest.
This ensures that all the partitions // have been completely populated before starting the checkpointing there by avoiding weird race conditions if (!hwThreadInitialized) { @@ -275,10 +274,10 @@ class ReplicaManager(val config: KafkaConfig, private def makeLeaders(controllerId: Int, epoch: Int, partitionState: Map[Partition, PartitionStateInfo], correlationId: Int, responseMap: mutable.Map[(String, Int), Short]) = { - partitionState.foreach(state => - stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " + - "starting the become-leader transition for partition %s") - .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)))) + stateChangeLogger.trace(("Broker %d received LeaderAndIsr request correlationId %d from controller %d epoch %d " + + "starting the become-leader transition for partitions %s") + .format(localBrokerId, correlationId, controllerId, epoch, + partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(","))) for (partition <- partitionState.keys) responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError) @@ -286,11 +285,9 @@ class ReplicaManager(val config: KafkaConfig, try { // First stop fetchers for all the partitions replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_))) - partitionState.foreach { state => - stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-leader request from controller " + - "%d epoch %d with correlation id %d for partition %s") - .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(state._1.topic, state._1.partitionId))) - } + stateChangeLogger.trace("Broker %d stopped fetchers for partitions %s as per become-leader request from controller %d epoch %d" + .format(localBrokerId, partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(","), 
controllerId, epoch)) + // Update the partition information to be the leader partitionState.foreach{ case (partition, partitionStateInfo) => partition.makeLeader(controllerId, partitionStateInfo, correlationId)} @@ -301,21 +298,17 @@ class ReplicaManager(val config: KafkaConfig, } } catch { case e: Throwable => - partitionState.foreach { state => - val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d" + - "epoch %d for partition %s").format(localBrokerId, correlationId, controllerId, epoch, - TopicAndPartition(state._1.topic, state._1.partitionId)) - stateChangeLogger.error(errorMsg, e) - } + val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d " + + "epoch %d").format(localBrokerId, correlationId, controllerId, epoch) + stateChangeLogger.error(errorMsg, e) // Re-throw the exception for it to be caught in KafkaApis throw e } - partitionState.foreach { state => - stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " + - "for the become-leader transition for partition %s") - .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId))) - } + stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " + + "for the become-leader transition for partitions %s") + .format(localBrokerId, correlationId, controllerId, epoch, + partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(","))) } /* @@ -336,10 +329,10 @@ class ReplicaManager(val config: KafkaConfig, */ private def makeFollowers(controllerId: Int, epoch: Int, partitionState: Map[Partition, PartitionStateInfo], leaders: Set[Broker], correlationId: Int, responseMap: mutable.Map[(String, Int), Short]) { - partitionState.foreach(state => - stateChangeLogger.trace(("Broker %d handling 
LeaderAndIsr request correlationId %d from controller %d epoch %d " + - "starting the become-follower transition for partition %s") - .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)))) + stateChangeLogger.trace(("Broker %d received LeaderAndIsr request correlationId %d from controller %d epoch %d " + + "starting the become-follower transition for partitions %s") + .format(localBrokerId, correlationId, controllerId, epoch, + partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(","))) for (partition <- partitionState.keys) responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError) @@ -353,20 +346,15 @@ class ReplicaManager(val config: KafkaConfig, partition.makeFollower(controllerId, leaderIsrAndControllerEpoch, leaders, correlationId)} replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_))) - partitionState.foreach { state => - stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-follower request from controller " + - "%d epoch %d with correlation id %d for partition %s") - .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(state._1.topic, state._1.partitionId))) - } + stateChangeLogger.trace("Broker %d stopped fetchers for partitions %s as per becoming-follower request from controller %d epoch %d" + .format(localBrokerId, partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(","), controllerId, epoch)) logManager.truncateTo(partitionState.map{ case(partition, leaderISRAndControllerEpoch) => new TopicAndPartition(partition) -> partition.getOrCreateReplica().highWatermark }) - partitionState.foreach { state => - stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition %s as part of " + - "become-follower request with correlation id %d from controller %d epoch 
%d").format(localBrokerId, - TopicAndPartition(state._1.topic, state._1.partitionId), correlationId, controllerId, epoch)) - } + stateChangeLogger.trace("Broker %d truncated logs and checkpoint recovery boundaries for partitions %s as per becoming-follower request from controller %d epoch %d" + .format(localBrokerId, partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(","), controllerId, epoch)) + if (!isShuttingDown.get()) { val partitionAndOffsets = mutable.Map[TopicAndPartition, BrokerAndInitialOffset]() partitionState.foreach { @@ -378,34 +366,30 @@ class ReplicaManager(val config: KafkaConfig, BrokerAndInitialOffset(leaderBroker, partition.getReplica().get.logEndOffset)) case None => stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d " + - "controller %d epoch %d for partition %s since the designated leader %d " + - "cannot be found in live or shutting down brokers %s").format(localBrokerId, - correlationId, controllerId, epoch, partition, leader, leaders.mkString(","))) + "controller %d epoch %d for topic-partition %s since the designated leader %d " + + "cannot be found in live or shutting down brokers %s") + .format(localBrokerId, correlationId, controllerId, epoch, partition, leader, leaders)) } } replicaFetcherManager.addFetcherForPartitions(partitionAndOffsets) } else { - partitionState.foreach { state => - stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d from " + - "controller %d epoch %d for partition %s since it is shutting down").format(localBrokerId, correlationId, - controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId))) - } + stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d from " + + "controller %d epoch %d since it is shutting down") + .format(localBrokerId, correlationId, controllerId, epoch)) } } catch { case e: Throwable => - val 
errorMsg = ("Error on broker %d while processing LeaderAndIsr request with correlationId %d received from controller %d " + + val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d " + "epoch %d").format(localBrokerId, correlationId, controllerId, epoch) stateChangeLogger.error(errorMsg, e) // Re-throw the exception for it to be caught in KafkaApis throw e } - partitionState.foreach { state => - stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " + - "for the become-follower transition for partition %s") - .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId))) - } + stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " + + "for the become-follower transition for partitions %s") + .format(localBrokerId, correlationId, controllerId, epoch, partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(","))) } private def maybeShrinkIsr(): Unit = { diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index 86f88f5..c30069e 100644 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -153,9 +153,6 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted; }, 1000) val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned) - val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionToBeReassigned) - // in sync replicas should not have any replica that is not in the new assigned replicas - checkForPhantomInSyncReplicas(topic, partitionToBeReassigned, assignedReplicas) assertEquals("Partition should 
have been reassigned to 0, 2, 3", newReplicas, assignedReplicas) servers.foreach(_.shutdown()) } @@ -182,7 +179,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { }, 1000) val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned) assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas) - checkForPhantomInSyncReplicas(topic, partitionToBeReassigned, assignedReplicas) + // leader should be 2 servers.foreach(_.shutdown()) } @@ -208,7 +205,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { }, 2000) val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned) assertEquals("Partition should have been reassigned to 2, 3", newReplicas, assignedReplicas) - checkForPhantomInSyncReplicas(topic, partitionToBeReassigned, assignedReplicas) + // leader should be 2 servers.foreach(_.shutdown()) } @@ -225,6 +222,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions()) val reassignedPartitions = ZkUtils.getPartitionsBeingReassigned(zkClient) assertFalse("Partition should not be reassigned", reassignedPartitions.contains(topicAndPartition)) + // leader should be 2 servers.foreach(_.shutdown()) } @@ -246,7 +244,6 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { TestUtils.waitUntilTrue(checkIfReassignPartitionPathExists, 1000) val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned) assertEquals("Partition should have been reassigned to 0, 1", newReplicas, assignedReplicas) - checkForPhantomInSyncReplicas(topic, partitionToBeReassigned, assignedReplicas) servers.foreach(_.shutdown()) } @@ -329,7 +326,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { servers.foreach(_.shutdown()) } } - 
+ /** * This test creates a topic with a few config overrides and checks that the configs are applied to the new topic * then changes the config and checks that the new values take effect. @@ -339,14 +336,14 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val partitions = 3 val topic = "my-topic" val server = TestUtils.createServer(new KafkaConfig(TestUtils.createBrokerConfig(0))) - + def makeConfig(messageSize: Int, retentionMs: Long) = { var props = new Properties() props.setProperty(LogConfig.MaxMessageBytesProp, messageSize.toString) props.setProperty(LogConfig.RententionMsProp, retentionMs.toString) props } - + def checkConfig(messageSize: Int, retentionMs: Long) { TestUtils.retry(10000) { for(part <- 0 until partitions) { @@ -357,14 +354,14 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { } } } - + try { // create a topic with a few config overrides and check that they are applied val maxMessageSize = 1024 val retentionMs = 1000*1000 AdminUtils.createTopic(server.zkClient, topic, partitions, 1, makeConfig(maxMessageSize, retentionMs)) checkConfig(maxMessageSize, retentionMs) - + // now double the config values for the topic and check that it is applied AdminUtils.changeTopicConfig(server.zkClient, topic, makeConfig(2*maxMessageSize, 2 * retentionMs)) checkConfig(2*maxMessageSize, 2 * retentionMs) @@ -374,14 +371,6 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { } } - private def checkForPhantomInSyncReplicas(topic: String, partitionToBeReassigned: Int, assignedReplicas: Seq[Int]) { - val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionToBeReassigned) - // in sync replicas should not have any replica that is not in the new assigned replicas - val phantomInSyncReplicas = inSyncReplicas.toSet -- assignedReplicas.toSet - assertTrue("All in sync replicas %s must be in the assigned replica list %s".format(inSyncReplicas, assignedReplicas), 
- phantomInSyncReplicas.size == 0) - } - private def checkIfReassignPartitionPathExists(): Boolean = { ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath) } diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala index 2cd3a3f..7026432 100644 --- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala @@ -31,7 +31,6 @@ class IsrExpirationTest extends JUnit3Suite { var topicPartitionIsr: Map[(String, Int), Seq[Int]] = new HashMap[(String, Int), Seq[Int]]() val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) { override val replicaLagTimeMaxMs = 100L - override val replicaFetchWaitMaxMs = 100 override val replicaLagMaxMessages = 10L }) val topic = "foo" diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index 17a99f1..34e39e7 100644 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -32,7 +32,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) { override val replicaLagTimeMaxMs = 5000L override val replicaLagMaxMessages = 10L - override val replicaFetchWaitMaxMs = 1000 override val replicaFetchMinBytes = 20 }) val topic = "new-topic" diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index 03e6266..bab436d 100644 --- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -34,7 +34,6 @@ class SimpleFetchTest extends JUnit3Suite { val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) { override val replicaLagTimeMaxMs = 100L - override val 
replicaFetchWaitMaxMs = 100 override val replicaLagMaxMessages = 10L }) val topic = "foo" diff --git a/kafka-patch-review.py b/kafka-patch-review.py index 7fa6cb5..e243455 100644 --- a/kafka-patch-review.py +++ b/kafka-patch-review.py @@ -37,6 +37,21 @@ def main(): st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d_%H:%M:%S') patch_file=tempfile.gettempdir() + "/" + opt.jira + '_' + st + '.patch' + # first check if rebase is needed + git_branch_hash="git rev-parse " + opt.branch + p=os.popen(git_branch_hash) + branch_now=p.read() + p.close() + + git_common_ancestor="git merge-base " + opt.branch + " HEAD" + p=os.popen(git_common_ancestor) + branch_then=p.read() + p.close() + + if branch_now != branch_then: + print 'ERROR: Your current working branch is from an older version of ' + opt.branch + '. Please rebase first by using git pull --rebase' + sys.exit(1) + git_configure_reviewboard="git config reviewboard.url https://reviews.apache.org" print "Configuring reviewboard url to https://reviews.apache.org" p=os.popen(git_configure_reviewboard)