Kafka / KAFKA-16017

Checkpointed offset is incorrect when task is revived and restoring



    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version: 3.3.1
    • Fix Versions: 3.4.2, 3.7.0, 3.6.2, 3.5.3
    • Component: streams
    • Labels: None


      Streams checkpoints the wrong offset when a task is revived after a TaskCorruptedException and the task is then migrated to another stream thread during restoration.

      This might happen in a situation like the following if the Streams application runs under EOS:

      1. Streams encounters a network error which triggers a TaskCorruptedException.
      2. The task that encountered the exception is closed dirty and revived. The state store directory is wiped out and a rebalance is triggered.
      3. Until the sync of the rebalance is received, the revived task is restoring.
      4. When the sync is received, the revived task is revoked and a new rebalance is triggered. During the revocation the task is closed cleanly and a checkpoint file is written.
      5. With the next rebalance the task moves back to the stream thread from which it was revoked, reads the checkpoint, and starts restoring. (It might be enough if the task moves to a stream thread on the same Streams client, since those threads share the same state directory.)
      6. The state of the task misses some records.
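      The mismatch produced by this sequence can be sketched with a minimal model (class and method names here are illustrative stand-ins, not Kafka Streams internals): the record collector still remembers the offsets of records it sent before the error, the wiped store only restored part of them, and the checkpoint written on the clean close uses the collector's offsets.

      ```java
      import java.util.HashMap;
      import java.util.Map;

      // Hypothetical minimal model of the KAFKA-16017 timeline; the names
      // are illustrative and do not mirror the actual Kafka Streams classes.
      public class StaleCheckpointRepro {
          // Offsets the record collector remembered from sending to the changelog.
          public static final Map<String, Long> collectorOffsets = new HashMap<>();
          // Offset actually restored into the (wiped) state store in step 3.
          public static long restoredOffset;

          // Step 4: the checkpoint is written from the collector's offsets,
          // even though the store was wiped in step 2 and only partially restored.
          public static long checkpointedOffset() {
              return collectorOffsets.getOrDefault("changelog-1", restoredOffset);
          }

          public static void main(String[] args) {
              collectorOffsets.put("changelog-1", 100L); // records sent before the error
              restoredOffset = 50L;                      // restore only got this far
              System.out.println("checkpoint=" + checkpointedOffset()
                      + " restored=" + restoredOffset);
              // Steps 5/6: the next restore starts at the checkpointed offset,
              // so the records between 50 and 100 never reach the state store.
          }
      }
      ```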

      To mitigate the issue, one can restart the stream thread and delete the state on disk. After that, the state restores completely from the changelog topic and no longer misses any records.

      The root cause is that the checkpoint written in step 4 contains the offset that the record collector stored when it sent the records to the changelog topic. However, since the state directory is wiped out in step 2, the state does not contain those records anymore; it only contains the records restored in step 3, which might be fewer. The underlying problem is that the offsets in the record collector are not cleaned up when the record collector is closed.
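      The shape of the fix can be sketched as follows (an illustrative stand-in for the collector's offset bookkeeping, not the actual RecordCollectorImpl API): clearing the remembered send offsets on close means a later checkpoint cannot reuse them.

      ```java
      import java.util.HashMap;
      import java.util.Map;

      // Illustrative stand-in for the record collector's offset bookkeeping;
      // names are hypothetical, not the real Kafka Streams interfaces.
      public class CollectorOffsets {
          private final Map<String, Long> offsets = new HashMap<>();

          // Remember the latest offset of a record sent to a changelog topic.
          public void onSend(String topic, long offset) {
              offsets.put(topic, offset);
          }

          // Snapshot used when writing a checkpoint file.
          public Map<String, Long> offsetsForCheckpoint() {
              return new HashMap<>(offsets);
          }

          // The fix: forget the send offsets when the collector is closed, so a
          // checkpoint written after a dirty close cannot pick up stale values.
          public void close() {
              offsets.clear();
          }
      }
      ```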

      I created a repro under https://github.com/cadonna/kafka/tree/KAFKA-16017.

      The repro can be started with:

      ./gradlew streams:test -x checkstyleMain -x checkstyleTest -x spotbugsMain -x spotbugsTest --tests RestoreIntegrationTest.test --info > test.log

      The repro writes records into a state store and tries to retrieve them again (https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java#L582). It will throw an IllegalStateException if it cannot find a record in the state (https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java#L594). Once the offsets in the record collector are cleared on close (https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L332 and https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L349), the IllegalStateException does not occur anymore.

      In the logs you can check for:

      • "Restore batch end offset is" — the restored offsets in the state.
      • "task [0_1] Writing checkpoint:" — the written checkpoints.
      • "task [0_1] Checkpointable offsets" — the offsets coming from sending the records to the changelog topic RestoreIntegrationTesttest-stateStore-changelog-1.

      Always look at the last instances of these before the IllegalStateException is thrown.

      You will see that the restored offsets are less than the offsets that are written to the checkpoint. The offsets written to the checkpoint come from the offsets stored when sending the records to the changelog topic.
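      The manual log check can be sketched in a few lines; the sample log lines below are made up for illustration and only follow the message prefixes named above, not the exact format in test.log.

      ```java
      import java.util.regex.Matcher;
      import java.util.regex.Pattern;

      // Sketch of the log comparison described above. Sample lines are
      // fabricated; only the message prefixes come from this report.
      public class LogCheck {
          private static final Pattern TRAILING_NUM = Pattern.compile("(\\d+)\\s*$");

          // Returns the trailing number of the LAST matching line, mirroring
          // "always the last instances before the IllegalStateException".
          public static long lastOffset(String[] lines, String prefix) {
              long result = -1L;
              for (String line : lines) {
                  if (line.contains(prefix)) {
                      Matcher m = TRAILING_NUM.matcher(line);
                      if (m.find()) {
                          result = Long.parseLong(m.group(1));
                      }
                  }
              }
              return result;
          }

          public static void main(String[] args) {
              String[] sample = {
                  "Restore batch end offset is 40",
                  "Restore batch end offset is 50",
                  "task [0_1] Writing checkpoint: 100"
              };
              long restored = lastOffset(sample, "Restore batch end offset");
              long checkpointed = lastOffset(sample, "Writing checkpoint");
              System.out.println("restored=" + restored
                      + " checkpointed=" + checkpointed);
          }
      }
      ```

      With the bug present, the last checkpointed offset comes out higher than the last restored offset, which is exactly the gap described above.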




            Assignee: Bruno Cadonna
            Reporter: Bruno Cadonna