diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java index 5e97b80..64e02d9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java @@ -24,6 +24,8 @@ import static org.apache.hadoop.hbase.HConstants.HFILE_BLOCK_CACHE_SIZE_KEY; import static org.apache.hadoop.hbase.regionserver.HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY; import static org.apache.hadoop.hbase.regionserver.HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; @@ -44,42 +46,165 @@ import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.TunerResult; class DefaultHeapMemoryTuner implements HeapMemoryTuner { public static final String STEP_KEY = "hbase.regionserver.heapmemory.autotuner.step"; + public static final String MAX_ALLOWED_LOSS_KEY = + "hbase.regionserver.heapmemory.autotuner.step.max.loss"; + public static final String MIN_EXPECTED_GAIN_KEY = + "hbase.regionserver.heapmemory.autotuner.step.min.gain"; + public static final String SUFFICIENT_MEMORY_LEVEL_KEY = + "hbase.regionserver.heapmemory.autotuner.sufficient.memory.level"; public static final float DEFAULT_STEP_VALUE = 0.02f; // 2% - - private static final TunerResult TUNER_RESULT = new TunerResult(true); + // Fraction change less than this will not be considered harmful + // TODO dynamically calculate tolerance from standard deviation of percent changes + public static final float DEFAULT_MAX_ALLOWED_LOSS_VALUE = 0.01f; // 1% + // Fraction change less than this will not be considered useful + public static final float DEFAULT_MIN_EXPECTED_GAIN_VALUE = 0.005f; // 0.5% + // If current block cache size or memstore size in use is below this level relative to memory + // provided to it then corresponding component will be considered to have sufficient memory + public static final float DEFAULT_SUFFICIENT_MEMORY_LEVEL_VALUE = 0.5f; // 50% + // Large constant value assigned to percent changes when its undefined + public static final float undefinedPercentChange = 1.0f; // 100% private static final TunerResult NO_OP_TUNER_RESULT = new TunerResult(false); + private Log LOG = LogFactory.getLog(DefaultHeapMemoryTuner.class); + private TunerResult TUNER_RESULT = new TunerResult(true); private Configuration conf; private float step = DEFAULT_STEP_VALUE; + private float maximumAllowedLoss = DEFAULT_MAX_ALLOWED_LOSS_VALUE; + private float minimumExpectedGain = DEFAULT_MIN_EXPECTED_GAIN_VALUE; + private float sufficientMemoryLevel = DEFAULT_SUFFICIENT_MEMORY_LEVEL_VALUE; private float globalMemStorePercentMinRange; private float globalMemStorePercentMaxRange; private float blockCachePercentMinRange; private float blockCachePercentMaxRange; + private StepDirection prevTuneDirection = StepDirection.NEUTRAL; + private long prevFlushCount = 0; + private long prevEvictCount = 0; + private long prevCacheMissCount = 0; + @Override public TunerResult tune(TunerContext context) { long blockedFlushCount = context.getBlockedFlushCount(); long unblockedFlushCount = context.getUnblockedFlushCount(); long evictCount = context.getEvictCount(); - boolean memstoreSufficient = blockedFlushCount == 0 && unblockedFlushCount == 0; - boolean blockCacheSufficient = evictCount == 0; - if (memstoreSufficient && blockCacheSufficient) { + long cacheMissCount = context.getCacheMissCount(); + StepDirection newTuneDirection = StepDirection.NEUTRAL; + String tunerLog = ""; + // We can consider memstore or block cache to be sufficient if + // we are using only a minor fraction of what have been already provided to it + boolean earlyMemstoreSufficientCheck = (blockedFlushCount == 0 && unblockedFlushCount == 0) + || context.getCurMemStoreUsed() < context.getCurMemStoreSize()*sufficientMemoryLevel; + boolean earlyBlockCacheSufficientCheck = evictCount == 0 || + context.getCurBlockCacheUsed() < context.getCurBlockCacheSize()*sufficientMemoryLevel; + if (earlyMemstoreSufficientCheck && earlyBlockCacheSufficientCheck) { + prevFlushCount = blockedFlushCount + unblockedFlushCount; + prevEvictCount = evictCount; + prevCacheMissCount = cacheMissCount; + prevTuneDirection = StepDirection.NEUTRAL; return NO_OP_TUNER_RESULT; } float newMemstoreSize; float newBlockCacheSize; - if (memstoreSufficient) { + if (earlyMemstoreSufficientCheck) { // Increase the block cache size and corresponding decrease in memstore size - newBlockCacheSize = context.getCurBlockCacheSize() + step; - newMemstoreSize = context.getCurMemStoreSize() - step; - } else if (blockCacheSufficient) { + newTuneDirection = StepDirection.INCREASE_BLOCK_CACHE_SIZE; + } else if (earlyBlockCacheSufficientCheck) { // Increase the memstore size and corresponding decrease in block cache size - newBlockCacheSize = context.getCurBlockCacheSize() - step; - newMemstoreSize = context.getCurMemStoreSize() + step; + newTuneDirection = StepDirection.INCREASE_MEMSTORE_SIZE; } else { - return NO_OP_TUNER_RESULT; - // As of now not making any tuning in write/read heavy scenario. + float percentChangeInEvictCount; + float percentChangeInFlushes; + float percentChangeInMisses; + if (prevEvictCount != 0) { + percentChangeInEvictCount = (float)(evictCount-prevEvictCount)/(float)(prevEvictCount); + } else { + // current and previous are both cannot be zero, assigning large percent change + percentChangeInEvictCount = undefinedPercentChange; + } + if (prevFlushCount != 0) { + percentChangeInFlushes = (float)(blockedFlushCount + unblockedFlushCount - + prevFlushCount)/(float)(prevFlushCount); + } else { + percentChangeInFlushes = undefinedPercentChange; + } + if (prevCacheMissCount != 0) { + percentChangeInMisses = (float)(cacheMissCount-prevCacheMissCount)/ + (float)(prevCacheMissCount); + } else { + percentChangeInMisses = undefinedPercentChange; + } + boolean isReverting = false; + switch (prevTuneDirection) { + case INCREASE_BLOCK_CACHE_SIZE: + if (percentChangeInEvictCount > -minimumExpectedGain || + percentChangeInFlushes > maximumAllowedLoss) { + // reverting previous step as it was not useful + newTuneDirection = StepDirection.INCREASE_MEMSTORE_SIZE; + tunerLog += "Reverting previous tuning."; + if (percentChangeInEvictCount > -minimumExpectedGain) { + tunerLog += " As could not decrease evctions sufficiently."; + } else { + tunerLog += " As number of flushes rose significantly."; + } + isReverting = true; + } + break; + case INCREASE_MEMSTORE_SIZE: + if (percentChangeInEvictCount > maximumAllowedLoss || + percentChangeInFlushes > -minimumExpectedGain) { + // reverting previous step as it was not useful + newTuneDirection = StepDirection.INCREASE_BLOCK_CACHE_SIZE; + tunerLog += "Reverting previous tuning."; + if (percentChangeInEvictCount > -minimumExpectedGain) { + tunerLog += " As could not decrease flushes sufficiently."; + } else { + tunerLog += " As number of evictions rose significantly."; + } + isReverting = true; + } + break; + default: + // last step was neutral, revert doesn't not apply here + break; + } + if (!isReverting){ + if (percentChangeInEvictCount < maximumAllowedLoss && + percentChangeInFlushes < maximumAllowedLoss) { + // Everything is fine no tuning required + newTuneDirection = StepDirection.NEUTRAL; + } else if (percentChangeInMisses > maximumAllowedLoss && + percentChangeInFlushes < maximumAllowedLoss) { + // more misses , increasing cahce size + newTuneDirection = StepDirection.INCREASE_BLOCK_CACHE_SIZE; + tunerLog += + "Increasing block cache size as observed increase in number of cache misses."; + } else if (percentChangeInFlushes > maximumAllowedLoss && + percentChangeInMisses < maximumAllowedLoss) { + // more flushes , increasing memstore size + newTuneDirection = StepDirection.INCREASE_MEMSTORE_SIZE; + tunerLog += "Increasing memstore size as observed increase in number of flushes."; + } else { + // Default. Not enough facts to do tuning. + newTuneDirection = StepDirection.NEUTRAL; + } + } + } + switch (newTuneDirection) { + case INCREASE_BLOCK_CACHE_SIZE: + newBlockCacheSize = context.getCurBlockCacheSize() + step; + newMemstoreSize = context.getCurMemStoreSize() - step; + break; + case INCREASE_MEMSTORE_SIZE: + newBlockCacheSize = context.getCurBlockCacheSize() - step; + newMemstoreSize = context.getCurMemStoreSize() + step; + break; + default: + prevFlushCount = blockedFlushCount + unblockedFlushCount; + prevEvictCount = evictCount; + prevCacheMissCount = cacheMissCount; + prevTuneDirection = StepDirection.NEUTRAL; + return NO_OP_TUNER_RESULT; } if (newMemstoreSize > globalMemStorePercentMaxRange) { newMemstoreSize = globalMemStorePercentMaxRange; @@ -93,6 +218,11 @@ class DefaultHeapMemoryTuner implements HeapMemoryTuner { } TUNER_RESULT.setBlockCacheSize(newBlockCacheSize); TUNER_RESULT.setMemstoreSize(newMemstoreSize); + LOG.info(tunerLog); + prevFlushCount = blockedFlushCount + unblockedFlushCount; + prevEvictCount = evictCount; + prevCacheMissCount = cacheMissCount; + prevTuneDirection = newTuneDirection; return TUNER_RESULT; } @@ -105,6 +235,11 @@ class DefaultHeapMemoryTuner implements HeapMemoryTuner { public void setConf(Configuration conf) { this.conf = conf; this.step = conf.getFloat(STEP_KEY, DEFAULT_STEP_VALUE); + this.minimumExpectedGain = conf.getFloat(MIN_EXPECTED_GAIN_KEY, + DEFAULT_MIN_EXPECTED_GAIN_VALUE); + this.maximumAllowedLoss = conf.getFloat(MAX_ALLOWED_LOSS_KEY, DEFAULT_MAX_ALLOWED_LOSS_VALUE); + this.sufficientMemoryLevel = conf.getFloat(SUFFICIENT_MEMORY_LEVEL_KEY, + DEFAULT_SUFFICIENT_MEMORY_LEVEL_VALUE); this.blockCachePercentMinRange = conf.getFloat(BLOCK_CACHE_SIZE_MIN_RANGE_KEY, conf.getFloat(HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT)); this.blockCachePercentMaxRange = conf.getFloat(BLOCK_CACHE_SIZE_MAX_RANGE_KEY, @@ -114,4 +249,13 @@ class DefaultHeapMemoryTuner implements HeapMemoryTuner { this.globalMemStorePercentMaxRange = conf.getFloat(MEMSTORE_SIZE_MAX_RANGE_KEY, HeapMemorySizeUtil.getGlobalMemStorePercent(conf, false)); } + + private enum StepDirection{ + // block cache size was increased + INCREASE_BLOCK_CACHE_SIZE, + // memstore size was increased + INCREASE_MEMSTORE_SIZE, + // no operation was performed + NEUTRAL + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index fb998ea..95663e6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1378,7 +1378,8 @@ public class HRegionServer extends HasThread implements } private void startHeapMemoryManager() { - this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher, this); + this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher, + this, this.regionServerAccounting); if (this.hMemManager != null) { this.hMemManager.start(getChoreService()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java index 5448025..a1c0497 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java @@ -76,6 +76,7 @@ public class HeapMemoryManager { private final ResizableBlockCache blockCache; private final FlushRequester memStoreFlusher; private final Server server; + private final RegionServerAccounting regionServerAccounting; private HeapMemoryTunerChore heapMemTunerChore = null; private final boolean tunerOn; @@ -85,21 +86,23 @@ public class HeapMemoryManager { private long maxHeapSize = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax(); public static HeapMemoryManager create(Configuration conf, FlushRequester memStoreFlusher, - Server server) { + Server server, RegionServerAccounting regionServerAccounting) { BlockCache blockCache = CacheConfig.instantiateBlockCache(conf); if (blockCache instanceof ResizableBlockCache) { - return new HeapMemoryManager((ResizableBlockCache) blockCache, memStoreFlusher, server); + return new HeapMemoryManager((ResizableBlockCache) blockCache, memStoreFlusher, server, + regionServerAccounting); } return null; } @VisibleForTesting HeapMemoryManager(ResizableBlockCache blockCache, FlushRequester memStoreFlusher, - Server server) { + Server server, RegionServerAccounting regionServerAccounting) { Configuration conf = server.getConfiguration(); this.blockCache = blockCache; this.memStoreFlusher = memStoreFlusher; this.server = server; + this.regionServerAccounting = regionServerAccounting; this.tunerOn = doInit(conf); this.defaultChorePeriod = conf.getInt(HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, HBASE_RS_HEAP_MEMORY_TUNER_DEFAULT_PERIOD); @@ -217,6 +220,7 @@ public class HeapMemoryManager { private AtomicLong blockedFlushCount = new AtomicLong(); private AtomicLong unblockedFlushCount = new AtomicLong(); private long evictCount = 0L; + private long cacheMissCount = 0L; private TunerContext tunerContext = new TunerContext(); private boolean alarming = false; @@ -264,11 +268,21 @@ public class HeapMemoryManager { } private void tune() { - long curEvictCount = blockCache.getStats().getEvictedCount(); + // TODO check if we can increase the memory boundaries + // while remaining in the limits + long curEvictCount; + long curCacheMisCount; + curEvictCount = blockCache.getStats().getEvictedCount(); tunerContext.setEvictCount(curEvictCount - evictCount); evictCount = curEvictCount; + curCacheMisCount = blockCache.getStats().getMissCachingCount(); + tunerContext.setCacheMissCount(curCacheMisCount-cacheMissCount); + cacheMissCount = curCacheMisCount; tunerContext.setBlockedFlushCount(blockedFlushCount.getAndSet(0)); tunerContext.setUnblockedFlushCount(unblockedFlushCount.getAndSet(0)); + tunerContext.setCurBlockCacheUsed((float)blockCache.getCurrentSize() / maxHeapSize); + tunerContext.setCurMemStoreUsed( + (float)regionServerAccounting.getGlobalMemstoreSize() / maxHeapSize); tunerContext.setCurBlockCacheSize(blockCachePercent); tunerContext.setCurMemStoreSize(globalMemStorePercent); TunerResult result = null; @@ -321,6 +335,8 @@ public class HeapMemoryManager { globalMemStorePercent = memstoreSize; memStoreFlusher.setGlobalMemstoreLimit(newMemstoreSize); } + } else { + LOG.info("No changes made by HeapMemoryTuner."); } } @@ -349,6 +365,9 @@ public class HeapMemoryManager { private long blockedFlushCount; private long unblockedFlushCount; private long evictCount; + private long cacheMissCount; + private float curBlockCacheUsed; + private float curMemStoreUsed; private float curMemStoreSize; private float curBlockCacheSize; @@ -391,6 +410,30 @@ public class HeapMemoryManager { public void setCurBlockCacheSize(float curBlockCacheSize) { this.curBlockCacheSize = curBlockCacheSize; } + + public long getCacheMissCount() { + return cacheMissCount; + } + + public void setCacheMissCount(long cacheMissCount) { + this.cacheMissCount = cacheMissCount; + } + + public float getCurBlockCacheUsed() { + return curBlockCacheUsed; + } + + public void setCurBlockCacheUsed(float curBlockCacheUsed) { + this.curBlockCacheUsed = curBlockCacheUsed; + } + + public float getCurMemStoreUsed() { + return curMemStoreUsed; + } + + public void setCurMemStoreUsed(float d) { + this.curMemStoreUsed = d; + } } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java index 2965071..bfe4a86 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java @@ -61,7 +61,7 @@ public class TestHeapMemoryManager { conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.75f); conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f); HeapMemoryManager manager = new HeapMemoryManager(new BlockCacheStub(0), - new MemstoreFlusherStub(0), new RegionServerStub(conf)); + new MemstoreFlusherStub(0), new RegionServerStub(conf), new RegionServerAccountingStub()); assertFalse(manager.isTunerOn()); } @@ -71,7 +71,7 @@ public class TestHeapMemoryManager { conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f); conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.05f); HeapMemoryManager manager = new HeapMemoryManager(new BlockCacheStub(0), - new MemstoreFlusherStub(0), new RegionServerStub(conf)); + new MemstoreFlusherStub(0), new RegionServerStub(conf), new RegionServerAccountingStub()); assertFalse(manager.isTunerOn()); } @@ -83,7 +83,8 @@ public class TestHeapMemoryManager { conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f); conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.06f); try { - new HeapMemoryManager(blockCache, memStoreFlusher, new RegionServerStub(conf)); + new HeapMemoryManager(blockCache, memStoreFlusher, + new RegionServerStub(conf), new RegionServerAccountingStub()); fail(); } catch (RuntimeException e) { } @@ -91,16 +92,84 @@ public class TestHeapMemoryManager { conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.2f); conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f); try { - new HeapMemoryManager(blockCache, memStoreFlusher, new RegionServerStub(conf)); + new HeapMemoryManager(blockCache, memStoreFlusher, + new RegionServerStub(conf), new RegionServerAccountingStub()); fail(); } catch (RuntimeException e) { } } @Test + public void testWhenClusterIsWriteHeavyWithEmptyMemstore() throws Exception { + BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4)); + MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4)); + RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(); + // Empty block cache and memstore + blockCache.setTestBlockSize(0); + regionServerAccounting.setTestMemstoreSize(0); + Configuration conf = HBaseConfiguration.create(); + conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f); + conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f); + conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f); + conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f); + conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000); + // Let the system start with default values for memstore heap and block cache size. + HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, + new RegionServerStub(conf), regionServerAccounting); + long oldMemstoreHeapSize = memStoreFlusher.memstoreSize; + long oldBlockCacheSize = blockCache.maxSize; + final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); + heapMemoryManager.start(choreService); + memStoreFlusher.flushType = FlushType.ABOVE_HIGHER_MARK; + memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false); + memStoreFlusher.flushType = FlushType.ABOVE_LOWER_MARK; + memStoreFlusher.requestFlush(null, false); + Thread.sleep(1500); // Allow the tuner to run once and do necessary memory up + // No changes should be made by tuner as we already have lot of empty space + assertEquals(oldMemstoreHeapSize, memStoreFlusher.memstoreSize); + assertEquals(oldBlockCacheSize, blockCache.maxSize); + } + + @Test + public void testWhenClusterIsReadHeavyWithEmptyBlockCache() throws Exception { + BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4)); + MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4)); + RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(); + // Empty block cache and memstore + blockCache.setTestBlockSize(0); + regionServerAccounting.setTestMemstoreSize(0); + Configuration conf = HBaseConfiguration.create(); + conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f); + conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f); + conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f); + conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f); + conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000); + // Let the system start with default values for memstore heap and block cache size. + HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, + new RegionServerStub(conf), regionServerAccounting); + long oldMemstoreHeapSize = memStoreFlusher.memstoreSize; + long oldBlockCacheSize = blockCache.maxSize; + final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); + heapMemoryManager.start(choreService); + blockCache.evictBlock(null); + blockCache.evictBlock(null); + blockCache.evictBlock(null); + Thread.sleep(1500); // Allow the tuner to run once and do necessary memory up + // No changes should be made by tuner as we already have lot of empty space + assertEquals(oldMemstoreHeapSize, memStoreFlusher.memstoreSize); + assertEquals(oldBlockCacheSize, blockCache.maxSize); + } + + @Test public void testWhenClusterIsWriteHeavy() throws Exception { BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4)); MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4)); + RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(); + // Empty block cache and but nearly filled memstore + blockCache.setTestBlockSize(0); + regionServerAccounting.setTestMemstoreSize((long) (maxHeapSize * 0.4 * 0.8)); Configuration conf = HBaseConfiguration.create(); conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f); conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f); @@ -109,7 +178,7 @@ public class TestHeapMemoryManager { conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000); // Let the system start with default values for memstore heap and block cache size. HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, - new RegionServerStub(conf)); + new RegionServerStub(conf), regionServerAccounting); long oldMemstoreHeapSize = memStoreFlusher.memstoreSize; long oldBlockCacheSize = blockCache.maxSize; final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); @@ -142,6 +211,10 @@ public class TestHeapMemoryManager { public void testWhenClusterIsReadHeavy() throws Exception { BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4)); MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4)); + RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(); + // Empty memstore and but nearly filled block cache + blockCache.setTestBlockSize((long) (maxHeapSize * 0.4 * 0.8)); + regionServerAccounting.setTestMemstoreSize(0); Configuration conf = HBaseConfiguration.create(); conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f); conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f); @@ -150,7 +223,7 @@ public class TestHeapMemoryManager { conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000); // Let the system start with default values for memstore heap and block cache size. HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, - new RegionServerStub(conf)); + new RegionServerStub(conf), new RegionServerAccountingStub()); long oldMemstoreHeapSize = memStoreFlusher.memstoreSize; long oldBlockCacheSize = blockCache.maxSize; final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); @@ -175,6 +248,50 @@ public class TestHeapMemoryManager { } @Test + public void testWhenClusterIsHavingMoreWritesThanReads() throws Exception { + BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4)); + MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4)); + RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(); + // Both memstore and block cache are nearly filled + blockCache.setTestBlockSize(0); + regionServerAccounting.setTestMemstoreSize((long) (maxHeapSize * 0.4 * 0.8)); + blockCache.setTestBlockSize((long) (maxHeapSize * 0.4 * 0.8)); + Configuration conf = HBaseConfiguration.create(); + conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f); + conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f); + conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f); + conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f); + conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000); + // Let the system start with default values for memstore heap and block cache size. + HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, + new RegionServerStub(conf), regionServerAccounting); + long oldMemstoreHeapSize = memStoreFlusher.memstoreSize; + long oldBlockCacheSize = blockCache.maxSize; + final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); + heapMemoryManager.start(choreService); + memStoreFlusher.flushType = FlushType.ABOVE_HIGHER_MARK; + memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false); + blockCache.evictBlock(null); + memStoreFlusher.flushType = FlushType.ABOVE_LOWER_MARK; + memStoreFlusher.requestFlush(null, false); + Thread.sleep(1500); // Allow the tuner to run once and do necessary memory up + // No changes should happen as there is undefined increase in flushes and evictions + assertEquals(oldMemstoreHeapSize, memStoreFlusher.memstoreSize); + assertEquals(oldBlockCacheSize, blockCache.maxSize); + // Do some more flushes before the next run of HeapMemoryTuner + memStoreFlusher.flushType = FlushType.ABOVE_HIGHER_MARK; + memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false); + Thread.sleep(1500); + assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE, oldMemstoreHeapSize, + memStoreFlusher.memstoreSize); + assertHeapSpaceDelta(-(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE), oldBlockCacheSize, + blockCache.maxSize); + } + + @Test public void testPluggingInHeapMemoryTuner() throws Exception { BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4)); MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4)); @@ -188,7 +305,7 @@ public class TestHeapMemoryManager { HeapMemoryTuner.class); // Let the system start with default values for memstore heap and block cache size. HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, - new RegionServerStub(conf)); + new RegionServerStub(conf), new RegionServerAccountingStub()); final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); heapMemoryManager.start(choreService); // Now we wants to be in write mode. Set bigger memstore size from CustomHeapMemoryTuner @@ -218,7 +335,7 @@ public class TestHeapMemoryManager { conf.setClass(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_CLASS, CustomHeapMemoryTuner.class, HeapMemoryTuner.class); HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, - new RegionServerStub(conf)); + new RegionServerStub(conf), new RegionServerAccountingStub()); final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); heapMemoryManager.start(choreService); CustomHeapMemoryTuner.memstoreSize = 0.78f; @@ -243,7 +360,7 @@ public class TestHeapMemoryManager { conf.setClass(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_CLASS, CustomHeapMemoryTuner.class, HeapMemoryTuner.class); HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, - new RegionServerStub(conf)); + new RegionServerStub(conf), new RegionServerAccountingStub()); long oldMemstoreSize = memStoreFlusher.memstoreSize; long oldBlockCacheSize = blockCache.maxSize; final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); @@ -276,8 +393,8 @@ public class TestHeapMemoryManager { HeapMemoryTuner.class); try { - heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, new RegionServerStub( - conf)); + heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, + new RegionServerStub(conf), new RegionServerAccountingStub()); fail("Should have failed as the collective heap memory need is above 80%"); } catch (Exception e) { } @@ -285,8 +402,8 @@ public class TestHeapMemoryManager { // Change the max/min ranges for memstore and bock cache so as to pass the criteria check conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.6f); conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.6f); - heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, new RegionServerStub( - conf)); + heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, + new RegionServerStub(conf), new RegionServerAccountingStub()); long oldMemstoreSize = memStoreFlusher.memstoreSize; long oldBlockCacheSize = blockCache.maxSize; final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); @@ -322,7 +439,8 @@ public class TestHeapMemoryManager { private static class BlockCacheStub implements ResizableBlockCache { CacheStats stats = new CacheStats("test"); long maxSize = 0; - + private long testBlockSize = 0; + public BlockCacheStub(long size){ this.maxSize = size; } @@ -378,7 +496,7 @@ public class TestHeapMemoryManager { @Override public long getCurrentSize() { - return 0; + return this.testBlockSize; } @Override @@ -400,6 +518,10 @@ public class TestHeapMemoryManager { public BlockCache[] getBlockCaches() { return null; } + + public void setTestBlockSize(long testBlockSize) { + this.testBlockSize = testBlockSize; + } } private static class MemstoreFlusherStub implements FlushRequester { @@ -526,4 +648,15 @@ public class TestHeapMemoryManager { return result; } } + + private static class RegionServerAccountingStub extends RegionServerAccounting { + private long testMemstoreSize = 0; + @Override + public long getGlobalMemstoreSize() { + return testMemstoreSize; + } + public void setTestMemstoreSize(long testMemstoreSize) { + this.testMemstoreSize = testMemstoreSize; + } + } }