diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
index 207f275..100fc56 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
@@ -100,6 +100,12 @@ public final class CellUtil {
return output;
}
+ public static byte[] cloneTags(Cell cell) {
+ byte[] output = new byte[cell.getTagsLength()];
+ copyTagTo(cell, output, 0);
+ return output;
+ }
+
/**
* Returns tag value in a new byte array. If server-side, use
* {@link Tag#getBuffer()} with appropriate {@link Tag#getTagOffset()} and
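The new cloneTags helper mirrors the existing CellUtil#clone* methods: it materializes a cell's tags into a fresh on-heap byte[] via copyTagTo, which is what lets the off-heap cells added later in this patch still honour the byte[]-returning Cell methods. A minimal usage sketch (the row/tag contents below are illustrative, not from the patch):

```java
import java.util.Arrays;

import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.util.Bytes;

public class CloneTagsExample {
  public static void main(String[] args) {
    // A KeyValue carrying a single tag; row/family/qualifier/value are arbitrary.
    KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"),
        1L, Bytes.toBytes("v"), new Tag[] { new Tag((byte) 1, "myTag") });
    // cloneTags copies getTagsLength() bytes starting at getTagsOffset() into a new
    // array, no matter where the cell's backing data lives.
    byte[] tags = CellUtil.cloneTags(kv);
    System.out.println(tags.length == kv.getTagsLength()); // true
    System.out.println(Arrays.equals(tags,
        Bytes.copy(kv.getTagsArray(), kv.getTagsOffset(), kv.getTagsLength()))); // true
  }
}
```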
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyOnlyKeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyOnlyKeyValue.java
new file mode 100644
index 0000000..477d16e
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyOnlyKeyValue.java
@@ -0,0 +1,210 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * This is a key-only Cell implementation which is identical to {@link KeyValue.KeyOnlyKeyValue}
+ * with respect to key serialization but has its data in off-heap memory.
+ */
+@InterfaceAudience.Private
+public class OffheapKeyOnlyKeyValue extends ByteBufferedCell {
+
+ private ByteBuffer buf;
+ private int offset = 0; // offset into the buffer where the key starts
+ private int length = 0; // length of the key
+ private short rowLen;
+
+ public OffheapKeyOnlyKeyValue(ByteBuffer buf, int offset, int length) {
+ assert buf.isDirect();
+ this.buf = buf;
+ this.offset = offset;
+ this.length = length;
+ this.rowLen = ByteBufferUtils.toShort(this.buf, this.offset);
+ }
+
+ @Override
+ public byte[] getRowArray() {
+ return CellUtil.cloneRow(this);
+ }
+
+ @Override
+ public int getRowOffset() {
+ return 0;
+ }
+
+ @Override
+ public short getRowLength() {
+ return this.rowLen;
+ }
+
+ @Override
+ public byte[] getFamilyArray() {
+ return CellUtil.cloneFamily(this);
+ }
+
+ @Override
+ public int getFamilyOffset() {
+ return 0;
+ }
+
+ @Override
+ public byte getFamilyLength() {
+ return getFamilyLength(getFamilyLengthPosition());
+ }
+
+ private byte getFamilyLength(int famLenPos) {
+ return ByteBufferUtils.toByte(this.buf, famLenPos);
+ }
+
+ @Override
+ public byte[] getQualifierArray() {
+ return CellUtil.cloneQualifier(this);
+ }
+
+ @Override
+ public int getQualifierOffset() {
+ return 0;
+ }
+
+ @Override
+ public int getQualifierLength() {
+ return getQualifierLength(getRowLength(), getFamilyLength());
+ }
+
+ private int getQualifierLength(int rlength, int flength) {
+ return this.length - (int) KeyValue.getKeyDataStructureSize(rlength, flength, 0);
+ }
+
+ @Override
+ public long getTimestamp() {
+ return ByteBufferUtils.toLong(this.buf, getTimestampOffset());
+ }
+
+ private int getTimestampOffset() {
+ return this.offset + this.length - KeyValue.TIMESTAMP_TYPE_SIZE;
+ }
+
+ @Override
+ public byte getTypeByte() {
+ return ByteBufferUtils.toByte(this.buf, this.offset + this.length - 1);
+ }
+
+ @Override
+ public long getSequenceId() {
+ return 0;
+ }
+
+ @Override
+ public byte[] getValueArray() {
+ throw new IllegalArgumentException("This is a key only Cell");
+ }
+
+ @Override
+ public int getValueOffset() {
+ return 0;
+ }
+
+ @Override
+ public int getValueLength() {
+ return 0;
+ }
+
+ @Override
+ public byte[] getTagsArray() {
+ throw new IllegalArgumentException("This is a key only Cell");
+ }
+
+ @Override
+ public int getTagsOffset() {
+ return 0;
+ }
+
+ @Override
+ public int getTagsLength() {
+ return 0;
+ }
+
+ @Override
+ public ByteBuffer getRowByteBuffer() {
+ return this.buf;
+ }
+
+ @Override
+ public int getRowPositionInByteBuffer() {
+ return this.offset + Bytes.SIZEOF_SHORT;
+ }
+
+ @Override
+ public ByteBuffer getFamilyByteBuffer() {
+ return this.buf;
+ }
+
+ @Override
+ public int getFamilyPositionInByteBuffer() {
+ return getFamilyLengthPosition() + Bytes.SIZEOF_BYTE;
+ }
+
+ // The position in the BB where the 1-byte family length is stored.
+ private int getFamilyLengthPosition() {
+ return this.offset + Bytes.SIZEOF_SHORT + getRowLength();
+ }
+
+ @Override
+ public ByteBuffer getQualifierByteBuffer() {
+ return this.buf;
+ }
+
+ @Override
+ public int getQualifierPositionInByteBuffer() {
+ int famLenPos = getFamilyLengthPosition();
+ return famLenPos + Bytes.SIZEOF_BYTE + getFamilyLength(famLenPos);
+ }
+
+ @Override
+ public ByteBuffer getValueByteBuffer() {
+ throw new IllegalArgumentException("This is a key only Cell");
+ }
+
+ @Override
+ public int getValuePositionInByteBuffer() {
+ return 0;
+ }
+
+ @Override
+ public ByteBuffer getTagsByteBuffer() {
+ throw new IllegalArgumentException("This is a key only Cell");
+ }
+
+ @Override
+ public int getTagsPositionInByteBuffer() {
+ return 0;
+ }
+
+ @Override
+ public String toString() {
+ return CellUtil.toString(this, false);
+ }
+}
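The constructor reads only the 2-byte row length eagerly; everything else is located on demand from the standard KeyValue key layout (2-byte row length, row, 1-byte family length, family, qualifier, 8-byte timestamp, 1-byte type). A sketch that hand-serializes such a key into a direct buffer and reads it back through this class (the row/family/qualifier values are made up):

```java
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.OffheapKeyOnlyKeyValue;
import org.apache.hadoop.hbase.util.Bytes;

public class OffheapKeyExample {
  public static void main(String[] args) {
    byte[] row = Bytes.toBytes("row1");
    byte[] fam = Bytes.toBytes("f");
    byte[] qual = Bytes.toBytes("q1");
    int keyLen = 2 + row.length + 1 + fam.length + qual.length + 8 + 1;
    ByteBuffer buf = ByteBuffer.allocateDirect(keyLen);
    buf.putShort((short) row.length); // 2-byte row length
    buf.put(row);
    buf.put((byte) fam.length);       // 1-byte family length
    buf.put(fam);
    buf.put(qual);                    // qualifier runs up to the timestamp
    buf.putLong(1234L);               // 8-byte timestamp
    buf.put((byte) 4);                // 1-byte type (4 = Put)
    // All accessors use absolute reads, so the buffer's position is irrelevant.
    OffheapKeyOnlyKeyValue cell = new OffheapKeyOnlyKeyValue(buf, 0, keyLen);
    System.out.println(cell.getRowLength());       // 4
    System.out.println(cell.getFamilyLength());    // 1
    System.out.println(cell.getQualifierLength()); // 2
    System.out.println(cell.getTimestamp());       // 1234
  }
}
```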
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
index 1a18b2d..1b2ab5d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
@@ -127,6 +127,11 @@ public class ByteBufferOutputStream extends OutputStream {
ByteBufferUtils.copyFromArrayToBuffer(buf, b, off, len);
}
+ public void write(ByteBuffer b, int off, int len) throws IOException {
+ checkSizeAndGrow(len);
+ ByteBufferUtils.copyFromBufferToBuffer(b, buf, off, len);
+ }
+
/**
* Writes an int to the underlying output stream as four
* bytes, high byte first.
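The new overload copies straight from the source ByteBuffer into the stream's backing buffer, so no intermediate byte[] is materialized even when the source is a direct buffer. Note that copyFromBufferToBuffer addresses the source by absolute offset, so the source's position is left untouched. A small usage sketch:

```java
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
import org.apache.hadoop.hbase.util.Bytes;

public class ByteBufferWriteExample {
  public static void main(String[] args) throws IOException {
    // The source may be off-heap; no temp byte[] is created for the copy.
    ByteBuffer src = ByteBuffer.allocateDirect(8);
    src.put(Bytes.toBytes("abcdefgh"));
    ByteBufferOutputStream out = new ByteBufferOutputStream(16);
    out.write(src, 2, 4); // copies bytes 2..5 ("cdef") buffer-to-buffer
    System.out.println(out.getByteBuffer().remaining()); // 4
  }
}
```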
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
index 26e7b50..05c4ad1 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.hbase.io.util.StreamUtils;
+import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IOUtils;
@@ -133,7 +134,7 @@ public class TagCompressionContext {
* @return bytes count read from source to uncompress all tags.
* @throws IOException
*/
- public int uncompressTags(ByteBuffer src, byte[] dest, int offset, int length)
+ public int uncompressTags(ByteBuff src, byte[] dest, int offset, int length)
throws IOException {
int srcBeginPos = src.position();
int endOffset = offset + length;
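Only the signature changes here; the body already reads through positional calls that ByteBuff provides as well. Callers still holding a plain ByteBuffer wrap it in a SingleByteBuff, as the updated TestTagCompressionContext further down does. A sketch of the migrated call site (dictionary type and capacity follow the test's usage):

```java
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.nio.SingleByteBuff;

public class UncompressTagsExample {
  // Uncompresses tagsLength bytes of tags out of `compressed` into `dest`,
  // returning how many source bytes were consumed.
  static int uncompress(ByteBuffer compressed, byte[] dest, int tagsLength) throws Exception {
    TagCompressionContext context =
        new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
    return context.uncompressTags(new SingleByteBuff(compressed), dest, 0, tagsLength);
  }
}
```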
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
index 966c59b..6e301db 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
@@ -22,14 +22,18 @@ import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import org.apache.hadoop.hbase.ByteBufferedCell;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.OffheapKeyOnlyKeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.SettableSequenceId;
import org.apache.hadoop.hbase.Streamable;
+import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
import org.apache.hadoop.hbase.io.HeapSize;
@@ -38,9 +42,11 @@ import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.io.util.StreamUtils;
+import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.WritableUtils;
/**
@@ -102,8 +108,8 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
- qualCommonPrefix);
}
- protected static class SeekerState implements Cell {
- protected ByteBuffer currentBuffer;
+ protected static class SeekerState {
+ protected ByteBuff currentBuffer;
protected TagCompressionContext tagCompressionContext;
protected int valueOffset = -1;
protected int keyLength;
@@ -121,6 +127,13 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
protected long memstoreTS;
protected int nextKvOffset;
protected KeyValue.KeyOnlyKeyValue currentKey = new KeyValue.KeyOnlyKeyValue();
+ // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
+ // many object creations.
+ private Pair<ByteBuffer, Integer> tmpPair;
+
+ public SeekerState(Pair<ByteBuffer, Integer> tmpPair) {
+ this.tmpPair = tmpPair;
+ }
protected boolean isValid() {
return valueOffset != -1;
@@ -200,74 +213,182 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
}
}
+ public Cell toCell() {
+ // Buffer backing the value and tags part from the HFileBlock's buffer
+ // When tag compression in use, this will be only the value bytes area.
+ ByteBuffer valAndTagsBuffer;
+ int vOffset;
+ int valAndTagsLength = this.valueLength;
+ if (this.tagCompressionContext == null) {
+ // Include the tags part also. This will be the tags bytes + 2 bytes for storing the tags
+ // length
+ valAndTagsLength += Tag.TAG_LENGTH_SIZE + this.tagsLength;
+ }
+ this.currentBuffer.asSubByteBuffer(this.valueOffset, valAndTagsLength, this.tmpPair);
+ valAndTagsBuffer = this.tmpPair.getFirst();
+ vOffset = this.tmpPair.getSecond();// This is the offset to value part in the BB
+ if (valAndTagsBuffer.hasArray()) {
+ return toOnheapCell(valAndTagsBuffer, vOffset);
+ } else {
+ return toOffheapCell(valAndTagsBuffer, vOffset);
+ }
+ }
+
+ private Cell toOnheapCell(ByteBuffer valAndTagsBuffer, int vOffset) {
+ byte[] tagsArray;
+ int tOffset;
+ if (this.tagCompressionContext == null) {
+ tagsArray = valAndTagsBuffer.array();
+ tOffset = valAndTagsBuffer.arrayOffset() + vOffset + this.valueLength
+ + Tag.TAG_LENGTH_SIZE;
+ } else {
+ tagsArray = Bytes.copy(tagsBuffer, 0, this.tagsLength);
+ tOffset = 0;
+ }
+ return new OnheapDecodedCell(Bytes.copy(keyBuffer, 0, this.keyLength),
+ currentKey.getRowLength(),
+ currentKey.getFamilyOffset(), currentKey.getFamilyLength(),
+ currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
+ currentKey.getTimestamp(), currentKey.getTypeByte(), valAndTagsBuffer.array(),
+ valAndTagsBuffer.arrayOffset() + vOffset, this.valueLength, memstoreTS, tagsArray,
+ tOffset, this.tagsLength);
+ }
+
+ private Cell toOffheapCell(ByteBuffer valAndTagsBuffer, int vOffset) {
+ ByteBuffer tagsBuf;
+ int tOffset;
+ if (this.tagCompressionContext == null) {
+ tagsBuf = valAndTagsBuffer;
+ tOffset = vOffset + this.valueLength + Tag.TAG_LENGTH_SIZE;
+ } else {
+ tagsBuf = ByteBuffer.wrap(Bytes.copy(tagsBuffer, 0, this.tagsLength));
+ tOffset = 0;
+ }
+ return new OffheapDecodedCell(ByteBuffer.wrap(Bytes.copy(keyBuffer, 0, this.keyLength)),
+ currentKey.getRowLength(), currentKey.getFamilyOffset(), currentKey.getFamilyLength(),
+ currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
+ currentKey.getTimestamp(), currentKey.getTypeByte(), valAndTagsBuffer, vOffset,
+ this.valueLength, memstoreTS, tagsBuf, tOffset, this.tagsLength);
+ }
+ }
+
+ /**
+ * Copies only the key part of the keybuffer by doing a deep copy and passes the
+ * seeker state members for taking a clone.
+ * Note that the value byte[] part is still pointing to the HFileBlock's buffer
+ * and is represented by the valueOffset and valueLength.
+ */
+ // We return this as a Cell to the upper layers of the read flow, and they might try setting a
+ // new seqId there. So this has to be an instance of SettableSequenceId.
+ protected static class OnheapDecodedCell implements Cell, HeapSize, SettableSequenceId,
+ Streamable {
+ private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
+ + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
+ + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.ARRAY));
+ private byte[] keyOnlyBuffer;
+ private short rowLength;
+ private int familyOffset;
+ private byte familyLength;
+ private int qualifierOffset;
+ private int qualifierLength;
+ private long timestamp;
+ private byte typeByte;
+ private byte[] valueBuffer;
+ private int valueOffset;
+ private int valueLength;
+ private byte[] tagsBuffer;
+ private int tagsOffset;
+ private int tagsLength;
+ private long seqId;
+
+ protected OnheapDecodedCell(byte[] keyBuffer, short rowLength, int familyOffset,
+ byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte,
+ byte[] valueBuffer, int valueOffset, int valueLen, long seqId, byte[] tagsBuffer,
+ int tagsOffset, int tagsLength) {
+ this.keyOnlyBuffer = keyBuffer;
+ this.rowLength = rowLength;
+ this.familyOffset = familyOffset;
+ this.familyLength = familyLength;
+ this.qualifierOffset = qualOffset;
+ this.qualifierLength = qualLength;
+ this.timestamp = timeStamp;
+ this.typeByte = typeByte;
+ this.valueBuffer = valueBuffer;
+ this.valueOffset = valueOffset;
+ this.valueLength = valueLen;
+ this.tagsBuffer = tagsBuffer;
+ this.tagsOffset = tagsOffset;
+ this.tagsLength = tagsLength;
+ setSequenceId(seqId);
+ }
+
@Override
public byte[] getRowArray() {
- return currentKey.getRowArray();
+ return keyOnlyBuffer;
}
@Override
- public int getRowOffset() {
- return Bytes.SIZEOF_SHORT;
+ public byte[] getFamilyArray() {
+ return keyOnlyBuffer;
}
@Override
- public short getRowLength() {
- return currentKey.getRowLength();
+ public byte[] getQualifierArray() {
+ return keyOnlyBuffer;
}
@Override
- public byte[] getFamilyArray() {
- return currentKey.getFamilyArray();
+ public int getRowOffset() {
+ return Bytes.SIZEOF_SHORT;
}
@Override
- public int getFamilyOffset() {
- return currentKey.getFamilyOffset();
+ public short getRowLength() {
+ return rowLength;
}
@Override
- public byte getFamilyLength() {
- return currentKey.getFamilyLength();
+ public int getFamilyOffset() {
+ return familyOffset;
}
@Override
- public byte[] getQualifierArray() {
- return currentKey.getQualifierArray();
+ public byte getFamilyLength() {
+ return familyLength;
}
@Override
public int getQualifierOffset() {
- return currentKey.getQualifierOffset();
+ return qualifierOffset;
}
@Override
public int getQualifierLength() {
- return currentKey.getQualifierLength();
+ return qualifierLength;
}
@Override
public long getTimestamp() {
- return currentKey.getTimestamp();
+ return timestamp;
}
@Override
public byte getTypeByte() {
- return currentKey.getTypeByte();
+ return typeByte;
}
@Override
public long getSequenceId() {
- return memstoreTS;
+ return seqId;
}
@Override
public byte[] getValueArray() {
- return currentBuffer.array();
+ return this.valueBuffer;
}
@Override
public int getValueOffset() {
- return currentBuffer.arrayOffset() + valueOffset;
+ return valueOffset;
}
@Override
@@ -277,18 +398,12 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
@Override
public byte[] getTagsArray() {
- if (tagCompressionContext != null) {
- return tagsBuffer;
- }
- return currentBuffer.array();
+ return this.tagsBuffer;
}
@Override
public int getTagsOffset() {
- if (tagCompressionContext != null) {
- return 0;
- }
- return currentBuffer.arrayOffset() + tagsOffset;
+ return this.tagsOffset;
}
@Override
@@ -298,36 +413,54 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
@Override
public String toString() {
- return KeyValue.keyToString(this.keyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen="
- + getValueLength() + "/seqid=" + memstoreTS;
+ return KeyValue.keyToString(this.keyOnlyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen="
+ + getValueLength() + "/seqid=" + seqId;
}
- public Cell shallowCopy() {
- return new ClonedSeekerState(currentBuffer, keyBuffer, currentKey.getRowLength(),
- currentKey.getFamilyOffset(), currentKey.getFamilyLength(), keyLength,
- currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
- currentKey.getTimestamp(), currentKey.getTypeByte(), valueLength, valueOffset,
- memstoreTS, tagsOffset, tagsLength, tagCompressionContext, tagsBuffer);
+ @Override
+ public void setSequenceId(long seqId) {
+ this.seqId = seqId;
+ }
+
+ @Override
+ public long heapSize() {
+ return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength;
+ }
+
+ @Override
+ public int write(OutputStream out) throws IOException {
+ return write(out, true);
+ }
+
+ @Override
+ public int write(OutputStream out, boolean withTags) throws IOException {
+ int lenToWrite = KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength,
+ tagsLength, withTags);
+ writeInt(out, lenToWrite);
+ writeInt(out, keyOnlyBuffer.length);
+ writeInt(out, valueLength);
+ // Write key
+ out.write(keyOnlyBuffer);
+ // Write value
+ out.write(this.valueBuffer, this.valueOffset, this.valueLength);
+ if (withTags) {
+ // 2 bytes tags length followed by tags bytes
+ // tags length is serialized with 2 bytes only (as a short) even if the type is int.
+ // As this is a non-negative number, we save the sign bit. See HBASE-11437
+ out.write((byte) (0xff & (this.tagsLength >> 8)));
+ out.write((byte) (0xff & this.tagsLength));
+ out.write(this.tagsBuffer, this.tagsOffset, this.tagsLength);
+ }
+ return lenToWrite + Bytes.SIZEOF_INT;
}
}
- /**
- * Copies only the key part of the keybuffer by doing a deep copy and passes the
- * seeker state members for taking a clone.
- * Note that the value byte[] part is still pointing to the currentBuffer and the
- * represented by the valueOffset and valueLength
- */
- // We return this as a Cell to the upper layers of read flow and might try setting a new SeqId
- // there. So this has to be an instance of SettableSequenceId. SeekerState need not be
- // SettableSequenceId as we never return that to top layers. When we have to, we make
- // ClonedSeekerState from it.
- protected static class ClonedSeekerState implements Cell, HeapSize, SettableSequenceId,
- Streamable {
+ protected static class OffheapDecodedCell extends ByteBufferedCell implements HeapSize,
+ SettableSequenceId, Streamable {
private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
- + (4 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
- + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (2 * ClassSize.ARRAY));
- private byte[] keyOnlyBuffer;
- private ByteBuffer currentBuffer;
+ + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
+ + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.BYTE_BUFFER));
+ private ByteBuffer keyBuffer;
private short rowLength;
private int familyOffset;
private byte familyLength;
@@ -335,22 +468,22 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
private int qualifierLength;
private long timestamp;
private byte typeByte;
+ private ByteBuffer valueBuffer;
private int valueOffset;
private int valueLength;
- private int tagsLength;
+ private ByteBuffer tagsBuffer;
private int tagsOffset;
- private byte[] cloneTagsBuffer;
+ private int tagsLength;
private long seqId;
- private TagCompressionContext tagCompressionContext;
-
- protected ClonedSeekerState(ByteBuffer currentBuffer, byte[] keyBuffer, short rowLength,
- int familyOffset, byte familyLength, int keyLength, int qualOffset, int qualLength,
- long timeStamp, byte typeByte, int valueLen, int valueOffset, long seqId,
- int tagsOffset, int tagsLength, TagCompressionContext tagCompressionContext,
- byte[] tagsBuffer) {
- this.currentBuffer = currentBuffer;
- keyOnlyBuffer = new byte[keyLength];
- this.tagCompressionContext = tagCompressionContext;
+
+ protected OffheapDecodedCell(ByteBuffer keyBuffer, short rowLength, int familyOffset,
+ byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte,
+ ByteBuffer valueBuffer, int valueOffset, int valueLen, long seqId, ByteBuffer tagsBuffer,
+ int tagsOffset, int tagsLength) {
+ // The keyBuffer is always onheap
+ assert keyBuffer.hasArray();
+ assert keyBuffer.arrayOffset() == 0;
+ this.keyBuffer = keyBuffer;
this.rowLength = rowLength;
this.familyOffset = familyOffset;
this.familyLength = familyLength;
@@ -358,123 +491,153 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
this.qualifierLength = qualLength;
this.timestamp = timeStamp;
this.typeByte = typeByte;
- this.valueLength = valueLen;
+ this.valueBuffer = valueBuffer;
this.valueOffset = valueOffset;
+ this.valueLength = valueLen;
+ this.tagsBuffer = tagsBuffer;
this.tagsOffset = tagsOffset;
this.tagsLength = tagsLength;
- System.arraycopy(keyBuffer, 0, keyOnlyBuffer, 0, keyLength);
- if (tagCompressionContext != null) {
- this.cloneTagsBuffer = new byte[tagsLength];
- System.arraycopy(tagsBuffer, 0, this.cloneTagsBuffer, 0, tagsLength);
- }
setSequenceId(seqId);
}
@Override
public byte[] getRowArray() {
- return keyOnlyBuffer;
+ return this.keyBuffer.array();
}
@Override
- public byte[] getFamilyArray() {
- return keyOnlyBuffer;
+ public int getRowOffset() {
+ return getRowPositionInByteBuffer();
}
@Override
- public byte[] getQualifierArray() {
- return keyOnlyBuffer;
+ public short getRowLength() {
+ return this.rowLength;
}
@Override
- public int getRowOffset() {
- return Bytes.SIZEOF_SHORT;
+ public byte[] getFamilyArray() {
+ return this.keyBuffer.array();
}
@Override
- public short getRowLength() {
- return rowLength;
+ public int getFamilyOffset() {
+ return getFamilyPositionInByteBuffer();
}
@Override
- public int getFamilyOffset() {
- return familyOffset;
+ public byte getFamilyLength() {
+ return this.familyLength;
}
@Override
- public byte getFamilyLength() {
- return familyLength;
+ public byte[] getQualifierArray() {
+ return this.keyBuffer.array();
}
@Override
public int getQualifierOffset() {
- return qualifierOffset;
+ return getQualifierPositionInByteBuffer();
}
@Override
public int getQualifierLength() {
- return qualifierLength;
+ return this.qualifierLength;
}
@Override
public long getTimestamp() {
- return timestamp;
+ return this.timestamp;
}
@Override
public byte getTypeByte() {
- return typeByte;
+ return this.typeByte;
}
@Override
public long getSequenceId() {
- return seqId;
+ return this.seqId;
}
@Override
public byte[] getValueArray() {
- return currentBuffer.array();
+ return CellUtil.cloneValue(this);
}
@Override
public int getValueOffset() {
- return currentBuffer.arrayOffset() + valueOffset;
+ return 0;
}
@Override
public int getValueLength() {
- return valueLength;
+ return this.valueLength;
}
@Override
public byte[] getTagsArray() {
- if (tagCompressionContext != null) {
- return cloneTagsBuffer;
- }
- return currentBuffer.array();
+ return CellUtil.cloneTags(this);
}
@Override
public int getTagsOffset() {
- if (tagCompressionContext != null) {
- return 0;
- }
- return currentBuffer.arrayOffset() + tagsOffset;
+ return 0;
}
@Override
public int getTagsLength() {
- return tagsLength;
+ return this.tagsLength;
}
@Override
- public String toString() {
- return KeyValue.keyToString(this.keyOnlyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen="
- + getValueLength() + "/seqid=" + seqId;
+ public ByteBuffer getRowByteBuffer() {
+ return this.keyBuffer;
}
@Override
- public void setSequenceId(long seqId) {
- this.seqId = seqId;
+ public int getRowPositionInByteBuffer() {
+ return Bytes.SIZEOF_SHORT;
+ }
+
+ @Override
+ public ByteBuffer getFamilyByteBuffer() {
+ return this.keyBuffer;
+ }
+
+ @Override
+ public int getFamilyPositionInByteBuffer() {
+ return this.familyOffset;
+ }
+
+ @Override
+ public ByteBuffer getQualifierByteBuffer() {
+ return this.keyBuffer;
+ }
+
+ @Override
+ public int getQualifierPositionInByteBuffer() {
+ return this.qualifierOffset;
+ }
+
+ @Override
+ public ByteBuffer getValueByteBuffer() {
+ return this.valueBuffer;
+ }
+
+ @Override
+ public int getValuePositionInByteBuffer() {
+ return this.valueOffset;
+ }
+
+ @Override
+ public ByteBuffer getTagsByteBuffer() {
+ return this.tagsBuffer;
+ }
+
+ @Override
+ public int getTagsPositionInByteBuffer() {
+ return this.tagsOffset;
}
@Override
@@ -483,6 +646,11 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
}
@Override
+ public void setSequenceId(long seqId) {
+ this.seqId = seqId;
+ }
+
+ @Override
public int write(OutputStream out) throws IOException {
return write(out, true);
}
@@ -492,26 +660,19 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
int lenToWrite = KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength,
tagsLength, withTags);
writeInt(out, lenToWrite);
- writeInt(out, keyOnlyBuffer.length);
+ writeInt(out, keyBuffer.capacity());
writeInt(out, valueLength);
// Write key
- out.write(keyOnlyBuffer);
+ out.write(keyBuffer.array());
// Write value
- assert this.currentBuffer.hasArray();
- out.write(this.currentBuffer.array(), this.currentBuffer.arrayOffset() + this.valueOffset,
- this.valueLength);
+ writeByteBuffer(out, this.valueBuffer, this.valueOffset, this.valueLength);
if (withTags) {
// 2 bytes tags length followed by tags bytes
// tags length is serialized with 2 bytes only(short way) even if the type is int.
// As this is non -ve numbers, we save the sign bit. See HBASE-11437
out.write((byte) (0xff & (this.tagsLength >> 8)));
out.write((byte) (0xff & this.tagsLength));
- if (this.tagCompressionContext != null) {
- out.write(cloneTagsBuffer);
- } else {
- out.write(this.currentBuffer.array(), this.currentBuffer.arrayOffset() + this.tagsOffset,
- this.tagsLength);
- }
+ writeByteBuffer(out, this.tagsBuffer, this.tagsOffset, this.tagsLength);
}
return lenToWrite + Bytes.SIZEOF_INT;
}
@@ -527,16 +688,31 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
}
}
+ private static void writeByteBuffer(OutputStream out, ByteBuffer b, int offset, int length)
+ throws IOException {
+ // ByteBufferOutputStream has a write variant which takes a ByteBuffer so that it can directly
+ // write bytes from the src ByteBuffer to the destination ByteBuffer. This avoids the need for
+ // temp array creation and copy.
+ if (out instanceof ByteBufferOutputStream) {
+ ((ByteBufferOutputStream) out).write(b, offset, length);
+ } else {
+ ByteBufferUtils.copyBufferToStream(out, b, offset, length);
+ }
+ }
+
protected abstract static class
BufferedEncodedSeeker<STATE extends SeekerState>
implements EncodedSeeker {
protected HFileBlockDecodingContext decodingCtx;
protected final CellComparator comparator;
- protected ByteBuffer currentBuffer;
- protected STATE current = createSeekerState(); // always valid
- protected STATE previous = createSeekerState(); // may not be valid
+ protected ByteBuff currentBuffer;
protected TagCompressionContext tagCompressionContext = null;
protected KeyValue.KeyOnlyKeyValue keyOnlyKV = new KeyValue.KeyOnlyKeyValue();
+ // A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
+ // many object creations.
+ protected final Pair<ByteBuffer, Integer> tmpPair = new Pair<ByteBuffer, Integer>();
+ protected STATE current = createSeekerState(); // always valid
+ protected STATE previous = createSeekerState(); // may not be valid
public BufferedEncodedSeeker(CellComparator comparator,
HFileBlockDecodingContext decodingCtx) {
@@ -566,7 +742,7 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
}
@Override
- public void setCurrentBuffer(ByteBuffer buffer) {
+ public void setCurrentBuffer(ByteBuff buffer) {
if (this.tagCompressionContext != null) {
this.tagCompressionContext.clear();
}
@@ -589,9 +765,10 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
@Override
public ByteBuffer getValueShallowCopy() {
- ByteBuffer dup = currentBuffer.duplicate();
- dup.position(current.valueOffset);
- dup.limit(current.valueOffset + current.valueLength);
+ currentBuffer.asSubByteBuffer(current.valueOffset, current.valueLength, tmpPair);
+ ByteBuffer dup = tmpPair.getFirst().duplicate();
+ dup.position(tmpPair.getSecond());
+ dup.limit(tmpPair.getSecond() + current.valueLength);
return dup.slice();
}
@@ -601,8 +778,7 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
kvBuffer.putInt(current.keyLength);
kvBuffer.putInt(current.valueLength);
kvBuffer.put(current.keyBuffer, 0, current.keyLength);
- ByteBufferUtils.copyFromBufferToBuffer(currentBuffer, kvBuffer, current.valueOffset,
- current.valueLength);
+ currentBuffer.get(kvBuffer, current.valueOffset, current.valueLength);
if (current.tagsLength > 0) {
// Put short as unsigned
kvBuffer.put((byte) (current.tagsLength >> 8 & 0xff));
@@ -610,8 +786,7 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
if (current.tagsOffset != -1) {
// the offset of the tags bytes in the underlying buffer is marked. So the temp
// buffer,tagsBuffer was not been used.
- ByteBufferUtils.copyFromBufferToBuffer(currentBuffer, kvBuffer, current.tagsOffset,
- current.tagsLength);
+ currentBuffer.get(kvBuffer, current.tagsOffset, current.tagsLength);
} else {
// When tagsOffset is marked as -1, tag compression was present and so the tags were
// uncompressed into temp buffer, tagsBuffer. Let us copy it from there
@@ -631,7 +806,7 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
@Override
public Cell getKeyValue() {
- return current.shallowCopy();
+ return current.toCell();
}
@Override
@@ -657,7 +832,7 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
}
protected void decodeTags() {
- current.tagsLength = ByteBufferUtils.readCompressedInt(currentBuffer);
+ current.tagsLength = ByteBuff.readCompressedInt(currentBuffer);
if (tagCompressionContext != null) {
if (current.uncompressTags) {
// Tag compression is been used. uncompress it into tagsBuffer
@@ -669,7 +844,7 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
throw new RuntimeException("Exception while uncompressing tags", e);
}
} else {
- ByteBufferUtils.skip(currentBuffer, current.tagsCompressedLength);
+ currentBuffer.skip(current.tagsCompressedLength);
current.uncompressTags = true;// Reset this.
}
current.tagsOffset = -1;
@@ -677,7 +852,7 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
// When tag compress is not used, let us not do copying of tags bytes into tagsBuffer.
// Just mark the tags Offset so as to create the KV buffer later in getKeyValueBuffer()
current.tagsOffset = currentBuffer.position();
- ByteBufferUtils.skip(currentBuffer, current.tagsLength);
+ currentBuffer.skip(current.tagsLength);
}
}
@@ -844,7 +1019,7 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
protected STATE createSeekerState() {
// This will fail for non-default seeker state if the subclass does not
// override this method.
- return (STATE) new SeekerState();
+ return (STATE) new SeekerState(this.tmpPair);
}
abstract protected void decodeFirst();
@@ -1017,4 +1192,13 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
encodingCtx.postEncoding(BlockType.DATA);
}
}
+
+ protected Cell createFirstKeyCell(ByteBuffer key, int keyLength) {
+ if (key.hasArray()) {
+ return new KeyValue.KeyOnlyKeyValue(key.array(), key.arrayOffset() + key.position(),
+ keyLength);
+ } else {
+ return new OffheapKeyOnlyKeyValue(key, key.position(), keyLength);
+ }
+ }
}
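toCell(), createFirstKeyCell() and writeByteBuffer() above all pivot on the same question: does the buffer expose a backing array? A stripped-down sketch of that dispatch rule (the method name is illustrative):

```java
import java.nio.ByteBuffer;

public class HasArrayDispatchExample {
  // Heap buffers expose their backing byte[] (offset by arrayOffset());
  // direct buffers do not, so reads must go through ByteBuffer accessors.
  static String describe(ByteBuffer buf, int offset) {
    if (buf.hasArray()) {
      byte first = buf.array()[buf.arrayOffset() + offset]; // on-heap path
      return "onheap, first byte " + first;
    } else {
      byte first = buf.get(offset); // off-heap path; absolute get, position untouched
      return "offheap, first byte " + first;
    }
  }

  public static void main(String[] args) {
    System.out.println(describe(ByteBuffer.wrap(new byte[] { 7, 8 }), 0));
    ByteBuffer direct = ByteBuffer.allocateDirect(2);
    direct.put((byte) 9).flip();
    System.out.println(describe(direct, 0));
  }
}
```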
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
index 662be29..9310f32 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
@@ -71,8 +71,7 @@ public class CopyKeyDataBlockEncoder extends BufferedDataBlockEncoder {
int keyLength = block.getIntStrictlyForward(Bytes.SIZEOF_INT);
int pos = 3 * Bytes.SIZEOF_INT;
ByteBuffer key = block.asSubByteBuffer(pos + keyLength).duplicate();
- // TODO : to be changed here for BBCell
- return new KeyValue.KeyOnlyKeyValue(key.array(), key.arrayOffset() + pos, keyLength);
+ return createFirstKeyCell(key, keyLength);
}
@Override
@@ -91,14 +90,14 @@ public class CopyKeyDataBlockEncoder extends BufferedDataBlockEncoder {
current.ensureSpaceForKey();
currentBuffer.get(current.keyBuffer, 0, current.keyLength);
current.valueOffset = currentBuffer.position();
- ByteBufferUtils.skip(currentBuffer, current.valueLength);
+ currentBuffer.skip(current.valueLength);
if (includesTags()) {
// Read short as unsigned, high byte first
current.tagsLength = ((currentBuffer.get() & 0xff) << 8) ^ (currentBuffer.get() & 0xff);
- ByteBufferUtils.skip(currentBuffer, current.tagsLength);
+ currentBuffer.skip(current.tagsLength);
}
if (includesMvcc()) {
- current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
+ current.memstoreTS = ByteBuff.readVLong(currentBuffer);
} else {
current.memstoreTS = 0;
}
@@ -107,7 +106,7 @@ public class CopyKeyDataBlockEncoder extends BufferedDataBlockEncoder {
@Override
protected void decodeFirst() {
- ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT);
+ currentBuffer.skip(Bytes.SIZEOF_INT);
current.lastCommonPrefix = 0;
decodeNext();
}
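The tags-length read above decodes a 2-byte unsigned big-endian short by hand; the 0xff masks prevent sign extension of the individual bytes. A self-contained sketch of that arithmetic:

```java
public class UnsignedShortExample {
  // Same expression the seeker uses on its two consecutive get() calls.
  static int readUnsignedShort(byte hi, byte lo) {
    return ((hi & 0xff) << 8) ^ (lo & 0xff);
  }

  public static void main(String[] args) {
    // 0xFF 0xFF decodes to 65535, not -1: the tags length is non-negative
    // by contract (see HBASE-11437), so all 16 bits carry magnitude.
    System.out.println(readUnsignedShort((byte) 0xff, (byte) 0xff)); // 65535
    System.out.println(readUnsignedShort((byte) 0x01, (byte) 0x02)); // 258
  }
}
```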
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
index ce71308..38c6633 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
@@ -138,7 +138,7 @@ public interface DataBlockEncoder {
* Set on which buffer there will be done seeking.
* @param buffer Used for seeking.
*/
- void setCurrentBuffer(ByteBuffer buffer);
+ void setCurrentBuffer(ByteBuff buffer);
/**
* From the current position creates a cell using the key part
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
index 90b8e6e..cecb594 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
/**
* Compress using:
@@ -362,9 +363,14 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
}
protected static class DiffSeekerState extends SeekerState {
+
private int rowLengthWithSize;
private long timestamp;
+ public DiffSeekerState(Pair<ByteBuffer, Integer> tmpPair) {
+ super(tmpPair);
+ }
+
@Override
protected void copyFromNext(SeekerState that) {
super.copyFromNext(that);
@@ -389,14 +395,12 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
if (!isFirst) {
type = current.keyBuffer[current.keyLength - Bytes.SIZEOF_BYTE];
}
- current.keyLength = ByteBufferUtils.readCompressedInt(currentBuffer);
+ current.keyLength = ByteBuff.readCompressedInt(currentBuffer);
}
if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
- current.valueLength =
- ByteBufferUtils.readCompressedInt(currentBuffer);
+ current.valueLength = ByteBuff.readCompressedInt(currentBuffer);
}
- current.lastCommonPrefix =
- ByteBufferUtils.readCompressedInt(currentBuffer);
+ current.lastCommonPrefix = ByteBuff.readCompressedInt(currentBuffer);
current.ensureSpaceForKey();
@@ -446,8 +450,7 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
int pos = current.keyLength - TIMESTAMP_WITH_TYPE_LENGTH;
int timestampFitInBytes = 1 +
((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH);
- long timestampOrDiff =
- ByteBufferUtils.readLong(currentBuffer, timestampFitInBytes);
+ long timestampOrDiff = ByteBuff.readLong(currentBuffer, timestampFitInBytes);
if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
timestampOrDiff = -timestampOrDiff;
}
@@ -467,13 +470,13 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
}
current.valueOffset = currentBuffer.position();
- ByteBufferUtils.skip(currentBuffer, current.valueLength);
+ currentBuffer.skip(current.valueLength);
if (includesTags()) {
decodeTags();
}
if (includesMvcc()) {
- current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
+ current.memstoreTS = ByteBuff.readVLong(currentBuffer);
} else {
current.memstoreTS = 0;
}
@@ -482,7 +485,7 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
@Override
protected void decodeFirst() {
- ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT);
+ currentBuffer.skip(Bytes.SIZEOF_INT);
// read column family
byte familyNameLength = currentBuffer.get();
@@ -500,7 +503,7 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
@Override
protected DiffSeekerState createSeekerState() {
- return new DiffSeekerState();
+ return new DiffSeekerState(this.tmpPair);
}
};
}
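The timestamp handling above packs two things into the per-entry flag byte: how many bytes the timestamp (or its delta against the previous cell) occupies, stored as length minus one in a 3-bit field, and a sign flag so negative deltas stay compact. A toy sketch of that packing; the bit positions here are illustrative, not the encoder's actual constants:

```java
public class TimestampFlagExample {
  // Illustrative layout only: 3 bits for (fitInBytes - 1) plus one sign bit.
  static final int SHIFT_TIMESTAMP_LENGTH = 0;
  static final int MASK_TIMESTAMP_LENGTH = 0x7 << SHIFT_TIMESTAMP_LENGTH;
  static final int FLAG_TIMESTAMP_SIGN = 1 << 3;

  static int encodeFlag(long timestampOrDiff) {
    long magnitude = Math.abs(timestampOrDiff);
    int fitInBytes = 1;
    while (magnitude >>> (8 * fitInBytes) != 0) {
      fitInBytes++; // count the minimal number of bytes needed
    }
    int flag = (fitInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
    if (timestampOrDiff < 0) {
      flag |= FLAG_TIMESTAMP_SIGN;
    }
    return flag;
  }

  public static void main(String[] args) {
    int flag = encodeFlag(-70000); // three bytes of magnitude, negative
    int fitInBytes = 1 + ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH);
    System.out.println(fitInBytes);                        // 3
    System.out.println((flag & FLAG_TIMESTAMP_SIGN) != 0); // true
  }
}
```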
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
index fa4adbd..6855a04 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
/**
* Encoder similar to {@link DiffKeyDeltaEncoder} but supposedly faster.
@@ -364,8 +365,7 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
ByteBuff.readCompressedInt(block); // commonLength
ByteBuffer key = block.asSubByteBuffer(keyLength).duplicate();
block.reset();
- // TODO : Change to BBCell.
- return new KeyValue.KeyOnlyKeyValue(key.array(), key.arrayOffset() + key.position(), keyLength);
+ return createFirstKeyCell(key, keyLength);
}
@Override
@@ -379,6 +379,10 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
private int rowLengthWithSize;
private int familyLengthWithSize;
+ public FastDiffSeekerState(Pair<ByteBuffer, Integer> tmpPair) {
+ super(tmpPair);
+ }
+
@Override
protected void copyFromNext(SeekerState that) {
super.copyFromNext(that);
@@ -404,14 +408,12 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
current.prevTimestampAndType, 0,
current.prevTimestampAndType.length);
}
- current.keyLength = ByteBufferUtils.readCompressedInt(currentBuffer);
+ current.keyLength = ByteBuff.readCompressedInt(currentBuffer);
}
if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
- current.valueLength =
- ByteBufferUtils.readCompressedInt(currentBuffer);
+ current.valueLength = ByteBuff.readCompressedInt(currentBuffer);
}
- current.lastCommonPrefix =
- ByteBufferUtils.readCompressedInt(currentBuffer);
+ current.lastCommonPrefix = ByteBuff.readCompressedInt(currentBuffer);
current.ensureSpaceForKey();
@@ -491,14 +493,14 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
// handle value
if ((flag & FLAG_SAME_VALUE) == 0) {
current.valueOffset = currentBuffer.position();
- ByteBufferUtils.skip(currentBuffer, current.valueLength);
+ currentBuffer.skip(current.valueLength);
}
if (includesTags()) {
decodeTags();
}
if (includesMvcc()) {
- current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
+ current.memstoreTS = ByteBuff.readVLong(currentBuffer);
} else {
current.memstoreTS = 0;
}
@@ -507,7 +509,7 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
@Override
protected void decodeFirst() {
- ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT);
+ currentBuffer.skip(Bytes.SIZEOF_INT);
decode(true);
}
@@ -518,7 +520,7 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
@Override
protected FastDiffSeekerState createSeekerState() {
- return new FastDiffSeekerState();
+ return new FastDiffSeekerState(this.tmpPair);
}
};
}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
index 6e89de4..6f5acd5 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
@@ -186,8 +186,7 @@ public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder {
}
ByteBuffer key = block.asSubByteBuffer(keyLength).duplicate();
block.reset();
- // TODO : Change to BBCell
- return new KeyValue.KeyOnlyKeyValue(key.array(), key.arrayOffset() + key.position(), keyLength);
+ return createFirstKeyCell(key, keyLength);
}
@Override
@@ -201,21 +200,20 @@ public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder {
return new BufferedEncodedSeeker(comparator, decodingCtx) {
@Override
protected void decodeNext() {
- current.keyLength = ByteBufferUtils.readCompressedInt(currentBuffer);
- current.valueLength = ByteBufferUtils.readCompressedInt(currentBuffer);
- current.lastCommonPrefix =
- ByteBufferUtils.readCompressedInt(currentBuffer);
+ current.keyLength = ByteBuff.readCompressedInt(currentBuffer);
+ current.valueLength = ByteBuff.readCompressedInt(currentBuffer);
+ current.lastCommonPrefix = ByteBuff.readCompressedInt(currentBuffer);
current.keyLength += current.lastCommonPrefix;
current.ensureSpaceForKey();
currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
current.keyLength - current.lastCommonPrefix);
current.valueOffset = currentBuffer.position();
- ByteBufferUtils.skip(currentBuffer, current.valueLength);
+ currentBuffer.skip(current.valueLength);
if (includesTags()) {
decodeTags();
}
if (includesMvcc()) {
- current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
+ current.memstoreTS = ByteBuff.readVLong(currentBuffer);
} else {
current.memstoreTS = 0;
}
@@ -224,7 +222,7 @@ public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder {
@Override
protected void decodeFirst() {
- ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT);
+ currentBuffer.skip(Bytes.SIZEOF_INT);
decodeNext();
}
};
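Prefix encoding stores, per key, only the suffix that differs from the previous key; decodeNext() above copies that suffix into keyBuffer at lastCommonPrefix, so the shared prefix left from the previous key is reused in place. A toy reconstruction (the keys are invented):

```java
public class PrefixDeltaExample {
  public static void main(String[] args) {
    byte[] keyBuffer = new byte[32];
    // First key: "row-aaa", with a common prefix of 0.
    byte[] first = "row-aaa".getBytes();
    System.arraycopy(first, 0, keyBuffer, 0, first.length);
    // Next entry shares "row-a" (5 bytes) and stores only the suffix "bc".
    int lastCommonPrefix = 5;
    byte[] suffix = "bc".getBytes();
    int keyLength = lastCommonPrefix + suffix.length;
    // Overwrite from lastCommonPrefix on; bytes before it are reused as-is.
    System.arraycopy(suffix, 0, keyBuffer, lastCommonPrefix, suffix.length);
    System.out.println(new String(keyBuffer, 0, keyLength)); // row-abc
  }
}
```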
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java
index 0b442a5..6e13b44 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java
@@ -21,9 +21,9 @@ package org.apache.hadoop.hbase.io.util;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.Pair;
import com.google.common.base.Preconditions;
@@ -84,7 +84,7 @@ public class StreamUtils {
return result;
}
- public static int readRawVarint32(ByteBuffer input) throws IOException {
+ public static int readRawVarint32(ByteBuff input) throws IOException {
byte tmp = input.get();
if (tmp >= 0) {
return tmp;
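readRawVarint32 decodes a protobuf-style varint: seven payload bits per byte, with the high bit flagging continuation, which is why the tmp >= 0 fast path above covers every value below 128. A self-contained sketch of the general decode loop (not a copy of the HBase method):

```java
public class Varint32Example {
  static int readRawVarint32(byte[] in, int off) {
    int result = 0;
    for (int shift = 0; shift < 32; shift += 7) {
      byte b = in[off++];
      result |= (b & 0x7f) << shift; // low 7 bits are payload
      if (b >= 0) {                  // high bit clear: this was the last byte
        return result;
      }
    }
    throw new IllegalStateException("Malformed varint");
  }

  public static void main(String[] args) {
    System.out.println(readRawVarint32(new byte[] { 0x05 }, 0));              // 5
    System.out.println(readRawVarint32(new byte[] { (byte) 0xAC, 0x02 }, 0)); // 300
  }
}
```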
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
index 14e77a7..4398f5d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.WritableUtils;
/**
* An abstract class that abstracts out as to how the byte buffers are used,
@@ -435,4 +436,23 @@ public abstract class ByteBuff {
}
return tmpLength;
}
+
+ /**
+ * Similar to {@link WritableUtils#readVLong(DataInput)} but reads from a
+ * {@link ByteBuff}.
+ */
+ public static long readVLong(ByteBuff in) {
+ byte firstByte = in.get();
+ int len = WritableUtils.decodeVIntSize(firstByte);
+ if (len == 1) {
+ return firstByte;
+ }
+ long i = 0;
+ for (int idx = 0; idx < len - 1; idx++) {
+ byte b = in.get();
+ i = i << 8;
+ i = i | (b & 0xFF);
+ }
+ return (WritableUtils.isNegativeVInt(firstByte) ? (i ^ -1L) : i);
+ }
}
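Since this follows Hadoop's vint/vlong wire format, the new reader round-trips against WritableUtils.writeVLong. A small sketch:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.io.WritableUtils;

public class ReadVLongExample {
  public static void main(String[] args) throws Exception {
    // Encode with Hadoop's writer, decode with the new ByteBuff-based reader.
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    WritableUtils.writeVLong(new DataOutputStream(baos), 123456789L);
    ByteBuff in = new SingleByteBuff(ByteBuffer.wrap(baos.toByteArray()));
    System.out.println(ByteBuff.readVLong(in)); // 123456789
  }
}
```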
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
index 841c468..f4c4afe 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
@@ -29,6 +29,7 @@ import java.util.List;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -60,11 +61,11 @@ public class TestTagCompressionContext {
byte[] dest = new byte[tagsLength1];
ByteBuffer ob = ByteBuffer.wrap(baos.toByteArray());
- context.uncompressTags(ob, dest, 0, tagsLength1);
+ context.uncompressTags(new SingleByteBuff(ob), dest, 0, tagsLength1);
assertTrue(Bytes.equals(kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1, dest, 0,
tagsLength1));
dest = new byte[tagsLength2];
- context.uncompressTags(ob, dest, 0, tagsLength2);
+ context.uncompressTags(new SingleByteBuff(ob), dest, 0, tagsLength2);
assertTrue(Bytes.equals(kv2.getTagsArray(), kv2.getTagsOffset(), tagsLength2, dest, 0,
tagsLength2));
}
diff --git a/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeSeeker.java b/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeSeeker.java
index 93610c4..f6ceec3 100644
--- a/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeSeeker.java
+++ b/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeSeeker.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.codec.prefixtree.decode.PrefixTreeArraySearcher;
import org.apache.hadoop.hbase.codec.prefixtree.scanner.CellScannerPosition;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder.EncodedSeeker;
+import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
@@ -55,8 +56,8 @@ public class PrefixTreeSeeker implements EncodedSeeker {
}
@Override
- public void setCurrentBuffer(ByteBuffer fullBlockBuffer) {
- block = fullBlockBuffer;
+ public void setCurrentBuffer(ByteBuff fullBlockBuffer) {
+ block = fullBlockBuffer.asSubByteBuffer(fullBlockBuffer.limit());
// TODO : change to ByteBuff
ptSearcher = DecoderFactory.checkOut(block, includeMvccVersion);
rewind();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index ae2f6c1..4188c89 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -1454,8 +1454,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
+ DataBlockEncoding.getNameFromId(dataBlockEncoderId));
}
ByteBuff encodedBuffer = getEncodedBuffer(newBlock);
- // TODO : Change the DBEs to work with ByteBuffs
- seeker.setCurrentBuffer(encodedBuffer.asSubByteBuffer(encodedBuffer.limit()));
+ seeker.setCurrentBuffer(encodedBuffer);
blockFetches++;
// Reset the next indexed key
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index e8b79a8..feec5f8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -65,7 +65,6 @@ import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HRegionLocator;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
@@ -249,6 +248,19 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
return Collections.unmodifiableList(configurations);
}
+ public static List