Kafka / KAFKA-10148

Kafka Streams Restores too few Records with eos-beta Enabled


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.6.0
    • Component/s: streams
    • Labels:
      None

      Description

      System test StreamsEosTest.test_failure_and_recovery for eos-beta exposes a bug that produces wrong results in the output topic. The cause appears to be an end offset that is too low during the restoration of a state store.

      Example:

      The system test computes a minimum aggregate over records in an input topic and writes the results to an output topic. The input topic partition data-1 contains the following records among others:

      ...
      offset: 125 CreateTime: 1591690264681 keysize: 5 valuesize: 4 sequence: 125 headerKeys: [] key: 14920 payload: 9215
      ...
      offset: 1611 CreateTime: 1591690297424 keysize: 5 valuesize: 4 sequence: 1611 headerKeys: [] key: 14920 payload: 1595
      ...
      offset: 2104 CreateTime: 1591690308542 keysize: 5 valuesize: 4 sequence: 2104 headerKeys: [] key: 14920 payload: 9274
      ...
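For reference, the minimum aggregate described above can be modelled in plain Java as a per-key running minimum. This is a simplified sketch, not the system test's actual Streams topology: the class and method names are hypothetical, and a `HashMap` stands in for the state store. Fed the three payloads of key 14920 shown above, it gives the values a correct run should emit.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of a per-key minimum aggregate.
// It mimics only the semantics of the aggregation, not Kafka Streams itself.
class RunningMin {
    // Returns the minimum that should be emitted after each input value for one key.
    static List<Integer> emittedMinima(int key, int[] values) {
        Map<Integer, Integer> store = new HashMap<>(); // stands in for the state store
        List<Integer> emitted = new ArrayList<>();
        for (int value : values) {
            // Keep the smaller of the stored minimum and the new value.
            int current = store.merge(key, value, Math::min);
            emitted.add(current);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Payloads of key 14920 at input offsets 125, 1611 and 2104 of data-1.
        System.out.println(emittedMinima(14920, new int[] {9215, 1595, 9274}));
        // prints [9215, 1595, 1595]
    }
}
```

Under this model the third emitted value for key 14920 should be 1595, not 9215.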
      

      The output topic partition min-1 contains:

      ...
      offset: 125 CreateTime: 1591690264681 keysize: 5 valuesize: 4 sequence: 125 headerKeys: [] key: 14920 payload: 9215
      ...
      offset: 1828 CreateTime: 1591690297424 keysize: 5 valuesize: 4 sequence: 1213 headerKeys: [] key: 14920 payload: 1595
      ...
      offset: 2324 CreateTime: 1591690308542 keysize: 5 valuesize: 4 sequence: 10 headerKeys: [] key: 14920 payload: 9215
      ...
      

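      The last record is wrong: the record at offset 1828 already lowered the minimum for key 14920 to 1595, and since 1595 is less than 9215, the aggregate must not go back to 9215.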
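Put differently, for a minimum aggregate the value emitted for a key must never increase. The checker below is a hypothetical sketch of that invariant, not the system test's own verification logic, which this issue does not show. Run over the three min-1 records listed above, it flags the record at offset 2324.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical invariant check: per key, an emitted minimum may never go up.
class MinInvariantCheck {
    static final class OutputRecord {
        final long offset;
        final int key;
        final int value;

        OutputRecord(long offset, int key, int value) {
            this.offset = offset;
            this.key = key;
            this.value = value;
        }
    }

    // Returns the offsets of output records whose value exceeds the minimum seen so far for their key.
    static List<Long> violations(List<OutputRecord> records) {
        Map<Integer, Integer> lastMin = new HashMap<>();
        List<Long> bad = new ArrayList<>();
        for (OutputRecord r : records) {
            Integer previous = lastMin.get(r.key);
            if (previous != null && r.value > previous) {
                bad.add(r.offset); // the minimum increased, which a correct aggregate cannot do
            }
            lastMin.put(r.key, previous == null ? r.value : Math.min(previous, r.value));
        }
        return bad;
    }

    // The min-1 records for key 14920 quoted in this issue.
    static List<OutputRecord> exampleOutput() {
        return Arrays.asList(
            new OutputRecord(125, 14920, 9215),
            new OutputRecord(1828, 14920, 1595),
            new OutputRecord(2324, 14920, 9215));
    }

    public static void main(String[] args) {
        System.out.println(violations(exampleOutput())); // prints [2324]
    }
}
```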

      To test resilience to an unexpected failure of a Streams client, the system test aborts a Streams client, i.e., the client is closed in a dirty manner. This dirty close causes the Streams client to restore its local state store, which maintains the minimum aggregate, from the beginning of the changelog topic partition EosTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog-1. That partition contains:

      ...
      offset: 125 CreateTime: 1591690264681 keysize: 5 valuesize: 4 sequence: 125 headerKeys: [] key: 14920 payload: 9215
      ...
      offset: 1828 CreateTime: 1591690297424 keysize: 5 valuesize: 4 sequence: 1213 headerKeys: [] key: 14920 payload: 1595
      ...
      offset: 2324 CreateTime: 1591690308542 keysize: 5 valuesize: 4 sequence: 10 headerKeys: [] key: 14920 payload: 9215
      ...
      

      Here, too, the last record is wrong.

      During the restoration, the Streams client uses its Kafka consumer (the restore consumer) to issue a ListOffsets request to get the end offset of the changelog topic partition. The response contains end offset 1518 for EosTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog-1, as the following log line shows:

      [2020-06-09 08:11:49,250] DEBUG [Consumer clientId=EosTest-12216046-2d5d-48cd-864c-21b0aa570fae-StreamThread-1-restore-consumer, groupId=null] Received LIST_OFFSETS response from node 2 for request with header RequestHeader(apiKey=LIST_OFFSETS, apiVersion=5, clientId=EosTest-12216046-2d5d-48cd-864c-21b0aa570fae-StreamThread-1-restore-consumer, correlationId=3): (type=ListOffsetResponse, throttleTimeMs=0, responseData={EosTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog-4=PartitionData(errorCode: 0, timestamp: -1, offset: 1478, leaderEpoch: Optional[0]), EosTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog-1=PartitionData(errorCode: 0, timestamp: -1, offset: 1518, leaderEpoch: Optional[0])}) (org.apache.kafka.clients.NetworkClient)
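The effect of that end offset on restoration can be sketched in plain Java. This is a simplified model, not the Kafka Streams restore code: the class and method names are hypothetical, only the two changelog records for key 14920 quoted in this issue are modelled, and transaction markers and other keys are ignored. Records are replayed from the beginning of the partition in offset order, and the end offset is treated as exclusive (it is the offset of the next record to be written).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical model of restoring a key-value store from a changelog partition.
class ChangelogRestore {
    // Replays changelog records (offset -> {key, value}) with offset < endOffset.
    static Map<Integer, Integer> restore(NavigableMap<Long, int[]> changelog, long endOffset) {
        Map<Integer, Integer> store = new HashMap<>();
        for (Map.Entry<Long, int[]> e : changelog.headMap(endOffset, false).entrySet()) {
            store.put(e.getValue()[0], e.getValue()[1]); // latest aggregate per key wins
        }
        return store;
    }

    // Changelog records for key 14920 at offsets 125 and 1828.
    static NavigableMap<Long, int[]> exampleChangelog() {
        NavigableMap<Long, int[]> changelog = new TreeMap<>();
        changelog.put(125L, new int[] {14920, 9215});
        changelog.put(1828L, new int[] {14920, 1595});
        return changelog;
    }

    public static void main(String[] args) {
        // End offset reported in the ListOffsets response: offset 1828 is skipped.
        System.out.println(restore(exampleChangelog(), 1518L).get(14920)); // prints 9215
        // Any end offset above 1828 (1829 is just an illustrative value) restores 1595.
        System.out.println(restore(exampleChangelog(), 1829L).get(14920)); // prints 1595
    }
}
```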
      

      Offset 1518 lies before the following record in EosTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog-1:

      offset: 1828 CreateTime: 1591690297424 keysize: 5 valuesize: 4 sequence: 1213 headerKeys: [] key: 14920 payload: 1595
      

      Hence, this record is not restored into the local state store. Moreover, after the restoration, the input topic partition data-1 is read starting from offset 2094. That means that the record

      offset: 1611 CreateTime: 1591690297424 keysize: 5 valuesize: 4 sequence: 1611 headerKeys: [] key: 14920 payload: 1595
      

      is not read there either, because its offset is lower. Instead, the next record with key 14920, which has value 9274, is read; since 9274 is not less than 9215, value 9215 is written to the output topic a second time.
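The whole failure sequence can be replayed with a small plain-Java sketch: start from the restored store, then process the remaining input of key 14920 from offset 2094 onward (only the record at offset 2104 is quoted above). The names are hypothetical and this only models the semantics described in this issue; the second run shows what a store restored up to and including changelog offset 1828 would have produced.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical replay of the failure: restored store + resumed input processing.
class FailureReplay {
    // Applies the running minimum to the remaining input, starting from the restored state.
    static List<Integer> resume(Map<Integer, Integer> restoredStore, int key, int[] remainingInput) {
        Map<Integer, Integer> store = new HashMap<>(restoredStore);
        List<Integer> output = new ArrayList<>();
        for (int value : remainingInput) {
            output.add(store.merge(key, value, Math::min));
        }
        return output;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> tooShort = new HashMap<>();
        tooShort.put(14920, 9215); // restored only up to end offset 1518
        Map<Integer, Integer> complete = new HashMap<>();
        complete.put(14920, 1595); // restored including changelog offset 1828
        int[] fromOffset2094 = {9274}; // key 14920's input record at offset 2104
        System.out.println(resume(tooShort, 14920, fromOffset2094)); // prints [9215]
        System.out.println(resume(complete, 14920, fromOffset2094)); // prints [1595]
    }
}
```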

      I ran the system test 10 times with eos_alpha and 10 times with eos_beta; only eos_beta failed, a couple of times.

              People

              • Assignee:
                Bruno Cadonna (cadonna)
                Reporter:
                Bruno Cadonna (cadonna)
              • Votes:
                0
                Watchers:
                3