diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala
index 6360a98..a586675 100644
--- a/core/src/main/scala/kafka/api/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetRequest.scala
@@ -33,6 +33,13 @@ object OffsetRequest {
   val LatestTime = -1L
   val EarliestTime = -2L
 
+  def getEarliestOrLatestTime(timeString: String): Long = {
+    timeString match {
+      case OffsetRequest.SmallestTimeString => OffsetRequest.EarliestTime
+      case OffsetRequest.LargestTimeString => OffsetRequest.LatestTime
+    }
+  }
+
   def readFrom(buffer: ByteBuffer): OffsetRequest = {
     val versionId = buffer.getShort
     val correlationId = buffer.getInt
diff --git a/core/src/main/scala/kafka/common/Config.scala b/core/src/main/scala/kafka/common/Config.scala
index d24fb0d..e80724a 100644
--- a/core/src/main/scala/kafka/common/Config.scala
+++ b/core/src/main/scala/kafka/common/Config.scala
@@ -19,6 +19,7 @@ package kafka.common
 
 import util.matching.Regex
 import kafka.utils.Logging
+import kafka.api.OffsetRequest
 
 trait Config extends Logging {
 
@@ -33,6 +34,15 @@ trait Config extends Logging {
       case None => throw new InvalidConfigException(prop + " " + value + " is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'")
     }
   }
+
+  def validateAutoOffsetReset(prop: String, autoOffsetReset: String) {
+    autoOffsetReset match {
+      case OffsetRequest.SmallestTimeString =>
+      case OffsetRequest.LargestTimeString =>
+      case _ => throw new InvalidConfigException("Wrong value " + autoOffsetReset + " in " + prop +
+                                                 "; valid values are " + OffsetRequest.SmallestTimeString + " or " + OffsetRequest.LargestTimeString)
+    }
+  }
 }
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index 45db07b..4066457 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -48,7 +48,7 @@ object ConsumerConfig extends Config {
   def validate(config: ConsumerConfig) {
     validateClientId(config.clientId)
     validateGroupId(config.groupId)
-    validateAutoOffsetReset(config.autoOffsetReset)
+    validateAutoOffsetReset("auto.offset.reset", config.autoOffsetReset)
   }
 
   def validateClientId(clientId: String) {
@@ -58,14 +58,6 @@ object ConsumerConfig extends Config {
   def validateGroupId(groupId: String) {
     validateChars("group.id", groupId)
   }
-
-  def validateAutoOffsetReset(autoOffsetReset: String) {
-    autoOffsetReset match {
-      case OffsetRequest.SmallestTimeString =>
-      case OffsetRequest.LargestTimeString =>
-      case _ => throw new InvalidConfigException("Wrong value " + autoOffsetReset + " of autoOffsetReset in ConsumerConfig")
-    }
-  }
 }
 
 class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(props) {
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index 1dfc75c..e61cda6 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -52,13 +52,8 @@ class ConsumerFetcherThread(name: String,
 
   // handle a partition whose offset is out of range and return a new fetch offset
   def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long = {
-    var startTimestamp : Long = 0
-    config.autoOffsetReset match {
-      case OffsetRequest.SmallestTimeString => startTimestamp = OffsetRequest.EarliestTime
-      case OffsetRequest.LargestTimeString => startTimestamp = OffsetRequest.LatestTime
-      case _ => startTimestamp = OffsetRequest.LatestTime
-    }
-    val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(startTimestamp, 1)))
+    val request = OffsetRequest(Map(topicAndPartition ->
+      PartitionOffsetRequestInfo(OffsetRequest.getEarliestOrLatestTime(config.autoOffsetReset), 1)))
     val partitionErrorAndOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition)
     val newOffset = partitionErrorAndOffset.error match {
       case ErrorMapping.NoError => partitionErrorAndOffset.offsets.head
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index f65db33..d307c9c 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -21,11 +21,13 @@ import java.util.Properties
 import kafka.message.Message
 import kafka.consumer.ConsumerConfig
 import kafka.utils.{VerifiableProperties, ZKConfig, Utils}
+import kafka.api.OffsetRequest
+import kafka.common.Config
 
 /**
  * Configuration settings for the kafka server
  */
-class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(props) {
+class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(props) with Config {
 
   def this(originalProps: Properties) {
     this(new VerifiableProperties(originalProps))
   }
@@ -161,6 +163,16 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
    * Increasing this value can increase the degree of I/O parallelism in the follower broker. */
   val numReplicaFetchers = props.getInt("num.replica.fetchers", 1)
 
+  /**
+   * What to do if a replica fetches an offset that is out of the range of the leader?
+   * (Note that this can happen during an unclean leader election, i.e., a replica not in ISR
+   * is selected as the new leader.)
+   * smallest : truncate all its data and refetch from the smallest offset of the leader
+   * largest : truncate all its data and refetch from the largest offset of the leader
+   * anything else: throws configuration exception
+   */
+  val replicaOffsetReset = props.getString("replica.offset.reset", OffsetRequest.SmallestTimeString)
+
   /* the frequency with which the high watermark is saved out to disk */
   val replicaHighWatermarkCheckpointIntervalMs = props.getLong("replica.high.watermark.checkpoint.interval.ms", 5000L)
@@ -170,4 +182,5 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
 
   /* the purge interval (in number of requests) of the producer request purgatory */
   val producerPurgatoryPurgeIntervalRequests = props.getInt("producer.purgatory.purge.interval.requests", 10000)
+  validateAutoOffsetReset("replica.offset.reset", replicaOffsetReset)
 }
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 37b71be..6ee4bcd 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -65,10 +65,11 @@ class ReplicaFetcherThread(name:String,
 
   // handle a partition whose offset is out of range and return a new fetch offset
  def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long = {
-    // This means the local replica is out of date. Truncate the log and catch up from beginning.
+    // This means the local replica is out of date. Truncate the log and catch up from the smallest or the largest offset.
     val request = OffsetRequest(
       replicaId = brokerConfig.brokerId,
-      requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1))
+      requestInfo = Map(topicAndPartition ->
+        PartitionOffsetRequestInfo(OffsetRequest.getEarliestOrLatestTime(brokerConfig.replicaOffsetReset), 1))
     )
     val partitionErrorAndOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition)
     val offset = partitionErrorAndOffset.error match {
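
For reviewers who want to spot-check the refactor, below is a minimal sketch exercising the two helpers this patch introduces. It is not part of the patch: OffsetResetSpec is a hypothetical scratch object, and it assumes the patched kafka-core classes are on the classpath. One behavioral note recoverable from the diff: getEarliestOrLatestTime has no default case, so the old "case _ => startTimestamp = OffsetRequest.LatestTime" fallback in ConsumerFetcherThread is gone, and an unvalidated string would now surface as a scala.MatchError. The patch guards against that by validating at config time, via validate() in ConsumerConfig and the new validateAutoOffsetReset("replica.offset.reset", replicaOffsetReset) call in KafkaConfig.

// Hypothetical review scratch, not part of this patch. Assumes the patched
// kafka.api.OffsetRequest and kafka.common.Config are on the classpath.
import kafka.api.OffsetRequest
import kafka.common.{Config, InvalidConfigException}

object OffsetResetSpec extends Config {
  def main(args: Array[String]) {
    // The new helper maps the two legal strings to the offset-request sentinels.
    assert(OffsetRequest.getEarliestOrLatestTime(OffsetRequest.SmallestTimeString) == OffsetRequest.EarliestTime) // "smallest" -> -2L
    assert(OffsetRequest.getEarliestOrLatestTime(OffsetRequest.LargestTimeString) == OffsetRequest.LatestTime)    // "largest"  -> -1L

    // The shared validator accepts both legal values silently...
    validateAutoOffsetReset("replica.offset.reset", OffsetRequest.SmallestTimeString)
    validateAutoOffsetReset("auto.offset.reset", OffsetRequest.LargestTimeString)

    // ...and rejects anything else with an InvalidConfigException that names
    // the offending property ("middle" is an arbitrary illegal value).
    val rejected =
      try { validateAutoOffsetReset("replica.offset.reset", "middle"); false }
      catch { case e: InvalidConfigException => println("rejected as expected: " + e.getMessage); true }
    assert(rejected, "expected InvalidConfigException for an illegal reset value")
  }
}

Moving the validator into the shared kafka.common.Config trait lets the consumer and broker configs reuse one check and one error-message format, and the property name passed in ("auto.offset.reset" vs. "replica.offset.reset") keeps the resulting error specific to whichever config was misconfigured.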