Description
When using EOS, the .checkpoint file created when a stateful streams app is shut down does not always contain changelog offsets that represent the latest state of the state store. The offsets are often behind the end of the changelog, sometimes quite significantly.
This means a state restore is required when the streams app restarts after shutting down cleanly, because streams concludes (based on the stale offsets in the checkpoint) that the state store is not up to date with the changelog.
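To make the cost of a stale checkpoint concrete, here is a minimal sketch of how the gap between the checkpointed offset and the changelog end offset translates into records that must be replayed on restart. The class and method names (CheckpointLag, restoreLag) and the partition name are hypothetical and not part of Kafka Streams; changelog partitions are modelled as plain strings, and transaction markers, which also occupy offsets under EOS, are ignored.

```java
import java.util.HashMap;
import java.util.Map;

class CheckpointLag {
    // Hypothetical helper, not a Kafka Streams API: for each changelog
    // partition, compute how many offsets lie between the checkpointed
    // offset and the end of the changelog. A non-zero value means the
    // restart has to replay that range before processing can resume.
    static Map<String, Long> restoreLag(final Map<String, Long> checkpointed,
                                        final Map<String, Long> endOffsets) {
        final Map<String, Long> lag = new HashMap<>();
        for (final Map.Entry<String, Long> entry : endOffsets.entrySet()) {
            // Assumption: a partition missing from the checkpoint is restored from offset 0.
            final long from = checkpointed.getOrDefault(entry.getKey(), 0L);
            lag.put(entry.getKey(), Math.max(0L, entry.getValue() - from));
        }
        return lag;
    }

    public static void main(final String[] args) {
        final Map<String, Long> checkpointed = new HashMap<>();
        checkpointed.put("app-store-changelog-0", 1_000L);
        final Map<String, Long> endOffsets = new HashMap<>();
        endOffsets.put("app-store-changelog-0", 250_000L);
        System.out.println(restoreLag(checkpointed, endOffsets));
    }
}
```

With an up-to-date checkpoint the lag would be zero and the restart would skip the restore entirely.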
In our case this increases the time for a clean restart of a single-instance streams app from around 10 seconds to sometimes over 2 minutes.
I suspect the bug arises from an assumption about the commitNeeded field in the following method in StreamTask:
protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
    // commitNeeded indicates we may have processed some records since last commit
    // and hence we need to refresh checkpointable offsets regardless whether we should checkpoint or not
    if (commitNeeded) {
        stateMgr.updateChangelogOffsets(checkpointableOffsets());
    }

    super.maybeWriteCheckpoint(enforceCheckpoint);
}
In the steady-state case of a simple single-instance, single-threaded streams app that starts, runs and then shuts down, the if (commitNeeded) test always fails when running with EOS, so the latest checkpoint offsets are never passed to the stateMgr.
Tracing back through the callers of maybeWriteCheckpoint shows why: only one place in the code calls maybeWriteCheckpoint during this steady state, namely the postCommit(final boolean enforceCheckpoint) method, specifically the call in the RUNNING state.
case RUNNING:
    if (enforceCheckpoint || !eosEnabled) {
        maybeWriteCheckpoint(enforceCheckpoint);
    }
    log.debug("Finalized commit for {} task with eos {} enforce checkpoint {}", state(), eosEnabled, enforceCheckpoint);
    break;
We can see from this code that maybeWriteCheckpoint will only ever be called if enforceCheckpoint=true, because we know eosEnabled=true as we're running with EOS.
So then where does postCommit get called with enforceCheckpoint=true? Again looking only at the steady state case we find that it's only called from TaskManager.tryCloseCleanAllActiveTasks which is only called from TaskManager.shutdown.
The thing about the call in tryCloseCleanAllActiveTasks is that it happens after all active tasks have committed. This means that StreamTask.commitNeeded=false for all tasks, so the test back in maybeWriteCheckpoint always fails and the latest offsets never get stored into the state manager.
I think the fix is to simply change the test in maybeWriteCheckpoint to be if (commitNeeded || enforceCheckpoint) { ... as we know we must always update the changelog offsets before we write the checkpoint.
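The sequence described above can be reproduced with a small stand-alone model. This is only a sketch under simplifying assumptions, not the real StreamTask: FakeTask, its fields and checkpointAfterCleanShutdown are hypothetical names, offsets are reduced to a single long, and the parent maybeWriteCheckpoint is assumed to always write whatever the state manager currently holds.

```java
class CheckpointSimulation {
    // Hypothetical stand-in for a stateful task running with EOS.
    static final class FakeTask {
        private final boolean fixApplied;
        private boolean commitNeeded = false;
        private long processedOffset = 0L;       // latest changelog offset produced by processing
        private long stateMgrOffset = 0L;        // offset the state manager currently knows about
        private long checkpointFileOffset = -1L; // what ends up in the .checkpoint file

        FakeTask(final boolean fixApplied) {
            this.fixApplied = fixApplied;
        }

        void process(final int records) {
            processedOffset += records;
            commitNeeded = true;
        }

        // Committing clears the flag but does not refresh the state manager's offsets.
        void commit() {
            commitNeeded = false;
        }

        void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
            // Original test: commitNeeded. Proposed test: commitNeeded || enforceCheckpoint.
            final boolean refresh = fixApplied ? (commitNeeded || enforceCheckpoint) : commitNeeded;
            if (refresh) {
                stateMgrOffset = processedOffset;
            }
            checkpointFileOffset = stateMgrOffset; // simplified stand-in for the parent class call
        }

        // Mirrors the RUNNING branch of postCommit quoted earlier.
        void postCommit(final boolean enforceCheckpoint, final boolean eosEnabled) {
            if (enforceCheckpoint || !eosEnabled) {
                maybeWriteCheckpoint(enforceCheckpoint);
            }
        }
    }

    static long checkpointAfterCleanShutdown(final boolean fixApplied) {
        final FakeTask task = new FakeTask(fixApplied);
        for (int i = 0; i < 3; i++) {
            task.process(100);
            task.commit();
            task.postCommit(false, true); // regular commit under EOS: no checkpoint is written
        }
        task.postCommit(true, true);      // clean shutdown: all tasks have already committed
        return task.checkpointFileOffset;
    }

    public static void main(final String[] args) {
        System.out.println("without fix: " + checkpointAfterCleanShutdown(false));
        System.out.println("with fix:    " + checkpointAfterCleanShutdown(true));
    }
}
```

In this model the unfixed variant writes offset 0 to the checkpoint even though 300 records were processed, while the variant with the proposed condition writes 300.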
Issue Links
- relates to KAFKA-12634 Should checkpoint after restore finished (Resolved)