.../java/org/apache/hadoop/hbase/ExtendedCell.java | 12 +- .../org/apache/hadoop/hbase/MemstoreChunkCell.java | 39 +++ .../hadoop/hbase/OffheapMemstoreChunkCell.java | 282 ++++++++++++++++++ .../hadoop/hbase/OnheapMemstoreChunkCell.java | 324 +++++++++++++++++++++ .../apache/hadoop/hbase/util/ByteBufferUtils.java | 8 + .../hadoop/hbase/regionserver/CellChunkMap.java | 109 +++++++ .../apache/hadoop/hbase/regionserver/Chunk.java | 18 +- .../hadoop/hbase/regionserver/HeapMemStoreLAB.java | 2 + .../hadoop/hbase/regionserver/MemStoreLAB.java | 9 + .../regionserver/MemstoreLABChunkCreator.java | 71 +++++ 10 files changed, 868 insertions(+), 6 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java index 420a5f9..c7c2c45 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase; import java.io.IOException; import java.io.OutputStream; +import java.nio.ByteBuffer; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.HeapSize; @@ -62,8 +63,15 @@ public interface ExtendedCell extends Cell, SettableSequenceId, SettableTimestam /** * Write the given Cell into the given buf's offset. - * @param buf The buffer where to write the Cell. - * @param offset The offset within buffer, to write the Cell. + * @param buf The byte[] where to write the Cell. + * @param offset The offset within byte[], to write the Cell. */ void write(byte[] buf, int offset); + + /** + * Write the given Cell into the given ByteBuffer's offset. + * @param buf The ByteBuffer where to write the Cell. + * @param offset The offset within the ByteBuffer, to write the Cell. + */ + void write(ByteBuffer buf, int offset); } \ No newline at end of file diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/MemstoreChunkCell.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/MemstoreChunkCell.java new file mode 100644 index 0000000..6d48ce9 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/MemstoreChunkCell.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hbase; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ClassSize; + +/** + * Indicates that the Cell implementing this interface was created out of a + * memstore Chunk and hence carries the chunk id and the offset at which the + * cell was written + */ +@InterfaceAudience.Private +public interface MemstoreChunkCell { + static final int FIXED_HEAP_SIZE_OVERHEAD = + ClassSize.OBJECT + ClassSize.REFERENCE + (2 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_SHORT; + static final int EXTRA_BYTES_FOR_SEQ_CHUNK_IDS = Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT; + + public int getChunkId(); + + public int getOffset(); + +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapMemstoreChunkCell.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapMemstoreChunkCell.java new file mode 100644 index 0000000..5aeac08 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapMemstoreChunkCell.java @@ -0,0 +1,282 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.HeapSize; +import org.apache.hadoop.hbase.util.ByteBufferUtils; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ClassSize; + +@InterfaceAudience.Private +public class OffheapMemstoreChunkCell extends ByteBufferedCell + implements HeapSize, ExtendedCell, MemstoreChunkCell { + + private ByteBuffer buf; + private int offset = -1; + private int length = -1; + // Maybe we can remove this?
+ private short rowLen = -1; + + public OffheapMemstoreChunkCell(ByteBuffer buf, int offset, int length) { + this.buf = buf; + this.offset = offset; + // This len is inclusive of the chunkid + this.length = length; + rowLen = ByteBufferUtils.toShort(this.buf, this.offset + KeyValue.ROW_OFFSET); + } + + @Override + public byte[] getRowArray() { + return CellUtil.cloneRow(this); + } + + @Override + public int getRowOffset() { + return 0; + } + + @Override + public short getRowLength() { + return rowLen; + } + + @Override + public byte[] getFamilyArray() { + return CellUtil.cloneFamily(this); + } + + @Override + public int getFamilyOffset() { + return 0; + } + + @Override + public byte getFamilyLength() { + return getFamilyLength(getFamilyLengthPosition()); + } + + private int getFamilyLengthPosition() { + return this.offset + KeyValue.ROW_KEY_OFFSET + getRowLength(); + } + + private byte getFamilyLength(int famLenPos) { + return ByteBufferUtils.toByte(this.buf, famLenPos); + } + + @Override + public byte[] getQualifierArray() { + return CellUtil.cloneQualifier(this); + } + + @Override + public int getQualifierOffset() { + return 0; + } + + @Override + public int getQualifierLength() { + return getQualifierLength(getRowLength(), getFamilyLength()); + } + + private int getQualifierLength(int rlength, int flength) { + return getKeyLength() - (int) KeyValue.getKeyDataStructureSize(rlength, flength, 0); + } + + @Override + public long getTimestamp() { + int offset = getTimestampOffset(getKeyLength()); + return ByteBufferUtils.toLong(this.buf, offset); + } + + private int getTimestampOffset(int keyLen) { + return this.offset + KeyValue.ROW_OFFSET + keyLen - KeyValue.TIMESTAMP_TYPE_SIZE; + } + + @Override + public byte getTypeByte() { + return ByteBufferUtils.toByte(this.buf, this.offset + getKeyLength() - 1 + KeyValue.ROW_OFFSET); + } + + @Override + public long getSequenceId() { + return ByteBufferUtils.toLong(this.buf, + ((this.offset + this.length) - EXTRA_BYTES_FOR_SEQ_CHUNK_IDS)); + } + + @Override + public byte[] getValueArray() { + return CellUtil.cloneValue(this); + } + + @Override + public int getValueOffset() { + return 0; + } + + @Override + public int getValueLength() { + return ByteBufferUtils.toInt(this.buf, this.offset + Bytes.SIZEOF_INT); + } + + @Override + public byte[] getTagsArray() { + return CellUtil.cloneTags(this); + } + + @Override + public int getTagsOffset() { + return 0; + } + + @Override + public int getTagsLength() { + int tagsLen = this.length - EXTRA_BYTES_FOR_SEQ_CHUNK_IDS + - (getKeyLength() + getValueLength() + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE); + if (tagsLen > 0) { + // There are some Tag bytes in the byte[]. 
So subtract the + // 2 bytes added to denote the tags + // length + tagsLen -= KeyValue.TAGS_LENGTH_SIZE; + } + return tagsLen; + } + + protected int getKeyLength() { + return ByteBufferUtils.toInt(this.buf, this.offset); + } + + @Override + public ByteBuffer getRowByteBuffer() { + return this.buf; + } + + @Override + public int getRowPosition() { + return this.offset + KeyValue.ROW_KEY_OFFSET; + } + + @Override + public ByteBuffer getFamilyByteBuffer() { + return this.buf; + } + + @Override + public int getFamilyPosition() { + return getFamilyLengthPosition() + Bytes.SIZEOF_BYTE; + } + + @Override + public ByteBuffer getQualifierByteBuffer() { + return this.buf; + } + + @Override + public int getQualifierPosition() { + return getFamilyPosition() + getFamilyLength(); + } + + @Override + public ByteBuffer getValueByteBuffer() { + return this.buf; + } + + @Override + public int getValuePosition() { + return this.offset + KeyValue.ROW_OFFSET + getKeyLength(); + } + + @Override + public ByteBuffer getTagsByteBuffer() { + return this.buf; + } + + @Override + public int getTagsPosition() { + int tagsLen = getTagsLength(); + if (tagsLen == 0) { + return this.offset + this.length - EXTRA_BYTES_FOR_SEQ_CHUNK_IDS; + } + return this.offset + this.length - EXTRA_BYTES_FOR_SEQ_CHUNK_IDS - tagsLen; + } + + @Override + public long heapSize() { + // Will change once HBASE-16747 comes in + return ClassSize.align(FIXED_HEAP_SIZE_OVERHEAD + ClassSize.align(length)); + } + + @Override + public int write(OutputStream out, boolean withTags) throws IOException { + // In KeyValueUtil#oswrite we do a Cell serialization as KeyValue. Any + // changes done here, please check KeyValueUtil#oswrite also and make the necessary changes. + int length = this.length; + if (!withTags) { + length = getKeyLength() + this.getValueLength() + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE; + } else { + length -= EXTRA_BYTES_FOR_SEQ_CHUNK_IDS; + } + ByteBufferUtils.putInt(out, length); + ByteBufferUtils.copyBufferToStream(out, this.buf, this.offset, length); + return length + Bytes.SIZEOF_INT; + } + + @Override + public String toString() { + return CellUtil.toString(this, true); + } + + @Override + public void setTimestamp(byte[] ts, int tsOffset) throws IOException { + ByteBufferUtils.copyFromArrayToBuffer(buf, getTimestampOffset(getKeyLength()), ts, tsOffset, + Bytes.SIZEOF_LONG); + } + + @Override + public void write(ByteBuffer buf, int pos) { + ByteBufferUtils.copyFromBufferToBuffer(this.buf, buf, this.offset, pos, + this.length - EXTRA_BYTES_FOR_SEQ_CHUNK_IDS); + } + + public int getChunkId() { + return ByteBufferUtils.toInt(this.buf, ((this.offset + this.length) - Bytes.SIZEOF_INT)); + } + + public int getOffset() { + return this.offset; + } + + @Override + public void setSequenceId(long seqId) throws IOException { + // should we do this? + } + + @Override + public int getSerializedSize(boolean withTags) { + return this.length - EXTRA_BYTES_FOR_SEQ_CHUNK_IDS; + } + + @Override + public void write(byte[] buf, int offset) { + ByteBufferUtils.copyFromBufferToArray(buf, this.buf, this.offset, offset, + this.length - EXTRA_BYTES_FOR_SEQ_CHUNK_IDS); + } + + @Override + public void setTimestamp(long ts) throws IOException { + ByteBufferUtils.putLong(buf, getTimestampOffset(getKeyLength()), ts); + } +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/OnheapMemstoreChunkCell.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/OnheapMemstoreChunkCell.java new file mode 100644 index 0000000..1af5050 --- /dev/null +++
b/hbase-common/src/main/java/org/apache/hadoop/hbase/OnheapMemstoreChunkCell.java @@ -0,0 +1,324 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.HeapSize; +import org.apache.hadoop.hbase.util.ByteBufferUtils; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ClassSize; + +@InterfaceAudience.Private +public class OnheapMemstoreChunkCell implements Cell, HeapSize, ExtendedCell, MemstoreChunkCell { + + private byte[] bytes; + private int offset = -1; + private int length = -1; + // Maybe we can remove this? + private short rowLen = -1; + + public OnheapMemstoreChunkCell(byte[] bytes, int offset, int length) { + this.bytes = bytes; + this.offset = offset; + // This len is inclusive of the chunkid + this.length = length; + rowLen = Bytes.toShort(this.bytes, this.offset + KeyValue.ROW_OFFSET); + } + + /** + * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array) + */ + @Override + public byte[] getRowArray() { + return bytes; + } + + /** + * @return Row offset + */ + @Override + public int getRowOffset() { + return this.offset + KeyValue.ROW_KEY_OFFSET; + } + + /** + * @return Row length + */ + @Override + public short getRowLength() { + return rowLen; + } + + /** + * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array) + */ + @Override + public byte[] getFamilyArray() { + return bytes; + } + + /** + * @return Family offset + */ + @Override + public int getFamilyOffset() { + return getFamilyOffset(getRowLength()); + } + + /** + * @return Family offset + */ + private int getFamilyOffset(int rlength) { + return this.offset + KeyValue.ROW_KEY_OFFSET + rlength + Bytes.SIZEOF_BYTE; + } + + /** + * @return Family length + */ + @Override + public byte getFamilyLength() { + return getFamilyLength(getFamilyOffset()); + } + + /** + * @return Family length + */ + public byte getFamilyLength(int foffset) { + return this.bytes[foffset - 1]; + } + + /** + * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array) + */ + @Override + public byte[] getQualifierArray() { + return bytes; + } + + /** + * @return Qualifier offset + */ + @Override + public int getQualifierOffset() { + return getQualifierOffset(getFamilyOffset()); + } + + /** + * @return Qualifier offset + */ + private int getQualifierOffset(int foffset) { + return foffset + getFamilyLength(foffset); + } + + /** + * @return Qualifier length + */ + @Override + public int getQualifierLength() { + return
getQualifierLength(getRowLength(), getFamilyLength()); + } + + /** + * @return Qualifier length + */ + private int getQualifierLength(int rlength, int flength) { + return getKeyLength() - (int) KeyValue.getKeyDataStructureSize(rlength, flength, 0); + } + + /** + * @return Timestamp offset + */ + public int getTimestampOffset() { + return getTimestampOffset(getKeyLength()); + } + + /** + * @return Length of key portion. + */ + public int getKeyLength() { + return Bytes.toInt(this.bytes, this.offset); + } + + /** + * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array) + */ + @Override + public byte[] getValueArray() { + return bytes; + } + + /** + * @return the value offset + */ + @Override + public int getValueOffset() { + int voffset = getKeyOffset() + getKeyLength(); + return voffset; + } + + public int getKeyOffset() { + return this.offset + KeyValue.ROW_OFFSET; + } + + /** + * @return Value length + */ + @Override + public int getValueLength() { + int vlength = Bytes.toInt(this.bytes, this.offset + Bytes.SIZEOF_INT); + return vlength; + } + + /** + * @param keylength Pass if you have it to save on an int creation. + * @return Timestamp offset + */ + private int getTimestampOffset(final int keylength) { + return getKeyOffset() + keylength - KeyValue.TIMESTAMP_TYPE_SIZE; + } + + @Override + public void setTimestamp(long ts) { + Bytes.putBytes(this.bytes, this.getTimestampOffset(), Bytes.toBytes(ts), 0, Bytes.SIZEOF_LONG); + } + + @Override + public void setTimestamp(byte[] ts, int tsOffset) { + Bytes.putBytes(this.bytes, this.getTimestampOffset(), ts, tsOffset, Bytes.SIZEOF_LONG); + } + + /** + * This returns the offset where the tag actually starts. + */ + @Override + public int getTagsOffset() { + int tagsLen = getTagsLength(); + if (tagsLen == 0) { + return this.offset + this.length - EXTRA_BYTES_FOR_SEQ_CHUNK_IDS; + } + return this.offset + this.length - EXTRA_BYTES_FOR_SEQ_CHUNK_IDS - tagsLen; + } + + /** + * This returns the total length of the tag bytes + */ + @Override + public int getTagsLength() { + int tagsLen = this.length - EXTRA_BYTES_FOR_SEQ_CHUNK_IDS + - (getKeyLength() + getValueLength() + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE); + if (tagsLen > 0) { + // There are some Tag bytes in the byte[]. So subtract the 2 bytes added to denote the tags + // length + tagsLen -= KeyValue.TAGS_LENGTH_SIZE; + } + return tagsLen; + } + + /** + * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array) + */ + @Override + public byte[] getTagsArray() { + return bytes; + } + + @Override + public void write(ByteBuffer buf, int pos) { + ByteBufferUtils.copyFromArrayToBuffer(buf, pos, this.bytes, this.offset, + this.length - EXTRA_BYTES_FOR_SEQ_CHUNK_IDS); + } + + @Override + public int write(OutputStream out, boolean withTags) throws IOException { + // In KeyValueUtil#oswrite we do a Cell serialization as KeyValue. Any changes done here, please + // check KeyValueUtil#oswrite also and make the necessary changes.
+ int length = this.length; + if (!withTags) { + length = this.getKeyLength() + this.getValueLength() + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE; + } else { + length -= EXTRA_BYTES_FOR_SEQ_CHUNK_IDS; + } + ByteBufferUtils.putInt(out, length); + out.write(this.bytes, this.offset, length); + return length + Bytes.SIZEOF_INT; + } + + public int getChunkId() { + return Bytes.toInt(this.bytes, ((this.offset + this.length) - Bytes.SIZEOF_INT)); + } + + public int getOffset() { + return this.offset; + } + + @Override + public long heapSize() { + // Will change once HBASE-16747 comes in + return ClassSize.align( + FIXED_HEAP_SIZE_OVERHEAD + ClassSize.align(ClassSize.ARRAY) + ClassSize.align(length)); + } + + @Override + public long getTimestamp() { + return getTimestamp(getKeyLength()); + } + + /** + * @param keylength Pass if you have it to save on an int creation. + * @return Timestamp + */ + long getTimestamp(final int keylength) { + int tsOffset = getTimestampOffset(keylength); + return Bytes.toLong(this.bytes, tsOffset); + } + + @Override + public byte getTypeByte() { + return this.bytes[this.offset + getKeyLength() - 1 + KeyValue.ROW_OFFSET]; + } + + @Override + public long getSequenceId() { + return Bytes.toLong(this.bytes, ((this.offset + this.length) - EXTRA_BYTES_FOR_SEQ_CHUNK_IDS)); + } + + @Override + public String toString() { + return CellUtil.toString(this, true); + } + + @Override + public void setSequenceId(long seqId) throws IOException { + // should we do it? + } + + @Override + public int getSerializedSize(boolean withTags) { + return this.length - EXTRA_BYTES_FOR_SEQ_CHUNK_IDS; + } + + @Override + public void write(byte[] buf, int offset) { + System.arraycopy(this.bytes, this.offset, buf, offset, + this.length - EXTRA_BYTES_FOR_SEQ_CHUNK_IDS); + } +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java index 0653d1a..47801ba 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java @@ -933,6 +933,14 @@ public final class ByteBufferUtils { buffer.putLong(val); } } + + public static int putLong(ByteBuffer buffer, int pos, long val) { + if (UNSAFE_UNALIGNED) { + return UnsafeAccess.putLong(buffer, pos, val); + } + buffer.putLong(pos, val); + return pos + Bytes.SIZEOF_LONG; + } /** * Copies the bytes from given array's offset to length part into the given buffer. Puts the bytes * to buffer's current position. This also advances the position in the 'out' buffer by 'length' diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkMap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkMap.java new file mode 100644 index 0000000..0b9c2ed --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkMap.java @@ -0,0 +1,109 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import java.nio.ByteBuffer; +import java.util.Comparator; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.OffheapMemstoreChunkCell; +import org.apache.hadoop.hbase.OnheapMemstoreChunkCell; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.ByteBufferUtils; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * CellChunkMap is a byte array holding all that is needed to access a Cell, which + * is actually saved on another deeper byte array. + * Per Cell we have a reference to this deeper byte array B, offset in bytes in B (integer), + * and length in bytes in B (integer). Instead of a direct reference to the byte array, we store + * the id of its Chunk, as given by the MSLAB (also an integer). + * + * The CellChunkMap memory layout relevant to a deeper byte array B: + * + * <----------------- first Cell ---------------------> <-------------- second Cell --- ... + * ------------------------------------------------------------------------------------- ... + * | integer = x bytes | integer = x bytes | integer = x bytes | integer = x bytes | + * | reference to B | offset in B where | length of Cell's | reference to may be| ... + * | holding Cell data | Cell's data starts| data in B | another byte array | + * ------------------------------------------------------------------------------------- ... + */ +@InterfaceAudience.Private +public class CellChunkMap extends CellFlatMap { + private final Chunk[] chunks; + private final int numOfCellsInsideChunk; + private final MemStoreLAB memStoreLAB; + public static final int BYTES_IN_CELL = 3*(Integer.SIZE / Byte.SIZE); // each Cell requires 3 integers + + /* C-tor for increasing map starting from index zero */ + /* The given Cell array on given Chunk array must be ordered. */ + public CellChunkMap(Comparator<? super Cell> comparator, + MemStoreLAB memStoreLAB, Chunk[] chunks, int max, int chunkSize) { + super(comparator, 0, max, false); + this.memStoreLAB = memStoreLAB; + this.chunks = chunks; + this.numOfCellsInsideChunk = chunkSize / BYTES_IN_CELL; + } + + /* The given Cell array on given Chunk array must be ordered. */ + public CellChunkMap(Comparator<? super Cell> comparator, MemStoreLAB memStoreLAB, + Chunk[] chunks, int min, int max, int chunkSize, boolean descending) { + super(comparator, min, max, descending); + this.memStoreLAB = memStoreLAB; + this.chunks = chunks; + this.numOfCellsInsideChunk = chunkSize / BYTES_IN_CELL; + } + + @Override + protected Cell getCell(int i) { + // TODO: See if we can create one chunk for the index and reuse it?
+ int chunkIndex = (i / numOfCellsInsideChunk); + ByteBuffer block = chunks[chunkIndex].getBuffer(); + i = i - chunkIndex * numOfCellsInsideChunk; + + // byte offset of this cell-entry inside the index chunk + int offsetInBytes = i * BYTES_IN_CELL; + + // read the cell's chunk id, offset and length from the index chunk + int chunkId = ByteBufferUtils.toInt(block, offsetInBytes); + int offsetOfCell = ByteBufferUtils.toInt(block, offsetInBytes + Bytes.SIZEOF_INT); + int lengthOfCell = ByteBufferUtils.toInt(block, offsetInBytes + 2 * Bytes.SIZEOF_INT); + ByteBuffer chunk = memStoreLAB.getChunkData(chunkId); + + Cell result = null; + if (chunk != null) { + if (chunk.hasArray()) { + // Now parsing of seqId may be heavier?? + result = new OnheapMemstoreChunkCell(chunk.array(), offsetOfCell + chunk.arrayOffset(), lengthOfCell); + } else { + // Now parsing of seqId may be heavier?? + result = new OffheapMemstoreChunkCell(chunk, offsetOfCell, lengthOfCell); + } + return result; + } + return null; + } + + @Override + protected CellFlatMap createSubCellFlatMap(int min, int max, boolean descending) { + return new CellChunkMap(this.comparator(), this.memStoreLAB, this.chunks, min, max, + this.numOfCellsInsideChunk * BYTES_IN_CELL, descending); + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java index d968ed9..d7a9afd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java @@ -46,13 +46,23 @@ public class Chunk { /** Size of chunk in bytes */ private final int size; + // The unique id associated with the chunk + private int id = -1; + /** - * Create an uninitialized chunk. Note that memory is not allocated yet, so this is cheap. - * + * Create an uninitialized chunk. Note that memory is not allocated yet, so + * this is cheap. * @param size in bytes + * @param id the chunk id */ - Chunk(int size) { + public Chunk(int size, int id) { this.size = size; + this.id = id; + } + + int getChunkId() { + return this.id; } /** @@ -126,7 +136,7 @@ public class Chunk { /** * @return This chunk's backing data.
 */ - byte[] getData() { + public byte[] getData() { return this.data; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java index 99b2bb6..92d25a4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java @@ -85,6 +85,8 @@ public class HeapMemStoreLAB implements MemStoreLAB { private AtomicBoolean reclaimed = new AtomicBoolean(false); // Current count of open scanners which reading data from this MemStoreLAB private final AtomicInteger openScannerCount = new AtomicInteger(); + // creates the chunks and associates each chunk with a unique id + private MemstoreLABChunkCreator chunkCreator; // Used in testing public HeapMemStoreLAB() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java index 0dcafe6..7dc2d22 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.nio.ByteBuffer; + import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -64,4 +66,11 @@ public interface MemStoreLAB { * Called when closing a scanner on the data of this MemStoreLAB */ void decScannerCount(); + + /** + * Get the data, stored in a ByteBuffer, of the chunk associated with the given id + * @param id the chunk id + * @return the chunk's data + */ + ByteBuffer getChunkData(int id); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreLABChunkCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreLABChunkCreator.java new file mode 100644 index 0000000..b16821e --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreLABChunkCreator.java @@ -0,0 +1,71 @@ +package org.apache.hadoop.hbase.regionserver; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Manages the creation of MemStoreLAB chunks.
A monotonically increasing id is associated + * with every chunk + */ +@InterfaceAudience.Private +public class MemstoreLABChunkCreator { + // monotonically increasing chunk id + private AtomicInteger chunkID = new AtomicInteger(1); + // maps the chunk against the monotonically increasing chunk id + private Map<Integer, Chunk> chunkIdMap = + new ConcurrentHashMap<Integer, Chunk>(); + private int chunkSize; + private boolean offheap; + private static volatile MemstoreLABChunkCreator INSTANCE; + + private MemstoreLABChunkCreator(int chunkSize, boolean offheap) { + this.chunkSize = chunkSize; + this.offheap = offheap; + } + + static MemstoreLABChunkCreator getChunkCreator(int chunkSize, boolean offheap) { + if (INSTANCE != null) return INSTANCE; + synchronized (MemstoreLABChunkCreator.class) { + // re-check under the lock so racing callers all see the same instance + if (INSTANCE == null) { + INSTANCE = new MemstoreLABChunkCreator(chunkSize, offheap); + } + return INSTANCE; + } + } + + Chunk createChunk() { + int id = chunkID.getAndIncrement(); + Chunk memStoreLABChunk = new Chunk(chunkSize, id); + chunkIdMap.put(id, memStoreLABChunk); + return memStoreLABChunk; + } + + Chunk getChunk(int id) { + return this.chunkIdMap.get(id); + } + + void removeChunk(int id) { + this.chunkIdMap.remove(id); + } + + void clear() { + this.chunkIdMap.clear(); + } +}
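Reviewer note: the two layout conventions this patch relies on are easy to sanity-check outside HBase. Per index entry, CellChunkMap stores three consecutive ints (data-chunk id, offset of the cell inside that data chunk, cell length), and each cell's serialized bytes carry a 12-byte trailer of sequence id (8 bytes) plus chunk id (4 bytes), i.e. EXTRA_BYTES_FOR_SEQ_CHUNK_IDS. Below is a minimal, self-contained sketch of both, using only the JDK; the class name, the toy sizes, and the constant values (data chunk 7, length 42, seqId 123) are illustrative assumptions, not part of the patch.

import java.nio.ByteBuffer;

public class CellIndexLayoutDemo {
  // mirrors CellChunkMap.BYTES_IN_CELL: three ints per index entry
  static final int BYTES_IN_CELL = 3 * Integer.BYTES;

  public static void main(String[] args) {
    // --- index-chunk layout: (chunkId, offsetOfCell, lengthOfCell) per entry ---
    int chunkSize = 4 * BYTES_IN_CELL;              // toy index chunk holding 4 entries
    int cellsPerChunk = chunkSize / BYTES_IN_CELL;  // mirrors numOfCellsInsideChunk
    ByteBuffer[] indexChunks = { ByteBuffer.allocate(chunkSize), ByteBuffer.allocate(chunkSize) };

    // write 8 entries: entry i points at data chunk 7, offset i * 100, length 42
    for (int i = 0; i < 8; i++) {
      ByteBuffer block = indexChunks[i / cellsPerChunk];
      int pos = (i % cellsPerChunk) * BYTES_IN_CELL;
      block.putInt(pos, 7);
      block.putInt(pos + Integer.BYTES, i * 100);
      block.putInt(pos + 2 * Integer.BYTES, 42);
    }

    // decode entry 5 the same way CellChunkMap#getCell does
    int i = 5;
    ByteBuffer block = indexChunks[i / cellsPerChunk];
    int offsetInBytes = (i % cellsPerChunk) * BYTES_IN_CELL;
    int chunkId = block.getInt(offsetInBytes);
    int offsetOfCell = block.getInt(offsetInBytes + Integer.BYTES);
    int lengthOfCell = block.getInt(offsetInBytes + 2 * Integer.BYTES);
    System.out.printf("entry %d -> chunkId=%d, offset=%d, length=%d%n",
        i, chunkId, offsetOfCell, lengthOfCell); // entry 5 -> chunkId=7, offset=500, length=42

    // --- cell trailer: 8-byte seqId + 4-byte chunkId appended to the KeyValue bytes ---
    int kvLen = 20;                                          // fake KeyValue payload
    byte[] cell = new byte[kvLen + Long.BYTES + Integer.BYTES];
    ByteBuffer.wrap(cell).putLong(kvLen, 123L).putInt(kvLen + Long.BYTES, 7);
    long seqId = ByteBuffer.wrap(cell).getLong(cell.length - Long.BYTES - Integer.BYTES);
    int trailerChunkId = ByteBuffer.wrap(cell).getInt(cell.length - Integer.BYTES);
    System.out.printf("seqId=%d, chunkId=%d%n", seqId, trailerChunkId); // seqId=123, chunkId=7
  }
}

Because every index entry is a fixed-width triple, CellFlatMap can binary-search over entry indexes with plain arithmetic and only materialize a cell (via getCell) at the probe positions.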