From 27e6e068316b53d89c54eea80943c3b67f921065 Mon Sep 17 00:00:00 2001 From: lizziew Date: Fri, 23 Aug 2013 00:52:19 -0700 Subject: [PATCH 1/2] incorporates Jay's patch --- .../kafka/consumer/ConsumerFetcherManager.scala | 11 +- .../consumer/ZookeeperConsumerConnector.scala | 6 +- core/src/main/scala/kafka/log/OffsetIndex.scala | 114 ++++++++++++++------- .../scala/kafka/server/AbstractFetcherThread.scala | 11 +- core/src/main/scala/kafka/utils/Utils.scala | 13 +++ core/src/main/scala/kafka/utils/ZkUtils.scala | 12 +-- .../test/scala/unit/kafka/utils/UtilsTest.scala | 14 +++ 7 files changed, 117 insertions(+), 64 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala index 71ae640..8c83e65 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala @@ -24,6 +24,7 @@ import scala.collection.immutable import collection.mutable.HashMap import scala.collection.mutable import java.util.concurrent.locks.ReentrantLock +import kafka.utils.Utils.inLock import kafka.utils.ZkUtils._ import kafka.utils.{ShutdownableThread, SystemTime} import kafka.common.TopicAndPartition @@ -110,14 +111,11 @@ class ConsumerFetcherManager(private val consumerIdString: String, leaderFinderThread = new LeaderFinderThread(consumerIdString + "-leader-finder-thread") leaderFinderThread.start() - lock.lock() - try { + inLock(lock) { partitionMap = topicInfos.map(tpi => (TopicAndPartition(tpi.topic, tpi.partitionId), tpi)).toMap this.cluster = cluster noLeaderPartitionSet ++= topicInfos.map(tpi => TopicAndPartition(tpi.topic, tpi.partitionId)) cond.signalAll() - } finally { - lock.unlock() } } @@ -145,14 +143,11 @@ class ConsumerFetcherManager(private val consumerIdString: String, def addPartitionsWithError(partitionList: Iterable[TopicAndPartition]) { debug("adding partitions with error %s".format(partitionList)) - lock.lock() - try { + inLock(lock) { if (partitionMap != null) { noLeaderPartitionSet ++= partitionList cond.signalAll() } - } finally { - lock.unlock() } } } \ No newline at end of file diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index e3a6420..e7957ea 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -30,6 +30,7 @@ import org.apache.zookeeper.Watcher.Event.KeeperState import java.util.UUID import kafka.serializer._ import kafka.utils.ZkUtils._ +import kafka.utils.Utils.inLock import kafka.common._ import kafka.client.ClientUtils import com.yammer.metrics.core.Gauge @@ -338,12 +339,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, @throws(classOf[Exception]) def handleChildChange(parentPath : String, curChilds : java.util.List[String]) { - lock.lock() - try { + inLock(lock) { isWatcherTriggered = true cond.signalAll() - } finally { - lock.unlock() } } diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala index afab848..ea46dc6 100644 --- a/core/src/main/scala/kafka/log/OffsetIndex.scala +++ b/core/src/main/scala/kafka/log/OffsetIndex.scala @@ -21,8 +21,10 @@ import scala.math._ import java.io._ import java.nio._ import java.nio.channels._ +import java.util.concurrent.locks._ import java.util.concurrent.atomic._ import 
kafka.utils._ +import kafka.utils.Utils.inLock import kafka.common.InvalidOffsetException /** @@ -52,6 +54,8 @@ import kafka.common.InvalidOffsetException */ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging { + private val lock = new ReentrantLock + /* initialize the memory mapping for this index */ private var mmap: MappedByteBuffer = { @@ -88,6 +92,12 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi /* the number of eight-byte entries currently in the index */ private var size = new AtomicInteger(mmap.position / 8) + /** + * The maximum number of eight-byte entries this index can hold + */ + @volatile + var maxEntries = mmap.limit / 8 + /* the last offset in the index */ var lastOffset = readLastOffset() @@ -98,18 +108,15 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi * The last offset written to the index */ private def readLastOffset(): Long = { - val offset = - size.get match { - case 0 => 0 - case s => relativeOffset(this.mmap, s-1) - } - baseOffset + offset + inLock(lock) { + val offset = + size.get match { + case 0 => 0 + case s => relativeOffset(this.mmap, s-1) + } + baseOffset + offset + } } - - /** - * The maximum number of eight-byte entries this index can hold - */ - def maxEntries = mmap.limit / 8 /** * Find the largest offset less than or equal to the given targetOffset @@ -122,12 +129,14 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi * the pair (baseOffset, 0) is returned. */ def lookup(targetOffset: Long): OffsetPosition = { - val idx = mmap.duplicate - val slot = indexSlotFor(idx, targetOffset) - if(slot == -1) - OffsetPosition(baseOffset, 0) - else - OffsetPosition(baseOffset + relativeOffset(idx, slot), physical(idx, slot)) + maybeLock(lock) { + val idx = mmap.duplicate + val slot = indexSlotFor(idx, targetOffset) + if(slot == -1) + OffsetPosition(baseOffset, 0) + else + OffsetPosition(baseOffset + relativeOffset(idx, slot), physical(idx, slot)) + } } /** @@ -179,17 +188,19 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi * @return The offset/position pair at that entry */ def entry(n: Int): OffsetPosition = { + maybeLock(lock) { + if(n >= entries) + throw new IllegalArgumentException("Attempt to fetch the %dth entry from an index of size %d.".format(n, entries)) + val idx = mmap.duplicate + OffsetPosition(relativeOffset(idx, n), physical(idx, n)) + } - if(n >= entries) - throw new IllegalArgumentException("Attempt to fetch the %dth entry from an index of size %d.".format(n, entries)) - val idx = mmap.duplicate - OffsetPosition(relativeOffset(idx, n), physical(idx, n)) } /** * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all previous entries. 
*/ def append(offset: Long, position: Int) { - this synchronized { + inLock(lock) { require(!isFull, "Attempt to append to a full index (size = " + size + ").") if (size.get == 0 || offset > lastOffset) { debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName)) @@ -198,8 +209,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi this.size.incrementAndGet() this.lastOffset = offset require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".") - } - else { + } else { throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s." .format(offset, entries, lastOffset, file.getName)) } @@ -221,7 +231,7 @@ * Truncating to an offset larger than the largest in the index has no effect. */ def truncateTo(offset: Long) { - this synchronized { + inLock(lock) { val idx = mmap.duplicate val slot = indexSlotFor(idx, offset) @@ -245,9 +255,11 @@ * Truncates index to a known number of entries. */ private def truncateToEntries(entries: Int) { - this.size.set(entries) - mmap.position(this.size.get * 8) - this.lastOffset = readLastOffset + inLock(lock) { + this.size.set(entries) + mmap.position(this.size.get * 8) + this.lastOffset = readLastOffset + } } /** @@ -255,7 +267,7 @@ * the file. */ def trimToValidSize() { - this synchronized { + inLock(lock) { resize(entries * 8) } } @@ -267,13 +279,17 @@ * we want to reset the index size to maximum index size to avoid rolling new segment. */ def resize(newSize: Int) { - this synchronized { - val raf = new RandomAccessFile(file, "rws") - val roundedNewSize = roundToExactMultiple(newSize, 8) + inLock(lock) { + val raf = new RandomAccessFile(file, "rws") + val roundedNewSize = roundToExactMultiple(newSize, 8) + val position = this.mmap.position + /* Windows won't let us modify the file length while the file is mmapped :-( */ + if(Os.isWindows) + forceUnmap(this.mmap) try { raf.setLength(roundedNewSize) - val position = this.mmap.position this.mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize) + this.maxEntries = this.mmap.limit / 8 this.mmap.position(position) } finally { Utils.swallow(raf.close()) @@ -282,10 +298,22 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi } /** + * Forcefully free the buffer's mmap. We do this only on Windows. + */ + def forceUnmap(m: MappedByteBuffer) { + try { + if(m.isInstanceOf[sun.nio.ch.DirectBuffer]) + (m.asInstanceOf[sun.nio.ch.DirectBuffer]).cleaner().clean() + } catch { + case t: Throwable => warn("Error when freeing index buffer", t) + } + } + + /** * Flush the data in the index to disk */ def flush() { - this synchronized { + inLock(lock) { mmap.force() } } @@ -326,4 +354,20 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi * E.g. roundToExactMultiple(67, 8) == 64 */ private def roundToExactMultiple(number: Int, factor: Int) = factor * (number / factor) -} \ No newline at end of file + + /** + * Execute the given function in a lock only if we are running on Windows. We do this + * because Windows won't let us resize a file while it is mmapped. As a result, we have to force-unmap it, + * and this requires synchronizing reads. + */ + private def maybeLock[T](lock: Lock)(fun: => T): T = { + if(Os.isWindows) + lock.lock() + try { + return fun + } finally { + if(Os.isWindows) + lock.unlock() + } + } +}
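For orientation, the locking scheme the OffsetIndex hunks above introduce is: mutators (append, truncateTo, trimToValidSize, resize, flush) always take the new ReentrantLock via inLock (added to kafka/utils/Utils.scala below), while readers (lookup, entry) take it only on Windows, where resize() force-unmaps the buffer and a racing read of the stale mapping could crash the JVM; on other platforms the existing mapping can stay in place across the resize, so reads remain lock-free. A minimal self-contained sketch of the two helpers' semantics (illustrative only, not part of the patch):

import java.util.concurrent.locks.Lock

object IndexLockingSketch {
  // Same detection Os.scala (patch 2) performs, inlined for self-containment.
  val isWindows = System.getProperty("os.name").toLowerCase.startsWith("windows")

  // Writers always synchronize.
  def inLock[T](lock: Lock)(fun: => T): T = {
    lock.lock()
    try { fun } finally { lock.unlock() }
  }

  // Readers synchronize only where the mapping can be unmapped underneath them.
  def maybeLock[T](lock: Lock)(fun: => T): T = {
    if (isWindows) lock.lock()
    try { fun } finally { if (isWindows) lock.unlock() }
  }
}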
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 7663fac..3e368e0 100644 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -29,6 +29,7 @@ import java.util.concurrent.locks.ReentrantLock import kafka.consumer.{PartitionTopicInfo, SimpleConsumer} import kafka.api.{FetchRequest, FetchResponse, FetchResponsePartitionData, FetchRequestBuilder} import kafka.common.{KafkaException, ClientIdAndBroker, TopicAndPartition, ErrorMapping} +import kafka.utils.Utils.inLock /** @@ -70,8 +71,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke } override def doWork() { - partitionMapLock.lock() - try { + inLock(partitionMapLock) { if (partitionMap.isEmpty) partitionMapCond.await(200L, TimeUnit.MILLISECONDS) partitionMap.foreach { @@ -79,8 +79,6 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition, offset, fetchSize) } - } finally { - partitionMapLock.unlock() } val fetchRequest = fetchRequestBuilder.build() @@ -107,8 +105,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke if (response != null) { // process fetched data - partitionMapLock.lock() - try { + inLock(partitionMapLock) { response.data.foreach { case(topicAndPartition, partitionData) => val (topic, partitionId) = topicAndPartition.asTuple @@ -160,8 +157,6 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke } } } - } finally { - partitionMapLock.unlock() } } diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala index 4eaeae8..4c8088c 100644 --- a/core/src/main/scala/kafka/utils/Utils.scala +++ b/core/src/main/scala/kafka/utils/Utils.scala @@ -21,6 +21,7 @@ import java.io._ import java.nio._ import charset.Charset import java.nio.channels._ +import java.util.concurrent.locks.Lock import java.lang.management._ import javax.management._ import scala.collection._ @@ -587,4 +588,16 @@ object Utils extends Logging { (bytes(offset + 3) & 0xFF) } + /** + * Execute the given function inside the lock + */ + def inLock[T](lock: Lock)(fun: => T): T = { + lock.lock() + try { + return fun + } finally { + lock.unlock() + } + } + } diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 84744eb..3afdb2f 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -34,6 +34,7 @@ import kafka.controller.PartitionAndReplica import scala.Some import kafka.controller.LeaderIsrAndControllerEpoch import kafka.common.TopicAndPartition +import kafka.utils.Utils.inLock object ZkUtils extends Logging { val ConsumersPath = "/consumers" @@ -712,8 +713,7 @@ class LeaderExistsOrChangedListener(topic: String, def handleDataChange(dataPath: String, data: Object) { val t = dataPath.split("/").takeRight(3).head val p = dataPath.split("/").takeRight(2).head.toInt - leaderLock.lock() - try { + inLock(leaderLock) 
{ if(t == topic && p == partition){ if(oldLeaderOpt == None){ trace("In leader existence listener on partition [%s, %d], leader has been created".format(topic, partition)) @@ -728,18 +728,12 @@ class LeaderExistsOrChangedListener(topic: String, } } } - finally { - leaderLock.unlock() - } } @throws(classOf[Exception]) def handleDataDeleted(dataPath: String) { - leaderLock.lock() - try { + inLock(leaderLock) { leaderExistsOrChanged.signal() - }finally { - leaderLock.unlock() } } } diff --git a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala index ce1679a..920f318 100644 --- a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala +++ b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala @@ -18,12 +18,14 @@ package kafka.utils import java.util.Arrays +import java.util.concurrent.locks.ReentrantLock import java.nio.ByteBuffer import java.io._ import org.apache.log4j.Logger import org.scalatest.junit.JUnitSuite import org.junit.Assert._ import kafka.common.KafkaException +import kafka.utils.Utils.inLock import org.junit.Test @@ -94,4 +96,16 @@ class UtilsTest extends JUnitSuite { assertTrue(emptyStringList.equals(emptyListFromNullString)) assertTrue(emptyStringList.equals(emptyList)) } + + @Test + def testInLock() { + val lock = new ReentrantLock() + val result = inLock(lock) { + assertTrue("Should be in lock", lock.isHeldByCurrentThread) + 1 + 1 + } + assertEquals(2, result) + assertFalse("Should be unlocked", lock.isLocked) + + } } -- 1.7.12.4 (Apple Git-37)
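Before the second patch, a brief usage sketch of the inLock helper patch 1 introduces, contrasted with the lock()/try/finally boilerplate it replaces throughout the patch (object and counter names are illustrative, not from the patch):

import java.util.concurrent.locks.ReentrantLock
import kafka.utils.Utils.inLock

object InLockUsageSketch {
  private val lock = new ReentrantLock
  private var count = 0

  // Before this patch: manual lock()/unlock() bracketing at every call site.
  def incrementOld(): Int = {
    lock.lock()
    try {
      count += 1
      count
    } finally {
      lock.unlock()
    }
  }

  // With the patch: unlock is guaranteed on every exit path, including
  // exceptions, and the value of the block is returned.
  def incrementNew(): Int = inLock(lock) {
    count += 1
    count
  }
}

The call-by-name parameter (fun: => T) combined with the curried argument list is what lets the braced block read like a built-in control structure while still yielding a value.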
From e5e6b196a269e7237db652f68bd8256680520f09 Mon Sep 17 00:00:00 2001 From: lizziew Date: Fri, 23 Aug 2013 00:55:40 -0700 Subject: [PATCH 2/2] add missing file --- core/src/main/scala/kafka/utils/Os.scala | 23 ++ 1 file changed, 23 insertions(+) create mode 100644 core/src/main/scala/kafka/utils/Os.scala 
diff --git a/core/src/main/scala/kafka/utils/Os.scala b/core/src/main/scala/kafka/utils/Os.scala new file mode 100644 index 0000000..4c56854 --- /dev/null +++ b/core/src/main/scala/kafka/utils/Os.scala @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.utils + +object Os { + private val osName = System.getProperty("os.name").toLowerCase + val isWindows = osName.startsWith("windows") +} -- 1.7.12.4 (Apple Git-37)
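To tie the two patches together, a compact standalone sketch of the Windows-specific resize sequence (the wiring and names are illustrative; only the cleaner trick and the os.name check mirror the patch, and the cleaner call is a JDK-internal API that works on the Java 6/7 runtimes this patch targets but is hidden from Java 9 onward):

import java.io.{File, RandomAccessFile}
import java.nio.MappedByteBuffer
import java.nio.channels.FileChannel

object UnmapBeforeResizeSketch {
  // Same detection Os.scala performs.
  val isWindows = System.getProperty("os.name").toLowerCase.startsWith("windows")

  // Same JDK-internal cleaner trick as forceUnmap in patch 1.
  def forceUnmap(m: MappedByteBuffer): Unit = m match {
    case db: sun.nio.ch.DirectBuffer => db.cleaner().clean()
    case _ =>
  }

  def resize(file: File, mmap: MappedByteBuffer, newSize: Long): MappedByteBuffer = {
    // Windows refuses setLength() on a file that is still mapped,
    // so release the mapping first; elsewhere it can stay mapped.
    if (isWindows) forceUnmap(mmap)
    val raf = new RandomAccessFile(file, "rws")
    try {
      raf.setLength(newSize)
      raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, newSize)
    } finally {
      raf.close()
    }
  }
}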