From 81bb36b5652ce3fa208dc7221aa00d69ceb49d7e Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Sun, 24 Nov 2013 19:32:22 -0600 Subject: [PATCH 2/2] add protection against backward commits --- .../scala/kafka/consumer/ConsumerConnector.scala | 6 ++-- .../consumer/ZookeeperConsumerConnector.scala | 31 ++++++++++++++++---- 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/ConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ConsumerConnector.scala index 5902ef7..0e3e29f 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerConnector.scala @@ -77,8 +77,8 @@ trait ConsumerConnector { * Generally they should be taken from MessageAndMetadata. Note that you are free to only update * some offsets, and leave others as is */ - def commitOffsets(offsets: Seq[PartitionTopicOffset]) { - commitOffsets(offsets.groupBy{pto => pto.topic}) + def commitOffsets(offsets: Seq[PartitionTopicOffset], preventBackwardsCommit: Boolean) { + commitOffsets(offsets.groupBy{pto => pto.topic}, preventBackwardsCommit) } /** @@ -86,7 +86,7 @@ trait ConsumerConnector { * Generally they should be taken from MessageAndMetadata. 
Note that you are free to only update * some offsets, and leave others as is */ - def commitOffsets(offsets: Iterable[(String, Iterable[PartitionTopicOffset])]) + def commitOffsets(offsets: Iterable[(String, Iterable[PartitionTopicOffset])], preventBackwardsCommit: Boolean) /** diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 11091af..8d1d35a 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -34,7 +34,6 @@ import kafka.utils.Utils.inLock import kafka.common._ import com.yammer.metrics.core.Gauge import kafka.metrics._ -import scala.Some /** @@ -250,10 +249,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val offsets = topicRegistry.map{case(k,v) => k -> v.values.map{ info => PartitionTopicOffset(k, info.partitionId, info.getConsumeOffset) }} - commitOffsets(offsets) + commitOffsets(offsets, false) } - def commitOffsets(offsets: Iterable[(String, Iterable[PartitionTopicOffset])]) { + def commitOffsets(offsets: Iterable[(String, Iterable[PartitionTopicOffset])], preventBackwardsCommit: Boolean = true) { if (zkClient == null) { error("zk client is null. 
Cannot commit offsets") return } @@ -266,8 +265,30 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val newOffset = info.offset if (newOffset != checkpointedOffsets.get(TopicAndPartition(topic, info.partition))) { try { - updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + info.partition, - newOffset.toString) + val path = topicDirs.consumerOffsetDir + "/" + info.partition + if (preventBackwardsCommit) { + // TODO(review): the result of conditionalUpdatePersistentPath below is ignored and needToUpdate is only cleared on the else branch, so this loop never terminates after a successful conditional write; also the bare `case _ => -1` swallows every Throwable -- match NumberFormatException instead + var needToUpdate = true + while (needToUpdate) { + val (originalValue, originalStat) = readData(zkClient, path) + val originalOffset = try { + java.lang.Long.parseLong(originalValue) + } catch { + case _ => -1 + } + if (originalOffset < newOffset) { + conditionalUpdatePersistentPath(zkClient, path, newOffset.toString, originalStat.getVersion) + } else { + // we can just ignore this update completely (maybe this partition was reassigned somewhere else, + // and whoever owns it now has already committed a later offset) + needToUpdate = false + } + + } + } else { + updatePersistentPath(zkClient, path, + newOffset.toString) + } } catch { case t: Throwable => // log it and let it go -- 1.7.10.4