Index: hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
===================================================================
--- hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java (revision 1517456)
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java (working copy)
@@ -699,7 +699,7 @@
public static final String HFILE_BLOCK_CACHE_SIZE_KEY =
"hfile.block.cache.size";
- public static final float HFILE_BLOCK_CACHE_SIZE_DEFAULT = 0.25f;
+ public static final float HFILE_BLOCK_CACHE_SIZE_DEFAULT = 0.4f;
/*
* Minimum percentage of free heap necessary for a successful cluster startup.
Index: hbase-common/src/main/resources/hbase-default.xml
===================================================================
--- hbase-common/src/main/resources/hbase-default.xml (revision 1517456)
+++ hbase-common/src/main/resources/hbase-default.xml (working copy)
@@ -237,19 +237,19 @@
The HLog file writer implementation.
- hbase.regionserver.global.memstore.upperLimit
+ hbase.regionserver.global.memstore.size
0.4
Maximum size of all memstores in a region server before new
updates are blocked and flushes are forced. Defaults to 40% of heap.
Updates are blocked and flushes are forced until size of all memstores
- in a region server hits hbase.regionserver.global.memstore.lowerLimit.
+ in a region server hits hbase.regionserver.global.memstore.size.lower.limit.
- hbase.regionserver.global.memstore.lowerLimit
- 0.38
+ hbase.regionserver.global.memstore.size.lower.limit
+ 0.95
Maximum size of all memstores in a region server before
- flushes are forced. Defaults to 38% of heap.
- This value equal to hbase.regionserver.global.memstore.upperLimit causes
+ flushes are forced. Defaults to 95% of hbase.regionserver.global.memstore.size.
+ A 100% value for this value causes
the minimum possible flushing to occur when updates are blocked due to
memstore limiting.
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java (revision 1517456)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java (working copy)
@@ -347,7 +347,7 @@
* @param conf The current configuration.
* @return The block cache or null.
*/
- private static synchronized BlockCache instantiateBlockCache(Configuration conf) {
+ public static synchronized BlockCache instantiateBlockCache(Configuration conf) {
if (globalBlockCache != null) return globalBlockCache;
if (blockCacheDisabled) return null;
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java (revision 1517456)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java (working copy)
@@ -38,7 +38,7 @@
*
**/
@InterfaceAudience.Private
-public class DoubleBlockCache implements BlockCache, HeapSize {
+public class DoubleBlockCache implements ResizableBlockCache, HeapSize {
static final Log LOG = LogFactory.getLog(DoubleBlockCache.class.getName());
@@ -172,4 +172,9 @@
return onHeapCache.getBlockCount() + offHeapCache.getBlockCount();
}
+ @Override
+ public void setMaxSize(long size) {
+ this.onHeapCache.setMaxSize(size);
+ }
+
}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java (revision 1517456)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java (working copy)
@@ -95,7 +95,7 @@
* to the relative sizes and usage.
*/
@InterfaceAudience.Private
-public class LruBlockCache implements BlockCache, HeapSize {
+public class LruBlockCache implements ResizableBlockCache, HeapSize {
static final Log LOG = LogFactory.getLog(LruBlockCache.class);
@@ -272,6 +272,7 @@
statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
}
+ @Override
public void setMaxSize(long maxSize) {
this.maxSize = maxSize;
if(this.size.get() > acceptableSize() && !evictionInProgress) {
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ResizableBlockCache.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ResizableBlockCache.java (revision 0)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ResizableBlockCache.java (working copy)
@@ -0,0 +1,31 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+public interface ResizableBlockCache extends BlockCache {
+
+ /**
+ * Sets the max heap size that can be used by the BlockCache.
+ * @param size The max heap size.
+ */
+ void setMaxSize(long size);
+}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryBalancerImpl.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryBalancerImpl.java (revision 0)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryBalancerImpl.java (working copy)
@@ -0,0 +1,69 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.AutoTunerContext;
+import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.AutoTunerResult;
+
+public class DefaultHeapMemoryBalancerImpl implements HeapMemoryBalancer {
+
+ private static final String STEP_KEY = "hbase.regionserver.heapmemory.autotuner.step";
+ private static final float STEP_DEFAULT_VALUE = 0.2f;
+
+ private Configuration conf;
+ private float step = STEP_DEFAULT_VALUE;
+
+ @Override
+ public AutoTunerResult balance(AutoTunerContext context) {
+ long blockedFlushCount = context.getBlockedFlushCount();
+ long unblockedFlushCount = context.getUnblockedFlushCount();
+ long evictCount = context.getEvictCount();
+ boolean memstoreSufficient = blockedFlushCount == 0 && unblockedFlushCount == 0;
+ boolean blockCacheSufficient = evictCount == 0;
+ if (memstoreSufficient && blockCacheSufficient) {
+ return new AutoTunerResult(false);
+ }
+ AutoTunerResult result = new AutoTunerResult(true);
+ if (memstoreSufficient) {
+ // Increase the block cache size and corresponding decrease in memstore size
+ result.setBlockCacheSize(context.getCurBlockCacheSize() + step);
+ result.setMemstoreSize(context.getCurMemStoreSize() - step);
+ } else if (blockCacheSufficient) {
+ // Increase the block cache size and corresponding decrease in memstore size
+ result.setBlockCacheSize(context.getCurBlockCacheSize() - step);
+ result.setMemstoreSize(context.getCurMemStoreSize() + step);
+ } else {
+ return new AutoTunerResult(false);
+ // As of now not making any tuning in write/read heavy scenario.
+ }
+ return result;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return this.conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ this.step = conf.getFloat(STEP_KEY, STEP_DEFAULT_VALUE);
+ }
+}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryBalancer.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryBalancer.java (revision 0)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryBalancer.java (working copy)
@@ -0,0 +1,40 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.AutoTunerContext;
+import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.AutoTunerResult;
+
+/**
+ *
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface HeapMemoryBalancer extends Configurable {
+
+ /**
+ *
+ * @param context
+ * @return
+ */
+ AutoTunerResult balance(AutoTunerContext context);
+}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java (revision 0)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java (working copy)
@@ -0,0 +1,316 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Chore;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.hfile.ResizableBlockCache;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ *
+ */
+@InterfaceAudience.Private
+public class HeapMemoryManager {
+ private static final Log LOG = LogFactory.getLog(HeapMemoryManager.class);
+ private static final int CONVERT_TO_PERCENTAGE = 100;
+
+ private static final String MEMSTORE_SIZE_MAX_RANGE_KEY =
+ "hbase.regionserver.global.memstore.size.max.range";
+ private static final String MEMSTORE_SIZE_MIN_RANGE_KEY =
+ "hbase.regionserver.global.memstore.size.min.range";
+
+ private static final String BLOCK_CACHE_SIZE_MAX_RANGE_KEY =
+ "hfile.block.cache.size.max.range";
+ private static final String BLOCK_CACHE_SIZE_MIN_RANGE_KEY =
+ "hfile.block.cache.size.min.range";
+
+ private float globalMemStorePercent;
+ private float globalMemStorePercentMinRange;
+ private float globalMemStorePercentMaxRange;
+
+ private float blockCachePercent;
+ private float blockCachePercentMinRange;
+ private float blockCachePercentMaxRange;
+
+ private final ResizableBlockCache blockCache;
+ private final MemStoreFlusher memStoreFlusher;
+ private final HRegionServer hrs;
+
+ private HeapMemoryAutoTuner autoTuner = null;
+ private boolean autoTunerOn = true;
+
+ private long maxHeapSize = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
+
+ public HeapMemoryManager(ResizableBlockCache blockCache, MemStoreFlusher memStoreFlusher,
+ HRegionServer hrs) {
+ this.blockCache = blockCache;
+ this.memStoreFlusher = memStoreFlusher;
+ this.hrs = hrs;
+ initMemstoreSizeRanges(hrs.getConfiguration());
+ initBlockCacheSizes(hrs.getConfiguration());
+ doInitChecks();
+ }
+
+ private void initMemstoreSizeRanges(Configuration conf) {
+ globalMemStorePercent = MemStoreFlusher.getGlobalMemStorePercent(conf);
+ globalMemStorePercentMinRange = conf.getFloat(MEMSTORE_SIZE_MIN_RANGE_KEY,
+ globalMemStorePercent);
+ globalMemStorePercentMaxRange = conf.getFloat(MEMSTORE_SIZE_MAX_RANGE_KEY,
+ globalMemStorePercent);
+ if (globalMemStorePercent < globalMemStorePercentMinRange) {
+ LOG.warn("Setting " + MEMSTORE_SIZE_MIN_RANGE_KEY + " to " + globalMemStorePercent
+ + ", same value as " + MemStoreFlusher.MEMSTORE_SIZE_KEY
+ + " because supplied value greater than initial memstore size value.");
+ globalMemStorePercentMinRange = globalMemStorePercent;
+ }
+ if (globalMemStorePercent > globalMemStorePercentMaxRange) {
+ LOG.warn("Setting " + MEMSTORE_SIZE_MAX_RANGE_KEY + " to " + globalMemStorePercent
+ + ", same value as " + MemStoreFlusher.MEMSTORE_SIZE_KEY
+ + " because supplied value less than initial memstore size value.");
+ globalMemStorePercentMaxRange = globalMemStorePercent;
+ }
+ if (globalMemStorePercent == globalMemStorePercentMinRange
+ && globalMemStorePercent == globalMemStorePercentMaxRange) {
+ autoTunerOn = false;
+ }
+ }
+
+ private void doInitChecks() {
+ int gml = (int)(globalMemStorePercent * CONVERT_TO_PERCENTAGE);
+ int bcul = (int)(blockCachePercent * CONVERT_TO_PERCENTAGE);
+ if (CONVERT_TO_PERCENTAGE - (gml + bcul) <
+ (int) (CONVERT_TO_PERCENTAGE * HConstants.HBASE_CLUSTER_MINIMUM_MEMORY_THRESHOLD)) {
+ throw new RuntimeException("Current heap configuration for MemStore and BlockCache exceeds "
+ + "the threshold required for successful cluster operation. "
+ + "The combined value cannot exceed 0.8. Please check " + "the settings for "
+ + MemStoreFlusher.MEMSTORE_SIZE_KEY + " and " + HConstants.HFILE_BLOCK_CACHE_SIZE_KEY
+ + " in your configuration. " + MemStoreFlusher.MEMSTORE_SIZE_KEY + " is "
+ + globalMemStorePercent + " and " + HConstants.HFILE_BLOCK_CACHE_SIZE_KEY + " is "
+ + blockCachePercent);
+ }
+ gml = (int)(globalMemStorePercentMaxRange * CONVERT_TO_PERCENTAGE);
+ bcul = (int)(globalMemStorePercentMaxRange * CONVERT_TO_PERCENTAGE);
+ if (CONVERT_TO_PERCENTAGE - (gml + bcul) <
+ (int) (CONVERT_TO_PERCENTAGE * HConstants.HBASE_CLUSTER_MINIMUM_MEMORY_THRESHOLD)) {
+ throw new RuntimeException();
+ }
+ gml = (int)(globalMemStorePercentMinRange * CONVERT_TO_PERCENTAGE);
+ bcul = (int)(globalMemStorePercentMinRange * CONVERT_TO_PERCENTAGE);
+ if (CONVERT_TO_PERCENTAGE - (gml + bcul) <
+ (int) (CONVERT_TO_PERCENTAGE * HConstants.HBASE_CLUSTER_MINIMUM_MEMORY_THRESHOLD)) {
+ throw new RuntimeException();
+ }
+ }
+
+ private void initBlockCacheSizes(Configuration conf) {
+ blockCachePercent = conf.getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY,
+ HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
+ blockCachePercentMinRange = conf.getFloat(BLOCK_CACHE_SIZE_MIN_RANGE_KEY, blockCachePercent);
+ blockCachePercentMaxRange = conf.getFloat(BLOCK_CACHE_SIZE_MAX_RANGE_KEY, blockCachePercent);
+ if (blockCachePercent < blockCachePercentMinRange) {
+ LOG.warn("Setting " + BLOCK_CACHE_SIZE_MIN_RANGE_KEY + " to " + blockCachePercent
+ + ", same value as " + HConstants.HFILE_BLOCK_CACHE_SIZE_KEY
+ + " because supplied value greater than initial block cache size.");
+ blockCachePercentMinRange = blockCachePercent;
+ }
+ if (blockCachePercent > blockCachePercentMaxRange) {
+ LOG.warn("Setting " + BLOCK_CACHE_SIZE_MAX_RANGE_KEY + " to " + blockCachePercent
+ + ", same value as " + HConstants.HFILE_BLOCK_CACHE_SIZE_KEY
+ + " because supplied value less than initial block cache size.");
+ blockCachePercentMaxRange = blockCachePercent;
+ }
+ if (blockCachePercent == blockCachePercentMinRange
+ && blockCachePercent == blockCachePercentMaxRange) {
+ autoTunerOn = false;
+ }
+ }
+
+ public void start() {
+ if (autoTunerOn) {
+ LOG.info("Starting HeapMemoryAutoTuner.");
+ this.autoTuner = new HeapMemoryAutoTuner();
+ Threads.setDaemonThreadRunning(autoTuner.getThread());
+ // Register HeapMemoryAutoTuner as a memstore flush listener
+ memStoreFlusher.registerFlushListener(autoTuner);
+ }
+ }
+
+ private class HeapMemoryAutoTuner extends Chore implements MemstoreFlushListener {
+ private static final String PERIOD = "hbase.regionserver.heapmemory.autotuner.period";
+ private static final String HBASE_RS_MEMORY_BALANCER_CLASS = "hbase.regionserver.memory.balancer.class";
+ private static final int DEFAULT_PERIOD = 5 * 60 * 1000; // Default is 5 mins
+
+ private HeapMemoryBalancer heapMemoryBalancer;
+
+ private AtomicLong blockedFlushCount = new AtomicLong();
+ private AtomicLong unblockedFlushCount = new AtomicLong();
+ private long evictCount = 0L;
+
+ public HeapMemoryAutoTuner() {
+ super(hrs.getServerName() + "-HeapMemoryAutoTuner", hrs.getConfiguration().getInt(PERIOD,
+ DEFAULT_PERIOD), hrs);
+ Class extends HeapMemoryBalancer> balancerKlass = hrs.getConfiguration().getClass(
+ HBASE_RS_MEMORY_BALANCER_CLASS, DefaultHeapMemoryBalancerImpl.class,
+ HeapMemoryBalancer.class);
+ heapMemoryBalancer = ReflectionUtils.newInstance(balancerKlass, hrs.getConfiguration());
+ }
+
+ @Override
+ protected void chore() {
+ evictCount = blockCache.getStats().getEvictedCount() - evictCount;
+ AutoTunerContext context = createTunerContext();
+ AutoTunerResult result = this.heapMemoryBalancer.balance(context);
+ if (result.needsTuning()) {
+ float memstoreSize = result.getMemstoreSize();
+ float blockCacheSize = result.getBlockCacheSize();
+ LOG.debug("From HeapMemoryBalancer new memstoreSize: " + memstoreSize + "%. new blockCacheSize: " + blockCacheSize + "%");
+ if (1 - (memstoreSize + blockCacheSize) < HConstants.HBASE_CLUSTER_MINIMUM_MEMORY_THRESHOLD) {
+ LOG.warn("Current heap configuration from HeapMemoryBalancer exceeds "
+ + "the threshold required for successful cluster operation. "
+ + "The combined value cannot exceed 0.8. " + MemStoreFlusher.MEMSTORE_SIZE_KEY
+ + " is " + memstoreSize + " and " + HConstants.HFILE_BLOCK_CACHE_SIZE_KEY + " is "
+ + blockCacheSize);
+ } else {
+ long newBlockCacheSize = (long) (maxHeapSize * blockCacheSize);
+ LOG.info("Setting block cache heap size to " + newBlockCacheSize);
+ blockCachePercent = blockCacheSize;
+ blockCache.setMaxSize(newBlockCacheSize);
+ long newMemstoreSize = (long) (maxHeapSize * memstoreSize);
+ LOG.info("Setting memstore heap size to " + newMemstoreSize);
+ globalMemStorePercent = memstoreSize;
+ memStoreFlusher.setGlobalMemstoreSize(newMemstoreSize);
+ }
+ }
+ }
+
+ private AutoTunerContext createTunerContext() {
+ AutoTunerContext context = new AutoTunerContext();
+ context.setBlockedFlushCount(blockedFlushCount.getAndSet(0));
+ context.setUnblockedFlushCount(unblockedFlushCount.getAndSet(0));
+ context.setEvictCount(evictCount);
+ context.setCurBlockCacheSize(blockCachePercent);
+ context.setCurMemStoreSize(globalMemStorePercent);
+ return context;
+ }
+
+ @Override
+ public void flushCompleted(FlushType type, HRegion region) {
+ switch (type) {
+ case ABOVE_HIGHER_MARK:
+ blockedFlushCount.incrementAndGet();
+ break;
+ case ABOVE_LOWER_MARK:
+ unblockedFlushCount.incrementAndGet();
+ }
+ }
+ }
+
+ /**
+ *
+ */
+ public static final class AutoTunerContext {
+ private long blockedFlushCount;
+ private long unblockedFlushCount;
+ private long evictCount;
+ private float curMemStoreSize;
+ private float curBlockCacheSize;
+
+ public AutoTunerContext() {
+ }
+
+ public long getBlockedFlushCount() {
+ return blockedFlushCount;
+ }
+
+ public void setBlockedFlushCount(long blockedFlushCount) {
+ this.blockedFlushCount = blockedFlushCount;
+ }
+
+ public long getUnblockedFlushCount() {
+ return unblockedFlushCount;
+ }
+
+ public void setUnblockedFlushCount(long unblockedFlushCount) {
+ this.unblockedFlushCount = unblockedFlushCount;
+ }
+
+ public long getEvictCount() {
+ return evictCount;
+ }
+
+ public void setEvictCount(long evictCount) {
+ this.evictCount = evictCount;
+ }
+
+ public float getCurMemStoreSize() {
+ return curMemStoreSize;
+ }
+
+ public void setCurMemStoreSize(float curMemStoreSize) {
+ this.curMemStoreSize = curMemStoreSize;
+ }
+
+ public float getCurBlockCacheSize() {
+ return curBlockCacheSize;
+ }
+
+ public void setCurBlockCacheSize(float curBlockCacheSize) {
+ this.curBlockCacheSize = curBlockCacheSize;
+ }
+ }
+
+ public static final class AutoTunerResult {
+ private float memstoreSize;
+ private float blockCacheSize;
+ private final boolean needsTuning;
+
+ public AutoTunerResult(boolean needsTuning) {
+ this.needsTuning = needsTuning;
+ }
+
+ public float getMemstoreSize() {
+ return memstoreSize;
+ }
+
+ public void setMemstoreSize(float memstoreSize) {
+ this.memstoreSize = memstoreSize;
+ }
+
+ public float getBlockCacheSize() {
+ return blockCacheSize;
+ }
+
+ public void setBlockCacheSize(float blockCacheSize) {
+ this.blockCacheSize = blockCacheSize;
+ }
+
+ public boolean needsTuning() {
+ return needsTuning;
+ }
+ }
+}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1517456)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy)
@@ -59,7 +59,6 @@
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Chore;
-import org.apache.hadoop.hbase.DaemonThreadFactory;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
@@ -101,7 +100,9 @@
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.ResizableBlockCache;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcClient;
@@ -259,6 +260,8 @@
// Cache flushing
protected MemStoreFlusher cacheFlusher;
+ protected HeapMemoryManager hMemManager;
+
// catalog tracker
protected CatalogTracker catalogTracker;
@@ -1197,6 +1200,7 @@
spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());
startServiceThreads();
+ startHeapMemoryManager();
LOG.info("Serving as " + this.serverNameFromMasterPOV +
", RpcServer on " + this.isa +
", sessionid=0x" +
@@ -1579,6 +1583,15 @@
splitLogWorker.start();
}
+ private void startHeapMemoryManager() {
+ BlockCache blockCache = CacheConfig.instantiateBlockCache(conf);
+ if (blockCache instanceof ResizableBlockCache) {
+ this.hMemManager = new HeapMemoryManager((ResizableBlockCache) blockCache, this.cacheFlusher,
+ this);
+ this.hMemManager.start();
+ }
+ }
+
/**
* Puts up the webui.
* @return Returns final port -- maybe different from what we started with.
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java (revision 1517456)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java (working copy)
@@ -194,8 +194,7 @@
}
long heapMax = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
.getMax();
- long globalMemStoreLimit = MemStoreFlusher.globalMemStoreLimit(heapMax,
- MemStoreFlusher.DEFAULT_UPPER, MemStoreFlusher.UPPER_KEY, conf);
+ long globalMemStoreLimit = (long) (heapMax * MemStoreFlusher.getGlobalMemStorePercent(conf));
int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY,
MemStoreLAB.CHUNK_SIZE_DEFAULT);
int maxCount = (int) (globalMemStoreLimit * poolSizePercentage / chunkSize);
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (revision 1517456)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (working copy)
@@ -21,9 +21,11 @@
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
@@ -65,6 +67,18 @@
@InterfaceAudience.Private
class MemStoreFlusher implements FlushRequester {
static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);
+ static final String MEMSTORE_SIZE_KEY = "hbase.regionserver.global.memstore.size";
+ private static final String MEMSTORE_SIZE_OLD_KEY =
+ "hbase.regionserver.global.memstore.upperLimit";
+ private static final String MEMSTORE_SIZE_LOWER_LIMIT_KEY =
+ "hbase.regionserver.global.memstore.size.lower.limit";
+ private static final String MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY =
+ "hbase.regionserver.global.memstore.lowerLimit";
+
+ private static final float DEFAULT_MEMSTORE_SIZE = 0.4f;
+ // Default lower water mark limit is 95% size of memstore size.
+ private static final float DEFAULT_MEMSTORE_SIZE_LOWER_LIMIT = 0.95f;
+
// These two data members go together. Any entry in the one must have
// a corresponding entry in the other.
private final BlockingQueue flushQueue =
@@ -78,19 +92,15 @@
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Object blockSignal = new Object();
- protected final long globalMemStoreLimit;
- protected final long globalMemStoreLimitLowMark;
-
- static final float DEFAULT_UPPER = 0.4f;
- private static final float DEFAULT_LOWER = 0.35f;
- static final String UPPER_KEY =
- "hbase.regionserver.global.memstore.upperLimit";
- private static final String LOWER_KEY =
- "hbase.regionserver.global.memstore.lowerLimit";
+ protected long globalMemStoreLimit;
+ protected float globalMemStoreLimitLowMarkPercent;
+ protected long globalMemStoreLimitLowMark;
+
private long blockingWaitTime;
private final Counter updatesBlockedMsHighWater = new Counter();
private final FlushHandler[] flushHandlers;
+ private List flushListeners = new ArrayList(1);
/**
* @param conf
@@ -103,15 +113,12 @@
this.threadWakeFrequency =
conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
- this.globalMemStoreLimit = globalMemStoreLimit(max, DEFAULT_UPPER,
- UPPER_KEY, conf);
- long lower = globalMemStoreLimit(max, DEFAULT_LOWER, LOWER_KEY, conf);
- if (lower > this.globalMemStoreLimit) {
- lower = this.globalMemStoreLimit;
- LOG.info("Setting globalMemStoreLimitLowMark == globalMemStoreLimit " +
- "because supplied " + LOWER_KEY + " was > " + UPPER_KEY);
- }
- this.globalMemStoreLimitLowMark = lower;
+ float globalMemStorePercent = getGlobalMemStorePercent(conf);
+ this.globalMemStoreLimit = (long) (max * globalMemStorePercent);
+ this.globalMemStoreLimitLowMarkPercent = getGlobalMemStoreLowerMark(conf, globalMemStorePercent);
+ this.globalMemStoreLimitLowMark =
+ (long) (this.globalMemStoreLimit * this.globalMemStoreLimitLowMarkPercent);
+
this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
90000);
int handlerCount = conf.getInt("hbase.hstore.flusher.count", 1);
@@ -124,29 +131,38 @@
}
/**
- * Calculate size using passed key for configured
- * percentage of max.
+ * Calculate global memstore size for configured percentage of max.
* @param max
- * @param defaultLimit
- * @param key
* @param c
* @return Limit.
*/
- static long globalMemStoreLimit(final long max,
- final float defaultLimit, final String key, final Configuration c) {
- float limit = c.getFloat(key, defaultLimit);
- return getMemStoreLimit(max, limit, defaultLimit);
+ static float getGlobalMemStorePercent(final Configuration c) {
+ float limit = c.getFloat(MEMSTORE_SIZE_KEY,
+ c.getFloat(MEMSTORE_SIZE_OLD_KEY, DEFAULT_MEMSTORE_SIZE));
+ if (limit > 0.8f || limit < 0.05f) {
+ LOG.warn("Setting global memstore limit to default of " + DEFAULT_MEMSTORE_SIZE
+ + " because supplied value outside allowed range of 0.05 -> 0.8");
+ limit = DEFAULT_MEMSTORE_SIZE;
+ }
+ return limit;
}
- static long getMemStoreLimit(final long max, final float limit,
- final float defaultLimit) {
- float effectiveLimit = limit;
- if (limit >= 0.9f || limit < 0.1f) {
- LOG.warn("Setting global memstore limit to default of " + defaultLimit +
- " because supplied value outside allowed range of 0.1 -> 0.9");
- effectiveLimit = defaultLimit;
+ private static float getGlobalMemStoreLowerMark(final Configuration c, float globalMemStorePercent) {
+ float lowMarkPercent = c.getFloat(MEMSTORE_SIZE_LOWER_LIMIT_KEY,
+ DEFAULT_MEMSTORE_SIZE_LOWER_LIMIT);
+ String lowerWaterMarkOldValStr = c.get(MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY);
+ if (null != lowerWaterMarkOldValStr) {
+ LOG.warn(MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY + " is deprecated. Instead use "
+ + MEMSTORE_SIZE_LOWER_LIMIT_KEY);
+ float lowerWaterMarkOldVal = Float.parseFloat(lowerWaterMarkOldValStr);
+ if (lowerWaterMarkOldVal > globalMemStorePercent) {
+ lowerWaterMarkOldVal = globalMemStorePercent;
+ LOG.info("Setting globalMemStoreLimitLowMark == globalMemStoreLimit " + "because supplied "
+ + MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY + " was > " + MEMSTORE_SIZE_OLD_KEY);
+ }
+ lowMarkPercent = lowerWaterMarkOldVal / globalMemStorePercent;
}
- return (long)(max * effectiveLimit);
+ return lowMarkPercent;
}
public Counter getUpdatesBlockedMsHighWater() {
@@ -454,6 +470,7 @@
lock.readLock().lock();
try {
boolean shouldCompact = region.flushcache();
+ notifyFlushCompleted(region, emergencyFlush);
// We just want to check the size
boolean shouldSplit = region.checkSplit() != null;
if (shouldSplit) {
@@ -485,6 +502,16 @@
return true;
}
+ private void notifyFlushCompleted(HRegion region, boolean emergencyFlush) {
+ FlushType type = FlushType.NORMAL;
+ if (emergencyFlush) {
+ type = isAboveHighWaterMark() ? FlushType.ABOVE_HIGHER_MARK : FlushType.ABOVE_LOWER_MARK;
+ }
+ for (MemstoreFlushListener listener : flushListeners) {
+ listener.flushCompleted(type, region);
+ }
+ }
+
private void wakeUpIfBlocking() {
synchronized (blockSignal) {
blockSignal.notifyAll();
@@ -570,6 +597,38 @@
return queueList.toString();
}
+ /**
+ * Register a MemstoreFlushListener
+ * @param listener
+ */
+ public void registerFlushListener(final MemstoreFlushListener listener) {
+ this.flushListeners.add(listener);
+ }
+
+ /**
+ * Unregister the listener from MemstoreFlushListeners
+ * @param listener
+ * @return
+ */
+ public boolean unregisterFlushListeners(final MemstoreFlushListener listener) {
+ return this.flushListeners.remove(listener);
+ }
+
+ /**
+ * Sets the global memstore limit to a new size.
+ * @param globalMemStoreSize
+ */
+ public void setGlobalMemstoreSize(long globalMemStoreSize) {
+ this.globalMemStoreLimit = globalMemStoreSize;
+ this.globalMemStoreLimitLowMark =
+ (long) (this.globalMemStoreLimitLowMarkPercent * globalMemStoreSize);
+ reclaimMemStoreMemory();
+ }
+
+ public long getMemoryLimit() {
+ return this.globalMemStoreLimit;
+ }
+
interface FlushQueueEntry extends Delayed {}
/**
@@ -672,3 +731,7 @@
}
}
}
+
+enum FlushType {
+ NORMAL, ABOVE_LOWER_MARK, ABOVE_HIGHER_MARK;
+}
\ No newline at end of file
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreFlushListener.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreFlushListener.java (revision 0)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreFlushListener.java (working copy)
@@ -0,0 +1,28 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+public interface MemstoreFlushListener {
+
+ void flushCompleted(FlushType type, HRegion region);
+
+}