Even after kafka-698 is fixed, consumer clients still occasionally see uncommitted data. This can happen as follows.
1. In Log.read(), we pass in startOffset < HW and maxOffset = HW.
2. We then call LogSegment.read(), which calls translateOffset on maxOffset. No message exists at that offset yet, so translateOffset returns null.
3. Continuing in LogSegment.read(), we then call messageSet.sizeInBytes() to fetch and return the data.
Between step 2 and step 3, a new message can be appended to the log before it is committed. The size obtained in step 3 then includes that message, so we have exposed uncommitted data to the client.
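
The interleaving above can be reproduced deterministically with a toy model. This is only a sketch, not the actual Kafka code: the class UncommittedReadSketch, its methods, and the betweenSteps hook are hypothetical names invented for illustration, sizes are counted in messages rather than bytes, and the hook stands in for a concurrent appender thread running between step 2 and step 3.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of the race; not the real Log/LogSegment classes.
public class UncommittedReadSketch {
    // Each entry is one message; its list index plays the role of its offset.
    private final List<String> messages = new ArrayList<>();

    public void append(String message) {
        messages.add(message);
    }

    // Stand-in for translateOffset: position of the offset, or null if
    // no message exists at that offset yet.
    public Integer translateOffset(long offset) {
        return offset < messages.size() ? Integer.valueOf((int) offset) : null;
    }

    // Stand-in for messageSet.sizeInBytes(), measured in messages here.
    public int size() {
        return messages.size();
    }

    // Mirrors steps 2 and 3. betweenSteps is an assumption of this sketch:
    // it simulates whatever another thread does between the two steps.
    public List<String> read(long startOffset, long maxOffset, Runnable betweenSteps) {
        Integer end = translateOffset(maxOffset);   // step 2: null when maxOffset = HW = log end
        betweenSteps.run();                         // window in which an append can land
        int endPos = (end != null) ? end : size();  // step 3: falls back to the current size
        return new ArrayList<>(messages.subList((int) startOffset, endPos));
    }

    public static void main(String[] args) {
        UncommittedReadSketch log = new UncommittedReadSketch();
        log.append("m0");
        log.append("m1");
        long hw = 2; // both messages committed, HW equals the log end offset

        // An append lands between step 2 and step 3 of the read.
        List<String> result = log.read(0, hw, () -> log.append("m2-uncommitted"));
        System.out.println(result); // prints [m0, m1, m2-uncommitted]
    }
}
```

With a no-op hook the same read returns only m0 and m1, so in this model the leak depends entirely on an append landing in the window between the two steps.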