Index: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (revision 1179709)
+++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (working copy)
@@ -30,6 +30,7 @@
 import kafka.api.OffsetRequest
 import java.util.UUID
 import kafka.serializer.Decoder
+import kafka.common.InvalidConfigException
 
 /**
  * This class handles the consumers interaction with zookeeper
@@ -302,7 +303,15 @@
     return -2
   }
 
-  def getLatestOffset(topic: String, brokerId: Int, partitionId: Int): Long = {
+  def getLatestOffset(topic: String, brokerId: Int, partitionId: Int): Long =
+    getEarliestOrLatestOffset(topic, brokerId, partitionId, OffsetRequest.LatestTime)
+
+  /**
+   * Asks the given broker for a single boundary offset of a topic partition.
+   * earliestOrLatest is handed to SimpleConsumer.getOffsetsBefore as its time argument;
+   * callers in this file pass OffsetRequest.EarliestTime or OffsetRequest.LatestTime.
+   */
+  private def getEarliestOrLatestOffset(topic: String, brokerId: Int, partitionId: Int, earliestOrLatest: Long): Long = {
     var simpleConsumer: SimpleConsumer = null
     var producedOffset: Long = -1L
     try {
@@ -310,8 +319,7 @@
       val broker = cluster.getBroker(brokerId)
       simpleConsumer = new SimpleConsumer(broker.host, broker.port, ConsumerConfig.SocketTimeout,
                                           ConsumerConfig.SocketBufferSize)
-      val latestOffset = simpleConsumer.getOffsetsBefore(topic, partitionId,
-                                                         OffsetRequest.LatestTime, 1)
+      val latestOffset = simpleConsumer.getOffsetsBefore(topic, partitionId, earliestOrLatest, 1)
       producedOffset = latestOffset(0)
     }
     catch {
@@ -553,9 +561,19 @@
 
       val znode = topicDirs.consumerOffsetDir + "/" + partition.name
       val offsetString = ZkUtils.readDataMaybeNull(zkClient, znode)
-      // If first time starting a consumer, use default offset.
-      // TODO: handle this better (if client doesn't know initial offsets)
-      val offset : Long = if (offsetString == null) Long.MaxValue else offsetString.toLong
+      // No offset stored in ZK yet (e.g. first start of a consumer): choose the initial offset from config.autoOffsetReset
+      val offset : Long =
+        if (offsetString == null)
+          config.autoOffsetReset match {
+            case OffsetRequest.SmallestTimeString =>
+              getEarliestOrLatestOffset(topic, partition.brokerId, partition.partId, OffsetRequest.EarliestTime)
+            case OffsetRequest.LargestTimeString =>
+              getEarliestOrLatestOffset(topic, partition.brokerId, partition.partId, OffsetRequest.LatestTime)
+            case _ =>
+              throw new InvalidConfigException("Wrong value in autoOffsetReset in ConsumerConfig")
+          }
+        else
+          offsetString.toLong
       val queue = queues.get((topic, consumerThreadId))
       val consumedOffset = new AtomicLong(offset)
       val fetchedOffset = new AtomicLong(offset)