From e1e4cf6685e3b5ca7900635a11a8a0be80a569a5 Mon Sep 17 00:00:00 2001
From: Maxim Ivanov <ivanov.maxim@gmail.com>
Date: Sun, 8 Feb 2015 18:41:06 +0000
Subject: [PATCH] Finer-grained locking in log append

This patch adds finer-grained locking when appending to the log. It breaks
the global append lock into two sequential phases and one parallel phase.

The basic idea is to let every thread "reserve" offsets in non-overlapping
ranges, then do compression in parallel, and then "commit" the write to the
log in the same order the offsets were reserved.
---
 core/src/main/scala/kafka/log/Log.scala | 68 ++++++++++++++++++++++++---------
 1 file changed, 51 insertions(+), 17 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index ec19215..5a80249 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -24,7 +24,7 @@ import kafka.metrics.KafkaMetricsGroup
 import kafka.server.{LogOffsetMetadata, FetchDataInfo, BrokerTopicStats}
 
 import java.io.{IOException, File}
-import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
+import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap, Semaphore}
 import java.util.concurrent.atomic._
 import java.text.NumberFormat
 import scala.collection.JavaConversions
@@ -69,6 +69,14 @@ class Log(val dir: File,
   /* Calculate the offset of the next message */
   @volatile var nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset, activeSegment.size.toInt)
 
+  /* Next offset to be allocated by threads appending to the log */
+  private var nextOffsetToAllocate: Long = nextOffsetMetadata.messageOffset
+  private var logWriteCount: Int = 0
+  private val logAppendSemaphores: Array[Semaphore] = Array.fill[Semaphore](32)(new Semaphore(0))
+  // let the first thread write to the log
+  logAppendSemaphores(0).release()
+  private val numActiveWriters = new AtomicLong(0)
+
   val topicAndPartition: TopicAndPartition = Log.parseTopicPartitionName(name)
 
   info("Completed load of log %s with log end offset %d".format(name, logEndOffset))
@@ -262,15 +270,35 @@ class Log(val dir: File,
       
     // trim any invalid bytes or partial messages before appending it to the on-disk log
     var validMessages = trimInvalidBytes(messages, appendInfo)
+    var semAppend: Semaphore = null
+    var semAppendNext: Semaphore = null
 
     try {
       // they are valid, insert them in the log
-      lock synchronized {
-        appendInfo.firstOffset = nextOffsetMetadata.messageOffset
+      logAppendSemaphores synchronized {
+        // under the lock we only allocate a block of offsets for ourselves and obtain our semaphore and the next thread's
+        appendInfo.firstOffset = nextOffsetToAllocate
+        nextOffsetToAllocate += appendInfo.shallowCount
+
+        // the semaphore we must wait on before we can write to the log
+        semAppend = logAppendSemaphores(logWriteCount % logAppendSemaphores.length)
+        logWriteCount += 1
+
+        // the semaphore which the next thread will wait on
+        semAppendNext = logAppendSemaphores(logWriteCount % logAppendSemaphores.length)
+
+        numActiveWriters.incrementAndGet()
 
-        if(assignOffsets) {
+        // protection from eating our own tail; should be an extremely rare case.
+        // If the number of active writers equals the total number of semaphores, our semAppend is still
+        // in use by another thread, so we spin-wait for it to complete before proceeding
+        while (numActiveWriters.get() == logAppendSemaphores.length) {}
+      }
+
+      try {
+        if (assignOffsets) {
           // assign offsets to the message set
-          val offset = new AtomicLong(nextOffsetMetadata.messageOffset)
+          val offset = new AtomicLong(appendInfo.firstOffset)
           try {
             validMessages = validMessages.assignOffsets(offset, appendInfo.codec)
           } catch {
@@ -301,23 +329,29 @@ class Log(val dir: File,
             .format(validMessages.sizeInBytes, config.segmentSize))
         }
 
+        // block until the thread that allocated the previous block of offsets is done
+        semAppend.acquireUninterruptibly()
+        lock synchronized {
+          // maybe roll the log if this segment is full
+          val segment = maybeRoll(validMessages.sizeInBytes)
 
-        // maybe roll the log if this segment is full
-        val segment = maybeRoll(validMessages.sizeInBytes)
-
-        // now append to the log
-        segment.append(appendInfo.firstOffset, validMessages)
+          // now append to the log
+          segment.append(appendInfo.firstOffset, validMessages)
 
-        // increment the log end offset
-        updateLogEndOffset(appendInfo.lastOffset + 1)
+          // increment the log end offset
+          updateLogEndOffset(appendInfo.lastOffset + 1)
 
-        trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s"
-                .format(this.name, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validMessages))
-
-        if(unflushedMessages >= config.flushInterval)
-          flush()
+          trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s"
+            .format(this.name, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validMessages))
 
+          if(unflushedMessages >= config.flushInterval)
+            flush()
+        }
         appendInfo
+      } finally {
+        // let the next thread proceed with writing to the log
+        semAppendNext.release()
+        numActiveWriters.decrementAndGet()
       }
     } catch {
       case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
-- 
2.1.4

