Index: hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java =================================================================== --- hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java (revision 1517456) +++ hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java (working copy) @@ -699,7 +699,7 @@ public static final String HFILE_BLOCK_CACHE_SIZE_KEY = "hfile.block.cache.size"; - public static final float HFILE_BLOCK_CACHE_SIZE_DEFAULT = 0.25f; + public static final float HFILE_BLOCK_CACHE_SIZE_DEFAULT = 0.4f; /* * Minimum percentage of free heap necessary for a successful cluster startup. Index: hbase-common/src/main/resources/hbase-default.xml =================================================================== --- hbase-common/src/main/resources/hbase-default.xml (revision 1517456) +++ hbase-common/src/main/resources/hbase-default.xml (working copy) @@ -237,19 +237,19 @@ The HLog file writer implementation. - hbase.regionserver.global.memstore.upperLimit + hbase.regionserver.global.memstore.size 0.4 Maximum size of all memstores in a region server before new updates are blocked and flushes are forced. Defaults to 40% of heap. Updates are blocked and flushes are forced until size of all memstores - in a region server hits hbase.regionserver.global.memstore.lowerLimit. + in a region server hits hbase.regionserver.global.memstore.size.lower.limit. - hbase.regionserver.global.memstore.lowerLimit - 0.38 + hbase.regionserver.global.memstore.size.lower.limit + 0.95 Maximum size of all memstores in a region server before - flushes are forced. Defaults to 38% of heap. - This value equal to hbase.regionserver.global.memstore.upperLimit causes + flushes are forced. Defaults to 95% of hbase.regionserver.global.memstore.size. + A 100% value for this value causes the minimum possible flushing to occur when updates are blocked due to memstore limiting. Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java (revision 1517456) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java (working copy) @@ -347,7 +347,7 @@ * @param conf The current configuration. * @return The block cache or null. */ - private static synchronized BlockCache instantiateBlockCache(Configuration conf) { + public static synchronized BlockCache instantiateBlockCache(Configuration conf) { if (globalBlockCache != null) return globalBlockCache; if (blockCacheDisabled) return null; Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java (revision 1517456) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java (working copy) @@ -38,7 +38,7 @@ * **/ @InterfaceAudience.Private -public class DoubleBlockCache implements BlockCache, HeapSize { +public class DoubleBlockCache implements ResizableBlockCache, HeapSize { static final Log LOG = LogFactory.getLog(DoubleBlockCache.class.getName()); @@ -172,4 +172,9 @@ return onHeapCache.getBlockCount() + offHeapCache.getBlockCount(); } + @Override + public void setMaxSize(long size) { + this.onHeapCache.setMaxSize(size); + } + } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java (revision 1517456) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java (working copy) @@ -95,7 +95,7 @@ * to the relative sizes and usage. */ @InterfaceAudience.Private -public class LruBlockCache implements BlockCache, HeapSize { +public class LruBlockCache implements ResizableBlockCache, HeapSize { static final Log LOG = LogFactory.getLog(LruBlockCache.class); @@ -272,6 +272,7 @@ statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS); } + @Override public void setMaxSize(long maxSize) { this.maxSize = maxSize; if(this.size.get() > acceptableSize() && !evictionInProgress) { Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ResizableBlockCache.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ResizableBlockCache.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ResizableBlockCache.java (working copy) @@ -0,0 +1,31 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.hfile; + +import org.apache.hadoop.classification.InterfaceAudience; + +@InterfaceAudience.Private +public interface ResizableBlockCache extends BlockCache { + + /** + * Sets the max heap size that can be used by the BlockCache. + * @param size The max heap size. + */ + void setMaxSize(long size); +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryBalancerImpl.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryBalancerImpl.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryBalancerImpl.java (working copy) @@ -0,0 +1,69 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.AutoTunerContext; +import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.AutoTunerResult; + +public class DefaultHeapMemoryBalancerImpl implements HeapMemoryBalancer { + + private static final String STEP_KEY = "hbase.regionserver.heapmemory.autotuner.step"; + private static final float STEP_DEFAULT_VALUE = 0.2f; + + private Configuration conf; + private float step = STEP_DEFAULT_VALUE; + + @Override + public AutoTunerResult balance(AutoTunerContext context) { + long blockedFlushCount = context.getBlockedFlushCount(); + long unblockedFlushCount = context.getUnblockedFlushCount(); + long evictCount = context.getEvictCount(); + boolean memstoreSufficient = blockedFlushCount == 0 && unblockedFlushCount == 0; + boolean blockCacheSufficient = evictCount == 0; + if (memstoreSufficient && blockCacheSufficient) { + return new AutoTunerResult(false); + } + AutoTunerResult result = new AutoTunerResult(true); + if (memstoreSufficient) { + // Increase the block cache size and corresponding decrease in memstore size + result.setBlockCacheSize(context.getCurBlockCacheSize() + step); + result.setMemstoreSize(context.getCurMemStoreSize() - step); + } else if (blockCacheSufficient) { + // Increase the block cache size and corresponding decrease in memstore size + result.setBlockCacheSize(context.getCurBlockCacheSize() - step); + result.setMemstoreSize(context.getCurMemStoreSize() + step); + } else { + return new AutoTunerResult(false); + // As of now not making any tuning in write/read heavy scenario. + } + return result; + } + + @Override + public Configuration getConf() { + return this.conf; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + this.step = conf.getFloat(STEP_KEY, STEP_DEFAULT_VALUE); + } +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryBalancer.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryBalancer.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryBalancer.java (working copy) @@ -0,0 +1,40 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.AutoTunerContext; +import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.AutoTunerResult; + +/** + * + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface HeapMemoryBalancer extends Configurable { + + /** + * + * @param context + * @return + */ + AutoTunerResult balance(AutoTunerContext context); +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java (working copy) @@ -0,0 +1,316 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.lang.management.ManagementFactory; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Chore; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.io.hfile.ResizableBlockCache; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.util.ReflectionUtils; + +/** + * + */ +@InterfaceAudience.Private +public class HeapMemoryManager { + private static final Log LOG = LogFactory.getLog(HeapMemoryManager.class); + private static final int CONVERT_TO_PERCENTAGE = 100; + + private static final String MEMSTORE_SIZE_MAX_RANGE_KEY = + "hbase.regionserver.global.memstore.size.max.range"; + private static final String MEMSTORE_SIZE_MIN_RANGE_KEY = + "hbase.regionserver.global.memstore.size.min.range"; + + private static final String BLOCK_CACHE_SIZE_MAX_RANGE_KEY = + "hfile.block.cache.size.max.range"; + private static final String BLOCK_CACHE_SIZE_MIN_RANGE_KEY = + "hfile.block.cache.size.min.range"; + + private float globalMemStorePercent; + private float globalMemStorePercentMinRange; + private float globalMemStorePercentMaxRange; + + private float blockCachePercent; + private float blockCachePercentMinRange; + private float blockCachePercentMaxRange; + + private final ResizableBlockCache blockCache; + private final MemStoreFlusher memStoreFlusher; + private final HRegionServer hrs; + + private HeapMemoryAutoTuner autoTuner = null; + private boolean autoTunerOn = true; + + private long maxHeapSize = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax(); + + public HeapMemoryManager(ResizableBlockCache blockCache, MemStoreFlusher memStoreFlusher, + HRegionServer hrs) { + this.blockCache = blockCache; + this.memStoreFlusher = memStoreFlusher; + this.hrs = hrs; + initMemstoreSizeRanges(hrs.getConfiguration()); + initBlockCacheSizes(hrs.getConfiguration()); + doInitChecks(); + } + + private void initMemstoreSizeRanges(Configuration conf) { + globalMemStorePercent = MemStoreFlusher.getGlobalMemStorePercent(conf); + globalMemStorePercentMinRange = conf.getFloat(MEMSTORE_SIZE_MIN_RANGE_KEY, + globalMemStorePercent); + globalMemStorePercentMaxRange = conf.getFloat(MEMSTORE_SIZE_MAX_RANGE_KEY, + globalMemStorePercent); + if (globalMemStorePercent < globalMemStorePercentMinRange) { + LOG.warn("Setting " + MEMSTORE_SIZE_MIN_RANGE_KEY + " to " + globalMemStorePercent + + ", same value as " + MemStoreFlusher.MEMSTORE_SIZE_KEY + + " because supplied value greater than initial memstore size value."); + globalMemStorePercentMinRange = globalMemStorePercent; + } + if (globalMemStorePercent > globalMemStorePercentMaxRange) { + LOG.warn("Setting " + MEMSTORE_SIZE_MAX_RANGE_KEY + " to " + globalMemStorePercent + + ", same value as " + MemStoreFlusher.MEMSTORE_SIZE_KEY + + " because supplied value less than initial memstore size value."); + globalMemStorePercentMaxRange = globalMemStorePercent; + } + if (globalMemStorePercent == globalMemStorePercentMinRange + && globalMemStorePercent == globalMemStorePercentMaxRange) { + autoTunerOn = false; + } + } + + private void doInitChecks() { + int gml = (int)(globalMemStorePercent * CONVERT_TO_PERCENTAGE); + int bcul = (int)(blockCachePercent * CONVERT_TO_PERCENTAGE); + if (CONVERT_TO_PERCENTAGE - (gml + bcul) < + (int) (CONVERT_TO_PERCENTAGE * HConstants.HBASE_CLUSTER_MINIMUM_MEMORY_THRESHOLD)) { + throw new RuntimeException("Current heap configuration for MemStore and BlockCache exceeds " + + "the threshold required for successful cluster operation. " + + "The combined value cannot exceed 0.8. Please check " + "the settings for " + + MemStoreFlusher.MEMSTORE_SIZE_KEY + " and " + HConstants.HFILE_BLOCK_CACHE_SIZE_KEY + + " in your configuration. " + MemStoreFlusher.MEMSTORE_SIZE_KEY + " is " + + globalMemStorePercent + " and " + HConstants.HFILE_BLOCK_CACHE_SIZE_KEY + " is " + + blockCachePercent); + } + gml = (int)(globalMemStorePercentMaxRange * CONVERT_TO_PERCENTAGE); + bcul = (int)(globalMemStorePercentMaxRange * CONVERT_TO_PERCENTAGE); + if (CONVERT_TO_PERCENTAGE - (gml + bcul) < + (int) (CONVERT_TO_PERCENTAGE * HConstants.HBASE_CLUSTER_MINIMUM_MEMORY_THRESHOLD)) { + throw new RuntimeException(); + } + gml = (int)(globalMemStorePercentMinRange * CONVERT_TO_PERCENTAGE); + bcul = (int)(globalMemStorePercentMinRange * CONVERT_TO_PERCENTAGE); + if (CONVERT_TO_PERCENTAGE - (gml + bcul) < + (int) (CONVERT_TO_PERCENTAGE * HConstants.HBASE_CLUSTER_MINIMUM_MEMORY_THRESHOLD)) { + throw new RuntimeException(); + } + } + + private void initBlockCacheSizes(Configuration conf) { + blockCachePercent = conf.getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, + HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT); + blockCachePercentMinRange = conf.getFloat(BLOCK_CACHE_SIZE_MIN_RANGE_KEY, blockCachePercent); + blockCachePercentMaxRange = conf.getFloat(BLOCK_CACHE_SIZE_MAX_RANGE_KEY, blockCachePercent); + if (blockCachePercent < blockCachePercentMinRange) { + LOG.warn("Setting " + BLOCK_CACHE_SIZE_MIN_RANGE_KEY + " to " + blockCachePercent + + ", same value as " + HConstants.HFILE_BLOCK_CACHE_SIZE_KEY + + " because supplied value greater than initial block cache size."); + blockCachePercentMinRange = blockCachePercent; + } + if (blockCachePercent > blockCachePercentMaxRange) { + LOG.warn("Setting " + BLOCK_CACHE_SIZE_MAX_RANGE_KEY + " to " + blockCachePercent + + ", same value as " + HConstants.HFILE_BLOCK_CACHE_SIZE_KEY + + " because supplied value less than initial block cache size."); + blockCachePercentMaxRange = blockCachePercent; + } + if (blockCachePercent == blockCachePercentMinRange + && blockCachePercent == blockCachePercentMaxRange) { + autoTunerOn = false; + } + } + + public void start() { + if (autoTunerOn) { + LOG.info("Starting HeapMemoryAutoTuner."); + this.autoTuner = new HeapMemoryAutoTuner(); + Threads.setDaemonThreadRunning(autoTuner.getThread()); + // Register HeapMemoryAutoTuner as a memstore flush listener + memStoreFlusher.registerFlushListener(autoTuner); + } + } + + private class HeapMemoryAutoTuner extends Chore implements MemstoreFlushListener { + private static final String PERIOD = "hbase.regionserver.heapmemory.autotuner.period"; + private static final String HBASE_RS_MEMORY_BALANCER_CLASS = "hbase.regionserver.memory.balancer.class"; + private static final int DEFAULT_PERIOD = 5 * 60 * 1000; // Default is 5 mins + + private HeapMemoryBalancer heapMemoryBalancer; + + private AtomicLong blockedFlushCount = new AtomicLong(); + private AtomicLong unblockedFlushCount = new AtomicLong(); + private long evictCount = 0L; + + public HeapMemoryAutoTuner() { + super(hrs.getServerName() + "-HeapMemoryAutoTuner", hrs.getConfiguration().getInt(PERIOD, + DEFAULT_PERIOD), hrs); + Class balancerKlass = hrs.getConfiguration().getClass( + HBASE_RS_MEMORY_BALANCER_CLASS, DefaultHeapMemoryBalancerImpl.class, + HeapMemoryBalancer.class); + heapMemoryBalancer = ReflectionUtils.newInstance(balancerKlass, hrs.getConfiguration()); + } + + @Override + protected void chore() { + evictCount = blockCache.getStats().getEvictedCount() - evictCount; + AutoTunerContext context = createTunerContext(); + AutoTunerResult result = this.heapMemoryBalancer.balance(context); + if (result.needsTuning()) { + float memstoreSize = result.getMemstoreSize(); + float blockCacheSize = result.getBlockCacheSize(); + LOG.debug("From HeapMemoryBalancer new memstoreSize: " + memstoreSize + "%. new blockCacheSize: " + blockCacheSize + "%"); + if (1 - (memstoreSize + blockCacheSize) < HConstants.HBASE_CLUSTER_MINIMUM_MEMORY_THRESHOLD) { + LOG.warn("Current heap configuration from HeapMemoryBalancer exceeds " + + "the threshold required for successful cluster operation. " + + "The combined value cannot exceed 0.8. " + MemStoreFlusher.MEMSTORE_SIZE_KEY + + " is " + memstoreSize + " and " + HConstants.HFILE_BLOCK_CACHE_SIZE_KEY + " is " + + blockCacheSize); + } else { + long newBlockCacheSize = (long) (maxHeapSize * blockCacheSize); + LOG.info("Setting block cache heap size to " + newBlockCacheSize); + blockCachePercent = blockCacheSize; + blockCache.setMaxSize(newBlockCacheSize); + long newMemstoreSize = (long) (maxHeapSize * memstoreSize); + LOG.info("Setting memstore heap size to " + newMemstoreSize); + globalMemStorePercent = memstoreSize; + memStoreFlusher.setGlobalMemstoreSize(newMemstoreSize); + } + } + } + + private AutoTunerContext createTunerContext() { + AutoTunerContext context = new AutoTunerContext(); + context.setBlockedFlushCount(blockedFlushCount.getAndSet(0)); + context.setUnblockedFlushCount(unblockedFlushCount.getAndSet(0)); + context.setEvictCount(evictCount); + context.setCurBlockCacheSize(blockCachePercent); + context.setCurMemStoreSize(globalMemStorePercent); + return context; + } + + @Override + public void flushCompleted(FlushType type, HRegion region) { + switch (type) { + case ABOVE_HIGHER_MARK: + blockedFlushCount.incrementAndGet(); + break; + case ABOVE_LOWER_MARK: + unblockedFlushCount.incrementAndGet(); + } + } + } + + /** + * + */ + public static final class AutoTunerContext { + private long blockedFlushCount; + private long unblockedFlushCount; + private long evictCount; + private float curMemStoreSize; + private float curBlockCacheSize; + + public AutoTunerContext() { + } + + public long getBlockedFlushCount() { + return blockedFlushCount; + } + + public void setBlockedFlushCount(long blockedFlushCount) { + this.blockedFlushCount = blockedFlushCount; + } + + public long getUnblockedFlushCount() { + return unblockedFlushCount; + } + + public void setUnblockedFlushCount(long unblockedFlushCount) { + this.unblockedFlushCount = unblockedFlushCount; + } + + public long getEvictCount() { + return evictCount; + } + + public void setEvictCount(long evictCount) { + this.evictCount = evictCount; + } + + public float getCurMemStoreSize() { + return curMemStoreSize; + } + + public void setCurMemStoreSize(float curMemStoreSize) { + this.curMemStoreSize = curMemStoreSize; + } + + public float getCurBlockCacheSize() { + return curBlockCacheSize; + } + + public void setCurBlockCacheSize(float curBlockCacheSize) { + this.curBlockCacheSize = curBlockCacheSize; + } + } + + public static final class AutoTunerResult { + private float memstoreSize; + private float blockCacheSize; + private final boolean needsTuning; + + public AutoTunerResult(boolean needsTuning) { + this.needsTuning = needsTuning; + } + + public float getMemstoreSize() { + return memstoreSize; + } + + public void setMemstoreSize(float memstoreSize) { + this.memstoreSize = memstoreSize; + } + + public float getBlockCacheSize() { + return blockCacheSize; + } + + public void setBlockCacheSize(float blockCacheSize) { + this.blockCacheSize = blockCacheSize; + } + + public boolean needsTuning() { + return needsTuning; + } + } +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1517456) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -59,7 +59,6 @@ import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.Chore; -import org.apache.hadoop.hbase.DaemonThreadFactory; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; @@ -101,7 +100,9 @@ import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.fs.HFileSystem; +import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.ResizableBlockCache; import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler; import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.RpcClient; @@ -259,6 +260,8 @@ // Cache flushing protected MemStoreFlusher cacheFlusher; + protected HeapMemoryManager hMemManager; + // catalog tracker protected CatalogTracker catalogTracker; @@ -1197,6 +1200,7 @@ spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration()); startServiceThreads(); + startHeapMemoryManager(); LOG.info("Serving as " + this.serverNameFromMasterPOV + ", RpcServer on " + this.isa + ", sessionid=0x" + @@ -1579,6 +1583,15 @@ splitLogWorker.start(); } + private void startHeapMemoryManager() { + BlockCache blockCache = CacheConfig.instantiateBlockCache(conf); + if (blockCache instanceof ResizableBlockCache) { + this.hMemManager = new HeapMemoryManager((ResizableBlockCache) blockCache, this.cacheFlusher, + this); + this.hMemManager.start(); + } + } + /** * Puts up the webui. * @return Returns final port -- maybe different from what we started with. Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java (revision 1517456) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java (working copy) @@ -194,8 +194,7 @@ } long heapMax = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage() .getMax(); - long globalMemStoreLimit = MemStoreFlusher.globalMemStoreLimit(heapMax, - MemStoreFlusher.DEFAULT_UPPER, MemStoreFlusher.UPPER_KEY, conf); + long globalMemStoreLimit = (long) (heapMax * MemStoreFlusher.getGlobalMemStorePercent(conf)); int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT); int maxCount = (int) (globalMemStoreLimit * poolSizePercentage / chunkSize); Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (revision 1517456) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (working copy) @@ -21,9 +21,11 @@ import java.io.IOException; import java.lang.Thread.UncaughtExceptionHandler; import java.lang.management.ManagementFactory; +import java.util.ArrayList; import java.util.ConcurrentModificationException; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.SortedMap; @@ -65,6 +67,18 @@ @InterfaceAudience.Private class MemStoreFlusher implements FlushRequester { static final Log LOG = LogFactory.getLog(MemStoreFlusher.class); + static final String MEMSTORE_SIZE_KEY = "hbase.regionserver.global.memstore.size"; + private static final String MEMSTORE_SIZE_OLD_KEY = + "hbase.regionserver.global.memstore.upperLimit"; + private static final String MEMSTORE_SIZE_LOWER_LIMIT_KEY = + "hbase.regionserver.global.memstore.size.lower.limit"; + private static final String MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY = + "hbase.regionserver.global.memstore.lowerLimit"; + + private static final float DEFAULT_MEMSTORE_SIZE = 0.4f; + // Default lower water mark limit is 95% size of memstore size. + private static final float DEFAULT_MEMSTORE_SIZE_LOWER_LIMIT = 0.95f; + // These two data members go together. Any entry in the one must have // a corresponding entry in the other. private final BlockingQueue flushQueue = @@ -78,19 +92,15 @@ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final Object blockSignal = new Object(); - protected final long globalMemStoreLimit; - protected final long globalMemStoreLimitLowMark; - - static final float DEFAULT_UPPER = 0.4f; - private static final float DEFAULT_LOWER = 0.35f; - static final String UPPER_KEY = - "hbase.regionserver.global.memstore.upperLimit"; - private static final String LOWER_KEY = - "hbase.regionserver.global.memstore.lowerLimit"; + protected long globalMemStoreLimit; + protected float globalMemStoreLimitLowMarkPercent; + protected long globalMemStoreLimitLowMark; + private long blockingWaitTime; private final Counter updatesBlockedMsHighWater = new Counter(); private final FlushHandler[] flushHandlers; + private List flushListeners = new ArrayList(1); /** * @param conf @@ -103,15 +113,12 @@ this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax(); - this.globalMemStoreLimit = globalMemStoreLimit(max, DEFAULT_UPPER, - UPPER_KEY, conf); - long lower = globalMemStoreLimit(max, DEFAULT_LOWER, LOWER_KEY, conf); - if (lower > this.globalMemStoreLimit) { - lower = this.globalMemStoreLimit; - LOG.info("Setting globalMemStoreLimitLowMark == globalMemStoreLimit " + - "because supplied " + LOWER_KEY + " was > " + UPPER_KEY); - } - this.globalMemStoreLimitLowMark = lower; + float globalMemStorePercent = getGlobalMemStorePercent(conf); + this.globalMemStoreLimit = (long) (max * globalMemStorePercent); + this.globalMemStoreLimitLowMarkPercent = getGlobalMemStoreLowerMark(conf, globalMemStorePercent); + this.globalMemStoreLimitLowMark = + (long) (this.globalMemStoreLimit * this.globalMemStoreLimitLowMarkPercent); + this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime", 90000); int handlerCount = conf.getInt("hbase.hstore.flusher.count", 1); @@ -124,29 +131,38 @@ } /** - * Calculate size using passed key for configured - * percentage of max. + * Calculate global memstore size for configured percentage of max. * @param max - * @param defaultLimit - * @param key * @param c * @return Limit. */ - static long globalMemStoreLimit(final long max, - final float defaultLimit, final String key, final Configuration c) { - float limit = c.getFloat(key, defaultLimit); - return getMemStoreLimit(max, limit, defaultLimit); + static float getGlobalMemStorePercent(final Configuration c) { + float limit = c.getFloat(MEMSTORE_SIZE_KEY, + c.getFloat(MEMSTORE_SIZE_OLD_KEY, DEFAULT_MEMSTORE_SIZE)); + if (limit > 0.8f || limit < 0.05f) { + LOG.warn("Setting global memstore limit to default of " + DEFAULT_MEMSTORE_SIZE + + " because supplied value outside allowed range of 0.05 -> 0.8"); + limit = DEFAULT_MEMSTORE_SIZE; + } + return limit; } - static long getMemStoreLimit(final long max, final float limit, - final float defaultLimit) { - float effectiveLimit = limit; - if (limit >= 0.9f || limit < 0.1f) { - LOG.warn("Setting global memstore limit to default of " + defaultLimit + - " because supplied value outside allowed range of 0.1 -> 0.9"); - effectiveLimit = defaultLimit; + private static float getGlobalMemStoreLowerMark(final Configuration c, float globalMemStorePercent) { + float lowMarkPercent = c.getFloat(MEMSTORE_SIZE_LOWER_LIMIT_KEY, + DEFAULT_MEMSTORE_SIZE_LOWER_LIMIT); + String lowerWaterMarkOldValStr = c.get(MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY); + if (null != lowerWaterMarkOldValStr) { + LOG.warn(MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY + " is deprecated. Instead use " + + MEMSTORE_SIZE_LOWER_LIMIT_KEY); + float lowerWaterMarkOldVal = Float.parseFloat(lowerWaterMarkOldValStr); + if (lowerWaterMarkOldVal > globalMemStorePercent) { + lowerWaterMarkOldVal = globalMemStorePercent; + LOG.info("Setting globalMemStoreLimitLowMark == globalMemStoreLimit " + "because supplied " + + MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY + " was > " + MEMSTORE_SIZE_OLD_KEY); + } + lowMarkPercent = lowerWaterMarkOldVal / globalMemStorePercent; } - return (long)(max * effectiveLimit); + return lowMarkPercent; } public Counter getUpdatesBlockedMsHighWater() { @@ -454,6 +470,7 @@ lock.readLock().lock(); try { boolean shouldCompact = region.flushcache(); + notifyFlushCompleted(region, emergencyFlush); // We just want to check the size boolean shouldSplit = region.checkSplit() != null; if (shouldSplit) { @@ -485,6 +502,16 @@ return true; } + private void notifyFlushCompleted(HRegion region, boolean emergencyFlush) { + FlushType type = FlushType.NORMAL; + if (emergencyFlush) { + type = isAboveHighWaterMark() ? FlushType.ABOVE_HIGHER_MARK : FlushType.ABOVE_LOWER_MARK; + } + for (MemstoreFlushListener listener : flushListeners) { + listener.flushCompleted(type, region); + } + } + private void wakeUpIfBlocking() { synchronized (blockSignal) { blockSignal.notifyAll(); @@ -570,6 +597,38 @@ return queueList.toString(); } + /** + * Register a MemstoreFlushListener + * @param listener + */ + public void registerFlushListener(final MemstoreFlushListener listener) { + this.flushListeners.add(listener); + } + + /** + * Unregister the listener from MemstoreFlushListeners + * @param listener + * @return + */ + public boolean unregisterFlushListeners(final MemstoreFlushListener listener) { + return this.flushListeners.remove(listener); + } + + /** + * Sets the global memstore limit to a new size. + * @param globalMemStoreSize + */ + public void setGlobalMemstoreSize(long globalMemStoreSize) { + this.globalMemStoreLimit = globalMemStoreSize; + this.globalMemStoreLimitLowMark = + (long) (this.globalMemStoreLimitLowMarkPercent * globalMemStoreSize); + reclaimMemStoreMemory(); + } + + public long getMemoryLimit() { + return this.globalMemStoreLimit; + } + interface FlushQueueEntry extends Delayed {} /** @@ -672,3 +731,7 @@ } } } + +enum FlushType { + NORMAL, ABOVE_LOWER_MARK, ABOVE_HIGHER_MARK; +} \ No newline at end of file Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreFlushListener.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreFlushListener.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreFlushListener.java (working copy) @@ -0,0 +1,28 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.classification.InterfaceAudience; + +@InterfaceAudience.Private +public interface MemstoreFlushListener { + + void flushCompleted(FlushType type, HRegion region); + +}