From 1804a072e592da5f96763d2d4a6dc5bd6146ca9e Mon Sep 17 00:00:00 2001
From: Jay Kreps
Date: Wed, 9 Oct 2013 13:50:09 -0700
Subject: [PATCH 1/2] KAFKA-1008 Lock around unmap on windows.

---
 .../kafka/consumer/ConsumerFetcherManager.scala    |  11 +--
 .../consumer/ZookeeperConsumerConnector.scala      |   6 +-
 core/src/main/scala/kafka/log/OffsetIndex.scala    | 109 +++++++++++++++------
 .../scala/kafka/server/AbstractFetcherThread.scala |  11 +--
 core/src/main/scala/kafka/utils/Os.scala           |  23 +++++
 core/src/main/scala/kafka/utils/Utils.scala        |  13 +++
 core/src/main/scala/kafka/utils/ZkUtils.scala      |  12 +--
 .../test/scala/unit/kafka/utils/UtilsTest.scala    |  14 +++
 8 files changed, 138 insertions(+), 61 deletions(-)
 create mode 100644 core/src/main/scala/kafka/utils/Os.scala

diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index 8c03308..30a9c97 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -24,6 +24,7 @@ import scala.collection.immutable
 import collection.mutable.HashMap
 import scala.collection.mutable
 import java.util.concurrent.locks.ReentrantLock
+import kafka.utils.Utils.inLock
 import kafka.utils.ZkUtils._
 import kafka.utils.{ShutdownableThread, SystemTime}
 import kafka.common.TopicAndPartition
@@ -123,14 +124,11 @@ class ConsumerFetcherManager(private val consumerIdString: String,
     leaderFinderThread = new LeaderFinderThread(consumerIdString + "-leader-finder-thread")
     leaderFinderThread.start()

-    lock.lock()
-    try {
+    inLock(lock) {
       partitionMap = topicInfos.map(tpi => (TopicAndPartition(tpi.topic, tpi.partitionId), tpi)).toMap
       this.cluster = cluster
       noLeaderPartitionSet ++= topicInfos.map(tpi => TopicAndPartition(tpi.topic, tpi.partitionId))
       cond.signalAll()
-    } finally {
-      lock.unlock()
     }
   }

@@ -158,14 +156,11 @@ class ConsumerFetcherManager(private val consumerIdString: String,

   def addPartitionsWithError(partitionList: Iterable[TopicAndPartition]) {
     debug("adding partitions with error %s".format(partitionList))
-    lock.lock()
-    try {
+    inLock(lock) {
       if (partitionMap != null) {
         noLeaderPartitionSet ++= partitionList
         cond.signalAll()
       }
-    } finally {
-      lock.unlock()
     }
   }
 }
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 77449f5..c0350cd 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -30,6 +30,7 @@ import org.apache.zookeeper.Watcher.Event.KeeperState
 import java.util.UUID
 import kafka.serializer._
 import kafka.utils.ZkUtils._
+import kafka.utils.Utils.inLock
 import kafka.common._
 import com.yammer.metrics.core.Gauge
 import kafka.metrics._
@@ -366,12 +367,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,

     @throws(classOf[Exception])
     def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
-      lock.lock()
-      try {
+      inLock(lock) {
         isWatcherTriggered = true
         cond.signalAll()
-      } finally {
-        lock.unlock()
       }
     }

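Both consumer-side files above receive the same mechanical rewrite: a manual lock()/try/finally sequence becomes a call to the inLock helper that this patch adds to Utils.scala further down. A minimal sketch of the before/after shape, with an illustrative critical section that is not from the patch:

    import java.util.concurrent.locks.ReentrantLock
    import kafka.utils.Utils.inLock

    val lock = new ReentrantLock()

    // Before: every call site must repeat the unlock in a finally block.
    lock.lock()
    try {
      // ... critical section ...
    } finally {
      lock.unlock()
    }

    // After: inLock owns the lock/unlock pairing, so a call site cannot
    // forget to release the lock on an exception path.
    inLock(lock) {
      // ... critical section ...
    }
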
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index afab848..aa654e8 100644
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -21,8 +21,10 @@ import scala.math._
 import java.io._
 import java.nio._
 import java.nio.channels._
+import java.util.concurrent.locks._
 import java.util.concurrent.atomic._
 import kafka.utils._
+import kafka.utils.Utils.inLock
 import kafka.common.InvalidOffsetException

 /**
@@ -52,6 +54,8 @@ import kafka.common.InvalidOffsetException
  */
 class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging {

+  private val lock = new ReentrantLock
+
   /* initialize the memory mapping for this index */
   private var mmap: MappedByteBuffer =
     {
@@ -88,6 +92,12 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
   /* the number of eight-byte entries currently in the index */
   private var size = new AtomicInteger(mmap.position / 8)

+  /**
+   * The maximum number of eight-byte entries this index can hold
+   */
+  @volatile
+  var maxEntries = mmap.limit / 8
+
   /* the last offset in the index */
   var lastOffset = readLastOffset()

@@ -98,18 +108,15 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
    * The last offset written to the index
    */
   private def readLastOffset(): Long = {
-    val offset =
-      size.get match {
-        case 0 => 0
-        case s => relativeOffset(this.mmap, s-1)
-      }
-    baseOffset + offset
+    inLock(lock) {
+      val offset =
+        size.get match {
+          case 0 => 0
+          case s => relativeOffset(this.mmap, s-1)
+        }
+      baseOffset + offset
+    }
   }
-
-  /**
-   * The maximum number of eight-byte entries this index can hold
-   */
-  def maxEntries = mmap.limit / 8

   /**
    * Find the largest offset less than or equal to the given targetOffset
@@ -122,12 +129,14 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
    * the pair (baseOffset, 0) is returned.
    */
   def lookup(targetOffset: Long): OffsetPosition = {
-    val idx = mmap.duplicate
-    val slot = indexSlotFor(idx, targetOffset)
-    if(slot == -1)
-      OffsetPosition(baseOffset, 0)
-    else
-      OffsetPosition(baseOffset + relativeOffset(idx, slot), physical(idx, slot))
+    maybeLock(lock) {
+      val idx = mmap.duplicate
+      val slot = indexSlotFor(idx, targetOffset)
+      if(slot == -1)
+        OffsetPosition(baseOffset, 0)
+      else
+        OffsetPosition(baseOffset + relativeOffset(idx, slot), physical(idx, slot))
+    }
   }

   /**
@@ -179,17 +188,19 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
    * @return The offset/position pair at that entry
    */
   def entry(n: Int): OffsetPosition = {
-    if(n >= entries)
-      throw new IllegalArgumentException("Attempt to fetch the %dth entry from an index of size %d.".format(n, entries))
-    val idx = mmap.duplicate
-    OffsetPosition(relativeOffset(idx, n), physical(idx, n))
+    maybeLock(lock) {
+      if(n >= entries)
+        throw new IllegalArgumentException("Attempt to fetch the %dth entry from an index of size %d.".format(n, entries))
+      val idx = mmap.duplicate
+      OffsetPosition(relativeOffset(idx, n), physical(idx, n))
+    }
   }

   /**
    * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all previous entries.
   */
   def append(offset: Long, position: Int) {
-    this synchronized {
+    inLock(lock) {
       require(!isFull, "Attempt to append to a full index (size = " + size + ").")
       if (size.get == 0 || offset > lastOffset) {
         debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
@@ -198,8 +209,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
         this.size.incrementAndGet()
         this.lastOffset = offset
         require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
-      }
-      else {
+      } else {
         throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
           .format(offset, entries, lastOffset, file.getName))
       }
@@ -221,7 +231,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
    * Truncating to an offset larger than the largest in the index has no effect.
    */
   def truncateTo(offset: Long) {
-    this synchronized {
+    inLock(lock) {
       val idx = mmap.duplicate
       val slot = indexSlotFor(idx, offset)

@@ -245,9 +255,11 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
    * Truncates index to a known number of entries.
    */
   private def truncateToEntries(entries: Int) {
-    this.size.set(entries)
-    mmap.position(this.size.get * 8)
-    this.lastOffset = readLastOffset
+    inLock(lock) {
+      this.size.set(entries)
+      mmap.position(this.size.get * 8)
+      this.lastOffset = readLastOffset
+    }
   }

   /**
@@ -255,7 +267,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
    * the file.
    */
   def trimToValidSize() {
-    this synchronized {
+    inLock(lock) {
       resize(entries * 8)
     }
   }
@@ -267,13 +279,18 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
    * we want to reset the index size to maximum index size to avoid rolling new segment.
    */
   def resize(newSize: Int) {
-    this synchronized {
+    inLock(lock) {
       val raf = new RandomAccessFile(file, "rws")
       val roundedNewSize = roundToExactMultiple(newSize, 8)
+      val position = this.mmap.position
+
+      /* Windows won't let us modify the file length while the file is mmapped :-( */
+      if(Os.isWindows)
+        forceUnmap(this.mmap)
       try {
         raf.setLength(roundedNewSize)
-        val position = this.mmap.position
         this.mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
+        this.maxEntries = this.mmap.limit / 8
         this.mmap.position(position)
       } finally {
         Utils.swallow(raf.close())
@@ -282,10 +299,22 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
   }

   /**
+   * Forcefully free the buffer's mmap. We do this only on windows.
+   */
+  def forceUnmap(m: MappedByteBuffer) {
+    try {
+      if(m.isInstanceOf[sun.nio.ch.DirectBuffer])
+        (m.asInstanceOf[sun.nio.ch.DirectBuffer]).cleaner().clean()
+    } catch {
+      case t: Throwable => warn("Error when freeing index buffer", t)
+    }
+  }
+
+  /**
    * Flush the data in the index to disk
    */
   def flush() {
-    this synchronized {
+    inLock(lock) {
       mmap.force()
     }
   }
@@ -326,4 +355,20 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
    * E.g. roundToExactMultiple(67, 8) == 64
    */
   private def roundToExactMultiple(number: Int, factor: Int) = factor * (number / factor)
+
+  /**
+   * Execute the given function in a lock only if we are running on windows. We do this
+   * because Windows won't let us resize a file while it is mmapped. As a result we have to force unmap it
+   * and this requires synchronizing reads.
+   */
+  private def maybeLock[T](lock: Lock)(fun: => T): T = {
+    if(Os.isWindows)
+      lock.lock()
+    try {
+      return fun
+    } finally {
+      if(Os.isWindows)
+        lock.unlock()
+    }
+  }
 }
\ No newline at end of file
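Every OffsetIndex change above follows from the platform behavior called out in resize(): Windows refuses to change the length of a file that still has a live memory mapping, so the mapping must be forcibly released first, and readers must then be kept away from the dead buffer via maybeLock. A small sketch of the underlying limitation, using only standard JDK calls (the file name is illustrative; on Linux and macOS the setLength call below succeeds even while the file is mapped):

    import java.io.{File, RandomAccessFile}
    import java.nio.channels.FileChannel

    val file = File.createTempFile("offset-index", ".tmp")
    val raf = new RandomAccessFile(file, "rw")
    // Map the file; the mapping stays valid until unmapped or GC'd.
    val mmap = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, 1024)
    // On Windows this throws an IOException while `mmap` is still mapped,
    // which is why resize() above force-unmaps before calling setLength.
    raf.setLength(2048)
    raf.close()
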
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index a5fc96d..c64260f 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -29,6 +29,7 @@ import java.util.concurrent.locks.ReentrantLock
 import kafka.consumer.{PartitionTopicInfo, SimpleConsumer}
 import kafka.api.{FetchRequest, FetchResponse, FetchResponsePartitionData, FetchRequestBuilder}
 import kafka.common.{KafkaException, ClientIdAndBroker, TopicAndPartition, ErrorMapping}
+import kafka.utils.Utils.inLock


 /**
@@ -70,8 +71,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
   }

   override def doWork() {
-    partitionMapLock.lock()
-    try {
+    inLock(partitionMapLock) {
       if (partitionMap.isEmpty)
         partitionMapCond.await(200L, TimeUnit.MILLISECONDS)
       partitionMap.foreach {
@@ -79,8 +79,6 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
           fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
                            offset, fetchSize)
       }
-    } finally {
-      partitionMapLock.unlock()
     }

     val fetchRequest = fetchRequestBuilder.build()
@@ -107,8 +105,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke

     if (response != null) {
       // process fetched data
-      partitionMapLock.lock()
-      try {
+      inLock(partitionMapLock) {
         response.data.foreach {
           case(topicAndPartition, partitionData) =>
             val (topic, partitionId) = topicAndPartition.asTuple
@@ -160,8 +157,6 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
             }
           }
         }
-      } finally {
-        partitionMapLock.unlock()
       }
     }

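One subtlety in doWork() above: partitionMapCond.await(...) runs inside an inLock block. That is safe because awaiting on a Condition atomically releases its owning lock and reacquires it before returning, so the unlock in inLock's finally clause always sees a held lock. A sketch of the interaction, with illustrative names rather than the fetcher's own:

    import java.util.concurrent.TimeUnit
    import java.util.concurrent.locks.ReentrantLock
    import kafka.utils.Utils.inLock

    val lock = new ReentrantLock()
    val cond = lock.newCondition()

    inLock(lock) {
      // await releases `lock` while blocked and holds it again on return,
      // so the lock/unlock pairing maintained by inLock stays balanced.
      cond.await(200L, TimeUnit.MILLISECONDS)
    }
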
diff --git a/core/src/main/scala/kafka/utils/Os.scala b/core/src/main/scala/kafka/utils/Os.scala
new file mode 100644
index 0000000..6574f08
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/Os.scala
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+object Os {
+  val name = System.getProperty("os.name").toLowerCase
+  val isWindows = name.startsWith("windows")
+}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index 405c7ae..c9ca95f 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -21,6 +21,7 @@ import java.io._
 import java.nio._
 import charset.Charset
 import java.nio.channels._
+import java.util.concurrent.locks.Lock
 import java.lang.management._
 import javax.management._
 import scala.collection._
@@ -587,4 +588,16 @@ object Utils extends Logging {
     (bytes(offset + 3) & 0xFF)
   }

+  /**
+   * Execute the given function inside the lock
+   */
+  def inLock[T](lock: Lock)(fun: => T): T = {
+    lock.lock()
+    try {
+      return fun
+    } finally {
+      lock.unlock()
+    }
+  }
+
 }
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index d1c4b3d..856d136 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -35,6 +35,7 @@ import kafka.controller.KafkaController
 import scala.Some
 import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.common.TopicAndPartition
+import kafka.utils.Utils.inLock

 object ZkUtils extends Logging {
   val ConsumersPath = "/consumers"
@@ -781,8 +782,7 @@ class LeaderExistsOrChangedListener(topic: String,
   def handleDataChange(dataPath: String, data: Object) {
     val t = dataPath.split("/").takeRight(3).head
     val p = dataPath.split("/").takeRight(2).head.toInt
-    leaderLock.lock()
-    try {
+    inLock(leaderLock) {
       if(t == topic && p == partition){
         if(oldLeaderOpt == None){
           trace("In leader existence listener on partition [%s, %d], leader has been created".format(topic, partition))
@@ -797,18 +797,12 @@ class LeaderExistsOrChangedListener(topic: String,
         }
       }
     }
-    finally {
-      leaderLock.unlock()
-    }
   }

   @throws(classOf[Exception])
   def handleDataDeleted(dataPath: String) {
-    leaderLock.lock()
-    try {
+    inLock(leaderLock) {
       leaderExistsOrChanged.signal()
-    }finally {
-      leaderLock.unlock()
     }
   }
 }
diff --git a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
index ce1679a..920f318 100644
--- a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
@@ -18,12 +18,14 @@
 package kafka.utils

 import java.util.Arrays
+import java.util.concurrent.locks.ReentrantLock
 import java.nio.ByteBuffer
 import java.io._
 import org.apache.log4j.Logger
 import org.scalatest.junit.JUnitSuite
 import org.junit.Assert._
 import kafka.common.KafkaException
+import kafka.utils.Utils.inLock
 import org.junit.Test


@@ -94,4 +96,16 @@ class UtilsTest extends JUnitSuite {
     assertTrue(emptyStringList.equals(emptyListFromNullString))
     assertTrue(emptyStringList.equals(emptyList))
   }
+
+  @Test
+  def testInLock() {
+    val lock = new ReentrantLock()
+    val result = inLock(lock) {
+      assertTrue("Should be in lock", lock.isHeldByCurrentThread)
+      1 + 1
+    }
+    assertEquals(2, result)
+    assertFalse("Should be unlocked", lock.isLocked)
+
+  }
 }
--
1.7.12.4 (Apple Git-37)
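A note on forceUnmap in the patch above (narrowed to private by the follow-up patch below): it leans on the JDK-internal sun.nio.ch.DirectBuffer interface, because the public NIO API offers no way to release a mapping before the buffer is garbage collected. A hedged restatement of the trick in isolation; the method name is ours, and the internal API is only known to behave this way on the Java 6/7-era JVMs this codebase targets:

    import java.nio.MappedByteBuffer

    def unmapEagerly(buffer: MappedByteBuffer): Unit =
      buffer match {
        // DirectBuffer and its Cleaner are unsupported JDK internals; this
        // can break on other JVMs or on later Java versions.
        case db: sun.nio.ch.DirectBuffer => db.cleaner().clean()
        case _ => // non-direct buffer: nothing to release eagerly
      }
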
From a986bae9e6c75b5ad202d565bf7a520557f47049 Mon Sep 17 00:00:00 2001
From: Jay Kreps
Date: Wed, 9 Oct 2013 16:55:49 -0700
Subject: [PATCH 2/2] KAFKA-1036 Close checkpoint file before renaming it for Windows compatibility.

---
 core/src/main/scala/kafka/log/OffsetIndex.scala         |  2 +-
 core/src/main/scala/kafka/server/OffsetCheckpoint.scala | 16 ++++++++--------
 2 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index aa654e8..80dd430 100644
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -301,7 +301,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
   /**
    * Forcefully free the buffer's mmap. We do this only on windows.
    */
-  def forceUnmap(m: MappedByteBuffer) {
+  private def forceUnmap(m: MappedByteBuffer) {
     try {
       if(m.isInstanceOf[sun.nio.ch.DirectBuffer])
         (m.asInstanceOf[sun.nio.ch.DirectBuffer]).cleaner().clean()
diff --git a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
index 815c90d..b5719f8 100644
--- a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
@@ -53,17 +53,17 @@ class OffsetCheckpoint(val file: File) extends Logging {

       // flush and overwrite old file
       writer.flush()
-
-      // swap new offset checkpoint file with previous one
-      if(!temp.renameTo(file)) {
-        // renameTo() fails on Windows if the destination file exists.
-        file.delete()
-        if(!temp.renameTo(file)) {
-          throw new IOException("File rename from %s to %s failed.".format(temp.getAbsolutePath, file.getAbsolutePath))
-        }
-      }
     } finally {
       writer.close()
     }
+
+    // swap new offset checkpoint file with previous one
+    if(!temp.renameTo(file)) {
+      // renameTo() fails on Windows if the destination file exists.
+      file.delete()
+      if(!temp.renameTo(file))
+        throw new IOException("File rename from %s to %s failed.".format(temp.getAbsolutePath, file.getAbsolutePath))
+    }
   }
 }
--
1.7.12.4 (Apple Git-37)
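The OffsetCheckpoint fix above reduces to an ordering rule: close the writer before renaming, since Windows will neither rename a file with an open handle nor rename over an existing destination. A minimal sketch of the corrected write-temp-then-swap sequence; the helper name and signature are ours, not the class's API:

    import java.io.{BufferedWriter, File, FileWriter, IOException}

    def writeAndSwap(temp: File, target: File, lines: Seq[String]) {
      val writer = new BufferedWriter(new FileWriter(temp))
      try {
        lines.foreach { line => writer.write(line); writer.newLine() }
        writer.flush()
      } finally {
        writer.close() // must complete before renameTo on Windows
      }
      // renameTo fails on Windows if the destination exists: delete, retry.
      if(!temp.renameTo(target)) {
        target.delete()
        if(!temp.renameTo(target))
          throw new IOException("File rename from %s to %s failed.".format(temp.getAbsolutePath, target.getAbsolutePath))
      }
    }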