Description
Hello,
We are encountering an issue where during rebalancing, we see streams threads on one client get stuck in rebalancing. Upon enabling debug logs, we saw that some tasks were having issues initializing due to failure to grab a lock in the StateDirectory:
2023-12-14 22:51:57.352000Z stream-thread [i-0f1a5e7a42158e04b-StreamThread-14] Could not initialize task 0_51 since: stream-thread [i-0f1a5e7a42158e04b-StreamThread-14] standby-task [0_51] Failed to lock the state directory for task 0_51; will retry
We were able to reproduce this behavior reliably on 3.4.0. This is the sequence that triggers the bug.
Assume in a streams consumer group, there are 5 instances (A, B, C, D, E), each with 5 threads (1-5), and the consumer is using stateful tasks which have state stores on disk. There are 10 active tasks and 10 standby tasks.
- Instance A is deactivated
- As an example, lets say task 0_1, previously on instance B, moves to instance C
- Task 0_1 leaves behind it's state directory on Instance B's disk, currently unused, and no lock for it exists in Instance B's StateDirectory in-memory lock tracker
- Instance A is re-activated
- Streams thread 1 on Instance B is asked to re-join the consumer group due to a new member being added
- As part of re-joining, thread 1 lists non-empty state directories in order to report the offset's it has in it's state stores as part of it's metadata. Thread 1 sees that the directory for 0_1 is not empty.
- The cleanup thread on instance B runs. The cleanup thread locks state store 0_1, sees the directory for 0_1 was last modified more than `state.cleanup.delay.ms` ago, deletes it, and unlocks it successfully
- Thread 1 takes a lock on directory 0_1 due to it being found not-empty before, unaware that the cleanup has run between the time of the check and the lock. It tracks this lock in it's own in-memory store, in addition to StateDirectory's in-memory lock store
- Thread 1 successfully joins the consumer group
- After every consumer in the group joins the group, assignments are calculated, and then every consumer calls sync group to receive the new assignments
- Thread 1 on Instance B calls sync group but gets an error - the group coordinator has triggered a new rebalance and all members must rejoin the group
- Thread 1 again lists non-empty state directories in order to report the offset's it has in it's state stores as part of it's metadata. Prior to doing so, it clears it's in-memory store tracking the locks it has taken for the purpose of gathering rebalance metadata
- Thread 1 no longer takes a lock on 0_1 as it is empty
- However, that lock on 0_1 owned by Thread 1 remains in StateDirectory
- All consumers re-join and sync successfully, receiving their new assignments
- Thread 2 on Instance B is assigned task 0_1
- Thread 2 cannot take a lock on 0_1 in the StateDirectory because it is still being held by Thread 1
- Thread 2 remains in rebalancing state, and cannot make progress on task 0_1, or any other tasks it has assigned.
Attachments
Issue Links
- links to