Description
In KIP-441, the new HA assignor makes an informed decision about stateful task placement based on the offset sums reported by each instance. Offset sums are computed in one of two ways: for assigned tasks (i.e. those in the TaskManager's "tasks" map), the instance simply sums up the task's changelog offsets directly. For tasks that are not assigned but whose state directory remains on disk, it reads the changelog offsets from the checkpoint file. These offset sums are encoded in the subscription userdata sent during the JoinGroup phase of a rebalance.
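To make the two paths concrete, here is a minimal sketch of how an instance might produce its offset sums; this is not the actual TaskManager code, and the class, field, and helper names are purely illustrative:

{code:java}
import java.util.HashMap;
import java.util.Map;

// Illustrative only: models the two sources of offset sums described above.
class OffsetSumSketch {
    // changelog offsets of tasks currently in the "tasks" map (assigned tasks)
    final Map<String, Map<String, Long>> assignedTaskChangelogOffsets = new HashMap<>();
    // offsets read from the checkpoint file of each on-disk but unassigned task
    final Map<String, Map<String, Long>> checkpointedOffsetsOnDisk = new HashMap<>();

    Map<String, Long> computeTaskOffsetSums() {
        final Map<String, Long> offsetSums = new HashMap<>();

        // Assigned tasks: sum the changelog offsets reported by the task itself.
        assignedTaskChangelogOffsets.forEach((taskId, offsets) ->
            offsetSums.put(taskId, sum(offsets)));

        // Unassigned tasks with state on disk: fall back to the checkpoint file.
        checkpointedOffsetsOnDisk.forEach((taskId, offsets) ->
            offsetSums.putIfAbsent(taskId, sum(offsets)));

        return offsetSums;
    }

    static long sum(final Map<String, Long> changelogOffsets) {
        return changelogOffsets.values().stream().mapToLong(Long::longValue).sum();
    }
}
{code}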
The problem here is that it's possible for the instance to rejoin the group after having been assigned a new task, but before that task is initialized. In this case it computes the offset sum from the uninitialized task rather than from the checkpoint file, and since the uninitialized task has no changelog offsets yet, the instance ends up reporting no offsets for that task whatsoever.
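Continuing the illustrative sketch above (same fields and helper), the problematic path might look roughly like this, modeling a newly assigned but uninitialized task as one with no changelog offsets yet:

{code:java}
// Illustrative only: why an uninitialized task ends up with no reported offsets.
Map<String, Long> computeTaskOffsetSumsWithUninitializedTask() {
    final Map<String, Long> offsetSums = new HashMap<>();

    assignedTaskChangelogOffsets.forEach((taskId, offsets) -> {
        if (offsets.isEmpty()) {
            // Newly assigned but not yet initialized: nothing to sum, so the
            // task is simply omitted from the report...
            return;
        }
        offsetSums.put(taskId, sum(offsets));
    });

    // ...and the checkpoint-file fallback never runs for it, because the task
    // counts as assigned even though a perfectly valid checkpoint sits on disk.
    checkpointedOffsetsOnDisk.forEach((taskId, offsets) -> {
        if (!assignedTaskChangelogOffsets.containsKey(taskId)) {
            offsetSums.put(taskId, sum(offsets));
        }
    });

    return offsetSums;
}
{code}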
This results in a particularly nefarious interaction between HA and cooperative rebalancing. An instance may read from the checkpoint file of a caught-up (but unassigned) task and report that offset sum in its subscription, leading the assignor to compute a small lag and place the task on that instance. After placing all stateful tasks in this way, the assignor distributes the stateless tasks across the group to balance the overall workload. It does this without considering the previous owners of the stateless tasks, so odds are good that moving the stateful task to this instance will result in a different assortment of stateless tasks after this rebalance.
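The stateless pass can be pictured as a plain load-balancing loop with no stickiness. The sketch below only illustrates that behavior and is not the actual assignor implementation; all names are made up for this example:

{code:java}
import java.util.*;

// Illustrative only: assign each stateless task to the currently least-loaded
// client, ignoring who owned it before.
class StatelessAssignmentSketch {
    static Map<String, List<String>> assign(final List<String> statelessTasks,
                                            final Map<String, Integer> statefulTaskCounts) {
        final Map<String, List<String>> assignment = new HashMap<>();
        final Map<String, Integer> loads = new HashMap<>(statefulTaskCounts);
        statefulTaskCounts.keySet().forEach(client -> assignment.put(client, new ArrayList<>()));

        for (final String task : statelessTasks) {
            // Previous ownership is never consulted, so any change in the stateful
            // placement can reshuffle the stateless tasks as well.
            final String leastLoaded = loads.entrySet().stream()
                .min(Map.Entry.comparingByValue())
                .get().getKey();
            assignment.get(leastLoaded).add(task);
            loads.merge(leastLoaded, 1, Integer::sum);
        }
        return assignment;
    }
}
{code}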
Any time owned tasks are moved around, the current owner has to revoke them and trigger a follow-up cooperative rebalance. Within the Consumer client this actually happens immediately: that is, within a single invocation of poll() it will loop inside joinGroupIfNeeded() as long as a rejoin is needed. And at the end of the previous rebalance, if any partitions were revoked then a rejoin is indeed needed. So the Consumer sends out its next JoinGroup, including the userdata with the computed task offset sums, without ever exiting the current poll(). Streams never gets the chance to initialize its new tasks, and ends up excluding them from the offset sums it reports in the following rebalance.
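That control flow can be sketched roughly as follows; the method names are illustrative stand-ins for the consumer's internal join logic, not real APIs:

{code:java}
// Illustrative only: the rejoin loop that runs within a single poll() call.
class RejoinLoopSketch {
    boolean rejoinNeeded = true; // set when partitions are revoked cooperatively

    void pollOnce() {
        while (rejoinNeeded) {
            // The subscription userdata (i.e. the task offset sums) is built here,
            // before Streams has had any chance to initialize newly assigned tasks.
            final byte[] userData = buildSubscriptionUserData();
            final boolean partitionsRevoked = joinGroup(userData);
            // Under the cooperative protocol, revoking partitions at the end of a
            // rebalance immediately schedules another rejoin within the same poll().
            rejoinNeeded = partitionsRevoked;
        }
        // Only once the loop exits does poll() return and task initialization run.
    }

    byte[] buildSubscriptionUserData() { return new byte[0]; } // stub
    boolean joinGroup(final byte[] userData) { return false; } // stub
}
{code}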
And since it doesn't report any offsets for this task, the assignor now believes the instance does not have any caught-up state for it, and assigns the task elsewhere. This causes a shuffling of stateless tasks once more, which in turn triggers another cooperative rebalance. This time the task is no longer assigned, so the instance reports offsets based on the checkpoint file again, and we're back at the beginning.
Given the deterministic assignment, once a group is caught in this cycle it will be impossible to escape without manual intervention (i.e. deleting some or all of the task directories and forcing the instance to restore from scratch).