diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 257e724..49be0fe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1439,6 +1439,7 @@ public class HRegionServer extends HasThread implements startServiceThreads(); startHeapMemoryManager(); + initGlobalMSLABChunkPool(); LOG.info("Serving as " + this.serverName + ", RpcServer on " + rpcServices.isa + ", sessionid=0x" + @@ -1458,6 +1459,15 @@ public class HRegionServer extends HasThread implements } } + private void initGlobalMSLABChunkPool() { + // Try init Global MSLAB chunkpool. When it is disabled no impact of this init call + MemStoreChunkPool chunkPool = MemStoreChunkPool.init(conf); + if (chunkPool != null) { + // Register it as HeapMemoryTuneObserver + this.hMemManager.registerTuneObserver(chunkPool); + } + } + private void startHeapMemoryManager() { this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher, this, this.regionServerAccounting); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java index 3ca4b0c..6326445 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java @@ -92,7 +92,7 @@ public class HeapMemStoreLAB implements MemStoreLAB { public HeapMemStoreLAB(Configuration conf) { chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT); maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT); - this.chunkPool = MemStoreChunkPool.getPool(conf); + this.chunkPool = MemStoreChunkPool.getPool(); // currently chunkQueue is only used for chunkPool if (this.chunkPool != null) { // set queue length to chunk pool max count to avoid keeping reference of diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java index f90125e..c360a60 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java @@ -22,6 +22,8 @@ import static org.apache.hadoop.hbase.HConstants.HFILE_BLOCK_CACHE_SIZE_KEY; import java.lang.management.ManagementFactory; import java.lang.management.MemoryUsage; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; @@ -87,6 +89,8 @@ public class HeapMemoryManager { private MetricsHeapMemoryManager metricsHeapMemoryManager; + private List tuneObservers = new ArrayList(); + public static HeapMemoryManager create(Configuration conf, FlushRequester memStoreFlusher, Server server, RegionServerAccounting regionServerAccounting) { BlockCache blockCache = CacheConfig.instantiateBlockCache(conf); @@ -206,6 +210,10 @@ public class HeapMemoryManager { this.heapMemTunerChore.cancel(true); } + public void registerTuneObserver(HeapMemoryTuneObserver observer) { + this.tuneObservers.add(observer); + } + // Used by the test cases. boolean isTunerOn() { return this.tunerOn; @@ -351,6 +359,9 @@ public class HeapMemoryManager { blockCache.setMaxSize(newBlockCacheSize); globalMemStorePercent = memstoreSize; memStoreFlusher.setGlobalMemstoreLimit(newMemstoreSize); + for (HeapMemoryTuneObserver observer : tuneObservers) { + observer.onHeapMemoryTune(newMemstoreSize, newBlockCacheSize); + } } } else { metricsHeapMemoryManager.increaseTunerDoNothingCounter(); @@ -489,4 +500,17 @@ public class HeapMemoryManager { return needsTuning; } } + + /** + * Every class that wants to observe heap memory tune actions must implement this interface. + */ + public static interface HeapMemoryTuneObserver { + + /** + * This method would be called by HeapMemoryManger when a heap memory tune action took place. + * @param newMemstoreSize The newly calculated global memstore size + * @param newBlockCacheSize The newly calculated global blockcache size + */ + void onHeapMemoryTune(long newMemstoreSize, long newBlockCacheSize); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java index 6b34d75..390ebc7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java @@ -31,28 +31,29 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil; +import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.HeapMemoryTuneObserver; import org.apache.hadoop.util.StringUtils; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; /** - * A pool of {@link HeapMemStoreLAB.Chunk} instances. + * A pool of {@link Chunk} instances. * - * MemStoreChunkPool caches a number of retired chunks for reusing, it could - * decrease allocating bytes when writing, thereby optimizing the garbage - * collection on JVM. + * MemStoreChunkPool caches a number of retired chunks for reusing, it could decrease allocating + * bytes when writing, thereby optimizing the garbage collection on JVM. * - * The pool instance is globally unique and could be obtained through - * {@link MemStoreChunkPool#getPool(Configuration)} + * The pool instance is globally unique. It should be initialized using + * {@link #init(Configuration)}. The single instance can be obtained through + * {@link MemStoreChunkPool#getPool()} * - * {@link MemStoreChunkPool#getChunk()} is called when MemStoreLAB allocating - * bytes, and {@link MemStoreChunkPool#putbackChunks(BlockingQueue)} is called - * when MemStore clearing snapshot for flush + * {@link MemStoreChunkPool#getChunk()} is called when MemStoreLAB allocating bytes, and + * {@link MemStoreChunkPool#putbackChunks(BlockingQueue)} is called when MemStore clearing snapshot + * for flush */ @SuppressWarnings("javadoc") @InterfaceAudience.Private -public class MemStoreChunkPool { +public class MemStoreChunkPool implements HeapMemoryTuneObserver { private static final Log LOG = LogFactory.getLog(MemStoreChunkPool.class); final static String CHUNK_POOL_MAXSIZE_KEY = "hbase.hregion.memstore.chunkpool.maxsize"; final static String CHUNK_POOL_INITIALSIZE_KEY = "hbase.hregion.memstore.chunkpool.initialsize"; @@ -64,30 +65,32 @@ public class MemStoreChunkPool { /** Boolean whether we have disabled the memstore chunk pool entirely. */ static boolean chunkPoolDisabled = false; - private final int maxCount; + private volatile int maxCount; // A queue of reclaimed chunks private final BlockingQueue reclaimedChunks; private final int chunkSize; + private final float poolSizePercentage; /** Statistics thread schedule pool */ private final ScheduledExecutorService scheduleThreadPool; /** Statistics thread */ private static final int statThreadPeriod = 60 * 5; - private final AtomicLong createdChunkCount = new AtomicLong(); + private final AtomicLong chunkCount = new AtomicLong(); private final AtomicLong reusedChunkCount = new AtomicLong(); MemStoreChunkPool(Configuration conf, int chunkSize, int maxCount, - int initialCount) { + int initialCount, float poolSizePercentage) { this.maxCount = maxCount; this.chunkSize = chunkSize; + this.poolSizePercentage = poolSizePercentage; this.reclaimedChunks = new LinkedBlockingQueue(); for (int i = 0; i < initialCount; i++) { PooledChunk chunk = new PooledChunk(chunkSize); chunk.init(); reclaimedChunks.add(chunk); } - createdChunkCount.set(initialCount); + chunkCount.set(initialCount); final String n = Thread.currentThread().getName(); scheduleThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() .setNameFormat(n + "-MemStoreChunkPool Statistics").setDaemon(true).build()); @@ -113,10 +116,10 @@ public class MemStoreChunkPool { } else { // Make a chunk iff we have not yet created the maxCount chunks while (true) { - long created = this.createdChunkCount.get(); + long created = this.chunkCount.get(); if (created < this.maxCount) { chunk = new PooledChunk(chunkSize); - if (this.createdChunkCount.compareAndSet(created, created + 1)) { + if (this.chunkCount.compareAndSet(created, created + 1)) { break; } } else { @@ -132,11 +135,12 @@ public class MemStoreChunkPool { * skip the remaining chunks * @param chunks */ - void putbackChunks(BlockingQueue chunks) { - assert reclaimedChunks.size() < this.maxCount; + synchronized void putbackChunks(BlockingQueue chunks) { + int toAdd = Math.min(chunks.size(), this.maxCount - reclaimedChunks.size()); PooledChunk chunk = null; - while ((chunk = chunks.poll()) != null) { + while ((chunk = chunks.poll()) != null && toAdd > 0) { reclaimedChunks.add(chunk); + toAdd--; } } @@ -145,9 +149,10 @@ public class MemStoreChunkPool { * skip it * @param chunk */ - void putbackChunk(PooledChunk chunk) { - assert reclaimedChunks.size() < this.maxCount; - reclaimedChunks.add(chunk); + synchronized void putbackChunk(PooledChunk chunk) { + if (reclaimedChunks.size() < this.maxCount) { + reclaimedChunks.add(chunk); + } } int getPoolSize() { @@ -174,7 +179,7 @@ public class MemStoreChunkPool { private void logStats() { if (!LOG.isDebugEnabled()) return; - long created = createdChunkCount.get(); + long created = chunkCount.get(); long reused = reusedChunkCount.get(); long total = created + reused; LOG.debug("Stats: current pool size=" + reclaimedChunks.size() @@ -185,46 +190,41 @@ public class MemStoreChunkPool { } } + public static MemStoreChunkPool init(Configuration conf) { + float poolSizePercentage = conf.getFloat(CHUNK_POOL_MAXSIZE_KEY, POOL_MAX_SIZE_DEFAULT); + if (poolSizePercentage <= 0) { + chunkPoolDisabled = true; + return null; + } + if (poolSizePercentage > 1.0) { + throw new IllegalArgumentException(CHUNK_POOL_MAXSIZE_KEY + " must be between 0.0 and 1.0"); + } + long heapMax = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax(); + long globalMemStoreLimit = (long) (heapMax + * HeapMemorySizeUtil.getGlobalMemStorePercent(conf, false)); + int chunkSize = conf.getInt(HeapMemStoreLAB.CHUNK_SIZE_KEY, HeapMemStoreLAB.CHUNK_SIZE_DEFAULT); + int maxCount = (int) (globalMemStoreLimit * poolSizePercentage / chunkSize); + + float initialCountPercentage = conf.getFloat(CHUNK_POOL_INITIALSIZE_KEY, + POOL_INITIAL_SIZE_DEFAULT); + if (initialCountPercentage > 1.0 || initialCountPercentage < 0) { + throw new IllegalArgumentException( + CHUNK_POOL_INITIALSIZE_KEY + " must be between 0.0 and 1.0"); + } + + int initialCount = (int) (initialCountPercentage * maxCount); + LOG.info("Allocating MemStoreChunkPool with chunk size " + StringUtils.byteDesc(chunkSize) + + ", max count " + maxCount + ", initial count " + initialCount); + GLOBAL_INSTANCE = new MemStoreChunkPool(conf, chunkSize, maxCount, initialCount, + poolSizePercentage); + return GLOBAL_INSTANCE; + } + /** - * @param conf * @return the global MemStoreChunkPool instance */ - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DC_DOUBLECHECK", - justification="Intentional") - static MemStoreChunkPool getPool(Configuration conf) { - if (GLOBAL_INSTANCE != null) return GLOBAL_INSTANCE; - - synchronized (MemStoreChunkPool.class) { - if (chunkPoolDisabled) return null; - if (GLOBAL_INSTANCE != null) return GLOBAL_INSTANCE; - float poolSizePercentage = conf.getFloat(CHUNK_POOL_MAXSIZE_KEY, POOL_MAX_SIZE_DEFAULT); - if (poolSizePercentage <= 0) { - chunkPoolDisabled = true; - return null; - } - if (poolSizePercentage > 1.0) { - throw new IllegalArgumentException(CHUNK_POOL_MAXSIZE_KEY + " must be between 0.0 and 1.0"); - } - long heapMax = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax(); - long globalMemStoreLimit = (long) (heapMax * HeapMemorySizeUtil.getGlobalMemStorePercent(conf, - false)); - int chunkSize = conf.getInt(HeapMemStoreLAB.CHUNK_SIZE_KEY, - HeapMemStoreLAB.CHUNK_SIZE_DEFAULT); - int maxCount = (int) (globalMemStoreLimit * poolSizePercentage / chunkSize); - - float initialCountPercentage = conf.getFloat(CHUNK_POOL_INITIALSIZE_KEY, - POOL_INITIAL_SIZE_DEFAULT); - if (initialCountPercentage > 1.0 || initialCountPercentage < 0) { - throw new IllegalArgumentException(CHUNK_POOL_INITIALSIZE_KEY - + " must be between 0.0 and 1.0"); - } - - int initialCount = (int) (initialCountPercentage * maxCount); - LOG.info("Allocating MemStoreChunkPool with chunk size " + StringUtils.byteDesc(chunkSize) - + ", max count " + maxCount + ", initial count " + initialCount); - GLOBAL_INSTANCE = new MemStoreChunkPool(conf, chunkSize, maxCount, initialCount); - return GLOBAL_INSTANCE; - } + static MemStoreChunkPool getPool() { + return GLOBAL_INSTANCE; } int getMaxCount() { @@ -241,4 +241,30 @@ public class MemStoreChunkPool { super(size); } } + + @Override + public void onHeapMemoryTune(long newMemstoreSize, long newBlockCacheSize) { + int newMaxCount = (int) (newMemstoreSize * poolSizePercentage / chunkSize); + if (newMaxCount != this.maxCount) { + // We need an adjustment in the chunks numbers + if (newMaxCount > this.maxCount) { + // Max chunks getting increased. Just change the variable. Later calls to getChunk() would + // create and add them to Q + LOG.info("Max count for chunks increased from " + this.maxCount + " to " + newMaxCount); + this.maxCount = newMaxCount; + } else { + // Max chunks getting decreased. We may need to clear off some of the pooled chunks now + // itself. If the extra chunks are serving already, do not pool those when we get them back + LOG.info("Max count for chunks decreased from " + this.maxCount + " to " + newMaxCount); + this.maxCount = newMaxCount; + if (this.reclaimedChunks.size() > newMaxCount) { + synchronized (this) { + while (this.reclaimedChunks.size() > newMaxCount) { + this.reclaimedChunks.poll(); + } + } + } + } + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java index db0205e..e4bc9f3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java @@ -95,8 +95,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore { this.region = hbaseUtility.createTestRegion("foobar", hcd); this.regionServicesForStores = region.getRegionServicesForStores(); this.store = new HStore(region, hcd, conf); - - chunkPool = MemStoreChunkPool.getPool(conf); + MemStoreChunkPool.init(conf); + chunkPool = MemStoreChunkPool.getPool(); assertTrue(chunkPool != null); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java index c53ce80..f5869ff 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java @@ -44,21 +44,21 @@ import static org.junit.Assert.assertTrue; public class TestMemStoreChunkPool { private final static Configuration conf = new Configuration(); private static MemStoreChunkPool chunkPool; - private static boolean chunkPoolDisabledBeforeTest; + private static MemStoreChunkPool chunkPoolBeforeTest; @BeforeClass public static void setUpBeforeClass() throws Exception { conf.setBoolean(SegmentFactory.USEMSLAB_KEY, true); conf.setFloat(MemStoreChunkPool.CHUNK_POOL_MAXSIZE_KEY, 0.2f); - chunkPoolDisabledBeforeTest = MemStoreChunkPool.chunkPoolDisabled; - MemStoreChunkPool.chunkPoolDisabled = false; - chunkPool = MemStoreChunkPool.getPool(conf); + chunkPoolBeforeTest = MemStoreChunkPool.GLOBAL_INSTANCE; + MemStoreChunkPool.init(conf); + chunkPool = MemStoreChunkPool.getPool(); assertTrue(chunkPool != null); } @AfterClass public static void tearDownAfterClass() throws Exception { - MemStoreChunkPool.chunkPoolDisabled = chunkPoolDisabledBeforeTest; + MemStoreChunkPool.GLOBAL_INSTANCE = chunkPoolBeforeTest; } @Before @@ -202,7 +202,7 @@ public class TestMemStoreChunkPool { final int maxCount = 10; final int initialCount = 5; final int chunkSize = 10; - MemStoreChunkPool pool = new MemStoreChunkPool(conf, chunkSize, maxCount, initialCount); + MemStoreChunkPool pool = new MemStoreChunkPool(conf, chunkSize, maxCount, initialCount, 1); assertEquals(initialCount, pool.getPoolSize()); assertEquals(maxCount, pool.getMaxCount()); MemStoreChunkPool.GLOBAL_INSTANCE = pool;// Replace the global ref with the new one we created. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java index 34caf97..35703ab 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java @@ -168,38 +168,43 @@ public class TestMemStoreLAB { conf.setDouble(MemStoreChunkPool.CHUNK_POOL_MAXSIZE_KEY, 0.1); // set chunk size to default max alloc size, so we could easily trigger chunk retirement conf.setLong(HeapMemStoreLAB.CHUNK_SIZE_KEY, HeapMemStoreLAB.MAX_ALLOC_DEFAULT); - // reconstruct mslab - MemStoreChunkPool.clearDisableFlag(); - mslab = new HeapMemStoreLAB(conf); - // launch multiple threads to trigger frequent chunk retirement - List threads = new ArrayList(); - for (int i = 0; i < 10; i++) { - threads.add(getChunkQueueTestThread(mslab, "testLABChunkQueue-" + i)); - } - for (Thread thread : threads) { - thread.start(); - } - // let it run for some time - Thread.sleep(1000); - for (Thread thread : threads) { - thread.interrupt(); - } - boolean threadsRunning = true; - while (threadsRunning) { + MemStoreChunkPool oldPool = MemStoreChunkPool.GLOBAL_INSTANCE; + MemStoreChunkPool.init(conf); + try { + // reconstruct mslab + mslab = new HeapMemStoreLAB(conf); + // launch multiple threads to trigger frequent chunk retirement + List threads = new ArrayList(); + for (int i = 0; i < 10; i++) { + threads.add(getChunkQueueTestThread(mslab, "testLABChunkQueue-" + i)); + } + for (Thread thread : threads) { + thread.start(); + } + // let it run for some time + Thread.sleep(1000); for (Thread thread : threads) { - if (thread.isAlive()) { - threadsRunning = true; - break; + thread.interrupt(); + } + boolean threadsRunning = true; + while (threadsRunning) { + for (Thread thread : threads) { + if (thread.isAlive()) { + threadsRunning = true; + break; + } } + threadsRunning = false; } - threadsRunning = false; + // close the mslab + mslab.close(); + // make sure all chunks reclaimed or removed from chunk queue + int queueLength = mslab.getChunkQueue().size(); + assertTrue("All chunks in chunk queue should be reclaimed or removed" + + " after mslab closed but actually: " + queueLength, queueLength == 0); + } finally { + MemStoreChunkPool.GLOBAL_INSTANCE = oldPool; } - // close the mslab - mslab.close(); - // make sure all chunks reclaimed or removed from chunk queue - int queueLength = mslab.getChunkQueue().size(); - assertTrue("All chunks in chunk queue should be reclaimed or removed" - + " after mslab closed but actually: " + queueLength, queueLength == 0); } private Thread getChunkQueueTestThread(final HeapMemStoreLAB mslab, String threadName) {