.../hadoop/hbase/io/util/MemorySizeUtil.java | 12 +- .../hbase/regionserver/DefaultHeapMemoryTuner.java | 21 ++- .../hadoop/hbase/regionserver/HRegionServer.java | 13 +- .../hbase/regionserver/HeapMemoryManager.java | 70 ++++++- .../hbase/regionserver/MemStoreChunkPool.java | 39 ++-- .../hadoop/hbase/regionserver/MemStoreFlusher.java | 132 ++++++++----- .../hbase/regionserver/RegionServerAccounting.java | 113 +++++++++++ .../hbase/regionserver/wal/AbstractFSWAL.java | 12 +- .../regionserver/TestHRegionReplayEvents.java | 2 +- .../hbase/regionserver/TestHeapMemoryManager.java | 208 +++++++++++++++------ .../regionserver/TestRegionServerAccounting.java | 120 ++++++++++++ 11 files changed, 596 insertions(+), 146 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/MemorySizeUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/MemorySizeUtil.java index 6c5d8fd..a0fe334 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/MemorySizeUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/MemorySizeUtil.java @@ -141,9 +141,19 @@ public class MemorySizeUtil { long globalMemStoreLimit = (long) (offheapMSGlobal * 1024 * 1024); // Size in bytes return new Pair(globalMemStoreLimit, MemoryType.NON_HEAP); } + return new Pair(getOnheapAllowedGlobalMemstoreSize(conf), MemoryType.HEAP); + } + + /** + * Returns the onheap global memstore limit based on the config + * 'hbase.regionserver.global.memstore.size'. + * @param conf + * @return the onheap global memstore limt + */ + public static long getOnheapAllowedGlobalMemstoreSize(Configuration conf) { long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax(); float globalMemStorePercent = getGlobalMemStoreHeapPercent(conf, true); - return new Pair((long) (max * globalMemStorePercent), MemoryType.HEAP); + return ((long) (max * globalMemStorePercent)); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java index 1c7dfe2..eeaede1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java @@ -139,6 +139,13 @@ class DefaultHeapMemoryTuner implements HeapMemoryTuner { } StepDirection newTuneDirection = getTuneDirection(context); + long blockedFlushCount = context.getBlockedFlushCount(); + long unblockedFlushCount = context.getUnblockedFlushCount(); + long blockedOffheapFlushCount = context.getBlockedOffheapFlushCount(); + long unblockedOffheapFlushCount = context.getUnblockedFlushCount(); + long totalOnheapFlushCount = blockedFlushCount + unblockedFlushCount; + long totalOffheapFlushCount = blockedOffheapFlushCount + unblockedOffheapFlushCount; + boolean offheapMemstore = context.isOffheapMemstore(); float newMemstoreSize; float newBlockCacheSize; @@ -167,6 +174,14 @@ class DefaultHeapMemoryTuner implements HeapMemoryTuner { step = 0.0f; newTuneDirection = StepDirection.NEUTRAL; } + if (totalOnheapFlushCount == 0 && (totalOffheapFlushCount > 0 || offheapMemstore) + && newTuneDirection == StepDirection.INCREASE_BLOCK_CACHE_SIZE) { + // we are sure that there are flushes only due to offheap pressure + // So don't do the memstore decrease equal to the step size. Instead do minimum stepSize + // decrease. But even if we have some flushes due to heap then it is better we tune + // the existing way. + step = minimumStepSize; + } // Increase / decrease the memstore / block cahce sizes depending on new tuner step. // We don't want to exert immediate pressure on memstore. So, we decrease its size gracefully; // we set a minimum bar in the middle of the total memstore size and the lower limit. @@ -222,7 +237,7 @@ class DefaultHeapMemoryTuner implements HeapMemoryTuner { long unblockedFlushCount = context.getUnblockedFlushCount(); long evictCount = context.getEvictCount(); long cacheMissCount = context.getCacheMissCount(); - long totalFlushCount = blockedFlushCount+unblockedFlushCount; + long totalFlushCount = blockedFlushCount + unblockedFlushCount; float curMemstoreSize = context.getCurMemStoreSize(); float curBlockCacheSize = context.getCurBlockCacheSize(); StringBuilder tunerLog = new StringBuilder(); @@ -342,8 +357,8 @@ class DefaultHeapMemoryTuner implements HeapMemoryTuner { */ private void addToRollingStats(TunerContext context) { rollingStatsForCacheMisses.insertDataValue(context.getCacheMissCount()); - rollingStatsForFlushes.insertDataValue(context.getBlockedFlushCount() + - context.getUnblockedFlushCount()); + rollingStatsForFlushes + .insertDataValue(context.getBlockedFlushCount() + context.getUnblockedFlushCount()); rollingStatsForEvictions.insertDataValue(context.getEvictCount()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index e7cf899..f86d504 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -100,7 +100,6 @@ import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.http.InfoServer; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; -import org.apache.hadoop.hbase.io.util.MemorySizeUtil; import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.ipc.RpcClientFactory; @@ -171,7 +170,6 @@ import org.apache.hadoop.hbase.util.ForeignExceptionUtil; import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.JSONBean; import org.apache.hadoop.hbase.util.JvmPauseMonitor; -import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.hadoop.hbase.util.Sleeper; import org.apache.hadoop.hbase.util.Threads; @@ -580,7 +578,7 @@ public class HRegionServer extends HasThread implements // or process owner as default super user. Superusers.initialize(conf); - regionServerAccounting = new RegionServerAccounting(); + regionServerAccounting = new RegionServerAccounting(conf); cacheConfig = new CacheConfig(conf); mobCacheConfig = new MobCacheConfig(conf); uncaughtExceptionHandler = new UncaughtExceptionHandler() { @@ -1469,16 +1467,15 @@ public class HRegionServer extends HasThread implements // MSLAB is enabled. So initialize MemStoreChunkPool // By this time, the MemstoreFlusher is already initialized. We can get the global limits from // it. - Pair pair = MemorySizeUtil.getGlobalMemstoreSize(conf); - long globalMemStoreSize = pair.getFirst(); - boolean offheap = pair.getSecond() == MemoryType.NON_HEAP; + long globalMemStoreLimit = this.regionServerAccounting.getGlobalMemstoreLimit(); + boolean offheap = this.regionServerAccounting.getMemoryType() == MemoryType.NON_HEAP; // When off heap memstore in use, take full area for chunk pool. float poolSizePercentage = offheap ? 1.0F : conf.getFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, MemStoreLAB.POOL_MAX_SIZE_DEFAULT); float initialCountPercentage = conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT); int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT); - MemStoreChunkPool pool = MemStoreChunkPool.initialize(globalMemStoreSize, poolSizePercentage, + MemStoreChunkPool pool = MemStoreChunkPool.initialize(globalMemStoreLimit, poolSizePercentage, initialCountPercentage, chunkSize, offheap); if (pool != null && this.hMemManager != null) { // Register with Heap Memory manager @@ -3593,7 +3590,7 @@ public class HRegionServer extends HasThread implements return 0.0; } return getRegionServerAccounting().getGlobalMemstoreSize() * 1.0 - / cacheFlusher.globalMemStoreLimitLowMark; + / getRegionServerAccounting().getGlobalMemstoreLimitLowMark(); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java index 280bc9a..d087cbd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver; import static org.apache.hadoop.hbase.HConstants.HFILE_BLOCK_CACHE_SIZE_KEY; import java.lang.management.ManagementFactory; +import java.lang.management.MemoryType; import java.lang.management.MemoryUsage; import java.util.ArrayList; import java.util.List; @@ -75,6 +76,7 @@ public class HeapMemoryManager { private float heapOccupancyPercent; private final ResizableBlockCache blockCache; + // TODO : remove this and mark regionServerAccounting as the observer directly private final FlushRequester memStoreFlusher; private final Server server; private final RegionServerAccounting regionServerAccounting; @@ -228,6 +230,8 @@ public class HeapMemoryManager { private HeapMemoryTuner heapMemTuner; private AtomicLong blockedFlushCount = new AtomicLong(); private AtomicLong unblockedFlushCount = new AtomicLong(); + private AtomicLong blockedOffheapFlushCount = new AtomicLong(); + private AtomicLong unblockedOffheapFlushCount = new AtomicLong(); private long evictCount = 0L; private long cacheMissCount = 0L; private TunerContext tunerContext = new TunerContext(); @@ -238,6 +242,8 @@ public class HeapMemoryManager { Class tunerKlass = server.getConfiguration().getClass( HBASE_RS_HEAP_MEMORY_TUNER_CLASS, DefaultHeapMemoryTuner.class, HeapMemoryTuner.class); heapMemTuner = ReflectionUtils.newInstance(tunerKlass, server.getConfiguration()); + tunerContext + .setIsOffheapMemstore(regionServerAccounting.getMemoryType() == MemoryType.NON_HEAP); } @Override @@ -284,6 +290,8 @@ public class HeapMemoryManager { long curCacheMisCount; long blockedFlushCnt; long unblockedFlushCnt; + long blockedOffheapFlushCnt; + long unblockedOffheapFlushCnt; curEvictCount = blockCache.getStats().getEvictedCount(); tunerContext.setEvictCount(curEvictCount - evictCount); evictCount = curEvictCount; @@ -296,10 +304,22 @@ public class HeapMemoryManager { unblockedFlushCnt = unblockedFlushCount.getAndSet(0); tunerContext.setUnblockedFlushCount(unblockedFlushCnt); metricsHeapMemoryManager.updateUnblockedFlushCount(unblockedFlushCnt); + // TODO : add support for offheap metrics + blockedOffheapFlushCnt = blockedOffheapFlushCount.getAndSet(0); + tunerContext.setBlockOffheapFlushCount(blockedOffheapFlushCnt); + unblockedOffheapFlushCnt = unblockedFlushCount.getAndSet(0); + tunerContext.setUnblockedOffheapFlushCount(unblockedOffheapFlushCnt); + tunerContext.setCurBlockCacheUsed((float) blockCache.getCurrentSize() / maxHeapSize); metricsHeapMemoryManager.setCurBlockCacheSizeGauge(blockCache.getCurrentSize()); - long globalMemstoreHeapSize = regionServerAccounting.getGlobalMemstoreSize() - + regionServerAccounting.getGlobalMemstoreHeapOverhead(); + long globalMemstoreHeapSize; + if (regionServerAccounting.getMemoryType() == MemoryType.HEAP) { + globalMemstoreHeapSize = regionServerAccounting.getGlobalMemstoreSize() + + regionServerAccounting.getGlobalMemstoreHeapOverhead(); + } else { + // get only the heap overhead for offheap memstore + globalMemstoreHeapSize = regionServerAccounting.getGlobalMemstoreHeapOverhead(); + } tunerContext.setCurMemStoreUsed((float) globalMemstoreHeapSize / maxHeapSize); metricsHeapMemoryManager.setCurMemStoreSizeGauge(globalMemstoreHeapSize); tunerContext.setCurBlockCacheSize(blockCachePercent); @@ -352,14 +372,20 @@ public class HeapMemoryManager { metricsHeapMemoryManager.updateMemStoreDeltaSizeHistogram(memStoreDeltaSize); metricsHeapMemoryManager.updateBlockCacheDeltaSizeHistogram(blockCacheDeltaSize); long newBlockCacheSize = (long) (maxHeapSize * blockCacheSize); + // we could have got an increase or decrease in size for the offheap memstore + // also if the flush had happened due to heap overhead. In that case it is ok + // to adjust the onheap memstore limit configs long newMemstoreSize = (long) (maxHeapSize * memstoreSize); LOG.info("Setting block cache heap size to " + newBlockCacheSize + " and memstore heap size to " + newMemstoreSize); blockCachePercent = blockCacheSize; blockCache.setMaxSize(newBlockCacheSize); globalMemStorePercent = memstoreSize; + // Internally sets it to RegionServerAccounting + // TODO : Set directly on RSAccounting?? memStoreFlusher.setGlobalMemstoreLimit(newMemstoreSize); for (HeapMemoryTuneObserver observer : tuneObservers) { + // Risky.. If this newMemstoreSize decreases we reduce the count in offheap chunk pool observer.onHeapMemoryTune(newMemstoreSize, newBlockCacheSize); } } @@ -374,14 +400,22 @@ public class HeapMemoryManager { @Override public void flushRequested(FlushType type, Region region) { switch (type) { - case ABOVE_HIGHER_MARK: + case ABOVE_ONHEAP_HIGHER_MARK: blockedFlushCount.incrementAndGet(); break; - case ABOVE_LOWER_MARK: + case ABOVE_ONHEAP_LOWER_MARK: unblockedFlushCount.incrementAndGet(); break; + // See if we really want this?? Will change or leave it + // after discussion + case ABOVE_OFFHEAP_HIGHER_MARK: + blockedOffheapFlushCount.incrementAndGet(); + break; + case ABOVE_OFFHEAP_LOWER_MARK: + unblockedOffheapFlushCount.incrementAndGet(); + break; default: - // In case of normal flush don't do any action. + // In case of any other flush don't do any action. break; } } @@ -395,12 +429,15 @@ public class HeapMemoryManager { public static final class TunerContext { private long blockedFlushCount; private long unblockedFlushCount; + private long blockedOffheapFlushCount; + private long unblockedOffheapFlushCount; private long evictCount; private long cacheMissCount; private float curBlockCacheUsed; private float curMemStoreUsed; private float curMemStoreSize; private float curBlockCacheSize; + private boolean offheapMemstore; public long getBlockedFlushCount() { return blockedFlushCount; @@ -418,6 +455,21 @@ public class HeapMemoryManager { this.unblockedFlushCount = unblockedFlushCount; } + public long getBlockedOffheapFlushCount() { + return this.blockedOffheapFlushCount; + } + + public void setBlockOffheapFlushCount(long blockedOffheapFlushCount) { + this.blockedOffheapFlushCount = blockedOffheapFlushCount; + } + + public long getUnblockedOffheapFlushCount() { + return this.unblockedOffheapFlushCount; + } + + public void setUnblockedOffheapFlushCount(long unblockedOFfheapFlushCount) { + this.unblockedOffheapFlushCount = unblockedOFfheapFlushCount; + } public long getEvictCount() { return evictCount; } @@ -465,6 +517,14 @@ public class HeapMemoryManager { public void setCurMemStoreUsed(float d) { this.curMemStoreUsed = d; } + + public void setIsOffheapMemstore(boolean offheapMemstore) { + this.offheapMemstore = offheapMemstore; + } + + public boolean isOffheapMemstore() { + return this.offheapMemstore; + } } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java index eb38a55..a5a6bc3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java @@ -238,23 +238,28 @@ public class MemStoreChunkPool implements HeapMemoryTuneObserver { @Override public void onHeapMemoryTune(long newMemstoreSize, long newBlockCacheSize) { - int newMaxCount = (int) (newMemstoreSize * poolSizePercentage / chunkSize); - if (newMaxCount != this.maxCount) { - // We need an adjustment in the chunks numbers - if (newMaxCount > this.maxCount) { - // Max chunks getting increased. Just change the variable. Later calls to getChunk() would - // create and add them to Q - LOG.info("Max count for chunks increased from " + this.maxCount + " to " + newMaxCount); - this.maxCount = newMaxCount; - } else { - // Max chunks getting decreased. We may need to clear off some of the pooled chunks now - // itself. If the extra chunks are serving already, do not pool those when we get them back - LOG.info("Max count for chunks decreased from " + this.maxCount + " to " + newMaxCount); - this.maxCount = newMaxCount; - if (this.reclaimedChunks.size() > newMaxCount) { - synchronized (this) { - while (this.reclaimedChunks.size() > newMaxCount) { - this.reclaimedChunks.poll(); + if (!this.offheap) { + // Change this only for onheap memstore chunk pool. + // TODO : For offheap we don't do it now + int newMaxCount = (int) (newMemstoreSize * poolSizePercentage / chunkSize); + if (newMaxCount != this.maxCount) { + // We need an adjustment in the chunks numbers + if (newMaxCount > this.maxCount) { + // Max chunks getting increased. Just change the variable. Later calls to getChunk() would + // create and add them to Q + LOG.info("Max count for chunks increased from " + this.maxCount + " to " + newMaxCount); + this.maxCount = newMaxCount; + } else { + // Max chunks getting decreased. We may need to clear off some of the pooled chunks now + // itself. If the extra chunks are serving already, do not pool those when we get them + // back + LOG.info("Max count for chunks decreased from " + this.maxCount + " to " + newMaxCount); + this.maxCount = newMaxCount; + if (this.reclaimedChunks.size() > newMaxCount) { + synchronized (this) { + while (this.reclaimedChunks.size() > newMaxCount) { + this.reclaimedChunks.poll(); + } } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java index 15cf97c..d86b379 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java @@ -49,12 +49,10 @@ import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.RegionReplicaUtil; -import org.apache.hadoop.hbase.io.util.MemorySizeUtil; import org.apache.hadoop.hbase.regionserver.Region.FlushResult; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.HasThread; -import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.ipc.RemoteException; @@ -90,10 +88,6 @@ class MemStoreFlusher implements FlushRequester { private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final Object blockSignal = new Object(); - protected long globalMemStoreLimit; - protected float globalMemStoreLimitLowMarkPercent; - protected long globalMemStoreLimitLowMark; - private long blockingWaitTime; private final LongAdder updatesBlockedMsHighWater = new LongAdder(); @@ -111,32 +105,18 @@ class MemStoreFlusher implements FlushRequester { this.server = server; this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); - Pair pair = MemorySizeUtil.getGlobalMemstoreSize(conf); - this.globalMemStoreLimit = pair.getFirst(); - boolean onheap = pair.getSecond() == MemoryType.HEAP; - // When off heap memstore in use we configure the global off heap space for memstore as bytes - // not as % of max memory size. In such case, the lower water mark should be specified using the - // key "hbase.regionserver.global.memstore.size.lower.limit" which says % of the global upper - // bound and defaults to 95%. In on heap case also specifying this way is ideal. But in the past - // we used to take lower bound also as the % of xmx (38% as default). For backward compatibility - // for this deprecated config,we will fall back to read that config when new one is missing. - // Only for on heap case, do this fallback mechanism. For off heap it makes no sense. - // TODO When to get rid of the deprecated config? ie - // "hbase.regionserver.global.memstore.lowerLimit". Can get rid of this boolean passing then. - this.globalMemStoreLimitLowMarkPercent = MemorySizeUtil.getGlobalMemStoreHeapLowerMark(conf, - onheap); - this.globalMemStoreLimitLowMark = - (long) (this.globalMemStoreLimit * this.globalMemStoreLimitLowMarkPercent); - this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime", 90000); int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2); this.flushHandlers = new FlushHandler[handlerCount]; LOG.info("globalMemStoreLimit=" - + TraditionalBinaryPrefix.long2String(this.globalMemStoreLimit, "", 1) + + TraditionalBinaryPrefix + .long2String(this.server.getRegionServerAccounting().getGlobalMemstoreLimit(), "", 1) + ", globalMemStoreLimitLowMark=" - + TraditionalBinaryPrefix.long2String(this.globalMemStoreLimitLowMark, "", 1) - + ", Offheap=" + !onheap); + + TraditionalBinaryPrefix.long2String( + this.server.getRegionServerAccounting().getGlobalMemstoreLimitLowMark(), "", 1) + + ", Offheap=" + + !(this.server.getRegionServerAccounting().getMemoryType() == MemoryType.HEAP)); } public LongAdder getUpdatesBlockedMsHighWater() { @@ -251,9 +231,15 @@ class MemStoreFlusher implements FlushRequester { wakeupPending.set(false); // allow someone to wake us up again fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); if (fqe == null || fqe instanceof WakeupFlushThread) { - if (isAboveLowWaterMark()) { + FlushType type = isAboveLowWaterMark(); + if (type != null) { LOG.debug("Flush thread woke up because memory above low water=" - + TraditionalBinaryPrefix.long2String(globalMemStoreLimitLowMark, "", 1)); + + TraditionalBinaryPrefix.long2String( + server.getRegionServerAccounting().getGlobalMemstoreLimitLowMark(), "", 1)); + // For offheap memstore, even if the lower water mark was breached due to heap overhead + // we still select the regions based on the region's memstore data size. + // TODO : If we want to decide based on heap over head it can be done without tracking + // it per region. if (!flushOneForGlobalPressure()) { // Wasn't able to flush any region, but we're above low water mark // This is unlikely to happen, but might happen when closing the @@ -355,17 +341,15 @@ class MemStoreFlusher implements FlushRequester { /** * Return true if global memory usage is above the high watermark */ - private boolean isAboveHighWaterMark() { - return server.getRegionServerAccounting().getGlobalMemstoreSize() - + server.getRegionServerAccounting().getGlobalMemstoreHeapOverhead() >= globalMemStoreLimit; + private FlushType isAboveHighWaterMark() { + return server.getRegionServerAccounting().isAboveMemstoreHighWaterMark(); } /** - * Return true if we're above the high watermark + * Return true if we're above the low watermark */ - private boolean isAboveLowWaterMark() { - return server.getRegionServerAccounting().getGlobalMemstoreSize() + server - .getRegionServerAccounting().getGlobalMemstoreHeapOverhead() >= globalMemStoreLimitLowMark; + private FlushType isAboveLowWaterMark() { + return server.getRegionServerAccounting().isAboveMemstoreLowWaterMark(); } @Override @@ -548,9 +532,16 @@ class MemStoreFlusher implements FlushRequester { } private void notifyFlushRequest(Region region, boolean emergencyFlush) { - FlushType type = FlushType.NORMAL; + FlushType type = null; if (emergencyFlush) { - type = isAboveHighWaterMark() ? FlushType.ABOVE_HIGHER_MARK : FlushType.ABOVE_LOWER_MARK; + type = isAboveHighWaterMark(); + if (type == null) { + type = isAboveLowWaterMark(); + } + } + // if still type is null, then it is normal + if (type == null) { + type = FlushType.NORMAL; } for (FlushRequestListener listener : flushRequestListeners) { listener.flushRequested(type, region); @@ -586,7 +577,8 @@ class MemStoreFlusher implements FlushRequester { */ public void reclaimMemStoreMemory() { TraceScope scope = Trace.startSpan("MemStoreFluser.reclaimMemStoreMemory"); - if (isAboveHighWaterMark()) { + FlushType flushType = isAboveHighWaterMark(); + if (flushType != null) { if (Trace.isTracing()) { scope.getSpan().addTimelineAnnotation("Force Flush. We're above high water mark."); } @@ -596,17 +588,50 @@ class MemStoreFlusher implements FlushRequester { long startTime = 0; boolean interrupted = false; try { - while (isAboveHighWaterMark() && !server.isStopped()) { + flushType = isAboveHighWaterMark(); + while (flushType != null && !server.isStopped()) { if (!blocked) { startTime = EnvironmentEdgeManager.currentTime(); - LOG.info("Blocking updates on " + server.toString() + ": the global memstore size " - + TraditionalBinaryPrefix.long2String( + if (server.getRegionServerAccounting().getMemoryType() == MemoryType.HEAP) { + LOG.info("Blocking updates on " + server.toString() + ": the global memstore size " + + TraditionalBinaryPrefix.long2String( server.getRegionServerAccounting().getGlobalMemstoreSize(), "", 1) - + " + global memstore heap overhead " - + TraditionalBinaryPrefix.long2String( + + " + global memstore heap overhead " + + TraditionalBinaryPrefix.long2String( server.getRegionServerAccounting().getGlobalMemstoreHeapOverhead(), "", 1) - + " is >= than blocking " - + TraditionalBinaryPrefix.long2String(globalMemStoreLimit, "", 1) + " size"); + + " is >= than blocking " + + TraditionalBinaryPrefix.long2String( + server.getRegionServerAccounting().getGlobalMemstoreLimit(), "", 1) + + " size"); + } else { + switch (flushType) { + case ABOVE_OFFHEAP_HIGHER_MARK: + LOG.info("Blocking updates on " + server.toString() + + ": the global offheap memstore size " + + TraditionalBinaryPrefix.long2String( + server.getRegionServerAccounting().getGlobalMemstoreSize(), "", 1) + + " + global memstore heap overhead " + + TraditionalBinaryPrefix.long2String( + server.getRegionServerAccounting().getGlobalMemstoreHeapOverhead(), "", 1) + + " is >= than blocking " + + TraditionalBinaryPrefix.long2String( + server.getRegionServerAccounting().getGlobalMemstoreLimit(), "", 1) + + " size"); + break; + case ABOVE_ONHEAP_HIGHER_MARK: + LOG.info( + "Blocking updates on " + server.toString() + " : global memstore heap overhead " + + TraditionalBinaryPrefix.long2String( + server.getRegionServerAccounting().getGlobalMemstoreHeapOverhead(), "", 1) + + " is >= than blocking " + + TraditionalBinaryPrefix.long2String( + server.getRegionServerAccounting().getOnheapGlobalMemstoreLimit(), "", 1) + + " size"); + break; + default: + break; + } + } } blocked = true; wakeupFlushThread(); @@ -620,6 +645,7 @@ class MemStoreFlusher implements FlushRequester { } long took = EnvironmentEdgeManager.currentTime() - start; LOG.warn("Memstore is above high water mark and block " + took + "ms"); + flushType = isAboveHighWaterMark(); } } finally { if (interrupted) { @@ -635,7 +661,7 @@ class MemStoreFlusher implements FlushRequester { LOG.info("Unblocking updates for server " + server.toString()); } } - } else if (isAboveLowWaterMark()) { + } else if (isAboveLowWaterMark() != null) { wakeupFlushThread(); } scope.close(); @@ -685,14 +711,12 @@ class MemStoreFlusher implements FlushRequester { */ @Override public void setGlobalMemstoreLimit(long globalMemStoreSize) { - this.globalMemStoreLimit = globalMemStoreSize; - this.globalMemStoreLimitLowMark = - (long) (this.globalMemStoreLimitLowMarkPercent * globalMemStoreSize); + this.server.getRegionServerAccounting().setGlobalMemstoreLimits(globalMemStoreSize); reclaimMemStoreMemory(); } public long getMemoryLimit() { - return this.globalMemStoreLimit; + return this.server.getRegionServerAccounting().getGlobalMemstoreLimit(); } interface FlushQueueEntry extends Delayed { @@ -821,5 +845,11 @@ class MemStoreFlusher implements FlushRequester { } enum FlushType { - NORMAL, ABOVE_LOWER_MARK, ABOVE_HIGHER_MARK; + NORMAL, + ABOVE_ONHEAP_LOWER_MARK, /* happens due to lower mark breach of onheap memstore settings + An offheap memstore can even breach the onheap_lower_mark*/ + ABOVE_ONHEAP_HIGHER_MARK,/* happens due to higher mark breach of onheap memstore settings + An offheap memstore can even breach the onheap_higher_mark*/ + ABOVE_OFFHEAP_LOWER_MARK,/* happens due to lower mark breach of offheap memstore settings*/ + ABOVE_OFFHEAP_HIGHER_MARK;/*/* happens due to higer mark breach of offheap memstore settings*/ } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java index cb8551f..011584d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java @@ -18,12 +18,16 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.lang.management.MemoryType; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicLong; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.util.MemorySizeUtil; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; /** * RegionServerAccounting keeps record of some basic real time information about @@ -40,6 +44,71 @@ public class RegionServerAccounting { private final ConcurrentMap replayEditsPerRegion = new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR); + private final Configuration conf; + + private long globalMemStoreLimit; + private final float globalMemStoreLimitLowMarkPercent; + private long globalMemStoreLimitLowMark; + private final MemoryType memType; + private long globalOnHeapMemstoreLimit; + private long globalOnHeapMemstoreLimitLowMark; + + public RegionServerAccounting(Configuration conf) { + this.conf = conf; + Pair globalMemstoreSizePair = MemorySizeUtil.getGlobalMemstoreSize(conf); + this.globalMemStoreLimit = globalMemstoreSizePair.getFirst(); + this.memType = globalMemstoreSizePair.getSecond(); + this.globalMemStoreLimitLowMarkPercent = + MemorySizeUtil.getGlobalMemStoreHeapLowerMark(conf, this.memType == MemoryType.HEAP); + // When off heap memstore in use we configure the global off heap space for memstore as bytes + // not as % of max memory size. In such case, the lower water mark should be specified using the + // key "hbase.regionserver.global.memstore.size.lower.limit" which says % of the global upper + // bound and defaults to 95%. In on heap case also specifying this way is ideal. But in the past + // we used to take lower bound also as the % of xmx (38% as default). For backward compatibility + // for this deprecated config,we will fall back to read that config when new one is missing. + // Only for on heap case, do this fallback mechanism. For off heap it makes no sense. + // TODO When to get rid of the deprecated config? ie + // "hbase.regionserver.global.memstore.lowerLimit". Can get rid of this boolean passing then. + this.globalMemStoreLimitLowMark = + (long) (this.globalMemStoreLimit * this.globalMemStoreLimitLowMarkPercent); + this.globalOnHeapMemstoreLimit = MemorySizeUtil.getOnheapAllowedGlobalMemstoreSize(conf); + this.globalOnHeapMemstoreLimitLowMark = + (long) (this.globalOnHeapMemstoreLimit * this.globalMemStoreLimitLowMarkPercent); + } + + public long getGlobalMemstoreLimit() { + return this.globalMemStoreLimit; + } + + public long getOnheapGlobalMemstoreLimit() { + return this.globalOnHeapMemstoreLimit; + } + + // Called by the tuners. + public void setGlobalMemstoreLimits(long newGlobalMemstoreLimit) { + if (this.memType == MemoryType.HEAP) { + this.globalMemStoreLimit = newGlobalMemstoreLimit; + this.globalMemStoreLimitLowMark = + (long) (this.globalMemStoreLimit * this.globalMemStoreLimitLowMarkPercent); + } else { + this.globalOnHeapMemstoreLimit = newGlobalMemstoreLimit; + this.globalOnHeapMemstoreLimitLowMark = + (long) (this.globalOnHeapMemstoreLimit * this.globalMemStoreLimitLowMarkPercent); + } + } + + public MemoryType getMemoryType() { + return this.memType; + } + + public long getGlobalMemstoreLimitLowMark() { + return this.globalMemStoreLimitLowMark; + } + + public float getGlobalMemstoreLimitLowMarkPercent() { + return this.globalMemStoreLimitLowMarkPercent; + } + /** * @return the global Memstore size in the RegionServer */ @@ -65,6 +134,50 @@ public class RegionServerAccounting { globalMemstoreHeapOverhead.addAndGet(-memStoreSize.getHeapOverhead()); } + /** + * Return true if we are above the memstore high water mark + * @return + */ + public FlushType isAboveMemstoreHighWaterMark() { + if (memType == MemoryType.HEAP) { + if (getGlobalMemstoreSize() + getGlobalMemstoreHeapOverhead() >= globalMemStoreLimit) { + return FlushType.ABOVE_ONHEAP_HIGHER_MARK; + } + } else { + // If the configured memstore is offheap, check for two things + // 1) If the global memstore data size is greater than the configured + // 'hbase.regionserver.offheap.global.memstore.size' + // 2) If the global memstore heap size is greater than the configured onheap + // global memstore limit 'hbase.regionserver.global.memstore.size'. + // We do this to avoid OOME incase of scenarios where the heap is occupied with + // lot of onheap references to the cells in memstore + if (getGlobalMemstoreSize() >= globalMemStoreLimit) { + return FlushType.ABOVE_OFFHEAP_HIGHER_MARK; + } else if (getGlobalMemstoreHeapOverhead() >= this.globalOnHeapMemstoreLimit) { + return FlushType.ABOVE_ONHEAP_HIGHER_MARK; + } + } + return null; + } + + /** + * Return true if we're above the low watermark + */ + public FlushType isAboveMemstoreLowWaterMark() { + if (memType == MemoryType.HEAP) { + if (getGlobalMemstoreSize() + getGlobalMemstoreHeapOverhead() >= globalMemStoreLimitLowMark) { + return FlushType.ABOVE_ONHEAP_LOWER_MARK; + } + } else { + if (getGlobalMemstoreSize() >= globalMemStoreLimitLowMark) { + return FlushType.ABOVE_OFFHEAP_LOWER_MARK; + } else if (getGlobalMemstoreHeapOverhead() >= globalOnHeapMemstoreLimitLowMark) { + return FlushType.ABOVE_ONHEAP_LOWER_MARK; + } + } + return null; + } + /*** * Add memStoreSize to replayEditsPerRegion. * diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index fb30d07..9c12ece 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -21,8 +21,7 @@ import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DE import java.io.IOException; import java.io.InterruptedIOException; -import java.lang.management.ManagementFactory; -import java.lang.management.MemoryUsage; +import java.lang.management.MemoryType; import java.net.URLEncoder; import java.util.ArrayList; import java.util.Arrays; @@ -289,9 +288,9 @@ public abstract class AbstractFSWAL implements WAL { return Long.parseLong(chompedPath); } - private int calculateMaxLogFiles(float memstoreSizeRatio, long logRollSize) { - MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); - return Math.round(mu.getMax() * memstoreSizeRatio * 2 / logRollSize); + private int calculateMaxLogFiles(Configuration conf, long logRollSize) { + Pair globalMemstoreSize = MemorySizeUtil.getGlobalMemstoreSize(conf); + return Math.round(globalMemstoreSize.getFirst() * 2 / logRollSize); } protected AbstractFSWAL(final FileSystem fs, final Path rootDir, final String logDir, @@ -370,14 +369,13 @@ public abstract class AbstractFSWAL implements WAL { this.logrollsize = (long) (blocksize * conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f)); - float memstoreRatio = MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false); boolean maxLogsDefined = conf.get("hbase.regionserver.maxlogs") != null; if (maxLogsDefined) { LOG.warn("'hbase.regionserver.maxlogs' was deprecated."); } this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", - Math.max(32, calculateMaxLogFiles(memstoreRatio, logrollsize))); + Math.max(32, calculateMaxLogFiles(conf, logrollsize))); LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize=" + StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix=" diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java index fe0d350..a3cbec7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java @@ -170,7 +170,7 @@ public class TestHRegionReplayEvents { rss = mock(RegionServerServices.class); when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1)); when(rss.getConfiguration()).thenReturn(CONF); - when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting()); + when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting(CONF)); String string = org.apache.hadoop.hbase.executor.EventType.RS_COMPACTED_FILES_DISCHARGER .toString(); ExecutorService es = new ExecutorService(string); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java index f620eb0..29f4f81 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java @@ -65,8 +65,10 @@ public class TestHeapMemoryManager { conf.setFloat(MemorySizeUtil.MEMSTORE_SIZE_KEY, 0.02f); conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.75f); conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.03f); + RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(conf); HeapMemoryManager manager = new HeapMemoryManager(new BlockCacheStub(0), - new MemstoreFlusherStub(0), new RegionServerStub(conf), new RegionServerAccountingStub()); + new MemstoreFlusherStub(0), new RegionServerStub(conf), + regionServerAccounting); assertFalse(manager.isTunerOn()); } @@ -76,21 +78,24 @@ public class TestHeapMemoryManager { conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.02f); conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f); conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.03f); + RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(conf); HeapMemoryManager manager = new HeapMemoryManager(new BlockCacheStub(0), - new MemstoreFlusherStub(0), new RegionServerStub(conf), new RegionServerAccountingStub()); + new MemstoreFlusherStub(0), new RegionServerStub(conf), + regionServerAccounting); assertFalse(manager.isTunerOn()); } @Test public void testWhenMemstoreAndBlockCacheMaxMinChecksFails() throws Exception { BlockCacheStub blockCache = new BlockCacheStub(0); - MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub(0); Configuration conf = HBaseConfiguration.create(); conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f); conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.06f); + RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(conf); + MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub(0); try { new HeapMemoryManager(blockCache, memStoreFlusher, - new RegionServerStub(conf), new RegionServerAccountingStub()); + new RegionServerStub(conf), regionServerAccounting); fail(); } catch (RuntimeException e) { } @@ -99,7 +104,7 @@ public class TestHeapMemoryManager { conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f); try { new HeapMemoryManager(blockCache, memStoreFlusher, - new RegionServerStub(conf), new RegionServerAccountingStub()); + new RegionServerStub(conf), regionServerAccounting); fail(); } catch (RuntimeException e) { } @@ -107,18 +112,19 @@ public class TestHeapMemoryManager { @Test public void testWhenClusterIsWriteHeavyWithEmptyMemstore() throws Exception { - BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4)); - MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4)); - RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(); - // Empty block cache and memstore - blockCache.setTestBlockSize(0); - regionServerAccounting.setTestMemstoreSize(0); Configuration conf = HBaseConfiguration.create(); conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f); conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f); conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f); conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f); conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000); + BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4)); + RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(conf); + MemstoreFlusherStub memStoreFlusher = + new MemstoreFlusherStub((long) (maxHeapSize * 0.4)); + // Empty block cache and memstore + blockCache.setTestBlockSize(0); + regionServerAccounting.setTestMemstoreSize(0); conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0); // Let the system start with default values for memstore heap and block cache size. HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, @@ -127,11 +133,11 @@ public class TestHeapMemoryManager { long oldBlockCacheSize = blockCache.maxSize; final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); heapMemoryManager.start(choreService); - memStoreFlusher.flushType = FlushType.ABOVE_HIGHER_MARK; + memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_HIGHER_MARK; memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false); - memStoreFlusher.flushType = FlushType.ABOVE_LOWER_MARK; + memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK; memStoreFlusher.requestFlush(null, false); // Allow the tuner to run once and do necessary memory up Thread.sleep(1500); @@ -141,13 +147,60 @@ public class TestHeapMemoryManager { } @Test - public void testWhenClusterIsReadHeavyWithEmptyBlockCache() throws Exception { + public void testHeapMemoryManagerWhenOffheapFlushesHappenUnderReadHeavyCase() throws Exception { BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4)); - MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4)); - RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(); - // Empty block cache and memstore - blockCache.setTestBlockSize(0); + Configuration conf = HBaseConfiguration.create(); + conf.setFloat(MemorySizeUtil.MEMSTORE_SIZE_LOWER_LIMIT_KEY, 0.7f); + conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f); + conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f); + conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f); + conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f); + conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000); + conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0); + RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(conf); + MemstoreFlusherStub memStoreFlusher = + new MemstoreFlusherStub((long) (maxHeapSize * 0.4)); + // Empty memstore and but nearly filled block cache + blockCache.setTestBlockSize((long) (maxHeapSize * 0.4 * 0.8)); regionServerAccounting.setTestMemstoreSize(0); + // Let the system start with default values for memstore heap and block cache size. + HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, + new RegionServerStub(conf), new RegionServerAccountingStub(conf)); + long oldMemstoreHeapSize = memStoreFlusher.memstoreSize; + long oldBlockCacheSize = blockCache.maxSize; + long oldMemstoreLowerMarkSize = 7 * oldMemstoreHeapSize / 10; + long maxTuneSize = oldMemstoreHeapSize - (oldMemstoreLowerMarkSize + oldMemstoreHeapSize) / 2; + float maxStepValue = DefaultHeapMemoryTuner.DEFAULT_MIN_STEP_VALUE; + final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); + heapMemoryManager.start(choreService); + blockCache.evictBlock(null); + blockCache.evictBlock(null); + blockCache.evictBlock(null); + // do some offheap flushes also. So there should be decrease in memstore but + // not as that when we don't have offheap flushes + memStoreFlusher.flushType = FlushType.ABOVE_OFFHEAP_HIGHER_MARK; + memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false); + // Allow the tuner to run once and do necessary memory up + waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize); + assertHeapSpaceDelta(-maxStepValue, oldMemstoreHeapSize, memStoreFlusher.memstoreSize); + assertHeapSpaceDelta(maxStepValue, oldBlockCacheSize, blockCache.maxSize); + oldMemstoreHeapSize = memStoreFlusher.memstoreSize; + oldBlockCacheSize = blockCache.maxSize; + oldMemstoreLowerMarkSize = 7 * oldMemstoreHeapSize / 10; + // Do some more evictions before the next run of HeapMemoryTuner + blockCache.evictBlock(null); + // Allow the tuner to run once and do necessary memory up + waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize); + assertHeapSpaceDelta(-maxStepValue, oldMemstoreHeapSize, memStoreFlusher.memstoreSize); + assertHeapSpaceDelta(maxStepValue, oldBlockCacheSize, blockCache.maxSize); + } + + @Test + public void testWhenClusterIsReadHeavyWithEmptyBlockCache() throws Exception { + BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4)); Configuration conf = HBaseConfiguration.create(); conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f); conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f); @@ -155,6 +208,12 @@ public class TestHeapMemoryManager { conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f); conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000); conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0); + RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(conf); + MemstoreFlusherStub memStoreFlusher = + new MemstoreFlusherStub((long) (maxHeapSize * 0.4)); + // Empty block cache and memstore + blockCache.setTestBlockSize(0); + regionServerAccounting.setTestMemstoreSize(0); // Let the system start with default values for memstore heap and block cache size. HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, new RegionServerStub(conf), regionServerAccounting); @@ -175,11 +234,6 @@ public class TestHeapMemoryManager { @Test public void testWhenClusterIsWriteHeavy() throws Exception { BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4)); - MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4)); - RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(); - // Empty block cache and but nearly filled memstore - blockCache.setTestBlockSize(0); - regionServerAccounting.setTestMemstoreSize((long) (maxHeapSize * 0.4 * 0.8)); Configuration conf = HBaseConfiguration.create(); conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f); conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f); @@ -187,6 +241,12 @@ public class TestHeapMemoryManager { conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f); conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000); conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0); + RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(conf); + MemstoreFlusherStub memStoreFlusher = + new MemstoreFlusherStub((long) (maxHeapSize * 0.4)); + // Empty block cache and but nearly filled memstore + blockCache.setTestBlockSize(0); + regionServerAccounting.setTestMemstoreSize((long) (maxHeapSize * 0.4 * 0.8)); // Let the system start with default values for memstore heap and block cache size. HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, new RegionServerStub(conf), regionServerAccounting); @@ -194,7 +254,7 @@ public class TestHeapMemoryManager { long oldBlockCacheSize = blockCache.maxSize; final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); heapMemoryManager.start(choreService); - memStoreFlusher.flushType = FlushType.ABOVE_LOWER_MARK; + memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK; memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false); @@ -208,7 +268,7 @@ public class TestHeapMemoryManager { oldMemstoreHeapSize = memStoreFlusher.memstoreSize; oldBlockCacheSize = blockCache.maxSize; // Do some more flushes before the next run of HeapMemoryTuner - memStoreFlusher.flushType = FlushType.ABOVE_LOWER_MARK; + memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK; memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false); // Allow the tuner to run once and do necessary memory up @@ -220,13 +280,44 @@ public class TestHeapMemoryManager { } @Test + public void testWhenClusterIsWriteHeavyWithOffheapMemstore() throws Exception { + BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4)); + Configuration conf = HBaseConfiguration.create(); + conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f); + conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f); + conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f); + conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f); + conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000); + conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0); + RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(conf); + MemstoreFlusherStub memStoreFlusher = + new MemstoreFlusherStub((long) (maxHeapSize * 0.4)); + // Empty block cache and but nearly filled memstore + blockCache.setTestBlockSize(0); + regionServerAccounting.setTestMemstoreSize((long) (maxHeapSize * 0.4 * 0.8)); + // Let the system start with default values for memstore heap and block cache size. + HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, + new RegionServerStub(conf), regionServerAccounting); + long oldMemstoreHeapSize = memStoreFlusher.memstoreSize; + long oldBlockCacheSize = blockCache.maxSize; + final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); + heapMemoryManager.start(choreService); + // this should not change anything with onheap memstore + memStoreFlusher.flushType = FlushType.ABOVE_OFFHEAP_HIGHER_MARK; + memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false); + // Allow the tuner to run once and do necessary memory up + Thread.sleep(1500); + // No changes should be made by tuner as we already have lot of empty space + assertEquals(oldMemstoreHeapSize, memStoreFlusher.memstoreSize); + assertEquals(oldBlockCacheSize, blockCache.maxSize); + } + + @Test public void testWhenClusterIsReadHeavy() throws Exception { BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4)); - MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4)); - RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(); - // Empty memstore and but nearly filled block cache - blockCache.setTestBlockSize((long) (maxHeapSize * 0.4 * 0.8)); - regionServerAccounting.setTestMemstoreSize(0); Configuration conf = HBaseConfiguration.create(); conf.setFloat(MemorySizeUtil.MEMSTORE_SIZE_LOWER_LIMIT_KEY, 0.7f); conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f); @@ -235,9 +326,15 @@ public class TestHeapMemoryManager { conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f); conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000); conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0); + RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(conf); + MemstoreFlusherStub memStoreFlusher = + new MemstoreFlusherStub((long) (maxHeapSize * 0.4)); + // Empty memstore and but nearly filled block cache + blockCache.setTestBlockSize((long) (maxHeapSize * 0.4 * 0.8)); + regionServerAccounting.setTestMemstoreSize(0); // Let the system start with default values for memstore heap and block cache size. HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, - new RegionServerStub(conf), new RegionServerAccountingStub()); + new RegionServerStub(conf), new RegionServerAccountingStub(conf)); long oldMemstoreHeapSize = memStoreFlusher.memstoreSize; long oldBlockCacheSize = blockCache.maxSize; long oldMemstoreLowerMarkSize = 7 * oldMemstoreHeapSize / 10; @@ -272,12 +369,6 @@ public class TestHeapMemoryManager { @Test public void testWhenClusterIsHavingMoreWritesThanReads() throws Exception { BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4)); - MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4)); - RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(); - // Both memstore and block cache are nearly filled - blockCache.setTestBlockSize(0); - regionServerAccounting.setTestMemstoreSize((long) (maxHeapSize * 0.4 * 0.8)); - blockCache.setTestBlockSize((long) (maxHeapSize * 0.4 * 0.8)); Configuration conf = HBaseConfiguration.create(); conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f); conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f); @@ -285,6 +376,13 @@ public class TestHeapMemoryManager { conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f); conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000); conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0); + RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(conf); + MemstoreFlusherStub memStoreFlusher = + new MemstoreFlusherStub((long) (maxHeapSize * 0.4)); + // Both memstore and block cache are nearly filled + blockCache.setTestBlockSize(0); + regionServerAccounting.setTestMemstoreSize((long) (maxHeapSize * 0.4 * 0.8)); + blockCache.setTestBlockSize((long) (maxHeapSize * 0.4 * 0.8)); // Let the system start with default values for memstore heap and block cache size. HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, new RegionServerStub(conf), regionServerAccounting); @@ -292,7 +390,7 @@ public class TestHeapMemoryManager { long oldBlockCacheSize = blockCache.maxSize; final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); heapMemoryManager.start(choreService); - memStoreFlusher.flushType = FlushType.ABOVE_LOWER_MARK; + memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK; memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false); @@ -303,7 +401,7 @@ public class TestHeapMemoryManager { assertEquals(oldMemstoreHeapSize, memStoreFlusher.memstoreSize); assertEquals(oldBlockCacheSize, blockCache.maxSize); // Do some more flushes before the next run of HeapMemoryTuner - memStoreFlusher.flushType = FlushType.ABOVE_LOWER_MARK; + memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK; memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false); @@ -319,11 +417,6 @@ public class TestHeapMemoryManager { public void testBlockedFlushesIncreaseMemstoreInSteadyState() throws Exception { BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4)); MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4)); - RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(); - // Both memstore and block cache are nearly filled - blockCache.setTestBlockSize(0); - regionServerAccounting.setTestMemstoreSize((long) (maxHeapSize * 0.4 * 0.8)); - blockCache.setTestBlockSize((long) (maxHeapSize * 0.4 * 0.8)); Configuration conf = HBaseConfiguration.create(); conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f); conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f); @@ -331,6 +424,11 @@ public class TestHeapMemoryManager { conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f); conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000); conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0); + RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(conf); + // Both memstore and block cache are nearly filled + blockCache.setTestBlockSize(0); + regionServerAccounting.setTestMemstoreSize((long) (maxHeapSize * 0.4 * 0.8)); + blockCache.setTestBlockSize((long) (maxHeapSize * 0.4 * 0.8)); // Let the system start with default values for memstore heap and block cache size. HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, new RegionServerStub(conf), regionServerAccounting); @@ -338,7 +436,7 @@ public class TestHeapMemoryManager { long oldBlockCacheSize = blockCache.maxSize; final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); heapMemoryManager.start(choreService); - memStoreFlusher.flushType = FlushType.ABOVE_LOWER_MARK; + memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK; memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false); @@ -350,7 +448,7 @@ public class TestHeapMemoryManager { assertEquals(oldMemstoreHeapSize, memStoreFlusher.memstoreSize); assertEquals(oldBlockCacheSize, blockCache.maxSize); // Flushes that block updates - memStoreFlusher.flushType = FlushType.ABOVE_HIGHER_MARK; + memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_HIGHER_MARK; memStoreFlusher.requestFlush(null, false); blockCache.evictBlock(null); blockCache.evictBlock(null); @@ -379,7 +477,7 @@ public class TestHeapMemoryManager { HeapMemoryTuner.class); // Let the system start with default values for memstore heap and block cache size. HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, - new RegionServerStub(conf), new RegionServerAccountingStub()); + new RegionServerStub(conf), new RegionServerAccountingStub(conf)); final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); heapMemoryManager.start(choreService); // Now we wants to be in write mode. Set bigger memstore size from CustomHeapMemoryTuner @@ -412,7 +510,7 @@ public class TestHeapMemoryManager { conf.setClass(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_CLASS, CustomHeapMemoryTuner.class, HeapMemoryTuner.class); HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, - new RegionServerStub(conf), new RegionServerAccountingStub()); + new RegionServerStub(conf), new RegionServerAccountingStub(conf)); final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); heapMemoryManager.start(choreService); CustomHeapMemoryTuner.memstoreSize = 0.78f; @@ -438,7 +536,7 @@ public class TestHeapMemoryManager { conf.setClass(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_CLASS, CustomHeapMemoryTuner.class, HeapMemoryTuner.class); HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, - new RegionServerStub(conf), new RegionServerAccountingStub()); + new RegionServerStub(conf), new RegionServerAccountingStub(conf)); long oldMemstoreSize = memStoreFlusher.memstoreSize; long oldBlockCacheSize = blockCache.maxSize; final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); @@ -473,7 +571,7 @@ public class TestHeapMemoryManager { try { heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, - new RegionServerStub(conf), new RegionServerAccountingStub()); + new RegionServerStub(conf), new RegionServerAccountingStub(conf)); fail("Should have failed as the collective heap memory need is above 80%"); } catch (Exception e) { } @@ -482,7 +580,7 @@ public class TestHeapMemoryManager { conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.6f); conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.6f); heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, - new RegionServerStub(conf), new RegionServerAccountingStub()); + new RegionServerStub(conf), new RegionServerAccountingStub(conf)); long oldMemstoreSize = memStoreFlusher.memstoreSize; long oldBlockCacheSize = blockCache.maxSize; final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); @@ -490,7 +588,7 @@ public class TestHeapMemoryManager { CustomHeapMemoryTuner.memstoreSize = 0.4f; CustomHeapMemoryTuner.blockCacheSize = 0.4f; // Allow the tuner to run once and do necessary memory up - Thread.sleep(1500); + Thread.sleep(1500); // The size should not get changes as the collection of memstore size and L1 and L2 block cache // size will cross the ax allowed 80% mark assertEquals(oldMemstoreSize, memStoreFlusher.memstoreSize); @@ -508,7 +606,8 @@ public class TestHeapMemoryManager { assertEquals(expected, currentHeapSpace); } - private void assertHeapSpaceDelta(double expectedDeltaPercent, long oldHeapSpace, long newHeapSpace) { + private void assertHeapSpaceDelta(double expectedDeltaPercent, long oldHeapSpace, + long newHeapSpace) { double expctedMinDelta = (double) (this.maxHeapSize * expectedDeltaPercent); // Tolerable error double error = 0.95; @@ -757,6 +856,9 @@ public class TestHeapMemoryManager { } private static class RegionServerAccountingStub extends RegionServerAccounting { + public RegionServerAccountingStub(Configuration conf) { + super(conf); + } private long testMemstoreSize = 0; @Override public long getGlobalMemstoreSize() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerAccounting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerAccounting.java new file mode 100644 index 0000000..4debdc7 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerAccounting.java @@ -0,0 +1,120 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.io.util.MemorySizeUtil; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestRegionServerAccounting { + + @Test + public void testOnheapMemstoreHigherWaterMarkLimits() { + Configuration conf = HBaseConfiguration.create(); + conf.setFloat(MemorySizeUtil.MEMSTORE_SIZE_KEY, 0.2f); + // try for default cases + RegionServerAccounting regionServerAccounting = new RegionServerAccounting(conf); + MemstoreSize memstoreSize = + new MemstoreSize((long) (3l * 1024l * 1024l * 1024l), (long) (1l * 1024l * 1024l * 1024l)); + regionServerAccounting.incGlobalMemstoreSize(memstoreSize); + assertEquals(FlushType.ABOVE_ONHEAP_HIGHER_MARK, + regionServerAccounting.isAboveMemstoreHighWaterMark()); + } + + @Test + public void testOnheapMemstoreLowerWaterMarkLimits() { + Configuration conf = HBaseConfiguration.create(); + conf.setFloat(MemorySizeUtil.MEMSTORE_SIZE_KEY, 0.2f); + // try for default cases + RegionServerAccounting regionServerAccounting = new RegionServerAccounting(conf); + MemstoreSize memstoreSize = + new MemstoreSize((long) (3l * 1024l * 1024l * 1024l), (long) (1l * 1024l * 1024l * 1024l)); + regionServerAccounting.incGlobalMemstoreSize(memstoreSize); + assertEquals(FlushType.ABOVE_ONHEAP_LOWER_MARK, + regionServerAccounting.isAboveMemstoreLowWaterMark()); + } + + @Test + public void testOffheapMemstoreHigherWaterMarkLimitsDueToDataSize() { + Configuration conf = HBaseConfiguration.create(); + // setting 1G as offheap data size + conf.setLong(MemorySizeUtil.OFFHEAP_MEMSTORE_SIZE_KEY, (1l * 1024l)); + // try for default cases + RegionServerAccounting regionServerAccounting = new RegionServerAccounting(conf); + // this will breach offheap limit as data size is higher and not due to heap size + MemstoreSize memstoreSize = + new MemstoreSize((long) (3l * 1024l * 1024l * 1024l), (long) (1l * 1024l * 1024l * 1024l)); + regionServerAccounting.incGlobalMemstoreSize(memstoreSize); + assertEquals(FlushType.ABOVE_OFFHEAP_HIGHER_MARK, + regionServerAccounting.isAboveMemstoreHighWaterMark()); + } + + @Test + public void testOffheapMemstoreHigherWaterMarkLimitsDueToHeapSize() { + Configuration conf = HBaseConfiguration.create(); + conf.setFloat(MemorySizeUtil.MEMSTORE_SIZE_KEY, 0.2f); + // setting 1G as offheap data size + conf.setLong(MemorySizeUtil.OFFHEAP_MEMSTORE_SIZE_KEY, (1l * 1024l)); + // try for default cases + RegionServerAccounting regionServerAccounting = new RegionServerAccounting(conf); + // this will breach higher limit as heap size is higher and not due to offheap size + MemstoreSize memstoreSize = + new MemstoreSize((long) (3l * 1024l * 1024l), (long) (2l * 1024l * 1024l * 1024l)); + regionServerAccounting.incGlobalMemstoreSize(memstoreSize); + assertEquals(FlushType.ABOVE_ONHEAP_HIGHER_MARK, + regionServerAccounting.isAboveMemstoreHighWaterMark()); + } + + @Test + public void testOffheapMemstoreLowerWaterMarkLimitsDueToDataSize() { + Configuration conf = HBaseConfiguration.create(); + // setting 1G as offheap data size + conf.setLong(MemorySizeUtil.OFFHEAP_MEMSTORE_SIZE_KEY, (1l * 1024l)); + // try for default cases + RegionServerAccounting regionServerAccounting = new RegionServerAccounting(conf); + // this will breach offheap limit as data size is higher and not due to heap size + MemstoreSize memstoreSize = + new MemstoreSize((long) (3l * 1024l * 1024l * 1024l), (long) (1l * 1024l * 1024l * 1024l)); + regionServerAccounting.incGlobalMemstoreSize(memstoreSize); + assertEquals(FlushType.ABOVE_OFFHEAP_LOWER_MARK, + regionServerAccounting.isAboveMemstoreLowWaterMark()); + } + + @Test + public void testOffheapMemstoreLowerWaterMarkLimitsDueToHeapSize() { + Configuration conf = HBaseConfiguration.create(); + conf.setFloat(MemorySizeUtil.MEMSTORE_SIZE_KEY, 0.2f); + // setting 1G as offheap data size + conf.setLong(MemorySizeUtil.OFFHEAP_MEMSTORE_SIZE_KEY, (1l * 1024l)); + // try for default cases + RegionServerAccounting regionServerAccounting = new RegionServerAccounting(conf); + // this will breach higher limit as heap size is higher and not due to offheap size + MemstoreSize memstoreSize = + new MemstoreSize((long) (3l * 1024l * 1024l), (long) (2l * 1024l * 1024l * 1024l)); + regionServerAccounting.incGlobalMemstoreSize(memstoreSize); + assertEquals(FlushType.ABOVE_ONHEAP_LOWER_MARK, + regionServerAccounting.isAboveMemstoreLowWaterMark()); + } +}