Description
When kafkaStreams.cleanUp() is called before starting a stream, the StateDirectory.cleanRemovedTasks() method runs this check:
... Line 240:
    if (lock(id, 0)) {
        long now = time.milliseconds();
        long lastModifiedMs = taskDir.lastModified();
        if (now > lastModifiedMs + cleanupDelayMs) {
            log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms)",
                    logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
            Utils.delete(taskDir);
        }
    }
The lock(id, 0) check creates a .lock file in the very directory that is subsequently going to be deleted. If the .lock file already exists from a previous run, the attempt to delete the .lock file fails with AccessDeniedException.
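For illustration only, here is a minimal, self-contained sketch (not the Kafka code itself) of why deleting the lock file can fail while its channel is still held; the class name and temporary paths are made up for the example:

    import java.io.IOException;
    import java.nio.channels.FileChannel;
    import java.nio.channels.FileLock;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;

    public class LockedFileDeleteDemo {
        public static void main(String[] args) throws IOException {
            Path taskDir = Files.createTempDirectory("task-dir");
            Path lockFile = taskDir.resolve(".lock");

            // Open the lock file and hold a FileLock on it, similar in spirit
            // to what StateDirectory.lock(id, 0) does for a task directory.
            try (FileChannel channel = FileChannel.open(lockFile,
                         StandardOpenOption.CREATE, StandardOpenOption.WRITE);
                 FileLock lock = channel.lock()) {

                // While the channel is open, Windows will not let the file be
                // removed (the report sees AccessDeniedException here); on
                // Linux/macOS the delete usually succeeds.
                try {
                    Files.delete(lockFile);
                    System.out.println("Deleted .lock");
                } catch (IOException e) {
                    System.out.println("Could not delete .lock: " + e);
                }
            }
        }
    }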
This leaves the .lock file in the taskDir. Utils.delete(taskDir) then attempts to remove the taskDir path by calling Files.delete(path).
The Files.delete(path) call in postVisitDirectory then fails with java.nio.file.DirectoryNotEmptyException, because the failed attempt to delete the .lock file left the directory non-empty. (o.a.k.s.p.internals.StateDirectory : stream-thread [restartedMain] Failed to lock the state directory due to an unexpected exception)
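The final symptom can be reproduced on any OS, independent of the Kafka code; a minimal sketch with placeholder directory and file names:

    import java.io.IOException;
    import java.nio.file.DirectoryNotEmptyException;
    import java.nio.file.Files;
    import java.nio.file.Path;

    public class DirectoryNotEmptyDemo {
        public static void main(String[] args) throws IOException {
            // Stand-in for the task directory with the leftover .lock file.
            Path taskDir = Files.createTempDirectory("task-dir");
            Files.createFile(taskDir.resolve(".lock"));

            try {
                // Same situation as the Files.delete(path) call in
                // postVisitDirectory: the directory still contains a file,
                // so it cannot be deleted.
                Files.delete(taskDir);
            } catch (DirectoryNotEmptyException e) {
                System.out.println("Cannot delete non-empty directory: " + e);
            }
        }
    }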
This then seems to cause issues when streaming from a topic into an in-memory store.
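A sketch of the kind of setup that hits this path; the topic, store, application id, and bootstrap server values are placeholders, not taken from the report:

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.state.Stores;

    public class CleanUpBeforeStartDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "cleanup-demo");      // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            StreamsBuilder builder = new StreamsBuilder();
            // Materialize a topic into an in-memory key-value store.
            builder.table("input-topic",
                    Materialized.as(Stores.inMemoryKeyValueStore("in-memory-store")));

            KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.cleanUp();  // runs the state directory cleanup described above
            streams.start();
        }
    }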
Issue Links
- is related to
  - KAFKA-6655 CleanupThread: Failed to lock the state directory due to an unexpected exception (Windows OS) - Resolved
  - KAFKA-13666 Tests should not ignore exceptions for supported OS - Resolved
- links to