diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index cc14c67..9dfe7de 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -43,6 +43,7 @@ public class ProcessorStateManager implements StateManager {
     private static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";
     static final String CHECKPOINT_FILE_NAME = ".checkpoint";
 
+    private final LogContext logContext;
     private final Logger log;
     private final File baseDir;
     private final TaskId taskId;
@@ -78,6 +79,7 @@ public class ProcessorStateManager implements StateManager {
         this.taskId = taskId;
         this.changelogReader = changelogReader;
         logPrefix = String.format("task [%s] ", taskId);
+        this.logContext = logContext;
         this.log = logContext.logger(getClass());
 
         partitionForTopic = new HashMap<>();
@@ -95,7 +97,7 @@ public class ProcessorStateManager implements StateManager {
         baseDir = stateDirectory.directoryForTask(taskId);
 
         // load the checkpoint information
-        checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
+        checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME), logContext);
         checkpointedOffsets = new HashMap<>(checkpoint.read());
 
         if (eosEnabled) {
@@ -313,13 +315,14 @@ public class ProcessorStateManager implements StateManager {
             }
         }
         // write the checkpoint file before closing, to indicate clean shutdown
+        final File checkpointFile = new File(baseDir, CHECKPOINT_FILE_NAME);
         try {
             if (checkpoint == null) {
-                checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
+                checkpoint = new OffsetCheckpoint(checkpointFile, logContext);
             }
             checkpoint.write(checkpointedOffsets);
         } catch (final IOException e) {
-            log.warn("Failed to write checkpoint file to {}:", new File(baseDir, CHECKPOINT_FILE_NAME), e);
+            log.warn("Failed to write checkpoint file to {}:", checkpointFile, e);
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
index 8c14737..fee1886 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
@@ -17,7 +17,10 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
@@ -54,6 +57,8 @@ public class OffsetCheckpoint {
 
     private static final int VERSION = 0;
 
+    // Class-level logger by default; replaced by a context logger when constructed with a LogContext.
+    private Logger log = LoggerFactory.getLogger(OffsetCheckpoint.class);
     private final File file;
     private final Object lock;
 
@@ -62,6 +67,16 @@ public class OffsetCheckpoint {
         lock = new Object();
     }
 
+    /**
+     * Same as {@link #OffsetCheckpoint(File)}, but logs through a logger obtained from the
+     * given {@link LogContext}, so messages carry that context's prefix.
+     */
+    public OffsetCheckpoint(final File file, final LogContext logContext) {
+        this.log = logContext.logger(getClass());
+        this.file = file;
+        lock = new Object();
+    }
+
     /**
      * @throws IOException if any file operation fails with an IO exception
      */
@@ -84,7 +99,21 @@ public class OffsetCheckpoint {
                 fileOutputStream.getFD().sync();
             }
 
-            Utils.atomicMoveWithFallback(temp.toPath(), file.toPath());
+            try {
+                Utils.atomicMoveWithFallback(temp.toPath(), file.toPath());
+            } catch (final IOException moveException) {
+                // The swap failed (e.g. the target is held open by another process), so the
+                // checkpoint file was NOT updated. Remove the orphaned temp file on a best-effort
+                // basis and rethrow, honouring the @throws contract so callers never mistake a
+                // stale checkpoint for a freshly written one.
+                try {
+                    Files.deleteIfExists(temp.toPath());
+                } catch (final IOException deleteException) {
+                    log.debug("Failed to delete temporary checkpoint file {}", temp, deleteException);
+                    moveException.addSuppressed(deleteException);
+                }
+                throw moveException;
+            }
         }
     }
 