Index: core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala =================================================================== --- core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (revision 1375670) +++ core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (working copy) @@ -110,6 +110,10 @@ val stringProducer1 = new Producer[String, String](config) stringProducer1.send(new ProducerData[String, String](topic, Array("test-message"))) + val replica = servers.head.replicaManager.getReplica(topic, 0).get + assertTrue("HighWatermark should equal logEndOffset with just 1 replica", + replica.logEndOffset > 0 && replica.logEndOffset == replica.highWatermark) + val request = new FetchRequestBuilder() .correlationId(100) .clientId("test-client") Index: core/src/main/scala/kafka/cluster/Partition.scala =================================================================== --- core/src/main/scala/kafka/cluster/Partition.scala (revision 1375670) +++ core/src/main/scala/kafka/cluster/Partition.scala (working copy) @@ -113,6 +113,8 @@ assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = ReplicaManager.UnknownLogEndOffset) inSyncReplicas = newInSyncReplicas leaderReplicaIdOpt = Some(localBrokerId) + // we may need to increment high watermark since ISR could be down to 1 + maybeIncrementLeaderHW(getReplica().get) true } else false @@ -208,7 +210,7 @@ } } - private def maybeIncrementLeaderHW(leaderReplica: Replica) { + def maybeIncrementLeaderHW(leaderReplica: Replica) { val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset) val newHighWatermark = allLogEndOffsets.min val oldHighWatermark = leaderReplica.highWatermark @@ -230,6 +232,8 @@ info("Shrinking ISR for topic %s partition %d to %s".format(topic, partitionId, newInSyncReplicas.map(_.brokerId).mkString(","))) // update ISR in zk and in cache updateISR(newInSyncReplicas) + // we may need to increment high watermark since ISR could be down to 1 + maybeIncrementLeaderHW(leaderReplica) } case None => // do nothing if no longer leader } Index: core/src/main/scala/kafka/server/KafkaApis.scala =================================================================== --- core/src/main/scala/kafka/server/KafkaApis.scala (revision 1375670) +++ core/src/main/scala/kafka/server/KafkaApis.scala (working copy) @@ -184,6 +184,8 @@ val localReplica = replicaManager.getLeaderReplicaIfLocal(topicData.topic, partitionData.partition) val log = localReplica.log.get log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet]) + // we may need to increment high watermark since ISR could be down to 1 + localReplica.partition.maybeIncrementLeaderHW(localReplica) offsets(msgIndex) = log.logEndOffset errors(msgIndex) = ErrorMapping.NoError.toShort trace("%d bytes written to logs, nextAppendOffset = %d"