diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
index 207f275..100fc56 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
@@ -100,6 +100,12 @@ public final class CellUtil {
     return output;
   }
 
+  public static byte[] cloneTags(Cell cell) {
+    byte[] output = new byte[cell.getTagsLength()];
+    copyTagTo(cell, output, 0);
+    return output;
+  }
+
   /**
    * Returns tag value in a new byte array. If server-side, use
    * {@link Tag#getBuffer()} with appropriate {@link Tag#getTagOffset()} and
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 704b90f..0fddd3c 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase;
 
 import static org.apache.hadoop.hbase.io.hfile.BlockType.MAGIC_LENGTH;
 
+import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.util.Arrays;
 import java.util.Collections;
@@ -495,6 +496,8 @@ public final class HConstants {
    */
   public static final byte [] EMPTY_BYTE_ARRAY = new byte [0];
 
+  public static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.wrap(EMPTY_BYTE_ARRAY);
+
   /**
    * Used by scanners, etc when they want to start at the beginning of a region
    */
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyOnlyKeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyOnlyKeyValue.java
new file mode 100644
index 0000000..477d16e
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyOnlyKeyValue.java
@@ -0,0 +1,210 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * This is a key-only Cell implementation which is identical to {@link KeyValue.KeyOnlyKeyValue}
+ * with respect to key serialization but has its data in off-heap memory.
+ */
+@InterfaceAudience.Private
+public class OffheapKeyOnlyKeyValue extends ByteBufferedCell {
+
+  private ByteBuffer buf;
+  private int offset = 0; // offset into the buffer where the key starts
+  private int length = 0; // length of the serialized key in the buffer
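+  // Key layout within [offset, offset + length): 2 bytes of row length, row bytes, 1 byte of
+  // family length, family bytes, qualifier bytes, 8 bytes of timestamp and 1 byte of type.
+  // The position getters below all derive from this layout.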
+ private short rowLen; + + public OffheapKeyOnlyKeyValue(ByteBuffer buf, int offset, int length) { + assert buf.isDirect(); + this.buf = buf; + this.offset = offset; + this.length = length; + this.rowLen = ByteBufferUtils.toShort(this.buf, this.offset); + } + + @Override + public byte[] getRowArray() { + return CellUtil.cloneRow(this); + } + + @Override + public int getRowOffset() { + return 0; + } + + @Override + public short getRowLength() { + return this.rowLen; + } + + @Override + public byte[] getFamilyArray() { + return CellUtil.cloneFamily(this); + } + + @Override + public int getFamilyOffset() { + return 0; + } + + @Override + public byte getFamilyLength() { + return getFamilyLength(getFamilyLengthPosition()); + } + + private byte getFamilyLength(int famLenPos) { + return ByteBufferUtils.toByte(this.buf, famLenPos); + } + + @Override + public byte[] getQualifierArray() { + return CellUtil.cloneQualifier(this); + } + + @Override + public int getQualifierOffset() { + return 0; + } + + @Override + public int getQualifierLength() { + return getQualifierLength(getRowLength(), getFamilyLength()); + } + + private int getQualifierLength(int rlength, int flength) { + return this.length - (int) KeyValue.getKeyDataStructureSize(rlength, flength, 0); + } + + @Override + public long getTimestamp() { + return ByteBufferUtils.toLong(this.buf, getTimestampOffset()); + } + + private int getTimestampOffset() { + return this.offset + this.length - KeyValue.TIMESTAMP_TYPE_SIZE; + } + + @Override + public byte getTypeByte() { + return ByteBufferUtils.toByte(this.buf, this.offset + this.length - 1); + } + + @Override + public long getSequenceId() { + return 0; + } + + @Override + public byte[] getValueArray() { + throw new IllegalArgumentException("This is a key only Cell"); + } + + @Override + public int getValueOffset() { + return 0; + } + + @Override + public int getValueLength() { + return 0; + } + + @Override + public byte[] getTagsArray() { + throw new IllegalArgumentException("This is a key only Cell"); + } + + @Override + public int getTagsOffset() { + return 0; + } + + @Override + public int getTagsLength() { + return 0; + } + + @Override + public ByteBuffer getRowByteBuffer() { + return this.buf; + } + + @Override + public int getRowPositionInByteBuffer() { + return this.offset + Bytes.SIZEOF_SHORT; + } + + @Override + public ByteBuffer getFamilyByteBuffer() { + return this.buf; + } + + @Override + public int getFamilyPositionInByteBuffer() { + return getFamilyLengthPosition() + Bytes.SIZEOF_BYTE; + } + + // The position in BB where the family length is added. 
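+  // i.e. this.offset + 2 bytes of row length + the row bytes themselves.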
+ private int getFamilyLengthPosition() { + return this.offset + Bytes.SIZEOF_SHORT + getRowLength(); + } + + @Override + public ByteBuffer getQualifierByteBuffer() { + return this.buf; + } + + @Override + public int getQualifierPositionInByteBuffer() { + int famLenPos = getFamilyLengthPosition(); + return famLenPos + Bytes.SIZEOF_BYTE + getFamilyLength(famLenPos); + } + + @Override + public ByteBuffer getValueByteBuffer() { + throw new IllegalArgumentException("This is a key only Cell"); + } + + @Override + public int getValuePositionInByteBuffer() { + return 0; + } + + @Override + public ByteBuffer getTagsByteBuffer() { + throw new IllegalArgumentException("This is a key only Cell"); + } + + @Override + public int getTagsPositionInByteBuffer() { + return 0; + } + + @Override + public String toString() { + return CellUtil.toString(this, false); + } +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java index 1a18b2d..1b2ab5d 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java @@ -127,6 +127,11 @@ public class ByteBufferOutputStream extends OutputStream { ByteBufferUtils.copyFromArrayToBuffer(buf, b, off, len); } + public void write(ByteBuffer b, int off, int len) throws IOException { + checkSizeAndGrow(len); + ByteBufferUtils.copyFromBufferToBuffer(b, buf, off, len); + } + /** * Writes an int to the underlying output stream as four * bytes, high byte first. diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java index 26e7b50..05c4ad1 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.util.Dictionary; import org.apache.hadoop.hbase.io.util.StreamUtils; +import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.IOUtils; @@ -133,7 +134,7 @@ public class TagCompressionContext { * @return bytes count read from source to uncompress all tags. 
   * @throws IOException
   */
-  public int uncompressTags(ByteBuffer src, byte[] dest, int offset, int length)
+  public int uncompressTags(ByteBuff src, byte[] dest, int offset, int length)
       throws IOException {
     int srcBeginPos = src.position();
     int endOffset = offset + length;
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
index 966c59b..a8a5888 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
@@ -22,10 +22,13 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
+import org.apache.hadoop.hbase.ByteBufferedCell;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.OffheapKeyOnlyKeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.SettableSequenceId;
@@ -38,9 +41,11 @@ import org.apache.hadoop.hbase.io.hfile.BlockType;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.util.LRUDictionary;
 import org.apache.hadoop.hbase.io.util.StreamUtils;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.WritableUtils;
 
 /**
@@ -102,8 +107,8 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
         - qualCommonPrefix);
   }
 
-  protected static class SeekerState implements Cell {
-    protected ByteBuffer currentBuffer;
+  protected static class SeekerState {
+    protected ByteBuff currentBuffer;
     protected TagCompressionContext tagCompressionContext;
     protected int valueOffset = -1;
     protected int keyLength;
@@ -121,6 +126,15 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
     protected long memstoreTS;
     protected int nextKvOffset;
     protected KeyValue.KeyOnlyKeyValue currentKey = new KeyValue.KeyOnlyKeyValue();
+    // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids
+    // too many object creations.
+    private final Pair<ByteBuffer, Integer> tmpPair;
+    private final boolean includeTags;
+
+    public SeekerState(Pair<ByteBuffer, Integer> tmpPair, boolean includeTags) {
+      this.tmpPair = tmpPair;
+      this.includeTags = includeTags;
+    }
 
     protected boolean isValid() {
       return valueOffset != -1;
@@ -200,74 +214,190 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
       }
     }
 
+    public Cell toCell() {
+      // Buffer backing the value and tags part, taken from the HFileBlock's buffer.
+      // When tag compression is in use, this will be only the value bytes area.
+      ByteBuffer valAndTagsBuffer;
+      int vOffset;
+      int valAndTagsLength = this.valueLength;
+      int tagsLenSerializationSize = 0;
+      if (this.includeTags && this.tagCompressionContext == null) {
+        // Include the tags part also.
+        // This will be the tags bytes plus 2 bytes for storing the tags length.
+        tagsLenSerializationSize = this.tagsOffset - (this.valueOffset + this.valueLength);
+        valAndTagsLength += tagsLenSerializationSize + this.tagsLength;
+      }
+      this.currentBuffer.asSubByteBuffer(this.valueOffset, valAndTagsLength, this.tmpPair);
+      valAndTagsBuffer = this.tmpPair.getFirst();
+      vOffset = this.tmpPair.getSecond(); // This is the offset of the value part in the BB.
+      if (valAndTagsBuffer.hasArray()) {
+        return toOnheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize);
+      } else {
+        return toOffheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize);
+      }
+    }
+
+    private Cell toOnheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
+        int tagsLenSerializationSize) {
+      byte[] tagsArray = HConstants.EMPTY_BYTE_ARRAY;
+      int tOffset = 0;
+      if (this.includeTags) {
+        if (this.tagCompressionContext == null) {
+          tagsArray = valAndTagsBuffer.array();
+          tOffset = valAndTagsBuffer.arrayOffset() + vOffset + this.valueLength
+              + tagsLenSerializationSize;
+        } else {
+          tagsArray = Bytes.copy(tagsBuffer, 0, this.tagsLength);
+          tOffset = 0;
+        }
+      }
+      return new OnheapDecodedCell(Bytes.copy(keyBuffer, 0, this.keyLength),
+          currentKey.getRowLength(), currentKey.getFamilyOffset(), currentKey.getFamilyLength(),
+          currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
+          currentKey.getTimestamp(), currentKey.getTypeByte(), valAndTagsBuffer.array(),
+          valAndTagsBuffer.arrayOffset() + vOffset, this.valueLength, memstoreTS, tagsArray,
+          tOffset, this.tagsLength);
+    }
+
+    private Cell toOffheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
+        int tagsLenSerializationSize) {
+      ByteBuffer tagsBuf = HConstants.EMPTY_BYTE_BUFFER;
+      int tOffset = 0;
+      if (this.includeTags) {
+        if (this.tagCompressionContext == null) {
+          tagsBuf = valAndTagsBuffer;
+          tOffset = vOffset + this.valueLength + tagsLenSerializationSize;
+        } else {
+          tagsBuf = ByteBuffer.wrap(Bytes.copy(tagsBuffer, 0, this.tagsLength));
+          tOffset = 0;
+        }
+      }
+      return new OffheapDecodedCell(ByteBuffer.wrap(Bytes.copy(keyBuffer, 0, this.keyLength)),
+          currentKey.getRowLength(), currentKey.getFamilyOffset(), currentKey.getFamilyLength(),
+          currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
+          currentKey.getTimestamp(), currentKey.getTypeByte(), valAndTagsBuffer, vOffset,
+          this.valueLength, memstoreTS, tagsBuf, tOffset, this.tagsLength);
+    }
+  }
+
+  /**
+   * Copies only the key part of the keybuffer by doing a deep copy and passes the
+   * seeker state members for taking a clone.
+   * Note that the value byte[] part still points into the currentBuffer and is
+   * represented by the valueOffset and valueLength.
+   */
+  // We return this as a Cell to the upper layers of the read flow, and they might try setting a
+  // new SeqId there. So this has to be an instance of SettableSequenceId.
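+  // toCell() above creates this array-backed variant when the block's buffer is on heap; the
+  // OffheapDecodedCell further below covers blocks served from direct (off-heap) buffers.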
+ protected static class OnheapDecodedCell implements Cell, HeapSize, SettableSequenceId, + Streamable { + private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT + + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT) + + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.ARRAY)); + private byte[] keyOnlyBuffer; + private short rowLength; + private int familyOffset; + private byte familyLength; + private int qualifierOffset; + private int qualifierLength; + private long timestamp; + private byte typeByte; + private byte[] valueBuffer; + private int valueOffset; + private int valueLength; + private byte[] tagsBuffer; + private int tagsOffset; + private int tagsLength; + private long seqId; + + protected OnheapDecodedCell(byte[] keyBuffer, short rowLength, int familyOffset, + byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte, + byte[] valueBuffer, int valueOffset, int valueLen, long seqId, byte[] tagsBuffer, + int tagsOffset, int tagsLength) { + this.keyOnlyBuffer = keyBuffer; + this.rowLength = rowLength; + this.familyOffset = familyOffset; + this.familyLength = familyLength; + this.qualifierOffset = qualOffset; + this.qualifierLength = qualLength; + this.timestamp = timeStamp; + this.typeByte = typeByte; + this.valueBuffer = valueBuffer; + this.valueOffset = valueOffset; + this.valueLength = valueLen; + this.tagsBuffer = tagsBuffer; + this.tagsOffset = tagsOffset; + this.tagsLength = tagsLength; + setSequenceId(seqId); + } + @Override public byte[] getRowArray() { - return currentKey.getRowArray(); + return keyOnlyBuffer; } @Override - public int getRowOffset() { - return Bytes.SIZEOF_SHORT; + public byte[] getFamilyArray() { + return keyOnlyBuffer; } @Override - public short getRowLength() { - return currentKey.getRowLength(); + public byte[] getQualifierArray() { + return keyOnlyBuffer; } @Override - public byte[] getFamilyArray() { - return currentKey.getFamilyArray(); + public int getRowOffset() { + return Bytes.SIZEOF_SHORT; } @Override - public int getFamilyOffset() { - return currentKey.getFamilyOffset(); + public short getRowLength() { + return rowLength; } @Override - public byte getFamilyLength() { - return currentKey.getFamilyLength(); + public int getFamilyOffset() { + return familyOffset; } @Override - public byte[] getQualifierArray() { - return currentKey.getQualifierArray(); + public byte getFamilyLength() { + return familyLength; } @Override public int getQualifierOffset() { - return currentKey.getQualifierOffset(); + return qualifierOffset; } @Override public int getQualifierLength() { - return currentKey.getQualifierLength(); + return qualifierLength; } @Override public long getTimestamp() { - return currentKey.getTimestamp(); + return timestamp; } @Override public byte getTypeByte() { - return currentKey.getTypeByte(); + return typeByte; } @Override public long getSequenceId() { - return memstoreTS; + return seqId; } @Override public byte[] getValueArray() { - return currentBuffer.array(); + return this.valueBuffer; } @Override public int getValueOffset() { - return currentBuffer.arrayOffset() + valueOffset; + return valueOffset; } @Override @@ -277,18 +407,12 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder { @Override public byte[] getTagsArray() { - if (tagCompressionContext != null) { - return tagsBuffer; - } - return currentBuffer.array(); + return this.tagsBuffer; } @Override public int getTagsOffset() { - if (tagCompressionContext != null) { - return 0; 
-      }
-      return currentBuffer.arrayOffset() + tagsOffset;
+      return this.tagsOffset;
     }
 
     @Override
@@ -298,36 +422,54 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
 
     @Override
     public String toString() {
-      return KeyValue.keyToString(this.keyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen="
-          + getValueLength() + "/seqid=" + memstoreTS;
+      return KeyValue.keyToString(this.keyOnlyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen="
+          + getValueLength() + "/seqid=" + seqId;
     }
 
-    public Cell shallowCopy() {
-      return new ClonedSeekerState(currentBuffer, keyBuffer, currentKey.getRowLength(),
-          currentKey.getFamilyOffset(), currentKey.getFamilyLength(), keyLength,
-          currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
-          currentKey.getTimestamp(), currentKey.getTypeByte(), valueLength, valueOffset,
-          memstoreTS, tagsOffset, tagsLength, tagCompressionContext, tagsBuffer);
+    @Override
+    public void setSequenceId(long seqId) {
+      this.seqId = seqId;
+    }
+
+    @Override
+    public long heapSize() {
+      return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength;
+    }
+
+    @Override
+    public int write(OutputStream out) throws IOException {
+      return write(out, true);
+    }
+
+    @Override
+    public int write(OutputStream out, boolean withTags) throws IOException {
+      int lenToWrite = KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength,
+          tagsLength, withTags);
+      writeInt(out, lenToWrite);
+      writeInt(out, keyOnlyBuffer.length);
+      writeInt(out, valueLength);
+      // Write key
+      out.write(keyOnlyBuffer);
+      // Write value
+      out.write(this.valueBuffer, this.valueOffset, this.valueLength);
+      if (withTags) {
+        // 2 bytes of tags length followed by the tags bytes. The tags length is serialized in
+        // 2 bytes only (the short way) even though the type is int. As these are non-negative
+        // numbers, we save the sign bit. See HBASE-11437.
+        out.write((byte) (0xff & (this.tagsLength >> 8)));
+        out.write((byte) (0xff & this.tagsLength));
+        out.write(this.tagsBuffer, this.tagsOffset, this.tagsLength);
+      }
+      return lenToWrite + Bytes.SIZEOF_INT;
     }
   }
 
-  /**
-   * Copies only the key part of the keybuffer by doing a deep copy and passes the
-   * seeker state members for taking a clone.
-   * Note that the value byte[] part is still pointing to the currentBuffer and the
-   * represented by the valueOffset and valueLength
-   */
-  // We return this as a Cell to the upper layers of read flow and might try setting a new SeqId
-  // there. So this has to be an instance of SettableSequenceId. SeekerState need not be
-  // SettableSequenceId as we never return that to top layers. When we have to, we make
-  // ClonedSeekerState from it.
- protected static class ClonedSeekerState implements Cell, HeapSize, SettableSequenceId, - Streamable { + protected static class OffheapDecodedCell extends ByteBufferedCell implements HeapSize, + SettableSequenceId, Streamable { private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT - + (4 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT) - + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (2 * ClassSize.ARRAY)); - private byte[] keyOnlyBuffer; - private ByteBuffer currentBuffer; + + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT) + + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.BYTE_BUFFER)); + private ByteBuffer keyBuffer; private short rowLength; private int familyOffset; private byte familyLength; @@ -335,22 +477,22 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder { private int qualifierLength; private long timestamp; private byte typeByte; + private ByteBuffer valueBuffer; private int valueOffset; private int valueLength; - private int tagsLength; + private ByteBuffer tagsBuffer; private int tagsOffset; - private byte[] cloneTagsBuffer; + private int tagsLength; private long seqId; - private TagCompressionContext tagCompressionContext; - - protected ClonedSeekerState(ByteBuffer currentBuffer, byte[] keyBuffer, short rowLength, - int familyOffset, byte familyLength, int keyLength, int qualOffset, int qualLength, - long timeStamp, byte typeByte, int valueLen, int valueOffset, long seqId, - int tagsOffset, int tagsLength, TagCompressionContext tagCompressionContext, - byte[] tagsBuffer) { - this.currentBuffer = currentBuffer; - keyOnlyBuffer = new byte[keyLength]; - this.tagCompressionContext = tagCompressionContext; + + protected OffheapDecodedCell(ByteBuffer keyBuffer, short rowLength, int familyOffset, + byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte, + ByteBuffer valueBuffer, int valueOffset, int valueLen, long seqId, ByteBuffer tagsBuffer, + int tagsOffset, int tagsLength) { + // The keyBuffer is always onheap + assert keyBuffer.hasArray(); + assert keyBuffer.arrayOffset() == 0; + this.keyBuffer = keyBuffer; this.rowLength = rowLength; this.familyOffset = familyOffset; this.familyLength = familyLength; @@ -358,123 +500,153 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder { this.qualifierLength = qualLength; this.timestamp = timeStamp; this.typeByte = typeByte; - this.valueLength = valueLen; + this.valueBuffer = valueBuffer; this.valueOffset = valueOffset; + this.valueLength = valueLen; + this.tagsBuffer = tagsBuffer; this.tagsOffset = tagsOffset; this.tagsLength = tagsLength; - System.arraycopy(keyBuffer, 0, keyOnlyBuffer, 0, keyLength); - if (tagCompressionContext != null) { - this.cloneTagsBuffer = new byte[tagsLength]; - System.arraycopy(tagsBuffer, 0, this.cloneTagsBuffer, 0, tagsLength); - } setSequenceId(seqId); } @Override public byte[] getRowArray() { - return keyOnlyBuffer; + return this.keyBuffer.array(); } @Override - public byte[] getFamilyArray() { - return keyOnlyBuffer; + public int getRowOffset() { + return getRowPositionInByteBuffer(); } @Override - public byte[] getQualifierArray() { - return keyOnlyBuffer; + public short getRowLength() { + return this.rowLength; } @Override - public int getRowOffset() { - return Bytes.SIZEOF_SHORT; + public byte[] getFamilyArray() { + return this.keyBuffer.array(); } @Override - public short getRowLength() { - return rowLength; + public int getFamilyOffset() { 
+ return getFamilyPositionInByteBuffer(); } @Override - public int getFamilyOffset() { - return familyOffset; + public byte getFamilyLength() { + return this.familyLength; } @Override - public byte getFamilyLength() { - return familyLength; + public byte[] getQualifierArray() { + return this.keyBuffer.array(); } @Override public int getQualifierOffset() { - return qualifierOffset; + return getQualifierPositionInByteBuffer(); } @Override public int getQualifierLength() { - return qualifierLength; + return this.qualifierLength; } @Override public long getTimestamp() { - return timestamp; + return this.timestamp; } @Override public byte getTypeByte() { - return typeByte; + return this.typeByte; } @Override public long getSequenceId() { - return seqId; + return this.seqId; } @Override public byte[] getValueArray() { - return currentBuffer.array(); + return CellUtil.cloneValue(this); } @Override public int getValueOffset() { - return currentBuffer.arrayOffset() + valueOffset; + return 0; } @Override public int getValueLength() { - return valueLength; + return this.valueLength; } @Override public byte[] getTagsArray() { - if (tagCompressionContext != null) { - return cloneTagsBuffer; - } - return currentBuffer.array(); + return CellUtil.cloneTags(this); } @Override public int getTagsOffset() { - if (tagCompressionContext != null) { - return 0; - } - return currentBuffer.arrayOffset() + tagsOffset; + return 0; } @Override public int getTagsLength() { - return tagsLength; + return this.tagsLength; } @Override - public String toString() { - return KeyValue.keyToString(this.keyOnlyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen=" - + getValueLength() + "/seqid=" + seqId; + public ByteBuffer getRowByteBuffer() { + return this.keyBuffer; } @Override - public void setSequenceId(long seqId) { - this.seqId = seqId; + public int getRowPositionInByteBuffer() { + return Bytes.SIZEOF_SHORT; + } + + @Override + public ByteBuffer getFamilyByteBuffer() { + return this.keyBuffer; + } + + @Override + public int getFamilyPositionInByteBuffer() { + return this.familyOffset; + } + + @Override + public ByteBuffer getQualifierByteBuffer() { + return this.keyBuffer; + } + + @Override + public int getQualifierPositionInByteBuffer() { + return this.qualifierOffset; + } + + @Override + public ByteBuffer getValueByteBuffer() { + return this.valueBuffer; + } + + @Override + public int getValuePositionInByteBuffer() { + return this.valueOffset; + } + + @Override + public ByteBuffer getTagsByteBuffer() { + return this.tagsBuffer; + } + + @Override + public int getTagsPositionInByteBuffer() { + return this.tagsOffset; } @Override @@ -483,6 +655,11 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder { } @Override + public void setSequenceId(long seqId) { + this.seqId = seqId; + } + + @Override public int write(OutputStream out) throws IOException { return write(out, true); } @@ -492,26 +669,19 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder { int lenToWrite = KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, tagsLength, withTags); writeInt(out, lenToWrite); - writeInt(out, keyOnlyBuffer.length); + writeInt(out, keyBuffer.capacity()); writeInt(out, valueLength); // Write key - out.write(keyOnlyBuffer); + out.write(keyBuffer.array()); // Write value - assert this.currentBuffer.hasArray(); - out.write(this.currentBuffer.array(), this.currentBuffer.arrayOffset() + this.valueOffset, - this.valueLength); + writeByteBuffer(out, this.valueBuffer, this.valueOffset, 
+          this.valueLength);
       if (withTags) {
         // 2 bytes tags length followed by tags bytes
         // tags length is serialized with 2 bytes only(short way) even if the type is int.
         // As this is non -ve numbers, we save the sign bit. See HBASE-11437
         out.write((byte) (0xff & (this.tagsLength >> 8)));
         out.write((byte) (0xff & this.tagsLength));
-        if (this.tagCompressionContext != null) {
-          out.write(cloneTagsBuffer);
-        } else {
-          out.write(this.currentBuffer.array(), this.currentBuffer.arrayOffset() + this.tagsOffset,
-              this.tagsLength);
-        }
+        writeByteBuffer(out, this.tagsBuffer, this.tagsOffset, this.tagsLength);
       }
       return lenToWrite + Bytes.SIZEOF_INT;
     }
@@ -527,16 +697,30 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
     }
   }
 
+  private static void writeByteBuffer(OutputStream out, ByteBuffer b, int offset, int length)
+      throws IOException {
+    // ByteBufferOutputStream has a write() that takes a ByteBuffer, so that bytes can be copied
+    // directly from the src ByteBuffer to the destination ByteBuffer. This avoids the need for a
+    // temp array creation and copy.
+    if (out instanceof ByteBufferOutputStream) {
+      ((ByteBufferOutputStream) out).write(b, offset, length);
+    } else {
+      ByteBufferUtils.copyBufferToStream(out, b, offset, length);
+    }
+  }
+
  protected abstract static class BufferedEncodedSeeker<STATE extends SeekerState>
      implements EncodedSeeker {
     protected HFileBlockDecodingContext decodingCtx;
     protected final CellComparator comparator;
-    protected ByteBuffer currentBuffer;
-    protected STATE current = createSeekerState(); // always valid
-    protected STATE previous = createSeekerState(); // may not be valid
+    protected ByteBuff currentBuffer;
     protected TagCompressionContext tagCompressionContext = null;
     protected KeyValue.KeyOnlyKeyValue keyOnlyKV = new KeyValue.KeyOnlyKeyValue();
+    // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids
+    // too many object creations.
+    protected final Pair<ByteBuffer, Integer> tmpPair = new Pair<ByteBuffer, Integer>();
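+    // Note: 'current' and 'previous' are now created at the end of the constructor rather than
+    // at declaration: createSeekerState() consults includesTags(), which can only be answered
+    // once decodingCtx has been assigned.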
+    protected STATE current, previous;
 
     public BufferedEncodedSeeker(CellComparator comparator,
         HFileBlockDecodingContext decodingCtx) {
@@ -549,6 +733,8 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
           throw new RuntimeException("Failed to initialize TagCompressionContext", e);
         }
       }
+      current = createSeekerState(); // always valid
+      previous = createSeekerState(); // may not be valid
     }
 
     protected boolean includesMvcc() {
@@ -566,7 +752,7 @@
     }
 
     @Override
-    public void setCurrentBuffer(ByteBuffer buffer) {
+    public void setCurrentBuffer(ByteBuff buffer) {
       if (this.tagCompressionContext != null) {
         this.tagCompressionContext.clear();
       }
@@ -589,9 +775,10 @@
 
     @Override
     public ByteBuffer getValueShallowCopy() {
-      ByteBuffer dup = currentBuffer.duplicate();
-      dup.position(current.valueOffset);
-      dup.limit(current.valueOffset + current.valueLength);
+      currentBuffer.asSubByteBuffer(current.valueOffset, current.valueLength, tmpPair);
+      ByteBuffer dup = tmpPair.getFirst().duplicate();
+      dup.position(tmpPair.getSecond());
+      dup.limit(tmpPair.getSecond() + current.valueLength);
       return dup.slice();
     }
 
@@ -601,8 +788,7 @@
       kvBuffer.putInt(current.keyLength);
       kvBuffer.putInt(current.valueLength);
       kvBuffer.put(current.keyBuffer, 0, current.keyLength);
-      ByteBufferUtils.copyFromBufferToBuffer(currentBuffer, kvBuffer, current.valueOffset,
-          current.valueLength);
+      currentBuffer.get(kvBuffer, current.valueOffset, current.valueLength);
       if (current.tagsLength > 0) {
         // Put short as unsigned
         kvBuffer.put((byte) (current.tagsLength >> 8 & 0xff));
@@ -610,8 +796,7 @@
         if (current.tagsOffset != -1) {
           // the offset of the tags bytes in the underlying buffer is marked. So the temp
           // buffer,tagsBuffer was not been used.
-          ByteBufferUtils.copyFromBufferToBuffer(currentBuffer, kvBuffer, current.tagsOffset,
-              current.tagsLength);
+          currentBuffer.get(kvBuffer, current.tagsOffset, current.tagsLength);
         } else {
           // When tagsOffset is marked as -1, tag compression was present and so the tags were
           // uncompressed into temp buffer, tagsBuffer. Let us copy it from there
@@ -631,7 +816,7 @@
 
     @Override
     public Cell getKeyValue() {
-      return current.shallowCopy();
+      return current.toCell();
     }
 
     @Override
@@ -657,7 +842,7 @@
     }
 
     protected void decodeTags() {
-      current.tagsLength = ByteBufferUtils.readCompressedInt(currentBuffer);
+      current.tagsLength = ByteBuff.readCompressedInt(currentBuffer);
       if (tagCompressionContext != null) {
         if (current.uncompressTags) {
           // Tag compression is been used. uncompress it into tagsBuffer
@@ -669,7 +854,7 @@
             throw new RuntimeException("Exception while uncompressing tags", e);
           }
         } else {
-          ByteBufferUtils.skip(currentBuffer, current.tagsCompressedLength);
+          currentBuffer.skip(current.tagsCompressedLength);
          current.uncompressTags = true;// Reset this.
        }
        current.tagsOffset = -1;
@@ -677,7 +862,7 @@
        // When tag compress is not used, let us not do copying of tags bytes into tagsBuffer.
// Just mark the tags Offset so as to create the KV buffer later in getKeyValueBuffer() current.tagsOffset = currentBuffer.position(); - ByteBufferUtils.skip(currentBuffer, current.tagsLength); + currentBuffer.skip(current.tagsLength); } } @@ -844,7 +1029,7 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder { protected STATE createSeekerState() { // This will fail for non-default seeker state if the subclass does not // override this method. - return (STATE) new SeekerState(); + return (STATE) new SeekerState(this.tmpPair, this.includesTags()); } abstract protected void decodeFirst(); @@ -1017,4 +1202,13 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder { encodingCtx.postEncoding(BlockType.DATA); } } + + protected Cell createFirstKeyCell(ByteBuffer key, int keyLength) { + if (key.hasArray()) { + return new KeyValue.KeyOnlyKeyValue(key.array(), key.arrayOffset() + key.position(), + keyLength); + } else { + return new OffheapKeyOnlyKeyValue(key, key.position(), keyLength); + } + } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java index 662be29..9310f32 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java @@ -71,8 +71,7 @@ public class CopyKeyDataBlockEncoder extends BufferedDataBlockEncoder { int keyLength = block.getIntStrictlyForward(Bytes.SIZEOF_INT); int pos = 3 * Bytes.SIZEOF_INT; ByteBuffer key = block.asSubByteBuffer(pos + keyLength).duplicate(); - // TODO : to be changed here for BBCell - return new KeyValue.KeyOnlyKeyValue(key.array(), key.arrayOffset() + pos, keyLength); + return createFirstKeyCell(key, keyLength); } @Override @@ -91,14 +90,14 @@ public class CopyKeyDataBlockEncoder extends BufferedDataBlockEncoder { current.ensureSpaceForKey(); currentBuffer.get(current.keyBuffer, 0, current.keyLength); current.valueOffset = currentBuffer.position(); - ByteBufferUtils.skip(currentBuffer, current.valueLength); + currentBuffer.skip(current.valueLength); if (includesTags()) { // Read short as unsigned, high byte first current.tagsLength = ((currentBuffer.get() & 0xff) << 8) ^ (currentBuffer.get() & 0xff); - ByteBufferUtils.skip(currentBuffer, current.tagsLength); + currentBuffer.skip(current.tagsLength); } if (includesMvcc()) { - current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer); + current.memstoreTS = ByteBuff.readVLong(currentBuffer); } else { current.memstoreTS = 0; } @@ -107,7 +106,7 @@ public class CopyKeyDataBlockEncoder extends BufferedDataBlockEncoder { @Override protected void decodeFirst() { - ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT); + currentBuffer.skip(Bytes.SIZEOF_INT); current.lastCommonPrefix = 0; decodeNext(); } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java index ce71308..8326116 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java @@ -131,14 +131,14 @@ public interface DataBlockEncoder { * An interface which enable to seek while underlying data is encoded. * * It works on one HFileBlock, but it is reusable. 
See - * {@link #setCurrentBuffer(ByteBuffer)}. + * {@link #setCurrentBuffer(ByteBuff)}. */ interface EncodedSeeker { /** * Set on which buffer there will be done seeking. * @param buffer Used for seeking. */ - void setCurrentBuffer(ByteBuffer buffer); + void setCurrentBuffer(ByteBuff buffer); /** * From the current position creates a cell using the key part diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java index 90b8e6e..4587352 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; /** * Compress using: @@ -362,9 +363,14 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder { } protected static class DiffSeekerState extends SeekerState { + private int rowLengthWithSize; private long timestamp; + public DiffSeekerState(Pair tmpPair, boolean includeTags) { + super(tmpPair, includeTags); + } + @Override protected void copyFromNext(SeekerState that) { super.copyFromNext(that); @@ -389,14 +395,12 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder { if (!isFirst) { type = current.keyBuffer[current.keyLength - Bytes.SIZEOF_BYTE]; } - current.keyLength = ByteBufferUtils.readCompressedInt(currentBuffer); + current.keyLength = ByteBuff.readCompressedInt(currentBuffer); } if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) { - current.valueLength = - ByteBufferUtils.readCompressedInt(currentBuffer); + current.valueLength = ByteBuff.readCompressedInt(currentBuffer); } - current.lastCommonPrefix = - ByteBufferUtils.readCompressedInt(currentBuffer); + current.lastCommonPrefix = ByteBuff.readCompressedInt(currentBuffer); current.ensureSpaceForKey(); @@ -446,8 +450,7 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder { int pos = current.keyLength - TIMESTAMP_WITH_TYPE_LENGTH; int timestampFitInBytes = 1 + ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH); - long timestampOrDiff = - ByteBufferUtils.readLong(currentBuffer, timestampFitInBytes); + long timestampOrDiff = ByteBuff.readLong(currentBuffer, timestampFitInBytes); if ((flag & FLAG_TIMESTAMP_SIGN) != 0) { timestampOrDiff = -timestampOrDiff; } @@ -467,13 +470,13 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder { } current.valueOffset = currentBuffer.position(); - ByteBufferUtils.skip(currentBuffer, current.valueLength); + currentBuffer.skip(current.valueLength); if (includesTags()) { decodeTags(); } if (includesMvcc()) { - current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer); + current.memstoreTS = ByteBuff.readVLong(currentBuffer); } else { current.memstoreTS = 0; } @@ -482,7 +485,7 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder { @Override protected void decodeFirst() { - ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT); + currentBuffer.skip(Bytes.SIZEOF_INT); // read column family byte familyNameLength = currentBuffer.get(); @@ -500,7 +503,7 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder { @Override protected DiffSeekerState createSeekerState() { - return new DiffSeekerState(); + 
return new DiffSeekerState(this.tmpPair, this.includesTags()); } }; } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java index fa4adbd..79e2792 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; /** * Encoder similar to {@link DiffKeyDeltaEncoder} but supposedly faster. @@ -364,8 +365,7 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder { ByteBuff.readCompressedInt(block); // commonLength ByteBuffer key = block.asSubByteBuffer(keyLength).duplicate(); block.reset(); - // TODO : Change to BBCell. - return new KeyValue.KeyOnlyKeyValue(key.array(), key.arrayOffset() + key.position(), keyLength); + return createFirstKeyCell(key, keyLength); } @Override @@ -379,6 +379,10 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder { private int rowLengthWithSize; private int familyLengthWithSize; + public FastDiffSeekerState(Pair tmpPair, boolean includeTags) { + super(tmpPair, includeTags); + } + @Override protected void copyFromNext(SeekerState that) { super.copyFromNext(that); @@ -404,14 +408,12 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder { current.prevTimestampAndType, 0, current.prevTimestampAndType.length); } - current.keyLength = ByteBufferUtils.readCompressedInt(currentBuffer); + current.keyLength = ByteBuff.readCompressedInt(currentBuffer); } if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) { - current.valueLength = - ByteBufferUtils.readCompressedInt(currentBuffer); + current.valueLength = ByteBuff.readCompressedInt(currentBuffer); } - current.lastCommonPrefix = - ByteBufferUtils.readCompressedInt(currentBuffer); + current.lastCommonPrefix = ByteBuff.readCompressedInt(currentBuffer); current.ensureSpaceForKey(); @@ -491,14 +493,14 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder { // handle value if ((flag & FLAG_SAME_VALUE) == 0) { current.valueOffset = currentBuffer.position(); - ByteBufferUtils.skip(currentBuffer, current.valueLength); + currentBuffer.skip(current.valueLength); } if (includesTags()) { decodeTags(); } if (includesMvcc()) { - current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer); + current.memstoreTS = ByteBuff.readVLong(currentBuffer); } else { current.memstoreTS = 0; } @@ -507,7 +509,7 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder { @Override protected void decodeFirst() { - ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT); + currentBuffer.skip(Bytes.SIZEOF_INT); decode(true); } @@ -518,7 +520,7 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder { @Override protected FastDiffSeekerState createSeekerState() { - return new FastDiffSeekerState(); + return new FastDiffSeekerState(this.tmpPair, this.includesTags()); } }; } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java index 6e89de4..6f5acd5 100644 --- 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java @@ -186,8 +186,7 @@ public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder { } ByteBuffer key = block.asSubByteBuffer(keyLength).duplicate(); block.reset(); - // TODO : Change to BBCell - return new KeyValue.KeyOnlyKeyValue(key.array(), key.arrayOffset() + key.position(), keyLength); + return createFirstKeyCell(key, keyLength); } @Override @@ -201,21 +200,20 @@ public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder { return new BufferedEncodedSeeker(comparator, decodingCtx) { @Override protected void decodeNext() { - current.keyLength = ByteBufferUtils.readCompressedInt(currentBuffer); - current.valueLength = ByteBufferUtils.readCompressedInt(currentBuffer); - current.lastCommonPrefix = - ByteBufferUtils.readCompressedInt(currentBuffer); + current.keyLength = ByteBuff.readCompressedInt(currentBuffer); + current.valueLength = ByteBuff.readCompressedInt(currentBuffer); + current.lastCommonPrefix = ByteBuff.readCompressedInt(currentBuffer); current.keyLength += current.lastCommonPrefix; current.ensureSpaceForKey(); currentBuffer.get(current.keyBuffer, current.lastCommonPrefix, current.keyLength - current.lastCommonPrefix); current.valueOffset = currentBuffer.position(); - ByteBufferUtils.skip(currentBuffer, current.valueLength); + currentBuffer.skip(current.valueLength); if (includesTags()) { decodeTags(); } if (includesMvcc()) { - current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer); + current.memstoreTS = ByteBuff.readVLong(currentBuffer); } else { current.memstoreTS = 0; } @@ -224,7 +222,7 @@ public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder { @Override protected void decodeFirst() { - ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT); + currentBuffer.skip(Bytes.SIZEOF_INT); decodeNext(); } }; diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java index 0b442a5..6e13b44 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java @@ -21,9 +21,9 @@ package org.apache.hadoop.hbase.io.util; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.nio.ByteBuffer; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.util.Pair; import com.google.common.base.Preconditions; @@ -84,7 +84,7 @@ public class StreamUtils { return result; } - public static int readRawVarint32(ByteBuffer input) throws IOException { + public static int readRawVarint32(ByteBuff input) throws IOException { byte tmp = input.get(); if (tmp >= 0) { return tmp; diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java index 14e77a7..4398f5d 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; +import 
org.apache.hadoop.io.WritableUtils; /** * An abstract class that abstracts out as to how the byte buffers are used, @@ -435,4 +436,23 @@ public abstract class ByteBuff { } return tmpLength; } + + /** + * Similar to {@link WritableUtils#readVLong(DataInput)} but reads from a + * {@link ByteBuff}. + */ + public static long readVLong(ByteBuff in) { + byte firstByte = in.get(); + int len = WritableUtils.decodeVIntSize(firstByte); + if (len == 1) { + return firstByte; + } + long i = 0; + for (int idx = 0; idx < len-1; idx++) { + byte b = in.get(); + i = i << 8; + i = i | (b & 0xFF); + } + return (WritableUtils.isNegativeVInt(firstByte) ? (i ^ -1L) : i); + } } diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java index 841c468..f4c4afe 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java @@ -29,6 +29,7 @@ import java.util.List; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.io.util.LRUDictionary; +import org.apache.hadoop.hbase.nio.SingleByteBuff; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; @@ -60,11 +61,11 @@ public class TestTagCompressionContext { byte[] dest = new byte[tagsLength1]; ByteBuffer ob = ByteBuffer.wrap(baos.toByteArray()); - context.uncompressTags(ob, dest, 0, tagsLength1); + context.uncompressTags(new SingleByteBuff(ob), dest, 0, tagsLength1); assertTrue(Bytes.equals(kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1, dest, 0, tagsLength1)); dest = new byte[tagsLength2]; - context.uncompressTags(ob, dest, 0, tagsLength2); + context.uncompressTags(new SingleByteBuff(ob), dest, 0, tagsLength2); assertTrue(Bytes.equals(kv2.getTagsArray(), kv2.getTagsOffset(), tagsLength2, dest, 0, tagsLength2)); } diff --git a/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeSeeker.java b/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeSeeker.java index 93610c4..f6ceec3 100644 --- a/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeSeeker.java +++ b/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeSeeker.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.codec.prefixtree.decode.PrefixTreeArraySearcher; import org.apache.hadoop.hbase.codec.prefixtree.scanner.CellScannerPosition; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder.EncodedSeeker; +import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; @@ -55,8 +56,8 @@ public class PrefixTreeSeeker implements EncodedSeeker { } @Override - public void setCurrentBuffer(ByteBuffer fullBlockBuffer) { - block = fullBlockBuffer; + public void setCurrentBuffer(ByteBuff fullBlockBuffer) { + block = fullBlockBuffer.asSubByteBuffer(fullBlockBuffer.limit()); // TODO : change to Bytebuff ptSearcher = DecoderFactory.checkOut(block, includeMvccVersion); rewind(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java index 
ae2f6c1..4188c89 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -1454,8 +1454,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
             + DataBlockEncoding.getNameFromId(dataBlockEncoderId));
       }
       ByteBuff encodedBuffer = getEncodedBuffer(newBlock);
-      // TODO : Change the DBEs to work with ByteBuffs
-      seeker.setCurrentBuffer(encodedBuffer.asSubByteBuffer(encodedBuffer.limit()));
+      seeker.setCurrentBuffer(encodedBuffer);
       blockFetches++;
 
       // Reset the next indexed key
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index e8b79a8..feec5f8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -65,7 +65,6 @@ import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HRegionLocator;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionLocator;
@@ -249,6 +248,19 @@
     return Collections.unmodifiableList(configurations);
   }
 
+  public static List<Object[]> memStoreTSTagsAndOffheapCombination() {
+    List<Object[]> configurations = new ArrayList<Object[]>();
+    configurations.add(new Object[] { false, false, true });
+    configurations.add(new Object[] { false, false, false });
+    configurations.add(new Object[] { false, true, true });
+    configurations.add(new Object[] { false, true, false });
+    configurations.add(new Object[] { true, false, true });
+    configurations.add(new Object[] { true, false, false });
+    configurations.add(new Object[] { true, true, true });
+    configurations.add(new Object[] { true, true, false });
+    return Collections.unmodifiableList(configurations);
+  }
+
   public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS =
       bloomAndCompressionCombinations();
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestBufferedDataBlockEncoder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestBufferedDataBlockEncoder.java
index 238e6da..5e61c8f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestBufferedDataBlockEncoder.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestBufferedDataBlockEncoder.java
@@ -19,10 +19,13 @@ package org.apache.hadoop.hbase.io.encoding;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.nio.ByteBuffer;
+
 import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -47,7 +50,7 @@ public class TestBufferedDataBlockEncoder {
   @Test
   public void testEnsureSpaceForKey() {
     BufferedDataBlockEncoder.SeekerState state =
-        new BufferedDataBlockEncoder.SeekerState();
+        new BufferedDataBlockEncoder.SeekerState(new Pair<ByteBuffer, Integer>(), false);
     for (int i = 1; i <= 65536; ++i) {
       state.keyLength = i;
state.ensureSpaceForKey(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java index f4160db..d767fa4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java @@ -44,7 +44,7 @@ import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.hfile.HFileBlock.Writer.BufferGrabbingByteArrayOutputStream; import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; -import org.apache.hadoop.hbase.nio.MultiByteBuff; +import org.apache.hadoop.hbase.nio.SingleByteBuff; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; @@ -74,14 +74,18 @@ public class TestDataBlockEncoders { private final boolean includesMemstoreTS; private final boolean includesTags; + private final boolean useOffheapData; @Parameters public static Collection parameters() { - return HBaseTestingUtility.MEMSTORETS_TAGS_PARAMETRIZED; + return HBaseTestingUtility.memStoreTSTagsAndOffheapCombination(); } - public TestDataBlockEncoders(boolean includesMemstoreTS, boolean includesTag) { + + public TestDataBlockEncoders(boolean includesMemstoreTS, boolean includesTag, + boolean useOffheapData) { this.includesMemstoreTS = includesMemstoreTS; this.includesTags = includesTag; + this.useOffheapData = useOffheapData; } private HFileBlockEncodingContext getEncodingContext(Compression.Algorithm algo, @@ -178,12 +182,15 @@ public class TestDataBlockEncoders { List encodedSeekers = new ArrayList(); for (DataBlockEncoding encoding : DataBlockEncoding.values()) { + // Off heap block data support not added for PREFIX_TREE DBE yet. + // TODO remove this once support is added. HBASE-12298 + if (this.useOffheapData && encoding == DataBlockEncoding.PREFIX_TREE) continue; DataBlockEncoder encoder = encoding.getEncoder(); if (encoder == null) { continue; } ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv, - getEncodingContext(Compression.Algorithm.NONE, encoding)); + getEncodingContext(Compression.Algorithm.NONE, encoding), this.useOffheapData); HFileContext meta = new HFileContextBuilder() .withHBaseCheckSum(false) .withIncludesMvcc(includesMemstoreTS) @@ -192,7 +199,7 @@ public class TestDataBlockEncoders { .build(); DataBlockEncoder.EncodedSeeker seeker = encoder.createSeeker(CellComparator.COMPARATOR, encoder.newDataBlockDecodingContext(meta)); - seeker.setCurrentBuffer(encodedBuffer); + seeker.setCurrentBuffer(new SingleByteBuff(encodedBuffer)); encodedSeekers.add(seeker); } // test it! 
@@ -222,7 +229,7 @@ public class TestDataBlockEncoders {
   }
 
   static ByteBuffer encodeKeyValues(DataBlockEncoding encoding, List<KeyValue> kvs,
-      HFileBlockEncodingContext encodingContext) throws IOException {
+      HFileBlockEncodingContext encodingContext, boolean useOffheapData) throws IOException {
     DataBlockEncoder encoder = encoding.getEncoder();
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     baos.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
@@ -236,6 +243,12 @@
     encoder.endBlockEncoding(encodingContext, dos, stream.getBuffer());
     byte[] encodedData = new byte[baos.size() - ENCODED_DATA_OFFSET];
     System.arraycopy(baos.toByteArray(), ENCODED_DATA_OFFSET, encodedData, 0, encodedData.length);
+    if (useOffheapData) {
+      ByteBuffer bb = ByteBuffer.allocateDirect(encodedData.length);
+      bb.put(encodedData);
+      bb.rewind();
+      return bb;
+    }
     return ByteBuffer.wrap(encodedData);
   }
 
@@ -244,12 +257,15 @@
     List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);
 
     for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
+      // Off heap block data support not added for PREFIX_TREE DBE yet.
+      // TODO remove this once support is added. HBASE-12298
+      if (this.useOffheapData && encoding == DataBlockEncoding.PREFIX_TREE) continue;
       if (encoding.getEncoder() == null) {
        continue;
       }
       DataBlockEncoder encoder = encoding.getEncoder();
       ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv,
-          getEncodingContext(Compression.Algorithm.NONE, encoding));
+          getEncodingContext(Compression.Algorithm.NONE, encoding), this.useOffheapData);
       HFileContext meta = new HFileContextBuilder()
           .withHBaseCheckSum(false)
           .withIncludesMvcc(includesMemstoreTS)
@@ -258,31 +274,19 @@
           .build();
       DataBlockEncoder.EncodedSeeker seeker = encoder.createSeeker(CellComparator.COMPARATOR,
           encoder.newDataBlockDecodingContext(meta));
-      seeker.setCurrentBuffer(encodedBuffer);
+      seeker.setCurrentBuffer(new SingleByteBuff(encodedBuffer));
       int i = 0;
       do {
         KeyValue expectedKeyValue = sampleKv.get(i);
-        ByteBuffer keyValue = seeker.getKeyValueBuffer();
-        if (0 != Bytes.compareTo(keyValue.array(), keyValue.arrayOffset(), keyValue.limit(),
-            expectedKeyValue.getBuffer(), expectedKeyValue.getOffset(),
-            expectedKeyValue.getLength())) {
-
-          int commonPrefix = 0;
-          byte[] left = keyValue.array();
-          byte[] right = expectedKeyValue.getBuffer();
-          int leftOff = keyValue.arrayOffset();
-          int rightOff = expectedKeyValue.getOffset();
-          int length = Math.min(keyValue.limit(), expectedKeyValue.getLength());
-          while (commonPrefix < length
-              && left[commonPrefix + leftOff] == right[commonPrefix + rightOff]) {
-            commonPrefix++;
-          }
-
+        Cell cell = seeker.getKeyValue();
+        if (CellComparator.COMPARATOR.compareKeyIgnoresMvcc(expectedKeyValue, cell) != 0) {
+          int commonPrefix = CellUtil
+              .findCommonPrefixInFlatKey(expectedKeyValue, cell, false, true);
           fail(String.format("next() produces wrong results "
               + "encoder: %s i: %d commonPrefix: %d" + "\n expected %s\n actual %s", encoder
              .toString(), i, commonPrefix, Bytes.toStringBinary(expectedKeyValue.getBuffer(),
-              expectedKeyValue.getOffset(), expectedKeyValue.getLength()), Bytes
-              .toStringBinary(keyValue)));
+              expectedKeyValue.getKeyOffset(), expectedKeyValue.getKeyLength()), CellUtil.toString(
+              cell, false)));
         }
         i++;
       } while (seeker.next());
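encodeKeyValues() above now carries the single switch point for the off-heap variants: when useOffheapData is set, the encoded byte[] is copied into a direct buffer before being handed to the seekers. A minimal standalone sketch of that copy (the wrapper class is assumed):

import java.nio.ByteBuffer;

final class OffheapCopySketch {
  // Copies an on-heap byte[] into a freshly allocated off-heap (direct)
  // buffer and rewinds it so readers start at position 0.
  static ByteBuffer toOffheap(byte[] encodedData) {
    ByteBuffer bb = ByteBuffer.allocateDirect(encodedData.length);
    bb.put(encodedData); // put() advances position to encodedData.length
    bb.rewind();         // reset position to 0; limit stays at capacity
    return bb;
  }
}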
@@ -298,33 +302,19 @@
 
     List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);
     for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
+      // Off heap block data support not added for PREFIX_TREE DBE yet.
+      // TODO remove this once support is added. HBASE-12298
+      if (this.useOffheapData && encoding == DataBlockEncoding.PREFIX_TREE) continue;
       if (encoding.getEncoder() == null) {
         continue;
       }
       DataBlockEncoder encoder = encoding.getEncoder();
       ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv,
-          getEncodingContext(Compression.Algorithm.NONE, encoding));
-      Cell key = encoder.getFirstKeyCellInBlock(new MultiByteBuff(
-          encodedBuffer));
-      KeyValue keyBuffer = null;
-      if(encoding == DataBlockEncoding.PREFIX_TREE) {
-        // This is not an actual case. So the Prefix tree block is not loaded in case of Prefix_tree
-        // Just copy only the key part to form a keyBuffer
-        byte[] serializedKey = CellUtil.getCellKeySerializedAsKeyValueKey(key);
-        keyBuffer = KeyValueUtil.createKeyValueFromKey(serializedKey);
-      } else {
-        keyBuffer = KeyValueUtil.ensureKeyValue(key);
-      }
+          getEncodingContext(Compression.Algorithm.NONE, encoding), this.useOffheapData);
+      Cell key = encoder.getFirstKeyCellInBlock(new SingleByteBuff(encodedBuffer));
       KeyValue firstKv = sampleKv.get(0);
-      if (0 != CellComparator.COMPARATOR.compareKeyIgnoresMvcc(keyBuffer, firstKv)) {
-
-        int commonPrefix = 0;
-        int length = Math.min(keyBuffer.getKeyLength(), firstKv.getKeyLength());
-        while (commonPrefix < length
-            && keyBuffer.getBuffer()[keyBuffer.getKeyOffset() + commonPrefix] == firstKv
-                .getBuffer()[firstKv.getKeyOffset() + commonPrefix]) {
-          commonPrefix++;
-        }
-
+      if (0 != CellComparator.COMPARATOR.compareKeyIgnoresMvcc(key, firstKv)) {
+        int commonPrefix = CellUtil.findCommonPrefixInFlatKey(key, firstKv, false, true);
         fail(String.format("Bug in '%s' commonPrefix %d", encoder.toString(), commonPrefix));
       }
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestPrefixTreeEncoding.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestPrefixTreeEncoding.java
index 91115c1..92c2feb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestPrefixTreeEncoding.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestPrefixTreeEncoding.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder.EncodedSeeker;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -117,7 +118,7 @@ public class TestPrefixTreeEncoding {
     byte[] onDiskBytes = baosInMemory.toByteArray();
     ByteBuffer readBuffer = ByteBuffer.wrap(onDiskBytes, DataBlockEncoding.ID_SIZE,
         onDiskBytes.length - DataBlockEncoding.ID_SIZE);
-    seeker.setCurrentBuffer(readBuffer);
+    seeker.setCurrentBuffer(new SingleByteBuff(readBuffer));
 
     // Seek before the first keyvalue;
     KeyValue seekKey = KeyValueUtil.createFirstDeleteFamilyOnRow(getRowKey(batchId, 0), CF_BYTES);
@@ -166,7 +167,7 @@
     byte[] onDiskBytes = baosInMemory.toByteArray();
     ByteBuffer readBuffer = ByteBuffer.wrap(onDiskBytes, DataBlockEncoding.ID_SIZE,
         onDiskBytes.length - DataBlockEncoding.ID_SIZE);
-    seeker.setCurrentBuffer(readBuffer);
+    seeker.setCurrentBuffer(new SingleByteBuff(readBuffer));
     Cell previousKV = null;
     do {
       Cell currentKV = seeker.getKeyValue();
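Both rewritten assertions above now compare keys through the Cell interface instead of poking at backing arrays, which is what lets them work for cells whose bytes live off heap. A sketch of the pattern (compareKeyIgnoresMvcc and findCommonPrefixInFlatKey are the calls used in the hunks; the wrapper itself is hypothetical):

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;

final class KeyAssertSketch {
  // Reports the length of the shared key prefix on mismatch, so a
  // divergence deep in the key is easy to localize wherever the bytes live.
  static void assertSameKey(Cell expected, Cell actual) {
    if (CellComparator.COMPARATOR.compareKeyIgnoresMvcc(expected, actual) != 0) {
      int commonPrefix = CellUtil.findCommonPrefixInFlatKey(expected, actual, false, true);
      throw new AssertionError("keys diverge after " + commonPrefix + " common byte(s)");
    }
  }
}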
@@ -235,7 +236,7 @@
     List<Cell> kvList = new ArrayList<Cell>();
     for (int i = 0; i < NUM_ROWS_PER_BATCH; ++i) {
       kvList.clear();
-      encodeSeeker.setCurrentBuffer(encodedData);
+      encodeSeeker.setCurrentBuffer(new SingleByteBuff(encodedData));
       KeyValue firstOnRow = KeyValueUtil.createFirstOnRow(getRowKey(batchId, i));
       encodeSeeker.seekToKeyInBlock(
           new KeyValue.KeyOnlyKeyValue(firstOnRow.getBuffer(), firstOnRow.getKeyOffset(),
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestSeekToBlockWithEncoders.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestSeekToBlockWithEncoders.java
index cf94c6b..db7982c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestSeekToBlockWithEncoders.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestSeekToBlockWithEncoders.java
@@ -21,25 +21,43 @@ import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 @Category({IOTests.class, SmallTests.class})
+@RunWith(Parameterized.class)
 public class TestSeekToBlockWithEncoders {
+  private final boolean useOffheapData;
+
+  @Parameters
+  public static Collection<Object[]> parameters() {
+    return HBaseTestingUtility.BOOLEAN_PARAMETERIZED;
+  }
+
+  public TestSeekToBlockWithEncoders(boolean useOffheapData) {
+    this.useOffheapData = useOffheapData;
+  }
+
   /**
    * Test seeking while file is encoded.
    */
@@ -265,10 +283,10 @@ public class TestSeekToBlockWithEncoders {
       HFileBlockEncodingContext encodingContext = encoder.newDataBlockEncodingContext(encoding,
           HConstants.HFILEBLOCK_DUMMY_HEADER, meta);
       ByteBuffer encodedBuffer = TestDataBlockEncoders.encodeKeyValues(encoding, kvs,
-          encodingContext);
+          encodingContext, this.useOffheapData);
       DataBlockEncoder.EncodedSeeker seeker = encoder.createSeeker(CellComparator.COMPARATOR,
           encoder.newDataBlockDecodingContext(meta));
-      seeker.setCurrentBuffer(encodedBuffer);
+      seeker.setCurrentBuffer(new SingleByteBuff(encodedBuffer));
       encodedSeekers.add(seeker);
     }
     // test it!
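TestSeekToBlockWithEncoders reuses HBaseTestingUtility.BOOLEAN_PARAMETERIZED above so the whole suite runs once with on-heap and once with off-heap blocks. For reference, a self-contained sketch of the same JUnit wiring (the example class and test body are invented for illustration):

import java.util.Arrays;
import java.util.Collection;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

@RunWith(Parameterized.class)
public class OffheapFlagExampleTest {
  private final boolean useOffheapData;

  // One Object[] row per run; the runner passes each row to the constructor.
  @Parameters
  public static Collection<Object[]> parameters() {
    return Arrays.asList(new Object[] { false }, new Object[] { true });
  }

  public OffheapFlagExampleTest(boolean useOffheapData) {
    this.useOffheapData = useOffheapData;
  }

  @Test
  public void runsOncePerFlagValue() {
    // A real test would branch on useOffheapData when preparing its buffers.
    System.out.println("useOffheapData=" + useOffheapData);
  }
}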