Description
We saw the following error in the log during testing. It appears that while the high watermark was at offset 39021, the broker incorrectly exposed an uncommitted message (at offset 39022) to the client. This does not happen every time; it only occurs when certain conditions are met, which I will explain in the comments.
2013/01/11 00:54:42.059 ERROR [KafkaApis] [kafka-request-handler-2] [kafka] [] [KafkaApi-277] error when processing request (service_metrics,2,39022,2000000)
java.lang.IllegalArgumentException: Attempt to read with a maximum offset (39021) less than the start offset (39022).
at kafka.log.LogSegment.read(LogSegment.scala:105)
at kafka.log.Log.read(Log.scala:386)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:369)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:327)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:323)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
at scala.collection.immutable.Map$Map1.map(Map.scala:93)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:323)
at kafka.server.KafkaApis$$anonfun$maybeUnblockDelayedFetchRequests$2.apply(KafkaApis.scala:165)
at kafka.server.KafkaApis$$anonfun$maybeUnblockDelayedFetchRequests$2.apply(KafkaApis.scala:164)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
at kafka.server.KafkaApis.maybeUnblockDelayedFetchRequests(KafkaApis.scala:164)
at kafka.server.KafkaApis$$anonfun$handleProducerRequest$3.apply(KafkaApis.scala:186)
at kafka.server.KafkaApis$$anonfun$handleProducerRequest$3.apply(KafkaApis.scala:185)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:185)
at kafka.server.KafkaApis.handle(KafkaApis.scala:58)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)
at java.lang.Thread.run(Thread.java:619)
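
To make the failing condition concrete, here is a minimal Scala sketch of the kind of bounds check that would raise the exception above. It is a simplified model, not the actual kafka.log.LogSegment code: the object HighWatermarkSketch and the methods read and readCommitted are hypothetical names introduced only for illustration, and readCommitted shows just one possible defensive behavior, not necessarily the fix for this issue.

```scala
// Hypothetical, simplified model of the check; not the real Kafka implementation.
object HighWatermarkSketch {

  // Rejects a read whose upper bound lies below its start offset,
  // which is the situation reported in the log (max 39021, start 39022).
  def read(startOffset: Long, maxOffset: Option[Long]): String = {
    maxOffset.foreach { max =>
      if (max < startOffset)
        throw new IllegalArgumentException(
          s"Attempt to read with a maximum offset ($max) less than the start offset ($startOffset).")
    }
    s"messages from offset $startOffset"
  }

  // Assumed defensive variant: a fetch that starts beyond the high watermark
  // returns nothing instead of reaching the bounds check.
  def readCommitted(fetchOffset: Long, highWatermark: Long): Option[String] =
    if (fetchOffset > highWatermark) None
    else Some(read(fetchOffset, Some(highWatermark)))
}
```

With the values from the log, HighWatermarkSketch.read(39022L, Some(39021L)) throws IllegalArgumentException, while HighWatermarkSketch.readCommitted(39022L, 39021L) returns None. The sketch only guards the read path; it does not address how the client came to hold an offset past the high watermark in the first place.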