Affects Version/s: None
Fix Version/s: 2.1.0
LogCleanerManager provides the APIs abortAndPauseCleaning(TopicPartition) and resumeCleaning(Iterable[TopicPartition]). abortAndPauseCleaning(...) does nothing if the partition is already in the paused state, and resumeCleaning(...) always clears the state for a partition that is in the paused state. In addition, resumeCleaning(...) throws IllegalStateException if the partition does not have any state (e.g. its state has already been cleared).
This causes a problem in the following scenario:
1) The background thread invokes LogManager.cleanupLogs(), which in turn calls abortAndPauseCleaning(...) for a given partition. This partition is now in the paused state.
2) A user requests deletion of this partition. The controller sends a StopReplicaRequest with delete=true for this partition. The RequestHandlerThread calls abortAndPauseCleaning(...) followed by resumeCleaning(...) for the same partition. There is now no state for this partition.
3) The background thread invokes resumeCleaning(...) as part of LogManager.cleanupLogs(). Because there is no state for this partition, the call throws IllegalStateException.
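The three steps above can be reproduced with a small stand-alone model. This is only a sketch of the behavior as described in this report, not the actual LogCleanerManager code: the class name PausedFlagModel, the use of plain strings for partitions, and the single-partition resumeCleaning signature are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the pause/resume semantics described above.
// It is NOT the real LogCleanerManager; partitions are plain strings here.
final class PausedFlagModel {
    // A partition is present in this map only while it is in the paused state.
    private final Map<String, Boolean> paused = new HashMap<>();

    // Does nothing if the partition is already paused.
    synchronized void abortAndPauseCleaning(String partition) {
        paused.putIfAbsent(partition, Boolean.TRUE);
    }

    // Always clears the paused state; throws if there is no state to clear.
    synchronized void resumeCleaning(String partition) {
        if (paused.remove(partition) == null) {
            throw new IllegalStateException(
                "Partition " + partition + " has no cleaning state");
        }
    }

    public static void main(String[] args) {
        PausedFlagModel manager = new PausedFlagModel();
        String partition = "topic-0";

        manager.abortAndPauseCleaning(partition); // step 1: background thread pauses
        manager.abortAndPauseCleaning(partition); // step 2: request handler pauses (no-op)
        manager.resumeCleaning(partition);        // step 2: request handler resumes, state cleared
        try {
            manager.resumeCleaning(partition);    // step 3: background thread resumes
        } catch (IllegalStateException e) {
            System.out.println("step 3 failed: " + e.getMessage());
        }
    }
}
```

The second pause is silently dropped, so the first resume removes the only record that the partition was paused, and the background thread's matching resume has nothing left to clear.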
This issue could also happen before KAFKA-7322, if an unclean leader election triggered log truncation for a partition at the same time that the partition was deleted upon user request. However, unclean leader election is very rare. The fix made in https://issues.apache.org/jira/browse/KAFKA-7322 makes this issue much more frequent.
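The solution is to record the number of pauses for each partition, so that resumeCleaning(...) clears the paused state only once every preceding abortAndPauseCleaning(...) call has been matched by a resume.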
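A minimal sketch of the pause-counting idea follows. It is an illustration under stated assumptions, not the patch itself: the class name PauseCountModel, the isPaused helper, and the use of plain strings for partitions are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of pause counting; not the actual Kafka fix.
final class PauseCountModel {
    // Number of outstanding pauses per partition; absent means not paused.
    private final Map<String, Integer> pauseCounts = new HashMap<>();

    // Every call is recorded, even if the partition is already paused.
    synchronized void abortAndPauseCleaning(String partition) {
        pauseCounts.merge(partition, 1, Integer::sum);
    }

    // Undoes exactly one pause; the state is cleared only when the count reaches zero.
    synchronized void resumeCleaning(String partition) {
        Integer count = pauseCounts.get(partition);
        if (count == null) {
            throw new IllegalStateException(
                "Partition " + partition + " has no cleaning state");
        }
        if (count == 1) {
            pauseCounts.remove(partition);
        } else {
            pauseCounts.put(partition, count - 1);
        }
    }

    synchronized boolean isPaused(String partition) {
        return pauseCounts.containsKey(partition);
    }

    public static void main(String[] args) {
        PauseCountModel manager = new PauseCountModel();
        String partition = "topic-0";

        manager.abortAndPauseCleaning(partition); // background thread pauses (count = 1)
        manager.abortAndPauseCleaning(partition); // request handler pauses (count = 2)
        manager.resumeCleaning(partition);        // request handler resumes (count = 1)
        System.out.println("still paused: " + manager.isPaused(partition));
        manager.resumeCleaning(partition);        // background thread resumes (count = 0)
        System.out.println("still paused: " + manager.isPaused(partition));
    }
}
```

With a counter, the interleaving from the scenario no longer throws, while an unmatched resumeCleaning(...) is still reported as an IllegalStateException.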