Description
In https://issues.apache.org/jira/browse/KAFKA-3559, task reuse on rebalance was introduced as a performance optimization. Instead of being closed on rebalance (i.e., in onPartitionsRevoked()), a task is only suspended for potential reuse in onPartitionsAssigned(). Only if a task cannot be reused is it eventually closed in onPartitionsAssigned().
This mechanism can fail if multiple StreamThreads run on the same host (in the same or different JVMs). The scenario is as follows:
- assume 2 running threads A and B
- assume 3 tasks t1, t2, t3
- assignment: A-(t1,t2) and B-(t3)
- on the same host, a new single-threaded Streams application (same app-id) gets started (thread C)
- on rebalance, t2 (could also be t1 – does not matter) will be moved from A to C
- as the assignment is only sticky based on a heuristic, t1 can sometimes be assigned to B, too, and t3 gets assigned to A (whether this "task flipping" happens or not is subject to a race condition)
- on revoke, A will suspend tasks t1 and t2 (without releasing any locks)
- on assign
  - A tries to create t3, but as B did not release it yet, A dies with a "cannot get lock" exception
  - B tries to create t1, but as A did not release it yet, B dies with a "cannot get lock" exception
  - as both A and B try to create their new task before releasing the old one, this will always fail if task flipping happened
  - C tries to create t2, but A did not release the t2 lock yet (race condition), and C dies with an exception (this could even happen without "task flipping" between A and B)
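The scenario above can be reproduced in a small, single-threaded simulation. This is only a sketch: the lock table, class name, and method names are hypothetical stand-ins for the per-task state locks and are not taken from the Kafka Streams code base. It compares "create before release" (the failing order) with "release before create".

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the per-task locks; not Kafka Streams code.
class TaskFlippingDemo {
    // task id -> name of the thread that currently holds the task's lock
    static final Map<String, String> locks = new HashMap<>();

    // Returns true if the thread holds the lock afterwards.
    static boolean tryLock(String task, String thread) {
        String owner = locks.putIfAbsent(task, thread);
        return owner == null || owner.equals(thread);
    }

    // Releases the lock only if it is held by the given thread.
    static void release(String task, String thread) {
        locks.remove(task, thread);
    }

    // Old assignment from the description: A-(t1,t2), B-(t3).
    static void initialAssignment() {
        locks.clear();
        tryLock("t1", "A");
        tryLock("t2", "A");
        tryLock("t3", "B");
    }

    // Failing order: new tasks are created while suspended tasks still hold their locks.
    static boolean createBeforeRelease() {
        initialAssignment();
        boolean aGotT3 = tryLock("t3", "A"); // still held by B
        boolean bGotT1 = tryLock("t1", "B"); // still held by A
        return aGotT3 && bGotT1;
    }

    // Proposed order: unassigned suspended tasks are released first, then new tasks are created.
    static boolean releaseBeforeCreate() {
        initialAssignment();
        // New assignment after task flipping: A-(t3), B-(t1), C-(t2).
        release("t1", "A");
        release("t2", "A");
        release("t3", "B");
        boolean aGotT3 = tryLock("t3", "A");
        boolean bGotT1 = tryLock("t1", "B");
        boolean cGotT2 = tryLock("t2", "C");
        return aGotT3 && bGotT1 && cGotT2;
    }

    public static void main(String[] args) {
        System.out.println("create before release succeeds: " + createBeforeRelease());
        System.out.println("release before create succeeds: " + releaseBeforeCreate());
    }
}
```

Because the simulation runs in one thread, it hides the timing problem between A and C: in a real deployment the release and the create happen in different threads, so C can still lose the race even with the corrected order.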
We want to fix this by:
- first releasing unassigned suspended tasks in onPartitionsAssigned(), and only afterward creating new tasks (this fixes the "task flipping" issue)
- using a "backoff and retry mechanism" if a task cannot be created (to handle the release-create race condition between different threads)
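A minimal sketch of what such a backoff-and-retry loop could look like is shown below. The class, the lock map, the exponential backoff policy, and the parameters (maxAttempts, initialBackoffMs) are all assumptions made for illustration; the actual fix may use different names, a different backoff strategy, and the real state directory locks.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of "backoff and retry" for task creation; not Kafka Streams code.
class RetryingTaskCreator {
    // task id -> name of the thread that currently holds the task's lock
    static final ConcurrentMap<String, String> locks = new ConcurrentHashMap<>();

    // Returns true if the thread holds the lock afterwards.
    static boolean tryLock(String task, String thread) {
        String owner = locks.putIfAbsent(task, thread);
        return owner == null || owner.equals(thread);
    }

    // Tries to lock the task, doubling the wait time after every failed attempt.
    // Returns false if the lock is still unavailable after maxAttempts tries
    // or if the calling thread is interrupted while waiting.
    static boolean createWithBackoff(String task, String thread,
                                     int maxAttempts, long initialBackoffMs) {
        long backoffMs = initialBackoffMs;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (tryLock(task, thread)) {
                return true;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt status
                    return false;
                }
                backoffMs *= 2;
            }
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        // Thread A still holds t2 when C starts trying to create it.
        locks.put("t2", "A");
        Thread releaser = new Thread(() -> {
            try {
                Thread.sleep(50); // A releases t2 a little later
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            locks.remove("t2", "A");
        });
        releaser.start();
        boolean created = createWithBackoff("t2", "C", 10, 10);
        releaser.join();
        System.out.println("C created t2: " + created);
    }
}
```

Bounding the number of attempts keeps a thread from spinning forever when another thread never releases the lock; in that case the caller can still fail with the original "cannot get lock" error.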
Issue Links
- is duplicated by KAFKA-4582 KStream job fails in multi-thread mode (Resolved)