 .../hadoop/hbase/io/TagCompressionContext.java     | 39 +++++++++++++
 .../io/encoding/BufferedDataBlockEncoder.java      |  9 ++-
 .../apache/hadoop/hbase/io/util/Dictionary.java    | 27 +++++++++
 .../apache/hadoop/hbase/io/util/LRUDictionary.java | 34 ++++++++++++
 .../apache/hadoop/hbase/util/ByteBufferUtils.java  | 34 ++++++++++++
 .../hadoop/hbase/io/TestTagCompressionContext.java | 64 ++++++++++++++++++++++
 6 files changed, 205 insertions(+), 2 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
index 05c4ad1..d477c3d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
@@ -94,6 +94,31 @@ public class TagCompressionContext {
   }
 
   /**
+   * Compresses tags one by one and writes them to the OutputStream.
+   * @param out Stream to which the compressed tags are to be written
+   * @param in Source buffer where tags are available
+   * @param offset Offset for the tags byte buffer
+   * @param length Length of all tag bytes
+   * @throws IOException
+   */
+  public void compressTags(OutputStream out, ByteBuffer in, int offset, int length) throws IOException {
+    if (in.hasArray()) {
+      compressTags(out, in.array(), offset + in.position(), length);
+      ByteBufferUtils.skip(in, length);
+    } else {
+      int pos = offset;
+      int endOffset = pos + length;
+      assert pos < endOffset;
+      while (pos < endOffset) {
+        int tagLen = ByteBufferUtils.readAsInt(in, pos, Tag.TAG_LENGTH_SIZE);
+        pos += Tag.TAG_LENGTH_SIZE;
+        write(in, pos, tagLen, out);
+        pos += tagLen;
+      }
+    }
+  }
+
+  /**
    * Uncompress tags from the InputStream and writes to the destination array.
    * @param src Stream where the compressed tags are available
    * @param dest Destination array where to write the uncompressed tags
@@ -192,4 +217,18 @@ public class TagCompressionContext {
       StreamUtils.writeShort(out, dictIdx);
     }
   }
+
+  private void write(ByteBuffer data, int offset, int length, OutputStream out) throws IOException {
+    short dictIdx = Dictionary.NOT_IN_DICTIONARY;
+    if (tagDict != null) {
+      dictIdx = tagDict.findEntry(data, offset, length);
+    }
+    if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
+      out.write(Dictionary.NOT_IN_DICTIONARY);
+      StreamUtils.writeRawVInt32(out, length);
+      ByteBufferUtils.writeByteBuffer(out, data, offset, length);
+    } else {
+      StreamUtils.writeShort(out, dictIdx);
+    }
+  }
 }
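
Review note: a minimal sketch of how the new ByteBuffer overload above can be driven end to end, assuming the patch is applied. The hand-built tag layout and the class name TagCompressionSketch are illustrative only and not part of the patch; a direct buffer is used so that hasArray() returns false and the sketch exercises the new readAsInt()/write(ByteBuffer, ...) path rather than the heap fast path.

    import java.io.ByteArrayOutputStream;
    import java.nio.ByteBuffer;

    import org.apache.hadoop.hbase.io.TagCompressionContext;
    import org.apache.hadoop.hbase.io.util.LRUDictionary;
    import org.apache.hadoop.hbase.nio.SingleByteBuff;

    public class TagCompressionSketch {
      public static void main(String[] args) throws Exception {
        TagCompressionContext context =
            new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);

        // One tag in the KeyValue tag layout: a 2-byte length followed by the
        // tag type byte and the tag value bytes.
        byte[] value = "tagValue0".getBytes("UTF-8");
        ByteBuffer tags = ByteBuffer.allocateDirect(2 + 1 + value.length);
        tags.putShort((short) (1 + value.length));
        tags.put((byte) 1); // tag type
        tags.put(value);
        tags.rewind();

        // Direct buffer: hasArray() is false, so the offset-based branch is taken.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        context.compressTags(out, tags, 0, tags.capacity());

        // Round trip through the existing uncompress side.
        context.clear();
        byte[] uncompressed = new byte[tags.capacity()];
        context.uncompressTags(new SingleByteBuff(ByteBuffer.wrap(out.toByteArray())),
            uncompressed, 0, uncompressed.length);
      }
    }

The new testCompressUncompressTagsWithOffheapKeyValue tests further down in the patch cover the same round trip with real OffheapKeyValue cells.
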
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
index 112f258..85bc5e9 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
@@ -1004,8 +1004,13 @@
       if (tagCompressionContext != null) {
         // TODO : Make Dictionary interface to work with BBs and then change the corresponding
         // compress tags code to work with BB
-        tagCompressionContext
-            .compressTags(out, cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
+        if (cell instanceof ByteBufferedCell) {
+          tagCompressionContext.compressTags(out, ((ByteBufferedCell) cell).getTagsByteBuffer(),
+            ((ByteBufferedCell) cell).getTagsPosition(), tagsLength);
+        } else {
+          tagCompressionContext.compressTags(out, cell.getTagsArray(), cell.getTagsOffset(),
+            tagsLength);
+        }
       } else {
         CellUtil.writeTags(out, cell, tagsLength);
       }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java
index 4a3d42f..d7930e6 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hbase.io.util;
 
+import java.nio.ByteBuffer;
+
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
 /**
@@ -51,6 +53,17 @@ public interface Dictionary {
   short findEntry(byte[] data, int offset, int length);
 
   /**
+   * Finds the index of an entry.
+   * If no entry found, we add it.
+   *
+   * @param data the ByteBuffer that we're looking up
+   * @param offset Offset into data to add to Dictionary.
+   * @param length Length beyond offset that comprises entry; must be > 0.
+   * @return the index of the entry, or {@link #NOT_IN_DICTIONARY} if not found
+   */
+  short findEntry(ByteBuffer data, int offset, int length);
+
+  /**
    * Adds an entry to the dictionary.
    * Be careful using this method.  It will add an entry to the
    * dictionary even if it already has an entry for the same data.
@@ -66,6 +79,20 @@
   short addEntry(byte[] data, int offset, int length);
 
   /**
+   * Adds an entry to the dictionary.
+   * Be careful using this method.  It will add an entry to the
+   * dictionary even if it already has an entry for the same data.
+   * Call {{@link #findEntry(ByteBuffer, int, int)}} to add without duplicating
+   * dictionary entries.
+   *
+   * @param data the entry to add
+   * @param offset Offset into data to add to Dictionary.
+   * @param length Length beyond offset that comprises entry; must be > 0.
+   * @return the index of the entry
+   */
+  short addEntry(ByteBuffer data, int offset, int length);
+
+  /**
    * Flushes the dictionary, empties all values.
    */
   void clear();
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java
index 8562cf0..26e94c4 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java
@@ -18,9 +18,11 @@
 package org.apache.hadoop.hbase.io.util;
 
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 
 import com.google.common.base.Preconditions;
 
@@ -95,6 +97,10 @@
 
       byte[] stored = new byte[length];
       Bytes.putBytes(stored, 0, array, offset, length);
+      return putInternal(stored);
+    }
+
+    private short putInternal(byte[] stored) {
       if (currSize < initSize) {
         // There is space to add without evicting.
         if (indexToNode[currSize] == null) {
@@ -115,6 +121,13 @@
       }
     }
 
+    private short put(ByteBuffer buf, int offset, int length) {
+      // Copy the contents
+      byte[] stored = new byte[length];
+      ByteBufferUtils.copyFromBufferToArray(stored, buf, offset, 0, length);
+      return putInternal(stored);
+    }
+
     private short findIdx(byte[] array, int offset, int length) {
       Short s;
       final Node comparisonNode = new Node();
@@ -127,6 +140,12 @@
       }
     }
 
+    private short findIdx(ByteBuffer buf, int offset, int length) {
+      byte[] stored = new byte[length];
+      ByteBufferUtils.copyFromBufferToArray(stored, buf, offset, 0, length);
+      return findIdx(stored, 0, length);
+    }
+
     private byte[] get(short idx) {
       Preconditions.checkElementIndex(idx, currSize);
       moveToHead(indexToNode[idx]);
@@ -216,4 +235,19 @@
       }
     }
   }
+
+  @Override
+  public short findEntry(ByteBuffer data, int offset, int length) {
+    short ret = backingStore.findIdx(data, offset, length);
+    if (ret == NOT_IN_DICTIONARY) {
+      addEntry(data, offset, length);
+    }
+    return ret;
+  }
+
+  @Override
+  public short addEntry(ByteBuffer data, int offset, int length) {
+    if (length <= 0) return NOT_IN_DICTIONARY;
+    return backingStore.put(data, offset, length);
+  }
 }
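
Review note: the contract of the new ByteBuffer findEntry() mirrors the existing byte[] one in LRUDictionary: a miss returns NOT_IN_DICTIONARY but adds the entry as a side effect, so the same bytes hit on the next call. A small sketch, assuming the existing Dictionary.init(int) contract; the class name DictionarySketch is illustrative only and not part of the patch.

    import java.nio.ByteBuffer;

    import org.apache.hadoop.hbase.io.util.Dictionary;
    import org.apache.hadoop.hbase.io.util.LRUDictionary;

    public class DictionarySketch {
      public static void main(String[] args) throws Exception {
        Dictionary dict = new LRUDictionary();
        // Same capacity the tests in this patch pass to TagCompressionContext.
        dict.init(Byte.MAX_VALUE);

        ByteBuffer tag = ByteBuffer.wrap("tagValue0".getBytes("UTF-8"));

        // First lookup misses and silently adds the entry ...
        short first = dict.findEntry(tag, 0, tag.remaining());
        // ... so the identical bytes are found on the second lookup.
        short second = dict.findEntry(tag, 0, tag.remaining());

        System.out.println(first + " -> " + second); // e.g. -1 -> 0
      }
    }

Note that both new LRUDictionary overloads still copy the looked-up bytes into a temporary byte[] before delegating to the existing byte[] logic; a copy-free, fully ByteBuffer-backed dictionary is not part of this patch (see the TODO in BufferedDataBlockEncoder above).
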
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
index 7bcc872..889efef 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
@@ -316,6 +316,28 @@
   }
 
   /**
+   * Converts a ByteBuffer to an int value.
+   * @param buf byte buffer
+   * @param offset offset into the buffer
+   * @param length how many bytes should be considered for creating int
+   * @return the int value
+   * @throws IllegalArgumentException if there's not enough room in the buffer at the offset
+   *           indicated.
+   */
+  public static int readAsInt(ByteBuffer buf, int offset, final int length) {
+    if (offset + length > buf.limit()) {
+      throw new IllegalArgumentException("offset (" + offset + ") + length (" + length
+          + ") exceed the" + " capacity of the array: " + buf.limit());
+    }
+    int n = 0;
+    for (int i = offset; i < (offset + length); i++) {
+      n <<= 8;
+      n ^= ByteBufferUtils.toByte(buf, i) & 0xFF;
+    }
+    return n;
+  }
+
+  /**
    * Read long which was written to fitInBytes bytes and increment position.
    * @param fitInBytes In how many bytes given long is stored.
    * @return The value of parsed long.
@@ -577,6 +599,18 @@
     return compareTo(buf1, o1, l1, buf2, o2, l2) == 0;
   }
 
+  /**
+   * @param buf buffer to hash
+   * @param offset offset to start from
+   * @param length length to hash
+   */
+  public static int hashCode(ByteBuffer buf, int offset, int length) {
+    int hash = 1;
+    for (int i = offset; i < offset + length; i++)
+      hash = (31 * hash) + (int) buf.get(i);
+    return hash;
+  }
+
   public static int compareTo(ByteBuffer buf1, int o1, int l1, ByteBuffer buf2, int o2, int l2) {
     if (UNSAFE_UNALIGNED) {
       long offset1Adj, offset2Adj;
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
index f4c4afe..6edeee2 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
@@ -26,12 +26,16 @@
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.hadoop.hbase.ByteBufferedCell;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.OffheapKeyValue;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.io.util.LRUDictionary;
 import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -71,6 +75,30 @@
   }
 
   @Test
+  public void testCompressUncompressTagsWithOffheapKeyValue1() throws Exception {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    TagCompressionContext context = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
+    ByteBufferedCell kv1 = (ByteBufferedCell)createOffheapKVWithTags(2);
+    int tagsLength1 = kv1.getTagsLength();
+    context.compressTags(baos, kv1.getTagsByteBuffer(), kv1.getTagsPosition(), tagsLength1);
+    ByteBufferedCell kv2 = (ByteBufferedCell)createOffheapKVWithTags(3);
+    int tagsLength2 = kv2.getTagsLength();
+    context.compressTags(baos, kv2.getTagsByteBuffer(), kv2.getTagsPosition(), tagsLength2);
+
+    context.clear();
+
+    byte[] dest = new byte[tagsLength1];
+    ByteBuffer ob = ByteBuffer.wrap(baos.toByteArray());
+    context.uncompressTags(new SingleByteBuff(ob), dest, 0, tagsLength1);
+    assertTrue(Bytes.equals(kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1, dest, 0,
+        tagsLength1));
+    dest = new byte[tagsLength2];
+    context.uncompressTags(new SingleByteBuff(ob), dest, 0, tagsLength2);
+    assertTrue(Bytes.equals(kv2.getTagsArray(), kv2.getTagsOffset(), tagsLength2, dest, 0,
+        tagsLength2));
+  }
+
+  @Test
   public void testCompressUncompressTags2()
       throws Exception {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     TagCompressionContext context = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
@@ -94,6 +122,30 @@
         tagsLength2));
   }
 
+  @Test
+  public void testCompressUncompressTagsWithOffheapKeyValue2() throws Exception {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    TagCompressionContext context = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
+    ByteBufferedCell kv1 = (ByteBufferedCell)createOffheapKVWithTags(1);
+    int tagsLength1 = kv1.getTagsLength();
+    context.compressTags(baos, kv1.getTagsByteBuffer(), kv1.getTagsPosition(), tagsLength1);
+    ByteBufferedCell kv2 = (ByteBufferedCell)createOffheapKVWithTags(3);
+    int tagsLength2 = kv2.getTagsLength();
+    context.compressTags(baos, kv2.getTagsByteBuffer(), kv2.getTagsPosition(), tagsLength2);
+
+    context.clear();
+
+    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+    byte[] dest = new byte[tagsLength1];
+    context.uncompressTags(bais, dest, 0, tagsLength1);
+    assertTrue(Bytes.equals(kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1, dest, 0,
+        tagsLength1));
+    dest = new byte[tagsLength2];
+    context.uncompressTags(bais, dest, 0, tagsLength2);
+    assertTrue(Bytes.equals(kv2.getTagsArray(), kv2.getTagsOffset(), tagsLength2, dest, 0,
+        tagsLength2));
+  }
+
   private KeyValue createKVWithTags(int noOfTags) {
     List<Tag> tags = new ArrayList<Tag>();
     for (int i = 0; i < noOfTags; i++) {
@@ -102,4 +154,16 @@
     KeyValue kv = new KeyValue(ROW, CF, Q, 1234L, V, tags);
     return kv;
   }
+
+  private Cell createOffheapKVWithTags(int noOfTags) {
+    List<Tag> tags = new ArrayList<Tag>();
+    for (int i = 0; i < noOfTags; i++) {
+      tags.add(new Tag((byte) i, "tagValue" + i));
+    }
+    KeyValue kv = new KeyValue(ROW, CF, Q, 1234L, V, tags);
+    ByteBuffer dbb = ByteBuffer.allocateDirect(kv.getBuffer().length);
+    ByteBufferUtils.copyFromArrayToBuffer(dbb, kv.getBuffer(), 0, kv.getBuffer().length);
+    OffheapKeyValue offheapKV = new OffheapKeyValue(dbb, 0, kv.getBuffer().length, true, 0);
+    return offheapKV;
+  }
 }
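
Review note: a quick sanity sketch of the two ByteBufferUtils helpers the patch adds, with the arithmetic worked by hand. The class name is illustrative only and the sketch assumes the patch is applied.

    import java.nio.ByteBuffer;

    import org.apache.hadoop.hbase.util.ByteBufferUtils;

    public class ByteBufferUtilsSketch {
      public static void main(String[] args) {
        // Bytes 0x01 0x2C stored at offset 1.
        ByteBuffer buf = ByteBuffer.wrap(new byte[] { 0x00, 0x01, 0x2C, 0x00 });

        // readAsInt assembles the bytes big-endian without touching the buffer
        // position: (0x01 << 8) ^ 0x2C = 300.
        System.out.println(ByteBufferUtils.readAsInt(buf, 1, 2)); // 300

        // hashCode is the usual 31-based rolling hash over the same range:
        // 31 * (31 * 1 + 0x01) + 0x2C = 1036.
        System.out.println(ByteBufferUtils.hashCode(buf, 1, 2)); // 1036
      }
    }

Both helpers work on absolute offsets and never move the buffer position, which is what lets the new compressTags() walk a read-only, possibly off-heap tags buffer by offset without mutating it.
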