From fe021eda9a8c3a14fe1863aea49cba3018399ef3 Mon Sep 17 00:00:00 2001 From: lizziew Date: Mon, 12 Aug 2013 21:49:33 -0700 Subject: [PATCH 1/5] Unmap before resizing --- core/src/main/scala/kafka/log/OffsetIndex.scala | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala index afab848..9ebf622 100644 --- a/core/src/main/scala/kafka/log/OffsetIndex.scala +++ b/core/src/main/scala/kafka/log/OffsetIndex.scala @@ -260,6 +260,11 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi } } + def tryUnmap(m: MappedByteBuffer) { + if(m.isInstanceOf[sun.nio.ch.DirectBuffer]) + (m.asInstanceOf[sun.nio.ch.DirectBuffer]).cleaner().clean() + } + /** * Reset the size of the memory map and the underneath file. This is used in two kinds of cases: (1) in * trimToValidSize() which is called at closing the segment or new segment being rolled; (2) at @@ -268,11 +273,13 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi */ def resize(newSize: Int) { this synchronized { + flush() val raf = new RandomAccessFile(file, "rws") val roundedNewSize = roundToExactMultiple(newSize, 8) + val position = this.mmap.position try { + tryUnmap(this.mmap) raf.setLength(roundedNewSize) - val position = this.mmap.position this.mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize) this.mmap.position(position) } finally { -- 1.7.12.4 (Apple Git-37) From eedb6d629c3b8ed90256c1ecade2a0e3c02892bd Mon Sep 17 00:00:00 2001 From: lizziew Date: Tue, 13 Aug 2013 17:56:16 -0700 Subject: [PATCH 2/5] removed call to flush --- core/src/main/scala/kafka/log/OffsetIndex.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala index 9ebf622..96fcc22 100644 --- a/core/src/main/scala/kafka/log/OffsetIndex.scala +++ b/core/src/main/scala/kafka/log/OffsetIndex.scala @@ -273,7 +273,6 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi */ def resize(newSize: Int) { this synchronized { - flush() val raf = new RandomAccessFile(file, "rws") val roundedNewSize = roundToExactMultiple(newSize, 8) val position = this.mmap.position -- 1.7.12.4 (Apple Git-37) From 1704dce6f1cea2a8045f18570df4cb510f6488dc Mon Sep 17 00:00:00 2001 From: lizziew Date: Wed, 14 Aug 2013 10:26:12 -0700 Subject: [PATCH 3/5] added try/catch --- core/src/main/scala/kafka/log/OffsetIndex.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala index 96fcc22..e02ad0c 100644 --- a/core/src/main/scala/kafka/log/OffsetIndex.scala +++ b/core/src/main/scala/kafka/log/OffsetIndex.scala @@ -261,8 +261,13 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi } def tryUnmap(m: MappedByteBuffer) { + try { if(m.isInstanceOf[sun.nio.ch.DirectBuffer]) (m.asInstanceOf[sun.nio.ch.DirectBuffer]).cleaner().clean() + } catch { + case ioe: IOException => info("unmap exception" + ioe) + case cnfe: ClassNotFoundException => info("non Sun JDK compatible, cannot map") + } } /** -- 1.7.12.4 (Apple Git-37) From 611e18211a01e39b96b0a9faff9754a76990091f Mon Sep 17 00:00:00 2001 From: lizziew Date: Wed, 14 Aug 2013 10:40:09 -0700 Subject: [PATCH 4/5] added try/catch --- core/src/main/scala/kafka/log/OffsetIndex.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala index e02ad0c..672383c 100644 --- a/core/src/main/scala/kafka/log/OffsetIndex.scala +++ b/core/src/main/scala/kafka/log/OffsetIndex.scala @@ -266,7 +266,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi (m.asInstanceOf[sun.nio.ch.DirectBuffer]).cleaner().clean() } catch { case ioe: IOException => info("unmap exception" + ioe) - case cnfe: ClassNotFoundException => info("non Sun JDK compatible, cannot map") + case cnfe: ClassNotFoundException => info("non Sun JDK compatible, cannot unmap") } } @@ -337,4 +337,4 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi * E.g. roundToExactMultiple(67, 8) == 64 */ private def roundToExactMultiple(number: Int, factor: Int) = factor * (number / factor) -} \ No newline at end of file +} -- 1.7.12.4 (Apple Git-37) From 2d67fa764f476e2689199fa41aebcfbffac03761 Mon Sep 17 00:00:00 2001 From: lizziew Date: Fri, 16 Aug 2013 22:32:37 -0700 Subject: [PATCH 5/5] add lock for reads --- core/src/main/scala/kafka/log/OffsetIndex.scala | 67 +++++++++++++++++-------- core/src/main/scala/kafka/utils/Os.scala | 23 +++++++++ 2 files changed, 69 insertions(+), 21 deletions(-) create mode 100644 core/src/main/scala/kafka/utils/Os.scala diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala index 672383c..9cc5f95 100644 --- a/core/src/main/scala/kafka/log/OffsetIndex.scala +++ b/core/src/main/scala/kafka/log/OffsetIndex.scala @@ -24,6 +24,7 @@ import java.nio.channels._ import java.util.concurrent.atomic._ import kafka.utils._ import kafka.common.InvalidOffsetException +import java.util.concurrent.locks._ /** * An index that maps offsets to physical file locations for a particular log segment. This index may be sparse: @@ -91,6 +92,8 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi /* the last offset in the index */ var lastOffset = readLastOffset() + private var lock = new ReentrantLock() + debug("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d" .format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset, mmap.position)) @@ -122,12 +125,19 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi * the pair (baseOffset, 0) is returned. */ def lookup(targetOffset: Long): OffsetPosition = { - val idx = mmap.duplicate - val slot = indexSlotFor(idx, targetOffset) - if(slot == -1) - OffsetPosition(baseOffset, 0) - else - OffsetPosition(baseOffset + relativeOffset(idx, slot), physical(idx, slot)) + if (Os.isWindows) + lock.lock + try { + val idx = mmap.duplicate + val slot = indexSlotFor(idx, targetOffset) + if(slot == -1) + OffsetPosition(baseOffset, 0) + else + OffsetPosition(baseOffset + relativeOffset(idx, slot), physical(idx, slot)) + } finally { + if (Os.isWindows) + lock.unlock + } } /** @@ -179,17 +189,24 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi * @return The offset/position pair at that entry */ def entry(n: Int): OffsetPosition = { - if(n >= entries) - throw new IllegalArgumentException("Attempt to fetch the %dth entry from an index of size %d.".format(n, entries)) - val idx = mmap.duplicate - OffsetPosition(relativeOffset(idx, n), physical(idx, n)) + if (Os.isWindows) + lock.lock + try { + if(n >= entries) + throw new IllegalArgumentException("Attempt to fetch the %dth entry from an index of size %d.".format(n, entries)) + val idx = mmap.duplicate + OffsetPosition(relativeOffset(idx, n), physical(idx, n)) + } finally { + if(Os.isWindows) + lock.unlock + } } /** * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all subsequent entries. */ def append(offset: Long, position: Int) { - this synchronized { + lock synchronized { require(!isFull, "Attempt to append to a full index (size = " + size + ").") if (size.get == 0 || offset > lastOffset) { debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName)) @@ -221,7 +238,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi * Truncating to an offset larger than the largest in the index has no effect. */ def truncateTo(offset: Long) { - this synchronized { + lock synchronized { val idx = mmap.duplicate val slot = indexSlotFor(idx, offset) @@ -245,9 +262,16 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi * Truncates index to a known number of entries. */ private def truncateToEntries(entries: Int) { - this.size.set(entries) - mmap.position(this.size.get * 8) - this.lastOffset = readLastOffset + if (Os.isWindows) + lock.lock + try { + this.size.set(entries) + mmap.position(this.size.get * 8) + this.lastOffset = readLastOffset + } finally { + if (Os.isWindows) + lock.unlock + } } /** @@ -255,18 +279,18 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi * the file. */ def trimToValidSize() { - this synchronized { + lock synchronized { resize(entries * 8) } } def tryUnmap(m: MappedByteBuffer) { try { - if(m.isInstanceOf[sun.nio.ch.DirectBuffer]) + if (m.isInstanceOf[sun.nio.ch.DirectBuffer]) (m.asInstanceOf[sun.nio.ch.DirectBuffer]).cleaner().clean() } catch { case ioe: IOException => info("unmap exception" + ioe) - case cnfe: ClassNotFoundException => info("non Sun JDK compatible, cannot unmap") + case cnfe: ClassNotFoundException => info("non Sun JDK compatible, cannot explicitly unmap") } } @@ -277,12 +301,13 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi * we want to reset the index size to maximum index size to avoid rolling new segment. */ def resize(newSize: Int) { - this synchronized { + lock synchronized { val raf = new RandomAccessFile(file, "rws") val roundedNewSize = roundToExactMultiple(newSize, 8) val position = this.mmap.position try { - tryUnmap(this.mmap) + if (Os.isWindows) + tryUnmap(this.mmap) raf.setLength(roundedNewSize) this.mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize) this.mmap.position(position) @@ -296,7 +321,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi * Flush the data in the index to disk */ def flush() { - this synchronized { + lock synchronized { mmap.force() } } diff --git a/core/src/main/scala/kafka/utils/Os.scala b/core/src/main/scala/kafka/utils/Os.scala new file mode 100644 index 0000000..093f89b --- /dev/null +++ b/core/src/main/scala/kafka/utils/Os.scala @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.utils + +object Os { + private val osName = System.getProperty("os.name").toLowerCase + val isWindows = osName.startsWith("windows") +} -- 1.7.12.4 (Apple Git-37)