In the current code base, we restore state stores in the main thread (in contrast to older code that did restore state stored in the rebalance call back). This has multiple advantages and allows us the further simplify restore code.
In the original code base, during a long restore phase, it was possible that a instance misses a rebalance and drops out of the consumer group. To detect this case, we apply a check during the restore phase, that the end-offset of the changelog topic does not change. A changed offset implies a missed rebalance as another thread started to write into the changelog topic (ie, the current thread does not own the task/store/changelog-topic anymore).
With the new code, that restores in the main-loop, it's ensured that `poll()` is called regularly and thus, a rebalance will be detected automatically. This make the check about an changing changelog-end-offset unnecessary.
We can simplify the restore logic, to just consuming until `poll()` does not return any data. For this case, we fetch the end-offset to see if we did fully restore. If yes, we resume processing, if not, we continue the restore.