diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 257e724..bcd0c3f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1463,6 +1463,11 @@ public class HRegionServer extends HasThread implements this, this.regionServerAccounting); if (this.hMemManager != null) { this.hMemManager.start(getChoreService()); + MemStoreChunkPool chunkPool = MemStoreChunkPool.getPool(this.conf); + if (chunkPool != null) { + // Register it as HeapMemoryTuneObserver + this.hMemManager.registerTuneObserver(chunkPool); + } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java index f90125e..c360a60 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java @@ -22,6 +22,8 @@ import static org.apache.hadoop.hbase.HConstants.HFILE_BLOCK_CACHE_SIZE_KEY; import java.lang.management.ManagementFactory; import java.lang.management.MemoryUsage; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; @@ -87,6 +89,8 @@ public class HeapMemoryManager { private MetricsHeapMemoryManager metricsHeapMemoryManager; + private List tuneObservers = new ArrayList(); + public static HeapMemoryManager create(Configuration conf, FlushRequester memStoreFlusher, Server server, RegionServerAccounting regionServerAccounting) { BlockCache blockCache = CacheConfig.instantiateBlockCache(conf); @@ -206,6 +210,10 @@ public class HeapMemoryManager { this.heapMemTunerChore.cancel(true); } + public void registerTuneObserver(HeapMemoryTuneObserver observer) { + this.tuneObservers.add(observer); + } + // Used by the test cases. boolean isTunerOn() { return this.tunerOn; @@ -351,6 +359,9 @@ public class HeapMemoryManager { blockCache.setMaxSize(newBlockCacheSize); globalMemStorePercent = memstoreSize; memStoreFlusher.setGlobalMemstoreLimit(newMemstoreSize); + for (HeapMemoryTuneObserver observer : tuneObservers) { + observer.onHeapMemoryTune(newMemstoreSize, newBlockCacheSize); + } } } else { metricsHeapMemoryManager.increaseTunerDoNothingCounter(); @@ -489,4 +500,17 @@ public class HeapMemoryManager { return needsTuning; } } + + /** + * Every class that wants to observe heap memory tune actions must implement this interface. + */ + public static interface HeapMemoryTuneObserver { + + /** + * This method would be called by HeapMemoryManger when a heap memory tune action took place. + * @param newMemstoreSize The newly calculated global memstore size + * @param newBlockCacheSize The newly calculated global blockcache size + */ + void onHeapMemoryTune(long newMemstoreSize, long newBlockCacheSize); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java index 6b34d75..0664788 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java @@ -31,6 +31,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil; +import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.HeapMemoryTuneObserver; import org.apache.hadoop.util.StringUtils; import com.google.common.annotations.VisibleForTesting; @@ -52,7 +53,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; */ @SuppressWarnings("javadoc") @InterfaceAudience.Private -public class MemStoreChunkPool { +public class MemStoreChunkPool implements HeapMemoryTuneObserver { private static final Log LOG = LogFactory.getLog(MemStoreChunkPool.class); final static String CHUNK_POOL_MAXSIZE_KEY = "hbase.hregion.memstore.chunkpool.maxsize"; final static String CHUNK_POOL_INITIALSIZE_KEY = "hbase.hregion.memstore.chunkpool.initialsize"; @@ -64,30 +65,32 @@ public class MemStoreChunkPool { /** Boolean whether we have disabled the memstore chunk pool entirely. */ static boolean chunkPoolDisabled = false; - private final int maxCount; + private int maxCount; // A queue of reclaimed chunks private final BlockingQueue reclaimedChunks; private final int chunkSize; + private final float poolSizePercentage; /** Statistics thread schedule pool */ private final ScheduledExecutorService scheduleThreadPool; /** Statistics thread */ private static final int statThreadPeriod = 60 * 5; - private final AtomicLong createdChunkCount = new AtomicLong(); + private final AtomicLong chunkCount = new AtomicLong(); private final AtomicLong reusedChunkCount = new AtomicLong(); MemStoreChunkPool(Configuration conf, int chunkSize, int maxCount, - int initialCount) { + int initialCount, float poolSizePercentage) { this.maxCount = maxCount; this.chunkSize = chunkSize; + this.poolSizePercentage = poolSizePercentage; this.reclaimedChunks = new LinkedBlockingQueue(); for (int i = 0; i < initialCount; i++) { PooledChunk chunk = new PooledChunk(chunkSize); chunk.init(); reclaimedChunks.add(chunk); } - createdChunkCount.set(initialCount); + chunkCount.set(initialCount); final String n = Thread.currentThread().getName(); scheduleThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() .setNameFormat(n + "-MemStoreChunkPool Statistics").setDaemon(true).build()); @@ -113,10 +116,10 @@ public class MemStoreChunkPool { } else { // Make a chunk iff we have not yet created the maxCount chunks while (true) { - long created = this.createdChunkCount.get(); + long created = this.chunkCount.get(); if (created < this.maxCount) { chunk = new PooledChunk(chunkSize); - if (this.createdChunkCount.compareAndSet(created, created + 1)) { + if (this.chunkCount.compareAndSet(created, created + 1)) { break; } } else { @@ -132,11 +135,12 @@ public class MemStoreChunkPool { * skip the remaining chunks * @param chunks */ - void putbackChunks(BlockingQueue chunks) { - assert reclaimedChunks.size() < this.maxCount; + synchronized void putbackChunks(BlockingQueue chunks) { + int toAdd = Math.min(chunks.size(), this.maxCount - reclaimedChunks.size()); PooledChunk chunk = null; - while ((chunk = chunks.poll()) != null) { + while ((chunk = chunks.poll()) != null && toAdd > 0) { reclaimedChunks.add(chunk); + toAdd--; } } @@ -145,9 +149,10 @@ public class MemStoreChunkPool { * skip it * @param chunk */ - void putbackChunk(PooledChunk chunk) { - assert reclaimedChunks.size() < this.maxCount; - reclaimedChunks.add(chunk); + synchronized void putbackChunk(PooledChunk chunk) { + if (reclaimedChunks.size() < this.maxCount) { + reclaimedChunks.add(chunk); + } } int getPoolSize() { @@ -174,7 +179,7 @@ public class MemStoreChunkPool { private void logStats() { if (!LOG.isDebugEnabled()) return; - long created = createdChunkCount.get(); + long created = chunkCount.get(); long reused = reusedChunkCount.get(); long total = created + reused; LOG.debug("Stats: current pool size=" + reclaimedChunks.size() @@ -222,7 +227,8 @@ public class MemStoreChunkPool { int initialCount = (int) (initialCountPercentage * maxCount); LOG.info("Allocating MemStoreChunkPool with chunk size " + StringUtils.byteDesc(chunkSize) + ", max count " + maxCount + ", initial count " + initialCount); - GLOBAL_INSTANCE = new MemStoreChunkPool(conf, chunkSize, maxCount, initialCount); + GLOBAL_INSTANCE = new MemStoreChunkPool(conf, chunkSize, maxCount, initialCount, + poolSizePercentage); return GLOBAL_INSTANCE; } } @@ -241,4 +247,30 @@ public class MemStoreChunkPool { super(size); } } + + @Override + public void onHeapMemoryTune(long newMemstoreSize, long newBlockCacheSize) { + int newMaxCount = (int) (newMemstoreSize * poolSizePercentage / chunkSize); + if (newMaxCount != this.maxCount) { + // We need an adjustment in the chunks numbers + if (newMaxCount > this.maxCount) { + // Max chunks getting increased. Just change the variable. Later calls to getChunk() would + // create and add them to Q + LOG.info("Max count for chunks increased from " + this.maxCount + " to " + newMaxCount); + this.maxCount = newMaxCount; + } else { + // Max chunks getting decreased. We may need to clear off some of the pooled chunks now + // itself. If the extra chunks are serving already, do not pool those when we get them back + LOG.info("Max count for chunks decreased from " + this.maxCount + " to " + newMaxCount); + this.maxCount = newMaxCount; + if (this.reclaimedChunks.size() > newMaxCount) { + synchronized (this) { + while (this.reclaimedChunks.size() > newMaxCount) { + this.reclaimedChunks.poll(); + } + } + } + } + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java index a426a07..cfbb098 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java @@ -203,7 +203,7 @@ public class TestMemStoreChunkPool { final int maxCount = 10; final int initialCount = 5; final int chunkSize = 10; - MemStoreChunkPool pool = new MemStoreChunkPool(conf, chunkSize, maxCount, initialCount); + MemStoreChunkPool pool = new MemStoreChunkPool(conf, chunkSize, maxCount, initialCount, 1); assertEquals(initialCount, pool.getPoolSize()); assertEquals(maxCount, pool.getMaxCount()); MemStoreChunkPool.GLOBAL_INSTANCE = pool;// Replace the global ref with the new one we created.