Index: hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
===================================================================
--- hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java (revision 1530193)
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java (working copy)
@@ -716,8 +716,18 @@
public static final String HFILE_BLOCK_CACHE_SIZE_KEY =
"hfile.block.cache.size";
- public static final float HFILE_BLOCK_CACHE_SIZE_DEFAULT = 0.25f;
+ public static final float HFILE_BLOCK_CACHE_SIZE_DEFAULT = 0.4f;
+ public static final String BLOCK_CACHE_SIZE_MAX_RANGE_KEY = "hfile.block.cache.size.max.range";
+
+ public static final String BLOCK_CACHE_SIZE_MIN_RANGE_KEY = "hfile.block.cache.size.min.range";
+
+ public static final String MEMSTORE_SIZE_MAX_RANGE_KEY =
+ "hbase.regionserver.global.memstore.size.max.range";
+
+ public static final String MEMSTORE_SIZE_MIN_RANGE_KEY =
+ "hbase.regionserver.global.memstore.size.min.range";
+
/*
* Minimum percentage of free heap necessary for a successful cluster startup.
*/
@@ -901,6 +911,12 @@
public static final String STATUS_MULTICAST_PORT = "hbase.status.multicast.port";
public static final int DEFAULT_STATUS_MULTICAST_PORT = 60100;
+ public static final String HBASE_RS_MEMORY_TUNER_PERIOD =
+ "hbase.regionserver.heapmemory.tuner.period";
+ public static final int HBASE_RS_MEMORY_TUNER_DEFAULT_PERIOD = 5 * 60 * 1000;// Default is 5 mins
+ public static final String HBASE_RS_MEMORY_BALANCER_CLASS =
+ "hbase.regionserver.memory.balancer.class";
+
private HConstants() {
// Can't be instantiated with this ctor.
}
Index: hbase-common/src/main/resources/hbase-default.xml
===================================================================
--- hbase-common/src/main/resources/hbase-default.xml (revision 1530193)
+++ hbase-common/src/main/resources/hbase-default.xml (working copy)
@@ -243,21 +243,20 @@
The HLog file writer implementation.
- hbase.regionserver.global.memstore.upperLimit
+ hbase.regionserver.global.memstore.size
0.4
Maximum size of all memstores in a region server before new
updates are blocked and flushes are forced. Defaults to 40% of heap.
Updates are blocked and flushes are forced until size of all memstores
- in a region server hits hbase.regionserver.global.memstore.lowerLimit.
+ in a region server hits hbase.regionserver.global.memstore.size.lower.limit.
- hbase.regionserver.global.memstore.lowerLimit
- 0.38
- Maximum size of all memstores in a region server before
- flushes are forced. Defaults to 38% of heap.
- This value equal to hbase.regionserver.global.memstore.upperLimit causes
- the minimum possible flushing to occur when updates are blocked due to
- memstore limiting.
+ hbase.regionserver.global.memstore.size.lower.limit
+ 0.95
+ Maximum size of all memstores in a region server before flushes are forced.
+ Defaults to 95% of hbase.regionserver.global.memstore.size.
+ A 100% value for this value causes the minimum possible flushing to occur when updates are
+ blocked due to memstore limiting.
hbase.regionserver.optionalcacheflushinterval
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java (revision 1530193)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java (working copy)
@@ -347,7 +347,7 @@
* @param conf The current configuration.
* @return The block cache or null.
*/
- private static synchronized BlockCache instantiateBlockCache(Configuration conf) {
+ public static synchronized BlockCache instantiateBlockCache(Configuration conf) {
if (globalBlockCache != null) return globalBlockCache;
if (blockCacheDisabled) return null;
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java (revision 1530193)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java (working copy)
@@ -38,7 +38,7 @@
*
**/
@InterfaceAudience.Private
-public class DoubleBlockCache implements BlockCache, HeapSize {
+public class DoubleBlockCache implements ResizableBlockCache, HeapSize {
static final Log LOG = LogFactory.getLog(DoubleBlockCache.class.getName());
@@ -172,4 +172,8 @@
return onHeapCache.getBlockCount() + offHeapCache.getBlockCount();
}
+ @Override
+ public void setMaxSize(long size) {
+ this.onHeapCache.setMaxSize(size);
+ }
}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java (revision 1530193)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java (working copy)
@@ -95,7 +95,7 @@
* to the relative sizes and usage.
*/
@InterfaceAudience.Private
-public class LruBlockCache implements BlockCache, HeapSize {
+public class LruBlockCache implements ResizableBlockCache, HeapSize {
static final Log LOG = LogFactory.getLog(LruBlockCache.class);
@@ -272,6 +272,7 @@
statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
}
+ @Override
public void setMaxSize(long maxSize) {
this.maxSize = maxSize;
if(this.size.get() > acceptableSize() && !evictionInProgress) {
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ResizableBlockCache.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ResizableBlockCache.java (revision 0)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ResizableBlockCache.java (working copy)
@@ -0,0 +1,34 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * BlockCache which is resizable.
+ */
+@InterfaceAudience.Private
+public interface ResizableBlockCache extends BlockCache {
+
+ /**
+ * Sets the max heap size that can be used by the BlockCache.
+ * @param size The max heap size.
+ */
+ void setMaxSize(long size);
+}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryBalancer.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryBalancer.java (revision 0)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryBalancer.java (working copy)
@@ -0,0 +1,114 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.hadoop.hbase.HConstants.BLOCK_CACHE_SIZE_MAX_RANGE_KEY;
+import static org.apache.hadoop.hbase.HConstants.BLOCK_CACHE_SIZE_MIN_RANGE_KEY;
+import static org.apache.hadoop.hbase.HConstants.HFILE_BLOCK_CACHE_SIZE_KEY;
+import static org.apache.hadoop.hbase.HConstants.MEMSTORE_SIZE_MAX_RANGE_KEY;
+import static org.apache.hadoop.hbase.HConstants.MEMSTORE_SIZE_MIN_RANGE_KEY;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.TunerContext;
+import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.TunerResult;
+
+/**
+ * The default implementation for the HeapMemoryBalancer. This will do simple checks to decide
+ * whether there should be changes in the heap size of memstore/block cache. When there is no block
+ * cache eviction at all but there are flushes because of global heap pressure, it will increase the
+ * memstore heap size and decrease block cache size. The step value for this heap size change can be
+ * specified using the config hbase.regionserver.heapmemory.autotuner.step. When there is no
+ * memstore flushes because of heap pressure but there is block cache evictions it will increase the
+ * block cache heap.
+ */
+@InterfaceAudience.Private
+class DefaultHeapMemoryBalancer implements HeapMemoryBalancer {
+
+ public static final String STEP_KEY = "hbase.regionserver.heapmemory.autotuner.step";
+ public static final float DEFAULT_STEP_VALUE = 0.02f; // 2%
+
+ private Configuration conf;
+ private float step = DEFAULT_STEP_VALUE;
+
+ private float globalMemStorePercentMinRange;
+ private float globalMemStorePercentMaxRange;
+ private float blockCachePercentMinRange;
+ private float blockCachePercentMaxRange;
+
+ @Override
+ public TunerResult balance(TunerContext context) {
+ long blockedFlushCount = context.getBlockedFlushCount();
+ long unblockedFlushCount = context.getUnblockedFlushCount();
+ long evictCount = context.getEvictCount();
+ boolean memstoreSufficient = blockedFlushCount == 0 && unblockedFlushCount == 0;
+ boolean blockCacheSufficient = evictCount == 0;
+ if (memstoreSufficient && blockCacheSufficient) {
+ return new TunerResult(false);
+ }
+ TunerResult result = new TunerResult(true);
+ float newMemstoreSize;
+ float newBlockCacheSize;
+ if (memstoreSufficient) {
+ // Increase the block cache size and corresponding decrease in memstore size
+ newBlockCacheSize = context.getCurBlockCacheSize() + step;
+ newMemstoreSize = context.getCurMemStoreSize() - step;
+ } else if (blockCacheSufficient) {
+ // Increase the memstore size and corresponding decrease in block cache size
+ newBlockCacheSize = context.getCurBlockCacheSize() - step;
+ newMemstoreSize = context.getCurMemStoreSize() + step;
+ } else {
+ return new TunerResult(false);
+ // As of now not making any tuning in write/read heavy scenario.
+ }
+ if (newMemstoreSize > globalMemStorePercentMaxRange) {
+ newMemstoreSize = globalMemStorePercentMaxRange;
+ } else if (newMemstoreSize < globalMemStorePercentMinRange) {
+ newMemstoreSize = globalMemStorePercentMinRange;
+ }
+ if (newBlockCacheSize > blockCachePercentMaxRange) {
+ newBlockCacheSize = blockCachePercentMaxRange;
+ } else if (newBlockCacheSize < blockCachePercentMinRange) {
+ newBlockCacheSize = blockCachePercentMinRange;
+ }
+ result.setBlockCacheSize(newBlockCacheSize);
+ result.setMemstoreSize(newMemstoreSize);
+ return result;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return this.conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ this.step = conf.getFloat(STEP_KEY, DEFAULT_STEP_VALUE);
+ this.blockCachePercentMinRange = conf.getFloat(BLOCK_CACHE_SIZE_MIN_RANGE_KEY,
+ conf.getFloat(HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT));
+ this.blockCachePercentMaxRange = conf.getFloat(BLOCK_CACHE_SIZE_MAX_RANGE_KEY,
+ conf.getFloat(HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT));
+ this.globalMemStorePercentMinRange = conf.getFloat(MEMSTORE_SIZE_MIN_RANGE_KEY,
+ MemStoreFlusher.getGlobalMemStorePercent(conf));
+ this.globalMemStorePercentMaxRange = conf.getFloat(MEMSTORE_SIZE_MAX_RANGE_KEY,
+ MemStoreFlusher.getGlobalMemStorePercent(conf));
+ }
+}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushListener.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushListener.java (revision 0)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushListener.java (working copy)
@@ -0,0 +1,36 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Listener which will get notified regarding flushes of table regions.
+ */
+@InterfaceAudience.Private
+public interface FlushListener {
+
+ /**
+ * Callback which will get called when a flush is happening for a region.
+ *
+ * @param type The type of flush. (ie. Whether a normal flush or flush because of global heap preassure)
+ * @param region The region which is getting flushed
+ */
+ void flushCompleted(FlushType type, HRegion region);
+}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java (revision 1530193)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java (working copy)
@@ -39,4 +39,26 @@
* @param delay after how much time should the flush happen
*/
void requestDelayedFlush(HRegion region, long delay);
+
+ /**
+ * Register a FlushListener
+ *
+ * @param listener
+ */
+ void registerFlushListener(final FlushListener listener);
+
+ /**
+ * Unregister the given FlushListener
+ *
+ * @param listener
+ * @return
+ */
+ public boolean unregisterFlushListener(final FlushListener listener);
+
+ /**
+ * Sets the global memstore limit to a new size.
+ *
+ * @param globalMemStoreSize
+ */
+ public void setGlobalMemstoreLimit(long globalMemStoreSize);
}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryBalancer.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryBalancer.java (revision 0)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryBalancer.java (working copy)
@@ -0,0 +1,42 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.TunerContext;
+import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.TunerResult;
+
+/**
+ * Makes the decision regarding proper sizing of the heap memory. Decides what percentage of heap
+ * memory should be allocated for global memstore and BlockCache.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface HeapMemoryBalancer extends Configurable {
+
+ /**
+ * Perform the balancing operation.
+ *
+ * @param context
+ * @return TunerResult including the memstore heap percentage and block cache percentage
+ */
+ TunerResult balance(TunerContext context);
+}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java (revision 0)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java (working copy)
@@ -0,0 +1,366 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.hadoop.hbase.HConstants.BLOCK_CACHE_SIZE_MAX_RANGE_KEY;
+import static org.apache.hadoop.hbase.HConstants.BLOCK_CACHE_SIZE_MIN_RANGE_KEY;
+import static org.apache.hadoop.hbase.HConstants.HFILE_BLOCK_CACHE_SIZE_KEY;
+import static org.apache.hadoop.hbase.HConstants.MEMSTORE_SIZE_MAX_RANGE_KEY;
+import static org.apache.hadoop.hbase.HConstants.MEMSTORE_SIZE_MIN_RANGE_KEY;
+
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Chore;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.io.hfile.ResizableBlockCache;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Manages balancing of Heap memory using HeapMemoryBalancer.
+ */
+@InterfaceAudience.Private
+public class HeapMemoryManager {
+ private static final Log LOG = LogFactory.getLog(HeapMemoryManager.class);
+ private static final int CONVERT_TO_PERCENTAGE = 100;
+ private static final int CLUSTER_MINIMUM_MEMORY_THRESHOLD =
+ (int) (CONVERT_TO_PERCENTAGE * HConstants.HBASE_CLUSTER_MINIMUM_MEMORY_THRESHOLD);
+
+ private float globalMemStorePercent;
+ private float globalMemStorePercentMinRange;
+ private float globalMemStorePercentMaxRange;
+
+ private float blockCachePercent;
+ private float blockCachePercentMinRange;
+ private float blockCachePercentMaxRange;
+
+ private final ResizableBlockCache blockCache;
+ private final FlushRequester memStoreFlusher;
+ private final Server server;
+
+ private HeapMemoryTuner heapMmyTuner = null;
+ private boolean tunerOn = true;
+
+ private long maxHeapSize = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
+
+ public HeapMemoryManager(ResizableBlockCache blockCache, FlushRequester memStoreFlusher,
+ Server server) {
+ this.blockCache = blockCache;
+ this.memStoreFlusher = memStoreFlusher;
+ this.server = server;
+ initMemstoreSizeRanges(server.getConfiguration());
+ initBlockCacheSizes(server.getConfiguration());
+ doInitChecks();
+ }
+
+ private void initMemstoreSizeRanges(Configuration conf) {
+ globalMemStorePercent = MemStoreFlusher.getGlobalMemStorePercent(conf);
+ globalMemStorePercentMinRange = conf.getFloat(MEMSTORE_SIZE_MIN_RANGE_KEY,
+ globalMemStorePercent);
+ globalMemStorePercentMaxRange = conf.getFloat(MEMSTORE_SIZE_MAX_RANGE_KEY,
+ globalMemStorePercent);
+ if (globalMemStorePercent < globalMemStorePercentMinRange) {
+ LOG.warn("Setting " + MEMSTORE_SIZE_MIN_RANGE_KEY + " to " + globalMemStorePercent
+ + ", same value as " + MemStoreFlusher.MEMSTORE_SIZE_KEY
+ + " because supplied value greater than initial memstore size value.");
+ globalMemStorePercentMinRange = globalMemStorePercent;
+ conf.setFloat(MEMSTORE_SIZE_MIN_RANGE_KEY, globalMemStorePercentMinRange);
+ }
+ if (globalMemStorePercent > globalMemStorePercentMaxRange) {
+ LOG.warn("Setting " + MEMSTORE_SIZE_MAX_RANGE_KEY + " to " + globalMemStorePercent
+ + ", same value as " + MemStoreFlusher.MEMSTORE_SIZE_KEY
+ + " because supplied value less than initial memstore size value.");
+ globalMemStorePercentMaxRange = globalMemStorePercent;
+ conf.setFloat(MEMSTORE_SIZE_MAX_RANGE_KEY, globalMemStorePercentMaxRange);
+ }
+ if (globalMemStorePercent == globalMemStorePercentMinRange
+ && globalMemStorePercent == globalMemStorePercentMaxRange) {
+ tunerOn = false;
+ }
+ }
+
+ private void doInitChecks() {
+ int gml = (int) (globalMemStorePercent * CONVERT_TO_PERCENTAGE);
+ int bcul = (int) (blockCachePercent * CONVERT_TO_PERCENTAGE);
+ if (CONVERT_TO_PERCENTAGE - (gml + bcul) < CLUSTER_MINIMUM_MEMORY_THRESHOLD) {
+ throw new RuntimeException("Current heap configuration for MemStore and BlockCache exceeds "
+ + "the threshold required for successful cluster operation. "
+ + "The combined value cannot exceed 0.8. Please check " + "the settings for "
+ + MemStoreFlusher.MEMSTORE_SIZE_KEY + " and " + HFILE_BLOCK_CACHE_SIZE_KEY
+ + " in your configuration. " + MemStoreFlusher.MEMSTORE_SIZE_KEY + " is "
+ + globalMemStorePercent + " and " + HFILE_BLOCK_CACHE_SIZE_KEY + " is "
+ + blockCachePercent);
+ }
+ if (this.tunerOn) {
+ gml = (int) (globalMemStorePercentMaxRange * CONVERT_TO_PERCENTAGE);
+ bcul = (int) (blockCachePercentMinRange * CONVERT_TO_PERCENTAGE);
+ if (CONVERT_TO_PERCENTAGE - (gml + bcul) < CLUSTER_MINIMUM_MEMORY_THRESHOLD) {
+ throw new RuntimeException();
+ }
+ gml = (int) (globalMemStorePercentMinRange * CONVERT_TO_PERCENTAGE);
+ bcul = (int) (blockCachePercentMaxRange * CONVERT_TO_PERCENTAGE);
+ if (CONVERT_TO_PERCENTAGE - (gml + bcul) < CLUSTER_MINIMUM_MEMORY_THRESHOLD) {
+ throw new RuntimeException();
+ }
+ }
+ }
+
+ private void initBlockCacheSizes(Configuration conf) {
+ blockCachePercent = conf.getFloat(HFILE_BLOCK_CACHE_SIZE_KEY,
+ HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
+ blockCachePercentMinRange = conf.getFloat(BLOCK_CACHE_SIZE_MIN_RANGE_KEY, blockCachePercent);
+ blockCachePercentMaxRange = conf.getFloat(BLOCK_CACHE_SIZE_MAX_RANGE_KEY, blockCachePercent);
+ if (blockCachePercent < blockCachePercentMinRange) {
+ LOG.warn("Setting " + BLOCK_CACHE_SIZE_MIN_RANGE_KEY + " to " + blockCachePercent
+ + ", same value as " + HFILE_BLOCK_CACHE_SIZE_KEY
+ + " because supplied value greater than initial block cache size.");
+ blockCachePercentMinRange = blockCachePercent;
+ conf.setFloat(BLOCK_CACHE_SIZE_MIN_RANGE_KEY, blockCachePercentMinRange);
+ }
+ if (blockCachePercent > blockCachePercentMaxRange) {
+ LOG.warn("Setting " + BLOCK_CACHE_SIZE_MAX_RANGE_KEY + " to " + blockCachePercent
+ + ", same value as " + HFILE_BLOCK_CACHE_SIZE_KEY
+ + " because supplied value less than initial block cache size.");
+ blockCachePercentMaxRange = blockCachePercent;
+ conf.setFloat(BLOCK_CACHE_SIZE_MAX_RANGE_KEY, blockCachePercentMaxRange);
+ }
+ if (blockCachePercent == blockCachePercentMinRange
+ && blockCachePercent == blockCachePercentMaxRange) {
+ tunerOn = false;
+ }
+ }
+
+ public void start() {
+ if (tunerOn) {
+ LOG.info("Starting HeapMemory Tuner.");
+ this.heapMmyTuner = new HeapMemoryTuner();
+ Threads.setDaemonThreadRunning(heapMmyTuner.getThread());
+ // Register HeapMemoryTuner as a memstore flush listener
+ memStoreFlusher.registerFlushListener(heapMmyTuner);
+ }
+ }
+
+ public void stop() {
+ // The thread is Daemon. Just interrupting the ongoing process.
+ if (tunerOn) {
+ LOG.info("Stoping HeapMemory Tuner.");
+ this.heapMmyTuner.interrupt();
+ }
+ }
+
+ // Used by the test cases.
+ boolean isTunerOn() {
+ return this.tunerOn;
+ }
+
+ private class HeapMemoryTuner extends Chore implements FlushListener {
+
+ private HeapMemoryBalancer heapMemoryBalancer;
+
+ private AtomicLong blockedFlushCount = new AtomicLong();
+ private AtomicLong unblockedFlushCount = new AtomicLong();
+ private long evictCount = 0L;
+
+ private TunerContext tunerContext = new TunerContext();
+
+ public HeapMemoryTuner() {
+ super(server.getServerName() + "-HeapMemoryAutoTuner",
+ server.getConfiguration().getInt(HConstants.HBASE_RS_MEMORY_TUNER_PERIOD,
+ HConstants.HBASE_RS_MEMORY_TUNER_DEFAULT_PERIOD), server);
+ Class extends HeapMemoryBalancer> balancerKlass = server.getConfiguration().getClass(
+ HConstants.HBASE_RS_MEMORY_BALANCER_CLASS, DefaultHeapMemoryBalancer.class,
+ HeapMemoryBalancer.class);
+ heapMemoryBalancer = ReflectionUtils.newInstance(balancerKlass, server.getConfiguration());
+ }
+
+ @Override
+ protected void chore() {
+ evictCount = blockCache.getStats().getEvictedCount() - evictCount;
+ tunerContext.setBlockedFlushCount(blockedFlushCount.getAndSet(0));
+ tunerContext.setUnblockedFlushCount(unblockedFlushCount.getAndSet(0));
+ tunerContext.setEvictCount(evictCount);
+ tunerContext.setCurBlockCacheSize(blockCachePercent);
+ tunerContext.setCurMemStoreSize(globalMemStorePercent);
+ TunerResult result = null;
+ try {
+ result = this.heapMemoryBalancer.balance(tunerContext);
+ } catch (Throwable t) {
+ LOG.error("Exception thrown from the HeapMemoryBalancer implementation", t);
+ }
+ if (result != null && result.needsTuning()) {
+ float memstoreSize = result.getMemstoreSize();
+ float blockCacheSize = result.getBlockCacheSize();
+ LOG.debug("From HeapMemoryBalancer new memstoreSize: " + memstoreSize
+ + ". new blockCacheSize: " + blockCacheSize);
+ if (memstoreSize < globalMemStorePercentMinRange) {
+ LOG.info("New memstoreSize from HeapMemoryBalancer " + memstoreSize
+ + " is below min level " + globalMemStorePercentMinRange
+ + ". Resetting memstoreSize to min size");
+ memstoreSize = globalMemStorePercentMinRange;
+ } else if (memstoreSize > globalMemStorePercentMaxRange) {
+ LOG.info("New memstoreSize from HeapMemoryBalancer " + memstoreSize
+ + " is above max level " + globalMemStorePercentMaxRange
+ + ". Resetting memstoreSize to max size");
+ memstoreSize = globalMemStorePercentMaxRange;
+ }
+ if (blockCacheSize < blockCachePercentMinRange) {
+ LOG.info("New blockCacheSize from HeapMemoryBalancer " + blockCacheSize
+ + " is below min level " + blockCachePercentMinRange
+ + ". Resetting blockCacheSize to min size");
+ blockCacheSize = blockCachePercentMinRange;
+ } else if (blockCacheSize > blockCachePercentMaxRange) {
+ LOG.info("New blockCacheSize from HeapMemoryBalancer " + blockCacheSize
+ + " is above max level " + blockCachePercentMaxRange
+ + ". Resetting blockCacheSize to min size");
+ blockCacheSize = blockCachePercentMaxRange;
+ }
+ int gml = (int) (memstoreSize * CONVERT_TO_PERCENTAGE);
+ int bcul = (int) (blockCacheSize * CONVERT_TO_PERCENTAGE);
+ if (CONVERT_TO_PERCENTAGE - (gml + bcul) < CLUSTER_MINIMUM_MEMORY_THRESHOLD) {
+ LOG.info("Current heap configuration from HeapMemoryBalancer exceeds "
+ + "the threshold required for successful cluster operation. "
+ + "The combined value cannot exceed 0.8. " + MemStoreFlusher.MEMSTORE_SIZE_KEY
+ + " is " + memstoreSize + " and " + HFILE_BLOCK_CACHE_SIZE_KEY + " is "
+ + blockCacheSize);
+ // TODO can adjust the value so as not exceed 80%. Is that correct? may be.
+ } else {
+ long newBlockCacheSize = (long) (maxHeapSize * blockCacheSize);
+ long newMemstoreSize = (long) (maxHeapSize * memstoreSize);
+ LOG.info("Setting block cache heap size to " + newBlockCacheSize
+ + " and memstore heap size to " + newMemstoreSize);
+ blockCachePercent = blockCacheSize;
+ blockCache.setMaxSize(newBlockCacheSize);
+ globalMemStorePercent = memstoreSize;
+ memStoreFlusher.setGlobalMemstoreLimit(newMemstoreSize);
+ }
+ }
+ }
+
+ @Override
+ public void flushCompleted(FlushType type, HRegion region) {
+ switch (type) {
+ case ABOVE_HIGHER_MARK:
+ blockedFlushCount.incrementAndGet();
+ break;
+ case ABOVE_LOWER_MARK:
+ unblockedFlushCount.incrementAndGet();
+ break;
+ default:
+ // In case of normal flush don't do any action.
+ break;
+ }
+ }
+ }
+
+ /**
+ * POJO to pass all the relevant information required to do the heap memory tuning. It holds the
+ * flush counts and block cache evictions happened within the interval. Also holds the current
+ * heap percentage allocated for memstore and block cache.
+ */
+ public static final class TunerContext {
+ private long blockedFlushCount;
+ private long unblockedFlushCount;
+ private long evictCount;
+ private float curMemStoreSize;
+ private float curBlockCacheSize;
+
+ public TunerContext() {
+ }
+
+ public long getBlockedFlushCount() {
+ return blockedFlushCount;
+ }
+
+ public void setBlockedFlushCount(long blockedFlushCount) {
+ this.blockedFlushCount = blockedFlushCount;
+ }
+
+ public long getUnblockedFlushCount() {
+ return unblockedFlushCount;
+ }
+
+ public void setUnblockedFlushCount(long unblockedFlushCount) {
+ this.unblockedFlushCount = unblockedFlushCount;
+ }
+
+ public long getEvictCount() {
+ return evictCount;
+ }
+
+ public void setEvictCount(long evictCount) {
+ this.evictCount = evictCount;
+ }
+
+ public float getCurMemStoreSize() {
+ return curMemStoreSize;
+ }
+
+ public void setCurMemStoreSize(float curMemStoreSize) {
+ this.curMemStoreSize = curMemStoreSize;
+ }
+
+ public float getCurBlockCacheSize() {
+ return curBlockCacheSize;
+ }
+
+ public void setCurBlockCacheSize(float curBlockCacheSize) {
+ this.curBlockCacheSize = curBlockCacheSize;
+ }
+ }
+
+ /**
+ * POJO which holds the result of memory tuning done by HeapMemoryBalancer implementation.
+ * It includes the new heap percentage for memstore and block cache.
+ */
+ public static final class TunerResult {
+ private float memstoreSize;
+ private float blockCacheSize;
+ private final boolean needsTuning;
+
+ public TunerResult(boolean needsTuning) {
+ this.needsTuning = needsTuning;
+ }
+
+ public float getMemstoreSize() {
+ return memstoreSize;
+ }
+
+ public void setMemstoreSize(float memstoreSize) {
+ this.memstoreSize = memstoreSize;
+ }
+
+ public float getBlockCacheSize() {
+ return blockCacheSize;
+ }
+
+ public void setBlockCacheSize(float blockCacheSize) {
+ this.blockCacheSize = blockCacheSize;
+ }
+
+ public boolean needsTuning() {
+ return needsTuning;
+ }
+ }
+}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1530193)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy)
@@ -101,7 +101,9 @@
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.ResizableBlockCache;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
@@ -271,6 +273,8 @@
// Cache flushing
protected MemStoreFlusher cacheFlusher;
+ protected HeapMemoryManager hMemManager;
+
// catalog tracker
protected CatalogTracker catalogTracker;
@@ -894,6 +898,7 @@
// Send interrupts to wake up threads if sleeping so they notice shutdown.
// TODO: Should we check they are alive? If OOME could have exited already
+ if(this.hMemManager != null) this.hMemManager.stop();
if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
if (this.hlogRoller != null) this.hlogRoller.interruptIfNecessary();
@@ -1210,6 +1215,7 @@
spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());
startServiceThreads();
+ startHeapMemoryManager();
LOG.info("Serving as " + this.serverNameFromMasterPOV +
", RpcServer on " + this.isa +
", sessionid=0x" +
@@ -1225,6 +1231,15 @@
}
}
+ private void startHeapMemoryManager() {
+ BlockCache blockCache = CacheConfig.instantiateBlockCache(conf);
+ if (blockCache instanceof ResizableBlockCache) {
+ this.hMemManager = new HeapMemoryManager((ResizableBlockCache) blockCache, this.cacheFlusher,
+ this);
+ this.hMemManager.start();
+ }
+ }
+
private void createMyEphemeralNode() throws KeeperException {
ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(),
HConstants.EMPTY_BYTE_ARRAY);
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java (revision 1530193)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java (working copy)
@@ -194,8 +194,7 @@
}
long heapMax = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
.getMax();
- long globalMemStoreLimit = MemStoreFlusher.globalMemStoreLimit(heapMax,
- MemStoreFlusher.DEFAULT_UPPER, MemStoreFlusher.UPPER_KEY, conf);
+ long globalMemStoreLimit = (long) (heapMax * MemStoreFlusher.getGlobalMemStorePercent(conf));
int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY,
MemStoreLAB.CHUNK_SIZE_DEFAULT);
int maxCount = (int) (globalMemStoreLimit * poolSizePercentage / chunkSize);
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (revision 1530193)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (working copy)
@@ -21,9 +21,11 @@
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
@@ -65,6 +67,18 @@
@InterfaceAudience.Private
class MemStoreFlusher implements FlushRequester {
static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);
+ static final String MEMSTORE_SIZE_KEY = "hbase.regionserver.global.memstore.size";
+ private static final String MEMSTORE_SIZE_OLD_KEY =
+ "hbase.regionserver.global.memstore.upperLimit";
+ private static final String MEMSTORE_SIZE_LOWER_LIMIT_KEY =
+ "hbase.regionserver.global.memstore.size.lower.limit";
+ private static final String MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY =
+ "hbase.regionserver.global.memstore.lowerLimit";
+
+ private static final float DEFAULT_MEMSTORE_SIZE = 0.4f;
+ // Default lower water mark limit is 95% size of memstore size.
+ private static final float DEFAULT_MEMSTORE_SIZE_LOWER_LIMIT = 0.95f;
+
// These two data members go together. Any entry in the one must have
// a corresponding entry in the other.
private final BlockingQueue flushQueue =
@@ -78,19 +92,15 @@
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Object blockSignal = new Object();
- protected final long globalMemStoreLimit;
- protected final long globalMemStoreLimitLowMark;
+ protected long globalMemStoreLimit;
+ protected float globalMemStoreLimitLowMarkPercent;
+ protected long globalMemStoreLimitLowMark;
- static final float DEFAULT_UPPER = 0.4f;
- private static final float DEFAULT_LOWER = 0.35f;
- static final String UPPER_KEY =
- "hbase.regionserver.global.memstore.upperLimit";
- private static final String LOWER_KEY =
- "hbase.regionserver.global.memstore.lowerLimit";
private long blockingWaitTime;
private final Counter updatesBlockedMsHighWater = new Counter();
private final FlushHandler[] flushHandlers;
+ private List flushListeners = new ArrayList(1);
/**
* @param conf
@@ -103,15 +113,13 @@
this.threadWakeFrequency =
conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
- this.globalMemStoreLimit = globalMemStoreLimit(max, DEFAULT_UPPER,
- UPPER_KEY, conf);
- long lower = globalMemStoreLimit(max, DEFAULT_LOWER, LOWER_KEY, conf);
- if (lower > this.globalMemStoreLimit) {
- lower = this.globalMemStoreLimit;
- LOG.info("Setting globalMemStoreLimitLowMark == globalMemStoreLimit " +
- "because supplied " + LOWER_KEY + " was > " + UPPER_KEY);
- }
- this.globalMemStoreLimitLowMark = lower;
+ float globalMemStorePercent = getGlobalMemStorePercent(conf);
+ this.globalMemStoreLimit = (long) (max * globalMemStorePercent);
+ this.globalMemStoreLimitLowMarkPercent =
+ getGlobalMemStoreLowerMark(conf, globalMemStorePercent);
+ this.globalMemStoreLimitLowMark =
+ (long) (this.globalMemStoreLimit * this.globalMemStoreLimitLowMarkPercent);
+
this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
90000);
int handlerCount = conf.getInt("hbase.hstore.flusher.count", 1);
@@ -124,29 +132,38 @@
}
/**
- * Calculate size using passed key for configured
- * percentage of max.
+ * Calculate global memstore size for configured percentage of max.
* @param max
- * @param defaultLimit
- * @param key
* @param c
* @return Limit.
*/
- static long globalMemStoreLimit(final long max,
- final float defaultLimit, final String key, final Configuration c) {
- float limit = c.getFloat(key, defaultLimit);
- return getMemStoreLimit(max, limit, defaultLimit);
+ static float getGlobalMemStorePercent(final Configuration c) {
+ float limit = c.getFloat(MEMSTORE_SIZE_KEY,
+ c.getFloat(MEMSTORE_SIZE_OLD_KEY, DEFAULT_MEMSTORE_SIZE));
+ if (limit > 0.8f || limit < 0.05f) {
+ LOG.warn("Setting global memstore limit to default of " + DEFAULT_MEMSTORE_SIZE
+ + " because supplied value outside allowed range of 0.05 -> 0.8");
+ limit = DEFAULT_MEMSTORE_SIZE;
+ }
+ return limit;
}
- static long getMemStoreLimit(final long max, final float limit,
- final float defaultLimit) {
- float effectiveLimit = limit;
- if (limit >= 0.9f || limit < 0.1f) {
- LOG.warn("Setting global memstore limit to default of " + defaultLimit +
- " because supplied value outside allowed range of 0.1 -> 0.9");
- effectiveLimit = defaultLimit;
+ private static float getGlobalMemStoreLowerMark(final Configuration c, float globalMemStorePercent) {
+ float lowMarkPercent = c.getFloat(MEMSTORE_SIZE_LOWER_LIMIT_KEY,
+ DEFAULT_MEMSTORE_SIZE_LOWER_LIMIT);
+ String lowerWaterMarkOldValStr = c.get(MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY);
+ if (null != lowerWaterMarkOldValStr) {
+ LOG.warn(MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY + " is deprecated. Instead use "
+ + MEMSTORE_SIZE_LOWER_LIMIT_KEY);
+ float lowerWaterMarkOldVal = Float.parseFloat(lowerWaterMarkOldValStr);
+ if (lowerWaterMarkOldVal > globalMemStorePercent) {
+ lowerWaterMarkOldVal = globalMemStorePercent;
+ LOG.info("Setting globalMemStoreLimitLowMark == globalMemStoreLimit " + "because supplied "
+ + MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY + " was > " + MEMSTORE_SIZE_OLD_KEY);
+ }
+ lowMarkPercent = lowerWaterMarkOldVal / globalMemStorePercent;
}
- return (long)(max * effectiveLimit);
+ return lowMarkPercent;
}
public Counter getUpdatesBlockedMsHighWater() {
@@ -454,6 +471,7 @@
lock.readLock().lock();
try {
boolean shouldCompact = region.flushcache();
+ notifyFlushCompleted(region, emergencyFlush);
// We just want to check the size
boolean shouldSplit = region.checkSplit() != null;
if (shouldSplit) {
@@ -485,6 +503,16 @@
return true;
}
+ private void notifyFlushCompleted(HRegion region, boolean emergencyFlush) {
+ FlushType type = FlushType.NORMAL;
+ if (emergencyFlush) {
+ type = isAboveHighWaterMark() ? FlushType.ABOVE_HIGHER_MARK : FlushType.ABOVE_LOWER_MARK;
+ }
+ for (FlushListener listener : flushListeners) {
+ listener.flushCompleted(type, region);
+ }
+ }
+
private void wakeUpIfBlocking() {
synchronized (blockSignal) {
blockSignal.notifyAll();
@@ -570,6 +598,38 @@
return queueList.toString();
}
+ /**
+ * Register a MemstoreFlushListener
+ * @param listener
+ */
+ public void registerFlushListener(final FlushListener listener) {
+ this.flushListeners.add(listener);
+ }
+
+ /**
+ * Unregister the listener from MemstoreFlushListeners
+ * @param listener
+ * @return
+ */
+ public boolean unregisterFlushListener(final FlushListener listener) {
+ return this.flushListeners.remove(listener);
+ }
+
+ /**
+ * Sets the global memstore limit to a new size.
+ * @param globalMemStoreSize
+ */
+ public void setGlobalMemstoreLimit(long globalMemStoreSize) {
+ this.globalMemStoreLimit = globalMemStoreSize;
+ this.globalMemStoreLimitLowMark =
+ (long) (this.globalMemStoreLimitLowMarkPercent * globalMemStoreSize);
+ reclaimMemStoreMemory();
+ }
+
+ public long getMemoryLimit() {
+ return this.globalMemStoreLimit;
+ }
+
interface FlushQueueEntry extends Delayed {}
/**
@@ -672,3 +732,7 @@
}
}
}
+
+enum FlushType {
+ NORMAL, ABOVE_LOWER_MARK, ABOVE_HIGHER_MARK;
+}
\ No newline at end of file
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java (revision 0)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java (working copy)
@@ -0,0 +1,454 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.CacheStats;
+import org.apache.hadoop.hbase.io.hfile.Cacheable;
+import org.apache.hadoop.hbase.io.hfile.ResizableBlockCache;
+import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.TunerContext;
+import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.TunerResult;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestHeapMemoryManager {
+
+ private long maxHeapSize = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
+
+ @Test
+ public void testAutoTunerShouldBeOffWhenMaxMinRangesForMemstoreIsNotGiven() throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ conf.setFloat(HConstants.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.75f);
+ conf.setFloat(HConstants.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f);
+ HeapMemoryManager manager = new HeapMemoryManager(new BlockCacheStub(0),
+ new MemstoreFlusherStub(0), new RegionServerStub(conf));
+ assertFalse(manager.isTunerOn());
+ }
+
+ @Test
+ public void testAutoTunerShouldBeOffWhenMaxMinRangesForBlockCacheIsNotGiven() throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ conf.setFloat(HConstants.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f);
+ conf.setFloat(HConstants.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.05f);
+ HeapMemoryManager manager = new HeapMemoryManager(new BlockCacheStub(0),
+ new MemstoreFlusherStub(0), new RegionServerStub(conf));
+ assertFalse(manager.isTunerOn());
+ }
+
+ @Test
+ public void testWhenMemstoreAndBlockCacheMaxMinChecksFails() throws Exception {
+ BlockCacheStub blockCache = new BlockCacheStub(0);
+ MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub(0);
+ Configuration conf = HBaseConfiguration.create();
+ conf.setFloat(HConstants.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f);
+ conf.setFloat(HConstants.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.06f);
+ try {
+ new HeapMemoryManager(blockCache, memStoreFlusher, new RegionServerStub(conf));
+ fail();
+ } catch (RuntimeException e) {
+ }
+ conf = HBaseConfiguration.create();
+ conf.setFloat(HConstants.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.2f);
+ conf.setFloat(HConstants.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f);
+ try {
+ new HeapMemoryManager(blockCache, memStoreFlusher, new RegionServerStub(conf));
+ fail();
+ } catch (RuntimeException e) {
+ }
+ }
+
+ @Test
+ public void testWhenClusterIsWriteHeavy() throws Exception {
+ BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4));
+ MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4));
+ Configuration conf = HBaseConfiguration.create();
+ conf.setFloat(HConstants.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f);
+ conf.setFloat(HConstants.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f);
+ conf.setFloat(HConstants.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f);
+ conf.setFloat(HConstants.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f);
+ conf.setLong(HConstants.HBASE_RS_MEMORY_TUNER_PERIOD, 1000);
+ // Let the system start with default values for memstore heap and block cache size.
+ HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
+ new RegionServerStub(conf));
+ long oldMemstoreHeapSize = memStoreFlusher.memstoreSize;
+ long oldBlockCacheSize = blockCache.maxSize;
+ heapMemoryManager.start();
+ memStoreFlusher.flushType = FlushType.ABOVE_HIGHER_MARK;
+ memStoreFlusher.requestFlush(null);
+ memStoreFlusher.requestFlush(null);
+ memStoreFlusher.requestFlush(null);
+ memStoreFlusher.flushType = FlushType.ABOVE_LOWER_MARK;
+ memStoreFlusher.requestFlush(null);
+ Thread.sleep(1500); // Allow the tuner to run once and do necessary memory up
+ assertHeapSpaceDelta(DefaultHeapMemoryBalancer.DEFAULT_STEP_VALUE, oldMemstoreHeapSize,
+ memStoreFlusher.memstoreSize);
+ assertHeapSpaceDelta(-(DefaultHeapMemoryBalancer.DEFAULT_STEP_VALUE), oldBlockCacheSize,
+ blockCache.maxSize);
+ oldMemstoreHeapSize = memStoreFlusher.memstoreSize;
+ oldBlockCacheSize = blockCache.maxSize;
+ // Do some more flushes before the next run of HeapMemoryBalancer
+ memStoreFlusher.flushType = FlushType.ABOVE_HIGHER_MARK;
+ memStoreFlusher.requestFlush(null);
+ memStoreFlusher.requestFlush(null);
+ Thread.sleep(1500);
+ assertHeapSpaceDelta(DefaultHeapMemoryBalancer.DEFAULT_STEP_VALUE, oldMemstoreHeapSize,
+ memStoreFlusher.memstoreSize);
+ assertHeapSpaceDelta(-(DefaultHeapMemoryBalancer.DEFAULT_STEP_VALUE), oldBlockCacheSize,
+ blockCache.maxSize);
+ }
+
+ @Test
+ public void testWhenClusterIsReadHeavy() throws Exception {
+ BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4));
+ MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4));
+ Configuration conf = HBaseConfiguration.create();
+ conf.setFloat(HConstants.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f);
+ conf.setFloat(HConstants.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f);
+ conf.setFloat(HConstants.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f);
+ conf.setFloat(HConstants.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f);
+ conf.setLong(HConstants.HBASE_RS_MEMORY_TUNER_PERIOD, 1000);
+ // Let the system start with default values for memstore heap and block cache size.
+ HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
+ new RegionServerStub(conf));
+ long oldMemstoreHeapSize = memStoreFlusher.memstoreSize;
+ long oldBlockCacheSize = blockCache.maxSize;
+ heapMemoryManager.start();
+ blockCache.evictBlock(null);
+ blockCache.evictBlock(null);
+ blockCache.evictBlock(null);
+ Thread.sleep(1500); // Allow the tuner to run once and do necessary memory up
+ assertHeapSpaceDelta(-(DefaultHeapMemoryBalancer.DEFAULT_STEP_VALUE), oldMemstoreHeapSize,
+ memStoreFlusher.memstoreSize);
+ assertHeapSpaceDelta(DefaultHeapMemoryBalancer.DEFAULT_STEP_VALUE, oldBlockCacheSize,
+ blockCache.maxSize);
+ oldMemstoreHeapSize = memStoreFlusher.memstoreSize;
+ oldBlockCacheSize = blockCache.maxSize;
+ // Do some more evictions before the next run of HeapMemoryBalancer
+ blockCache.evictBlock(null);
+ Thread.sleep(1500);
+ assertHeapSpaceDelta(-(DefaultHeapMemoryBalancer.DEFAULT_STEP_VALUE), oldMemstoreHeapSize,
+ memStoreFlusher.memstoreSize);
+ assertHeapSpaceDelta(DefaultHeapMemoryBalancer.DEFAULT_STEP_VALUE, oldBlockCacheSize,
+ blockCache.maxSize);
+ }
+
+ @Test
+ public void testPluggingInHeapMemoryBalancer() throws Exception {
+ BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4));
+ MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4));
+ Configuration conf = HBaseConfiguration.create();
+ conf.setFloat(HConstants.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.78f);
+ conf.setFloat(HConstants.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.05f);
+ conf.setFloat(HConstants.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.75f);
+ conf.setFloat(HConstants.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.02f);
+ conf.setLong(HConstants.HBASE_RS_MEMORY_TUNER_PERIOD, 1000);
+ conf.setClass(HConstants.HBASE_RS_MEMORY_BALANCER_CLASS, CustomHeapMemoryBalancer.class,
+ HeapMemoryBalancer.class);
+ // Let the system start with default values for memstore heap and block cache size.
+ HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
+ new RegionServerStub(conf));
+ heapMemoryManager.start();
+ // Now we wants to be in write mode. Set bigger memstore size from CustomHeapMemoryBalancer
+ CustomHeapMemoryBalancer.memstoreSize = 0.78f;
+ CustomHeapMemoryBalancer.blockCacheSize = 0.02f;
+ Thread.sleep(1500); // Allow the tuner to run once and do necessary memory up
+ assertHeapSpace(0.78f, memStoreFlusher.memstoreSize);// Memstore
+ assertHeapSpace(0.02f, blockCache.maxSize);// BlockCache
+ // Now we wants to be in read mode. Set bigger memstore size from CustomHeapMemoryBalancer
+ CustomHeapMemoryBalancer.blockCacheSize = 0.75f;
+ CustomHeapMemoryBalancer.memstoreSize = 0.05f;
+ Thread.sleep(1500); // Allow the tuner to run once and do necessary memory up
+ assertHeapSpace(0.75f, blockCache.maxSize);// BlockCache
+ assertHeapSpace(0.05f, memStoreFlusher.memstoreSize);// Memstore
+ }
+
+ @Test
+ public void testWhenSizeGivenByHeapBalancerGoesOutsideRange() throws Exception {
+ BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4));
+ MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4));
+ Configuration conf = HBaseConfiguration.create();
+ conf.setFloat(HConstants.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.7f);
+ conf.setFloat(HConstants.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.1f);
+ conf.setFloat(HConstants.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f);
+ conf.setFloat(HConstants.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.1f);
+ conf.setLong(HConstants.HBASE_RS_MEMORY_TUNER_PERIOD, 1000);
+ conf.setClass(HConstants.HBASE_RS_MEMORY_BALANCER_CLASS, CustomHeapMemoryBalancer.class,
+ HeapMemoryBalancer.class);
+ HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
+ new RegionServerStub(conf));
+ heapMemoryManager.start();
+ CustomHeapMemoryBalancer.memstoreSize = 0.78f;
+ CustomHeapMemoryBalancer.blockCacheSize = 0.02f;
+ Thread.sleep(1500); // Allow the tuner to run once and do necessary memory up
+ // Even if the balancer says to set the memstore to 78%, HBase makes it as 70% as that is the
+ // upper bound. Same with block cache as 10% is the lower bound.
+ assertHeapSpace(0.7f, memStoreFlusher.memstoreSize);
+ assertHeapSpace(0.1f, blockCache.maxSize);
+ }
+
+ @Test
+ public void testWhenCombinedHeapSizesFromBalancerGoesOutSideMaxLimit() throws Exception {
+ BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4));
+ MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4));
+ Configuration conf = HBaseConfiguration.create();
+ conf.setFloat(HConstants.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.7f);
+ conf.setFloat(HConstants.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.1f);
+ conf.setFloat(HConstants.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f);
+ conf.setFloat(HConstants.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.1f);
+ conf.setLong(HConstants.HBASE_RS_MEMORY_TUNER_PERIOD, 1000);
+ conf.setClass(HConstants.HBASE_RS_MEMORY_BALANCER_CLASS, CustomHeapMemoryBalancer.class,
+ HeapMemoryBalancer.class);
+ HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
+ new RegionServerStub(conf));
+ long oldMemstoreSize = memStoreFlusher.memstoreSize;
+ long oldBlockCacheSize = blockCache.maxSize;
+ heapMemoryManager.start();
+ CustomHeapMemoryBalancer.memstoreSize = 0.7f;
+ CustomHeapMemoryBalancer.blockCacheSize = 0.3f;
+ Thread.sleep(1500);
+ assertEquals(oldMemstoreSize, memStoreFlusher.memstoreSize);
+ assertEquals(oldBlockCacheSize, blockCache.maxSize);
+ }
+
+ private void assertHeapSpace(float expectedHeapPercentage, long currentHeapSpace) {
+ long expected = (long) (this.maxHeapSize * expectedHeapPercentage);
+ assertEquals(expected, currentHeapSpace);
+ }
+
+ private void assertHeapSpaceDelta(float expectedDeltaPercent, long oldHeapSpace, long newHeapSpace) {
+ long expctedMinDelta = (long) (this.maxHeapSize * expectedDeltaPercent);
+ if (expectedDeltaPercent > 0) {
+ assertTrue(expctedMinDelta <= (newHeapSpace - oldHeapSpace));
+ } else {
+ assertTrue(expctedMinDelta <= (oldHeapSpace - newHeapSpace));
+ }
+ }
+
+ private static class BlockCacheStub implements ResizableBlockCache {
+
+ CacheStats stats = new CacheStats();
+ long maxSize = 0;
+
+ public BlockCacheStub(long size){
+ this.maxSize = size;
+ }
+
+ @Override
+ public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
+
+ }
+
+ @Override
+ public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
+
+ }
+
+ @Override
+ public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat) {
+ return null;
+ }
+
+ @Override
+ public boolean evictBlock(BlockCacheKey cacheKey) {
+ stats.evicted();
+ return false;
+ }
+
+ @Override
+ public int evictBlocksByHfileName(String hfileName) {
+ stats.evicted(); // Just assuming only one block for file here.
+ return 0;
+ }
+
+ @Override
+ public CacheStats getStats() {
+ return this.stats;
+ }
+
+ @Override
+ public void shutdown() {
+
+ }
+
+ @Override
+ public long size() {
+ return 0;
+ }
+
+ @Override
+ public long getFreeSize() {
+ return 0;
+ }
+
+ @Override
+ public long getCurrentSize() {
+ return 0;
+ }
+
+ @Override
+ public long getEvictedCount() {
+ return 0;
+ }
+
+ @Override
+ public long getBlockCount() {
+ return 0;
+ }
+
+ @Override
+ public List getBlockCacheColumnFamilySummaries(Configuration conf)
+ throws IOException {
+ return null;
+ }
+
+ @Override
+ public void setMaxSize(long size) {
+ this.maxSize = size;
+ }
+ }
+
+ private static class MemstoreFlusherStub implements FlushRequester {
+
+ long memstoreSize;
+
+ FlushListener listener;
+
+ FlushType flushType = FlushType.NORMAL;
+
+ public MemstoreFlusherStub(long memstoreSize) {
+ this.memstoreSize = memstoreSize;
+ }
+
+ @Override
+ public void requestFlush(HRegion region) {
+ this.listener.flushCompleted(flushType, region);
+ }
+
+ @Override
+ public void requestDelayedFlush(HRegion region, long delay) {
+
+ }
+
+ @Override
+ public void registerFlushListener(FlushListener listener) {
+ this.listener = listener;
+ }
+
+ @Override
+ public boolean unregisterFlushListener(FlushListener listener) {
+ return false;
+ }
+
+ @Override
+ public void setGlobalMemstoreLimit(long globalMemStoreSize) {
+ this.memstoreSize = globalMemStoreSize;
+ }
+ }
+
+ private static class RegionServerStub implements Server {
+ private Configuration conf;
+ private boolean stopped = false;
+
+ public RegionServerStub(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public void abort(String why, Throwable e) {
+
+ }
+
+ @Override
+ public boolean isAborted() {
+ return false;
+ }
+
+ @Override
+ public void stop(String why) {
+ this.stopped = true;
+ }
+
+ @Override
+ public boolean isStopped() {
+ return this.stopped;
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return this.conf;
+ }
+
+ @Override
+ public ZooKeeperWatcher getZooKeeper() {
+ return null;
+ }
+
+ @Override
+ public CatalogTracker getCatalogTracker() {
+ return null;
+ }
+
+ @Override
+ public ServerName getServerName() {
+ return new ServerName("server1,4000,12345");
+ }
+ }
+
+ static class CustomHeapMemoryBalancer implements HeapMemoryBalancer {
+ static float blockCacheSize = 0.4f;
+ static float memstoreSize = 0.4f;
+
+ @Override
+ public Configuration getConf() {
+ return null;
+ }
+
+ @Override
+ public void setConf(Configuration arg0) {
+
+ }
+
+ @Override
+ public TunerResult balance(TunerContext context) {
+ TunerResult result = new TunerResult(true);
+ result.setBlockCacheSize(blockCacheSize);
+ result.setMemstoreSize(memstoreSize);
+ return result;
+ }
+ }
+}
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (revision 1530193)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (working copy)
@@ -64,6 +64,7 @@
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
+import org.apache.hadoop.hbase.regionserver.FlushListener;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -888,6 +889,21 @@
// TODO Auto-generated method stub
}
+
+ @Override
+ public void registerFlushListener(FlushListener listener) {
+
+ }
+
+ @Override
+ public boolean unregisterFlushListener(FlushListener listener) {
+ return false;
+ }
+
+ @Override
+ public void setGlobalMemstoreLimit(long globalMemStoreSize) {
+
+ }
}
private void addWALEdits (final TableName tableName, final HRegionInfo hri,