Index: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala	(revision 2cdc83e85b56c946eed51da6143fbe9d2d0c9d08)
+++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala	(revision )
@@ -281,18 +281,20 @@
       updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + topicPartition.partition, offset.toString)
       zkCommitMeter.mark()
     }
-
+  val forceCommit = config.offsetsStorage.equals("kafka")
   def commitOffsets(isAutoCommit: Boolean = true) {
     var retriesRemaining = 1 + (if (isAutoCommit) config.offsetsCommitMaxRetries else 0) // no retries for commits from auto-commit
     var done = false
-
     while (!done) {
       val committed = offsetsChannelLock synchronized { // committed when we receive either no error codes or only MetadataTooLarge errors
         val offsetsToCommit = immutable.Map(topicRegistry.flatMap { case (topic, partitionTopicInfos) =>
-          partitionTopicInfos.filterNot { case (partition, info) =>
-            val newOffset = info.getConsumeOffset()
-            newOffset == checkpointedOffsets.get(TopicAndPartition(topic, info.partitionId))
-          }.map { case (partition, info) =>
+          val relevantOffsets =
+            if (forceCommit) partitionTopicInfos
+            else partitionTopicInfos.filterNot { case (partition, info) =>
+              val newOffset = info.getConsumeOffset()
+              newOffset == checkpointedOffsets.get(TopicAndPartition(topic, info.partitionId))
+            }
+          relevantOffsets.map { case (partition, info) =>
             TopicAndPartition(info.topic, info.partitionId) -> OffsetAndMetadata(info.getConsumeOffset())
           }
         }.toSeq:_*)
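
For review context, a minimal, runnable sketch of the filtering behaviour this hunk changes: previously, offsets equal to the last checkpointed value were always skipped; with the patch, when offsets storage is "kafka" every partition's offset is committed even if it has not moved, apparently so the offsets topic is still populated for unchanged partitions. The names here (OffsetFilterSketch, offsetsToCommit, the plain Map arguments) are hypothetical stand-ins, not the connector's real types.

```scala
// Sketch only: assumes plain Maps in place of topicRegistry / checkpointedOffsets.
object OffsetFilterSketch {
  case class TopicAndPartition(topic: String, partition: Int)

  def offsetsToCommit(current: Map[TopicAndPartition, Long],
                      checkpointed: Map[TopicAndPartition, Long],
                      forceCommit: Boolean): Map[TopicAndPartition, Long] =
    if (forceCommit) current                        // offsets.storage = "kafka": commit everything
    else current.filterNot { case (tp, offset) =>   // otherwise skip offsets that haven't moved
      checkpointed.get(tp).contains(offset)
    }

  def main(args: Array[String]): Unit = {
    val current      = Map(TopicAndPartition("t", 0) -> 42L, TopicAndPartition("t", 1) -> 7L)
    val checkpointed = Map(TopicAndPartition("t", 0) -> 42L)
    println(offsetsToCommit(current, checkpointed, forceCommit = false)) // only t-1
    println(offsetsToCommit(current, checkpointed, forceCommit = true))  // both partitions
  }
}
```

Note the trade-off this implies: with forceCommit every commit cycle writes one message per owned partition to the offsets topic, rather than only for partitions whose consume offset advanced.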