From 0249e92aac1dac24ccee7ef6a9b2f92ae7440229 Mon Sep 17 00:00:00 2001
From: Jay Kreps
Date: Wed, 28 Aug 2013 17:19:40 -0700
Subject: [PATCH] KAFKA-1008 Unmap offset indexes before resizing.

---
 .../kafka/consumer/ConsumerFetcherManager.scala    |  11 +--
 .../consumer/ZookeeperConsumerConnector.scala      |   6 +-
 core/src/main/scala/kafka/log/OffsetIndex.scala    | 108 +++++++++++++------
 .../scala/kafka/server/AbstractFetcherThread.scala |  11 +--
 core/src/main/scala/kafka/utils/Os.scala           |  23 +++++
 core/src/main/scala/kafka/utils/Utils.scala        |  13 +++
 core/src/main/scala/kafka/utils/ZkUtils.scala      |  12 +--
 .../test/scala/unit/kafka/utils/UtilsTest.scala    |  14 +++
 8 files changed, 138 insertions(+), 60 deletions(-)
 create mode 100644 core/src/main/scala/kafka/utils/Os.scala

diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index b286312..8431fbd 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -24,6 +24,7 @@ import scala.collection.immutable
 import collection.mutable.HashMap
 import scala.collection.mutable
 import java.util.concurrent.locks.ReentrantLock
+import kafka.utils.Utils.inLock
 import kafka.utils.ZkUtils._
 import kafka.utils.{ShutdownableThread, SystemTime}
 import kafka.common.TopicAndPartition
@@ -110,14 +111,11 @@ class ConsumerFetcherManager(private val consumerIdString: String,
     leaderFinderThread = new LeaderFinderThread(consumerIdString + "-leader-finder-thread")
     leaderFinderThread.start()
 
-    lock.lock()
-    try {
+    inLock(lock) {
       partitionMap = topicInfos.map(tpi => (TopicAndPartition(tpi.topic, tpi.partitionId), tpi)).toMap
       this.cluster = cluster
       noLeaderPartitionSet ++= topicInfos.map(tpi => TopicAndPartition(tpi.topic, tpi.partitionId))
       cond.signalAll()
-    } finally {
-      lock.unlock()
     }
   }
 
@@ -145,14 +143,11 @@ class ConsumerFetcherManager(private val consumerIdString: String,
 
   def addPartitionsWithError(partitionList: Iterable[TopicAndPartition]) {
     debug("adding partitions with error %s".format(partitionList))
-    lock.lock()
-    try {
+    inLock(lock) {
       if (partitionMap != null) {
         noLeaderPartitionSet ++= partitionList
         cond.signalAll()
       }
-    } finally {
-      lock.unlock()
     }
   }
 }
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index e7a692a..e5bb9e4 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -30,6 +30,7 @@ import org.apache.zookeeper.Watcher.Event.KeeperState
 import java.util.UUID
 import kafka.serializer._
 import kafka.utils.ZkUtils._
+import kafka.utils.Utils.inLock
 import kafka.common._
 import kafka.client.ClientUtils
 import com.yammer.metrics.core.Gauge
@@ -342,12 +343,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
     @throws(classOf[Exception])
     def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
-      lock.lock()
-      try {
+      inLock(lock) {
         isWatcherTriggered = true
         cond.signalAll()
-      } finally {
-        lock.unlock()
       }
     }
 
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index 9de3d31..b6dad06 100644
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -21,8 +21,10 @@ import scala.math._
 import java.io._
 import java.nio._
 import java.nio.channels._
+import java.util.concurrent.locks._
 import java.util.concurrent.atomic._
 import kafka.utils._
+import kafka.utils.Utils.inLock
 import kafka.common.InvalidOffsetException
 
 /**
@@ -52,6 +54,8 @@ import kafka.common.InvalidOffsetException
  */
 class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging {
   
+  private val lock = new ReentrantLock
+  
   /* the memory mapping */
   private var mmap: MappedByteBuffer = 
     {
@@ -88,25 +92,30 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
   /* the number of entries in the index */
   private var size = new AtomicInteger(mmap.position / 8)
   
+  /**
+   * The maximum number of eight-byte entries this index can hold
+   */
+  @volatile
+  var maxEntries = mmap.limit / 8
+  
   /* the last offset in the index */
   var lastOffset = readLastOffset()
   
   debug("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d"
     .format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset, mmap.position))
   
-  /* the maximum number of entries this index can hold */
-  def maxEntries = mmap.limit / 8
-  
   /**
    * The last offset written to the index
    */
   private def readLastOffset(): Long = {
-    val offset = 
-      size.get match {
-        case 0 => 0
-        case s => relativeOffset(this.mmap, s-1)
-      }
-    baseOffset + offset
+    inLock(lock) {
+      val offset = 
+        size.get match {
+          case 0 => 0
+          case s => relativeOffset(this.mmap, s-1)
+        }
+      baseOffset + offset
+    }
   }
   
   /**
@@ -116,12 +125,14 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
    * the pair (baseOffset, 0) is returned.
    */
   def lookup(targetOffset: Long): OffsetPosition = {
-    val idx = mmap.duplicate
-    val slot = indexSlotFor(idx, targetOffset)
-    if(slot == -1)
-      OffsetPosition(baseOffset, 0)
-    else
-      OffsetPosition(baseOffset + relativeOffset(idx, slot), physical(idx, slot))
+    maybeLock(lock) {
+      val idx = mmap.duplicate
+      val slot = indexSlotFor(idx, targetOffset)
+      if(slot == -1)
+        OffsetPosition(baseOffset, 0)
+      else
+        OffsetPosition(baseOffset + relativeOffset(idx, slot), physical(idx, slot))
+    }
   }
   
   /**
@@ -167,17 +178,19 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
    * Get the nth offset mapping from the index
    */
   def entry(n: Int): OffsetPosition = {
-    if(n >= entries)
-      throw new IllegalArgumentException("Attempt to fetch the %dth entry from an index of size %d.".format(n, entries))
-    val idx = mmap.duplicate
-    OffsetPosition(relativeOffset(idx, n), physical(idx, n))
+    maybeLock(lock) {
+      if(n >= entries)
+        throw new IllegalArgumentException("Attempt to fetch the %dth entry from an index of size %d.".format(n, entries))
+      val idx = mmap.duplicate
+      OffsetPosition(relativeOffset(idx, n), physical(idx, n))
+    }
   }
   
   /**
    * Append entry for the given offset/location pair to the index. This entry must have a larger offset than all subsequent entries.
   */
  def append(offset: Long, position: Int) {
-    this synchronized {
+    inLock(lock) {
       require(!isFull, "Attempt to append to a full index (size = " + size + ").")
       if (size.get == 0 || offset > lastOffset) {
         debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
@@ -186,8 +199,7 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
         this.size.incrementAndGet()
         this.lastOffset = offset
         require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
-      }
-      else {
+      } else {
         throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
           .format(offset, entries, lastOffset, file.getName))
       }
@@ -209,7 +221,7 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
    * Truncating to an offset larger than the largest in the index has no effect.
    */
   def truncateTo(offset: Long) {
-    this synchronized {
+    inLock(lock) {
       val idx = mmap.duplicate
       val slot = indexSlotFor(idx, offset)
 
@@ -233,9 +245,11 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
    * Truncates index to a known number of entries.
    */
   private def truncateToEntries(entries: Int) {
-    this.size.set(entries)
-    mmap.position(this.size.get * 8)
-    this.lastOffset = readLastOffset
+    inLock(lock) {
+      this.size.set(entries)
+      mmap.position(this.size.get * 8)
+      this.lastOffset = readLastOffset
+    }
   }
   
   /**
@@ -243,7 +257,7 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
    * the file.
    */
   def trimToValidSize() {
-    this synchronized {
+    inLock(lock) {
       resize(entries * 8)
     }
   }
@@ -255,14 +269,18 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
    * we want to reset the index size to maximum index size to avoid rolling new segment.
    */
   def resize(newSize: Int) {
-    this synchronized {
-      flush()
+    inLock(lock) {
       val raf = new RandomAccessFile(file, "rws")
       val roundedNewSize = roundToExactMultiple(newSize, 8)
+      val position = this.mmap.position
+      
+      /* Windows won't let us modify the file length while the file is mmapped :-( */
+      if(Os.isWindows)
+        forceUnmap(this.mmap)
       try {
         raf.setLength(roundedNewSize)
-        val position = this.mmap.position
         this.mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
+        this.maxEntries = this.mmap.limit / 8
         this.mmap.position(position)
       } finally {
         Utils.swallow(raf.close())
@@ -271,10 +289,22 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
   }
   
   /**
+   * Forcefully free the buffer's mmap. We do this only on Windows.
+   */
+  def forceUnmap(m: MappedByteBuffer) {
+    try {
+      if(m.isInstanceOf[sun.nio.ch.DirectBuffer])
+        (m.asInstanceOf[sun.nio.ch.DirectBuffer]).cleaner().clean()
+    } catch {
+      case t: Throwable => warn("Error when freeing index buffer", t)
+    }
+  }
+  
+  /**
    * Flush the data in the index to disk
    */
   def flush() {
-    this synchronized {
+    inLock(lock) {
       mmap.force()
     }
   }
@@ -300,4 +330,20 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
    * E.g. roundToExactMultiple(67, 8) == 64
    */
   private def roundToExactMultiple(number: Int, factor: Int) = factor * (number / factor)
+  
+  /**
+   * Execute the given function in a lock only if we are running on Windows. We do this
+   * because Windows won't let us resize a file while it is mmapped. As a result we have to force unmap it
+   * and this requires synchronizing reads.
+   */
+  private def maybeLock[T](lock: Lock)(fun: => T): T = {
+    if(Os.isWindows)
+      lock.lock()
+    try {
+      return fun
+    } finally {
+      if(Os.isWindows)
+        lock.unlock()
+    }
+  }
 }
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index d5addb3..26ca541 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -29,6 +29,7 @@ import java.util.concurrent.locks.ReentrantLock
 import kafka.consumer.{PartitionTopicInfo, SimpleConsumer}
 import kafka.api.{FetchRequest, FetchResponse, FetchResponsePartitionData, FetchRequestBuilder}
 import kafka.common.{KafkaException, ClientIdAndBroker, TopicAndPartition, ErrorMapping}
+import kafka.utils.Utils.inLock
 
 
 /**
@@ -70,8 +71,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
   }
 
   override def doWork() {
-    partitionMapLock.lock()
-    try {
+    inLock(partitionMapLock) {
       if (partitionMap.isEmpty)
         partitionMapCond.await(200L, TimeUnit.MILLISECONDS)
       partitionMap.foreach {
@@ -79,8 +79,6 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
           fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
                            offset, fetchSize)
       }
-    } finally {
-      partitionMapLock.unlock()
     }
 
     val fetchRequest = fetchRequestBuilder.build()
@@ -107,8 +105,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
 
     if (response != null) {
       // process fetched data
-      partitionMapLock.lock()
-      try {
+      inLock(partitionMapLock) {
         response.data.foreach {
           case(topicAndPartition, partitionData) =>
             val (topic, partitionId) = topicAndPartition.asTuple
@@ -160,8 +157,6 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
             }
           }
         }
-      } finally {
-        partitionMapLock.unlock()
       }
     }
 
diff --git a/core/src/main/scala/kafka/utils/Os.scala b/core/src/main/scala/kafka/utils/Os.scala
new file mode 100644
index 0000000..cb9950e
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/Os.scala
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+object Os {
+  val name = System.getProperty("os.name").toLowerCase
+  val isWindows = name.startsWith("windows")
+}
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index e83eb5f..3ef3158 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -21,6 +21,7 @@ import java.io._
 import java.nio._
 import charset.Charset
 import java.nio.channels._
+import java.util.concurrent.locks.Lock
 import java.lang.management._
 import java.util.zip.CRC32
 import javax.management._
@@ -554,4 +555,16 @@ object Utils extends Logging {
    * This is different from java.lang.Math.abs or scala.math.abs in that they return Int.MinValue (!).
    */
   def abs(n: Int) = n & 0x7fffffff
+
+  /**
+   * Execute the given function inside the lock
+   */
+  def inLock[T](lock: Lock)(fun: => T): T = {
+    lock.lock()
+    try {
+      return fun
+    } finally {
+      lock.unlock()
+    }
+  }
 }
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index ca1ce12..dca4d09 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -35,6 +35,7 @@ import kafka.controller.KafkaController
 import scala.Some
 import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.common.TopicAndPartition
+import kafka.utils.Utils.inLock
 
 object ZkUtils extends Logging {
   val ConsumersPath = "/consumers"
@@ -774,8 +775,7 @@ class LeaderExistsOrChangedListener(topic: String,
   def handleDataChange(dataPath: String, data: Object) {
     val t = dataPath.split("/").takeRight(3).head
     val p = dataPath.split("/").takeRight(2).head.toInt
-    leaderLock.lock()
-    try {
+    inLock(leaderLock) {
       if(t == topic && p == partition){
         if(oldLeaderOpt == None){
           trace("In leader existence listener on partition [%s, %d], leader has been created".format(topic, partition))
@@ -790,18 +790,12 @@ class LeaderExistsOrChangedListener(topic: String,
         }
       }
     }
-    finally {
-      leaderLock.unlock()
-    }
   }
 
   @throws(classOf[Exception])
   def handleDataDeleted(dataPath: String) {
-    leaderLock.lock()
-    try {
+    inLock(leaderLock) {
       leaderExistsOrChanged.signal()
-    }finally {
-      leaderLock.unlock()
     }
   }
 }
diff --git a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
index 6b21554..96b5d42 100644
--- a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
@@ -19,10 +19,12 @@ package kafka.utils
 
 import java.util.Arrays
 import java.nio.ByteBuffer
+import java.util.concurrent.locks._
 import org.apache.log4j.Logger
 import org.scalatest.junit.JUnitSuite
 import org.junit.Assert._
 import kafka.common.KafkaException
+import kafka.utils.Utils.inLock
 import org.junit.Test
 
 
@@ -74,4 +76,16 @@ class UtilsTest extends JUnitSuite {
     assertTrue(emptyStringList.equals(emptyListFromNullString))
     assertTrue(emptyStringList.equals(emptyList))
   }
+
+  @Test
+  def testInLock() {
+    val lock = new ReentrantLock()
+    val result = inLock(lock) {
+      assertTrue("Should be in lock", lock.isHeldByCurrentThread)
+      1 + 1
+    }
+    assertEquals(2, result)
+    assertFalse("Should be unlocked", lock.isLocked)
+
+  }
 }
-- 
1.7.12.4 (Apple Git-37)
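
Most of this patch is a mechanical rewrite onto the new Utils.inLock helper, which applies Scala's by-name-parameter "loan pattern" to java.util.concurrent locks: acquire the lock, evaluate the caller's block, return its value, and release in a finally so an exception can never leak a held lock. A minimal standalone sketch of the same pattern (object and variable names here are illustrative, not part of the patch); the testInLock case above exercises exactly this contract:

    import java.util.concurrent.locks.{Lock, ReentrantLock}

    object InLockExample {
      // Same shape as the Utils.inLock added above: acquire, evaluate the
      // by-name body, release in a finally so exceptions cannot leak the lock.
      def inLock[T](lock: Lock)(fun: => T): T = {
        lock.lock()
        try {
          fun
        } finally {
          lock.unlock()
        }
      }

      def main(args: Array[String]) {
        val lock = new ReentrantLock()
        // Replaces the lock.lock(); try { ... } finally { lock.unlock() }
        // boilerplate removed throughout the patch.
        val result = inLock(lock) {
          1 + 1 // runs with the lock held; the block's value is returned
        }
        println(result) // prints 2, and the lock is released again
      }
    }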
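
OffsetIndex takes the real lock on every write path (append, truncateTo, resize, flush) but wraps the read paths (lookup, entry) in maybeLock, which locks only on Windows. The asymmetry follows from resize: only on Windows is the old mapping forcefully unmapped before the file is resized, so only there can a reader race with a freed buffer; on other platforms the superseded mapping stays valid until it is garbage collected, and reads can remain lock-free. A standalone sketch of that conditional-locking idea (OsCheck is an illustrative stand-in for the patch's Os object):

    import java.util.concurrent.locks.{Lock, ReentrantLock}

    object MaybeLockExample {
      // Illustrative stand-in for the patch's kafka.utils.Os object.
      object OsCheck {
        val isWindows = System.getProperty("os.name").toLowerCase.startsWith("windows")
      }

      // Same shape as OffsetIndex.maybeLock: pay for the lock only on the
      // platform where a read can observe a forcefully unmapped buffer.
      def maybeLock[T](lock: Lock)(fun: => T): T = {
        if (OsCheck.isWindows)
          lock.lock()
        try {
          fun
        } finally {
          if (OsCheck.isWindows)
            lock.unlock()
        }
      }

      def main(args: Array[String]) {
        val lock = new ReentrantLock()
        println(maybeLock(lock) { 40 + 2 }) // 42 everywhere; locked only on Windows
      }
    }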
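
The heart of the fix: java.nio offers no public way to unmap a MappedByteBuffer, and on Windows the OS keeps a mapped file locked, so RandomAccessFile.setLength fails while a mapping is alive. forceUnmap works around this by reaching into the JDK's sun.nio.ch.DirectBuffer interface and running its Cleaner immediately instead of waiting for GC. The sketch below reproduces the sequence resize now follows; it assumes a pre-Java 9 HotSpot JVM (where the DirectBuffer cast works) and uses illustrative names. Touching the buffer after cleaning is undefined, which is exactly why reads need maybeLock on Windows:

    import java.io.{File, RandomAccessFile}
    import java.nio.MappedByteBuffer
    import java.nio.channels.FileChannel

    object UnmapExample {
      // Same trick as OffsetIndex.forceUnmap: on a pre-Java 9 HotSpot JVM a
      // MappedByteBuffer implements sun.nio.ch.DirectBuffer, whose Cleaner
      // releases the mapping immediately instead of waiting for GC.
      def forceUnmap(m: MappedByteBuffer) {
        m match {
          case db: sun.nio.ch.DirectBuffer => db.cleaner().clean()
          case _ =>
        }
      }

      def main(args: Array[String]) {
        val file = File.createTempFile("index", ".tmp")
        val raf = new RandomAccessFile(file, "rws")
        raf.setLength(4096)
        val mmap = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, 4096)
        forceUnmap(mmap)    // without this, the setLength below fails on Windows
        raf.setLength(8192) // safe: the old mapping has been released
        raf.close()         // (a real caller would now remap, as resize does)
        file.delete()
      }
    }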
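
The constants in the diff all follow from the index's fixed slot layout: maxEntries = mmap.limit / 8 because every entry is eight bytes. The relativeOffset and physical helpers are referenced but not shown in this diff; the sketch below assumes the layout they imply, a 4-byte offset relative to the segment's baseOffset followed by a 4-byte physical file position, and shows why lookup and entry read through mmap.duplicate (each reader gets an independent position and limit over the shared bytes):

    import java.nio.ByteBuffer

    object IndexLayoutExample {
      // Assumed signatures of the helpers the diff references: read the two
      // 4-byte ints stored in slot n of the 8-byte-per-entry index.
      def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8)
      def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8 + 4)

      def main(args: Array[String]) {
        val buffer = ByteBuffer.allocate(16) // room for two 8-byte entries
        buffer.putInt(5).putInt(1024)        // entry 0: baseOffset+5 -> byte 1024
        buffer.putInt(9).putInt(2048)        // entry 1: baseOffset+9 -> byte 2048
        // Readers work on a duplicate so each gets independent position/limit,
        // which is why lookup() and entry() call mmap.duplicate in the patch.
        val idx = buffer.duplicate
        println(relativeOffset(idx, 1)) // 9
        println(physical(idx, 1))       // 2048
      }
    }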