From b9559e63b22d869a4d385e0b1a8557f63fdcef55 Mon Sep 17 00:00:00 2001 From: Tao Qin Date: Tue, 14 Jan 2014 10:30:05 +0800 Subject: [PATCH] fix kakfa-1194: The kafka broker cannot delete the old log files after the configured time --- core/src/main/scala/kafka/log/LogSegment.scala | 3 +++ core/src/main/scala/kafka/log/OffsetIndex.scala | 13 +++++++++++-- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 0d6926e..e9d21d4 100644 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -244,6 +244,9 @@ class LogSegment(val log: FileMessageSet, * Change the suffix for the index and log file for this log segment */ def changeFileSuffixes(oldSuffix: String, newSuffix: String) { + /* close the log file and index file before rename */ + close() + val logRenamed = log.renameTo(new File(Utils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix))) if(!logRenamed) throw new KafkaStorageException("Failed to change the log file suffix from %s to %s for log segment %d".format(oldSuffix, newSuffix, baseOffset)) diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala index 8a62dfa..76f5d06 100644 --- a/core/src/main/scala/kafka/log/OffsetIndex.scala +++ b/core/src/main/scala/kafka/log/OffsetIndex.scala @@ -297,8 +297,14 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi */ private def forceUnmap(m: MappedByteBuffer) { try { - if(m.isInstanceOf[sun.nio.ch.DirectBuffer]) - (m.asInstanceOf[sun.nio.ch.DirectBuffer]).cleaner().clean() + if(m.isInstanceOf[sun.nio.ch.DirectBuffer]) { + var cl = (m.asInstanceOf[sun.nio.ch.DirectBuffer]).cleaner() + + /* if cl != null, then we call it */ + if (cl != null) { + cl.clean() + } + } } catch { case t: Throwable => warn("Error when freeing index buffer", t) } @@ -332,6 +338,9 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi /** Close the index */ def close() { trimToValidSize() + + /* forcefully free the MappedByteBuffer */ + forceUnmap(this.mmap) } /** -- 1.8.4.msysgit.0