 .../hadoop/hbase/io/TagCompressionContext.java     | 44 +++++++++++++-
 .../io/encoding/BufferedDataBlockEncoder.java      |  9 ++-
 .../apache/hadoop/hbase/io/util/Dictionary.java    | 16 ++++-
 .../apache/hadoop/hbase/io/util/LRUDictionary.java | 37 +++++++++---
 .../hadoop/hbase/io/TestTagCompressionContext.java | 70 +++++++++++++++++++++-
 .../hadoop/hbase/io/util/TestLRUDictionary.java    |  8 +--
 .../hadoop/hbase/regionserver/wal/Compressor.java  |  4 +-
 .../hbase/regionserver/wal/WALCellCodec.java       |  4 +-
 8 files changed, 169 insertions(+), 23 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
index 05c4ad1..b2cfb92 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
@@ -94,6 +94,32 @@ public class TagCompressionContext {
   }
 
   /**
+   * Compress tags one by one and writes to the OutputStream.
+   * @param out Stream to which the compressed tags to be written
+   * @param in Source buffer where tags are available
+   * @param offset Offset for the tags byte buffer
+   * @param length Length of all tag bytes
+   * @throws IOException
+   */
+  public void compressTags(OutputStream out, ByteBuffer in, int offset, int length)
+      throws IOException {
+    if (in.hasArray()) {
+      compressTags(out, in.array(), offset + in.position(), length);
+      ByteBufferUtils.skip(in, length);
+    } else {
+      int pos = offset;
+      int endOffset = pos + length;
+      assert pos < endOffset;
+      while (pos < endOffset) {
+        int tagLen = ByteBufferUtils.readAsInt(in, pos, Tag.TAG_LENGTH_SIZE);
+        pos += Tag.TAG_LENGTH_SIZE;
+        write(in, pos, tagLen, out);
+        pos += tagLen;
+      }
+    }
+  }
+
+  /**
    * Uncompress tags from the InputStream and writes to the destination array.
   * @param src Stream where the compressed tags are available
   * @param dest Destination array where to write the uncompressed tags
@@ -110,7 +136,7 @@ public class TagCompressionContext {
         int tagLen = StreamUtils.readRawVarint32(src);
         offset = Bytes.putAsShort(dest, offset, tagLen);
         IOUtils.readFully(src, dest, offset, tagLen);
-        tagDict.addEntry(dest, offset, tagLen);
+        tagDict.addEntry(dest, offset, tagLen, true);
         offset += tagLen;
       } else {
         short dictIdx = StreamUtils.toShort(status, (byte) src.read());
@@ -145,7 +171,7 @@ public class TagCompressionContext {
         tagLen = StreamUtils.readRawVarint32(src);
         offset = Bytes.putAsShort(dest, offset, tagLen);
         src.get(dest, offset, tagLen);
-        tagDict.addEntry(dest, offset, tagLen);
+        tagDict.addEntry(dest, offset, tagLen, true);
         offset += tagLen;
       } else {
         short dictIdx = StreamUtils.toShort(status, src.get());
@@ -192,4 +218,18 @@ public class TagCompressionContext {
       StreamUtils.writeShort(out, dictIdx);
     }
   }
+
+  private void write(ByteBuffer data, int offset, int length, OutputStream out) throws IOException {
+    short dictIdx = Dictionary.NOT_IN_DICTIONARY;
+    if (tagDict != null) {
+      dictIdx = tagDict.findEntry(data, offset, length);
+    }
+    if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
+      out.write(Dictionary.NOT_IN_DICTIONARY);
+      StreamUtils.writeRawVInt32(out, length);
+      ByteBufferUtils.copyBufferToStream(out, data, offset, length);
+    } else {
+      StreamUtils.writeShort(out, dictIdx);
+    }
+  }
 }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
index 33e38c7..2f1725b 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
@@ -1004,8 +1004,13 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
       if (tagCompressionContext != null) {
         // TODO : Make Dictionary interface to work with BBs and then change the corresponding
         // compress tags code to work with BB
-        tagCompressionContext
-            .compressTags(out, cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
+        if (cell instanceof ByteBufferedCell) {
+          tagCompressionContext.compressTags(out, ((ByteBufferedCell) cell).getTagsByteBuffer(),
+              ((ByteBufferedCell) cell).getTagsPosition(), tagsLength);
+        } else {
+          tagCompressionContext.compressTags(out, cell.getTagsArray(), cell.getTagsOffset(),
+              tagsLength);
+        }
       } else {
         CellUtil.writeTags(out, cell, tagsLength);
       }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java
index 4a3d42f..c186f2d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hbase.io.util;
 
+import java.nio.ByteBuffer;
+
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
 /**
@@ -51,6 +53,17 @@ public interface Dictionary {
   short findEntry(byte[] data, int offset, int length);
 
   /**
+   * Finds the index of an entry.
+   * If no entry found, we add it.
+   *
+   * @param data the ByteBuffer that we're looking up
+   * @param offset Offset into data to add to Dictionary.
+   * @param length Length beyond offset that comprises entry; must be > 0.
+   * @return the index of the entry, or {@link #NOT_IN_DICTIONARY} if not found
+   */
+  short findEntry(ByteBuffer data, int offset, int length);
+
+  /**
    * Adds an entry to the dictionary.
    * Be careful using this method. It will add an entry to the
    * dictionary even if it already has an entry for the same data.
@@ -60,10 +73,11 @@
   * @param data the entry to add
   * @param offset Offset into data to add to Dictionary.
   * @param length Length beyond offset that comprises entry; must be > 0.
+  * @param copy true indicates the incoming data should be copied to a new byte array.
   * @return the index of the entry
   */
-  short addEntry(byte[] data, int offset, int length);
+  short addEntry(byte[] data, int offset, int length, boolean copy);
 
   /**
    * Flushes the dictionary, empties all values.
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java
index 8562cf0..40573b9 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java
@@ -18,9 +18,11 @@
 package org.apache.hadoop.hbase.io.util;
 
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 
 import com.google.common.base.Preconditions;
@@ -51,15 +53,15 @@
   public short findEntry(byte[] data, int offset, int length) {
     short ret = backingStore.findIdx(data, offset, length);
     if (ret == NOT_IN_DICTIONARY) {
-      addEntry(data, offset, length);
+      addEntry(data, offset, length, true);
     }
     return ret;
   }
 
   @Override
-  public short addEntry(byte[] data, int offset, int length) {
+  public short addEntry(byte[] data, int offset, int length, boolean copy) {
     if (length <= 0) return NOT_IN_DICTIONARY;
-    return backingStore.put(data, offset, length);
+    return backingStore.put(data, offset, length, copy);
   }
 
   @Override
@@ -89,12 +91,19 @@
       indexToNode = new Node[initialSize];
     }
 
-    private short put(byte[] array, int offset, int length) {
-      // We copy the bytes we want, otherwise we might be holding references to
-      // massive arrays in our dictionary (or those arrays might change)
-      byte[] stored = new byte[length];
-      Bytes.putBytes(stored, 0, array, offset, length);
+    private short put(byte[] array, int offset, int length, boolean copy) {
+      if (copy) {
+        // We copy the bytes we want, otherwise we might be holding references to
+        // massive arrays in our dictionary (or those arrays might change)
+        byte[] stored = new byte[length];
+        Bytes.putBytes(stored, 0, array, offset, length);
+        return putInternal(stored);
+      } else {
+        return putInternal(array);
+      }
+    }
 
+    private short putInternal(byte[] stored) {
       if (currSize < initSize) {
         // There is space to add without evicting.
         if (indexToNode[currSize] == null) {
@@ -115,6 +124,7 @@
       }
     }
 
+
     private short findIdx(byte[] array, int offset, int length) {
       Short s;
       final Node comparisonNode = new Node();
@@ -216,4 +226,15 @@
       }
     }
   }
+
+  @Override
+  public short findEntry(ByteBuffer data, int offset, int length) {
+    byte[] stored = new byte[length];
+    ByteBufferUtils.copyFromBufferToArray(stored, data, offset, 0, length);
+    short ret = backingStore.findIdx(stored, 0, length);
+    if (ret == NOT_IN_DICTIONARY) {
+      addEntry(stored, 0, length, false);
+    }
+    return ret;
+  }
 }
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
index 6c46cf2..8f20ecb 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
@@ -21,18 +21,22 @@ package org.apache.hadoop.hbase.io;
 import static org.junit.Assert.assertTrue;
 
 import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.OffheapKeyValue;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.ArrayBackedTag;
+import org.apache.hadoop.hbase.ByteBufferedCell;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.io.util.LRUDictionary;
 import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -72,6 +76,31 @@ public class TestTagCompressionContext {
   }
 
   @Test
+  public void testCompressUncompressTagsWithOffheapKeyValue1() throws Exception {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream daos = new ByteBufferSupportDataOutputStream(baos);
+    TagCompressionContext context = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
+    ByteBufferedCell kv1 = (ByteBufferedCell)createOffheapKVWithTags(2);
+    int tagsLength1 = kv1.getTagsLength();
+    context.compressTags(daos, kv1.getTagsByteBuffer(), kv1.getTagsPosition(), tagsLength1);
+    ByteBufferedCell kv2 = (ByteBufferedCell)createOffheapKVWithTags(3);
+    int tagsLength2 = kv2.getTagsLength();
+    context.compressTags(daos, kv2.getTagsByteBuffer(), kv2.getTagsPosition(), tagsLength2);
+
+    context.clear();
+
+    byte[] dest = new byte[tagsLength1];
+    ByteBuffer ob = ByteBuffer.wrap(baos.getBuffer());
+    context.uncompressTags(new SingleByteBuff(ob), dest, 0, tagsLength1);
+    assertTrue(Bytes.equals(kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1, dest, 0,
+        tagsLength1));
+    dest = new byte[tagsLength2];
+    context.uncompressTags(new SingleByteBuff(ob), dest, 0, tagsLength2);
+    assertTrue(Bytes.equals(kv2.getTagsArray(), kv2.getTagsOffset(), tagsLength2, dest, 0,
+        tagsLength2));
+  }
+
+  @Test
   public void testCompressUncompressTags2() throws Exception {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     TagCompressionContext context = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
@@ -84,7 +113,32 @@ public class TestTagCompressionContext {
 
     context.clear();
 
-    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+    ByteArrayInputStream bais = new ByteArrayInputStream(baos.getBuffer());
+    byte[] dest = new byte[tagsLength1];
+    context.uncompressTags(bais, dest, 0, tagsLength1);
+    assertTrue(Bytes.equals(kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1, dest, 0,
+        tagsLength1));
+    dest = new byte[tagsLength2];
+    context.uncompressTags(bais, dest, 0, tagsLength2);
+    assertTrue(Bytes.equals(kv2.getTagsArray(), kv2.getTagsOffset(), tagsLength2, dest, 0,
+        tagsLength2));
+  }
+
+  @Test
+  public void testCompressUncompressTagsWithOffheapKeyValue2() throws Exception {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream daos = new ByteBufferSupportDataOutputStream(baos);
+    TagCompressionContext context = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
+    ByteBufferedCell kv1 = (ByteBufferedCell)createOffheapKVWithTags(1);
+    int tagsLength1 = kv1.getTagsLength();
+    context.compressTags(daos, kv1.getTagsByteBuffer(), kv1.getTagsPosition(), tagsLength1);
+    ByteBufferedCell kv2 = (ByteBufferedCell)createOffheapKVWithTags(3);
+    int tagsLength2 = kv2.getTagsLength();
+    context.compressTags(daos, kv2.getTagsByteBuffer(), kv2.getTagsPosition(), tagsLength2);
+
+    context.clear();
+
+    ByteArrayInputStream bais = new ByteArrayInputStream(baos.getBuffer());
     byte[] dest = new byte[tagsLength1];
     context.uncompressTags(bais, dest, 0, tagsLength1);
     assertTrue(Bytes.equals(kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1, dest, 0,
@@ -103,4 +157,16 @@ public class TestTagCompressionContext {
     KeyValue kv = new KeyValue(ROW, CF, Q, 1234L, V, tags);
     return kv;
   }
+
+  private Cell createOffheapKVWithTags(int noOfTags) {
+    List<Tag> tags = new ArrayList<Tag>();
+    for (int i = 0; i < noOfTags; i++) {
+      tags.add(new ArrayBackedTag((byte) i, "tagValue" + i));
+    }
+    KeyValue kv = new KeyValue(ROW, CF, Q, 1234L, V, tags);
+    ByteBuffer dbb = ByteBuffer.allocateDirect(kv.getBuffer().length);
+    ByteBufferUtils.copyFromArrayToBuffer(dbb, kv.getBuffer(), 0, kv.getBuffer().length);
+    OffheapKeyValue offheapKV = new OffheapKeyValue(dbb, 0, kv.getBuffer().length, true, 0);
+    return offheapKV;
+  }
 }
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/util/TestLRUDictionary.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/util/TestLRUDictionary.java
index 9569ba8..2cc06c7 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/util/TestLRUDictionary.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/util/TestLRUDictionary.java
@@ -60,7 +60,7 @@ public class TestLRUDictionary {
     assertEquals(Dictionary.NOT_IN_DICTIONARY,
         testee.findEntry(HConstants.EMPTY_BYTE_ARRAY, 0, 0));
     assertEquals(Dictionary.NOT_IN_DICTIONARY,
-        testee.addEntry(HConstants.EMPTY_BYTE_ARRAY, 0, 0));
+        testee.addEntry(HConstants.EMPTY_BYTE_ARRAY, 0, 0, true));
   }
 
   @Test
@@ -69,9 +69,9 @@
     // HConstants. Assert that when we add, we get new index. Thats how it
    // works.
     int len = HConstants.CATALOG_FAMILY.length;
-    int index = testee.addEntry(HConstants.CATALOG_FAMILY, 0, len);
-    assertFalse(index == testee.addEntry(HConstants.CATALOG_FAMILY, 0, len));
-    assertFalse(index == testee.addEntry(HConstants.CATALOG_FAMILY, 0, len));
+    int index = testee.addEntry(HConstants.CATALOG_FAMILY, 0, len, true);
+    assertFalse(index == testee.addEntry(HConstants.CATALOG_FAMILY, 0, len, true));
+    assertFalse(index == testee.addEntry(HConstants.CATALOG_FAMILY, 0, len, true));
   }
 
   @Test
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
index 4032cde..db4fb77 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
@@ -112,7 +112,7 @@
       // if this isn't in the dictionary, we need to add to the dictionary.
       byte[] arr = new byte[length];
       in.readFully(arr);
-      if (dict != null) dict.addEntry(arr, 0, length);
+      if (dict != null) dict.addEntry(arr, 0, length, true);
       return arr;
     } else {
       // Status here is the higher-order byte of index of the dictionary entry
@@ -149,7 +149,7 @@
       // if this isn't in the dictionary, we need to add to the dictionary.
      int length = WritableUtils.readVInt(in);
       in.readFully(to, offset, length);
-      dict.addEntry(to, offset, length);
+      dict.addEntry(to, offset, length, true);
       return length;
     } else {
       // the status byte also acts as the higher order byte of the dictionary
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
index 26e0e04..7d7418a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
@@ -170,7 +170,7 @@
       if (bytesRead != arr.length) {
         throw new IOException("Cannot read; wanted " + arr.length + ", but got " + bytesRead);
       }
-      if (dict != null) dict.addEntry(arr, 0, arr.length);
+      if (dict != null) dict.addEntry(arr, 0, arr.length, true);
       return arr;
     } else {
       // Status here is the higher-order byte of index of the dictionary entry.
@@ -307,7 +307,7 @@
       // if this isn't in the dictionary, we need to add to the dictionary.
       int length = StreamUtils.readRawVarint32(in);
       IOUtils.readFully(in, to, offset, length);
-      dict.addEntry(to, offset, length);
+      dict.addEntry(to, offset, length, true);
       return length;
     } else {
      // the status byte also acts as the higher order byte of the dictionary entry.
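
For orientation, the sketch below pulls the pieces of this patch together into one round trip: it compresses a cell's tags through the new ByteBuffer-aware compressTags overload (falling back to the existing byte[] overload for on-heap cells, mirroring the BufferedDataBlockEncoder change above) and then decodes them again. It is a minimal illustration modelled on the tests in this diff, not code from the patch itself; the wrapper class TagsRoundTrip and its roundTrip method are invented for the example, and the import locations of ByteArrayOutputStream and ByteBufferSupportDataOutputStream are assumed from the test's package and usage rather than stated in the diff.

// Illustrative only: TagsRoundTrip/roundTrip are made-up names; HBase APIs are
// used as they appear in the diff above (a sketch, not part of the patch).
import java.io.DataOutputStream;
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.ByteBufferedCell;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.io.ByteBufferSupportDataOutputStream;
import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.nio.SingleByteBuff;

public class TagsRoundTrip {

  /** Compresses the tags of a single cell and immediately uncompresses them again. */
  static byte[] roundTrip(Cell cell) throws Exception {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream out = new ByteBufferSupportDataOutputStream(baos);
    TagCompressionContext context =
        new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);

    int tagsLength = cell.getTagsLength();
    if (cell instanceof ByteBufferedCell) {
      // New path from this patch: hand the tags ByteBuffer straight to the codec,
      // so off-heap cells are compressed without first copying the tags on heap.
      ByteBufferedCell bbCell = (ByteBufferedCell) cell;
      context.compressTags(out, bbCell.getTagsByteBuffer(), bbCell.getTagsPosition(), tagsLength);
    } else {
      // Pre-existing byte[] path.
      context.compressTags(out, cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
    }

    // Reset the dictionary before decoding, exactly as the tests above do.
    context.clear();

    byte[] dest = new byte[tagsLength];
    ByteBuffer encoded = ByteBuffer.wrap(baos.getBuffer());
    context.uncompressTags(new SingleByteBuff(encoded), dest, 0, tagsLength);
    return dest;
  }
}

Whether the tags arrive as a byte[] or a ByteBuffer, the encoded form is the same (dictionary index, or NOT_IN_DICTIONARY marker plus varint length plus tag bytes), which is why the tests can read the output back with either the InputStream or the ByteBuff variant of uncompressTags.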