Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-9807

Race condition updating high watermark allows reads above LSO



    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s:, 1.0.2, 1.1.1, 2.0.1, 2.1.1, 2.2.2, 2.3.1, 2.4.1
    • Fix Version/s: 2.5.0, 2.4.2
    • Component/s: None
    • Labels:


      We had a transaction system test fail with the following error:

      AssertionError: Detected 37 dups in concurrently consumed messages

      After investigation, we found the duplicates were a result of the consumer reading an aborted transaction, which should not be possible with the read_committed isolation level.

      We tracked down the fetch request which returned the aborted data:

      [2020-03-24 07:27:58,284] INFO Completed request:RequestHeader(apiKey=FETCH, apiVersion=11, clientId=console-consumer, correlationId=283) -- {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=2043970605,session_epoch=87,topics=[{topic=output-topic,partitions=[{partition=1,current_leader_epoch=3,fetch_offset=48393,log_start_offset=-1,partition_max_bytes=1048576}]}],forgotten_topics_data=[],rack_id=},response:{throttle_time_ms=0,error_code=0,session_id=2043970605,responses=[{topic=output-topic,partition_responses=[{partition_header={partition=1,error_code=0,high_watermark=50646,last_stable_offset=50646,log_start_offset=0,aborted_transactions=[],preferred_read_replica=-1},record_set=FileRecords(size=31582, file=/mnt/kafka/kafka-data-logs-1/output-topic-1/00000000000000045694.log, start=37613, end=69195)}]}]} 

      After correlating with the contents of the log segment 00000000000000045694.log, we found that this fetch response included data which was above the returned LSO which is 50646. In fact, the high watermark matched the LSO in this case, so the data was above the high watermark as well.

      At the same time this request was received, we noted that the high watermark was updated:

      [2020-03-24 07:27:58,284] DEBUG [Partition output-topic-1 broker=3] High watermark updated from (offset=50646 segment=[45694:68690]) to (offset=50683 segment=[45694:69195]) (kafka.cluster.Partition)

      The position of the new high watermark matched the end position from the fetch response, so that led us to believe there was a race condition with the updating of this value. In the code, we have the following (abridged) logic for fetching the LSO:

          firstUnstableOffsetMetadata match {
            case Some(offsetMetadata) if offsetMetadata.messageOffset < highWatermark => offsetMetadata
            case _ => fetchHighWatermarkMetadata

      If the first unstable offset is less than the high watermark, we should use that; otherwise we use the high watermark. The problem is that the high watermark referenced here could be updated between the range check and the call to `fetchHighWatermarkMetadata`. If that happens, we would end up reading data which is above the first unstable offset.

      The solution to fix this problem is to cache the high watermark value so that it is used in both places. We may consider some additional improvements here as well, such as fixing the inconsistency problem in the fetch response which included data above the returned high watermark. We may also consider having the client react more defensively by ignoring fetched data above the high watermark. This would fix this problem for newer clients talking to older brokers which might hit this problem.


          Issue Links



              • Assignee:
                hachikuji Jason Gustafson
                hachikuji Jason Gustafson
              • Votes:
                0 Vote for this issue
                3 Start watching this issue


                • Created: