KAFKA-16025

Streams StateDirectory has orphaned locks after rebalancing, blocking future rebalancing


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.4.0
    • Fix Version/s: 3.8.0, 3.7.1
    • Component/s: streams
    • Labels: None
    • Environment: Linux

    Description

      Hello,

       

      We are encountering an issue where, during rebalancing, stream threads on one client get stuck in the rebalancing state. After enabling debug logs, we saw that some tasks were failing to initialize because they could not grab a lock in the StateDirectory:

       

      2023-12-14 22:51:57.352000Z stream-thread [i-0f1a5e7a42158e04b-StreamThread-14] Could not initialize task 0_51 since: stream-thread [i-0f1a5e7a42158e04b-StreamThread-14] standby-task [0_51] Failed to lock the state directory for task 0_51; will retry

       

      We were able to reproduce this behavior reliably on 3.4.0. The following sequence triggers the bug (a simplified sketch of the lock bookkeeping is included after the list):

      Assume a Streams consumer group with 5 instances (A, B, C, D, E), each running 5 threads (1-5), and stateful tasks that keep state stores on disk. There are 10 active tasks and 10 standby tasks.
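
      For concreteness, a hypothetical per-instance configuration matching this setup could look like the sketch below; the application id, bootstrap address, and state directory path are invented for illustration.

      ```java
      import java.util.Properties;
      import org.apache.kafka.streams.StreamsConfig;

      public class ExampleStreamsConfig {
          public static Properties props() {
              Properties props = new Properties();
              props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-stateful-app"); // hypothetical app id
              props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");       // hypothetical broker address
              props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 5);                  // 5 stream threads per instance
              props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);                // standby replicas for stateful tasks
              props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");    // on-disk state store location
              return props;
          }
      }
      ```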

      1. Instance A is deactivated
      2. As an example, let's say task 0_1, previously on Instance B, moves to Instance C
      3. Task 0_1 leaves behind its state directory on Instance B's disk, currently unused, and no lock for it exists in Instance B's StateDirectory in-memory lock tracker
      4. Instance A is re-activated
      5. Streams thread 1 on Instance B is asked to re-join the consumer group due to a new member being added
      6. As part of re-joining, thread 1 lists the non-empty state directories in order to report the offsets it has in its state stores as part of its rebalance metadata. Thread 1 sees that the directory for 0_1 is not empty.
      7. The cleanup thread on Instance B runs: it locks state store 0_1, sees that the directory for 0_1 was last modified more than `state.cleanup.delay.ms` ago, deletes it, and unlocks it successfully
      8. Thread 1 takes a lock on directory 0_1 because it was found non-empty earlier, unaware that the cleanup thread ran between the time of the check and the lock. It tracks this lock in its own in-memory store, in addition to the StateDirectory's in-memory lock store
      9. Thread 1 successfully joins the consumer group
      10. After every consumer has joined the group, assignments are calculated, and then every consumer calls sync group to receive the new assignments
      11. Thread 1 on Instance B calls sync group but gets an error - the group coordinator has triggered a new rebalance and all members must rejoin the group
      12. Thread 1 again lists the non-empty state directories in order to report the offsets it has in its state stores as part of its rebalance metadata. Prior to doing so, it clears its in-memory store tracking the locks it has taken for the purpose of gathering rebalance metadata
      13. Thread 1 no longer takes a lock on 0_1, as the directory is now empty (it was deleted in step 7)
      14. However, the lock on 0_1 owned by Thread 1 remains in the StateDirectory, since it is never explicitly released
      15. All consumers re-join and sync successfully, receiving their new assignments
      16. Thread 2 on Instance B is assigned task 0_1
      17. Thread 2 cannot take a lock on 0_1 in the StateDirectory because it is still held by Thread 1
      18. Thread 2 remains in the rebalancing state and cannot make progress on task 0_1, or on any other tasks assigned to it.
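
      To make the lock bookkeeping concrete, here is a minimal, self-contained Java sketch of the race described above. This is not Kafka's actual StateDirectory code: the class and variable names are invented, and the lock map is a deliberately simplified stand-in for the real in-memory lock tracking.

      ```java
      import java.util.HashSet;
      import java.util.Map;
      import java.util.Set;
      import java.util.concurrent.ConcurrentHashMap;

      public class OrphanedLockSketch {

          // Simplified stand-in for StateDirectory's in-memory lock tracking: taskId -> owning thread.
          static class SimplifiedStateDirectory {
              final Map<String, String> taskLocks = new ConcurrentHashMap<>();

              boolean lock(String taskId, String thread) {
                  // Succeeds if the task is unlocked, or already locked by the same thread.
                  return thread.equals(taskLocks.computeIfAbsent(taskId, t -> thread));
              }

              void unlock(String taskId, String thread) {
                  taskLocks.remove(taskId, thread);
              }
          }

          public static void main(String[] args) {
              SimplifiedStateDirectory stateDir = new SimplifiedStateDirectory();
              Set<String> dirsOnDisk = new HashSet<>();
              dirsOnDisk.add("0_1"); // leftover state directory from a task that moved away (steps 2-3)

              // Step 6: thread 1 snapshots the non-empty state directories for its rebalance metadata.
              Set<String> snapshot = new HashSet<>(dirsOnDisk);

              // Step 7: the cleanup thread locks 0_1, deletes the stale directory, and unlocks it.
              if (stateDir.lock("0_1", "CleanupThread")) {
                  dirsOnDisk.remove("0_1");
                  stateDir.unlock("0_1", "CleanupThread");
              }

              // Step 8: thread 1 locks 0_1 based on its stale snapshot and tracks the lock locally.
              Set<String> thread1Locks = new HashSet<>();
              for (String taskId : snapshot) {
                  if (stateDir.lock(taskId, "StreamThread-1")) {
                      thread1Locks.add(taskId);
                  }
              }

              // Steps 12-14: after the failed sync, thread 1 clears only its local tracking set and
              // re-lists directories; 0_1 is now empty, so it never calls unlock("0_1") on stateDir.
              thread1Locks.clear();

              // Steps 16-17: thread 2 is assigned task 0_1 but cannot acquire the orphaned lock.
              boolean acquired = stateDir.lock("0_1", "StreamThread-2");
              System.out.println("StreamThread-2 acquired lock on 0_1? " + acquired); // prints false
          }
      }
      ```

      Running the sketch prints `StreamThread-2 acquired lock on 0_1? false`, mirroring step 17: the lock taken while gathering rebalance metadata is never released, so the newly assigned thread can never initialize the task.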

            People

              Assignee: Sabit (sabitn)
              Reporter: Sabit (sabitn)
