Index: core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala =================================================================== --- core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (revision 1181905) +++ core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (working copy) @@ -64,10 +64,10 @@ // update fetched offset to the compressed data chunk size, not the decompressed message set size if(logger.isTraceEnabled) logger.trace("Updating fetch offset = " + fetchedOffset.get + " with size = " + size) + chunkQueue.put(new FetchedDataChunk(messages, this, fetchOffset)) val newOffset = fetchedOffset.addAndGet(size) if (logger.isDebugEnabled) logger.debug("updated fetch offset of ( %s ) to %d".format(this, newOffset)) - chunkQueue.put(new FetchedDataChunk(messages, this, fetchOffset)) } size }