diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index b31b432..4b0ca35 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -111,10 +111,16 @@ class ReplicaFetcherThread(name:String, * Roll out a new log at the follower with the start offset equal to the current leader's start offset and continue fetching. */ val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, brokerConfig.brokerId) - replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset) - warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's start offset %d" - .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderStartOffset)) - leaderStartOffset + if (replica.logEndOffset.messageOffset < leaderStartOffset) { + replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset) + warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's start offset %d" + .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderStartOffset)) + leaderStartOffset + } else { + warn("Replica %d for partition %s reset its fetch offset to %d" + .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset.messageOffset)) + replica.logEndOffset.messageOffset + } } }