Index: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (date 1409681830000) +++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (revision ) @@ -89,7 +89,7 @@ private var fetcher: Option[ConsumerFetcherManager] = None private var zkClient: ZkClient = null private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]] - private val checkpointedOffsets = new Pool[TopicAndPartition, Long] + private val checkpointedZkOffsets = new Pool[TopicAndPartition, Long] private val topicThreadIdAndQueues = new Pool[(String, ConsumerThreadId), BlockingQueue[FetchedDataChunk]] private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "kafka-consumer-scheduler-") private val messageStreamCreated = new AtomicBoolean(false) @@ -277,10 +277,13 @@ } def commitOffsetToZooKeeper(topicPartition: TopicAndPartition, offset: Long) { + if (checkpointedZkOffsets.get(topicPartition) != offset) { - val topicDirs = new ZKGroupTopicDirs(config.groupId, topicPartition.topic) - updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + topicPartition.partition, offset.toString) + val topicDirs = new ZKGroupTopicDirs(config.groupId, topicPartition.topic) + updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + topicPartition.partition, offset.toString) + checkpointedZkOffsets.put(topicPartition, offset) - zkCommitMeter.mark() - } + zkCommitMeter.mark() + } + } def commitOffsets(isAutoCommit: Boolean = true) { var retriesRemaining = 1 + (if (isAutoCommit) config.offsetsCommitMaxRetries else 0) // no retries for commits from auto-commit @@ -289,10 +292,7 @@ while (!done) { val committed = offsetsChannelLock synchronized { // committed when we receive either no error codes or only MetadataTooLarge errors val offsetsToCommit = immutable.Map(topicRegistry.flatMap { case (topic, partitionTopicInfos) => - partitionTopicInfos.filterNot { case (partition, info) => - val newOffset = info.getConsumeOffset() - newOffset == checkpointedOffsets.get(TopicAndPartition(topic, info.partitionId)) - }.map { case (partition, info) => + partitionTopicInfos.map { case (partition, info) => TopicAndPartition(info.topic, info.partitionId) -> OffsetAndMetadata(info.getConsumeOffset()) } }.toSeq:_*) @@ -301,7 +301,6 @@ if (config.offsetsStorage == "zookeeper") { offsetsToCommit.foreach { case(topicAndPartition, offsetAndMetadata) => commitOffsetToZooKeeper(topicAndPartition, offsetAndMetadata.offset) - checkpointedOffsets.put(topicAndPartition, offsetAndMetadata.offset) } true } else { @@ -316,13 +315,10 @@ val (commitFailed, retryableIfFailed, shouldRefreshCoordinator, errorCount) = { offsetCommitResponse.commitStatus.foldLeft(false, false, false, 0) { case(folded, (topicPartition, errorCode)) => - if (errorCode == ErrorMapping.NoError) { + if (errorCode == ErrorMapping.NoError && config.dualCommitEnabled) { - val offset = offsetsToCommit(topicPartition).offset + val offset = offsetsToCommit(topicPartition).offset - checkpointedOffsets.put(topicPartition, offset) - if (config.dualCommitEnabled) { commitOffsetToZooKeeper(topicPartition, offset) - } + } - } (folded._1 || // update commitFailed errorCode != ErrorMapping.NoError, @@ -808,7 +804,7 @@ config.clientId) partTopicInfoMap.put(partition, partTopicInfo) debug(partTopicInfo + " selected new offset " + offset) - checkpointedOffsets.put(TopicAndPartition(topic, partition), offset) + checkpointedZkOffsets.put(TopicAndPartition(topic, partition), offset) } }