That's a good change and will handle the majority of failure cases. There is another failure case that I think still needs to be fixed in the rebalancing logic.
Say, in the scenario described above, c1 fails to rebalance due to an error/exception that exercises this code path:
    try {
      done = rebalance(cluster)
    } catch {
      case e =>
        /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
          * For example, a ZK node can disappear between the time we get all children and the time we try to get
          * the value of a child. Just let this go since another rebalance will be triggered.
          */
        info("exception during rebalance ", e)
    }
After this, c1 only closes its fetcher queues and backs off (0-0 and 0-1 have already been released), while c2 now owns 0-1.
Then, during step 4 above, c1 again releases the partitions in its topic registry, which still contains 0-0 and 0-1. So it releases 0-1, which it no longer owns.
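One way to avoid the double release is to check current ownership before deleting the owner node, rather than trusting the (possibly stale) topic registry. The sketch below illustrates the idea only; the names (zkOwners, releasePartition) are hypothetical and not the actual consumer-connector API:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: guard partition release against a stale topic registry.
// All names here are hypothetical, not the real consumer-connector API.
class ReleaseGuard {
    // Simulated ZK ownership map: partition -> current owner consumer id.
    static Map<String, String> zkOwners = new HashMap<>();

    // Release only if this consumer still owns the partition in ZK.
    static boolean releasePartition(String consumerId, String partition) {
        if (consumerId.equals(zkOwners.get(partition))) {
            zkOwners.remove(partition);
            return true;
        }
        // Stale registry entry: skip the release, ownership has moved on.
        return false;
    }

    public static void main(String[] args) {
        // c1's failed rebalance already released 0-0 and 0-1; c2 then claimed 0-1.
        zkOwners.put("0-1", "c2");
        // c1's topic registry still lists 0-0 and 0-1; the guard skips both.
        System.out.println(releasePartition("c1", "0-0")); // false: already released
        System.out.println(releasePartition("c1", "0-1")); // false: c2 owns it now
        System.out.println(releasePartition("c2", "0-1")); // true: c2 is the owner
    }
}
```

With this guard, c1's second pass over its registry in step 4 becomes a no-op for partitions it no longer owns, so c2's ownership of 0-1 is preserved.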