isAutoCommit argument works exactly the other way around, apparently it is "false" from the scheduled auto commit and to "true" from zkConsConn.commitOffsets()?
So the migration of offsets from zk to kafka is to : set dual commit and kafka storage, restart consumers, wait for kafka to be copied on the offset commits, and take out dual commit.
So currently kafka is copied with the offsets only when data flows, and for the purpose of this task, we need to add one or 2 more cases when it is getting the offset: when shutting down, or perhaps periodically.
So this task applies only when storage==kafka and dualCommit ==true, right?
I would first ask why the write to zookeeper the new offsets, only if the write to kafka was ok? My assumption is To make sure only one write to zookeeper, even though the process of writing to kafka may involve retries.
I would write both directions at all time, and perhaps keep 2 checkpoint structures, one kafka one zookeeper.
I create a patch now with: a forceCommit that will make that all offsets are commited to both kafka and zookeeper when shutting down in dual commit mode.
The usefulness of committing all offsets not only to kafka but to zookeeper as well comes at least from one reason: the one I mentioned above, that if kafka offset write fails completely, zookeeper is never copied on that.
Forcing all offsets to zk on shutdown too does indeed have the drawback that it will typically copy the same offsets again, and not only once but potentially several times (if kafka is retried).
However the alternative is to commit to both kafka and zookeeper unconditionally in the normal flow (right now, the commit to zk happens only after a successful commit to kafka if any). That too poses the same risk of committing multiple times to a system (zk) if the other (kafka) needs retries. So a clean way here would be a completely different OffsetDAO implementation, one on kafka , one on zookeeper, and one on dual mode, and read, as now max(both), while write goes to the 2 implementations, each of them doing retries without affecting the other!