Index: core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala	(revision 3f1e0882297a102843dcd17e05f54ddbedb4882a)
+++ core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala	(revision )
@@ -174,6 +174,31 @@
     requestHandlerLogger.setLevel(Level.ERROR)
   }
 
+  def testLogEndOffset() {
+    // send some messages to each broker
+    val partitionMessageCounts = Map(0 -> 3, 1 -> 5)
+    val sentMessages = sendMessagesToPartition(configs, topic, 0, partitionMessageCounts(0), DefaultCompressionCodec) ++
+      sendMessagesToPartition(configs, topic, 1, partitionMessageCounts(1), DefaultCompressionCodec)
+
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1)
+
+    val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0))
+    val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
+    val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
+
+    topicMessageStreams1.values.head foreach { streams =>
+
+      val iter: ConsumerIterator[String, String] = streams.iterator()
+
+      for (i <- 0 until partitionMessageCounts.values.sum) {
+        val msg: MessageAndMetadata[String, String] = iter.next()
+        assertEquals(partitionMessageCounts(msg.partition), msg.logEndOffset)
+      }
+    }
+
+    zkConsumerConnector1.shutdown
+  }
 
   def testCompression() {
     val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler])
Index: core/src/main/scala/kafka/consumer/ConsumerIterator.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ConsumerIterator.scala	(revision 3f1e0882297a102843dcd17e05f54ddbedb4882a)
+++ core/src/main/scala/kafka/consumer/ConsumerIterator.scala	(revision )
@@ -100,7 +100,7 @@
 
     item.message.ensureValid() // validate checksum of message to ensure it is valid
 
-    new MessageAndMetadata(currentTopicInfo.topic, currentTopicInfo.partitionId, item.message, item.offset, keyDecoder, valueDecoder)
+    new MessageAndMetadata(currentTopicInfo.topic, currentTopicInfo.partitionId, item.message, item.offset, currentTopicInfo.getMaxEndOffset, keyDecoder, valueDecoder)
   }
 
   def clearCurrentChunk() {
Index: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala	(revision 3f1e0882297a102843dcd17e05f54ddbedb4882a)
+++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala	(revision )
@@ -835,6 +835,7 @@
                                                  queue,
                                                  consumedOffset,
                                                  fetchedOffset,
+                                                 new AtomicLong(0),
                                                  new AtomicInteger(config.fetchMessageMaxBytes),
                                                  config.clientId)
       partTopicInfoMap.put(partition, partTopicInfo)
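Not part of the patch: a minimal sketch of how application code could consume the new field once the changes above are in place. The stream setup mirrors the new test; the lag arithmetic is an illustrative assumption, not an API this patch adds, and the loop blocks indefinitely under the default consumer.timeout.ms.

// Sketch only: assumes the patched MessageAndMetadata carrying logEndOffset,
// which this patch fills from the broker-reported high watermark (see the
// ConsumerFetcherThread hunk below).
val streamsByTopic = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
val iter = streamsByTopic(topic).head.iterator()
while (iter.hasNext()) {
  val msg = iter.next()
  // Offsets are zero-based: after consuming offset N, there are
  // logEndOffset - (N + 1) messages left below the high watermark.
  val lag = msg.logEndOffset - (msg.offset + 1)
  println("partition %d: offset %d, %d message(s) behind".format(msg.partition, msg.offset, lag))
}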
Index: core/src/main/scala/kafka/message/MessageAndMetadata.scala
===================================================================
--- core/src/main/scala/kafka/message/MessageAndMetadata.scala	(revision 3f1e0882297a102843dcd17e05f54ddbedb4882a)
+++ core/src/main/scala/kafka/message/MessageAndMetadata.scala	(revision )
@@ -20,8 +20,11 @@
 import kafka.serializer.Decoder
 import kafka.utils.Utils
 
-case class MessageAndMetadata[K, V](topic: String, partition: Int,
-                                    private val rawMessage: Message, offset: Long,
+case class MessageAndMetadata[K, V](topic: String,
+                                    partition: Int,
+                                    private val rawMessage: Message,
+                                    offset: Long,
+                                    logEndOffset: Long,
                                     keyDecoder: Decoder[K], valueDecoder: Decoder[V]) {
 
   /**
Index: core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala	(revision 3f1e0882297a102843dcd17e05f54ddbedb4882a)
+++ core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala	(revision )
@@ -53,6 +53,7 @@
                                                   queue,
                                                   new AtomicLong(consumedOffset),
                                                   new AtomicLong(0),
+                                                  new AtomicLong(0),
                                                   new AtomicInteger(0),
                                                   ""))
   val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0))
Index: core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala	(revision 3f1e0882297a102843dcd17e05f54ddbedb4882a)
+++ core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala	(revision )
@@ -29,7 +29,7 @@
                             sourceBroker: Broker,
                             partitionMap: Map[TopicAndPartition, PartitionTopicInfo],
                             val consumerFetcherManager: ConsumerFetcherManager)
-        extends AbstractFetcherThread(name = name, 
+        extends AbstractFetcherThread(name = name,
                                       clientId = config.clientId,
                                       sourceBroker = sourceBroker,
                                       socketTimeout = config.socketTimeoutMs,
@@ -46,6 +46,7 @@
     if (pti.getFetchOffset != fetchOffset)
       throw new RuntimeException("Offset doesn't match for partition [%s,%d] pti offset: %d fetch offset: %d"
                                 .format(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset, fetchOffset))
+    pti.resetMaxEndOffset(partitionData.hw)
     pti.enqueue(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
   }
 
Index: core/src/test/scala/unit/kafka/integration/FetcherTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/FetcherTest.scala	(revision 3f1e0882297a102843dcd17e05f54ddbedb4882a)
+++ core/src/test/scala/unit/kafka/integration/FetcherTest.scala	(revision )
@@ -47,6 +47,7 @@
                                                  queue,
                                                  new AtomicLong(0),
                                                  new AtomicLong(0),
+                                                 new AtomicLong(0),
                                                  new AtomicInteger(0),
                                                  ""))
 
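Every direct construction of PartitionTopicInfo gains the extra AtomicLong, as the two test fixtures above show. A minimal sketch of the updated call shape; all argument values are placeholders, and the leading topic/partition parameters are assumed to match the existing class signature:

// Sketch only: placeholder arguments; real values come from the consumer's
// rebalance logic. Assumes kafka.consumer._ is in scope for FetchedDataChunk.
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}

val pti = new PartitionTopicInfo("my-topic",                     // topic
                                 0,                              // partition id
                                 new LinkedBlockingQueue[FetchedDataChunk](),
                                 new AtomicLong(0),              // consumed offset
                                 new AtomicLong(0),              // fetched offset
                                 new AtomicLong(0),              // max end offset (new)
                                 new AtomicInteger(1024 * 1024), // fetch size
                                 "client-id")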
Index: core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
===================================================================
--- core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala	(revision 3f1e0882297a102843dcd17e05f54ddbedb4882a)
+++ core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala	(revision )
@@ -27,6 +27,7 @@
                          private val chunkQueue: BlockingQueue[FetchedDataChunk],
                          private val consumedOffset: AtomicLong,
                          private val fetchedOffset: AtomicLong,
+                         private val maxEndOffset: AtomicLong,
                          private val fetchSize: AtomicInteger,
                          private val clientId: String) extends Logging {
 
@@ -39,6 +40,8 @@
 
   def getFetchOffset() = fetchedOffset.get
 
+  def getMaxEndOffset() = maxEndOffset.get
+
   def resetConsumeOffset(newConsumeOffset: Long) = {
     consumedOffset.set(newConsumeOffset)
     debug("reset consume offset of " + this + " to " + newConsumeOffset)
@@ -47,6 +50,11 @@
   def resetFetchOffset(newFetchOffset: Long) = {
     fetchedOffset.set(newFetchOffset)
     debug("reset fetch offset of ( %s ) to %d".format(this, newFetchOffset))
+  }
+
+  def resetMaxEndOffset(newMaxEndOffset: Long) = {
+    maxEndOffset.set(newMaxEndOffset)
+    debug("reset max end offset of ( %s ) to %d".format(this, newMaxEndOffset))
   }
 
   /**
\ No newline at end of file
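The new getter/setter pair deliberately mirrors the existing fetchedOffset pair: the fetcher thread writes the broker's high watermark after each fetch response, and consumer threads read it when ConsumerIterator builds each MessageAndMetadata. A toy illustration of that handoff, reusing the pti sketch above; the literal 42 is a placeholder:

// The AtomicLong gives cross-thread visibility of the high watermark
// without any extra locking.
pti.resetMaxEndOffset(42L)   // fetcher thread; in the patch: pti.resetMaxEndOffset(partitionData.hw)
val hw = pti.getMaxEndOffset // consumer thread, read via ConsumerIterator
assert(hw == 42L)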