diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index fb2a230..d41a705 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -132,7 +132,7 @@ case class FetchRequest private[kafka] (versionId: Short = FetchRequest.CurrentV
     })
   }
 
-  def isFromFollower = replicaId != Request.OrdinaryConsumerId && replicaId != Request.DebuggingConsumerId
+  def isFromFollower = Request.isReplicaIdFromFollower(replicaId)
 
   def isFromOrdinaryConsumer = replicaId == Request.OrdinaryConsumerId
 
diff --git a/core/src/main/scala/kafka/api/RequestOrResponse.scala b/core/src/main/scala/kafka/api/RequestOrResponse.scala
index b62330b..44b7677 100644
--- a/core/src/main/scala/kafka/api/RequestOrResponse.scala
+++ b/core/src/main/scala/kafka/api/RequestOrResponse.scala
@@ -24,6 +24,8 @@ import kafka.utils.Logging
 object Request {
   val OrdinaryConsumerId: Int = -1
   val DebuggingConsumerId: Int = -2
+
+  def isReplicaIdFromFollower(replicaId: Int): Boolean = (replicaId >= 0)
 }
 
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 80a70f1..c9f92a2 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -392,10 +392,10 @@ class KafkaApis(val requestChannel: RequestChannel,
       replicaManager.getLeaderReplicaIfLocal(topic, partition)
     trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
     val maxOffsetOpt =
-      if (fromReplicaId == Request.OrdinaryConsumerId)
-        Some(localReplica.highWatermark)
-      else
+      if (Request.isReplicaIdFromFollower(fromReplicaId))
         None
+      else
+        Some(localReplica.highWatermark)
     val messages = localReplica.log match {
       case Some(log) =>
         log.read(offset, maxSize, maxOffsetOpt)
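
Note on the hunks above: `Request.isReplicaIdFromFollower` encodes the convention that follower fetches carry a non-negative broker id as `replicaId`, while the two consumer sentinels are negative. One observable consequence in `KafkaApis`: the debugging consumer (`replicaId = -2`) is now also capped at the high watermark, since only true followers get `maxOffsetOpt = None`. A minimal standalone sketch of the predicate and the offset-cap selection (the `Request` object mirrors the patch; `MaxOffsetDemo` and the literal high watermark are illustrative only):

```scala
object Request {
  val OrdinaryConsumerId: Int = -1
  val DebuggingConsumerId: Int = -2

  // Follower replica ids are broker ids, which are always non-negative.
  def isReplicaIdFromFollower(replicaId: Int): Boolean = replicaId >= 0
}

object MaxOffsetDemo extends App {
  val highWatermark = 42L // illustrative value

  // Followers may read past the high watermark; consumers may not.
  def maxOffsetOpt(fromReplicaId: Int): Option[Long] =
    if (Request.isReplicaIdFromFollower(fromReplicaId)) None
    else Some(highWatermark)

  println(maxOffsetOpt(0))                           // follower broker 0 -> None
  println(maxOffsetOpt(Request.OrdinaryConsumerId))  // -1 -> Some(42)
  println(maxOffsetOpt(Request.DebuggingConsumerId)) // -2 -> Some(42), now capped as well
}
```
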
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 54f6e16..874eadc 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -314,11 +314,11 @@ class ReplicaManager(val config: KafkaConfig,
   /*
    * Make the current broker to become follower for a given set of partitions by:
    *
-   * 1. Stop fetchers for these partitions
-   * 2. Truncate the log and checkpoint offsets for these partitions.
-   * 3. If the broker is not shutting down, add the fetcher to the new leaders
-   * 4. Update the partition metadata in cache
-   * 5. Remove these partitions from the leader partitions set
+   * 1. Remove these partitions from the leader partitions set.
+   * 2. Mark the replicas as followers so that no more data can be added from the producer clients.
+   * 3. Stop fetchers for these partitions so that no more data can be added by the replica fetcher threads.
+   * 4. Truncate the log and checkpoint offsets for these partitions.
+   * 5. If the broker is not shutting down, add the fetcher to the new leaders.
    *
    * The ordering of doing these steps make sure that the replicas in transition will not
    * take any more messages before checkpointing offsets so that all messages before the checkpoint
@@ -339,6 +339,13 @@ class ReplicaManager(val config: KafkaConfig,
           responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError)
 
       try {
+        leaderPartitionsLock synchronized {
+          leaderPartitions --= partitionState.keySet
+        }
+
+        partitionState.foreach{ case (partition, leaderIsrAndControllerEpoch) =>
+          partition.makeFollower(controllerId, leaderIsrAndControllerEpoch, leaders, correlationId)}
+
         replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_)))
         stateChangeLogger.trace("Broker %d stopped fetchers for partitions %s as per becoming-follower request from controller %d epoch %d"
           .format(localBrokerId, partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(","), controllerId, correlationId))
@@ -372,13 +379,6 @@ class ReplicaManager(val config: KafkaConfig,
             "controller %d epoch %d since it is shutting down")
             .format(localBrokerId, correlationId, controllerId, epoch))
         }
-
-        partitionState.foreach{ case (partition, leaderIsrAndControllerEpoch) =>
-          partition.makeFollower(controllerId, leaderIsrAndControllerEpoch, leaders, correlationId)}
-
-        leaderPartitionsLock synchronized {
-          leaderPartitions --= partitionState.keySet
-        }
      } catch {
        case e: Throwable =>
          val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d " +
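
The reordering above is the heart of the patch: the replica is taken out of the leader set and flipped to follower before fetchers are stopped and the log is truncated, so neither producer appends nor replica-fetcher appends can land between the offset checkpoint and the truncation. A self-contained toy model of that guarantee (all names and types here are hypothetical stand-ins, not Kafka's):

```scala
import scala.collection.mutable

object BecomeFollowerOrdering extends App {
  val leaderPartitions = mutable.Set("t-0", "t-1")
  val logEndOffset = mutable.Map("t-0" -> 100L, "t-1" -> 100L)

  // Producer appends are accepted only while the partition is in the leader set.
  def append(tp: String): Unit =
    if (leaderPartitions(tp)) logEndOffset(tp) += 1

  val transitioning = Set("t-0")

  // Steps 1-2: demote first, so any later append to "t-0" is rejected.
  leaderPartitions --= transitioning

  append("t-0") // rejected: no longer leader
  append("t-1") // accepted: still leader

  // Steps 3-4: with appends blocked, checkpoint-then-truncate is race-free;
  // the checkpointed offset still equals the log end offset.
  val checkpoint = transitioning.map(tp => tp -> logEndOffset(tp)).toMap
  assert(checkpoint("t-0") == logEndOffset("t-0"))
  println(s"checkpoint=$checkpoint logEndOffsets=$logEndOffset")
}
```
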
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index f1f139e..5e8c56d 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -269,34 +269,40 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
     while (isMessageInAllReplicas) {
       var messageInfoFromFirstReplicaOpt: Option[MessageInfo] = None
       for ( (replicaId, messageIterator) <- messageIteratorMap) {
-        if (messageIterator.hasNext) {
-          val messageAndOffset = messageIterator.next()
-
-          // only verify up to the high watermark
-          if (messageAndOffset.offset >= fetchResponsePerReplica.get(replicaId).hw)
-            isMessageInAllReplicas = false
-          else {
-            messageInfoFromFirstReplicaOpt match {
-              case None =>
-                messageInfoFromFirstReplicaOpt = Some(
-                  MessageInfo(replicaId, messageAndOffset.offset,messageAndOffset.nextOffset, messageAndOffset.message.checksum))
-              case Some(messageInfoFromFirstReplica) =>
-                if (messageInfoFromFirstReplica.offset != messageAndOffset.offset) {
-                  println(ReplicaVerificationTool.getCurrentTimeString + ": partition " + topicAndPartition
-                    + ": replica " + messageInfoFromFirstReplica.replicaId + "'s offset "
-                    + messageInfoFromFirstReplica.offset + " doesn't match replica "
-                    + replicaId + "'s offset " + messageAndOffset.offset)
-                  System.exit(1)
-                }
-                if (messageInfoFromFirstReplica.checksum != messageAndOffset.message.checksum)
-                  println(ReplicaVerificationTool.getCurrentTimeString + ": partition "
-                    + topicAndPartition + " has unmatched checksum at offset " + messageAndOffset.offset + "; replica "
-                    + messageInfoFromFirstReplica.replicaId + "'s checksum " + messageInfoFromFirstReplica.checksum
-                    + "; replica " + replicaId + "'s checksum " + messageAndOffset.message.checksum)
+        try {
+          if (messageIterator.hasNext) {
+            val messageAndOffset = messageIterator.next()
+
+            // only verify up to the high watermark
+            if (messageAndOffset.offset >= fetchResponsePerReplica.get(replicaId).hw)
+              isMessageInAllReplicas = false
+            else {
+              messageInfoFromFirstReplicaOpt match {
+                case None =>
+                  messageInfoFromFirstReplicaOpt = Some(
+                    MessageInfo(replicaId, messageAndOffset.offset,messageAndOffset.nextOffset, messageAndOffset.message.checksum))
+                case Some(messageInfoFromFirstReplica) =>
+                  if (messageInfoFromFirstReplica.offset != messageAndOffset.offset) {
+                    println(ReplicaVerificationTool.getCurrentTimeString + ": partition " + topicAndPartition
+                      + ": replica " + messageInfoFromFirstReplica.replicaId + "'s offset "
+                      + messageInfoFromFirstReplica.offset + " doesn't match replica "
+                      + replicaId + "'s offset " + messageAndOffset.offset)
+                    System.exit(1)
+                  }
+                  if (messageInfoFromFirstReplica.checksum != messageAndOffset.message.checksum)
+                    println(ReplicaVerificationTool.getCurrentTimeString + ": partition "
+                      + topicAndPartition + " has unmatched checksum at offset " + messageAndOffset.offset + "; replica "
+                      + messageInfoFromFirstReplica.replicaId + "'s checksum " + messageInfoFromFirstReplica.checksum
+                      + "; replica " + replicaId + "'s checksum " + messageAndOffset.message.checksum)
+              }
             }
-          }
-        } else
-          isMessageInAllReplicas = false
+          } else
+            isMessageInAllReplicas = false
+        } catch {
+          case t =>
+            throw new RuntimeException("Error in processing replica %d in partition %s at offset %d."
+              .format(replicaId, topicAndPartition, fetchOffsetMap.get(topicAndPartition)), t)
+        }
       }
       if (isMessageInAllReplicas) {
         val nextOffset = messageInfoFromFirstReplicaOpt.get.nextOffset
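
The try/catch added above makes a failure while decoding one replica's fetch response fail fast with context: the replica id, the partition, and the current fetch offset, with the original throwable chained as the cause. The same wrap-and-rethrow pattern in isolation (all names here are hypothetical, not the tool's):

```scala
object RethrowWithContext extends App {
  // Stand-in for the per-replica message processing that may blow up.
  def processReplica(replicaId: Int, offset: Long): Unit =
    throw new IllegalStateException("corrupt message set")

  try {
    val (replicaId, offset) = (2, 100L)
    try {
      processReplica(replicaId, offset)
    } catch {
      case t: Throwable =>
        // Add context; keep the original failure reachable via getCause.
        throw new RuntimeException(
          "Error in processing replica %d at offset %d.".format(replicaId, offset), t)
    }
  } catch {
    case e: RuntimeException =>
      println(e.getMessage)          // Error in processing replica 2 at offset 100.
      println(e.getCause.getMessage) // corrupt message set
  }
}
```
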