From 82b8c05b43bfe359d42454121b17e56538b60163 Mon Sep 17 00:00:00 2001
From: "REDMOND\\bhkonde"
Date: Wed, 27 Mar 2019 21:36:39 -0700
Subject: [PATCH] Close the file handlers before renaming files and directories, and re-open them if required

---
 .../kafka/common/record/FileRecords.java      | 11 +++++++++-
 .../main/scala/kafka/log/AbstractIndex.scala  | 17 ++++++++++++++--
 core/src/main/scala/kafka/log/Log.scala       | 20 +++++++++++++++++++
 .../src/main/scala/kafka/log/LogManager.scala |  9 ++++++++-
 .../src/main/scala/kafka/log/LogSegment.scala | 10 ++++++++++
 .../scala/kafka/log/TransactionIndex.scala    |  8 ++++++++
 .../server/checkpoints/CheckpointFile.scala   |  1 +
 .../state/internals/OffsetCheckpoint.java     |  2 ++
 8 files changed, 74 insertions(+), 4 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index 3537fc34b..881a606d4 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -47,7 +47,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
 
     // mutable state
     private final AtomicInteger size;
-    private final FileChannel channel;
+    private volatile FileChannel channel;
     private volatile File file;
 
     /**
@@ -187,6 +187,15 @@ public class FileRecords extends AbstractRecords implements Closeable {
         channel.close();
     }
 
+    /**
+     * Re-opens the channel if it is not already open.
+     */
+    public void reopenHandler() throws IOException {
+        if (!channel.isOpen()) {
+            channel = openChannel(this.file, true, true, 0, false);
+        }
+    }
+
     /**
      * Delete this message set from the filesystem
      * @throws IOException if deletion fails due to an I/O error
diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala
index af28f8375..975f13639 100644
--- a/core/src/main/scala/kafka/log/AbstractIndex.scala
+++ b/core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -108,7 +108,9 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
   protected val lock = new ReentrantLock
 
   @volatile
-  protected var mmap: MappedByteBuffer = {
+  protected var mmap: MappedByteBuffer = initMemoryMap()
+
+  protected def initMemoryMap(): MappedByteBuffer = {
     val newlyCreated = file.createNewFile()
     val raf = if (writable) new RandomAccessFile(file, "rw") else new RandomAccessFile(file, "r")
     try {
@@ -139,6 +141,16 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
     }
   }
 
+  /**
+   * Re-opens the memory-mapped file if it is not currently mapped
+   */
+  def reopenHandler(): Unit = {
+    inLock(lock) {
+      if (mmap == null)
+        mmap = initMemoryMap()
+    }
+  }
+
   /**
    * The maximum number of entries this index can hold
   */
@@ -254,6 +266,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
   /** Close the index */
   def close() {
     trimToValidSize()
+    closeHandler()
   }
 
   def closeHandler(): Unit = {
@@ -309,7 +322,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
   }
 
   protected def safeForceUnmap(): Unit = {
-    try forceUnmap()
+    try if (mmap != null) forceUnmap()
     catch {
       case t: Throwable => error(s"Error unmapping index $file", t)
     }
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index b56b26f52..461804c9e 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -780,6 +780,17 @@ class Log(@volatile var dir: File,
     }
   }
 
+  /**
+   * Re-open the file handlers of all log segments
+   */
+  def reopenHandlers() {
+    debug("Re-opening handlers")
+    lock synchronized {
+      logSegments.foreach(_.reopenHandlers())
+      isMemoryMappedBufferClosed = false
+    }
+  }
+
   /**
    * Append this message set to the active segment of the log, assigning offsets and Partition Leader Epochs
    *
@@ -1908,6 +1919,8 @@ class Log(@volatile var dir: File,
   * @throws IOException if the file can't be renamed and still exists
   */
  private def asyncDeleteSegment(segment: LogSegment) {
+    // Since we are deleting the segment, close its handlers first
+    segment.closeHandlers()
    segment.changeFileSuffixes("", Log.DeletedFileSuffix)
    def deleteSeg() {
      info(s"Deleting segment ${segment.baseOffset}")
@@ -1959,6 +1972,10 @@ class Log(@volatile var dir: File,
       val sortedOldSegments = oldSegments.filter(seg => segments.containsKey(seg.baseOffset)).sortBy(_.baseOffset)
 
       checkIfMemoryMappedBufferClosed()
+
+      // Close the handlers of the new segments since we are renaming their files
+      sortedNewSegments.foreach(_.closeHandlers())
+
       // need to do this in two phases to be crash safe AND do the delete asynchronously
       // if we crash in the middle of this we complete the swap in loadSegments()
       if (!isRecoveredSwapFile)
@@ -1975,6 +1992,9 @@ class Log(@volatile var dir: File,
       }
       // okay we are safe now, remove the swap suffix
       sortedNewSegments.foreach(_.changeFileSuffixes(Log.SwapFileSuffix, ""))
+
+      // Re-open the handlers since we closed them for renaming
+      sortedNewSegments.foreach(_.reopenHandlers())
     }
   }
 
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 127ecdc77..efba20ccf 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -786,7 +786,11 @@ class LogManager(logDirs: Seq[File],
       if (destLog == null)
         throw new KafkaStorageException(s"The future replica for $topicPartition is offline")
 
+      // Since we are renaming the folder, close the handlers and re-open them after renaming
+      destLog.closeHandlers()
       destLog.renameDir(Log.logDirName(topicPartition))
+      destLog.reopenHandlers()
+
       // Now that future replica has been successfully renamed to be the current replica
       // Update the cached map and log cleaner as appropriate.
       futureLogs.remove(topicPartition)
@@ -798,10 +802,11 @@ class LogManager(logDirs: Seq[File],
     }
 
     try {
+      // Close the log before renaming, since we are deleting it
+      sourceLog.close()
       sourceLog.renameDir(Log.logDeleteDirName(topicPartition))
       // Now that replica in source log directory has been successfully renamed for deletion.
       // Close the log, update checkpoint files, and enqueue this log to be deleted.
-      sourceLog.close()
       checkpointLogRecoveryOffsetsInDir(sourceLog.dir.getParentFile)
       checkpointLogStartOffsetsInDir(sourceLog.dir.getParentFile)
       addLogToBeDeleted(sourceLog)
@@ -839,6 +844,8 @@ class LogManager(logDirs: Seq[File],
         cleaner.abortCleaning(topicPartition)
         cleaner.updateCheckpoints(removedLog.dir.getParentFile)
       }
+      // Close the log before renaming, since we are deleting it
+      removedLog.close()
       removedLog.renameDir(Log.logDeleteDirName(topicPartition))
       checkpointLogRecoveryOffsetsInDir(removedLog.dir.getParentFile)
       checkpointLogStartOffsetsInDir(removedLog.dir.getParentFile)
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 5ce9e191a..5e0f69af8 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -578,6 +578,16 @@ class LogSegment private[log] (val log: FileRecords,
     CoreUtils.swallow(txnIndex.close(), this)
   }
 
+  /**
+   * Re-open the file handlers of this segment
+   */
+  def reopenHandlers() {
+    offsetIndex.reopenHandler()
+    timeIndex.reopenHandler()
+    log.reopenHandler()
+    txnIndex.reopenHandler()
+  }
+
   /**
    * Delete this log segment from the filesystem.
    */
diff --git a/core/src/main/scala/kafka/log/TransactionIndex.scala b/core/src/main/scala/kafka/log/TransactionIndex.scala
index e730fdba9..4a3c0836a 100644
--- a/core/src/main/scala/kafka/log/TransactionIndex.scala
+++ b/core/src/main/scala/kafka/log/TransactionIndex.scala
@@ -102,6 +102,14 @@ class TransactionIndex(val startOffset: Long, @volatile var file: File) extends
     maybeChannel = None
   }
 
+  /**
+   * Re-opens the channel if it is not already open.
+   */
+  def reopenHandler(): Unit = {
+    if (maybeChannel.isEmpty)
+      openChannel()
+  }
+
   def renameTo(f: File): Unit = {
     try {
       if (file.exists)
diff --git a/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
index 1878ae263..6569d2f9f 100644
--- a/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
+++ b/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
@@ -67,6 +67,7 @@ class CheckpointFile[T](val file: File,
           fileOutputStream.getFD().sync()
         } finally {
           writer.close()
+          Utils.closeQuietly(fileOutputStream, tempPath.toString)
         }
 
         Utils.atomicMoveWithFallback(tempPath, path)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
index 405831a33..72b49ba6b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
@@ -88,6 +88,8 @@ public class OffsetCheckpoint {
 
             writer.flush();
             fileOutputStream.getFD().sync();
+        } finally {
+            Utils.closeQuietly(fileOutputStream, temp.getName());
        }
 
         Utils.atomicMoveWithFallback(temp.toPath(), file.toPath());
-- 
2.19.0.windows.1
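
Note for reviewers: the pattern this patch applies throughout Log, LogManager and LogSegment is close-then-rename-then-reopen, because a rename can fail (notably on Windows) while a FileChannel or memory map over the file is still held open. The following is a minimal, standalone Scala sketch of that pattern only; it is not Kafka code, and SegmentHandle, reopen and RenameDemo are hypothetical names used purely for illustration.

import java.io.File
import java.nio.channels.FileChannel
import java.nio.file.{Files, StandardCopyOption, StandardOpenOption}

// Hypothetical handle that owns a FileChannel over a data file, mirroring the
// closeHandlers()/reopenHandlers() additions: close the handle, rename on disk, re-open.
final class SegmentHandle(@volatile var file: File) {

  @volatile private var channel: FileChannel =
    FileChannel.open(file.toPath, StandardOpenOption.READ, StandardOpenOption.WRITE)

  def close(): Unit = channel.close()

  // Re-open the channel only if it is not already open.
  def reopen(): Unit = {
    if (!channel.isOpen)
      channel = FileChannel.open(file.toPath, StandardOpenOption.READ, StandardOpenOption.WRITE)
  }

  // Close the handle before the rename and re-open it afterwards; with the
  // channel still open, the rename can be rejected because the file is locked.
  def renameTo(target: File): Unit = {
    close()
    Files.move(file.toPath, target.toPath, StandardCopyOption.ATOMIC_MOVE)
    file = target
    reopen()
  }
}

object RenameDemo extends App {
  val src = File.createTempFile("segment", ".log")
  val dst = new File(src.getParentFile, src.getName + ".deleted")
  new SegmentHandle(src).renameTo(dst)
  println(s"renamed to ${dst.getName}")
}

Nothing in this sketch is part of the patch; it only illustrates why each closeHandlers()/close() call sits immediately before a renameDir or changeFileSuffixes call, and why reopenHandlers() follows the rename when the log remains in use.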