diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java index 8c14737..3dcc73b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java @@ -31,6 +31,9 @@ import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.nio.file.LinkOption; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -84,7 +87,18 @@ public class OffsetCheckpoint { fileOutputStream.getFD().sync(); } - Utils.atomicMoveWithFallback(temp.toPath(), file.toPath()); + try { + Utils.atomicMoveWithFallback(temp.toPath(), file.toPath()); + } catch (IOException ioe) { + if (Files.exists(temp.toPath(), LinkOption.NOFOLLOW_LINKS)) { + Path secondary = new File(file.toPath() + ".2").toPath(); + try { + Files.move(temp.toPath(), secondary, StandardCopyOption.REPLACE_EXISTING); + } catch (IOException ignored) { + //log.debug("Failed to rename {} as {}", temp.toPath(), secondary); + } + } + } } }