diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
index 484eebd..3ba2126 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
@@ -142,6 +142,18 @@ public final class CellUtil {
     return destinationOffset + rowLen;
   }
 
+  public static int copyRowTo(Cell cell, ByteBuffer destination, int destinationOffset) {
+    short rowLen = cell.getRowLength();
+    if (cell instanceof ByteBufferedCell) {
+      ByteBufferUtils.copyFromBufferToBuffer(((ByteBufferedCell) cell).getRowByteBuffer(),
+          destination, ((ByteBufferedCell) cell).getRowPosition(), destinationOffset, rowLen);
+    } else {
+      ByteBufferUtils.copyFromArrayToBuffer(destination, destinationOffset, cell.getRowArray(),
+          cell.getRowOffset(), rowLen);
+    }
+    return destinationOffset + rowLen;
+  }
+
   /**
    * Copies the row to a new byte[]
    * @param cell the cell from which row has to copied
@@ -171,6 +183,18 @@ public final class CellUtil {
     return destinationOffset + fLen;
   }
 
+  public static int copyFamilyTo(Cell cell, ByteBuffer destination, int destinationOffset) {
+    byte fLen = cell.getFamilyLength();
+    if (cell instanceof ByteBufferedCell) {
+      ByteBufferUtils.copyFromBufferToBuffer(((ByteBufferedCell) cell).getFamilyByteBuffer(),
+          destination, ((ByteBufferedCell) cell).getFamilyPosition(), destinationOffset, fLen);
+    } else {
+      ByteBufferUtils.copyFromArrayToBuffer(destination, destinationOffset, cell.getFamilyArray(),
+          cell.getFamilyOffset(), fLen);
+    }
+    return destinationOffset + fLen;
+  }
+
   public static int copyQualifierTo(Cell cell, byte[] destination, int destinationOffset) {
     int qlen = cell.getQualifierLength();
     if (cell instanceof ByteBufferedCell) {
@@ -184,6 +208,18 @@ public final class CellUtil {
     return destinationOffset + qlen;
   }
 
+  public static int copyQualifierTo(Cell cell, ByteBuffer destination, int destinationOffset) {
+    int qlen = cell.getQualifierLength();
+    if (cell instanceof ByteBufferedCell) {
+      ByteBufferUtils.copyFromBufferToBuffer(((ByteBufferedCell) cell).getQualifierByteBuffer(),
+          destination, ((ByteBufferedCell) cell).getQualifierPosition(), destinationOffset, qlen);
+    } else {
+      ByteBufferUtils.copyFromArrayToBuffer(destination, destinationOffset,
+          cell.getQualifierArray(), cell.getQualifierOffset(), qlen);
+    }
+    return destinationOffset + qlen;
+  }
+
   public static int copyValueTo(Cell cell, byte[] destination, int destinationOffset) {
     int vlen = cell.getValueLength();
     if (cell instanceof ByteBufferedCell) {
@@ -197,6 +233,18 @@ public final class CellUtil {
     return destinationOffset + vlen;
   }
 
+  public static int copyValueTo(Cell cell, ByteBuffer destination, int destinationOffset) {
+    int vlen = cell.getValueLength();
+    if (cell instanceof ByteBufferedCell) {
+      ByteBufferUtils.copyFromBufferToBuffer(((ByteBufferedCell) cell).getValueByteBuffer(),
+          destination, ((ByteBufferedCell) cell).getValuePosition(), destinationOffset, vlen);
+    } else {
+      ByteBufferUtils.copyFromArrayToBuffer(destination, destinationOffset, cell.getValueArray(),
+          cell.getValueOffset(), vlen);
+    }
+    return destinationOffset + vlen;
+  }
+
   /**
    * Copies the tags info into the tag portion of the cell
    * @param cell
@@ -217,6 +265,18 @@ public final class CellUtil {
     return destinationOffset + tlen;
   }
 
+  public static int copyTagTo(Cell cell, ByteBuffer destination, int destinationOffset) {
+    int tlen = cell.getTagsLength();
+    if (cell instanceof ByteBufferedCell) {
+      ByteBufferUtils.copyFromBufferToBuffer(((ByteBufferedCell) cell).getTagsByteBuffer(),
+          destination, ((ByteBufferedCell) cell).getTagsPosition(), destinationOffset, tlen);
+    } else {
+      ByteBufferUtils.copyFromArrayToBuffer(destination, destinationOffset, cell.getTagsArray(),
+          cell.getTagsOffset(), tlen);
+    }
+    return destinationOffset + tlen;
+  }
+
   /********************* misc *************************************/
 
   @Private
@@ -546,12 +606,12 @@ public final class CellUtil {
     }
 
     @Override
-    public void write(byte[] buf, int offset) {
-      offset = KeyValueUtil.appendToByteArray(this.cell, buf, offset, false);
+    public void write(ByteBuffer buf, int offset) {
+      offset = KeyValueUtil.appendToByteBuffer(this.cell, buf, offset, false);
       int tagsLen = this.tags.length;
       assert tagsLen > 0;
-      offset = Bytes.putAsShort(buf, offset, tagsLen);
-      System.arraycopy(this.tags, 0, buf, offset, tagsLen);
+      offset = ByteBufferUtils.putAsShort(buf, offset, tagsLen);
+      ByteBufferUtils.copyFromArrayToBuffer(buf, offset, this.tags, 0, tagsLen);
     }
 
     @Override
@@ -2604,4 +2664,34 @@ public final class CellUtil {
       return Type.DeleteFamily.getCode();
     }
   }
+
+  /**
+   * Clone the passed cell by copying its data into the passed buf.
+   */
+  public static Cell copyCellTo(Cell cell, ByteBuffer buf, int offset, int len) {
+    int tagsLen = cell.getTagsLength();
+    if (cell instanceof ExtendedCell) {
+      ((ExtendedCell) cell).write(buf, offset);
+    } else {
+      // Normally all Cell impls within Server will be of type ExtendedCell. Just considering the
+      // other case also. The data fragments within the Cell are copied into buf in KeyValue
+      // serialization format only.
+      KeyValueUtil.appendToByteBuffer(cell, buf, offset, true);
+    }
+    if (buf.hasArray()) {
+      KeyValue newKv;
+      if (tagsLen == 0) {
+        // When tagsLen is 0, make a NoTagsKeyValue version of Cell. This is an optimized class
+        // which directly returns tagsLen as 0. So we avoid parsing many length components in
+        // reading the tagLength stored in the backing buffer. The Memstore addition of every Cell
+        // calls getTagsLength().
+        newKv = new NoTagsKeyValue(buf.array(), buf.arrayOffset() + offset, len);
+      } else {
+        newKv = new KeyValue(buf.array(), buf.arrayOffset() + offset, len);
+      }
+      newKv.setSequenceId(cell.getSequenceId());
+      return newKv;
+    }
+    return new OffheapKeyValue(buf, offset, len, tagsLen > 0, cell.getSequenceId());
+  }
 }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java
index f60da14..0c7fbb0 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.HeapSize;
@@ -65,7 +66,7 @@ public interface ExtendedCell extends Cell, SettableSequenceId, SettableTimestam
    * @param buf The buffer where to write the Cell.
    * @param offset The offset within buffer, to write the Cell.
    */
-  void write(byte[] buf, int offset);
+  void write(ByteBuffer buf, int offset);
 
   /**
    * @return The heap size overhead associated with this Cell.
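To make the effect of these two hunks concrete: `copyCellTo` picks the wrapper type from the destination buffer, so the same call yields an on-heap `KeyValue`/`NoTagsKeyValue` or an `OffheapKeyValue`. A minimal, hypothetical caller-side sketch (class name and sample values invented; it assumes the patched `CellUtil`/`KeyValueUtil` from this patch are on the classpath):

```java
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.util.Bytes;

public class CopyCellToSketch {
  public static void main(String[] args) {
    Cell cell = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("cf"),
        Bytes.toBytes("q"), Bytes.toBytes("value"));
    int len = KeyValueUtil.length(cell); // full KeyValue serialization length

    // On-heap destination: buf.hasArray() is true, so copyCellTo returns a
    // KeyValue (here a NoTagsKeyValue, since the cell carries no tags).
    ByteBuffer onheap = ByteBuffer.allocate(len);
    Cell onheapCopy = CellUtil.copyCellTo(cell, onheap, 0, len);

    // Direct destination: buf.hasArray() is false, so an OffheapKeyValue
    // wrapping the direct buffer is returned instead.
    ByteBuffer offheap = ByteBuffer.allocateDirect(len);
    Cell offheapCopy = CellUtil.copyCellTo(cell, offheap, 0, len);

    // Expected to print something like "NoTagsKeyValue / OffheapKeyValue".
    System.out.println(onheapCopy.getClass().getSimpleName() + " / "
        + offheapCopy.getClass().getSimpleName());
  }
}
```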
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java
index 7b94c3d..4baaabe 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java
@@ -27,7 +27,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 
@@ -81,7 +80,6 @@ public class HBaseConfiguration extends Configuration {
     conf.addResource("hbase-site.xml");
 
     checkDefaultsVersion(conf);
-    HeapMemorySizeUtil.checkForClusterFreeMemoryLimit(conf);
     return conf;
   }
 
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
index a95f814..6a07513 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
@@ -2491,8 +2491,8 @@ public class KeyValue implements ExtendedCell {
   }
 
   @Override
-  public void write(byte[] buf, int offset) {
-    System.arraycopy(this.bytes, this.offset, buf, offset, this.length);
+  public void write(ByteBuffer buf, int offset) {
+    ByteBufferUtils.copyFromArrayToBuffer(buf, offset, this.bytes, this.offset, this.length);
   }
 
   /**
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java
index 077f9ee..d4c047c 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java
@@ -153,7 +153,6 @@ public class KeyValueUtil {
     return nextOffset;
   }
 
-
   /**************** copy key and value *********************/
 
   public static int appendToByteArray(Cell cell, byte[] output, int offset, boolean withTags) {
@@ -170,15 +169,25 @@ public class KeyValueUtil {
   }
 
   /**
-   * The position will be set to the beginning of the new ByteBuffer
-   * @param cell
-   * @return the ByteBuffer containing the cell
+   * Copy the Cell content into the passed buf in KeyValue serialization format.
    */
-  public static ByteBuffer copyToNewByteBuffer(final Cell cell) {
-    byte[] bytes = new byte[length(cell)];
-    appendToByteArray(cell, bytes, 0, true);
-    ByteBuffer buffer = ByteBuffer.wrap(bytes);
-    return buffer;
+  public static int appendToByteBuffer(Cell cell, ByteBuffer buf, int offset, boolean withTags) {
+    offset = ByteBufferUtils.putInt(buf, offset, keyLength(cell));// Key length
+    offset = ByteBufferUtils.putInt(buf, offset, cell.getValueLength());// Value length
+    offset = ByteBufferUtils.putShort(buf, offset, cell.getRowLength());// RK length
+    offset = CellUtil.copyRowTo(cell, buf, offset);// Row bytes
+    offset = ByteBufferUtils.putByte(buf, offset, cell.getFamilyLength());// CF length
+    offset = CellUtil.copyFamilyTo(cell, buf, offset);// CF bytes
+    offset = CellUtil.copyQualifierTo(cell, buf, offset);// Qualifier bytes
+    offset = ByteBufferUtils.putLong(buf, offset, cell.getTimestamp());// TS
+    offset = ByteBufferUtils.putByte(buf, offset, cell.getTypeByte());// Type
+    offset = CellUtil.copyValueTo(cell, buf, offset);// Value bytes
+    int tagsLength = cell.getTagsLength();
+    if (withTags && (tagsLength > 0)) {
+      offset = ByteBufferUtils.putAsShort(buf, offset, tagsLength);// Tags length
+      offset = CellUtil.copyTagTo(cell, buf, offset);// Tags bytes
+    }
+    return offset;
   }
 
   public static void appendToByteBuffer(final ByteBuffer bb, final KeyValue kv,
@@ -660,29 +669,4 @@ public class KeyValueUtil {
       return size;
     }
   }
-
-  /**
-   * Write the given cell in KeyValue serialization format into the given buf and return a new
-   * KeyValue object around that.
-   */
-  public static KeyValue copyCellTo(Cell cell, byte[] buf, int offset, int len) {
-    int tagsLen = cell.getTagsLength();
-    if (cell instanceof ExtendedCell) {
-      ((ExtendedCell) cell).write(buf, offset);
-    } else {
-      appendToByteArray(cell, buf, offset, true);
-    }
-    KeyValue newKv;
-    if (tagsLen == 0) {
-      // When tagsLen is 0, make a NoTagsKeyValue version of Cell. This is an optimized class which
-      // directly return tagsLen as 0. So we avoid parsing many length components in reading the
-      // tagLength stored in the backing buffer. The Memstore addition of every Cell call
-      // getTagsLength().
-      newKv = new NoTagsKeyValue(buf, offset, len);
-    } else {
-      newKv = new KeyValue(buf, offset, len);
-    }
-    newKv.setSequenceId(cell.getSequenceId());
-    return newKv;
-  }
 }
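The inline comments in `appendToByteBuffer` spell out the KeyValue wire layout: key length, value length, row-key length, row, family length, family, qualifier, timestamp, type byte, value, then an optional tags-length/tags pair. A self-contained sketch of the same layout using only `java.nio` (class name and sample key/value bytes invented):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class KeyValueLayoutSketch {
  public static void main(String[] args) {
    byte[] row = "r1".getBytes(StandardCharsets.UTF_8);
    byte[] cf = "f".getBytes(StandardCharsets.UTF_8);
    byte[] qual = "q".getBytes(StandardCharsets.UTF_8);
    byte[] value = "v".getBytes(StandardCharsets.UTF_8);
    // Key = rowlen(2) + row + cflen(1) + cf + qualifier + timestamp(8) + type(1)
    int keyLen = 2 + row.length + 1 + cf.length + qual.length + 8 + 1;

    ByteBuffer buf = ByteBuffer.allocate(4 + 4 + keyLen + value.length);
    buf.putInt(keyLen);                      // Key length
    buf.putInt(value.length);                // Value length
    buf.putShort((short) row.length);        // RK length
    buf.put(row);                            // Row bytes
    buf.put((byte) cf.length);               // CF length
    buf.put(cf);                             // CF bytes
    buf.put(qual);                           // Qualifier bytes
    buf.putLong(System.currentTimeMillis()); // TS
    buf.put((byte) 4);                       // Type (4 = Put in KeyValue.Type)
    buf.put(value);                          // Value bytes
    // No tags here; withTags would append a 2-byte tags length plus tag bytes.
    System.out.println("Serialized " + buf.position() + " bytes");
  }
}
```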
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java
index 06a0ed6..4aa77cb 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java
@@ -266,8 +266,8 @@ public class OffheapKeyValue extends ByteBufferedCell implements ExtendedCell {
   }
 
   @Override
-  public void write(byte[] buf, int offset) {
-    ByteBufferUtils.copyFromBufferToArray(buf, this.buf, this.offset, offset, this.length);
+  public void write(ByteBuffer buf, int offset) {
+    ByteBufferUtils.copyFromBufferToBuffer(this.buf, buf, this.offset, offset, this.length);
   }
 
   @Override
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
index 216a82d..d1e1ffd 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
@@ -452,7 +452,7 @@ abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder {
     }
 
     @Override
-    public void write(byte[] buf, int offset) {
+    public void write(ByteBuffer buf, int offset) {
       // This is not used in actual flow. Throwing UnsupportedOperationException
       throw new UnsupportedOperationException();
     }
@@ -708,7 +708,7 @@ abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder {
     }
 
     @Override
-    public void write(byte[] buf, int offset) {
+    public void write(ByteBuffer buf, int offset) {
       // This is not used in actual flow. Throwing UnsupportedOperationException
       throw new UnsupportedOperationException();
     }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/HeapMemorySizeUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/HeapMemorySizeUtil.java
deleted file mode 100644
index 5bec65d..0000000
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/HeapMemorySizeUtil.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.io.util;
-
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryUsage;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-@InterfaceAudience.Private
-public class HeapMemorySizeUtil {
-
-  public static final String MEMSTORE_SIZE_KEY = "hbase.regionserver.global.memstore.size";
-  public static final String MEMSTORE_SIZE_OLD_KEY =
-      "hbase.regionserver.global.memstore.upperLimit";
-  public static final String MEMSTORE_SIZE_LOWER_LIMIT_KEY =
-      "hbase.regionserver.global.memstore.size.lower.limit";
-  public static final String MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY =
-      "hbase.regionserver.global.memstore.lowerLimit";
-
-  public static final float DEFAULT_MEMSTORE_SIZE = 0.4f;
-  // Default lower water mark limit is 95% size of memstore size.
-  public static final float DEFAULT_MEMSTORE_SIZE_LOWER_LIMIT = 0.95f;
-
-  private static final Log LOG = LogFactory.getLog(HeapMemorySizeUtil.class);
-  // a constant to convert a fraction to a percentage
-  private static final int CONVERT_TO_PERCENTAGE = 100;
-
-  /**
-   * Checks whether we have enough heap memory left out after portion for Memstore and Block cache.
-   * We need atleast 20% of heap left out for other RS functions.
-   * @param conf
-   */
-  public static void checkForClusterFreeMemoryLimit(Configuration conf) {
-    if (conf.get(MEMSTORE_SIZE_OLD_KEY) != null) {
-      LOG.warn(MEMSTORE_SIZE_OLD_KEY + " is deprecated by " + MEMSTORE_SIZE_KEY);
-    }
-    float globalMemstoreSize = getGlobalMemStorePercent(conf, false);
-    int gml = (int)(globalMemstoreSize * CONVERT_TO_PERCENTAGE);
-    float blockCacheUpperLimit = getBlockCacheHeapPercent(conf);
-    int bcul = (int)(blockCacheUpperLimit * CONVERT_TO_PERCENTAGE);
-    if (CONVERT_TO_PERCENTAGE - (gml + bcul)
-        < (int)(CONVERT_TO_PERCENTAGE *
-            HConstants.HBASE_CLUSTER_MINIMUM_MEMORY_THRESHOLD)) {
-      throw new RuntimeException("Current heap configuration for MemStore and BlockCache exceeds "
-          + "the threshold required for successful cluster operation. "
-          + "The combined value cannot exceed 0.8. Please check "
-          + "the settings for hbase.regionserver.global.memstore.size and "
-          + "hfile.block.cache.size in your configuration. "
-          + "hbase.regionserver.global.memstore.size is " + globalMemstoreSize
-          + " hfile.block.cache.size is " + blockCacheUpperLimit);
-    }
-  }
-
-  /**
-   * Retrieve global memstore configured size as percentage of total heap.
-   * @param c
-   * @param logInvalid
-   */
-  public static float getGlobalMemStorePercent(final Configuration c, final boolean logInvalid) {
-    float limit = c.getFloat(MEMSTORE_SIZE_KEY,
-        c.getFloat(MEMSTORE_SIZE_OLD_KEY, DEFAULT_MEMSTORE_SIZE));
-    if (limit > 0.8f || limit <= 0.0f) {
-      if (logInvalid) {
-        LOG.warn("Setting global memstore limit to default of " + DEFAULT_MEMSTORE_SIZE
-            + " because supplied value outside allowed range of (0 -> 0.8]");
-      }
-      limit = DEFAULT_MEMSTORE_SIZE;
-    }
-    return limit;
-  }
-
-  /**
-   * Retrieve configured size for global memstore lower water mark as fraction of global memstore
-   * size.
-   */
-  public static float getGlobalMemStoreLowerMark(final Configuration conf, float globalMemStorePercent) {
-    String lowMarkPercentStr = conf.get(MEMSTORE_SIZE_LOWER_LIMIT_KEY);
-    if (lowMarkPercentStr != null) {
-      float lowMarkPercent = Float.parseFloat(lowMarkPercentStr);
-      if (lowMarkPercent > 1.0f) {
-        LOG.error("Bad configuration value for " + MEMSTORE_SIZE_LOWER_LIMIT_KEY + ": " +
-            lowMarkPercent + ". Using 1.0f instead.");
-        lowMarkPercent = 1.0f;
-      }
-      return lowMarkPercent;
-    }
-    String lowerWaterMarkOldValStr = conf.get(MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY);
-    if (lowerWaterMarkOldValStr != null) {
-      LOG.warn(MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY + " is deprecated. Instead use "
-          + MEMSTORE_SIZE_LOWER_LIMIT_KEY);
-      float lowerWaterMarkOldVal = Float.parseFloat(lowerWaterMarkOldValStr);
-      if (lowerWaterMarkOldVal > globalMemStorePercent) {
-        lowerWaterMarkOldVal = globalMemStorePercent;
-        LOG.error("Value of " + MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY + " (" + lowerWaterMarkOldVal
-            + ") is greater than global memstore limit (" + globalMemStorePercent + ") set by "
-            + MEMSTORE_SIZE_KEY + "/" + MEMSTORE_SIZE_OLD_KEY + ". Setting memstore lower limit "
-            + "to " + globalMemStorePercent);
-      }
-      return lowerWaterMarkOldVal / globalMemStorePercent;
-    }
-    return DEFAULT_MEMSTORE_SIZE_LOWER_LIMIT;
-  }
-
-  /**
-   * Retrieve configured size for on heap block cache as percentage of total heap.
-   * @param conf
-   */
-  public static float getBlockCacheHeapPercent(final Configuration conf) {
-    // L1 block cache is always on heap
-    float l1CachePercent = conf.getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY,
-        HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
-    float l2CachePercent = getL2BlockCacheHeapPercent(conf);
-    return l1CachePercent + l2CachePercent;
-  }
-
-  /**
-   * @param conf
-   * @return The on heap size for L2 block cache.
-   */
-  public static float getL2BlockCacheHeapPercent(Configuration conf) {
-    float l2CachePercent = 0.0F;
-    String bucketCacheIOEngineName = conf.get(HConstants.BUCKET_CACHE_IOENGINE_KEY, null);
-    // L2 block cache can be on heap when IOEngine is "heap"
-    if (bucketCacheIOEngineName != null && bucketCacheIOEngineName.startsWith("heap")) {
-      float bucketCachePercentage = conf.getFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 0F);
-      MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
-      l2CachePercent = bucketCachePercentage < 1 ? bucketCachePercentage
-          : (bucketCachePercentage * 1024 * 1024) / mu.getMax();
-    }
-    return l2CachePercent;
-  }
-}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
index c9a19ff..760afd4 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
@@ -876,6 +876,14 @@ public final class ByteBufferUtils {
     }
   }
 
+  public static int putInt(ByteBuffer buffer, int index, int val) {
+    if (UNSAFE_UNALIGNED) {
+      return UnsafeAccess.putInt(buffer, index, val);
+    }
+    buffer.putInt(index, val);
+    return index + Bytes.SIZEOF_INT;
+  }
+
   /**
    * Reads a double value at the given buffer's offset.
    * @param buffer
@@ -919,6 +927,21 @@ public final class ByteBufferUtils {
     }
   }
 
+  public static int putShort(ByteBuffer buffer, int index, short val) {
+    if (UNSAFE_UNALIGNED) {
+      return UnsafeAccess.putShort(buffer, index, val);
+    }
+    buffer.putShort(index, val);
+    return index + Bytes.SIZEOF_SHORT;
+  }
+
+  public static int putAsShort(ByteBuffer buf, int index, int val) {
+    buf.put(index + 1, (byte) val);
+    val >>= 8;
+    buf.put(index, (byte) val);
+    return index + Bytes.SIZEOF_SHORT;
+  }
+
   /**
    * Put a long value out to the given ByteBuffer's current position in big-endian format.
    * This also advances the position in buffer by long size.
@@ -933,6 +956,15 @@ public final class ByteBufferUtils {
       buffer.putLong(val);
     }
   }
+
+  public static int putLong(ByteBuffer buffer, int index, long val) {
+    if (UNSAFE_UNALIGNED) {
+      return UnsafeAccess.putLong(buffer, index, val);
+    }
+    buffer.putLong(index, val);
+    return index + Bytes.SIZEOF_LONG;
+  }
+
   /**
    * Copies the bytes from given array's offset to length part into the given buffer. Puts the bytes
    * to buffer's current position. This also advances the position in the 'out' buffer by 'length'
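Unlike `putShort`, the new `putAsShort` takes an `int` and stores only its low 16 bits, which is how a tags length above `Short.MAX_VALUE` survives the round trip (it is read back as an unsigned short). A standalone sketch of the same two-byte big-endian encoding (class and method names invented):

```java
import java.nio.ByteBuffer;

public class PutAsShortSketch {
  // Mirrors ByteBufferUtils.putAsShort above: write the low 16 bits of an int
  // big-endian at an absolute index, without narrowing the argument to short.
  static int putAsShort(ByteBuffer buf, int index, int val) {
    buf.put(index + 1, (byte) val);
    val >>= 8;
    buf.put(index, (byte) val);
    return index + 2;
  }

  public static void main(String[] args) {
    ByteBuffer buf = ByteBuffer.allocate(2);
    int tagsLen = 40000;                     // > Short.MAX_VALUE, still fits in 16 bits
    putAsShort(buf, 0, tagsLen);
    int readBack = buf.getShort(0) & 0xFFFF; // read back as an unsigned short
    System.out.println(readBack);            // prints 40000
  }
}
```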
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/util/MemorySizeUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/util/MemorySizeUtil.java
new file mode 100644
index 0000000..5ca8159
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/util/MemorySizeUtil.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.util;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryType;
+import java.lang.management.MemoryUsage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
+import org.apache.hadoop.hbase.util.Pair;
+
+@InterfaceAudience.Private
+public class MemorySizeUtil {
+
+  public static final String MEMSTORE_SIZE_KEY = "hbase.regionserver.global.memstore.size";
+  public static final String MEMSTORE_SIZE_OLD_KEY =
+      "hbase.regionserver.global.memstore.upperLimit";
+  public static final String MEMSTORE_SIZE_LOWER_LIMIT_KEY =
+      "hbase.regionserver.global.memstore.size.lower.limit";
+  public static final String MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY =
+      "hbase.regionserver.global.memstore.lowerLimit";
+  // Max global off heap memory that can be used for all memstores.
+  // This should be an absolute value in MBs and not a percent.
+  public static final String OFFHEAP_MEMSTORE_SIZE_KEY =
+      "hbase.regionserver.offheap.global.memstore.size";
+
+  public static final float DEFAULT_MEMSTORE_SIZE = 0.4f;
+  // Default lower water mark limit is 95% size of memstore size.
+  public static final float DEFAULT_MEMSTORE_SIZE_LOWER_LIMIT = 0.95f;
+
+  private static final Log LOG = LogFactory.getLog(MemorySizeUtil.class);
+  // a constant to convert a fraction to a percentage
+  private static final int CONVERT_TO_PERCENTAGE = 100;
+
+  /**
+   * Checks whether we have enough heap memory left out after portion for Memstore and Block cache.
+   * We need at least 20% of heap left out for other RS functions.
+   * @param conf
+   */
+  public static void checkForClusterFreeHeapMemoryLimit(Configuration conf) {
+    if (conf.get(MEMSTORE_SIZE_OLD_KEY) != null) {
+      LOG.warn(MEMSTORE_SIZE_OLD_KEY + " is deprecated by " + MEMSTORE_SIZE_KEY);
+    }
+    float globalMemstoreSize = getGlobalMemStoreHeapPercent(conf, false);
+    int gml = (int)(globalMemstoreSize * CONVERT_TO_PERCENTAGE);
+    float blockCacheUpperLimit = getBlockCacheHeapPercent(conf);
+    int bcul = (int)(blockCacheUpperLimit * CONVERT_TO_PERCENTAGE);
+    if (CONVERT_TO_PERCENTAGE - (gml + bcul)
+        < (int)(CONVERT_TO_PERCENTAGE *
+            HConstants.HBASE_CLUSTER_MINIMUM_MEMORY_THRESHOLD)) {
+      throw new RuntimeException("Current heap configuration for MemStore and BlockCache exceeds "
+          + "the threshold required for successful cluster operation. "
+          + "The combined value cannot exceed 0.8. Please check "
+          + "the settings for hbase.regionserver.global.memstore.size and "
+          + "hfile.block.cache.size in your configuration. "
+          + "hbase.regionserver.global.memstore.size is " + globalMemstoreSize
+          + " hfile.block.cache.size is " + blockCacheUpperLimit);
+    }
+  }
+
+  /**
+   * Retrieve global memstore configured size as percentage of total heap.
+   * @param c
+   * @param logInvalid
+   */
+  public static float getGlobalMemStoreHeapPercent(final Configuration c,
+      final boolean logInvalid) {
+    float limit = c.getFloat(MEMSTORE_SIZE_KEY,
+        c.getFloat(MEMSTORE_SIZE_OLD_KEY, DEFAULT_MEMSTORE_SIZE));
+    if (limit > 0.8f || limit <= 0.0f) {
+      if (logInvalid) {
+        LOG.warn("Setting global memstore limit to default of " + DEFAULT_MEMSTORE_SIZE
+            + " because supplied value outside allowed range of (0 -> 0.8]");
+      }
+      limit = DEFAULT_MEMSTORE_SIZE;
+    }
+    return limit;
+  }
+
+  /**
+   * Retrieve configured size for global memstore lower water mark as fraction of global memstore
+   * size.
+   */
+  public static float getGlobalMemStoreHeapLowerMark(final Configuration conf,
+      boolean honorOldConfig) {
+    String lowMarkPercentStr = conf.get(MEMSTORE_SIZE_LOWER_LIMIT_KEY);
+    if (lowMarkPercentStr != null) {
+      float lowMarkPercent = Float.parseFloat(lowMarkPercentStr);
+      if (lowMarkPercent > 1.0f) {
+        LOG.error("Bad configuration value for " + MEMSTORE_SIZE_LOWER_LIMIT_KEY + ": "
+            + lowMarkPercent + ". Using 1.0f instead.");
+        lowMarkPercent = 1.0f;
+      }
+      return lowMarkPercent;
+    }
+    if (!honorOldConfig) return DEFAULT_MEMSTORE_SIZE_LOWER_LIMIT;
+    String lowerWaterMarkOldValStr = conf.get(MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY);
+    if (lowerWaterMarkOldValStr != null) {
+      LOG.warn(MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY + " is deprecated. Instead use "
+          + MEMSTORE_SIZE_LOWER_LIMIT_KEY);
+      float lowerWaterMarkOldVal = Float.parseFloat(lowerWaterMarkOldValStr);
+      float upperMarkPercent = getGlobalMemStoreHeapPercent(conf, false);
+      if (lowerWaterMarkOldVal > upperMarkPercent) {
+        lowerWaterMarkOldVal = upperMarkPercent;
+        LOG.error("Value of " + MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY + " (" + lowerWaterMarkOldVal
+            + ") is greater than global memstore limit (" + upperMarkPercent + ") set by "
+            + MEMSTORE_SIZE_KEY + "/" + MEMSTORE_SIZE_OLD_KEY + ". Setting memstore lower limit "
+            + "to " + upperMarkPercent);
+      }
+      return lowerWaterMarkOldVal / upperMarkPercent;
+    }
+    return DEFAULT_MEMSTORE_SIZE_LOWER_LIMIT;
+  }
+
+  /**
+   * @return Pair of global memstore size and memory type (i.e. on heap or off heap).
+   */
+  public static Pair<Long, MemoryType> getGlobalMemstoreSize(Configuration conf) {
+    long offheapMSGlobal = conf.getLong(OFFHEAP_MEMSTORE_SIZE_KEY, 0);// Size in MBs
+    if (offheapMSGlobal > 0) {
+      // Off heap memstore size has no relevance when MSLAB is turned OFF. We will go with making
+      // this entire size split into Chunks and pooling them in MemStoreLABPool. We don't want to
+      // create so many on demand off heap chunks. In fact when this off heap size is configured, we
+      // will go with 100% of this size as the pool size.
+      if (MemStoreLAB.isEnabled(conf)) {
+        // We are in off heap Memstore use.
+        long globalMemStoreLimit = (long) (offheapMSGlobal * 1024 * 1024); // Size in bytes
+        return new Pair<Long, MemoryType>(globalMemStoreLimit, MemoryType.NON_HEAP);
+      } else {
+        // Off heap max memstore size is configured while MSLAB is turned off. It makes no sense.
+        // Log a warning and go with the on heap memstore percentage. By default it will be 40% of
+        // Xmx.
+        LOG.warn("There is no relevance of configuring '" + OFFHEAP_MEMSTORE_SIZE_KEY + "' when '"
+            + MemStoreLAB.USEMSLAB_KEY + "' is turned off."
+            + " Going with on heap global memstore size ('" + MEMSTORE_SIZE_KEY + "')");
+      }
+    }
+    long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
+    float globalMemStorePercent = getGlobalMemStoreHeapPercent(conf, true);
+    return new Pair<Long, MemoryType>((long) (max * globalMemStorePercent), MemoryType.HEAP);
+  }
+
+  /**
+   * Retrieve configured size for on heap block cache as percentage of total heap.
+   * @param conf
+   */
+  public static float getBlockCacheHeapPercent(final Configuration conf) {
+    // L1 block cache is always on heap
+    float l1CachePercent = conf.getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY,
+        HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
+    float l2CachePercent = getL2BlockCacheHeapPercent(conf);
+    return l1CachePercent + l2CachePercent;
+  }
+
+  /**
+   * @param conf
+   * @return The on heap size for L2 block cache.
+   */
+  public static float getL2BlockCacheHeapPercent(Configuration conf) {
+    float l2CachePercent = 0.0F;
+    String bucketCacheIOEngineName = conf.get(HConstants.BUCKET_CACHE_IOENGINE_KEY, null);
+    // L2 block cache can be on heap when IOEngine is "heap"
+    if (bucketCacheIOEngineName != null && bucketCacheIOEngineName.startsWith("heap")) {
+      float bucketCachePercentage = conf.getFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 0F);
+      MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
+      l2CachePercent = bucketCachePercentage < 1 ? bucketCachePercentage
+          : (bucketCachePercentage * 1024 * 1024) / mu.getMax();
+    }
+    return l2CachePercent;
+  }
+}
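`getGlobalMemstoreSize` resolves the memstore budget in two modes: an absolute off-heap size in MBs when MSLAB is enabled and `OFFHEAP_MEMSTORE_SIZE_KEY` is set, otherwise a fraction of `-Xmx`. A simplified standalone model of that resolution (class and method names invented; the real code reads both values from the `Configuration`):

```java
import java.lang.management.ManagementFactory;

public class MemstoreSizeSketch {
  static long globalMemstoreBytes(long offheapMB, boolean mslabEnabled, float heapPercent) {
    if (offheapMB > 0 && mslabEnabled) {
      return offheapMB * 1024L * 1024L; // absolute off-heap size in bytes (NON_HEAP)
    }
    // Fall back to the on-heap fraction of -Xmx (HEAP).
    long xmx = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
    return (long) (xmx * heapPercent);
  }

  public static void main(String[] args) {
    System.out.println(globalMemstoreBytes(2048, true, 0.4f)); // 2 GB off heap
    System.out.println(globalMemstoreBytes(0, true, 0.4f));    // 40% of the heap
  }
}
```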
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java
index d968ed9..94f3575 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -30,7 +31,7 @@ import com.google.common.base.Preconditions;
 @InterfaceAudience.Private
 public class Chunk {
   /** Actual underlying data */
-  private byte[] data;
+  private ByteBuffer data;
 
   private static final int UNINITIALIZED = -1;
   private static final int OOM = -2;
@@ -45,14 +46,16 @@ public class Chunk {
 
   /** Size of chunk in bytes */
   private final int size;
+
+  private final boolean offheap;
 
   /**
    * Create an uninitialized chunk. Note that memory is not allocated yet, so this is cheap.
    *
    * @param size in bytes
    */
-  Chunk(int size) {
+  Chunk(int size, boolean offheap) {
     this.size = size;
+    this.offheap = offheap;
   }
 
   /**
@@ -64,7 +67,7 @@ public class Chunk {
     assert nextFreeOffset.get() == UNINITIALIZED;
     try {
       if (data == null) {
-        data = new byte[size];
+        data = this.offheap ? ByteBuffer.allocateDirect(this.size) : ByteBuffer.allocate(this.size);
       }
     } catch (OutOfMemoryError e) {
       boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
@@ -109,7 +112,7 @@ public class Chunk {
         return -1;
       }
 
-      if (oldOffset + size > data.length) {
+      if (oldOffset + size > data.capacity()) {
         return -1; // alloc doesn't fit
       }
 
@@ -126,14 +129,14 @@ public class Chunk {
   /**
    * @return This chunk's backing data.
    */
-  byte[] getData() {
+  ByteBuffer getData() {
     return this.data;
   }
 
   @Override
   public String toString() {
     return "Chunk@" + System.identityHashCode(this) + " allocs=" + allocCount.get() + "waste="
-        + (data.length - nextFreeOffset.get());
+        + (data.capacity() - nextFreeOffset.get());
   }
 
   @VisibleForTesting
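The byte[]-to-ByteBuffer switch leaves Chunk's lock-free bump-the-pointer allocation intact; only the backing storage and the capacity check change. A condensed, runnable model of that allocation scheme (names invented; the real Chunk also tracks UNINITIALIZED/OOM states and initializes lazily):

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

public class BumpPointerChunkSketch {
  private final ByteBuffer data;
  private final AtomicInteger nextFreeOffset = new AtomicInteger(0);

  BumpPointerChunkSketch(int size, boolean offheap) {
    this.data = offheap ? ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
  }

  /** Returns the offset of the reserved region, or -1 if the chunk is full. */
  int alloc(int size) {
    while (true) {
      int oldOffset = nextFreeOffset.get();
      if (oldOffset + size > data.capacity()) {
        return -1; // alloc doesn't fit
      }
      if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) {
        return oldOffset; // we won the race for this slice
      }
      // Lost the CAS to another thread; retry against the new offset.
    }
  }

  public static void main(String[] args) {
    BumpPointerChunkSketch chunk = new BumpPointerChunkSketch(2 * 1024 * 1024, true);
    System.out.println(chunk.alloc(100)); // 0
    System.out.println(chunk.alloc(100)); // 100
  }
}
```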
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java
index 1d237d0..1c7dfe2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java
@@ -29,7 +29,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
+import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
 import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.TunerContext;
 import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.TunerResult;
 import org.apache.hadoop.hbase.util.RollingStatCalculator;
@@ -109,6 +109,9 @@ class DefaultHeapMemoryTuner implements HeapMemoryTuner {
   private float globalMemStorePercentMaxRange;
   private float blockCachePercentMinRange;
   private float blockCachePercentMaxRange;
+
+  private float globalMemStoreLimitLowMarkPercent;
+
   // Store statistics about the corresponding parameters for memory tuning
   private RollingStatCalculator rollingStatsForCacheMisses;
   private RollingStatCalculator rollingStatsForFlushes;
@@ -165,11 +168,9 @@ class DefaultHeapMemoryTuner implements HeapMemoryTuner {
       newTuneDirection = StepDirection.NEUTRAL;
     }
     // Increase / decrease the memstore / block cahce sizes depending on new tuner step.
-    float globalMemstoreLowerMark = HeapMemorySizeUtil.getGlobalMemStoreLowerMark(conf,
-        curMemstoreSize);
     // We don't want to exert immediate pressure on memstore. So, we decrease its size gracefully;
     // we set a minimum bar in the middle of the total memstore size and the lower limit.
-    float minMemstoreSize = ((globalMemstoreLowerMark + 1) * curMemstoreSize) / 2.00f;
+    float minMemstoreSize = ((globalMemStoreLimitLowMarkPercent + 1) * curMemstoreSize) / 2.00f;
 
     switch (newTuneDirection) {
     case INCREASE_BLOCK_CACHE_SIZE:
@@ -365,9 +366,11 @@ class DefaultHeapMemoryTuner implements HeapMemoryTuner {
     this.blockCachePercentMaxRange = conf.getFloat(BLOCK_CACHE_SIZE_MAX_RANGE_KEY,
         conf.getFloat(HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT));
     this.globalMemStorePercentMinRange = conf.getFloat(MEMSTORE_SIZE_MIN_RANGE_KEY,
-        HeapMemorySizeUtil.getGlobalMemStorePercent(conf, false));
+        MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
     this.globalMemStorePercentMaxRange = conf.getFloat(MEMSTORE_SIZE_MAX_RANGE_KEY,
-        HeapMemorySizeUtil.getGlobalMemStorePercent(conf, false));
+        MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
+    this.globalMemStoreLimitLowMarkPercent = MemorySizeUtil.getGlobalMemStoreHeapLowerMark(conf,
+        true);
     // Default value of periods to ignore is number of lookup periods
     this.numPeriodsToIgnore = conf.getInt(NUM_PERIODS_TO_IGNORE, this.tunerLookupPeriods);
     this.rollingStatsForCacheMisses = new RollingStatCalculator(this.tunerLookupPeriods);
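To make the lower-bound computation concrete with illustrative defaults: with the default lower mark of 0.95 and a current global memstore fraction of 0.40, minMemstoreSize = ((0.95 + 1) * 0.40) / 2.00 = 0.39, so a single tuning step will not shrink the memstore below 39% of heap, the midpoint between its current size and its lower limit.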
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 6415672..dc56151 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -6999,7 +6999,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     ClientProtos.RegionLoadStats.Builder stats = ClientProtos.RegionLoadStats.newBuilder();
     stats.setMemstoreLoad((int) (Math.min(100, (this.memstoreDataSize.get() * 100) / this
         .memstoreFlushSize)));
-    stats.setHeapOccupancy((int)rsServices.getHeapMemoryManager().getHeapOccupancyPercent()*100);
+    if (rsServices.getHeapMemoryManager() != null) {
+      stats.setHeapOccupancy(
+          (int) rsServices.getHeapMemoryManager().getHeapOccupancyPercent() * 100);
+    }
     stats.setCompactionPressure((int)rsServices.getCompactionPressure()*100 > 100 ? 100
         : (int)rsServices.getCompactionPressure()*100);
     return stats.build();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 8e78422..56fc6eb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryType;
 import java.lang.management.MemoryUsage;
 import java.lang.reflect.Constructor;
 import java.net.BindException;
@@ -100,6 +101,7 @@ import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.http.InfoServer;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
@@ -170,6 +172,7 @@ import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.JSONBean;
 import org.apache.hadoop.hbase.util.JvmPauseMonitor;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.hadoop.hbase.util.Sleeper;
 import org.apache.hadoop.hbase.util.Threads;
@@ -516,6 +519,7 @@ public class HRegionServer extends HasThread implements
     super("RegionServer"); // thread name
     this.fsOk = true;
     this.conf = conf;
+    MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf);
    HFile.checkHFileVersion(this.conf);
     checkCodecs(this.conf);
     this.userProvider = UserProvider.instantiate(conf);
@@ -1451,6 +1455,8 @@ public class HRegionServer extends HasThread implements
 
       startServiceThreads();
       startHeapMemoryManager();
+      // Call it after starting HeapMemoryManager.
+      initializeMemStoreChunkPool();
       LOG.info("Serving as " + this.serverName +
           ", RpcServer on " + rpcServices.isa +
           ", sessionid=0x" +
@@ -1470,16 +1476,34 @@ public class HRegionServer extends HasThread implements
     }
   }
 
+  private void initializeMemStoreChunkPool() {
+    if (MemStoreLAB.isEnabled(conf)) {
+      // MSLAB is enabled. So initialize MemStoreChunkPool.
+      // By this time, the MemstoreFlusher is already initialized. We can get the global limits
+      // from it.
+      Pair<Long, MemoryType> pair = MemorySizeUtil.getGlobalMemstoreSize(conf);
+      long globalMemStoreSize = pair.getFirst();
+      boolean offheap = pair.getSecond() == MemoryType.NON_HEAP;
+      // When an off heap memstore is in use, take the full area for the chunk pool.
+      float poolSizePercentage = offheap ? 1.0F
+          : conf.getFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, MemStoreLAB.POOL_MAX_SIZE_DEFAULT);
+      float initialCountPercentage = conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY,
+          MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT);
+      int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT);
+      MemStoreChunkPool pool = MemStoreChunkPool.initialize(globalMemStoreSize, poolSizePercentage,
+          initialCountPercentage, chunkSize, offheap);
+      if (pool != null && this.hMemManager != null) {
+        // Register with Heap Memory manager
+        this.hMemManager.registerTuneObserver(pool);
+      }
+    }
+  }
+
   private void startHeapMemoryManager() {
-    this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher,
-        this, this.regionServerAccounting);
+    this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher, this,
+        this.regionServerAccounting);
     if (this.hMemManager != null) {
       this.hMemManager.start(getChoreService());
-      MemStoreChunkPool chunkPool = MemStoreChunkPool.getPool(this.conf);
-      if (chunkPool != null) {
-        // Register it as HeapMemoryTuneObserver
-        this.hMemManager.registerTuneObserver(chunkPool);
-      }
     }
   }
 
@@ -3523,11 +3547,6 @@ public class HRegionServer extends HasThread implements
   }
 
   @Override
-  public HeapMemoryManager getHeapMemoryManager() {
-    return hMemManager;
-  }
-
-  @Override
   public double getCompactionPressure() {
     double max = 0;
     for (Region region : onlineRegions.values()) {
@@ -3541,6 +3560,11 @@ public class HRegionServer extends HasThread implements
     return max;
   }
 
+  @Override
+  public HeapMemoryManager getHeapMemoryManager() {
+    return hMemManager;
+  }
+
   /**
    * For testing
    * @return whether all wal roll request finished for this regionserver
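The pool sizing wired up in `initializeMemStoreChunkPool` is simple arithmetic, mirroring `MemStoreChunkPool.initialize(...)` as refactored later in this patch. A standalone sketch with invented inputs:

```java
public class ChunkPoolSizingSketch {
  public static void main(String[] args) {
    long globalMemStoreSize = 2L * 1024 * 1024 * 1024; // e.g. a 2 GB off-heap memstore
    float poolSizePercentage = 1.0f;    // off heap: the whole area becomes the chunk pool
    float initialCountPercentage = 0.1f; // pre-create 10% of the chunks up front
    int chunkSize = 2 * 1024 * 1024;    // 2 MB default chunk size

    int maxCount = (int) (globalMemStoreSize * poolSizePercentage / chunkSize);
    int initialCount = (int) (initialCountPercentage * maxCount);
    System.out.println("max=" + maxCount + " initial=" + initialCount); // max=1024 initial=102
  }
}
```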
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java
deleted file mode 100644
index 99b2bb6..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.MemStoreChunkPool.PooledChunk;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-/**
- * A memstore-local allocation buffer.
- * <p>
- * The MemStoreLAB is basically a bump-the-pointer allocator that allocates
- * big (2MB) byte[] chunks from and then doles it out to threads that request
- * slices into the array.
- * <p>
- * The purpose of this class is to combat heap fragmentation in the
- * regionserver. By ensuring that all KeyValues in a given memstore refer
- * only to large chunks of contiguous memory, we ensure that large blocks
- * get freed up when the memstore is flushed.
- * <p>
- * Without the MSLAB, the byte array allocated during insertion end up
- * interleaved throughout the heap, and the old generation gets progressively
- * more fragmented until a stop-the-world compacting collection occurs.
- * <p>
- * TODO: we should probably benchmark whether word-aligning the allocations
- * would provide a performance improvement - probably would speed up the
- * Bytes.toLong/Bytes.toInt calls in KeyValue, but some of those are cached
- * anyway
- */
-@InterfaceAudience.Private
-public class HeapMemStoreLAB implements MemStoreLAB {
-
-  static final String CHUNK_SIZE_KEY = "hbase.hregion.memstore.mslab.chunksize";
-  static final int CHUNK_SIZE_DEFAULT = 2048 * 1024;
-  static final String MAX_ALLOC_KEY = "hbase.hregion.memstore.mslab.max.allocation";
-  static final int MAX_ALLOC_DEFAULT = 256 * 1024; // allocs bigger than this don't go through
-                                                   // allocator
-
-  static final Log LOG = LogFactory.getLog(HeapMemStoreLAB.class);
-
-  private AtomicReference<Chunk> curChunk = new AtomicReference<Chunk>();
-  // A queue of chunks from pool contained by this memstore LAB
-  // TODO: in the future, it would be better to have List implementation instead of Queue,
-  // as FIFO order is not so important here
-  @VisibleForTesting
-  BlockingQueue<PooledChunk> pooledChunkQueue = null;
-  private final int chunkSize;
-  private final int maxAlloc;
-  private final MemStoreChunkPool chunkPool;
-
-  // This flag is for closing this instance, its set when clearing snapshot of
-  // memstore
-  private volatile boolean closed = false;
-  // This flag is for reclaiming chunks. Its set when putting chunks back to
-  // pool
-  private AtomicBoolean reclaimed = new AtomicBoolean(false);
-  // Current count of open scanners which reading data from this MemStoreLAB
-  private final AtomicInteger openScannerCount = new AtomicInteger();
-
-  // Used in testing
-  public HeapMemStoreLAB() {
-    this(new Configuration());
-  }
-
-  public HeapMemStoreLAB(Configuration conf) {
-    chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT);
-    maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT);
-    this.chunkPool = MemStoreChunkPool.getPool(conf);
-    // currently chunkQueue is only used for chunkPool
-    if (this.chunkPool != null) {
-      // set queue length to chunk pool max count to avoid keeping reference of
-      // too many non-reclaimable chunks
-      pooledChunkQueue = new LinkedBlockingQueue<PooledChunk>(chunkPool.getMaxCount());
-    }
-
-    // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one!
-    Preconditions.checkArgument(maxAlloc <= chunkSize,
-        MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY);
-  }
-
-
-  @Override
-  public Cell copyCellInto(Cell cell) {
-    int size = KeyValueUtil.length(cell);
-    Preconditions.checkArgument(size >= 0, "negative size");
-    // Callers should satisfy large allocations directly from JVM since they
-    // don't cause fragmentation as badly.
-    if (size > maxAlloc) {
-      return null;
-    }
-    Chunk c = null;
-    int allocOffset = 0;
-    while (true) {
-      c = getOrMakeChunk();
-      // Try to allocate from this chunk
-      allocOffset = c.alloc(size);
-      if (allocOffset != -1) {
-        // We succeeded - this is the common case - small alloc
-        // from a big buffer
-        break;
-      }
-      // not enough space!
-      // try to retire this chunk
-      tryRetireChunk(c);
-    }
-    return KeyValueUtil.copyCellTo(cell, c.getData(), allocOffset, size);
-  }
-
-  /**
-   * Close this instance since it won't be used any more, try to put the chunks
-   * back to pool
-   */
-  @Override
-  public void close() {
-    this.closed = true;
-    // We could put back the chunks to pool for reusing only when there is no
-    // opening scanner which will read their data
-    if (chunkPool != null && openScannerCount.get() == 0
-        && reclaimed.compareAndSet(false, true)) {
-      chunkPool.putbackChunks(this.pooledChunkQueue);
-    }
-  }
-
-  /**
-   * Called when opening a scanner on the data of this MemStoreLAB
-   */
-  @Override
-  public void incScannerCount() {
-    this.openScannerCount.incrementAndGet();
-  }
-
-  /**
-   * Called when closing a scanner on the data of this MemStoreLAB
-   */
-  @Override
-  public void decScannerCount() {
-    int count = this.openScannerCount.decrementAndGet();
-    if (this.closed && chunkPool != null && count == 0
-        && reclaimed.compareAndSet(false, true)) {
-      chunkPool.putbackChunks(this.pooledChunkQueue);
-    }
-  }
-
-  /**
-   * Try to retire the current chunk if it is still
-   * <code>c</code>. Postcondition is that curChunk.get()
-   * != c
-   * @param c the chunk to retire
-   * @return true if we won the race to retire the chunk
-   */
-  private void tryRetireChunk(Chunk c) {
-    curChunk.compareAndSet(c, null);
-    // If the CAS succeeds, that means that we won the race
-    // to retire the chunk. We could use this opportunity to
-    // update metrics on external fragmentation.
-    //
-    // If the CAS fails, that means that someone else already
-    // retired the chunk for us.
-  }
-
-  /**
-   * Get the current chunk, or, if there is no current chunk,
-   * allocate a new one from the JVM.
-   */
-  private Chunk getOrMakeChunk() {
-    while (true) {
-      // Try to get the chunk
-      Chunk c = curChunk.get();
-      if (c != null) {
-        return c;
-      }
-
-      // No current chunk, so we want to allocate one. We race
-      // against other allocators to CAS in an uninitialized chunk
-      // (which is cheap to allocate)
-      if (chunkPool != null) {
-        c = chunkPool.getChunk();
-      }
-      boolean pooledChunk = false;
-      if (c != null) {
-        // This is chunk from pool
-        pooledChunk = true;
-      } else {
-        c = new Chunk(chunkSize);
-      }
-      if (curChunk.compareAndSet(null, c)) {
-        // we won race - now we need to actually do the expensive
-        // allocation step
-        c.init();
-        if (pooledChunk) {
-          if (!this.closed && !this.pooledChunkQueue.offer((PooledChunk) c)) {
-            if (LOG.isTraceEnabled()) {
-              LOG.trace("Chunk queue is full, won't reuse this new chunk. Current queue size: "
-                  + pooledChunkQueue.size());
-            }
-          }
-        }
-        return c;
-      } else if (pooledChunk) {
-        chunkPool.putbackChunk((PooledChunk) c);
-      }
-      // someone else won race - that's fine, we'll try to grab theirs
-      // in the next iteration of the loop.
-    }
-  }
-
-  @VisibleForTesting
-  Chunk getCurrentChunk() {
-    return this.curChunk.get();
-  }
-
-
-  BlockingQueue<PooledChunk> getPooledChunks() {
-    return this.pooledChunkQueue;
-  }
-}
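For reference, the allocate-or-retire loop in the deleted `copyCellInto`/`getOrMakeChunk` carries over to the ByteBuffer-based LAB; condensed into a standalone sketch (all names invented, reusing the `BumpPointerChunkSketch` from the Chunk.java note above; the real code also consults the chunk pool before allocating fresh chunks):

```java
import java.util.concurrent.atomic.AtomicReference;

public class LabAllocLoopSketch {
  private final AtomicReference<BumpPointerChunkSketch> curChunk = new AtomicReference<>();
  private final int chunkSize = 2 * 1024 * 1024;

  int allocate(int size) {
    while (true) {
      BumpPointerChunkSketch c = getOrMakeChunk();
      int allocOffset = c.alloc(size);
      if (allocOffset != -1) {
        return allocOffset; // common case: a small alloc from a big buffer
      }
      // Chunk is full: retire it (CAS back to null) so the next pass makes a new one.
      curChunk.compareAndSet(c, null);
    }
  }

  private BumpPointerChunkSketch getOrMakeChunk() {
    while (true) {
      BumpPointerChunkSketch c = curChunk.get();
      if (c != null) {
        return c;
      }
      c = new BumpPointerChunkSketch(chunkSize, true);
      if (curChunk.compareAndSet(null, c)) {
        return c;
      }
      // Someone else won the race; grab their chunk on the next iteration.
    }
  }
}
```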
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
index 7646293..a2f546a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
@@ -36,13 +36,15 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.ResizableBlockCache;
-import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
+import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 
 /**
- * Manages tuning of Heap memory using HeapMemoryTuner.
+ * Manages tuning of Heap memory using HeapMemoryTuner. Most of the heap memory is split
+ * between the Memstores and the BlockCache. This manager helps tune the sizes of both
+ * dynamically, as per the R/W load on the servers.
  */
 @InterfaceAudience.Private
 public class HeapMemoryManager {
@@ -91,7 +93,7 @@ public class HeapMemoryManager {
   private List<HeapMemoryTuneObserver> tuneObservers = new ArrayList<HeapMemoryTuneObserver>();
 
   public static HeapMemoryManager create(Configuration conf, FlushRequester memStoreFlusher,
-      Server server, RegionServerAccounting regionServerAccounting) {
+    Server server, RegionServerAccounting regionServerAccounting) {
     ResizableBlockCache l1Cache = CacheConfig.getL1(conf);
     if (l1Cache != null) {
       return new HeapMemoryManager(l1Cache, memStoreFlusher, server, regionServerAccounting);
@@ -117,10 +119,10 @@ public class HeapMemoryManager {
 
   private boolean doInit(Configuration conf) {
     boolean tuningEnabled = true;
-    globalMemStorePercent = HeapMemorySizeUtil.getGlobalMemStorePercent(conf, false);
+    globalMemStorePercent = MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false);
     blockCachePercent = conf.getFloat(HFILE_BLOCK_CACHE_SIZE_KEY,
         HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
-    HeapMemorySizeUtil.checkForClusterFreeMemoryLimit(conf);
+    MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(conf);
     // Initialize max and min range for memstore heap space
     globalMemStorePercentMinRange = conf.getFloat(MEMSTORE_SIZE_MIN_RANGE_KEY,
         globalMemStorePercent);
@@ -128,14 +130,14 @@ public class HeapMemoryManager {
         globalMemStorePercent);
     if (globalMemStorePercent < globalMemStorePercentMinRange) {
       LOG.warn("Setting " + MEMSTORE_SIZE_MIN_RANGE_KEY + " to " + globalMemStorePercent
-          + ", same value as " + HeapMemorySizeUtil.MEMSTORE_SIZE_KEY
+          + ", same value as " + MemorySizeUtil.MEMSTORE_SIZE_KEY
           + " because supplied value greater than initial memstore size value.");
       globalMemStorePercentMinRange = globalMemStorePercent;
       conf.setFloat(MEMSTORE_SIZE_MIN_RANGE_KEY, globalMemStorePercentMinRange);
     }
     if (globalMemStorePercent > globalMemStorePercentMaxRange) {
       LOG.warn("Setting " + MEMSTORE_SIZE_MAX_RANGE_KEY + " to " + globalMemStorePercent
-          + ", same value as " + HeapMemorySizeUtil.MEMSTORE_SIZE_KEY
+          + ", same value as " + MemorySizeUtil.MEMSTORE_SIZE_KEY
           + " because supplied value less than initial memstore size value.");
       globalMemStorePercentMaxRange = globalMemStorePercent;
       conf.setFloat(MEMSTORE_SIZE_MAX_RANGE_KEY, globalMemStorePercentMaxRange);
@@ -167,7 +169,7 @@ public class HeapMemoryManager {
     }
 
     int gml = (int) (globalMemStorePercentMaxRange * CONVERT_TO_PERCENTAGE);
-    this.l2BlockCachePercent = HeapMemorySizeUtil.getL2BlockCacheHeapPercent(conf);
+    this.l2BlockCachePercent = MemorySizeUtil.getL2BlockCacheHeapPercent(conf);
     int bcul = (int) ((blockCachePercentMinRange + l2BlockCachePercent) * CONVERT_TO_PERCENTAGE);
     if (CONVERT_TO_PERCENTAGE - (gml + bcul) < CLUSTER_MINIMUM_MEMORY_THRESHOLD) {
       throw new RuntimeException("Current heap configuration for MemStore and BlockCache exceeds "
@@ -340,7 +342,7 @@ public class HeapMemoryManager {
       if (CONVERT_TO_PERCENTAGE - (gml + bcul) < CLUSTER_MINIMUM_MEMORY_THRESHOLD) {
         LOG.info("Current heap configuration from HeapMemoryTuner exceeds "
             + "the threshold required for successful cluster operation. "
-            + "The combined value cannot exceed 0.8. " + HeapMemorySizeUtil.MEMSTORE_SIZE_KEY
+            + "The combined value cannot exceed 0.8. " + MemorySizeUtil.MEMSTORE_SIZE_KEY
             + " is " + memstoreSize + " and " + HFILE_BLOCK_CACHE_SIZE_KEY + " is "
            + blockCacheSize);
         // TODO can adjust the value so as not exceed 80%. Is that correct? may be.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java
index 430b642..9d7cf2a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
  * add any more Cells into it. {@link #copyCellInto(Cell)} throws Exception
  */
 @InterfaceAudience.Private
-public class ImmutableMemStoreLAB implements MemStoreLAB {
+public class ImmutableMemStoreLAB extends MemStoreLAB {
 
   private final AtomicInteger openScannerCount = new AtomicInteger();
   private volatile boolean closed = false;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
index db2cd18..eb38a55 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
@@ -18,7 +18,6 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import java.lang.management.ManagementFactory;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -29,8 +28,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
 import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.HeapMemoryTuneObserver;
 import org.apache.hadoop.util.StringUtils;
 
@@ -45,7 +42,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 * collection on JVM.
* * The pool instance is globally unique and could be obtained through - * {@link MemStoreChunkPool#getPool(Configuration)} + * {@link MemStoreChunkPool#initialize(long, float, float, int, boolean)} * * {@link MemStoreChunkPool#getChunk()} is called when MemStoreLAB allocating * bytes, and {@link MemStoreChunkPool#putbackChunks(BlockingQueue)} is called @@ -55,10 +52,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; @InterfaceAudience.Private public class MemStoreChunkPool implements HeapMemoryTuneObserver { private static final Log LOG = LogFactory.getLog(MemStoreChunkPool.class); - final static String CHUNK_POOL_MAXSIZE_KEY = "hbase.hregion.memstore.chunkpool.maxsize"; - final static String CHUNK_POOL_INITIALSIZE_KEY = "hbase.hregion.memstore.chunkpool.initialsize"; - final static float POOL_MAX_SIZE_DEFAULT = 1.0f; - final static float POOL_INITIAL_SIZE_DEFAULT = 0.0f; // Static reference to the MemStoreChunkPool static MemStoreChunkPool GLOBAL_INSTANCE; @@ -78,15 +71,17 @@ public class MemStoreChunkPool implements HeapMemoryTuneObserver { private static final int statThreadPeriod = 60 * 5; private final AtomicLong chunkCount = new AtomicLong(); private final AtomicLong reusedChunkCount = new AtomicLong(); + private final boolean offheap; - MemStoreChunkPool(Configuration conf, int chunkSize, int maxCount, - int initialCount, float poolSizePercentage) { + MemStoreChunkPool(int chunkSize, int maxCount, int initialCount, float poolSizePercentage, + boolean offheap) { this.maxCount = maxCount; this.chunkSize = chunkSize; this.poolSizePercentage = poolSizePercentage; + this.offheap = offheap; this.reclaimedChunks = new LinkedBlockingQueue(); for (int i = 0; i < initialCount; i++) { - PooledChunk chunk = new PooledChunk(chunkSize); + PooledChunk chunk = new PooledChunk(chunkSize, this.offheap); chunk.init(); reclaimedChunks.add(chunk); } @@ -118,7 +113,7 @@ while (true) { long created = this.chunkCount.get(); if (created < this.maxCount) { - chunk = new PooledChunk(chunkSize); + chunk = new PooledChunk(this.chunkSize, this.offheap); if (this.chunkCount.compareAndSet(created, created + 1)) { break; } @@ -191,51 +186,39 @@ } /** - * @param conf * @return the global MemStoreChunkPool instance */ - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DC_DOUBLECHECK", - justification="Intentional") - static MemStoreChunkPool getPool(Configuration conf) { + static MemStoreChunkPool initialize(long globalMemStoreSize, float poolSizePercentage, + float initialCountPercentage, int chunkSize, boolean offheap) { if (GLOBAL_INSTANCE != null) return GLOBAL_INSTANCE; + if (chunkPoolDisabled) return null; - synchronized (MemStoreChunkPool.class) { - if (chunkPoolDisabled) return null; - if (GLOBAL_INSTANCE != null) return GLOBAL_INSTANCE; - // When MSLAB is turned OFF no need to init chunk pool at all.
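Chunk and PooledChunk now take an offheap flag threaded down from the pool. Chunk's own implementation is outside this hunk, so the following is only a hypothetical sketch of what such a flag typically selects, namely a direct versus a heap-backed buffer:

    import java.nio.ByteBuffer;

    class ChunkBackingSketch {
      // Direct buffers live outside the Java heap and are never moved by GC,
      // which is what the off heap memstore path is after.
      static ByteBuffer allocate(int size, boolean offheap) {
        return offheap ? ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
      }
    }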
- if (!conf.getBoolean(MemStoreLAB.USEMSLAB_KEY, MemStoreLAB.USEMSLAB_DEFAULT)) { - chunkPoolDisabled = true; - return null; - } - float poolSizePercentage = conf.getFloat(CHUNK_POOL_MAXSIZE_KEY, POOL_MAX_SIZE_DEFAULT); - if (poolSizePercentage <= 0) { - chunkPoolDisabled = true; - return null; - } - if (poolSizePercentage > 1.0) { - throw new IllegalArgumentException(CHUNK_POOL_MAXSIZE_KEY + " must be between 0.0 and 1.0"); - } - long heapMax = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax(); - long globalMemStoreLimit = (long) (heapMax * HeapMemorySizeUtil.getGlobalMemStorePercent(conf, - false)); - int chunkSize = conf.getInt(HeapMemStoreLAB.CHUNK_SIZE_KEY, - HeapMemStoreLAB.CHUNK_SIZE_DEFAULT); - int maxCount = (int) (globalMemStoreLimit * poolSizePercentage / chunkSize); - - float initialCountPercentage = conf.getFloat(CHUNK_POOL_INITIALSIZE_KEY, - POOL_INITIAL_SIZE_DEFAULT); - if (initialCountPercentage > 1.0 || initialCountPercentage < 0) { - throw new IllegalArgumentException(CHUNK_POOL_INITIALSIZE_KEY - + " must be between 0.0 and 1.0"); - } - - int initialCount = (int) (initialCountPercentage * maxCount); - LOG.info("Allocating MemStoreChunkPool with chunk size " + StringUtils.byteDesc(chunkSize) - + ", max count " + maxCount + ", initial count " + initialCount); - GLOBAL_INSTANCE = new MemStoreChunkPool(conf, chunkSize, maxCount, initialCount, - poolSizePercentage); - return GLOBAL_INSTANCE; + if (poolSizePercentage <= 0) { + chunkPoolDisabled = true; + return null; } + if (poolSizePercentage > 1.0) { + throw new IllegalArgumentException( + MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY + " must be between 0.0 and 1.0"); + } + int maxCount = (int) (globalMemStoreSize * poolSizePercentage / chunkSize); + if (initialCountPercentage > 1.0 || initialCountPercentage < 0) { + throw new IllegalArgumentException( + MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY + " must be between 0.0 and 1.0"); + } + int initialCount = (int) (initialCountPercentage * maxCount); + LOG.info("Allocating MemStoreChunkPool with chunk size " + StringUtils.byteDesc(chunkSize) + + ", max count " + maxCount + ", initial count " + initialCount); + GLOBAL_INSTANCE = new MemStoreChunkPool(chunkSize, maxCount, initialCount, poolSizePercentage, + offheap); + return GLOBAL_INSTANCE; + } + + /** + * @return The singleton instance of this pool. 
+ */ + static MemStoreChunkPool getPool() { + return GLOBAL_INSTANCE; } int getMaxCount() { @@ -248,8 +231,8 @@ } public static class PooledChunk extends Chunk { - PooledChunk(int size) { - super(size); + PooledChunk(int size, boolean offheap) { + super(size, offheap); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java index 2f4d225..15cf97c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java @@ -24,7 +24,7 @@ import com.google.common.base.Preconditions; import java.io.IOException; import java.lang.Thread.UncaughtExceptionHandler; -import java.lang.management.ManagementFactory; +import java.lang.management.MemoryType; import java.util.ArrayList; import java.util.ConcurrentModificationException; import java.util.HashMap; @@ -49,11 +49,12 @@ import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.RegionReplicaUtil; -import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil; +import org.apache.hadoop.hbase.io.util.MemorySizeUtil; import org.apache.hadoop.hbase.regionserver.Region.FlushResult; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.HasThread; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.ipc.RemoteException; @@ -109,12 +110,21 @@ class MemStoreFlusher implements FlushRequester { this.conf = conf; this.server = server; this.threadWakeFrequency = - conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); - long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax(); - float globalMemStorePercent = HeapMemorySizeUtil.getGlobalMemStorePercent(conf, true); - this.globalMemStoreLimit = (long) (max * globalMemStorePercent); - this.globalMemStoreLimitLowMarkPercent = - HeapMemorySizeUtil.getGlobalMemStoreLowerMark(conf, globalMemStorePercent); + conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); + Pair pair = MemorySizeUtil.getGlobalMemstoreSize(conf); + this.globalMemStoreLimit = pair.getFirst(); + boolean onheap = pair.getSecond() == MemoryType.HEAP; + // When an off heap memstore is in use, we configure the global off heap space for the memstore + // as bytes, not as a % of the max memory size. In that case, the lower water mark should be + // specified using the key "hbase.regionserver.global.memstore.size.lower.limit", which is a % + // of the global upper bound and defaults to 95%. Specifying it this way is ideal for the on + // heap case too, but in the past the lower bound was also taken as a % of xmx (38% by default). + // For backward compatibility with that deprecated config, we fall back to reading it when the + // new one is missing. Do this fallback only for the on heap case; for off heap it makes no + // sense. + // TODO When can we get rid of the deprecated config, ie + // "hbase.regionserver.global.memstore.lowerLimit"? We can drop this boolean passing then.
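To make the sizing arithmetic concrete (the pool bounds computed in initialize() above, and the low water mark that the following constructor lines derive), here is a worked example; the 4 GB global memstore size and the 0.2 pool percentage are illustrative values, and 0.95 is the default lower limit mentioned in the comment above:

    public class MemorySizingExample {
      public static void main(String[] args) {
        long globalMemStoreSize = 4L * 1024 * 1024 * 1024; // assume a 4 GB global memstore
        int chunkSize = 2048 * 1024;                       // CHUNK_SIZE_DEFAULT (2 MB)
        float poolSizePercentage = 0.2f;                   // e.g. chunkpool.maxsize = 0.2
        float initialCountPercentage = 0.0f;               // POOL_INITIAL_SIZE_DEFAULT

        // Pool bounds as in MemStoreChunkPool.initialize(...)
        int maxCount = (int) (globalMemStoreSize * poolSizePercentage / chunkSize);
        int initialCount = (int) (initialCountPercentage * maxCount);
        System.out.println("maxCount=" + maxCount + ", initialCount=" + initialCount);
        // prints: maxCount=409, initialCount=0

        // Low water mark as in the MemStoreFlusher constructor, default 95%
        float lowMarkPercent = 0.95f;
        long globalMemStoreLimitLowMark = (long) (globalMemStoreSize * lowMarkPercent);
        System.out.println("lowMark=" + globalMemStoreLimitLowMark); // roughly 3.8 GB
      }
    }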
+ this.globalMemStoreLimitLowMarkPercent = MemorySizeUtil.getGlobalMemStoreHeapLowerMark(conf, + onheap); this.globalMemStoreLimitLowMark = (long) (this.globalMemStoreLimit * this.globalMemStoreLimitLowMarkPercent); @@ -126,7 +136,7 @@ class MemStoreFlusher implements FlushRequester { + TraditionalBinaryPrefix.long2String(this.globalMemStoreLimit, "", 1) + ", globalMemStoreLimitLowMark=" + TraditionalBinaryPrefix.long2String(this.globalMemStoreLimitLowMark, "", 1) - + ", maxHeap=" + TraditionalBinaryPrefix.long2String(max, "", 1)); + + ", Offheap=" + !onheap); } public LongAdder getUpdatesBlockedMsHighWater() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java index 706e243..425e213 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java @@ -17,8 +17,10 @@ */ package org.apache.hadoop.hbase.regionserver; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.ReflectionUtils; /** * A memstore-local allocation buffer. @@ -42,30 +44,56 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; * @see MemStoreChunkPool */ @InterfaceAudience.Private -public interface MemStoreLAB { +public abstract class MemStoreLAB { - String USEMSLAB_KEY = "hbase.hregion.memstore.mslab.enabled"; - boolean USEMSLAB_DEFAULT = true; + public static final String USEMSLAB_KEY = "hbase.hregion.memstore.mslab.enabled"; + static final boolean USEMSLAB_DEFAULT = true; + static final String MSLAB_CLASS_NAME = "hbase.regionserver.mslab.class"; + + static final String CHUNK_SIZE_KEY = "hbase.hregion.memstore.mslab.chunksize"; + static final int CHUNK_SIZE_DEFAULT = 2048 * 1024; + static final String MAX_ALLOC_KEY = "hbase.hregion.memstore.mslab.max.allocation"; + static final int MAX_ALLOC_DEFAULT = 256 * 1024; // allocs bigger than this don't go through + // allocator + + // MSLAB pool related configs + final static String CHUNK_POOL_MAXSIZE_KEY = "hbase.hregion.memstore.chunkpool.maxsize"; + final static String CHUNK_POOL_INITIALSIZE_KEY = "hbase.hregion.memstore.chunkpool.initialsize"; + final static float POOL_MAX_SIZE_DEFAULT = 1.0f; + final static float POOL_INITIAL_SIZE_DEFAULT = 0.0f; /** * Allocates slice in this LAB and copy the passed Cell into this area. Returns new Cell instance * over the copied the data. When this MemStoreLAB can not copy this Cell, it returns null. 
*/ - Cell copyCellInto(Cell cell); + public abstract Cell copyCellInto(Cell cell); /** * Close instance since it won't be used any more, try to put the chunks back to pool */ - void close(); + public abstract void close(); /** * Called when opening a scanner on the data of this MemStoreLAB */ - void incScannerCount(); + public abstract void incScannerCount(); /** * Called when closing a scanner on the data of this MemStoreLAB */ - void decScannerCount(); + public abstract void decScannerCount(); + + public static MemStoreLAB newInstance(Configuration conf) { + MemStoreLAB memStoreLAB = null; + if (isEnabled(conf)) { + String className = conf.get(MSLAB_CLASS_NAME, MemStoreLABImpl.class.getName()); + memStoreLAB = ReflectionUtils.instantiateWithCustomCtor(className, + new Class[] { Configuration.class }, new Object[] { conf }); + } + return memStoreLAB; + } + public static boolean isEnabled(Configuration conf) { + return conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java new file mode 100644 index 0000000..6f55974 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java @@ -0,0 +1,244 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.MemStoreChunkPool.PooledChunk; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +/** + * A memstore-local allocation buffer. + *
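A caller-side sketch of the newInstance() factory and the copyCellInto() contract above, assuming a class in the same package as MemStoreLAB; this is illustrative only. A custom implementation can be plugged in through "hbase.regionserver.mslab.class"; a null from the factory means MSLAB is disabled, and a null from copyCellInto() means the allocation bypassed the LAB:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.Cell;

    public class MslabUsageSketch {
      static Cell copyIntoLabIfPossible(Configuration conf, Cell cell) {
        MemStoreLAB lab = MemStoreLAB.newInstance(conf); // null when MSLAB is disabled
        if (lab == null) {
          return cell; // no LAB: keep the original cell
        }
        Cell copied = lab.copyCellInto(cell);
        return copied != null ? copied : cell; // null: alloc was bigger than maxAlloc
      }
    }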
<p>
+ * The MemStoreLAB is basically a bump-the-pointer allocator that allocates + * big (2MB) byte[] chunks and then doles them out to threads that request + * slices into the array. + *
<p>
+ * The purpose of this class is to combat heap fragmentation in the + * regionserver. By ensuring that all Cells in a given memstore refer + * only to large chunks of contiguous memory, we ensure that large blocks + * get freed up when the memstore is flushed. + *
<p>
+ * Without the MSLAB, the byte arrays allocated during insertion end up + * interleaved throughout the heap, and the old generation gets progressively + * more fragmented until a stop-the-world compacting collection occurs. + *
<p>
+ * TODO: we should probably benchmark whether word-aligning the allocations + * would provide a performance improvement - probably would speed up the + * Bytes.toLong/Bytes.toInt calls in KeyValue, but some of those are cached + * anyway. + * The chunks created by this MemStoreLAB can get pooled at {@link MemStoreChunkPool}. + * When a Chunk comes from the pool, it can be either an on heap or an off heap backed chunk. + * The chunks which this MemStoreLAB creates on its own (when no chunk is available from the + * pool) will always be on heap backed. + */ +@InterfaceAudience.Private +public class MemStoreLABImpl extends MemStoreLAB { + + static final Log LOG = LogFactory.getLog(MemStoreLABImpl.class); + + private AtomicReference curChunk = new AtomicReference(); + // A queue of chunks from pool contained by this memstore LAB + // TODO: in the future, it would be better to have List implementation instead of Queue, + // as FIFO order is not so important here + @VisibleForTesting + BlockingQueue pooledChunkQueue = null; + private final int chunkSize; + private final int maxAlloc; + private final MemStoreChunkPool chunkPool; + + // This flag is for closing this instance, it's set when clearing snapshot of + // memstore + private volatile boolean closed = false; + // This flag is for reclaiming chunks. It's set when putting chunks back to + // pool + private AtomicBoolean reclaimed = new AtomicBoolean(false); + // Current count of open scanners which are reading data from this MemStoreLAB + private final AtomicInteger openScannerCount = new AtomicInteger(); + + // Used in testing + public MemStoreLABImpl() { + this(new Configuration()); + } + + public MemStoreLABImpl(Configuration conf) { + chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT); + maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT); + this.chunkPool = MemStoreChunkPool.getPool(); + // currently chunkQueue is only used for chunkPool + if (this.chunkPool != null) { + // set queue length to chunk pool max count to avoid keeping reference of + // too many non-reclaimable chunks + pooledChunkQueue = new LinkedBlockingQueue(chunkPool.getMaxCount()); + } + + // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one! + Preconditions.checkArgument(maxAlloc <= chunkSize, + MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY); + } + + + @Override + public Cell copyCellInto(Cell cell) { + int size = KeyValueUtil.length(cell); + Preconditions.checkArgument(size >= 0, "negative size"); + // Callers should satisfy large allocations directly from JVM since they + // don't cause fragmentation as badly. + if (size > maxAlloc) { + return null; + } + Chunk c = null; + int allocOffset = 0; + while (true) { + c = getOrMakeChunk(); + // Try to allocate from this chunk + allocOffset = c.alloc(size); + if (allocOffset != -1) { + // We succeeded - this is the common case - small alloc + // from a big buffer + break; + } + // not enough space!
+ // try to retire this chunk + tryRetireChunk(c); + } + return CellUtil.copyCellTo(cell, c.getData(), allocOffset, size); + } + + /** + * Close this instance since it won't be used any more; try to put the chunks + * back to pool + */ + @Override + public void close() { + this.closed = true; + // We can put the chunks back to the pool for reuse only when there is no + // open scanner which will read their data + if (chunkPool != null && openScannerCount.get() == 0 + && reclaimed.compareAndSet(false, true)) { + chunkPool.putbackChunks(this.pooledChunkQueue); + } + } + + /** + * Called when opening a scanner on the data of this MemStoreLAB + */ + @Override + public void incScannerCount() { + this.openScannerCount.incrementAndGet(); + } + + /** + * Called when closing a scanner on the data of this MemStoreLAB + */ + @Override + public void decScannerCount() { + int count = this.openScannerCount.decrementAndGet(); + if (this.closed && chunkPool != null && count == 0 + && reclaimed.compareAndSet(false, true)) { + chunkPool.putbackChunks(this.pooledChunkQueue); + } + } + + /** + * Try to retire the current chunk if it is still + * <code>c</code>. Postcondition is that curChunk.get() + * != <code>c</code>. + * @param c the chunk to retire + */ + private void tryRetireChunk(Chunk c) { + curChunk.compareAndSet(c, null); + // If the CAS succeeds, that means that we won the race + // to retire the chunk. We could use this opportunity to + // update metrics on external fragmentation. + // + // If the CAS fails, that means that someone else already + // retired the chunk for us. + } + + /** + * Get the current chunk, or, if there is no current chunk, + * allocate a new one from the JVM. + */ + private Chunk getOrMakeChunk() { + while (true) { + // Try to get the chunk + Chunk c = curChunk.get(); + if (c != null) { + return c; + } + + // No current chunk, so we want to allocate one. We race + // against other allocators to CAS in an uninitialized chunk + // (which is cheap to allocate) + if (chunkPool != null) { + c = chunkPool.getChunk(); + } + boolean pooledChunk = false; + if (c != null) { + // This is a chunk from the pool + pooledChunk = true; + } else { + c = new Chunk(chunkSize, false);// When the chunk is not from the pool, always make it on heap. + } + if (curChunk.compareAndSet(null, c)) { + // we won race - now we need to actually do the expensive + // allocation step + c.init(); + if (pooledChunk) { + if (!this.closed && !this.pooledChunkQueue.offer((PooledChunk) c)) { + if (LOG.isTraceEnabled()) { + LOG.trace("Chunk queue is full, won't reuse this new chunk. Current queue size: " + + pooledChunkQueue.size()); + } + } + } + return c; + } else if (pooledChunk) { + chunkPool.putbackChunk((PooledChunk) c); + } + // someone else won race - that's fine, we'll try to grab theirs + // in the next iteration of the loop.
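Chunk.alloc() itself is not part of this hunk; its assumed semantics, consistent with the retry loop above and with the getNextFreeOffset() accessor used by the tests further down, are the classic lock-free bump-the-pointer pattern. A self-contained sketch:

    import java.util.concurrent.atomic.AtomicInteger;

    class BumpThePointerSketch {
      private final byte[] data;
      private final AtomicInteger nextFreeOffset = new AtomicInteger(0);

      BumpThePointerSketch(int size) {
        this.data = new byte[size];
      }

      // Returns the start offset of the allocated slice, or -1 when this chunk
      // lacks room (the caller then retires it and gets or makes another one).
      int alloc(int size) {
        while (true) {
          int oldOffset = nextFreeOffset.get();
          if (oldOffset + size > data.length) {
            return -1;
          }
          if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) {
            return oldOffset; // we won the race for this slice
          }
          // lost the race: another thread bumped the pointer first, so retry
        }
      }
    }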
+ } + } + + @VisibleForTesting + Chunk getCurrentChunk() { + return this.curChunk.get(); + } + + + BlockingQueue getPooledChunks() { + return this.pooledChunkQueue; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java index fa8860a..01e07ef 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java @@ -22,7 +22,6 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.util.ReflectionUtils; import java.io.IOException; import java.util.ArrayList; @@ -35,8 +34,6 @@ import java.util.List; @InterfaceAudience.Private public final class SegmentFactory { - static final String MSLAB_CLASS_NAME = "hbase.regionserver.mslab.class"; - private SegmentFactory() {} private static SegmentFactory instance = new SegmentFactory(); @@ -47,7 +44,7 @@ public final class SegmentFactory { // create skip-list-based (non-flat) immutable segment from compacting old immutable segments public ImmutableSegment createImmutableSegment(final Configuration conf, final CellComparator comparator, MemStoreSegmentsIterator iterator) { - return new ImmutableSegment(comparator, iterator, getMemStoreLAB(conf)); + return new ImmutableSegment(comparator, iterator, MemStoreLAB.newInstance(conf)); } // create new flat immutable segment from compacting old immutable segments @@ -57,7 +54,7 @@ public final class SegmentFactory { throws IOException { Preconditions.checkArgument(segmentType == ImmutableSegment.Type.ARRAY_MAP_BASED, "wrong immutable segment type"); - MemStoreLAB memStoreLAB = getMemStoreLAB(conf); + MemStoreLAB memStoreLAB = MemStoreLAB.newInstance(conf); return // the last parameter "false" means not to merge, but to compact the pipeline // in order to create the new segment @@ -77,7 +74,7 @@ public final class SegmentFactory { // create mutable segment public MutableSegment createMutableSegment(final Configuration conf, CellComparator comparator) { - MemStoreLAB memStoreLAB = getMemStoreLAB(conf); + MemStoreLAB memStoreLAB = MemStoreLAB.newInstance(conf); return generateMutableSegment(conf, comparator, memStoreLAB); } @@ -103,16 +100,6 @@ public final class SegmentFactory { return new MutableSegment(set, comparator, memStoreLAB); } - private MemStoreLAB getMemStoreLAB(Configuration conf) { - MemStoreLAB memStoreLAB = null; - if (conf.getBoolean(MemStoreLAB.USEMSLAB_KEY, MemStoreLAB.USEMSLAB_DEFAULT)) { - String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName()); - memStoreLAB = ReflectionUtils.instantiateWithCustomCtor(className, - new Class[] { Configuration.class }, new Object[] { conf }); - } - return memStoreLAB; - } - private MemStoreLAB getMergedMemStoreLAB(Configuration conf, List segments) { List mslabs = new ArrayList(); for (ImmutableSegment segment : segments) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index aa57881..e7c4f98 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -56,7 +56,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.exceptions.TimeoutIOException; -import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil; +import org.apache.hadoop.hbase.io.util.MemorySizeUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CollectionUtils; import org.apache.hadoop.hbase.util.DrainBarrier; @@ -386,8 +386,7 @@ public abstract class AbstractFSWAL implements WAL { this.logrollsize = (long) (blocksize * conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f)); - float memstoreRatio = conf.getFloat(HeapMemorySizeUtil.MEMSTORE_SIZE_KEY, conf.getFloat( - HeapMemorySizeUtil.MEMSTORE_SIZE_OLD_KEY, HeapMemorySizeUtil.DEFAULT_MEMSTORE_SIZE)); + float memstoreRatio = MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false); boolean maxLogsDefined = conf.get("hbase.regionserver.maxlogs") != null; if (maxLogsDefined) { LOG.warn("'hbase.regionserver.maxlogs' was deprecated."); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java index 62506ad..3b4d068 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java @@ -44,7 +44,6 @@ public class TestCellFlatSet extends TestCase { private Cell descCells[]; private CellArrayMap descCbOnHeap; private final static Configuration CONF = new Configuration(); - private HeapMemStoreLAB mslab; private KeyValue lowerOuterCell; private KeyValue upperOuterCell; @@ -73,9 +72,8 @@ public class TestCellFlatSet extends TestCase { descCells = new Cell[] {kv4,kv3,kv2,kv1}; descCbOnHeap = new CellArrayMap(CellComparator.COMPARATOR,descCells,0,NUM_OF_CELLS,true); CONF.setBoolean(MemStoreLAB.USEMSLAB_KEY, true); - CONF.setFloat(MemStoreChunkPool.CHUNK_POOL_MAXSIZE_KEY, 0.2f); + CONF.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f); MemStoreChunkPool.chunkPoolDisabled = false; - mslab = new HeapMemStoreLAB(CONF); } /* Create and test CellSet based on CellArrayMap */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java index 4f2b12f..d1bbd50 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.lang.management.ManagementFactory; import java.util.ArrayList; import java.util.List; @@ -35,6 +36,7 @@ import org.apache.hadoop.hbase.KeepDeletedCells; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueTestUtil; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.util.MemorySizeUtil; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; @@ -87,7 +89,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { super.internalSetUp(); Configuration conf = new 
Configuration(); conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, true); - conf.setFloat(MemStoreChunkPool.CHUNK_POOL_MAXSIZE_KEY, 0.2f); + conf.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f); conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 1000); HBaseTestingUtility hbaseUtility = HBaseTestingUtility.createLocalHTU(conf); HColumnDescriptor hcd = new HColumnDescriptor(FAMILY); @@ -95,7 +97,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore { this.regionServicesForStores = region.getRegionServicesForStores(); this.store = new HStore(region, hcd, conf); - chunkPool = MemStoreChunkPool.getPool(conf); + long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage() + .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false)); + chunkPool = MemStoreChunkPool.initialize(globalMemStoreLimit, 0.2f, + MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false); assertTrue(chunkPool != null); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java index 433388d..27ed295 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java @@ -132,9 +132,9 @@ public class TestDefaultMemStore { // make sure memstore size increased even when writing the same cell, if using MSLAB assertEquals(Segment.getCellLength(kv), sizeChangeForSecondCell.getDataSize()); // make sure chunk size increased even when writing the same cell, if using MSLAB - if (msLab instanceof HeapMemStoreLAB) { + if (msLab instanceof MemStoreLABImpl) { assertEquals(2 * Segment.getCellLength(kv), - ((HeapMemStoreLAB) msLab).getCurrentChunk().getNextFreeOffset()); + ((MemStoreLABImpl) msLab).getCurrentChunk().getNextFreeOffset()); } } else { // make sure no memstore size change w/o MSLAB diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java index 5f42a03..f620eb0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java @@ -42,7 +42,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheStats; import org.apache.hadoop.hbase.io.hfile.Cacheable; import org.apache.hadoop.hbase.io.hfile.CachedBlock; import org.apache.hadoop.hbase.io.hfile.ResizableBlockCache; -import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil; +import org.apache.hadoop.hbase.io.util.MemorySizeUtil; import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.TunerContext; import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.TunerResult; import org.apache.hadoop.hbase.testclassification.RegionServerTests; @@ -62,7 +62,7 @@ public class TestHeapMemoryManager { @Test public void testAutoTunerShouldBeOffWhenMaxMinRangesForMemstoreIsNotGiven() throws Exception { Configuration conf = HBaseConfiguration.create(); - conf.setFloat(HeapMemorySizeUtil.MEMSTORE_SIZE_KEY, 0.02f); + conf.setFloat(MemorySizeUtil.MEMSTORE_SIZE_KEY, 0.02f); conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.75f); conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.03f); HeapMemoryManager manager = new HeapMemoryManager(new BlockCacheStub(0), 
@@ -228,7 +228,7 @@ public class TestHeapMemoryManager { blockCache.setTestBlockSize((long) (maxHeapSize * 0.4 * 0.8)); regionServerAccounting.setTestMemstoreSize(0); Configuration conf = HBaseConfiguration.create(); - conf.setFloat(HeapMemorySizeUtil.MEMSTORE_SIZE_LOWER_LIMIT_KEY, 0.7f); + conf.setFloat(MemorySizeUtil.MEMSTORE_SIZE_LOWER_LIMIT_KEY, 0.7f); conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f); conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f); conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f); @@ -462,7 +462,7 @@ public class TestHeapMemoryManager { conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f); conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.1f); conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0); - conf.setFloat(HeapMemorySizeUtil.MEMSTORE_SIZE_KEY, 0.4F); + conf.setFloat(MemorySizeUtil.MEMSTORE_SIZE_KEY, 0.4F); conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.3F); conf.setFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 0.1F); conf.set(HConstants.BUCKET_CACHE_IOENGINE_KEY, "heap"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java index 5779dbc..e2ba169 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java @@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.exceptions.UnexpectedStateException; +import org.apache.hadoop.hbase.io.util.MemorySizeUtil; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; @@ -32,6 +33,7 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import java.io.IOException; +import java.lang.management.ManagementFactory; import java.util.List; import java.util.Random; @@ -50,10 +52,13 @@ public class TestMemStoreChunkPool { @BeforeClass public static void setUpBeforeClass() throws Exception { conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, true); - conf.setFloat(MemStoreChunkPool.CHUNK_POOL_MAXSIZE_KEY, 0.2f); + conf.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f); chunkPoolDisabledBeforeTest = MemStoreChunkPool.chunkPoolDisabled; MemStoreChunkPool.chunkPoolDisabled = false; - chunkPool = MemStoreChunkPool.getPool(conf); + long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage() + .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false)); + chunkPool = MemStoreChunkPool.initialize(globalMemStoreLimit, 0.2f, + MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false); assertTrue(chunkPool != null); } @@ -70,7 +75,7 @@ public class TestMemStoreChunkPool { @Test public void testReusingChunks() { Random rand = new Random(); - MemStoreLAB mslab = new HeapMemStoreLAB(conf); + MemStoreLAB mslab = new MemStoreLABImpl(conf); int expectedOff = 0; byte[] lastBuffer = null; final byte[] rk = Bytes.toBytes("r1"); @@ -96,7 +101,7 @@ public class TestMemStoreChunkPool { int chunkCount = chunkPool.getPoolSize(); assertTrue(chunkCount > 0); // reconstruct mslab - mslab = new HeapMemStoreLAB(conf); + mslab = new MemStoreLABImpl(conf); // chunk 
should be got from the pool, so we can reuse it. KeyValue kv = new KeyValue(rk, cf, q, new byte[10]); mslab.copyCellInto(kv); @@ -209,7 +214,7 @@ public class TestMemStoreChunkPool { final int initialCount = 5; final int chunkSize = 30; final int valSize = 7; - MemStoreChunkPool pool = new MemStoreChunkPool(conf, chunkSize, maxCount, initialCount, 1); + MemStoreChunkPool pool = new MemStoreChunkPool(chunkSize, maxCount, initialCount, 1, false); assertEquals(initialCount, pool.getPoolSize()); assertEquals(maxCount, pool.getMaxCount()); MemStoreChunkPool.GLOBAL_INSTANCE = pool;// Replace the global ref with the new one we created. @@ -221,7 +226,7 @@ public class TestMemStoreChunkPool { Runnable r = new Runnable() { @Override public void run() { - MemStoreLAB memStoreLAB = new HeapMemStoreLAB(conf); + MemStoreLAB memStoreLAB = new MemStoreLABImpl(conf); for (int i = 0; i < maxCount; i++) { memStoreLAB.copyCellInto(kv);// Try allocate size = chunkSize. Means every // allocate call will result in a new chunk diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java index 5194fc3..082dfbf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver; import static org.junit.Assert.*; +import java.lang.management.ManagementFactory; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -33,9 +34,11 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.MultithreadedTestUtil; import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread; +import org.apache.hadoop.hbase.io.util.MemorySizeUtil; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; +import org.junit.BeforeClass; import org.junit.Test; import com.google.common.collect.Iterables; @@ -48,17 +51,27 @@ import org.junit.experimental.categories.Category; @Category({RegionServerTests.class, SmallTests.class}) public class TestMemStoreLAB { + private final static Configuration conf = new Configuration(); + private static final byte[] rk = Bytes.toBytes("r1"); private static final byte[] cf = Bytes.toBytes("f"); private static final byte[] q = Bytes.toBytes("q"); + @BeforeClass + public static void setUpBeforeClass() throws Exception { + long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage() + .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false)); + MemStoreChunkPool.initialize(globalMemStoreLimit, 0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, + MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false); + } + /** * Test a bunch of random allocations */ @Test public void testLABRandomAllocation() { Random rand = new Random(); - MemStoreLAB mslab = new HeapMemStoreLAB(); + MemStoreLAB mslab = new MemStoreLABImpl(); int expectedOff = 0; byte[] lastBuffer = null; // 100K iterations by 0-1K alloc -> 50MB expected @@ -82,7 +95,7 @@ public class TestMemStoreLAB { @Test public void testLABLargeAllocation() { - MemStoreLAB mslab = new HeapMemStoreLAB(); + MemStoreLAB mslab = new MemStoreLABImpl(); KeyValue kv = new KeyValue(rk, cf, q, new byte[2 * 1024 * 1024]); Cell newCell = mslab.copyCellInto(kv); 
assertNull("2MB allocation shouldn't be satisfied by LAB.", newCell); @@ -100,7 +113,7 @@ public class TestMemStoreLAB { final AtomicInteger totalAllocated = new AtomicInteger(); - final MemStoreLAB mslab = new HeapMemStoreLAB(); + final MemStoreLAB mslab = new MemStoreLABImpl(); List> allocations = Lists.newArrayList(); for (int i = 0; i < 10; i++) { @@ -170,21 +183,21 @@ public class TestMemStoreLAB { */ @Test public void testLABChunkQueue() throws Exception { - HeapMemStoreLAB mslab = new HeapMemStoreLAB(); + MemStoreLABImpl mslab = new MemStoreLABImpl(); // by default setting, there should be no chunks initialized in the pool assertTrue(mslab.getPooledChunks().isEmpty()); // reset mslab with chunk pool Configuration conf = HBaseConfiguration.create(); - conf.setDouble(MemStoreChunkPool.CHUNK_POOL_MAXSIZE_KEY, 0.1); + conf.setDouble(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.1); // set chunk size to default max alloc size, so we could easily trigger chunk retirement - conf.setLong(HeapMemStoreLAB.CHUNK_SIZE_KEY, HeapMemStoreLAB.MAX_ALLOC_DEFAULT); + conf.setLong(MemStoreLABImpl.CHUNK_SIZE_KEY, MemStoreLABImpl.MAX_ALLOC_DEFAULT); // reconstruct mslab MemStoreChunkPool.clearDisableFlag(); - mslab = new HeapMemStoreLAB(conf); + mslab = new MemStoreLABImpl(conf); // launch multiple threads to trigger frequent chunk retirement List threads = new ArrayList(); final KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"), - new byte[HeapMemStoreLAB.MAX_ALLOC_DEFAULT - 24]); + new byte[MemStoreLABImpl.MAX_ALLOC_DEFAULT - 24]); for (int i = 0; i < 10; i++) { threads.add(getChunkQueueTestThread(mslab, "testLABChunkQueue-" + i, kv)); } @@ -214,7 +227,7 @@ public class TestMemStoreLAB { + " after mslab closed but actually: " + queueLength, queueLength == 0); } - private Thread getChunkQueueTestThread(final HeapMemStoreLAB mslab, String threadName, + private Thread getChunkQueueTestThread(final MemStoreLABImpl mslab, String threadName, Cell cellToCopyInto) { Thread thread = new Thread() { boolean stopped = false;