From 70fa0e622dfbdcf6b9fa1429348b9f13ef7de913 Mon Sep 17 00:00:00 2001
From: Manas Alekar
Date: Thu, 5 May 2016 16:03:26 -0700
Subject: [PATCH] POC: improve deduping of segments.

---
 core/src/main/scala/kafka/log/LogCleaner.scala | 30 +++++-----
 core/src/main/scala/kafka/log/OffsetMap.scala  | 74 ++++++++++++++++++-------
 2 files changed, 78 insertions(+), 26 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index e23234b..416c4a0 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -190,8 +190,10 @@ class LogCleaner(val config: CleanerConfig,
       warn("Cannot use more than 2G of cleaner buffer space per cleaner thread, ignoring excess buffer space...")
     val cleaner = new Cleaner(id = threadId,
-                              offsetMap = new SkimpyOffsetMap(memory = math.min(config.dedupeBufferSize / config.numThreads, Int.MaxValue).toInt,
-                                                              hashAlgorithm = config.hashAlgorithm),
+                              offsetMap = new OffsetMap(memory = math.min(config.dedupeBufferSize / config.numThreads / 2, Int.MaxValue).toInt,
+                                                        hashAlgorithm = config.hashAlgorithm),
+                              scratchMap = new OffsetMap(memory = math.min(config.dedupeBufferSize / config.numThreads / 2, Int.MaxValue).toInt,
+                                                         hashAlgorithm = config.hashAlgorithm),
                               ioBufferSize = config.ioBufferSize / config.numThreads / 2,
                               maxIoBufferSize = config.maxMessageSize,
                               dupBufferLoadFactor = config.dedupeBufferLoadFactor,
@@ -281,6 +283,7 @@ class LogCleaner(val config: CleanerConfig,
  * This class holds the actual logic for cleaning a log
  * @param id An identifier used for logging
  * @param offsetMap The map used for deduplication
+ * @param scratchMap A scratch map used to deduplicate one segment at a time before merging it into offsetMap
  * @param ioBufferSize The size of the buffers to use. Memory usage will be 2x this number as there is a read and write buffer.
  * @param maxIoBufferSize The maximum size of a message that can appear in the log
  * @param dupBufferLoadFactor The maximum percent full for the deduplication buffer
@@ -290,6 +293,7 @@ class LogCleaner(val config: CleanerConfig,
 private[log] class Cleaner(val id: Int,
                            val offsetMap: OffsetMap,
+                           val scratchMap: OffsetMap,
                            ioBufferSize: Int,
                            maxIoBufferSize: Int,
                            dupBufferLoadFactor: Double,
@@ -311,6 +315,9 @@ private[log] class Cleaner(val id: Int,
   /* buffer used for write i/o */
   private var writeBuffer = ByteBuffer.allocate(ioBufferSize)
 
+  /* the offset up to which the current contents of scratchMap have deduplicated the log */
+  private var scratchOffset = 0L
+
   /**
    * Clean the given log
    *
@@ -615,17 +622,17 @@ private[log] class Cleaner(val id: Int,
     // but we may be able to fit more (if there is lots of duplication in the dirty section of the log)
     var offset = dirty.head.baseOffset
     require(offset == start, "Last clean offset is %d but segment base offset is %d for log %s.".format(start, offset, log.name))
+
     val maxDesiredMapSize = (map.slots * this.dupBufferLoadFactor).toInt
     var full = false
     for (segment <- dirty if !full) {
       checkDone(log.topicAndPartition)
-      val segmentSize = segment.nextOffset() - segment.baseOffset
-
-      require(segmentSize <= maxDesiredMapSize, "%d messages in segment %s/%s but offset map can fit only %d. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads".format(segmentSize, log.name, segment.log.file.getName, maxDesiredMapSize))
-      if (map.size + segmentSize <= maxDesiredMapSize)
-        offset = buildOffsetMapForSegment(log.topicAndPartition, segment, map)
-      else
-        full = true
+      // dedupe the segment into the scratch map, then merge it into the main map if there is room;
+      // stop at the first segment that does not fit so the deduped range stays contiguous
+      scratchOffset = buildOffsetMapForSegment(log.topicAndPartition, segment, scratchMap)
+      if (map.size < maxDesiredMapSize && map.putAll(scratchMap))
+        offset = scratchOffset
+      else
+        full = true
     }
     info("Offset map for log %s complete.".format(log.name))
     offset
@@ -640,6 +647,11 @@ private[log] class Cleaner(val id: Int,
    * @return The final offset covered by the map
    */
   private def buildOffsetMapForSegment(topicAndPartition: TopicAndPartition, segment: LogSegment, map: OffsetMap): Long = {
+    // if the main map had no room on the previous attempt, the scratch map already holds this segment, so skip the rebuild
+    if (scratchOffset == segment.nextOffset())
+      return scratchOffset
+
+    map.clear() // we are de-duping a new segment, so clear the scratch map
     var position = 0
     var offset = segment.baseOffset
     while (position < segment.log.sizeInBytes) {
diff --git a/core/src/main/scala/kafka/log/OffsetMap.scala b/core/src/main/scala/kafka/log/OffsetMap.scala
index 3893b2c..7980cf5 100755
--- a/core/src/main/scala/kafka/log/OffsetMap.scala
+++ b/core/src/main/scala/kafka/log/OffsetMap.scala
@@ -23,15 +23,6 @@ import java.nio.ByteBuffer
 import kafka.utils._
 import org.apache.kafka.common.utils.Utils
 
-trait OffsetMap {
-  def slots: Int
-  def put(key: ByteBuffer, offset: Long)
-  def get(key: ByteBuffer): Long
-  def clear()
-  def size: Int
-  def utilization: Double = size.toDouble / slots
-}
-
 /**
  * An hash table used for deduplicating the log. This hash table uses a cryptographicly secure hash of the key as a proxy for the key
  * for comparisons and to save space on object overhead. Collisions are resolved by probing. This hash table does not support deletes.
@@ -39,7 +30,7 @@ trait OffsetMap {
  * @param hashAlgorithm The hash algorithm instance to use: MD2, MD5, SHA-1, SHA-256, SHA-384, SHA-512
  */
 @nonthreadsafe
-class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extends OffsetMap {
+class OffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") {
   private val bytes = ByteBuffer.allocate(memory)
 
   /* the hash algorithm instance to use, default is MD5 */
@@ -76,10 +67,14 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extends OffsetMap {
    * @param key The key
    * @param offset The offset
    */
-  override def put(key: ByteBuffer, offset: Long) {
+  def put(key: ByteBuffer, offset: Long) {
     require(entries < slots, "Attempt to add a new entry to a full offset map.")
     lookups += 1
     hashInto(key, hash1)
+    putHash1(offset)
+  }
+
+  private def putHash1(offset: Long) {
     // probe until we find the first empty slot
     var attempt = 0
     var pos = positionOf(hash1, attempt)
@@ -100,7 +95,7 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extends OffsetMap {
     bytes.putLong(offset)
     entries += 1
   }
-  
+
   /**
    * Check that there is no entry at the given position
    */
@@ -112,9 +107,13 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extends OffsetMap {
    * @param key The key
    * @return The offset associated with this key or -1 if the key is not found
    */
-  override def get(key: ByteBuffer): Long = {
+  def get(key: ByteBuffer): Long = {
     lookups += 1
     hashInto(key, hash1)
+    getHash1()
+  }
+
+  private def getHash1(): Long = {
     // search for the hash of this key by repeated probing until we find the hash we are looking for or we find an empty slot
     var attempt = 0
     var pos = 0
@@ -128,12 +127,51 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extends OffsetMap {
     } while(!Arrays.equals(hash1, hash2))
     bytes.getLong()
   }
-  
+
+  /**
+   * Copy all entries from another OffsetMap into this one, taking all or none of them depending on
+   * the available space. Values for identical keys are overwritten.
+   * @param from The OffsetMap to copy from
+   * @return true if every entry was copied, false if nothing was copied
+   */
+  def putAll(from: OffsetMap): Boolean = {
+    require(from.hashAlgorithm == hashAlgorithm, "cannot merge maps with different hashing algorithms")
+    // NOTE: scanning the raw buffer assumes the unused slots of `from` are zeroed and that both maps
+    // hash keys identically; otherwise stale or unfindable entries would be copied
+
+    // first pass: count the keys of `from` that are already present here, so we can check that the
+    // union fits before mutating anything
+    var common = 0
+    var slot = 0
+    while (slot < from.slots) {
+      from.bytes.position(slot * bytesPerEntry)
+      from.bytes.get(hash1)
+      if (!hash1.forall(_ == 0) && getHash1() >= 0)
+        common += 1
+      slot += 1
+    }
+
+    if (size + from.size - common > slots)
+      return false
+
+    // second pass: copy every occupied slot, overwriting the values of keys that already exist
+    slot = 0
+    while (slot < from.slots) {
+      from.bytes.position(slot * bytesPerEntry)
+      from.bytes.get(hash1)
+      if (!hash1.forall(_ == 0))
+        putHash1(from.bytes.getLong())
+      slot += 1
+    }
+
+    true
+  }
+
   /**
    * Change the salt used for key hashing making all existing keys unfindable.
    * Doesn't actually zero out the array.
    */
-  override def clear() {
+  def clear() {
     this.entries = 0
     this.lookups = 0L
     this.probes = 0L
@@ -143,8 +181,10 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extends OffsetMap {
   /**
    * The number of entries put into the map (note that not all may remain)
    */
-  override def size: Int = entries
-  
+  def size: Int = entries
+
+  def utilization: Double = size.toDouble / slots
+
   /**
    * The rate of collisions in the lookups
    */
-- 
2.8.2
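
Reviewer note (not part of the patch): the sketch below reproduces putAll's all-or-nothing merge semantics with a plain Scala Map standing in for the ByteBuffer-backed OffsetMap, so the behavior can be tried outside the broker. BoundedOffsetMap and DedupeDemo are hypothetical names invented for this note.

// Illustrative stand-in for OffsetMap: a bounded map with an all-or-nothing putAll.
import scala.collection.mutable

class BoundedOffsetMap(val slots: Int) {
  private val entries = mutable.Map[String, Long]()

  def size: Int = entries.size

  def put(key: String, offset: Long): Unit = {
    require(entries.contains(key) || size < slots, "Attempt to add a new entry to a full offset map.")
    entries(key) = offset
  }

  def get(key: String): Long = entries.getOrElse(key, -1L)

  def clear(): Unit = entries.clear()

  /** Copy all entries from `from`, taking all or none depending on the space available. */
  def putAll(from: BoundedOffsetMap): Boolean = {
    // count overlapping keys so the union is not over-counted, mirroring the patch's first pass
    val common = from.entries.keysIterator.count(entries.contains)
    if (size + from.size - common > slots)
      return false
    from.entries.foreach { case (k, v) => entries(k) = v } // second pass: copy, overwriting duplicates
    true
  }
}

object DedupeDemo extends App {
  val main = new BoundedOffsetMap(slots = 4)
  val scratch = new BoundedOffsetMap(slots = 4)

  // segment 1: keys a, b -- fits
  scratch.clear(); scratch.put("a", 0L); scratch.put("b", 1L)
  assert(main.putAll(scratch))

  // segment 2: keys b, c -- the overlap on b is not double counted
  scratch.clear(); scratch.put("b", 2L); scratch.put("c", 3L)
  assert(main.putAll(scratch))

  // segment 3: keys d, e -- the union would need 5 slots, so the merge is refused atomically
  scratch.clear(); scratch.put("d", 4L); scratch.put("e", 5L)
  assert(!main.putAll(scratch))
  assert(main.get("b") == 2L && main.get("d") == -1L) // main map is unchanged
  println("merge semantics verified")
}

The property exercised by the last segment is the one the cleaner relies on: a refused merge leaves the destination untouched, so the scratch map can simply be retried against the main map on the next cleaning round.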