diff --git a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala index 64b702b..9c779ce 100644 --- a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala +++ b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala @@ -62,6 +62,8 @@ class PartitionTopicInfo(val topic: String, debug("updated fetch offset of (%s) to %d".format(this, next)) consumerTopicStats.getConsumerTopicStats(topic).byteRate.mark(size) consumerTopicStats.getConsumerAllTopicStats().byteRate.mark(size) + } else if(messages.sizeInBytes > 0) { + chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get)) } }