From a51bd8c5a1dcc2ba9c60bb39e3cf8c0a444ec2e4 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Mon, 25 Nov 2013 18:04:28 -0600 Subject: [PATCH 3/3] dont do conditional update check if the path doesnt exist yet --- .../main/scala/kafka/consumer/ZookeeperConsumerConnector.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 8d1d35a..e1133a3 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -266,8 +266,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, if (newOffset != checkpointedOffsets.get(TopicAndPartition(topic, info.partition))) { try { val path = topicDirs.consumerOffsetDir + "/" + info.partition - if (preventBackwardsCommit) { - //TODO + //If the path doesn't exist yet, unfortunately the api doesn't give us a way to check that we are the ones that create it + val exists = pathExists(zkClient, path) + if (preventBackwardsCommit && exists) { var needToUpdate = true while (needToUpdate) { val (originalValue, originalStat) = readData(zkClient, path) -- 1.7.10.4