Kafka / KAFKA-7672

Local state is not fully restored after a Kafka Streams rebalance, resulting in data loss



    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 1.1.0, 1.1.1, 2.0.0, 2.1.0
    • Fix Version/s: 2.2.0
    • Component/s: streams
    • Labels:
    • Flags:
      Patch, Important


      Normally, when a task is migrated to a new thread and no checkpoint file is found under its task folder, Kafka Streams must fully restore the local state from the remote changelog topic before it resumes running. However, in some scenarios we found that Kafka Streams does NOT restore this state even though no checkpoint was found. It just cleans the state folder and transitions directly to the running state, so the historical data is lost.

      To be specific, here are the detailed Kafka Streams logs from our project that show this scenario:

      2018-10-23 08:27:07,684 INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] Revoking previously assigned partitions [AuditTrailBatch-0-5]

      2018-10-23 08:27:07,684 INFO  org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread [AuditTrailBatch-StreamThread-1] State transition from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED

      2018-10-23 08:27:10,856 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] (Re-)joining group

      2018-10-23 08:27:53,153 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] Successfully joined group with generation 323

      2018-10-23 08:27:53,153 INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] Setting newly assigned partitions [AuditTrailBatch-store1-repartition-1]

      2018-10-23 08:27:53,153 INFO  org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread [AuditTrailBatch-StreamThread-1] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED

      2018-10-23 08:27:53,153 INFO  org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread [AuditTrailBatch-StreamThread-1] Creating producer client for task 1_1

      2018-10-23 08:27:53,622 INFO  org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread [AuditTrailBatch-StreamThread-1] partition assignment took 469 ms.

      2018-10-23 08:27:54,357 INFO  org.apache.kafka.streams.processor.internals.StoreChangelogReader - stream-thread [AuditTrailBatch-StreamThread-1]No checkpoint found for task 1_1 state store AuditTrailBatch-store1-changelog-1 with EOS turned on. Reinitializing the task and restore its state from the beginning.

      2018-10-23 08:27:54,357 INFO  org.apache.kafka.clients.consumer.internals.Fetcher          - [Consumer clientId=AuditTrailBatch-StreamThread-1-restore-consumer, groupId=]Resetting offset for partition AuditTrailBatch-store1-changelog-1 to offset 0.

      2018-10-23 08:27:54,653 INFO  org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread [AuditTrailBatch-StreamThread-1]State transition from PARTITIONS_ASSIGNED to RUNNING

      From the logs above, we can reconstruct the sequence for thread AuditTrailBatch-StreamThread-1:

      1. The task previously running on thread 1 is task 0_5 (the corresponding partition is AuditTrailBatch-0-5).
      2. The group begins to rebalance, and the new task 1_1 is assigned to thread 1.
      3. No checkpoint is found under the 1_1 state folder, so the offset is reset to 0 and the local state folder is cleaned.
      4. Thread 1 transitions to the RUNNING state directly without restoring task 1_1, so the historical data for the state of task 1_1 is lost on thread 1.


      To investigate the cause of this issue, we analyzed the Kafka Streams source code and found that the key is the variable named "completedRestorers".

      This is the definition of the variable:

      private final Set<TopicPartition> completedRestorers = new HashSet<>();

      Each thread object has its own completedRestorers, which is created during thread initialization and is not accessed by other threads. completedRestorers records the partitions that have been completely restored in the thread.

      if (restorer.hasCompleted(pos, endOffsets.get(partition))) {

      Once a partition is added to the completedRestorers set, it is returned by restore() and passed to the next caller, updateRestored(), and then transitionToRunning() sets the task to the running state.

      But we found that completedRestorers is never cleared during the life cycle of the thread, even in the reset function:

      public void reset() {

      This causes a problem. Suppose task 1 was once assigned to thread A, so its partition was added to completedRestorers. It then migrated to another thread (maybe in a different instance). After several rounds of rebalancing, it was assigned to thread A again, and no checkpoint was there for some reason. The right behavior is to clean the state folder and restore the state from the beginning. Instead, thread A finds that the task's partition is already in the completedRestorers set, considers the task completely restored, and resumes running directly.

      To avoid this, we should clear the stale completedRestorers set after every reassignment. So I added a clear operation to reset() and verified that it works.

      public void reset() {
        //add by linyli
        completedRestorers.clear();
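
      The effect of the stale set can be reproduced with a small toy model. This is only a sketch under stated assumptions: the class ChangelogReaderModel, its register()/restore() methods, and the clearCompletedOnReset switch are hypothetical names invented for illustration and are not Kafka's StoreChangelogReader. It models only the bookkeeping described above: a per-thread set of completed partitions that either survives reset() or is cleared by it.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of the per-thread restore bookkeeping; NOT Kafka's actual class. */
class ChangelogReaderModel {
    private final Set<String> needsRestoring = new HashSet<>();
    private final Set<String> completedRestorers = new HashSet<>();
    // Hypothetical switch: false models the reported behavior, true the proposed fix.
    private final boolean clearCompletedOnReset;

    ChangelogReaderModel(boolean clearCompletedOnReset) {
        this.clearCompletedOnReset = clearCompletedOnReset;
    }

    /** Called when a task owning this changelog partition is assigned to the thread. */
    void register(String partition) {
        needsRestoring.add(partition);
    }

    /** Returns true if the partition was actually replayed from the changelog. */
    boolean restore(String partition) {
        needsRestoring.remove(partition);
        if (completedRestorers.contains(partition)) {
            // Stale entry from an earlier assignment: replay is skipped.
            return false;
        }
        completedRestorers.add(partition);
        return true;
    }

    /** Called on every rebalance. */
    void reset() {
        needsRestoring.clear();
        if (clearCompletedOnReset) {
            completedRestorers.clear();
        }
    }

    public static void main(String[] args) {
        String partition = "AuditTrailBatch-store1-changelog-1";
        for (boolean fixed : new boolean[] {false, true}) {
            ChangelogReaderModel reader = new ChangelogReaderModel(fixed);
            reader.register(partition);
            boolean first = reader.restore(partition);
            reader.reset();              // rebalance: the task moves away
            reader.register(partition);  // later the task comes back, no checkpoint
            boolean second = reader.restore(partition);
            System.out.println("clearOnReset=" + fixed
                    + " firstReplayed=" + first + " secondReplayed=" + second);
        }
    }
}
```

      In this model the second restore() is skipped when the set is not cleared, which corresponds to the thread moving to RUNNING without replaying the changelog. With the clear in reset(), the second assignment is replayed again.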



      In addition, I investigated why no checkpoint is sometimes found for this state, and found that the most common case is when a task is migrated from one thread to another thread in the same instance.


      From the source code for task reassignment, we know that with EOS enabled the task needs to write its checkpoint file when it is closed by the previous thread, and the next thread creates the task and reads the checkpoint file for restoration. But the read and write of this checkpoint file are asynchronous! So it is quite likely that the next thread reads before the previous one has finished writing, which causes the "no checkpoint found" issue and requires an extra restoration, wasting time and network bandwidth.

      To avoid concurrent reads and writes, I suggest adding some wait time when reading the checkpoint for restoration.

      This is my fix: 

      AbstractStateManager(final File baseDir,
       final boolean eosEnabled) {
       this.baseDir = baseDir;
       this.eosEnabled = eosEnabled;
       //add by linyli to fix checkpoint file latency in the same instance.
         File checkpointfile = new File(baseDir, CHECKPOINT_FILE_NAME);
         if(!checkpointfile.exists()) {
       }catch (InterruptedException e)
       this.checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
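
      The wait-before-read idea can be sketched in isolation as a bounded poll. This is a minimal sketch and not the code from the fix above: the helper awaitFile, its timeout and poll parameters, and the use of ".checkpoint" as the file name are assumptions made for illustration. A bounded poll returns as soon as the file appears and gives up after the timeout, so a task that really has no checkpoint is delayed only by a known maximum.

```java
import java.io.File;
import java.nio.file.Files;

/** Hypothetical helper illustrating a bounded wait for a checkpoint file. */
class CheckpointWait {
    /**
     * Polls until the file exists or timeoutMs has elapsed.
     * Returns true if the file exists when the method returns.
     */
    static boolean awaitFile(File file, long timeoutMs, long pollMs)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (!file.exists()) {
            if (System.nanoTime() >= deadline) {
                return false; // give up: caller falls back to full restoration
            }
            Thread.sleep(pollMs);
        }
        return true;
    }

    public static void main(String[] args) throws Exception {
        File dir = Files.createTempDirectory("task-1_1").toFile();
        // Assumed file name, standing in for CHECKPOINT_FILE_NAME.
        File checkpoint = new File(dir, ".checkpoint");

        // Simulates the previous owner finishing its checkpoint write late.
        Thread writer = new Thread(() -> {
            try {
                Thread.sleep(100);
                checkpoint.createNewFile();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        writer.start();

        System.out.println("found=" + awaitFile(checkpoint, 2000, 20));
        writer.join();
        checkpoint.delete();
        dir.delete();
    }
}
```

      Note that file existence alone does not prove the write has finished, so this sketch narrows the race window rather than eliminating it.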




            • Assignee:
              linyli linyue li
              linyli linyue li
              John Roesler
            • Votes:
              3
            • Watchers:
              20


              • Created: