 .../org/apache/hadoop/hbase/OffheapKeyValue.java   |  3 +-
 .../org/apache/hadoop/hbase/nio/MultiByteBuff.java | 20 +++++-----
 .../apache/hadoop/hbase/TestOffheapKeyValue.java   |  8 ++--
 .../apache/hadoop/hbase/SizeCachedKeyValue.java    |  3 +-
 .../hadoop/hbase/SizeCachedNoTagsKeyValue.java     |  4 +-
 .../hadoop/hbase/io/hfile/HFileReaderImpl.java     | 45 ++++++++++------------
 6 files changed, 41 insertions(+), 42 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java
index 4a5dc70..8f51d4d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java
@@ -48,7 +48,7 @@ public class OffheapKeyValue extends ByteBufferedCell implements HeapSize, Clone
       + ClassSize.align(ClassSize.BYTE_BUFFER) + (3 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_SHORT
       + Bytes.SIZEOF_BOOLEAN + Bytes.SIZEOF_LONG;
 
-  public OffheapKeyValue(ByteBuffer buf, int offset, int length, boolean hasTags) {
+  public OffheapKeyValue(ByteBuffer buf, int offset, int length, boolean hasTags, long seqId) {
     assert buf.isDirect();
     this.buf = buf;
     this.offset = offset;
@@ -56,6 +56,7 @@ public class OffheapKeyValue extends ByteBufferedCell implements HeapSize, Clone
     rowLen = ByteBufferUtils.toShort(this.buf, this.offset + KeyValue.ROW_OFFSET);
     keyLen = ByteBufferUtils.toInt(this.buf, this.offset);
     this.hasTags = hasTags;
+    this.seqId = seqId;
   }
 
   @Override
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
index ad339e3..a136bad 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
@@ -224,9 +224,9 @@ public class MultiByteBuff extends ByteBuff {
     ByteBuffer nextItem = items[itemIndex + 1];
     // Get available one byte from this item and remaining one from next
     short n = 0;
-    n ^= item.get(offsetInItem) & 0xFF;
+    n ^= ByteBufferUtils.toByte(item, offsetInItem) & 0xFF;
     n <<= 8;
-    n ^= nextItem.get(0) & 0xFF;
+    n ^= ByteBufferUtils.toByte(nextItem, 0) & 0xFF;
     return n;
   }
 
@@ -259,11 +259,11 @@ public class MultiByteBuff extends ByteBuff {
     int l = 0;
     for (int i = offsetInItem; i < item.capacity(); i++) {
       l <<= 8;
-      l ^= item.get(i) & 0xFF;
+      l ^= ByteBufferUtils.toByte(item, i) & 0xFF;
     }
     for (int i = 0; i < Bytes.SIZEOF_INT - remainingLen; i++) {
       l <<= 8;
-      l ^= nextItem.get(i) & 0xFF;
+      l ^= ByteBufferUtils.toByte(nextItem, i) & 0xFF;
     }
     return l;
   }
@@ -284,11 +284,11 @@ public class MultiByteBuff extends ByteBuff {
     short l = 0;
     for (int i = offsetInItem; i < item.capacity(); i++) {
       l <<= 8;
-      l ^= item.get(i) & 0xFF;
+      l ^= ByteBufferUtils.toByte(item, i) & 0xFF;
     }
     for (int i = 0; i < Bytes.SIZEOF_SHORT - remainingLen; i++) {
       l <<= 8;
-      l ^= nextItem.get(i) & 0xFF;
+      l ^= ByteBufferUtils.toByte(nextItem, i) & 0xFF;
     }
     return l;
   }
@@ -309,11 +309,11 @@ public class MultiByteBuff extends ByteBuff {
     long l = 0;
     for (int i = offsetInItem; i < item.capacity(); i++) {
       l <<= 8;
-      l ^= item.get(i) & 0xFF;
+      l ^= ByteBufferUtils.toByte(item, i) & 0xFF;
     }
     for (int i = 0; i < Bytes.SIZEOF_LONG - remainingLen; i++) {
       l <<= 8;
-      l ^= nextItem.get(i) & 0xFF;
+      l ^= ByteBufferUtils.toByte(nextItem, i) & 0xFF;
     }
     return l;
   }
@@ -347,11 +347,11 @@ public class MultiByteBuff extends ByteBuff {
     long l = 0;
     for (int i = offsetInItem; i < item.capacity(); i++) {
       l <<= 8;
-      l ^= item.get(i) & 0xFF;
+      l ^= ByteBufferUtils.toByte(item, i) & 0xFF;
     }
     for (int i = 0; i < Bytes.SIZEOF_LONG - remainingLen; i++) {
       l <<= 8;
-      l ^= nextItem.get(i) & 0xFF;
+      l ^= ByteBufferUtils.toByte(nextItem, i) & 0xFF;
     }
     return l;
   }
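
The MultiByteBuff hunks above all repeat one pattern: a short, int, or long whose bytes straddle two backing ByteBuffers is rebuilt one byte at a time using absolute, position-preserving reads (ByteBufferUtils.toByte(buf, index)) instead of ByteBuffer.get(index). The standalone sketch below illustrates that byte-stitching logic with plain java.nio absolute gets; it is only an illustration of the pattern, not the HBase implementation, and the class and method names are made up.

import java.nio.ByteBuffer;

public class SpanningReadSketch {
  // Reads a big-endian int whose bytes begin at offsetInFirst in 'first' and spill
  // over into 'second'. Only absolute gets are used, so neither buffer's position
  // or limit is modified (the same property the ByteBufferUtils.toByte calls preserve).
  static int getIntSpanning(ByteBuffer first, int offsetInFirst, ByteBuffer second) {
    int value = 0;
    int consumed = 0;
    // Take whatever bytes remain in the first fragment.
    for (int i = offsetInFirst; i < first.capacity(); i++) {
      value = (value << 8) | (first.get(i) & 0xFF);
      consumed++;
    }
    // Take the remaining bytes from the start of the next fragment.
    for (int i = 0; i < Integer.BYTES - consumed; i++) {
      value = (value << 8) | (second.get(i) & 0xFF);
    }
    return value;
  }

  public static void main(String[] args) {
    // 0x0A0B0C0D split as [0A 0B] in one buffer and [0C 0D] in the next.
    ByteBuffer a = ByteBuffer.wrap(new byte[] { 0x0A, 0x0B });
    ByteBuffer b = ByteBuffer.wrap(new byte[] { 0x0C, 0x0D });
    System.out.printf("0x%08X%n", getIntSpanning(a, 0, b)); // prints 0x0A0B0C0D
  }
}
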
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestOffheapKeyValue.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestOffheapKeyValue.java
index 0849106..e416003 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestOffheapKeyValue.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestOffheapKeyValue.java
@@ -56,7 +56,7 @@ public class TestOffheapKeyValue {
     KeyValue kvCell = new KeyValue(row1, fam1, qual1, 0l, Type.Put, row1);
     ByteBuffer buf = ByteBuffer.allocateDirect(kvCell.getBuffer().length);
     ByteBufferUtils.copyFromArrayToBuffer(buf, kvCell.getBuffer(), 0, kvCell.getBuffer().length);
-    ByteBufferedCell offheapKV = new OffheapKeyValue(buf, 0, buf.capacity(), false);
+    ByteBufferedCell offheapKV = new OffheapKeyValue(buf, 0, buf.capacity(), false, 0l);
     assertEquals(
       ROW1,
       ByteBufferUtils.toStringBinary(offheapKV.getRowByteBuffer(),
@@ -99,7 +99,7 @@ public class TestOffheapKeyValue {
     kvCell = new KeyValue(row1, fam2, qual2, 0l, Type.Put, row1);
     buf = ByteBuffer.allocateDirect(kvCell.getBuffer().length);
     ByteBufferUtils.copyFromArrayToBuffer(buf, kvCell.getBuffer(), 0, kvCell.getBuffer().length);
-    offheapKV = new OffheapKeyValue(buf, 0, buf.capacity(), false);
+    offheapKV = new OffheapKeyValue(buf, 0, buf.capacity(), false, 0l);
     assertEquals(
       FAM2,
       ByteBufferUtils.toStringBinary(offheapKV.getFamilyByteBuffer(),
@@ -112,7 +112,7 @@ public class TestOffheapKeyValue {
     kvCell = new KeyValue(row1, fam1, nullQualifier, 0L, Type.Put, row1);
     buf = ByteBuffer.allocateDirect(kvCell.getBuffer().length);
     ByteBufferUtils.copyFromArrayToBuffer(buf, kvCell.getBuffer(), 0, kvCell.getBuffer().length);
-    offheapKV = new OffheapKeyValue(buf, 0, buf.capacity(), false);
+    offheapKV = new OffheapKeyValue(buf, 0, buf.capacity(), false, 0l);
     assertEquals(
       ROW1,
       ByteBufferUtils.toStringBinary(offheapKV.getRowByteBuffer(),
@@ -138,7 +138,7 @@ public class TestOffheapKeyValue {
     KeyValue kvCell = new KeyValue(row1, fam1, qual1, 0l, Type.Put, row1, tags);
     ByteBuffer buf = ByteBuffer.allocateDirect(kvCell.getBuffer().length);
     ByteBufferUtils.copyFromArrayToBuffer(buf, kvCell.getBuffer(), 0, kvCell.getBuffer().length);
-    ByteBufferedCell offheapKV = new OffheapKeyValue(buf, 0, buf.capacity(), true);
+    ByteBufferedCell offheapKV = new OffheapKeyValue(buf, 0, buf.capacity(), true, 0l);
     assertEquals(
       ROW1,
       ByteBufferUtils.toStringBinary(offheapKV.getRowByteBuffer(),
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/SizeCachedKeyValue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/SizeCachedKeyValue.java
index 3ed2fc9..8c1d901 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/SizeCachedKeyValue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/SizeCachedKeyValue.java
@@ -37,12 +37,13 @@ public class SizeCachedKeyValue extends KeyValue {
   private short rowLen;
   private int keyLen;
 
-  public SizeCachedKeyValue(byte[] bytes, int offset, int length) {
+  public SizeCachedKeyValue(byte[] bytes, int offset, int length, long seqId) {
     super(bytes, offset, length);
     // We will read all these cached values at least once. Initialize now itself so that we can
     // avoid uninitialized checks with every time call
     rowLen = super.getRowLength();
     keyLen = super.getKeyLength();
+    setSequenceId(seqId);
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/SizeCachedNoTagsKeyValue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/SizeCachedNoTagsKeyValue.java
index f093dc7..d28d1a8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/SizeCachedNoTagsKeyValue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/SizeCachedNoTagsKeyValue.java
@@ -34,8 +34,8 @@ import org.apache.hadoop.hbase.util.Bytes;
 @InterfaceAudience.Private
 public class SizeCachedNoTagsKeyValue extends SizeCachedKeyValue {
 
-  public SizeCachedNoTagsKeyValue(byte[] bytes, int offset, int length) {
-    super(bytes, offset, length);
+  public SizeCachedNoTagsKeyValue(byte[] bytes, int offset, int length, long seqId) {
+    super(bytes, offset, length, seqId);
   }
 
   @Override
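
SizeCachedKeyValue, and through its super call SizeCachedNoTagsKeyValue, decodes the row and key lengths once in the constructor and now also receives the sequence id there, so a cell is fully initialized as soon as it is built. Below is a minimal sketch of that eager-caching idea, assuming the serialized layout used by the OffheapKeyValue constructor above (a 4-byte key length at the start of the cell and a 2-byte row length at offset + ROW_OFFSET, with ROW_OFFSET being 8); the class and field names are illustrative, not HBase's.

public class LengthCachingCellSketch {
  private static final int ROW_OFFSET = 8; // key length (4) + value length (4), as in KeyValue

  private final byte[] bytes;
  private final int offset;
  private final int keyLen;    // decoded once, reused by every getKeyLength() call
  private final short rowLen;  // decoded once, reused by every getRowLength() call
  private final long seqId;    // supplied at construction time instead of through a setter

  public LengthCachingCellSketch(byte[] bytes, int offset, long seqId) {
    this.bytes = bytes;
    this.offset = offset;
    // Both lengths will certainly be read at least once, so decode them eagerly here
    // and avoid re-parsing the backing array on every accessor call.
    this.keyLen = ((bytes[offset] & 0xFF) << 24) | ((bytes[offset + 1] & 0xFF) << 16)
        | ((bytes[offset + 2] & 0xFF) << 8) | (bytes[offset + 3] & 0xFF);
    this.rowLen = (short) (((bytes[offset + ROW_OFFSET] & 0xFF) << 8)
        | (bytes[offset + ROW_OFFSET + 1] & 0xFF));
    this.seqId = seqId;
  }

  public int getKeyLength() { return keyLen; }
  public short getRowLength() { return rowLen; }
  public long getSequenceId() { return seqId; }
}
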
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index 82f5366..7c14e40 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -562,7 +562,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
     }
 
     protected int getNextCellStartPosition() {
-      int nextKvPos = blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen
+      int nextKvPos = KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen
           + currMemstoreTSLen;
       if (this.reader.getFileContext().isIncludesTags()) {
         nextKvPos += Bytes.SIZEOF_SHORT + currTagsLen;
@@ -892,40 +892,37 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
       Cell ret;
       int cellBufSize = getCellBufSize();
+      long seqId = 0l;
+      if (this.reader.shouldIncludeMemstoreTS()) {
+        seqId = currMemstoreTS;
+      }
       if (blockBuffer.hasArray()) {
         // TODO : reduce the varieties of KV here. Check if based on a boolean
         // we can handle the 'no tags' case.
         if (currTagsLen > 0) {
           if (this.curBlock.getMemoryType() == MemoryType.SHARED) {
             ret = new ShareableMemoryKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
-                + blockBuffer.position(), getCellBufSize());
+                + blockBuffer.position(), getCellBufSize(), seqId);
           } else {
             ret = new SizeCachedKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
-                + blockBuffer.position(), cellBufSize);
+                + blockBuffer.position(), cellBufSize, seqId);
           }
         } else {
           if (this.curBlock.getMemoryType() == MemoryType.SHARED) {
             ret = new ShareableMemoryNoTagsKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
-                + blockBuffer.position(), getCellBufSize());
+                + blockBuffer.position(), getCellBufSize(), seqId);
           } else {
             ret = new SizeCachedNoTagsKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
-                + blockBuffer.position(), cellBufSize);
+                + blockBuffer.position(), cellBufSize, seqId);
           }
         }
       } else {
         ByteBuffer buf = blockBuffer.asSubByteBuffer(cellBufSize);
         if (this.curBlock.getMemoryType() == MemoryType.SHARED) {
           ret = new ShareableMemoryOffheapKeyValue(buf, buf.position(), cellBufSize,
-              currTagsLen > 0);
+              currTagsLen > 0, seqId);
         } else {
-          ret = new OffheapKeyValue(buf, buf.position(), cellBufSize, currTagsLen > 0);
-        }
-      }
-      if (this.reader.shouldIncludeMemstoreTS()) {
-        try {
-          CellUtil.setSequenceId(ret, currMemstoreTS);
-        } catch (IOException e) {
-          // will not happen
+          ret = new OffheapKeyValue(buf, buf.position(), cellBufSize, currTagsLen > 0, seqId);
         }
       }
       return ret;
@@ -948,42 +945,42 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
     private static class ShareableMemoryKeyValue extends SizeCachedKeyValue implements
        ShareableMemory {
-      public ShareableMemoryKeyValue(byte[] bytes, int offset, int length) {
-        super(bytes, offset, length);
+      public ShareableMemoryKeyValue(byte[] bytes, int offset, int length, long seqId) {
+        super(bytes, offset, length, seqId);
      }
 
       @Override
       public Cell cloneToCell() {
         byte[] copy = Bytes.copy(this.bytes, this.offset, this.length);
-        return new SizeCachedKeyValue(copy, 0, copy.length);
+        return new SizeCachedKeyValue(copy, 0, copy.length, getSequenceId());
       }
     }
 
     private static class ShareableMemoryNoTagsKeyValue extends SizeCachedNoTagsKeyValue implements
        ShareableMemory {
-      public ShareableMemoryNoTagsKeyValue(byte[] bytes, int offset, int length) {
-        super(bytes, offset, length);
+      public ShareableMemoryNoTagsKeyValue(byte[] bytes, int offset, int length, long seqId) {
+        super(bytes, offset, length, seqId);
      }
 
       @Override
       public Cell cloneToCell() {
         byte[] copy = Bytes.copy(this.bytes, this.offset, this.length);
-        return new SizeCachedNoTagsKeyValue(copy, 0, copy.length);
+        return new SizeCachedNoTagsKeyValue(copy, 0, copy.length, getSequenceId());
       }
     }
 
     private static class ShareableMemoryOffheapKeyValue extends OffheapKeyValue implements
        ShareableMemory {
       public ShareableMemoryOffheapKeyValue(ByteBuffer buf, int offset, int length,
-          boolean hasTags) {
-        super(buf, offset, length, hasTags);
+          boolean hasTags, long seqId) {
+        super(buf, offset, length, hasTags, seqId);
       }
 
       @Override
       public Cell cloneToCell() {
         byte[] copy = new byte[this.length];
         ByteBufferUtils.copyFromBufferToArray(copy, this.buf, this.offset, 0, this.length);
-        return new SizeCachedKeyValue(copy, 0, copy.length);
+        return new SizeCachedKeyValue(copy, 0, copy.length, getSequenceId());
       }
     }
 
@@ -1015,7 +1012,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
      */
     private void positionThisBlockBuffer() {
       try {
-        blockBuffer.position(getNextCellStartPosition());
+        blockBuffer.skip(getNextCellStartPosition());
       } catch (IllegalArgumentException e) {
         LOG.error("Current pos = " + blockBuffer.position() + "; currKeyLen = " + currKeyLen + "; currValLen = "
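
Taken together, the caller-visible change is that the sequence id becomes a constructor argument on every one of these Cell implementations, replacing the post-construction CellUtil.setSequenceId(...) call that getCell() used to make. A minimal usage sketch, mirroring the updated TestOffheapKeyValue above and assuming the patched hbase-common classes are on the classpath (the class name SeqIdConstructorSketch is made up):

import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.OffheapKeyValue;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;

public class SeqIdConstructorSketch {
  public static void main(String[] args) {
    byte[] row = Bytes.toBytes("row1");
    byte[] fam = Bytes.toBytes("f");
    byte[] qual = Bytes.toBytes("q");

    // Serialize a KeyValue and copy it into a direct buffer, as the updated test does.
    KeyValue kv = new KeyValue(row, fam, qual, 0L, Type.Put, row);
    ByteBuffer buf = ByteBuffer.allocateDirect(kv.getBuffer().length);
    ByteBufferUtils.copyFromArrayToBuffer(buf, kv.getBuffer(), 0, kv.getBuffer().length);

    // With this patch the sequence id is supplied up front; previously it had to be
    // set afterwards through CellUtil.setSequenceId(cell, seqId).
    long seqId = 42L;
    OffheapKeyValue cell = new OffheapKeyValue(buf, 0, buf.capacity(), false, seqId);
    System.out.println("seqId = " + cell.getSequenceId());
  }
}
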