In versions where standby tasks are suspended on each rebalance (I checked 2.0), the checkpoint file is updated after the flush. The expected behaviour is that if the same standby task is assigned back to the same machine after the rebalance, it resumes reading the changelog from the offset where it left off.
But there appears to be a bug in the code: after every rebalance, the task starts reading from the offset it started from the first time it was assigned to this machine. This has 2 repercussions:
- After every rebalance the standby tasks restore a huge amount of data that they have already restored earlier (verified via a 300x increase in network IO on all streams instances after a rebalance, even when the assignment did not change).
- If the changelog has time-based retention, those offsets may no longer be available in the changelog, which leads to OffsetOutOfRange exceptions, and the stores get deleted and recreated.
I have gone through the code and I think I know the issue.
In TaskManager#updateNewAndRestoringTasks(), the function assignStandbyPartitions() gets called for all the running standby tasks. It populates the map checkpointedOffsets from standbyTask.checkpointedOffsets(), which is only updated when a StandbyTask is initialized (i.e. in its constructor).
This has an easy fix.
After resumption, we read standbyTask.checkpointedOffsets() to determine the offset from which the standby task should resume, rather than stateMgr.checkpointed(), which is updated on every commit to the checkpoint file. In the former case the task always starts from the same offset, re-reading data it has already read, and when the changelog topic has a retention time it hits an OffsetOutOfRange exception. So standbyTask.checkpointedOffsets() is quite useless here, and we should use stateMgr.checkpointed() instead to return offsets to the task manager.
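To make the difference concrete, here is a minimal toy model of the two read paths. It is not the Kafka Streams code: ToyStateManager, ToyStandbyTask, staleCheckpointedOffsets() and freshCheckpointedOffsets() are hypothetical names invented for this sketch, and the partition name and offsets are made up. It only illustrates why a map captured once in the constructor goes stale, while a lookup delegated to the state manager reflects the latest commit.

```java
import java.util.HashMap;
import java.util.Map;

// Demo driver (hypothetical): simulates assign -> commit -> rebalance/resume.
class StandbyCheckpointDemo {
    public static void main(String[] args) {
        ToyStateManager stateMgr = new ToyStateManager();
        stateMgr.commit("store-changelog-0", 100L);   // checkpoint present at first assignment

        ToyStandbyTask task = new ToyStandbyTask(stateMgr);

        stateMgr.commit("store-changelog-0", 5000L);  // task keeps restoring and committing

        // After a rebalance the task is resumed, not re-created.
        System.out.println("stale: " + task.staleCheckpointedOffsets().get("store-changelog-0"));
        System.out.println("fresh: " + task.freshCheckpointedOffsets().get("store-changelog-0"));
    }
}

// Hypothetical stand-in for the state manager: its checkpoint advances on every commit.
class ToyStateManager {
    private final Map<String, Long> checkpoint = new HashMap<>();

    void commit(String changelogPartition, long offset) {
        checkpoint.put(changelogPartition, offset);
    }

    // Returns a copy of the most recently committed offsets.
    Map<String, Long> checkpointed() {
        return new HashMap<>(checkpoint);
    }
}

// Hypothetical stand-in for a standby task.
class ToyStandbyTask {
    private final ToyStateManager stateMgr;
    private final Map<String, Long> snapshotAtInit; // captured once, never refreshed

    ToyStandbyTask(ToyStateManager stateMgr) {
        this.stateMgr = stateMgr;
        this.snapshotAtInit = stateMgr.checkpointed();
    }

    // Behaviour described in this report: the constructor-time snapshot is returned.
    Map<String, Long> staleCheckpointedOffsets() {
        return snapshotAtInit;
    }

    // Proposed behaviour: ask the state manager on every call.
    Map<String, Long> freshCheckpointedOffsets() {
        return stateMgr.checkpointed();
    }
}
```

In this toy run the stale path still reports offset 100 after the commit at 5000, which corresponds to the re-restoration and (with retention) the out-of-range offsets described above, while the delegated path reports 5000.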