Index: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala =================================================================== --- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (revision 1179458) +++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (working copy) @@ -30,6 +30,7 @@ import kafka.api.OffsetRequest import java.util.UUID import kafka.serializer.Decoder +import kafka.common.InvalidConfigException /** * This class handles the consumers interaction with zookeeper @@ -302,7 +303,10 @@ return -2 } - def getLatestOffset(topic: String, brokerId: Int, partitionId: Int): Long = { + def getLatestOffset(topic: String, brokerId: Int, partitionId: Int): Long = + earliestOrLatestOffset(topic, brokerId, partitionId, OffsetRequest.LatestTime) + + private def earliestOrLatestOffset(topic: String, brokerId: Int, partitionId: Int, earliestOrLatest: Long): Long = { var simpleConsumer: SimpleConsumer = null var producedOffset: Long = -1L try { @@ -310,13 +314,12 @@ val broker = cluster.getBroker(brokerId) simpleConsumer = new SimpleConsumer(broker.host, broker.port, ConsumerConfig.SocketTimeout, ConsumerConfig.SocketBufferSize) - val latestOffset = simpleConsumer.getOffsetsBefore(topic, partitionId, - OffsetRequest.LatestTime, 1) - producedOffset = latestOffset(0) + val offsets = simpleConsumer.getOffsetsBefore(topic, partitionId, earliestOrLatest, 1) + producedOffset = offsets(0) } catch { case e => - logger.error("error in getLatestOffset jmx ", e) + logger.error("error in earliestOrLatestOffset() ", e) } finally { if (simpleConsumer != null) @@ -553,9 +556,19 @@ val znode = topicDirs.consumerOffsetDir + "/" + partition.name val offsetString = ZkUtils.readDataMaybeNull(zkClient, znode) - // If first time starting a consumer, use default offset. - // TODO: handle this better (if client doesn't know initial offsets) - val offset : Long = if (offsetString == null) Long.MaxValue else offsetString.toLong + // If first time starting a consumer, set the initial offset based on the config + var offset : Long = 0L + if (offsetString == null) + offset = config.autoOffsetReset match { + case OffsetRequest.SmallestTimeString => + earliestOrLatestOffset(topic, partition.brokerId, partition.partId, OffsetRequest.EarliestTime) + case OffsetRequest.LargestTimeString => + earliestOrLatestOffset(topic, partition.brokerId, partition.partId, OffsetRequest.LatestTime) + case _ => + throw new InvalidConfigException("Wrong value in autoOffsetReset in ConsumerConfig") + } + else + offset = offsetString.toLong val queue = queues.get((topic, consumerThreadId)) val consumedOffset = new AtomicLong(offset) val fetchedOffset = new AtomicLong(offset)