From cc351474f05618ec3424e98eb33bc36b1abf05a5 Mon Sep 17 00:00:00 2001
From: Imran Rashid
Date: Thu, 21 Nov 2013 14:51:12 -0600
Subject: [PATCH 1/2] allow committing of arbitrary offsets, to facilitate
 batch processing

---
 .../scala/kafka/consumer/ConsumerConnector.scala   | 21 +++++++++++-
 .../consumer/ZookeeperConsumerConnector.scala      | 37 +++++++++++++--------
 2 files changed, 43 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/kafka/consumer/ConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
index 13c3f77..5902ef7 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
@@ -71,8 +71,25 @@ trait ConsumerConnector {
    * Commit the offsets of all broker partitions connected by this connector.
    */
   def commitOffsets
-  
+
+  /**
+   * Commit the given offsets. It is entirely up to the user to commit sensible updates;
+   * generally the offsets should be taken from MessageAndMetadata. Note that you are free
+   * to update only some offsets and leave the others as-is.
+   */
+  def commitOffsets(offsets: Seq[PartitionTopicOffset]) {
+    commitOffsets(offsets.groupBy{pto => pto.topic})
+  }
+
+  /**
+   * Commit the given offsets. It is entirely up to the user to commit sensible updates;
+   * generally the offsets should be taken from MessageAndMetadata. Note that you are free
+   * to update only some offsets and leave the others as-is.
+   */
+  def commitOffsets(offsets: Iterable[(String, Iterable[PartitionTopicOffset])])
+
+
   /**
    * Shut down the connector
    */
   def shutdown()
@@ -101,3 +118,5 @@ object Consumer extends Logging {
     consumerConnect
   }
 }
+
+case class PartitionTopicOffset(topic: String, partition: Int, offset: Long)
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 612aeec..11091af 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -247,25 +247,34 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   }
 
   def commitOffsets() {
+    val offsets = topicRegistry.map{case(k,v) => k -> v.values.map{
+      info => PartitionTopicOffset(k, info.partitionId, info.getConsumeOffset)
+    }}
+    commitOffsets(offsets)
+  }
+
+  def commitOffsets(offsets: Iterable[(String, Iterable[PartitionTopicOffset])]) {
     if (zkClient == null) {
       error("zk client is null. Cannot commit offsets")
       return
     }
-    for ((topic, infos) <- topicRegistry) {
-      val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
-      for (info <- infos.values) {
-        val newOffset = info.getConsumeOffset
-        if (newOffset != checkpointedOffsets.get(TopicAndPartition(topic, info.partitionId))) {
-          try {
-            updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + info.partitionId, newOffset.toString)
-            checkpointedOffsets.put(TopicAndPartition(topic, info.partitionId), newOffset)
-          } catch {
-            case t: Throwable =>
-              // log it and let it go
-              warn("exception during commitOffsets", t)
-          }
-          debug("Committed offset " + newOffset + " for topic " + info)
+    for {
+      (topic, infos) <- offsets
+      topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
+      info <- infos
+    } {
+      val newOffset = info.offset
+      if (newOffset != checkpointedOffsets.get(TopicAndPartition(topic, info.partition))) {
+        try {
+          updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + info.partition,
+            newOffset.toString)
+          checkpointedOffsets.put(TopicAndPartition(topic, info.partition), newOffset)
+        } catch {
+          case t: Throwable =>
+            // log it and let it go
+            warn("exception during commitOffsets", t)
         }
+        debug("Committed offset " + newOffset + " for topic " + info)
       }
     }
   }
-- 
1.7.10.4