diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala index 71ae640..8c83e65 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala @@ -24,6 +24,7 @@ import scala.collection.immutable import collection.mutable.HashMap import scala.collection.mutable import java.util.concurrent.locks.ReentrantLock +import kafka.utils.Utils.inLock import kafka.utils.ZkUtils._ import kafka.utils.{ShutdownableThread, SystemTime} import kafka.common.TopicAndPartition @@ -110,14 +111,11 @@ class ConsumerFetcherManager(private val consumerIdString: String, leaderFinderThread = new LeaderFinderThread(consumerIdString + "-leader-finder-thread") leaderFinderThread.start() - lock.lock() - try { + inLock(lock) { partitionMap = topicInfos.map(tpi => (TopicAndPartition(tpi.topic, tpi.partitionId), tpi)).toMap this.cluster = cluster noLeaderPartitionSet ++= topicInfos.map(tpi => TopicAndPartition(tpi.topic, tpi.partitionId)) cond.signalAll() - } finally { - lock.unlock() } } @@ -145,14 +143,11 @@ class ConsumerFetcherManager(private val consumerIdString: String, def addPartitionsWithError(partitionList: Iterable[TopicAndPartition]) { debug("adding partitions with error %s".format(partitionList)) - lock.lock() - try { + inLock(lock) { if (partitionMap != null) { noLeaderPartitionSet ++= partitionList cond.signalAll() } - } finally { - lock.unlock() } } } \ No newline at end of file diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index e3a6420..e7957ea 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -30,6 +30,7 @@ import org.apache.zookeeper.Watcher.Event.KeeperState import java.util.UUID import kafka.serializer._ import kafka.utils.ZkUtils._ +import kafka.utils.Utils.inLock import kafka.common._ import kafka.client.ClientUtils import com.yammer.metrics.core.Gauge @@ -338,12 +339,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, @throws(classOf[Exception]) def handleChildChange(parentPath : String, curChilds : java.util.List[String]) { - lock.lock() - try { + inLock(lock) { isWatcherTriggered = true cond.signalAll() - } finally { - lock.unlock() } } diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala index fbc728c..aa654e8 100644 --- a/core/src/main/scala/kafka/log/OffsetIndex.scala +++ b/core/src/main/scala/kafka/log/OffsetIndex.scala @@ -21,8 +21,10 @@ import scala.math._ import java.io._ import java.nio._ import java.nio.channels._ +import java.util.concurrent.locks._ import java.util.concurrent.atomic._ import kafka.utils._ +import kafka.utils.Utils.inLock import kafka.common.InvalidOffsetException /** @@ -52,6 +54,8 @@ import kafka.common.InvalidOffsetException */ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging { + private val lock = new ReentrantLock + /* initialize the memory mapping for this index */ private var mmap: MappedByteBuffer = { @@ -88,6 +92,12 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi /* the number of eight-byte entries currently in the index */ private var size = new AtomicInteger(mmap.position / 8) + /** + * The maximum number of eight-byte entries this index can hold + */ + @volatile + var maxEntries = mmap.limit / 8 + /* the last offset in the index */ var lastOffset = readLastOffset() @@ -98,18 +108,15 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi * The last offset written to the index */ private def readLastOffset(): Long = { - val offset = - size.get match { - case 0 => 0 - case s => relativeOffset(this.mmap, s-1) - } - baseOffset + offset + inLock(lock) { + val offset = + size.get match { + case 0 => 0 + case s => relativeOffset(this.mmap, s-1) + } + baseOffset + offset + } } - - /** - * The maximum number of eight-byte entries this index can hold - */ - def maxEntries = mmap.limit / 8 /** * Find the largest offset less than or equal to the given targetOffset @@ -122,12 +129,14 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi * the pair (baseOffset, 0) is returned. */ def lookup(targetOffset: Long): OffsetPosition = { - val idx = mmap.duplicate - val slot = indexSlotFor(idx, targetOffset) - if(slot == -1) - OffsetPosition(baseOffset, 0) - else - OffsetPosition(baseOffset + relativeOffset(idx, slot), physical(idx, slot)) + maybeLock(lock) { + val idx = mmap.duplicate + val slot = indexSlotFor(idx, targetOffset) + if(slot == -1) + OffsetPosition(baseOffset, 0) + else + OffsetPosition(baseOffset + relativeOffset(idx, slot), physical(idx, slot)) + } } /** @@ -179,17 +188,19 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi * @return The offset/position pair at that entry */ def entry(n: Int): OffsetPosition = { - if(n >= entries) - throw new IllegalArgumentException("Attempt to fetch the %dth entry from an index of size %d.".format(n, entries)) - val idx = mmap.duplicate - OffsetPosition(relativeOffset(idx, n), physical(idx, n)) + maybeLock(lock) { + if(n >= entries) + throw new IllegalArgumentException("Attempt to fetch the %dth entry from an index of size %d.".format(n, entries)) + val idx = mmap.duplicate + OffsetPosition(relativeOffset(idx, n), physical(idx, n)) + } } /** * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all subsequent entries. */ def append(offset: Long, position: Int) { - this synchronized { + inLock(lock) { require(!isFull, "Attempt to append to a full index (size = " + size + ").") if (size.get == 0 || offset > lastOffset) { debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName)) @@ -198,8 +209,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi this.size.incrementAndGet() this.lastOffset = offset require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".") - } - else { + } else { throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s." .format(offset, entries, lastOffset, file.getName)) } @@ -221,7 +231,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi * Truncating to an offset larger than the largest in the index has no effect. */ def truncateTo(offset: Long) { - this synchronized { + inLock(lock) { val idx = mmap.duplicate val slot = indexSlotFor(idx, offset) @@ -245,9 +255,11 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi * Truncates index to a known number of entries. */ private def truncateToEntries(entries: Int) { - this.size.set(entries) - mmap.position(this.size.get * 8) - this.lastOffset = readLastOffset + inLock(lock) { + this.size.set(entries) + mmap.position(this.size.get * 8) + this.lastOffset = readLastOffset + } } /** @@ -255,7 +267,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi * the file. */ def trimToValidSize() { - this synchronized { + inLock(lock) { resize(entries * 8) } } @@ -267,14 +279,18 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi * we want to reset the index size to maximum index size to avoid rolling new segment. */ def resize(newSize: Int) { - this synchronized { - flush() + inLock(lock) { val raf = new RandomAccessFile(file, "rws") val roundedNewSize = roundToExactMultiple(newSize, 8) + val position = this.mmap.position + + /* Windows won't let us modify the file length while the file is mmapped :-( */ + if(Os.isWindows) + forceUnmap(this.mmap) try { raf.setLength(roundedNewSize) - val position = this.mmap.position this.mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize) + this.maxEntries = this.mmap.limit / 8 this.mmap.position(position) } finally { Utils.swallow(raf.close()) @@ -283,10 +299,22 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi } /** + * Forcefully free the buffer's mmap. We do this only on windows. + */ + def forceUnmap(m: MappedByteBuffer) { + try { + if(m.isInstanceOf[sun.nio.ch.DirectBuffer]) + (m.asInstanceOf[sun.nio.ch.DirectBuffer]).cleaner().clean() + } catch { + case t: Throwable => warn("Error when freeing index buffer", t) + } + } + + /** * Flush the data in the index to disk */ def flush() { - this synchronized { + inLock(lock) { mmap.force() } } @@ -327,4 +355,20 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi * E.g. roundToExactMultiple(67, 8) == 64 */ private def roundToExactMultiple(number: Int, factor: Int) = factor * (number / factor) + + /** + * Execute the given function in a lock only if we are running on windows. We do this + * because Windows won't let us resize a file while it is mmapped. As a result we have to force unmap it + * and this requires synchronizing reads. + */ + private def maybeLock[T](lock: Lock)(fun: => T): T = { + if(Os.isWindows) + lock.lock() + try { + return fun + } finally { + if(Os.isWindows) + lock.unlock() + } + } } \ No newline at end of file diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 7663fac..3e368e0 100644 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -29,6 +29,7 @@ import java.util.concurrent.locks.ReentrantLock import kafka.consumer.{PartitionTopicInfo, SimpleConsumer} import kafka.api.{FetchRequest, FetchResponse, FetchResponsePartitionData, FetchRequestBuilder} import kafka.common.{KafkaException, ClientIdAndBroker, TopicAndPartition, ErrorMapping} +import kafka.utils.Utils.inLock /** @@ -70,8 +71,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke } override def doWork() { - partitionMapLock.lock() - try { + inLock(partitionMapLock) { if (partitionMap.isEmpty) partitionMapCond.await(200L, TimeUnit.MILLISECONDS) partitionMap.foreach { @@ -79,8 +79,6 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition, offset, fetchSize) } - } finally { - partitionMapLock.unlock() } val fetchRequest = fetchRequestBuilder.build() @@ -107,8 +105,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke if (response != null) { // process fetched data - partitionMapLock.lock() - try { + inLock(partitionMapLock) { response.data.foreach { case(topicAndPartition, partitionData) => val (topic, partitionId) = topicAndPartition.asTuple @@ -160,8 +157,6 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke } } } - } finally { - partitionMapLock.unlock() } } diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala index 4eaeae8..4c8088c 100644 --- a/core/src/main/scala/kafka/utils/Utils.scala +++ b/core/src/main/scala/kafka/utils/Utils.scala @@ -21,6 +21,7 @@ import java.io._ import java.nio._ import charset.Charset import java.nio.channels._ +import java.util.concurrent.locks.Lock import java.lang.management._ import javax.management._ import scala.collection._ @@ -587,4 +588,16 @@ object Utils extends Logging { (bytes(offset + 3) & 0xFF) } + /** + * Execute the given function inside the lock + */ + def inLock[T](lock: Lock)(fun: => T): T = { + lock.lock() + try { + return fun + } finally { + lock.unlock() + } + } + } diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 84744eb..3afdb2f 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -34,6 +34,7 @@ import kafka.controller.PartitionAndReplica import scala.Some import kafka.controller.LeaderIsrAndControllerEpoch import kafka.common.TopicAndPartition +import kafka.utils.Utils.inLock object ZkUtils extends Logging { val ConsumersPath = "/consumers" @@ -712,8 +713,7 @@ class LeaderExistsOrChangedListener(topic: String, def handleDataChange(dataPath: String, data: Object) { val t = dataPath.split("/").takeRight(3).head val p = dataPath.split("/").takeRight(2).head.toInt - leaderLock.lock() - try { + inLock(leaderLock) { if(t == topic && p == partition){ if(oldLeaderOpt == None){ trace("In leader existence listener on partition [%s, %d], leader has been created".format(topic, partition)) @@ -728,18 +728,12 @@ class LeaderExistsOrChangedListener(topic: String, } } } - finally { - leaderLock.unlock() - } } @throws(classOf[Exception]) def handleDataDeleted(dataPath: String) { - leaderLock.lock() - try { + inLock(leaderLock) { leaderExistsOrChanged.signal() - }finally { - leaderLock.unlock() } } } diff --git a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala index ce1679a..920f318 100644 --- a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala +++ b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala @@ -18,12 +18,14 @@ package kafka.utils import java.util.Arrays +import java.util.concurrent.locks.ReentrantLock import java.nio.ByteBuffer import java.io._ import org.apache.log4j.Logger import org.scalatest.junit.JUnitSuite import org.junit.Assert._ import kafka.common.KafkaException +import kafka.utils.Utils.inLock import org.junit.Test @@ -94,4 +96,16 @@ class UtilsTest extends JUnitSuite { assertTrue(emptyStringList.equals(emptyListFromNullString)) assertTrue(emptyStringList.equals(emptyList)) } + + @Test + def testInLock() { + val lock = new ReentrantLock() + val result = inLock(lock) { + assertTrue("Should be in lock", lock.isHeldByCurrentThread) + 1 + 1 + } + assertEquals(2, result) + assertFalse("Should be unlocked", lock.isLocked) + + } }