Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-8187

State store record loss across multiple reassignments when using standby tasks



    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 2.0.1
    • Fix Version/s: 2.3.0, 2.1.2, 2.2.1
    • Component/s: streams
    • Labels:


      There is a race condition that can cause a partitioned state store to be missing records up to an offset when using standby tasks.

      When a reassignment occurs and a task is migrated to a StandbyTask in another StreamThread/TaskManager on the same JVM, there can be lock contention that prevents the StandbyTask on the currently assigned StreamThread from acquiring the lock and to not retry acquiring the lock because all of the active StreamTasks are running for that StreamThread. If the StandbyTask does not acquire the lock before the StreamThread enters into the RUNNING state, then the StandbyTask will not consume any records. If there is no subsequent reassignment before the second execution of the stateDirCleaner Thread, then the task directory for the StandbyTask will be deleted. When the next reassignment occurs the offset that was read by the StandbyTask at creation time before acquiring the lock will be written back to the state store directory, this re-creates the state store directory.

      An example:
      StreamThread(A) and StreamThread(B) are running on the same JVM in the same streams application.

      StreamThread(A) has StandbyTask 1_0
      StreamThread(B) has no tasks

      A reassignment is triggered by another host in the streams application fleet.

      StreamThread(A) is notified with a PARTITIONS_REVOKED event of the threads one task
      StreamThread(B) is notified with a PARTITIONS_ASSIGNED event of a standby task for 1_0

      Here begins the race condition.
      StreamThread(B) creates the StandbyTask which reads the current checkpoint from disk.
      StreamThread(B) then attempts to updateNewAndRestoringTasks() for it's assigned tasks. [0]
      StreamThread(B) initializes the new tasks for the active and standby tasks. [1] [2]
      StreamThread(B) attempts to lock the state directory for task 1_0 but fails with a LockException [3], since StreamThread(A) still holds the lock.
      StreamThread(B) returns true from updateNewAndRestoringTasks() due to the check at [4] which only checks that the active assigned tasks are running.
      StreamThread(B) state is set to RUNNING
      StreamThread(A) closes the previous StandbyTask specifically calling closeStateManager() [5]
      StreamThread(A) state is set to RUNNING

      Streams application for this host has completed re-balancing and is now in the RUNNING state.

      State at this point is the following: State directory exists for 1_0 and all data is present.

      Then at a period that is 1 to 2 intervals of [6](which is default of 10 minutes) after the reassignment had completed the stateDirCleaner thread will execute [7].

      The stateDirCleaner will then do [8], which finds the directory 1_0, finds that there isn't an active lock for that directory, acquire the lock, and deletes the directory.

      State at this point is the following: State directory does not exist for 1_0.

      When the next reassignment occurs. The offset that was read by StreamThread(B) during construction of the StandbyTask for 1_0 will be written back to disk. This write re-creates the state store directory and writes the .checkpoint file with the old offset.

      State at this point is the following: State directory exists for 1_0 with a '.checkpoint' file in it, but there is no other state store data in the directory.

      If this host is assigned the active task for 1_0 then all the history in the state store will be missing from before the offset that was read at the previous reassignment.
      If this host is assigned the standby task for 1_0 then the lock will be acquired and the standby will start to consume records, but it will still be missing all records from before the offset that was read at the previous reassignment.
      If this host is not assigned 1_0, then the state directory will get cleaned up by the stateDirCleaner thread 10 to 20 minutes later and the record loss issue will be hidden.

      [0] https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L865-L869
      [1] https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L324-L340
      [2] https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java#L65-L84
      [3] https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java#L212-L236
      [4] https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L332
      [5] https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java#L245-L264
      [6] https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L797
      [7] https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L798-L803
      [8] https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java#L262-L327

      How we recovered from the record loss:
      1. Stop the streams application
      2. Delete the impacted task directory to remove the .checkpoint file
      3. Restart the streams application

      Some possible ways of addressing this issue could be the following:
      1. Check that the assigned standbys are running in addition to the assigned active tasks before returning in this method [1]
      2. Only write the checkpoint file for a task if the thread still has the state directory lock for the task [9], on close StandbyTasks commit the offsets they have in memory. [10]
      3. Read the checkpoint file after acquiring the locks for a task in the StreamThread.

      [9] https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L325
      [10] https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java#L125-L148




            • Assignee:
              hustclf Lifei Chen
              wgreerx William Greer
            • Votes:
              0 Vote for this issue
              7 Start watching this issue


              • Created: