Index: core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala =================================================================== --- core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (revision 1384338) +++ core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (working copy) @@ -119,7 +119,7 @@ Assert.assertEquals(1, response2.errors.length) Assert.assertEquals(ErrorMapping.NoError, response2.errors(0)) - Assert.assertEquals(messageSet2.sizeInBytes, response2.offsets(0)) + Assert.assertEquals(0, response2.offsets(0)) } @Test @@ -162,8 +162,8 @@ // the first and last message should have been accepted by broker Assert.assertEquals(0, response2.errors(0)) Assert.assertEquals(0, response2.errors(2)) - Assert.assertEquals(messages.sizeInBytes, response2.offsets(0)) - Assert.assertEquals(messages.sizeInBytes, response2.offsets(2)) + Assert.assertEquals(0, response2.offsets(0)) + Assert.assertEquals(0, response2.offsets(2)) // the middle message should have been rejected because broker doesn't lead partition Assert.assertEquals(ErrorMapping.UnknownTopicOrPartitionCode.toShort, response2.errors(1)) Index: core/src/main/scala/kafka/log/Log.scala =================================================================== --- core/src/main/scala/kafka/log/Log.scala (revision 1384338) +++ core/src/main/scala/kafka/log/Log.scala (working copy) @@ -245,7 +245,7 @@ * Append this message set to the active segment of the log, rolling over to a fresh segment if necessary. * Returns the offset at which the messages are written. */ - def append(messages: ByteBufferMessageSet): Unit = { + def append(messages: ByteBufferMessageSet): Long = { // validate the messages messages.verifyMessageSize(maxMessageSize) var numberOfMessages = 0 @@ -268,8 +268,10 @@ val validMessages = new ByteBufferMessageSet(validByteBuffer) // they are valid, insert them in the log + var messageOffset = -1L lock synchronized { try { + messageOffset = logEndOffset var segment = segments.view.last maybeRoll(segment) segment = segments.view.last @@ -283,6 +285,7 @@ case e2 => throw e2 } } + messageOffset } /** Index: core/src/main/scala/kafka/server/KafkaApis.scala =================================================================== --- core/src/main/scala/kafka/server/KafkaApis.scala (revision 1384338) +++ core/src/main/scala/kafka/server/KafkaApis.scala (working copy) @@ -177,10 +177,10 @@ try { val localReplica = replicaManager.getLeaderReplicaIfLocal(topicData.topic, partitionData.partition) val log = localReplica.log.get - log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet]) + val messageOffset = log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet]) // we may need to increment high watermark since ISR could be down to 1 localReplica.partition.maybeIncrementLeaderHW(localReplica) - offsets(msgIndex) = log.logEndOffset + offsets(msgIndex) = messageOffset errors(msgIndex) = ErrorMapping.NoError.toShort trace("%d bytes written to logs, nextAppendOffset = %d" .format(partitionData.messages.sizeInBytes, offsets(msgIndex)))