Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
Description
When a partition is moved from one directory to another, its checkpoint entry in the cleaner-offset-checkpoint file is not removed from the source directory.
As a consequence, when we read the last firstDirtyOffset, we might get a stale value from the old checkpoint file.
Basically, we need to clean up the entry from the checkpoint file in the source directory when the move is completed.
The current issue is that the code in LogCleanerManager:
    /**
     * @return the position processed for all logs.
     */
    def allCleanerCheckpoints: Map[TopicPartition, Long] = {
      inLock(lock) {
        checkpoints.values.flatMap(checkpoint => {
          try {
            checkpoint.read()
          } catch {
            case e: KafkaStorageException =>
              error(s"Failed to access checkpoint file ${checkpoint.file.getName} in dir ${checkpoint.file.getParentFile.getAbsolutePath}", e)
              Map.empty[TopicPartition, Long]
          }
        }).toMap
      }
    }
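collapses the offsets when multiple entries exist for the same TopicPartition: toMap keeps only the last value seen for a duplicate key, so a stale entry from the source directory can override the current one.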
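The collapse can be reproduced without Kafka on the classpath. The following is a minimal sketch, not the actual fix: `TopicPartition` is a local stand-in for the Kafka class, and `merged`, `removeFromSource`, the directory names and the offsets are all hypothetical, chosen only to mirror the `flatMap(...).toMap` shape of the method above.

```scala
// Hypothetical stand-in for Kafka's TopicPartition, so the sketch runs on its own.
final case class TopicPartition(topic: String, partition: Int)

object CheckpointCollapseSketch {
  // In-memory model of one cleaner-offset-checkpoint file.
  type Checkpoint = Map[TopicPartition, Long]

  // Mirrors the flatMap(...).toMap shape of allCleanerCheckpoints:
  // for a duplicate key, toMap keeps the value that comes last.
  def merged(checkpointsByDir: Seq[(String, Checkpoint)]): Checkpoint =
    checkpointsByDir.flatMap { case (_, entries) => entries }.toMap

  // Hypothetical cleanup: drop the partition's entry from the source
  // directory once the move has completed.
  def removeFromSource(
      checkpointsByDir: Seq[(String, Checkpoint)],
      sourceDir: String,
      tp: TopicPartition): Seq[(String, Checkpoint)] =
    checkpointsByDir.map {
      case (dir, entries) if dir == sourceDir => dir -> (entries - tp)
      case other => other
    }

  def main(args: Array[String]): Unit = {
    val tp = TopicPartition("events", 0)
    // Assumed scenario: the partition moved from /data2 to /data1,
    // and /data2 still holds the old entry.
    val dirs = Seq(
      "/data1" -> Map(tp -> 500L), // destination, current offset
      "/data2" -> Map(tp -> 120L)  // source, stale offset
    )
    println(merged(dirs)(tp))                                 // prints 120 (stale)
    println(merged(removeFromSource(dirs, "/data2", tp))(tp)) // prints 500
  }
}
```

In this sketch the stale value wins only because the source directory happens to be iterated last. In the real code the outcome would depend on the iteration order of `checkpoints.values`, which is why removing the source entry looks safer than relying on ordering.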