KAFKA-727

broker can still expose uncommitted data to a consumer

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: None
    • Component/s: core
    • Labels:

      Description

      Even after kafka-698 is fixed, consumer clients can still occasionally see uncommitted data. The following is how this can happen.

      1. In Log.read(), we pass in startOffset < HW and maxOffset = HW.
      2. Then we call LogSegment.read(), in which we call translateOffset on the maxOffset. The offset doesn't exist, so translateOffset returns null.
      3. Continuing in LogSegment.read(), we then call messageSet.sizeInBytes() to bound the fetch and return the data.

      What can happen is that between step 2 and step 3, a new message is appended to the log and is not committed yet. Now, we have exposed uncommitted data to the client.
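The race described above can be modeled in a few lines. This is a toy sketch with hypothetical names (the real code is Scala and more involved), showing how the fallback to the live size exposes bytes appended between steps 2 and 3:

```java
// Toy model of the race (all names hypothetical, not Kafka's real API).
// The log holds 100 committed bytes; the high watermark (HW) sits at offset 100.
class LogModel {
    private int sizeInBytes = 100;          // bytes up to the HW

    // Step 2: maxOffset == HW is just past the last message, so the
    // lookup fails, mirroring translateOffset returning null.
    Integer translateOffset(long offset) { return null; }

    void append(int bytes) { sizeInBytes += bytes; }  // uncommitted append
    int sizeInBytes() { return sizeInBytes; }
}

public class Race {
    public static void main(String[] args) {
        LogModel log = new LogModel();
        Integer maxPosition = log.translateOffset(100); // step 2: null
        log.append(50);           // append sneaks in between steps 2 and 3
        // Step 3: with no translated position, the read is bounded by the
        // *current* size, which now includes the 50 uncommitted bytes.
        int readEnd = (maxPosition != null) ? maxPosition : log.sizeInBytes();
        System.out.println(readEnd); // prints 150, past the HW at byte 100
    }
}
```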


          Activity

          Neha Narkhede added a comment -

          Checked in patch v1 to proceed with build.

          Neha Narkhede added a comment -

          +1. One minor change before check-in: remove the unused import "import joptsimple._" from StressTestLog.

          Jay Kreps added a comment -

          Patch v1.
          1. Snapshot the size of the log prior to translating the maxOffset to a file position to give a consistent end point.
          2. Fix docs on read so they match the code
          3. Add a stress test that does reads and writes at the same point to validate fix
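Point 1 above can be sketched with a toy model (hypothetical names, not the actual patch): the size snapshot is taken before translating maxOffset, so a concurrent append cannot widen the read bound.

```java
// Toy model of the fix (hypothetical names): snapshot the size first.
class CommittedLog {
    private int sizeInBytes = 100;              // bytes up to the HW
    Integer translateOffset(long offset) { return null; } // HW not found
    void append(int bytes) { sizeInBytes += bytes; }
    int sizeInBytes() { return sizeInBytes; }
}

public class Fix {
    public static void main(String[] args) {
        CommittedLog log = new CommittedLog();
        int snapshot = log.sizeInBytes();       // consistent end point, taken first
        Integer maxPosition = log.translateOffset(100);
        log.append(50);                         // concurrent uncommitted append
        // The read is bounded by the snapshot, not the live size, so the
        // 50 new bytes stay invisible until the HW advances.
        int readEnd = (maxPosition != null) ? maxPosition : snapshot;
        System.out.println(readEnd); // prints 100
    }
}
```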

          Neha Narkhede added a comment -

          >> I think another fix is to just save the size of the log prior to translating the hw mark and use this rather than dynamically checking log.sizeInBytes later in the method. This will effectively act as a valid lower bound.

          +1

          >> It might also be worthwhile to write a throw away torture test that has one thread do appends and another thread do reads and check that this condition is not violated in case there are any more of these subtleties.

          This will be really good to have going forward.

          Jun Rao added a comment -

          Jay, sure, you can take this on. The way we saw this is that we had a consumer client that uses minBytes=1 and a producer that produces data once every couple of seconds.

          Jay Kreps added a comment -

          Fantastic catch.

          I think another fix is to just save the size of the log prior to translating the hw mark and use this rather than dynamically checking log.sizeInBytes later in the method. This will effectively act as a valid lower bound.

          It might also be worthwhile to write a throw-away torture test that has one thread do appends and another thread do reads and check that this condition is not violated in case there are any more of these subtleties.

          Happy to take this one on since it is my bad.

          Jun Rao added a comment -

          One way to fix this is for FileMessageSet.searchFor() to return OffsetPosition(-1L, the value of size) if offset is not found, instead of returning null. In LogSegment.read(), we can use the returned position to guard the length of messageSet.read.
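As a rough sketch (simplified, hypothetical signatures), the sentinel return captures the size atomically with the lookup, so the caller never needs to consult the live size afterwards:

```java
// Sketch of the sentinel idea: searchFor returns OffsetPosition(-1L, size)
// instead of null, binding the read bound to the moment of the search.
final class OffsetPosition {
    final long offset;
    final int position;
    OffsetPosition(long offset, int position) {
        this.offset = offset;
        this.position = position;
    }
}

public class SentinelSearch {
    private int size = 100;                  // current byte size of the set

    OffsetPosition searchFor(long target) {
        // Offset past the end: return a sentinel carrying the size *now*
        // rather than null, so callers always have a safe upper bound.
        return new OffsetPosition(-1L, size);
    }

    void append(int bytes) { size += bytes; }

    public static void main(String[] args) {
        SentinelSearch set = new SentinelSearch();
        OffsetPosition pos = set.searchFor(100); // captures position 100
        set.append(50);                          // later uncommitted append
        // Guard the read length with the captured position, not the live size.
        System.out.println(pos.position);        // prints 100
    }
}
```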


            People

            • Assignee: Jay Kreps
            • Reporter: Jun Rao
            • Votes: 0
            • Watchers: 3
