diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index e058cfd..a0655ac 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -229,6 +229,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
    * Check for major compactions.
    */
   Chore majorCompactionChecker;
+  /** Chore that periodically compacts the most fragmented memstore. */
+  Chore memoryCompactor;
 
   // HLog and HLog roller. log is protected rather than private to avoid
   // eclipse warning when accessed by inner classes
@@ -535,6 +537,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
       + ".multiplier", 1000);
     this.majorCompactionChecker = new MajorCompactionChecker(this,
       this.threadWakeFrequency * multiplier, this);
+    this.memoryCompactor = new MemoryCompactor(this,
+      conf.getInt("hbase.regionserver.memorycompactor.period", 1000), this);
 
     this.leases = new Leases((int) conf.getLong(
         HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
@@ -1056,6 +1060,48 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
     }
   }
 
+  /**
+   * Chore that, on every run, compacts the memstore with the most inserts
+   * since its last memory compaction (see MemStore#compactMemory()).
+   * Memstores with no new inserts are never selected.
+   */
+  private static class MemoryCompactor extends Chore {
+    private final HRegionServer instance;
+
+    MemoryCompactor(final HRegionServer hrs, final int sleepTime,
+        final Stoppable stopper) {
+      super("MemoryCompactor", sleepTime, stopper);
+      this.instance = hrs;
+    }
+
+    @Override
+    protected void chore() {
+      long maxFrags = 0;
+      MemStore mostFragmented = null;
+
+      // Only the selection happens under the onlineRegions lock; the
+      // compaction itself runs after the lock is released.
+      synchronized (this.instance.onlineRegions) {
+        for (HRegion r : this.instance.onlineRegions.values()) {
+          for (Store s : r.stores.values()) {
+            long frags = s.memstore.insertsSinceLastMemoryCompaction.get();
+            if (frags > maxFrags) {
+              mostFragmented = s.memstore;
+              maxFrags = frags;
+            }
+          }
+        }
+      }
+
+      // Null when no region is online or no memstore has had inserts since
+      // its last memory compaction: nothing to do.
+      if (mostFragmented == null) {
+        return;
+      }
+      mostFragmented.compactMemory();
+    }
+  }
+
   /**
    * Report the status of the server. A server is online once all the startup is
    * completed (setting up filesystem, starting service threads, etc.). This
@@ -1240,6 +1286,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
         handler);
     Threads.setDaemonThreadRunning(this.majorCompactionChecker, n +
       ".majorCompactionChecker", handler);
+    Threads.setDaemonThreadRunning(this.memoryCompactor, n +
+      ".memoryCompactor", handler);
 
     // Leases is not a Thread. Internally it runs a daemon thread. If it gets
     // an unhandled exception, it will just exit.
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
index 53ec17c..2181365 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
@@ -23,6 +23,7 @@ package org.apache.hadoop.hbase.regionserver;
 import java.lang.management.ManagementFactory;
 import java.lang.management.RuntimeMXBean;
+import java.nio.ByteBuffer;
 import java.rmi.UnexpectedException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
@@ -77,6 +78,13 @@ public class MemStore implements HeapSize {
 
   // Used to track own heapSize
   final AtomicLong size;
+  // NOTE(review): DEEP_OVERHEAD (defined outside this diff) probably needs
+  // to account for the two AtomicLong fields below; confirm.
+  // Sum of getLength() over KeyValues newly added to kvset; reset with size.
+  final AtomicLong rawDataSize;
+  // KeyValues added since the last memory compaction (also reset with
+  // size); the MemoryCompactor chore compacts the memstore with the most.
+  final AtomicLong insertsSinceLastMemoryCompaction;
 
   TimeRangeTracker timeRangeTracker;
   TimeRangeTracker snapshotTimeRangeTracker;
@@ -102,6 +110,86 @@ public class MemStore implements HeapSize {
     timeRangeTracker = new TimeRangeTracker();
     snapshotTimeRangeTracker = new TimeRangeTracker();
     this.size = new AtomicLong(DEEP_OVERHEAD);
+    this.rawDataSize = new AtomicLong(0);
+    this.insertsSinceLastMemoryCompaction = new AtomicLong(0);
+  }
+
+  /**
+   * Copies every KeyValue whose backing array is shorter than a third of
+   * {@link #rawDataSize} into one freshly allocated, contiguous buffer and
+   * swaps in a rebuilt kvset that references the copies. KeyValues with
+   * larger backing arrays are carried over unchanged. Resets
+   * {@link #insertsSinceLastMemoryCompaction}.
+   *
+   * <p>The rebuild and swap run under the write lock: add() and delete()
+   * only take the read lock, so holding just the read lock here would let
+   * concurrent writes land in the old kvset after it had been copied, and
+   * the swap would silently drop them. Writes therefore block for the
+   * duration of the copy.
+   */
+  void compactMemory() {
+    // First scan (unlocked) to estimate how much space we need. The copy
+    // below re-checks the bound, so writes racing with this scan are safe;
+    // they simply may not be compacted this round.
+    LOG.debug("Memory compactor scanning to estimate space");
+    // Divide in long arithmetic: casting to int first overflows once
+    // rawDataSize exceeds Integer.MAX_VALUE.
+    final long sizeThreshold = this.rawDataSize.get() / 3;
+    long estimatedSpace = 0;
+    for (KeyValue kv : kvset) {
+      if (kv.getBuffer().length < sizeThreshold) {
+        estimatedSpace += kv.getLength();
+      }
+    }
+    // ByteBuffer capacity is an int; KeyValues that do not fit are skipped
+    // by the bound check during the copy.
+    final int accumulatedSpace =
+      (int) Math.min(estimatedSpace, Integer.MAX_VALUE);
+    if (accumulatedSpace == 0) {
+      // Nothing small enough to compact. Reset the counter anyway so the
+      // MemoryCompactor chore does not keep re-selecting this memstore.
+      this.insertsSinceLastMemoryCompaction.set(0);
+      return;
+    }
+
+    LOG.debug("Planning to compact " + accumulatedSpace + " bytes from chunks "
+      + "less than " + sizeThreshold + " bytes");
+    ByteBuffer newData = ByteBuffer.allocate(accumulatedSpace);
+    byte[] newDataBytes = newData.array();
+    KeyValueSkipListSet kvsetNew = new KeyValueSkipListSet(comparator);
+
+    this.lock.writeLock().lock();
+    long startTime = System.nanoTime();
+    try {
+      int offset = 0;
+      int numCompacted = 0, numKvs = 0;
+
+      for (KeyValue kv : kvset) {
+        KeyValue newKv = kv;
+        int len = kv.getLength();
+        // Widen to long so offset + len cannot overflow.
+        if (kv.getBuffer().length < sizeThreshold &&
+            (long) offset + len <= accumulatedSpace) {
+          newData.put(kv.getBuffer(), kv.getOffset(), len);
+          newKv = new KeyValue(newDataBytes, offset, len);
+          // memstoreTS is a field of the KeyValue object, not part of the
+          // bytes copied above, so carry it over explicitly.
+          newKv.setMemstoreTS(kv.getMemstoreTS());
+          offset += len;
+          numCompacted++;
+        }
+        numKvs++;
+        kvsetNew.add(newKv);
+      }
+      long endTime = System.nanoTime();
+      this.insertsSinceLastMemoryCompaction.set(0);
+      LOG.info("Compacted " + offset + " bytes of " + rawDataSize +
+        " (" + numCompacted + " small kvs out of " + numKvs + ")" +
+        " in " + ((endTime - startTime) / 1000000) + "ms");
+      this.kvset = kvsetNew;
+    } finally {
+      this.lock.writeLock().unlock();
+    }
   }
 
   void dump() {
@@ -134,6 +222,8 @@ public class MemStore implements HeapSize {
           this.timeRangeTracker = new TimeRangeTracker();
           // Reset heap to not include any keys
           this.size.set(DEEP_OVERHEAD);
+          this.rawDataSize.set(0);
+          this.insertsSinceLastMemoryCompaction.set(0);
         }
       }
     } finally {
@@ -187,9 +277,14 @@ public class MemStore implements HeapSize {
     long s = -1;
     this.lock.readLock().lock();
     try {
-      s = heapSizeChange(kv, this.kvset.add(kv));
+      boolean notPresent = this.kvset.add(kv);
+      s = heapSizeChange(kv, notPresent);
       timeRangeTracker.includeTimestamp(kv);
       this.size.addAndGet(s);
+      if (notPresent) {
+        this.rawDataSize.addAndGet(kv.getLength());
+        this.insertsSinceLastMemoryCompaction.incrementAndGet();
+      }
     } finally {
       this.lock.readLock().unlock();
     }
@@ -205,8 +300,13 @@ public class MemStore implements HeapSize {
     long s = 0;
     this.lock.readLock().lock();
     try {
-      s += heapSizeChange(delete, this.kvset.add(delete));
+      boolean notPresent = this.kvset.add(delete);
+      s = heapSizeChange(delete, notPresent);
       timeRangeTracker.includeTimestamp(delete);
+      if (notPresent) {
+        this.rawDataSize.addAndGet(delete.getLength());
+        this.insertsSinceLastMemoryCompaction.incrementAndGet();
+      }
     } finally {
       this.lock.readLock().unlock();
     }