.../java/org/apache/hadoop/hbase/CellUtil.java | 200 ++++++++++++++++++--- .../io/encoding/BufferedDataBlockEncoder.java | 4 +- .../hbase/io/encoding/CopyKeyDataBlockEncoder.java | 4 +- .../hbase/io/encoding/DiffKeyDeltaEncoder.java | 10 +- .../hbase/io/encoding/FastDiffDeltaEncoder.java | 13 +- .../hbase/io/encoding/PrefixKeyDeltaEncoder.java | 9 +- .../apache/hadoop/hbase/util/ByteBufferUtils.java | 24 ++- .../hbase/util/test/RedundantKVGenerator.java | 155 ++++++++++++++++ .../hbase/io/hfile/NoOpDataBlockEncoder.java | 4 +- .../hbase/io/hfile/TestHFileDataBlockEncoder.java | 40 +++++ 10 files changed, 420 insertions(+), 43 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java index fc53893..0d34137 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java @@ -595,26 +595,29 @@ public final class CellUtil { } public static boolean matchingValue(final Cell left, final Cell right) { - int lvlength = left.getValueLength(); - int rvlength = right.getValueLength(); + return matchingValue(left, right, left.getValueLength(), right.getValueLength()); + } + + public static boolean matchingValue(final Cell left, final Cell right, int lvlength, + int rvlength) { if (left instanceof ByteBufferedCell && right instanceof ByteBufferedCell) { return ByteBufferUtils.equals(((ByteBufferedCell) left).getValueByteBuffer(), - ((ByteBufferedCell) left).getValuePosition(), lvlength, - ((ByteBufferedCell) right).getValueByteBuffer(), - ((ByteBufferedCell) right).getValuePosition(), rvlength); + ((ByteBufferedCell) left).getValuePosition(), lvlength, + ((ByteBufferedCell) right).getValueByteBuffer(), + ((ByteBufferedCell) right).getValuePosition(), rvlength); } if (left instanceof ByteBufferedCell) { return ByteBufferUtils.equals(((ByteBufferedCell) left).getValueByteBuffer(), - ((ByteBufferedCell) left).getValuePosition(), lvlength, - right.getValueArray(), right.getValueOffset(), rvlength); + ((ByteBufferedCell) left).getValuePosition(), lvlength, right.getValueArray(), + right.getValueOffset(), rvlength); } if (right instanceof ByteBufferedCell) { return ByteBufferUtils.equals(((ByteBufferedCell) right).getValueByteBuffer(), - ((ByteBufferedCell) right).getValuePosition(), rvlength, - left.getValueArray(), left.getValueOffset(), lvlength); + ((ByteBufferedCell) right).getValuePosition(), rvlength, left.getValueArray(), + left.getValueOffset(), lvlength); } return Bytes.equals(left.getValueArray(), left.getValueOffset(), lvlength, - right.getValueArray(), right.getValueOffset(), rvlength); + right.getValueArray(), right.getValueOffset(), rvlength); } public static boolean matchingValue(final Cell left, final byte[] buf) { @@ -879,17 +882,147 @@ public final class CellUtil { */ public static void writeFlatKey(Cell cell, DataOutputStream out) throws IOException { short rowLen = cell.getRowLength(); - out.writeShort(rowLen); - out.write(cell.getRowArray(), cell.getRowOffset(), rowLen); byte fLen = cell.getFamilyLength(); - out.writeByte(fLen); - out.write(cell.getFamilyArray(), cell.getFamilyOffset(), fLen); - out.write(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); + int qLen = cell.getQualifierLength(); + // Using just one if/else loop instead of every time checking before writing every + // component of cell + if (cell instanceof ByteBufferedCell) { + out.writeShort(rowLen); + ByteBufferUtils.copyBufferToStream(out, ((ByteBufferedCell) cell).getRowByteBuffer(), + ((ByteBufferedCell) cell).getRowPosition(), rowLen); + out.writeByte(fLen); + ByteBufferUtils.copyBufferToStream(out, ((ByteBufferedCell) cell).getFamilyByteBuffer(), + ((ByteBufferedCell) cell).getFamilyPosition(), fLen); + ByteBufferUtils.copyBufferToStream(out, ((ByteBufferedCell) cell).getQualifierByteBuffer(), + ((ByteBufferedCell) cell).getQualifierPosition(), qLen); + } else { + out.writeShort(rowLen); + out.write(cell.getRowArray(), cell.getRowOffset(), rowLen); + out.writeByte(fLen); + out.write(cell.getFamilyArray(), cell.getFamilyOffset(), fLen); + out.write(cell.getQualifierArray(), cell.getQualifierOffset(), qLen); + } out.writeLong(cell.getTimestamp()); out.writeByte(cell.getTypeByte()); } /** + * Writes the row from the given cell to the output stream + * @param out The dataoutputstream to which the data has to be written + * @param cell The cell whose contents has to be written + * @param rlength the row length + * @throws IOException + */ + public static void writeRow(DataOutputStream out, Cell cell, short rlength) throws IOException { + if (cell instanceof ByteBufferedCell) { + ByteBufferUtils.copyBufferToStream(out, ((ByteBufferedCell) cell).getRowByteBuffer(), + ((ByteBufferedCell) cell).getRowPosition(), rlength); + } else { + out.write(cell.getRowArray(), cell.getRowOffset(), rlength); + } + } + + /** + * Writes the row from the given cell to the output stream excluding the common prefix + * @param out The dataoutputstream to which the data has to be written + * @param cell The cell whose contents has to be written + * @param rlength the row length + * @throws IOException + */ + public static void writeRowSkippingBytes(DataOutputStream out, Cell cell, short rlength, + int commonPrefix) throws IOException { + if (cell instanceof ByteBufferedCell) { + ByteBufferUtils.copyBufferToStream(out, ((ByteBufferedCell) cell).getRowByteBuffer(), + ((ByteBufferedCell) cell).getRowPosition() + commonPrefix, rlength - commonPrefix); + } else { + out.write(cell.getRowArray(), cell.getRowOffset() + commonPrefix, rlength - commonPrefix); + } + } + + /** + * Writes the family from the given cell to the output stream + * @param out The dataoutputstream to which the data has to be written + * @param cell The cell whose contents has to be written + * @param flength the family length + * @throws IOException + */ + public static void writeFamily(DataOutputStream out, Cell cell, byte flength) throws IOException { + if (cell instanceof ByteBufferedCell) { + ByteBufferUtils.copyBufferToStream(out, ((ByteBufferedCell) cell).getFamilyByteBuffer(), + ((ByteBufferedCell) cell).getFamilyPosition(), flength); + } else { + out.write(cell.getFamilyArray(), cell.getFamilyOffset(), flength); + } + } + + /** + * Writes the qualifier from the given cell to the output stream + * @param out The dataoutputstream to which the data has to be written + * @param cell The cell whose contents has to be written + * @param qlength the qualifier length + * @throws IOException + */ + public static void writeQualifier(DataOutputStream out, Cell cell, int qlength) + throws IOException { + if (cell instanceof ByteBufferedCell) { + ByteBufferUtils.copyBufferToStream(out, ((ByteBufferedCell) cell).getQualifierByteBuffer(), + ((ByteBufferedCell) cell).getQualifierPosition(), qlength); + } else { + out.write(cell.getQualifierArray(), cell.getQualifierOffset(), qlength); + } + } + + /** + * Writes the qualifier from the given cell to the output stream excluding the common prefix + * @param out The dataoutputstream to which the data has to be written + * @param cell The cell whose contents has to be written + * @param qlength the qualifier length + * @throws IOException + */ + public static void writeQualifierSkippingBytes(DataOutputStream out, Cell cell, + int qlength, int commonPrefix) throws IOException { + if (cell instanceof ByteBufferedCell) { + ByteBufferUtils.copyBufferToStream(out, ((ByteBufferedCell) cell).getQualifierByteBuffer(), + ((ByteBufferedCell) cell).getQualifierPosition() + commonPrefix, qlength - commonPrefix); + } else { + out.write(cell.getQualifierArray(), cell.getQualifierOffset() + commonPrefix, + qlength - commonPrefix); + } + } + + /** + * Writes the value from the given cell to the output stream + * @param out The dataoutputstream to which the data has to be written + * @param cell The cell whose contents has to be written + * @param vlength the value length + * @throws IOException + */ + public static void writeValue(DataOutputStream out, Cell cell, int vlength) throws IOException { + if (cell instanceof ByteBufferedCell) { + ByteBufferUtils.copyBufferToStream(out, ((ByteBufferedCell) cell).getValueByteBuffer(), + ((ByteBufferedCell) cell).getValuePosition(), vlength); + } else { + out.write(cell.getValueArray(), cell.getValueOffset(), vlength); + } + } + + /** + * Writes the tag from the given cell to the output stream + * @param out The dataoutputstream to which the data has to be written + * @param cell The cell whose contents has to be written + * @param tagsLength the tag length + * @throws IOException + */ + public static void writeTags(DataOutputStream out, Cell cell, int tagsLength) throws IOException { + if (cell instanceof ByteBufferedCell) { + ByteBufferUtils.copyBufferToStream(out, ((ByteBufferedCell) cell).getTagsByteBuffer(), + ((ByteBufferedCell) cell).getTagsPosition(), tagsLength); + } else { + out.write(cell.getTagsArray(), cell.getTagsOffset(), tagsLength); + } + } + + /** * @param cell * @return The Key portion of the passed cell as a String. */ @@ -951,7 +1084,7 @@ public final class CellUtil { commonPrefix -= KeyValue.ROW_LENGTH_SIZE; } if (rLen > commonPrefix) { - out.write(cell.getRowArray(), cell.getRowOffset() + commonPrefix, rLen - commonPrefix); + writeRowSkippingBytes(out, cell, rLen, commonPrefix); } } @@ -982,8 +1115,18 @@ public final class CellUtil { Bytes.toBytes(rLen2), 0, KeyValue.ROW_LENGTH_SIZE); } // Compare the RKs - int rkCommonPrefix = ByteBufferUtils.findCommonPrefix(c1.getRowArray(), c1.getRowOffset(), + int rkCommonPrefix = 0; + if (c1 instanceof ByteBufferedCell && c2 instanceof ByteBufferedCell) { + rkCommonPrefix = ByteBufferUtils.findCommonPrefix(((ByteBufferedCell) c1).getRowByteBuffer(), + ((ByteBufferedCell) c1).getRowPosition(), rLen1, ((ByteBufferedCell) c2).getRowByteBuffer(), + ((ByteBufferedCell) c2).getRowPosition(), rLen2); + } else { + // There cannot be a case where one cell is BBCell and other is KeyValue. This flow comes either + // in flush or compactions. In flushes both cells are KV and in case of compaction it will be either + // KV or BBCell + rkCommonPrefix = ByteBufferUtils.findCommonPrefix(c1.getRowArray(), c1.getRowOffset(), rLen1, c2.getRowArray(), c2.getRowOffset(), rLen2); + } commonPrefix += rkCommonPrefix; if (rkCommonPrefix != rLen1) { // Early out when RK is not fully matching. @@ -1004,8 +1147,17 @@ public final class CellUtil { // CF lengths are same so there is one more byte common in key part commonPrefix += KeyValue.FAMILY_LENGTH_SIZE; // Compare the CF names - int fCommonPrefix = ByteBufferUtils.findCommonPrefix(c1.getFamilyArray(), - c1.getFamilyOffset(), fLen1, c2.getFamilyArray(), c2.getFamilyOffset(), fLen2); + int fCommonPrefix; + if (c1 instanceof ByteBufferedCell && c2 instanceof ByteBufferedCell) { + fCommonPrefix = + ByteBufferUtils.findCommonPrefix(((ByteBufferedCell) c1).getFamilyByteBuffer(), + ((ByteBufferedCell) c1).getFamilyPosition(), fLen1, + ((ByteBufferedCell) c2).getFamilyByteBuffer(), + ((ByteBufferedCell) c2).getFamilyPosition(), fLen2); + } else { + fCommonPrefix = ByteBufferUtils.findCommonPrefix(c1.getFamilyArray(), c1.getFamilyOffset(), + fLen1, c2.getFamilyArray(), c2.getFamilyOffset(), fLen2); + } commonPrefix += fCommonPrefix; if (fCommonPrefix != fLen1) { return commonPrefix; @@ -1014,8 +1166,16 @@ public final class CellUtil { // Compare the Qualifiers int qLen1 = c1.getQualifierLength(); int qLen2 = c2.getQualifierLength(); - int qCommon = ByteBufferUtils.findCommonPrefix(c1.getQualifierArray(), c1.getQualifierOffset(), + int qCommon; + if (c1 instanceof ByteBufferedCell && c2 instanceof ByteBufferedCell) { + qCommon = ByteBufferUtils.findCommonPrefix(((ByteBufferedCell) c1).getQualifierByteBuffer(), + ((ByteBufferedCell) c1).getQualifierPosition(), qLen1, + ((ByteBufferedCell) c2).getQualifierByteBuffer(), + ((ByteBufferedCell) c2).getQualifierPosition(), qLen2); + } else { + qCommon = ByteBufferUtils.findCommonPrefix(c1.getQualifierArray(), c1.getQualifierOffset(), qLen1, c2.getQualifierArray(), c2.getQualifierOffset(), qLen2); + } commonPrefix += qCommon; if (!withTsType || Math.max(qLen1, qLen2) != qCommon) { return commonPrefix; diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java index 8919d01..112f258 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java @@ -1002,10 +1002,12 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder { // When tag compression is enabled, tagCompressionContext will have a not null value. Write // the tags using Dictionary compression in such a case if (tagCompressionContext != null) { + // TODO : Make Dictionary interface to work with BBs and then change the corresponding + // compress tags code to work with BB tagCompressionContext .compressTags(out, cell.getTagsArray(), cell.getTagsOffset(), tagsLength); } else { - out.write(cell.getTagsArray(), cell.getTagsOffset(), tagsLength); + CellUtil.writeTags(out, cell, tagsLength); } } size += tagsLength + KeyValue.TAGS_LENGTH_SIZE; diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java index de2da5a..178f65d 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java @@ -48,14 +48,14 @@ public class CopyKeyDataBlockEncoder extends BufferedDataBlockEncoder { out.writeInt(klength); out.writeInt(vlength); CellUtil.writeFlatKey(cell, out); - out.write(cell.getValueArray(), cell.getValueOffset(), vlength); + CellUtil.writeValue(out, cell, vlength); int size = klength + vlength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE; // Write the additional tag into the stream if (encodingContext.getHFileContext().isIncludesTags()) { int tagsLength = cell.getTagsLength(); out.writeShort(tagsLength); if (tagsLength > 0) { - out.write(cell.getTagsArray(), cell.getTagsOffset(), tagsLength); + CellUtil.writeTags(out, cell, tagsLength); } size += tagsLength + KeyValue.TAGS_LENGTH_SIZE; } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java index 0542277..fe9e518 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java @@ -229,7 +229,7 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder { // put column family byte familyLength = cell.getFamilyLength(); out.write(familyLength); - out.write(cell.getFamilyArray(), cell.getFamilyOffset(), familyLength); + CellUtil.writeFamily(out, cell, familyLength); } else { // Finding common prefix int preKeyLength = KeyValueUtil.keyLength(prevCell); @@ -282,7 +282,7 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder { // Previous and current rows are different. Copy the differing part of // the row, skip the column family, and copy the qualifier. CellUtil.writeRowKeyExcludingCommon(cell, rLen, commonPrefix, out); - out.write(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); + CellUtil.writeQualifier(out, cell, cell.getQualifierLength()); } else { // The common part includes the whole row. As the column family is the // same across the whole file, it will automatically be included in the @@ -290,8 +290,8 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder { // What we write here is the non common part of the qualifier int commonQualPrefix = commonPrefix - (rLen + KeyValue.ROW_LENGTH_SIZE) - (cell.getFamilyLength() + KeyValue.FAMILY_LENGTH_SIZE); - out.write(cell.getQualifierArray(), cell.getQualifierOffset() + commonQualPrefix, - cell.getQualifierLength() - commonQualPrefix); + CellUtil.writeQualifierSkippingBytes(out, cell, cell.getQualifierLength(), + commonQualPrefix); } if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) { ByteBufferUtils.putLong(out, timestamp, timestampFitsInBytes); @@ -302,7 +302,7 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder { if ((flag & FLAG_SAME_TYPE) == 0) { out.write(cell.getTypeByte()); } - out.write(cell.getValueArray(), cell.getValueOffset(), vLength); + CellUtil.writeValue(out, cell, vLength); return kLength + vLength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE; } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java index 50794e6..b1f1965 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java @@ -264,7 +264,7 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder { ByteBufferUtils.putCompressedInt(out, 0); CellUtil.writeFlatKey(cell, out); // Write the value part - out.write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); + CellUtil.writeValue(out, cell, cell.getValueLength()); } else { int preKeyLength = KeyValueUtil.keyLength(prevCell); int preValLength = prevCell.getValueLength(); @@ -290,8 +290,7 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder { // Check if current and previous values are the same. Compare value // length first as an optimization. if (vLength == preValLength - && Bytes.equals(cell.getValueArray(), cell.getValueOffset(), vLength, - prevCell.getValueArray(), prevCell.getValueOffset(), preValLength)) { + && CellUtil.matchingValue(cell, prevCell, vLength, preValLength)) { flag |= FLAG_SAME_VALUE; } @@ -308,7 +307,7 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder { // Previous and current rows are different. Copy the differing part of // the row, skip the column family, and copy the qualifier. CellUtil.writeRowKeyExcludingCommon(cell, rLen, commonPrefix, out); - out.write(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); + CellUtil.writeQualifier(out, cell, cell.getQualifierLength()); } else { // The common part includes the whole row. As the column family is the // same across the whole file, it will automatically be included in the @@ -316,8 +315,8 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder { // What we write here is the non common part of the qualifier int commonQualPrefix = commonPrefix - (rLen + KeyValue.ROW_LENGTH_SIZE) - (cell.getFamilyLength() + KeyValue.FAMILY_LENGTH_SIZE); - out.write(cell.getQualifierArray(), cell.getQualifierOffset() + commonQualPrefix, - cell.getQualifierLength() - commonQualPrefix); + CellUtil.writeQualifierSkippingBytes(out, cell, cell.getQualifierLength(), + commonQualPrefix); } // Write non common ts part out.write(curTsBuf, commonTimestampPrefix, KeyValue.TIMESTAMP_SIZE - commonTimestampPrefix); @@ -329,7 +328,7 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder { // Write the value if it is not the same as before. if ((flag & FLAG_SAME_VALUE) == 0) { - out.write(cell.getValueArray(), cell.getValueOffset(), vLength); + CellUtil.writeValue(out, cell, vLength); } } return kLength + vLength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE; diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java index 6f5acd5..842894f 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java @@ -69,7 +69,7 @@ public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder { writeKeyExcludingCommon(cell, common, out); } // Write the value part - out.write(cell.getValueArray(), cell.getValueOffset(), vlength); + CellUtil.writeValue(out, cell, vlength); int size = klength + vlength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE; size += afterEncodingKeyValue(cell, out, encodingContext); state.prevCell = cell; @@ -85,8 +85,8 @@ public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder { CellUtil.writeRowKeyExcludingCommon(cell, rLen, commonPrefix, out); byte fLen = cell.getFamilyLength(); out.writeByte(fLen); - out.write(cell.getFamilyArray(), cell.getFamilyOffset(), fLen); - out.write(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); + CellUtil.writeFamily(out, cell, fLen); + CellUtil.writeQualifier(out, cell, cell.getQualifierLength()); out.writeLong(cell.getTimestamp()); out.writeByte(cell.getTypeByte()); } else { @@ -98,8 +98,7 @@ public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder { int commonQualPrefix = Math.min(commonPrefix, qLen); int qualPartLenToWrite = qLen - commonQualPrefix; if (qualPartLenToWrite > 0) { - out.write(cell.getQualifierArray(), cell.getQualifierOffset() + commonQualPrefix, - qualPartLenToWrite); + CellUtil.writeQualifierSkippingBytes(out, cell, qLen, commonQualPrefix); } commonPrefix -= commonQualPrefix; // Common part in TS also? diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java index 5290d5e..24105ab 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java @@ -183,7 +183,7 @@ public final class ByteBufferUtils { length); } else { for (int i = 0; i < length; ++i) { - out.write(in.get(offset + i)); + out.write(toByte(in, offset + i)); } } } @@ -470,6 +470,28 @@ public final class ByteBufferUtils { } /** + * Find length of common prefix in two arrays. + * @param left ByteBuffer to be compared. + * @param leftOffset Offset in left ByteBuffer. + * @param leftLength Length of left ByteBuffer. + * @param right ByteBuffer to be compared. + * @param rightOffset Offset in right ByteBuffer. + * @param rightLength Length of right ByteBuffer. + */ + public static int findCommonPrefix(ByteBuffer left, int leftOffset, int leftLength, + ByteBuffer right, int rightOffset, int rightLength) { + int length = Math.min(leftLength, rightLength); + int result = 0; + + while (result < length && ByteBufferUtils.toByte(left, leftOffset + result) == ByteBufferUtils + .toByte(right, rightOffset + result)) { + result++; + } + + return result; + } + + /** * Check whether two parts in the same buffer are equal. * @param buffer In which buffer there are parts * @param offsetLeft Beginning of first part. diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/RedundantKVGenerator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/RedundantKVGenerator.java index fa98f70..b44a724 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/RedundantKVGenerator.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/RedundantKVGenerator.java @@ -24,8 +24,10 @@ import java.util.List; import java.util.Map; import java.util.Random; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.OffheapKeyValue; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.ByteBufferUtils; @@ -291,6 +293,159 @@ public class RedundantKVGenerator { } /** + * Generate test data useful to test encoders. + * @param howMany How many Key values should be generated. + * @return sorted list of key values + */ + public List generateTestExtendedOffheapKeyValues(int howMany, boolean useTags) { + List result = new ArrayList(); + List rows = generateRows(); + Map> rowsToQualifier = new HashMap>(); + + if (family == null) { + family = new byte[columnFamilyLength]; + randomizer.nextBytes(family); + } + + long baseTimestamp = Math.abs(randomizer.nextInt()) / baseTimestampDivide; + + byte[] value = new byte[valueLength]; + + for (int i = 0; i < howMany; ++i) { + long timestamp = baseTimestamp; + if(timestampDiffSize > 0){ + timestamp += randomizer.nextInt(timestampDiffSize); + } + Integer rowId = randomizer.nextInt(rows.size()); + byte[] row = rows.get(rowId); + + // generate qualifier, sometimes it is same, sometimes similar, + // occasionally completely different + byte[] qualifier; + float qualifierChance = randomizer.nextFloat(); + if (!rowsToQualifier.containsKey(rowId) + || qualifierChance > chanceForSameQualifier + chanceForSimilarQualifier) { + int qualifierLength = averageQualifierLength; + qualifierLength += randomizer.nextInt(2 * qualifierLengthVariance + 1) + - qualifierLengthVariance; + qualifier = new byte[qualifierLength]; + randomizer.nextBytes(qualifier); + + // add it to map + if (!rowsToQualifier.containsKey(rowId)) { + rowsToQualifier.put(rowId, new ArrayList()); + } + rowsToQualifier.get(rowId).add(qualifier); + } else if (qualifierChance > chanceForSameQualifier) { + // similar qualifier + List previousQualifiers = rowsToQualifier.get(rowId); + byte[] originalQualifier = previousQualifiers.get(randomizer.nextInt(previousQualifiers + .size())); + + qualifier = new byte[originalQualifier.length]; + int commonPrefix = randomizer.nextInt(qualifier.length); + System.arraycopy(originalQualifier, 0, qualifier, 0, commonPrefix); + for (int j = commonPrefix; j < qualifier.length; ++j) { + qualifier[j] = (byte) (randomizer.nextInt() & 0xff); + } + + rowsToQualifier.get(rowId).add(qualifier); + } else { + // same qualifier + List previousQualifiers = rowsToQualifier.get(rowId); + qualifier = previousQualifiers.get(randomizer.nextInt(previousQualifiers.size())); + } + + if (randomizer.nextFloat() < chanceForZeroValue) { + for (int j = 0; j < value.length; ++j) { + value[j] = (byte) 0; + } + } else { + randomizer.nextBytes(value); + } + if (useTags) { + KeyValue keyValue = new KeyValue(row, family, qualifier, timestamp, value, + new Tag[] { new Tag((byte) 1, "value1") }); + ByteBuffer offheapKVBB = ByteBuffer.allocateDirect(keyValue.getLength()); + ByteBufferUtils.copyFromArrayToBuffer(offheapKVBB, keyValue.getBuffer(), + keyValue.getOffset(), keyValue.getLength()); + OffheapKeyValue offheapKV = + new ExtendedOffheapKeyValue(offheapKVBB, 0, keyValue.getLength(), true, 0); + result.add(offheapKV); + } else { + KeyValue keyValue = new KeyValue(row, family, qualifier, timestamp, value); + ByteBuffer offheapKVBB = ByteBuffer.allocateDirect(keyValue.getLength()); + ByteBufferUtils.copyFromArrayToBuffer(offheapKVBB, keyValue.getBuffer(), + keyValue.getOffset(), keyValue.getLength()); + OffheapKeyValue offheapKV = + new ExtendedOffheapKeyValue(offheapKVBB, 0, keyValue.getLength(), false, 0); + result.add(offheapKV); + } + } + + Collections.sort(result, CellComparator.COMPARATOR); + + return result; + } + + static class ExtendedOffheapKeyValue extends OffheapKeyValue { + public ExtendedOffheapKeyValue(ByteBuffer buf, int offset, int length, boolean hasTags, + long seqId) { + super(buf, offset, length, hasTags, seqId); + } + + @Override + public byte[] getRowArray() { + throw new IllegalArgumentException("getRowArray operation is not allowed"); + } + + @Override + public int getRowOffset() { + throw new IllegalArgumentException("getRowOffset operation is not allowed"); + } + + @Override + public byte[] getFamilyArray() { + throw new IllegalArgumentException("getFamilyArray operation is not allowed"); + } + + @Override + public int getFamilyOffset() { + throw new IllegalArgumentException("getFamilyOffset operation is not allowed"); + } + + @Override + public byte[] getQualifierArray() { + throw new IllegalArgumentException("getQualifierArray operation is not allowed"); + } + + @Override + public int getQualifierOffset() { + throw new IllegalArgumentException("getQualifierOffset operation is not allowed"); + } + + @Override + public byte[] getValueArray() { + throw new IllegalArgumentException("getValueArray operation is not allowed"); + } + + @Override + public int getValueOffset() { + throw new IllegalArgumentException("getValueOffset operation is not allowed"); + } + + @Override + public byte[] getTagsArray() { + throw new IllegalArgumentException("getTagsArray operation is not allowed"); + } + + @Override + public int getTagsOffset() { + throw new IllegalArgumentException("getTagsOffset operation is not allowed"); + } + } + + /** * Convert list of KeyValues to byte buffer. * @param keyValues list of KeyValues to be converted. * @return buffer with content from key values diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java index f75f6e9..f5e2b61 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpDataBlockEncoder.java @@ -53,14 +53,14 @@ public class NoOpDataBlockEncoder implements HFileDataBlockEncoder { out.writeInt(klength); out.writeInt(vlength); CellUtil.writeFlatKey(cell, out); - out.write(cell.getValueArray(), cell.getValueOffset(), vlength); + CellUtil.writeValue(out, cell, vlength); int encodedKvSize = klength + vlength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE; // Write the additional tag into the stream if (encodingCtx.getHFileContext().isIncludesTags()) { int tagsLength = cell.getTagsLength(); out.writeShort(tagsLength); if (tagsLength > 0) { - out.write(cell.getTagsArray(), cell.getTagsOffset(), tagsLength); + CellUtil.writeTags(out, cell, tagsLength); } encodedKvSize += tagsLength + KeyValue.TAGS_LENGTH_SIZE; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java index 3cdc92b..2523a8c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileDataBlockEncoder.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.io.hfile; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; @@ -27,6 +28,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.io.HeapSize; @@ -141,6 +143,29 @@ public class TestHFileDataBlockEncoder { testEncodingInternals(true); } + /** + * Test encoding with offheap keyvalue. This test just verifies if the encoders + * work with DBB and does not use the getXXXArray() API + * @throws IOException + */ + @Test + public void testEncodingWithOffheapKeyValue() throws IOException { + // usually we have just block without headers, but don't complicate that + if(blockEncoder.getDataBlockEncoding() == DataBlockEncoding.PREFIX_TREE) { + // This is a TODO: Only after PrefixTree is fixed we can remove this check + return; + } + try { + List kvs = generator.generateTestExtendedOffheapKeyValues(60, true); + HFileContext meta = new HFileContextBuilder().withIncludesMvcc(includesMemstoreTS) + .withIncludesTags(true).withHBaseCheckSum(true).withCompression(Algorithm.NONE) + .withBlockSize(0).withChecksumType(ChecksumType.NULL).build(); + writeBlock(kvs, meta, true); + } catch (IllegalArgumentException e) { + fail("No exception should have been thrown"); + } + } + private void testEncodingInternals(boolean useTag) throws IOException { // usually we have just block without headers, but don't complicate that List kvs = generator.generateTestKeyValues(60, useTag); @@ -201,6 +226,21 @@ public class TestHFileDataBlockEncoder { HFileBlock.FILL_HEADER, 0, block.getOnDiskDataSizeWithHeader(), block.getHFileContext()); } + private void writeBlock(List kvs, HFileContext fileContext, boolean useTags) + throws IOException { + HFileBlockEncodingContext context = new HFileBlockDefaultEncodingContext( + blockEncoder.getDataBlockEncoding(), HConstants.HFILEBLOCK_DUMMY_HEADER, + fileContext); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + baos.write(HConstants.HFILEBLOCK_DUMMY_HEADER); + DataOutputStream dos = new DataOutputStream(baos); + blockEncoder.startBlockEncoding(context, dos); + for (Cell kv : kvs) { + blockEncoder.encode(kv, context, dos); + } + } + /** * @return All possible data block encoding configurations */