.../java/org/apache/hadoop/hbase/KeyValueUtil.java | 43 +++ .../org/apache/hadoop/hbase/MemstoreChunkCell.java | 39 +++ .../apache/hadoop/hbase/NoTagsOffheapKeyValue.java | 46 +++ .../hadoop/hbase/OffheapMemstoreChunkCell.java | 282 ++++++++++++++++++ .../hadoop/hbase/OnheapMemstoreChunkCell.java | 324 +++++++++++++++++++++ .../hadoop/hbase/io/ByteBufferPoolManager.java | 217 ++++++++++++++ .../hadoop/hbase/regionserver/CellChunkMap.java | 109 +++++++ .../apache/hadoop/hbase/regionserver/Chunk.java | 12 +- .../hadoop/hbase/regionserver/HRegionServer.java | 4 +- .../hadoop/hbase/regionserver/HeapMemStoreLAB.java | 30 +- .../hbase/regionserver/MemStoreChunkPool.java | 23 +- .../hadoop/hbase/regionserver/MemStoreLAB.java | 11 +- .../regionserver/MemstoreLABChunkCreator.java | 77 +++++ 13 files changed, 1204 insertions(+), 13 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java index 76f4147..806afca 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java @@ -204,6 +204,49 @@ public class KeyValueUtil { return buffer; } + public static Cell cloneTo(Cell cell, ByteBuffer buf, int pos) { + int len = KeyValueUtil.length(cell); + if (cell instanceof ExtendedCell) { + ((ExtendedCell) cell).write(buf, pos); + } else { + appendToByteBuffer(cell, buf, pos, true); + } + if (buf.hasArray()) { + KeyValue newKv = new KeyValue(buf.array(), buf.arrayOffset() + pos, len); + newKv.setSequenceId(cell.getSequenceId()); + return newKv; + } + if (cell.getTagsLength() == 0) { + return new NoTagsOffheapKeyValue(buf, pos, len, cell.getSequenceId()); + } + return new OffheapKeyValue(buf, pos, len, true, cell.getSequenceId()); + } + + // May not be right to be in CellUtil. + @InterfaceAudience.Private + public static Cell cloneToMemstoreChunkCell(Cell cell, ByteBuffer buf, int pos, int chunkId) { + int initPos = pos; + int len = KeyValueUtil.length(cell); + if (cell instanceof ExtendedCell) { + ((ExtendedCell) cell).write(buf, pos); + pos += len; + } else { + pos = appendToByteBuffer(cell, buf, pos, true); + } + // Add the seqid and chunk id + pos = ByteBufferUtils.putLong(buf, pos, cell.getSequenceId()); + pos = ByteBufferUtils.putInt(buf, pos, chunkId); + if (buf.hasArray()) { + // add the seqid and chunkid to this + // TODO : Add Tags optimized version + Cell newKv = new OnheapMemstoreChunkCell(buf.array(), buf.arrayOffset() + initPos, + len + MemstoreChunkCell.EXTRA_BYTES_FOR_SEQ_CHUNK_IDS); + return newKv; + } + // add the seqid and chunkid to this + return new OffheapMemstoreChunkCell(buf, initPos, len + MemstoreChunkCell.EXTRA_BYTES_FOR_SEQ_CHUNK_IDS); + } + public static void appendToByteBuffer(final ByteBuffer bb, final KeyValue kv, final boolean includeMvccVersion) { // keep pushing the limit out. assume enough capacity diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/MemstoreChunkCell.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/MemstoreChunkCell.java new file mode 100644 index 0000000..6d48ce9 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/MemstoreChunkCell.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ClassSize; + +/** + * Indicates that the cell that will be implementing this was created out of the + * MemstoreChunk and hence it will have the ChunkId and the offset at which the + * cell was written + */ +@InterfaceAudience.Private +public interface MemstoreChunkCell { + static final int FIXED_HEAP_SIZE_OVERHEAD = + ClassSize.OBJECT + ClassSize.REFERENCE + (2 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_SHORT; + static final int EXTRA_BYTES_FOR_SEQ_CHUNK_IDS = Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT; + + public int getChunkId(); + + public int getOffset(); + +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/NoTagsOffheapKeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/NoTagsOffheapKeyValue.java new file mode 100644 index 0000000..26aa89d --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/NoTagsOffheapKeyValue.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.ByteBufferUtils; +import org.apache.hadoop.hbase.util.Bytes; + +@InterfaceAudience.Private +public class NoTagsOffheapKeyValue extends OffheapKeyValue { + + public NoTagsOffheapKeyValue(ByteBuffer buf, int offset, int length, long seqId) { + super(buf, offset, length, false, seqId); + } + + @Override + public int getTagsLength() { + return 0; + } + + @Override + public int write(OutputStream out, boolean withTags) throws IOException { + ByteBufferUtils.putInt(out, this.length); + ByteBufferUtils.copyBufferToStream(out, this.buf, this.offset, this.length); + return this.length + Bytes.SIZEOF_INT; + } +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapMemstoreChunkCell.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapMemstoreChunkCell.java new file mode 100644 index 0000000..5aeac08 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapMemstoreChunkCell.java @@ -0,0 +1,282 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.HeapSize; +import org.apache.hadoop.hbase.util.ByteBufferUtils; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ClassSize; + +@InterfaceAudience.Private +public class OffheapMemstoreChunkCell extends ByteBufferedCell + implements HeapSize, ExtendedCell, MemstoreChunkCell { + + private ByteBuffer buf; + private int offset = -1; + private int length = -1; + // May be we can remove this? 
+ private short rowLen = -1; + + public OffheapMemstoreChunkCell(ByteBuffer buf, int offset, int length) { + this.buf = buf; + this.offset = offset; + // This len is inclusive of the chunkid + this.length = length; + rowLen = ByteBufferUtils.toShort(this.buf, this.offset + KeyValue.ROW_OFFSET); + } + + @Override + public byte[] getRowArray() { + return CellUtil.cloneRow(this); + } + + @Override + public int getRowOffset() { + return 0; + } + + @Override + public short getRowLength() { + return rowLen; + } + + @Override + public byte[] getFamilyArray() { + return CellUtil.cloneFamily(this); + } + + @Override + public int getFamilyOffset() { + return 0; + } + + @Override + public byte getFamilyLength() { + return getFamilyLength(getFamilyLengthPosition()); + } + + private int getFamilyLengthPosition() { + return this.offset + KeyValue.ROW_KEY_OFFSET + getRowLength(); + } + + private byte getFamilyLength(int famLenPos) { + return ByteBufferUtils.toByte(this.buf, famLenPos); + } + + @Override + public byte[] getQualifierArray() { + return CellUtil.cloneQualifier(this); + } + + @Override + public int getQualifierOffset() { + return 0; + } + + @Override + public int getQualifierLength() { + return getQualifierLength(getRowLength(), getFamilyLength()); + } + + private int getQualifierLength(int rlength, int flength) { + return getKeyLength() - (int) KeyValue.getKeyDataStructureSize(rlength, flength, 0); + } + + @Override + public long getTimestamp() { + int offset = getTimestampOffset(getKeyLength()); + return ByteBufferUtils.toLong(this.buf, offset); + } + + private int getTimestampOffset(int keyLen) { + return this.offset + KeyValue.ROW_OFFSET + keyLen - KeyValue.TIMESTAMP_TYPE_SIZE; + } + + @Override + public byte getTypeByte() { + return ByteBufferUtils.toByte(this.buf, this.offset + getKeyLength() - 1 + KeyValue.ROW_OFFSET); + } + + @Override + public long getSequenceId() { + return ByteBufferUtils.toLong(this.buf, + ((this.offset + this.length) - EXTRA_BYTES_FOR_SEQ_CHUNK_IDS)); + } + + @Override + public byte[] getValueArray() { + return CellUtil.cloneValue(this); + } + + @Override + public int getValueOffset() { + return 0; + } + + @Override + public int getValueLength() { + return ByteBufferUtils.toInt(this.buf, this.offset + Bytes.SIZEOF_INT); + } + + @Override + public byte[] getTagsArray() { + return CellUtil.cloneTags(this); + } + + @Override + public int getTagsOffset() { + return 0; + } + + @Override + public int getTagsLength() { + int tagsLen = this.length - EXTRA_BYTES_FOR_SEQ_CHUNK_IDS + - (getKeyLength() + getValueLength() + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE); + if (tagsLen > 0) { + // There are some Tag bytes in the byte[]. 
So reduce 2 bytes which is + // added to denote the tags + // length + tagsLen -= KeyValue.TAGS_LENGTH_SIZE; + } + return tagsLen; + } + + protected int getKeyLength() { + return ByteBufferUtils.toInt(this.buf, this.offset); + } + + @Override + public ByteBuffer getRowByteBuffer() { + return this.buf; + } + + @Override + public int getRowPosition() { + return this.offset + KeyValue.ROW_KEY_OFFSET; + } + + @Override + public ByteBuffer getFamilyByteBuffer() { + return this.buf; + } + + @Override + public int getFamilyPosition() { + return getFamilyLengthPosition() + Bytes.SIZEOF_BYTE; + } + + @Override + public ByteBuffer getQualifierByteBuffer() { + return this.buf; + } + + @Override + public int getQualifierPosition() { + return getFamilyPosition() + getFamilyLength(); + } + + @Override + public ByteBuffer getValueByteBuffer() { + return this.buf; + } + + @Override + public int getValuePosition() { + return this.offset + KeyValue.ROW_OFFSET + getKeyLength(); + } + + @Override + public ByteBuffer getTagsByteBuffer() { + return this.buf; + } + + @Override + public int getTagsPosition() { + int tagsLen = getTagsLength(); + if (tagsLen == 0) { + return this.offset + this.length - EXTRA_BYTES_FOR_SEQ_CHUNK_IDS; + } + return this.offset + this.length - EXTRA_BYTES_FOR_SEQ_CHUNK_IDS - tagsLen; + } + + @Override + public long heapSize() { + // Will change once HBASE-16747 comes in + return ClassSize.align(FIXED_HEAP_SIZE_OVERHEAD + ClassSize.align(length)); + } + + @Override + public int write(OutputStream out, boolean withTags) throws IOException { + // In KeyValueUtil#oswrite we do a Cell serialization as KeyValue. Any + // changes doing here, pls check KeyValueUtil#oswrite also and do necessary changes. + int length = this.length; + if (!withTags) { + length = getKeyLength() + this.getValueLength() + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE; + } else { + length -= EXTRA_BYTES_FOR_SEQ_CHUNK_IDS; + } + ByteBufferUtils.putInt(out, length); + ByteBufferUtils.copyBufferToStream(out, this.buf, this.offset, length); + return length + Bytes.SIZEOF_INT; + } + + @Override + public String toString() { + return CellUtil.toString(this, true); + } + + @Override + public void setTimestamp(byte[] ts, int tsOffset) throws IOException { + ByteBufferUtils.copyFromArrayToBuffer(buf, getTimestampOffset(getKeyLength()), ts, tsOffset, + Bytes.SIZEOF_LONG); + } + + @Override + public void write(ByteBuffer buf, int pos) { + ByteBufferUtils.copyFromBufferToBuffer(this.buf, buf, this.offset, pos, + this.length - EXTRA_BYTES_FOR_SEQ_CHUNK_IDS); + } + + public int getChunkId() { + return ByteBufferUtils.toInt(this.buf, ((this.offset + this.length) - Bytes.SIZEOF_INT)); + } + + public int getOffset() { + return this.offset; + } + + @Override + public void setSequenceId(long seqId) throws IOException { + // should we do this + } + + @Override + public int getSerializedSize(boolean withTags) { + return this.length - EXTRA_BYTES_FOR_SEQ_CHUNK_IDS; + } + + @Override + public void write(byte[] buf, int offset) { + ByteBufferUtils.copyFromBufferToArray(buf, this.buf, this.offset, offset, + this.length - EXTRA_BYTES_FOR_SEQ_CHUNK_IDS); + } + + @Override + public void setTimestamp(long ts) throws IOException { + ByteBufferUtils.putLong(buf, getTimestampOffset(getKeyLength()), ts); + + } +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/OnheapMemstoreChunkCell.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/OnheapMemstoreChunkCell.java new file mode 100644 index 0000000..1af5050 --- /dev/null +++ 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/OnheapMemstoreChunkCell.java @@ -0,0 +1,324 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.HeapSize; +import org.apache.hadoop.hbase.util.ByteBufferUtils; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ClassSize; + +@InterfaceAudience.Private +public class OnheapMemstoreChunkCell implements Cell, HeapSize, ExtendedCell, MemstoreChunkCell { + + private byte[] bytes; + private int offset = -1; + private int length = -1; + // May be we can remove this? + private short rowLen = -1; + + public OnheapMemstoreChunkCell(byte[] bytes, int offset, int length) { + this.bytes = bytes; + this.offset = offset; + // This len is inclusive of the chunkid + this.length = length; + rowLen = Bytes.toShort(this.bytes, this.offset + KeyValue.ROW_OFFSET); + } + + /** + * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array) + */ + @Override + public byte[] getRowArray() { + return bytes; + } + + /** + * @return Row offset + */ + @Override + public int getRowOffset() { + return this.offset + KeyValue.ROW_KEY_OFFSET; + } + + /** + * @return Row length + */ + @Override + public short getRowLength() { + return rowLen; + } + + /** + * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array) + */ + @Override + public byte[] getFamilyArray() { + return bytes; + } + + /** + * @return Family offset + */ + @Override + public int getFamilyOffset() { + return getFamilyOffset(getRowLength()); + } + + /** + * @return Family offset + */ + private int getFamilyOffset(int rlength) { + return this.offset + KeyValue.ROW_KEY_OFFSET + rlength + Bytes.SIZEOF_BYTE; + } + + /** + * @return Family length + */ + @Override + public byte getFamilyLength() { + return getFamilyLength(getFamilyOffset()); + } + + /** + * @return Family length + */ + public byte getFamilyLength(int foffset) { + return this.bytes[foffset - 1]; + } + + /** + * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array) + */ + @Override + public byte[] getQualifierArray() { + return bytes; + } + + /** + * @return Qualifier offset + */ + @Override + public int getQualifierOffset() { + return getQualifierOffset(getFamilyOffset()); + } + + /** + * @return Qualifier offset + */ + private int getQualifierOffset(int foffset) { + return foffset + getFamilyLength(foffset); + } + + /** + * @return Qualifier length + */ + @Override + public int getQualifierLength() { + return 
getQualifierLength(getRowLength(), getFamilyLength()); + } + + /** + * @return Qualifier length + */ + private int getQualifierLength(int rlength, int flength) { + return getKeyLength() - (int) KeyValue.getKeyDataStructureSize(rlength, flength, 0); + } + + /** + * @return Timestamp offset + */ + public int getTimestampOffset() { + return getTimestampOffset(getKeyLength()); + } + + /** + * @return Length of key portion. + */ + public int getKeyLength() { + return Bytes.toInt(this.bytes, this.offset); + } + + /** + * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array) + */ + @Override + public byte[] getValueArray() { + return bytes; + } + + /** + * @return the value offset + */ + @Override + public int getValueOffset() { + int voffset = getKeyOffset() + getKeyLength(); + return voffset; + } + + public int getKeyOffset() { + return this.offset + KeyValue.ROW_OFFSET; + } + + /** + * @return Value length + */ + @Override + public int getValueLength() { + int vlength = Bytes.toInt(this.bytes, this.offset + Bytes.SIZEOF_INT); + return vlength; + } + + /** + * @param keylength Pass if you have it to save on an int creation. + * @return Timestamp offset + */ + private int getTimestampOffset(final int keylength) { + return getKeyOffset() + keylength - KeyValue.TIMESTAMP_TYPE_SIZE; + } + + @Override + public void setTimestamp(long ts) { + Bytes.putBytes(this.bytes, this.getTimestampOffset(), Bytes.toBytes(ts), 0, Bytes.SIZEOF_LONG); + } + + @Override + public void setTimestamp(byte[] ts, int tsOffset) { + Bytes.putBytes(this.bytes, this.getTimestampOffset(), ts, tsOffset, Bytes.SIZEOF_LONG); + } + + /** + * This returns the offset where the tag actually starts. + */ + @Override + public int getTagsOffset() { + int tagsLen = getTagsLength(); + if (tagsLen == 0) { + return this.offset + this.length - EXTRA_BYTES_FOR_SEQ_CHUNK_IDS; + } + return this.offset + this.length - EXTRA_BYTES_FOR_SEQ_CHUNK_IDS - tagsLen; + } + + /** + * This returns the total length of the tag bytes + */ + @Override + public int getTagsLength() { + int tagsLen = this.length - EXTRA_BYTES_FOR_SEQ_CHUNK_IDS + - (getKeyLength() + getValueLength() + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE); + if (tagsLen > 0) { + // There are some Tag bytes in the byte[]. So reduce 2 bytes which are added to denote the tags + // length + tagsLen -= KeyValue.TAGS_LENGTH_SIZE; + } + return tagsLen; + } + + /** + * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array) + */ + @Override + public byte[] getTagsArray() { + return bytes; + } + + @Override + public void write(ByteBuffer buf, int pos) { + ByteBufferUtils.copyFromArrayToBuffer(buf, pos, this.bytes, this.offset, + this.length - EXTRA_BYTES_FOR_SEQ_CHUNK_IDS); + } + + @Override + public int write(OutputStream out, boolean withTags) throws IOException { + // In KeyValueUtil#oswrite we do a Cell serialization as KeyValue. Any changes done here, please + // check KeyValueUtil#oswrite also and make the necessary changes.
+ int length = this.length; + if (!withTags) { + length = this.getKeyLength() + this.getValueLength() + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE; + } else { + length -= EXTRA_BYTES_FOR_SEQ_CHUNK_IDS; + } + ByteBufferUtils.putInt(out, length); + out.write(this.bytes, this.offset, length); + return length + Bytes.SIZEOF_INT; + } + + public int getChunkId() { + return Bytes.toInt(this.bytes, ((this.offset + this.length) - Bytes.SIZEOF_INT)); + } + + public int getOffset() { + return this.offset; + } + + @Override + public long heapSize() { + // Will change once HBASE-16747 comes in + return ClassSize.align( + FIXED_HEAP_SIZE_OVERHEAD + ClassSize.align(ClassSize.ARRAY) + ClassSize.align(length)); + } + + @Override + public long getTimestamp() { + return getTimestamp(getKeyLength()); + } + + /** + * @param keylength Pass if you have it to save on a int creation. + * @return Timestamp + */ + long getTimestamp(final int keylength) { + int tsOffset = getTimestampOffset(keylength); + return Bytes.toLong(this.bytes, tsOffset); + } + + @Override + public byte getTypeByte() { + return this.bytes[this.offset + getKeyLength() - 1 + KeyValue.ROW_OFFSET]; + } + + @Override + public long getSequenceId() { + return Bytes.toLong(this.bytes, ((this.offset + this.length) - EXTRA_BYTES_FOR_SEQ_CHUNK_IDS)); + } + + @Override + public String toString() { + return CellUtil.toString(this, true); + } + + @Override + public void setSequenceId(long seqId) throws IOException { + // should we do it? + } + + @Override + public int getSerializedSize(boolean withTags) { + return this.length - EXTRA_BYTES_FOR_SEQ_CHUNK_IDS; + } + + @Override + public void write(byte[] buf, int offset) { + System.arraycopy(this.bytes, this.offset, buf, offset, + this.length - EXTRA_BYTES_FOR_SEQ_CHUNK_IDS); + } +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPoolManager.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPoolManager.java new file mode 100644 index 0000000..0c1b70f --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPoolManager.java @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.io; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.nio.ByteBuff; +import org.apache.hadoop.hbase.nio.MultiByteBuff; +import org.apache.hadoop.hbase.nio.SingleByteBuff; +import org.apache.hadoop.hbase.util.ByteBufferUtils; + +@InterfaceAudience.Private +/** + * Manages the ByteBuffer pool and helps create and release buffers back to the pool. + * TODO : Check if it can be used with reqBuffs also + */ +public class ByteBufferPoolManager { + private static final Log LOG = LogFactory.getLog(ByteBufferPoolManager.class); + + private ByteBufferPool pool; + + // Keep track of the BBs that bytes are written to. We will first try to get a BB from the pool. If + // one is not available we will make a new one of our own and keep writing to that. We keep track of + // the BBs that we got from the pool separately, so that on releaseResources we can make sure + // to return all of them to the pool + protected List<ByteBuffer> allBufs = new ArrayList<ByteBuffer>(); + protected List<ByteBuffer> bufsFromPool = new ArrayList<ByteBuffer>(); + + private boolean lastBufFlipped = false; + + private ByteBuffer curBuf; + + /** + * @param pool The pool to get the buffers from; it also defines the size of each buffer. + */ + // TODO : Better to pass config and then instantiate the pool here + public ByteBufferPoolManager(ByteBufferPool pool) { + this.pool = pool; + getNewBuffer(); + } + + public int getBufferSize() { + // Ok to be here? or in the manager + return this.pool.getBufferSize(); + } + + /** + * Flips the current buffer, if any, and makes a fresh buffer the current one + * @return the new current buffer + */ + private ByteBuffer getNewBuffer() { + // Get a BB to work with from the pool + if (curBuf != null) { + curBuf.flip(); + } + ByteBuffer buffer; + buffer = this.pool.getBuffer(); + if (buffer == null) { + // No free BB at this moment. Make a new one. The pool returns off heap BBs. Don't make off + // heap BB on demand. It is difficult to account for all such and so proper sizing of Max + // direct heap size. See HBASE-15525 also for more details. + // Make a BB with the same size as the pool's buffer size. + buffer = ByteBuffer.allocate(getBufferSize()); + } else { + this.bufsFromPool.add(buffer); + } + this.allBufs.add(buffer); + this.curBuf = buffer; + return buffer; + } + + /** + * Returns the current active byte buffer + * TODO : Should we have both APIs for getNewBB or + * getCurrentBB + * @return the current buffer + */ + public ByteBuffer getCurrentBuffer() { + return this.curBuf; + } + + public void checkSizeAndGrow(int extra) { + // this also to go inside Pool manager + long capacityNeeded = curBuf.position() + (long) extra; + if (capacityNeeded > curBuf.limit()) { + getNewBuffer(); + } + } + + /** + * Writes the given array's data to the buffers managed by this buffer pool + * @param b + * @param off + * @param len + */ + public void write(byte[] b, int off, int len) { + int toWrite = 0; + while (len > 0) { + toWrite = Math.min(len, this.curBuf.remaining()); + ByteBufferUtils.copyFromArrayToBuffer(this.curBuf, b, off, toWrite); + off += toWrite; + len -= toWrite; + if (len > 0) { + getNewBuffer();// The curBuf is full.
Let us move to the next one + } + } + } + + public int size() { + int s = 0; + // Use allBufs directly here; getByteBuffers() flips the current buffer as a side effect + for (int i = 0; i < this.allBufs.size() - 1; i++) { + s += this.allBufs.get(i).remaining(); + } + // On the last BB, it might not be flipped yet if getByteBuffers is not yet called + if (this.lastBufFlipped) { + s += this.curBuf.remaining(); + } else { + s += this.curBuf.position(); + } + return s; + } + + /** + * Writes the given buffer's data to the buffers managed by this buffer pool + * @param b + * @param off + * @param len + * @throws IOException + */ + public void write(ByteBuffer b, int off, int len) throws IOException { + int toWrite = 0; + while (len > 0) { + toWrite = Math.min(len, this.curBuf.remaining()); + ByteBufferUtils.copyFromBufferToBuffer(b, this.curBuf, off, toWrite); + off += toWrite; + len -= toWrite; + if (len > 0) { + getNewBuffer();// The curBuf is full. Let us move to the next one + } + } + } + + /** + * Release the resources it uses (the ByteBuffers) which are obtained from the pool. Call this only + * when all the data is fully used. It must be called at the end of usage, else we will leak + * ByteBuffers from the pool. + */ + public void releaseResources() { + // Return all the BBs to the pool + if (this.bufsFromPool != null) { + for (int i = 0; i < this.bufsFromPool.size(); i++) { + this.pool.putbackBuffer(this.bufsFromPool.get(i)); + } + this.bufsFromPool = null; + } + this.allBufs = null; + } + + /** + * Returns the set of buffers that were allocated + * @return the list of buffers + */ + // Ideally this should also be ByteBuff type + public List<ByteBuffer> getByteBuffers() { + if (!this.lastBufFlipped) { + this.lastBufFlipped = true; + // All the other BBs are already flipped while moving to the new BB. + curBuf.flip(); + } + return this.allBufs; + } + + /** + * Allocates a set of buffers such that the buffers allocated could be used + * to write data of the given length + * @param length + * @return a ByteBuff over the allocated buffers + */ + public ByteBuff createByteBuff(int length) { + int remain = length; + while (remain > 0 && (getNewBuffer()) != null) { + remain -= getBufferSize(); + } + // all are created. + if (allBufs.size() == 1) { + return new SingleByteBuff(allBufs.get(0)); + } else { + ByteBuffer[] allItems = new ByteBuffer[allBufs.size()]; + return new MultiByteBuff(allBufs.toArray(allItems)); + } + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkMap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkMap.java new file mode 100644 index 0000000..c17a9af --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkMap.java @@ -0,0 +1,109 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.hbase.regionserver; + +import java.nio.ByteBuffer; +import java.util.Comparator; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.OffheapMemstoreChunkCell; +import org.apache.hadoop.hbase.OnheapMemstoreChunkCell; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.ByteBufferUtils; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * CellChunkMap is a byte array holding all that is needed to access a Cell, which + * is actually saved on another deeper byte array. + * Per Cell we have a reference to this deeper byte array B, the offset in bytes in B (integer), + * and the length in bytes in B (integer). In order to save the reference to the byte array we use + * the Chunk's index (chunk id) given by MSLAB (also an integer). + * + * The CellChunkMap memory layout relevant to a deeper byte array B: + * + * <----------------- first Cell ---------------------> <-------------- second Cell --- ... + * ------------------------------------------------------------------------------------- ... + * | integer = x bytes | integer = x bytes | integer = x bytes | integer = x bytes | + * | reference to B | offset in B where | length of Cell's | reference to may be| ... + * | holding Cell data | Cell's data starts| data in B | another byte array | + * ------------------------------------------------------------------------------------- ... + */ +@InterfaceAudience.Private +public class CellChunkMap extends CellFlatMap { + private final Chunk[] chunks; + private final int numOfCellsInsideChunk; + private final MemStoreLAB memStoreLAB; + public static final int BYTES_IN_CELL = 3 * (Integer.SIZE / Byte.SIZE); // each Cell requires 3 integers + + /* C-tor for an increasing map starting from index zero */ + /* The given Cell array on the given Chunk array must be ordered. */ + public CellChunkMap(Comparator<? super Cell> comparator, + MemStoreLAB memStoreLAB, Chunk[] chunks, int max, int chunkSize) { + super(comparator, 0, max, false); + this.memStoreLAB = memStoreLAB; + this.chunks = chunks; + this.numOfCellsInsideChunk = chunkSize / BYTES_IN_CELL; + } + + /* The given Cell array on the given Chunk array must be ordered. */ + public CellChunkMap(Comparator<? super Cell> comparator, MemStoreLAB memStoreLAB, + Chunk[] chunks, int min, int max, int chunkSize, boolean d) { + super(comparator, min, max, d); + this.memStoreLAB = memStoreLAB; + this.chunks = chunks; + this.numOfCellsInsideChunk = chunkSize / BYTES_IN_CELL; + } + + @Override + protected Cell getCell(int i) { + // TODO : See if we can create one chunk for index and reuse that? + int chunkIndex = (i / numOfCellsInsideChunk); + ByteBuffer block = chunks[chunkIndex].getData(); + i = i - chunkIndex * numOfCellsInsideChunk; + + // find the entry inside the chunk + int offsetInBytes = i * BYTES_IN_CELL; + + // read the Cell's chunk id, offset and length from the index entry + int chunkId = ByteBufferUtils.toInt(block, offsetInBytes); + int offsetOfCell = ByteBufferUtils.toInt(block, offsetInBytes + Bytes.SIZEOF_INT); + int lengthOfCell = ByteBufferUtils.toInt(block, offsetInBytes + 2 * Bytes.SIZEOF_INT); + ByteBuffer chunk = memStoreLAB.getChunkData(chunkId); + + Cell result = null; + if (chunk != null) { + if (chunk.hasArray()) { + // Now parsing of seqId may be heavier?? + result = new OnheapMemstoreChunkCell(chunk.array(), offsetOfCell + chunk.arrayOffset(), lengthOfCell); + } else { + // Now parsing of seqId may be heavier??
+ result = new OffheapMemstoreChunkCell(chunk, offsetOfCell, lengthOfCell); + } + return result; + } + return null; + } + + @Override + protected CellFlatMap createSubCellFlatMap(int min, int max, boolean descending) { + return new CellChunkMap(this.comparator(), this.memStoreLAB, this.chunks, min, max, + this.numOfCellsInsideChunk * BYTES_IN_CELL, descending); + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java index 94f3575..d7f304e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java @@ -47,17 +47,25 @@ public class Chunk { /** Size of chunk in bytes */ private final int size; private final boolean offheap; + + // A unique id associated with the chunk + private int id = -1; /** * Create an uninitialized chunk. Note that memory is not allocated yet, so this is cheap. * * @param size in bytes */ - Chunk(int size, boolean offheap) { + Chunk(int size, int id, boolean offheap) { this.size = size; + this.id = id; this.offheap = offheap; } + int getChunkId() { + return this.id; + } + /** * Actually claim the memory for this chunk. This should only be called from the thread that * constructed the chunk. It is thread-safe against other threads calling alloc(), who will block @@ -129,7 +137,7 @@ public class Chunk { /** * @return This chunk's backing data. */ - ByteBuffer getData() { + public ByteBuffer getData() { return this.data; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index f64d4a0..e34820d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1478,8 +1478,10 @@ public class HRegionServer extends HasThread implements float initialCountPercentage = conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT); int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT); + // Initialize the MemstoreLABChunkCreator;
from then on we can use MemstoreLABChunkCreator#getChunkCreator(). + MemstoreLABChunkCreator chunkCreator = MemstoreLABChunkCreator.getChunkCreator(chunkSize, offheap); MemStoreChunkPool pool = MemStoreChunkPool.initialize(globalMemStoreSize, poolSizePercentage, - initialCountPercentage, chunkSize, offheap); + initialCountPercentage, chunkSize, offheap, chunkCreator); if (pool != null && this.hMemManager != null) { // Register with Heap Memory manager this.hMemManager.registerTuneObserver(pool); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java index 0d415f2..551f4cd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java @@ -18,6 +18,7 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.nio.ByteBuffer; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; @@ -77,6 +78,8 @@ public class HeapMemStoreLAB extends MemStoreLAB { private AtomicBoolean reclaimed = new AtomicBoolean(false); // Current count of open scanners which reading data from this MemStoreLAB private final AtomicInteger openScannerCount = new AtomicInteger(); + // creates the chunks and associates each chunk with a unique id + private MemstoreLABChunkCreator chunkCreator; // Used in testing public HeapMemStoreLAB() { @@ -87,6 +90,7 @@ public class HeapMemStoreLAB extends MemStoreLAB { chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT); maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT); this.chunkPool = MemStoreChunkPool.getPool(); + this.chunkCreator = MemstoreLABChunkCreator.getChunkCreator(chunkSize, false); // currently chunkQueue is only used for chunkPool if (this.chunkPool != null) { // set queue length to chunk pool max count to avoid keeping reference of @@ -124,9 +128,17 @@ public class HeapMemStoreLAB extends MemStoreLAB { // try to retire this chunk tryRetireChunk(c); } - return KeyValueUtil.copyCellTo(cell, c.getData(), allocOffset); + return cloneCellTo(cell, c, allocOffset); } + + protected Cell cloneCellTo(Cell cell, Chunk c, int offset) { + if (c.getChunkId() != -1) { + return KeyValueUtil.cloneToMemstoreChunkCell(cell, c.getData(), offset, c.getChunkId()); + } else { + return KeyValueUtil.cloneTo(cell, c.getData(), offset); + } + } /** * Close this instance since it won't be used any more, try to put the chunks * back to pool @@ -202,7 +214,7 @@ // This is chunk from pool pooledChunk = true; } else { - c = new Chunk(chunkSize, false);// When chunk is not from pool, always make it as on heap. + c = this.chunkCreator.createChunk(false);// When chunk is not from pool, always make it on heap.
} if (curChunk.compareAndSet(null, c)) { // we won race - now we need to actually do the expensive @@ -234,4 +246,18 @@ BlockingQueue<PooledChunk> getChunkQueue() { return this.pooledChunkQueue; } + + @Override + public ByteBuffer getChunkData(int id) { + Chunk chunk; + if (chunkPool != null) { + chunk = chunkPool.getChunk(id); + } else { + chunk = chunkCreator.getChunk(id); + } + if (chunk != null) { + return chunk.getData(); + } + return null; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java index eb38a55..2405900 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java @@ -72,16 +72,19 @@ public class MemStoreChunkPool implements HeapMemoryTuneObserver { private final AtomicLong chunkCount = new AtomicLong(); private final AtomicLong reusedChunkCount = new AtomicLong(); private final boolean offheap; + + private MemstoreLABChunkCreator chunkCreator; MemStoreChunkPool(int chunkSize, int maxCount, int initialCount, float poolSizePercentage, - boolean offheap) { + boolean offheap, MemstoreLABChunkCreator chunkCreator) { this.maxCount = maxCount; this.chunkSize = chunkSize; this.poolSizePercentage = poolSizePercentage; this.offheap = offheap; + this.chunkCreator = chunkCreator; this.reclaimedChunks = new LinkedBlockingQueue<PooledChunk>(); for (int i = 0; i < initialCount; i++) { - PooledChunk chunk = new PooledChunk(chunkSize, this.offheap); + PooledChunk chunk = (PooledChunk) this.chunkCreator.createChunk(true); chunk.init(); reclaimedChunks.add(chunk); } @@ -113,7 +116,7 @@ public class MemStoreChunkPool implements HeapMemoryTuneObserver { while (true) { long created = this.chunkCount.get(); if (created < this.maxCount) { - chunk = new PooledChunk(this.chunkSize, this.offheap); + chunk = (PooledChunk) this.chunkCreator.createChunk(true); if (this.chunkCount.compareAndSet(created, created + 1)) { break; } @@ -154,6 +157,9 @@ public class MemStoreChunkPool implements HeapMemoryTuneObserver { return this.reclaimedChunks.size(); } + Chunk getChunk(int id) { + return this.chunkCreator.getChunk(id); + } /* * Only used in testing */ @@ -186,10 +192,11 @@ } /** + * @param chunkCreator the creator that makes the (pooled) chunks and tags them with ids * @return the global MemStoreChunkPool instance */ static MemStoreChunkPool initialize(long globalMemStoreSize, float poolSizePercentage, - float initialCountPercentage, int chunkSize, boolean offheap) { + float initialCountPercentage, int chunkSize, boolean offheap, MemstoreLABChunkCreator chunkCreator) { if (GLOBAL_INSTANCE != null) return GLOBAL_INSTANCE; if (chunkPoolDisabled) return null; @@ -209,12 +216,14 @@ int initialCount = (int) (initialCountPercentage * maxCount); LOG.info("Allocating MemStoreChunkPool with chunk size " + StringUtils.byteDesc(chunkSize) + ", max count " + maxCount + ", initial count " + initialCount); + GLOBAL_INSTANCE = new MemStoreChunkPool(chunkSize, maxCount, initialCount, poolSizePercentage, - offheap); + offheap, chunkCreator); return GLOBAL_INSTANCE; } /** * @return The singleton instance of this pool.
*/ static MemStoreChunkPool getPool() { @@ -231,8 +240,8 @@ } public static class PooledChunk extends Chunk { - PooledChunk(int size, boolean offheap) { - super(size, offheap); + PooledChunk(int size, int id, boolean offheap) { + super(size, id, offheap); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java index 72f199a..30cc5a1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.nio.ByteBuffer; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -82,7 +84,14 @@ * Called when closing a scanner on the data of this MemStoreLAB */ public abstract void decScannerCount(); - + + /** + * Get the data stored in the ByteBuffer associated with the chunk + * @param id the chunk id + * @return the data + */ + public abstract ByteBuffer getChunkData(int id); + public static MemStoreLAB newInstance(Configuration conf) { MemStoreLAB memStoreLAB = null; if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreLABChunkCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreLABChunkCreator.java new file mode 100644 index 0000000..f7113ee --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreLABChunkCreator.java @@ -0,0 +1,77 @@ +package org.apache.hadoop.hbase.regionserver; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Manages MemStoreLAB chunk creation.
A monotonically incrementing id is associated + * with every chunk + */ +@InterfaceAudience.Private +public class MemstoreLABChunkCreator { + // monotonically increasing chunkid + private AtomicInteger chunkID = new AtomicInteger(1); + // maps the chunk against the monotonically increasing chunk id + private Map<Integer, Chunk> chunkIdMap = + new ConcurrentHashMap<Integer, Chunk>(); + private int chunkSize; + private boolean offheap; + private static volatile MemstoreLABChunkCreator INSTANCE; + private MemstoreLABChunkCreator(int chunkSize, boolean offheap) { + this.chunkSize = chunkSize; + this.offheap = offheap; + } + + static MemstoreLABChunkCreator getChunkCreator(int chunkSize, boolean offheap) { + if (INSTANCE != null) return INSTANCE; + synchronized (MemstoreLABChunkCreator.class) { + // Re-check under the lock so that two racing callers cannot both create an instance + if (INSTANCE == null) { + INSTANCE = new MemstoreLABChunkCreator(chunkSize, offheap); + } + return INSTANCE; + } + } + + Chunk createChunk(boolean createPooledChunk) { + int id = chunkID.getAndIncrement(); + Chunk memStoreLABChunk; + if (createPooledChunk) { + memStoreLABChunk = new MemStoreChunkPool.PooledChunk(chunkSize, id, this.offheap); + } else { + memStoreLABChunk = new Chunk(chunkSize, id, this.offheap); + } + chunkIdMap.put(id, memStoreLABChunk); + return memStoreLABChunk; + } + + Chunk getChunk(int id) { + return this.chunkIdMap.get(id); + } + + void removeChunk(int id) { + this.chunkIdMap.remove(id); + } + + void clear() { + this.chunkIdMap.clear(); + } +}
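--
Reviewer notes, appended after the patch: a few self-contained sketches that illustrate the mechanisms above. They use only JDK types, all class names are hypothetical, and none of this code is part of the patch.

The cells produced by KeyValueUtil#cloneToMemstoreChunkCell carry a fixed 12-byte trailer behind the plain KeyValue serialization: an 8-byte sequence id followed by a 4-byte chunk id (MemstoreChunkCell.EXTRA_BYTES_FOR_SEQ_CHUNK_IDS). A minimal sketch of that trailer arithmetic, assuming stand-in data:

    import java.nio.ByteBuffer;

    public class ChunkCellTrailerSketch {
      // Mirrors MemstoreChunkCell.EXTRA_BYTES_FOR_SEQ_CHUNK_IDS = SIZEOF_LONG + SIZEOF_INT
      static final int TRAILER = 8 + 4;

      public static void main(String[] args) {
        byte[] kv = new byte[] { 1, 2, 3, 4, 5 }; // stand-in for serialized KeyValue bytes
        ByteBuffer buf = ByteBuffer.allocate(kv.length + TRAILER);
        buf.put(kv);
        buf.putLong(42L); // sequence id, appended after the cell as in cloneToMemstoreChunkCell
        buf.putInt(7);    // chunk id
        int offset = 0;
        int length = buf.position(); // the stored length is inclusive of the trailer
        // Read back exactly the way getSequenceId()/getChunkId() index the buffer:
        long seqId = buf.getLong(offset + length - TRAILER);
        int chunkId = buf.getInt(offset + length - 4);
        System.out.println(seqId + " " + chunkId); // prints: 42 7
      }
    }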
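ByteBufferPoolManager#write is the usual copy-min(remaining)-then-spill loop: copy what fits into the current buffer, flip it, and continue in a fresh one. The same pattern in isolation, with an array source, fixed-size heap buffers, and hypothetical names:

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    public class SpillingWriteSketch {
      private final List<ByteBuffer> bufs = new ArrayList<>();
      private final int bufferSize;
      private ByteBuffer cur;

      SpillingWriteSketch(int bufferSize) {
        this.bufferSize = bufferSize;
        next();
      }

      // Flip the filled buffer and start a fresh one, as getNewBuffer() does
      private void next() {
        if (cur != null) {
          cur.flip();
        }
        cur = ByteBuffer.allocate(bufferSize);
        bufs.add(cur);
      }

      // Copy min(len, remaining) per round, spilling into a new buffer when full
      void write(byte[] b, int off, int len) {
        while (len > 0) {
          int toWrite = Math.min(len, cur.remaining());
          cur.put(b, off, toWrite);
          off += toWrite;
          len -= toWrite;
          if (len > 0) {
            next();
          }
        }
      }

      public static void main(String[] args) {
        SpillingWriteSketch w = new SpillingWriteSketch(4);
        w.write(new byte[10], 0, 10);
        System.out.println(w.bufs.size()); // prints: 3 (4 + 4 + 2 bytes)
      }
    }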
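CellChunkMap#getCell resolves the i-th cell with one division and three int reads against fixed-width index entries of BYTES_IN_CELL = 12 bytes (chunk id, offset, length). The same arithmetic in isolation, with a hypothetical index buffer and none of the Chunk/MemStoreLAB wiring:

    import java.nio.ByteBuffer;

    public class CellIndexSketch {
      static final int BYTES_IN_CELL = 3 * Integer.BYTES; // chunk id, offset, length

      // Locate the i-th index entry across fixed-size index chunks and decode it
      static int[] entry(ByteBuffer[] indexChunks, int cellsPerChunk, int i) {
        ByteBuffer block = indexChunks[i / cellsPerChunk];
        int pos = (i % cellsPerChunk) * BYTES_IN_CELL;
        return new int[] { block.getInt(pos), block.getInt(pos + 4), block.getInt(pos + 8) };
      }

      public static void main(String[] args) {
        int chunkSize = 4 * BYTES_IN_CELL; // 4 entries per index chunk, for the demo
        ByteBuffer b0 = ByteBuffer.allocate(chunkSize);
        ByteBuffer b1 = ByteBuffer.allocate(chunkSize);
        b1.putInt(9).putInt(128).putInt(57); // entry #4 lands at position 0 of chunk 1
        int[] e = entry(new ByteBuffer[] { b0, b1 }, chunkSize / BYTES_IN_CELL, 4);
        System.out.println(e[0] + " " + e[1] + " " + e[2]); // prints: 9 128 57
      }
    }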
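Finally, the chunk-id indirection in MemstoreLABChunkCreator is just an AtomicInteger plus a ConcurrentHashMap: cells store the small id, and MemStoreLAB#getChunkData resolves it back to a buffer. A minimal sketch of that registry pattern, without any pooling:

    import java.nio.ByteBuffer;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;

    public class ChunkRegistrySketch {
      private final AtomicInteger nextId = new AtomicInteger(1);
      private final ConcurrentHashMap<Integer, ByteBuffer> chunks = new ConcurrentHashMap<>();

      // Allocate a buffer and publish it under a monotonically increasing id,
      // the way MemstoreLABChunkCreator#createChunk tags every Chunk it makes
      int create(int size) {
        int id = nextId.getAndIncrement();
        chunks.put(id, ByteBuffer.allocate(size));
        return id;
      }

      // Resolve an id back to its data, as MemStoreLAB#getChunkData does for cells
      // that only carry a chunk id in their trailer
      ByteBuffer data(int id) {
        return chunks.get(id);
      }

      public static void main(String[] args) {
        ChunkRegistrySketch r = new ChunkRegistrySketch();
        int id = r.create(2048);
        System.out.println(id + " -> " + r.data(id).capacity()); // prints: 1 -> 2048
      }
    }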