.../hadoop/hbase/io/TagCompressionContext.java | 44 +++++++++++++-
.../io/encoding/BufferedDataBlockEncoder.java | 9 ++-
.../apache/hadoop/hbase/io/util/Dictionary.java | 16 ++++-
.../apache/hadoop/hbase/io/util/LRUDictionary.java | 37 +++++++++---
.../hadoop/hbase/io/TestTagCompressionContext.java | 70 +++++++++++++++++++++-
.../hadoop/hbase/io/util/TestLRUDictionary.java | 8 +--
.../hadoop/hbase/regionserver/wal/Compressor.java | 4 +-
.../hbase/regionserver/wal/WALCellCodec.java | 4 +-
8 files changed, 169 insertions(+), 23 deletions(-)
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
index 05c4ad1..b2cfb92 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
@@ -94,6 +94,32 @@ public class TagCompressionContext {
}
/**
+ * Compress tags one by one and write them to the OutputStream.
+ * @param out Stream to which the compressed tags are written
+ * @param in Source buffer where tags are available
+ * @param offset Offset in the buffer at which the tags start
+ * @param length Length of all tag bytes
+ * @throws IOException if an I/O error occurs while writing to the stream
+ */
+ public void compressTags(OutputStream out, ByteBuffer in, int offset, int length)
+ throws IOException {
+ if (in.hasArray()) {
+ compressTags(out, in.array(), offset + in.position(), length);
+ ByteBufferUtils.skip(in, length);
+ } else {
+ int pos = offset;
+ int endOffset = pos + length;
+ assert pos < endOffset;
+ while (pos < endOffset) {
+ int tagLen = ByteBufferUtils.readAsInt(in, pos, Tag.TAG_LENGTH_SIZE);
+ pos += Tag.TAG_LENGTH_SIZE;
+ write(in, pos, tagLen, out);
+ pos += tagLen;
+ }
+ }
+ }
+
+ /**
* Uncompress tags from the InputStream and writes to the destination array.
* @param src Stream where the compressed tags are available
* @param dest Destination array where to write the uncompressed tags
@@ -110,7 +136,7 @@ public class TagCompressionContext {
int tagLen = StreamUtils.readRawVarint32(src);
offset = Bytes.putAsShort(dest, offset, tagLen);
IOUtils.readFully(src, dest, offset, tagLen);
- tagDict.addEntry(dest, offset, tagLen);
+ tagDict.addEntry(dest, offset, tagLen, true);
offset += tagLen;
} else {
short dictIdx = StreamUtils.toShort(status, (byte) src.read());
@@ -145,7 +171,7 @@ public class TagCompressionContext {
tagLen = StreamUtils.readRawVarint32(src);
offset = Bytes.putAsShort(dest, offset, tagLen);
src.get(dest, offset, tagLen);
- tagDict.addEntry(dest, offset, tagLen);
+ tagDict.addEntry(dest, offset, tagLen, true);
offset += tagLen;
} else {
short dictIdx = StreamUtils.toShort(status, src.get());
@@ -192,4 +218,18 @@ public class TagCompressionContext {
StreamUtils.writeShort(out, dictIdx);
}
}
+
+ private void write(ByteBuffer data, int offset, int length, OutputStream out) throws IOException {
+ short dictIdx = Dictionary.NOT_IN_DICTIONARY;
+ if (tagDict != null) {
+ dictIdx = tagDict.findEntry(data, offset, length);
+ }
+ if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
+ out.write(Dictionary.NOT_IN_DICTIONARY);
+ StreamUtils.writeRawVInt32(out, length);
+ ByteBufferUtils.copyBufferToStream(out, data, offset, length);
+ } else {
+ StreamUtils.writeShort(out, dictIdx);
+ }
+ }
}
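
For reference, the new compressTags overload walks the tag region with absolute reads so a direct buffer's position is never disturbed, and falls back to the existing byte[] path when the buffer is heap backed. Below is a minimal standalone sketch of that walking/dispatch pattern using only java.nio (independent of the patch and of the HBase helpers); the dictionary-aware write() from the patch is replaced by a plain copy, and offsets are assumed to be absolute buffer indexes.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

public class TagWalkSketch {
  // Stand-in for Tag.TAG_LENGTH_SIZE: each tag is laid out as <2-byte length><tag bytes>.
  static final int TAG_LENGTH_SIZE = 2;

  static void copyTags(OutputStream out, ByteBuffer in, int offset, int length)
      throws IOException {
    if (in.hasArray()) {
      // On-heap buffer: hand the backing array to the byte[] path in one bulk write.
      out.write(in.array(), in.arrayOffset() + offset, length);
      return;
    }
    // Off-heap buffer: absolute gets, tag by tag, without touching position().
    int pos = offset;
    int end = offset + length;
    while (pos < end) {
      int tagLen = in.getShort(pos) & 0xFFFF;
      out.write((tagLen >>> 8) & 0xFF);   // re-emit the 2-byte length prefix
      out.write(tagLen & 0xFF);
      pos += TAG_LENGTH_SIZE;
      for (int i = 0; i < tagLen; i++) {
        out.write(in.get(pos + i));       // copy tag bytes without an intermediate array
      }
      pos += tagLen;
    }
  }

  public static void main(String[] args) throws IOException {
    ByteBuffer direct = ByteBuffer.allocateDirect(8);
    direct.putShort((short) 3).put(new byte[] { 'a', 'b', 'c' });
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    copyTags(out, direct, 0, 5);
    System.out.println(out.size()); // 5: the length prefix plus three tag bytes
  }
}
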
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
index 33e38c7..2f1725b 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
@@ -1004,8 +1004,13 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
if (tagCompressionContext != null) {
// TODO : Make Dictionary interface to work with BBs and then change the corresponding
// compress tags code to work with BB
- tagCompressionContext
- .compressTags(out, cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
+ if (cell instanceof ByteBufferedCell) {
+ tagCompressionContext.compressTags(out, ((ByteBufferedCell) cell).getTagsByteBuffer(),
+ ((ByteBufferedCell) cell).getTagsPosition(), tagsLength);
+ } else {
+ tagCompressionContext.compressTags(out, cell.getTagsArray(), cell.getTagsOffset(),
+ tagsLength);
+ }
} else {
CellUtil.writeTags(out, cell, tagsLength);
}
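
The encoder-side change is a straight type dispatch: cells that expose their tags as a ByteBuffer take the new zero-copy overload, everything else keeps the byte[] path. A rough sketch of that shape, using hypothetical trimmed-down stand-ins for Cell and ByteBufferedCell rather than the real interfaces:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

public class TagWriteDispatchSketch {
  // Hypothetical stand-ins; the real Cell and ByteBufferedCell carry many more accessors.
  interface SimpleCell {
    byte[] getTagsArray();
    int getTagsOffset();
    int getTagsLength();
  }

  interface BufferBackedCell extends SimpleCell {
    ByteBuffer getTagsByteBuffer();
    int getTagsPosition();
  }

  // Mirrors the if/else added in the encoder: prefer the buffer path when it exists.
  static void writeTags(OutputStream out, SimpleCell cell) throws IOException {
    int len = cell.getTagsLength();
    if (cell instanceof BufferBackedCell) {
      BufferBackedCell bb = (BufferBackedCell) cell;
      ByteBuffer tags = bb.getTagsByteBuffer();
      for (int i = 0; i < len; i++) {
        out.write(tags.get(bb.getTagsPosition() + i)); // absolute reads, no on-heap copy
      }
    } else {
      out.write(cell.getTagsArray(), cell.getTagsOffset(), len); // classic byte[] path
    }
  }

  static final class ArrayCell implements SimpleCell {
    private final byte[] tags;
    ArrayCell(byte[] tags) { this.tags = tags; }
    public byte[] getTagsArray() { return tags; }
    public int getTagsOffset() { return 0; }
    public int getTagsLength() { return tags.length; }
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    writeTags(out, new ArrayCell(new byte[] { 0, 3, 'a', 'b', 'c' }));
    System.out.println(out.size()); // 5 tag-region bytes written through the byte[] path
  }
}
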
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java
index 4a3d42f..c186f2d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.io.util;
+import java.nio.ByteBuffer;
+
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
@@ -51,6 +53,17 @@ public interface Dictionary {
short findEntry(byte[] data, int offset, int length);
/**
+ * Finds the index of an entry.
+ * If the entry is not found, it is added.
+ *
+ * @param data the ByteBuffer that we're looking up
+ * @param offset Offset into data to add to Dictionary.
+ * @param length Length beyond offset that comprises entry; must be > 0.
+ * @return the index of the entry, or {@link #NOT_IN_DICTIONARY} if not found
+ */
+ short findEntry(ByteBuffer data, int offset, int length);
+
+ /**
* Adds an entry to the dictionary.
* Be careful using this method. It will add an entry to the
* dictionary even if it already has an entry for the same data.
@@ -60,10 +73,11 @@ public interface Dictionary {
* @param data the entry to add
* @param offset Offset into data to add to Dictionary.
* @param length Length beyond offset that comprises entry; must be > 0.
+ * @param copy whether the incoming data should be copied to a new byte array before being stored.
* @return the index of the entry
*/
- short addEntry(byte[] data, int offset, int length);
+ short addEntry(byte[] data, int offset, int length, boolean copy);
/**
* Flushes the dictionary, empties all values.
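
The copy flag exists because addEntry used to duplicate the incoming bytes unconditionally; a caller that has just built a private array (as the ByteBuffer findEntry further down does) can now hand it over without paying for a second copy. A toy dictionary, independent of LRUDictionary, showing why copy=true is required for caller-owned arrays and when copy=false is safe; as in the patch, the copy=false path assumes offset 0 and length equal to the array length:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CopyFlagSketch {
  // Toy dictionary: index == insertion order, no eviction, no lookup map.
  static final class ToyDictionary {
    private final List<byte[]> entries = new ArrayList<>();

    short addEntry(byte[] data, int offset, int length, boolean copy) {
      // copy=false stores the caller's array directly (offset assumed 0, length == data.length).
      byte[] stored = copy ? Arrays.copyOfRange(data, offset, offset + length) : data;
      entries.add(stored);
      return (short) (entries.size() - 1);
    }

    byte[] get(short idx) { return entries.get(idx); }
  }

  public static void main(String[] args) {
    ToyDictionary dict = new ToyDictionary();

    // The caller keeps reusing this buffer (e.g. a read buffer): copy=true is mandatory.
    byte[] reused = { 1, 2, 3 };
    short a = dict.addEntry(reused, 0, 3, true);
    reused[0] = 99;                      // later mutation does not leak into the dictionary
    System.out.println(dict.get(a)[0]);  // 1

    // The caller hands over a freshly allocated array it never touches again: copy=false is safe.
    byte[] fresh = Arrays.copyOf(reused, 3);
    short b = dict.addEntry(fresh, 0, 3, false);
    System.out.println(dict.get(b)[0]);  // 99
  }
}
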
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java
index 8562cf0..40573b9 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java
@@ -18,9 +18,11 @@
package org.apache.hadoop.hbase.io.util;
+import java.nio.ByteBuffer;
import java.util.HashMap;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.common.base.Preconditions;
@@ -51,15 +53,15 @@ public class LRUDictionary implements Dictionary {
public short findEntry(byte[] data, int offset, int length) {
short ret = backingStore.findIdx(data, offset, length);
if (ret == NOT_IN_DICTIONARY) {
- addEntry(data, offset, length);
+ addEntry(data, offset, length, true);
}
return ret;
}
@Override
- public short addEntry(byte[] data, int offset, int length) {
+ public short addEntry(byte[] data, int offset, int length, boolean copy) {
if (length <= 0) return NOT_IN_DICTIONARY;
- return backingStore.put(data, offset, length);
+ return backingStore.put(data, offset, length, copy);
}
@Override
@@ -89,12 +91,19 @@ public class LRUDictionary implements Dictionary {
indexToNode = new Node[initialSize];
}
- private short put(byte[] array, int offset, int length) {
- // We copy the bytes we want, otherwise we might be holding references to
- // massive arrays in our dictionary (or those arrays might change)
- byte[] stored = new byte[length];
- Bytes.putBytes(stored, 0, array, offset, length);
+ private short put(byte[] array, int offset, int length, boolean copy) {
+ if (copy) {
+ // We copy the bytes we want, otherwise we might be holding references to
+ // massive arrays in our dictionary (or those arrays might change)
+ byte[] stored = new byte[length];
+ Bytes.putBytes(stored, 0, array, offset, length);
+ return putInternal(stored);
+ } else {
+ return putInternal(array);
+ }
+ }
+ private short putInternal(byte[] stored) {
if (currSize < initSize) {
// There is space to add without evicting.
if (indexToNode[currSize] == null) {
@@ -115,6 +124,7 @@ public class LRUDictionary implements Dictionary {
}
}
+
private short findIdx(byte[] array, int offset, int length) {
Short s;
final Node comparisonNode = new Node();
@@ -216,4 +226,15 @@ public class LRUDictionary implements Dictionary {
}
}
}
+
+ @Override
+ public short findEntry(ByteBuffer data, int offset, int length) {
+ byte[] stored = new byte[length];
+ ByteBufferUtils.copyFromBufferToArray(stored, data, offset, 0, length);
+ short ret = backingStore.findIdx(stored, 0, length);
+ if (ret == NOT_IN_DICTIONARY) {
+ addEntry(stored, 0, length, false);
+ }
+ return ret;
+ }
}
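
findEntry(ByteBuffer) has to materialize the bytes on the heap anyway, since the backing store hashes and compares byte[] contents, so the patch copies once up front and passes copy=false on the miss path to avoid copying the same bytes a second time. A rough standalone sketch of that flow against a hypothetical content-keyed store (not the real node/HashMap structure of LRUDictionary):

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class FindEntrySketch {
  static final short NOT_IN_DICTIONARY = -1;

  // Hypothetical flat backing store keyed by array contents.
  private final Map<Integer, byte[]> byIndex = new HashMap<>();
  private final Map<String, Integer> byContent = new HashMap<>();

  short findEntry(ByteBuffer data, int offset, int length) {
    // One heap copy is unavoidable: lookup works on byte[] contents.
    byte[] key = new byte[length];
    for (int i = 0; i < length; i++) {
      key[i] = data.get(offset + i);     // absolute reads keep the buffer's position intact
    }
    Integer idx = byContent.get(Arrays.toString(key));
    if (idx != null) {
      return idx.shortValue();
    }
    // Miss: 'key' is already a private copy, so store it without copying again (copy=false).
    int newIdx = byIndex.size();
    byIndex.put(newIdx, key);
    byContent.put(Arrays.toString(key), newIdx);
    return NOT_IN_DICTIONARY;            // mirrors the patch: the triggering lookup still reports a miss
  }

  public static void main(String[] args) {
    FindEntrySketch d = new FindEntrySketch();
    ByteBuffer bb = ByteBuffer.allocateDirect(4).put(new byte[] { 1, 2, 3, 4 });
    System.out.println(d.findEntry(bb, 0, 4)); // -1: first sighting, added as entry 0
    System.out.println(d.findEntry(bb, 0, 4)); // 0: found on the second pass
  }
}
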
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
index 6c46cf2..8f20ecb 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
@@ -21,18 +21,22 @@ package org.apache.hadoop.hbase.io;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.OffheapKeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.ArrayBackedTag;
+import org.apache.hadoop.hbase.ByteBufferedCell;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -72,6 +76,31 @@ public class TestTagCompressionContext {
}
@Test
+ public void testCompressUncompressTagsWithOffheapKeyValue1() throws Exception {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream daos = new ByteBufferSupportDataOutputStream(baos);
+ TagCompressionContext context = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
+ ByteBufferedCell kv1 = (ByteBufferedCell)createOffheapKVWithTags(2);
+ int tagsLength1 = kv1.getTagsLength();
+ context.compressTags(daos, kv1.getTagsByteBuffer(), kv1.getTagsPosition(), tagsLength1);
+ ByteBufferedCell kv2 = (ByteBufferedCell)createOffheapKVWithTags(3);
+ int tagsLength2 = kv2.getTagsLength();
+ context.compressTags(daos, kv2.getTagsByteBuffer(), kv2.getTagsPosition(), tagsLength2);
+
+ context.clear();
+
+ byte[] dest = new byte[tagsLength1];
+ ByteBuffer ob = ByteBuffer.wrap(baos.getBuffer());
+ context.uncompressTags(new SingleByteBuff(ob), dest, 0, tagsLength1);
+ assertTrue(Bytes.equals(kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1, dest, 0,
+ tagsLength1));
+ dest = new byte[tagsLength2];
+ context.uncompressTags(new SingleByteBuff(ob), dest, 0, tagsLength2);
+ assertTrue(Bytes.equals(kv2.getTagsArray(), kv2.getTagsOffset(), tagsLength2, dest, 0,
+ tagsLength2));
+ }
+
+ @Test
public void testCompressUncompressTags2() throws Exception {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
TagCompressionContext context = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
@@ -84,7 +113,32 @@ public class TestTagCompressionContext {
context.clear();
- ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+ ByteArrayInputStream bais = new ByteArrayInputStream(baos.getBuffer());
+ byte[] dest = new byte[tagsLength1];
+ context.uncompressTags(bais, dest, 0, tagsLength1);
+ assertTrue(Bytes.equals(kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1, dest, 0,
+ tagsLength1));
+ dest = new byte[tagsLength2];
+ context.uncompressTags(bais, dest, 0, tagsLength2);
+ assertTrue(Bytes.equals(kv2.getTagsArray(), kv2.getTagsOffset(), tagsLength2, dest, 0,
+ tagsLength2));
+ }
+
+ @Test
+ public void testCompressUncompressTagsWithOffheapKeyValue2() throws Exception {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream daos = new ByteBufferSupportDataOutputStream(baos);
+ TagCompressionContext context = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
+ ByteBufferedCell kv1 = (ByteBufferedCell)createOffheapKVWithTags(1);
+ int tagsLength1 = kv1.getTagsLength();
+ context.compressTags(daos, kv1.getTagsByteBuffer(), kv1.getTagsPosition(), tagsLength1);
+ ByteBufferedCell kv2 = (ByteBufferedCell)createOffheapKVWithTags(3);
+ int tagsLength2 = kv2.getTagsLength();
+ context.compressTags(daos, kv2.getTagsByteBuffer(), kv2.getTagsPosition(), tagsLength2);
+
+ context.clear();
+
+ ByteArrayInputStream bais = new ByteArrayInputStream(baos.getBuffer());
byte[] dest = new byte[tagsLength1];
context.uncompressTags(bais, dest, 0, tagsLength1);
assertTrue(Bytes.equals(kv1.getTagsArray(), kv1.getTagsOffset(), tagsLength1, dest, 0,
@@ -103,4 +157,16 @@ public class TestTagCompressionContext {
KeyValue kv = new KeyValue(ROW, CF, Q, 1234L, V, tags);
return kv;
}
+
+ private Cell createOffheapKVWithTags(int noOfTags) {
+ List<Tag> tags = new ArrayList<Tag>();
+ for (int i = 0; i < noOfTags; i++) {
+ tags.add(new ArrayBackedTag((byte) i, "tagValue" + i));
+ }
+ KeyValue kv = new KeyValue(ROW, CF, Q, 1234L, V, tags);
+ ByteBuffer dbb = ByteBuffer.allocateDirect(kv.getBuffer().length);
+ ByteBufferUtils.copyFromArrayToBuffer(dbb, kv.getBuffer(), 0, kv.getBuffer().length);
+ OffheapKeyValue offheapKV = new OffheapKeyValue(dbb, 0, kv.getBuffer().length, true, 0);
+ return offheapKV;
+ }
}
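
The off-heap test fixture is produced by round-tripping an ordinary KeyValue's backing array into a direct ByteBuffer before wrapping it in an OffheapKeyValue. A stripped-down sketch of just that copy step with plain java.nio; the real ByteBufferUtils.copyFromArrayToBuffer helper does the equivalent work with absolute puts, whereas this sketch uses relative puts plus a rewind:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class OffheapCopySketch {
  public static void main(String[] args) {
    byte[] onheap = "row/cf:q/tags".getBytes(StandardCharsets.UTF_8); // stands in for kv.getBuffer()

    // Roughly what copyFromArrayToBuffer(dbb, onheap, 0, onheap.length) achieves:
    ByteBuffer dbb = ByteBuffer.allocateDirect(onheap.length);
    dbb.put(onheap, 0, onheap.length);
    dbb.rewind();                        // reset position so readers start at index 0

    // Read it back to confirm the off-heap copy matches the on-heap original.
    byte[] roundTrip = new byte[onheap.length];
    dbb.get(roundTrip);
    System.out.println(Arrays.equals(onheap, roundTrip)); // true
  }
}
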
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/util/TestLRUDictionary.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/util/TestLRUDictionary.java
index 9569ba8..2cc06c7 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/util/TestLRUDictionary.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/util/TestLRUDictionary.java
@@ -60,7 +60,7 @@ public class TestLRUDictionary {
assertEquals(Dictionary.NOT_IN_DICTIONARY,
testee.findEntry(HConstants.EMPTY_BYTE_ARRAY, 0, 0));
assertEquals(Dictionary.NOT_IN_DICTIONARY,
- testee.addEntry(HConstants.EMPTY_BYTE_ARRAY, 0, 0));
+ testee.addEntry(HConstants.EMPTY_BYTE_ARRAY, 0, 0, true));
}
@Test
@@ -69,9 +69,9 @@ public class TestLRUDictionary {
// HConstants. Assert that when we add, we get new index. Thats how it
// works.
int len = HConstants.CATALOG_FAMILY.length;
- int index = testee.addEntry(HConstants.CATALOG_FAMILY, 0, len);
- assertFalse(index == testee.addEntry(HConstants.CATALOG_FAMILY, 0, len));
- assertFalse(index == testee.addEntry(HConstants.CATALOG_FAMILY, 0, len));
+ int index = testee.addEntry(HConstants.CATALOG_FAMILY, 0, len, true);
+ assertFalse(index == testee.addEntry(HConstants.CATALOG_FAMILY, 0, len, true));
+ assertFalse(index == testee.addEntry(HConstants.CATALOG_FAMILY, 0, len, true));
}
@Test
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
index 4032cde..db4fb77 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
@@ -112,7 +112,7 @@ public class Compressor {
// if this isn't in the dictionary, we need to add to the dictionary.
byte[] arr = new byte[length];
in.readFully(arr);
- if (dict != null) dict.addEntry(arr, 0, length);
+ if (dict != null) dict.addEntry(arr, 0, length, true);
return arr;
} else {
// Status here is the higher-order byte of index of the dictionary entry
@@ -149,7 +149,7 @@ public class Compressor {
// if this isn't in the dictionary, we need to add to the dictionary.
int length = WritableUtils.readVInt(in);
in.readFully(to, offset, length);
- dict.addEntry(to, offset, length);
+ dict.addEntry(to, offset, length, true);
return length;
} else {
// the status byte also acts as the higher order byte of the dictionary
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
index 26e0e04..7d7418a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
@@ -170,7 +170,7 @@ public class WALCellCodec implements Codec {
if (bytesRead != arr.length) {
throw new IOException("Cannot read; wanted " + arr.length + ", but got " + bytesRead);
}
- if (dict != null) dict.addEntry(arr, 0, arr.length);
+ if (dict != null) dict.addEntry(arr, 0, arr.length, true);
return arr;
} else {
// Status here is the higher-order byte of index of the dictionary entry.
@@ -307,7 +307,7 @@ public class WALCellCodec implements Codec {
// if this isn't in the dictionary, we need to add to the dictionary.
int length = StreamUtils.readRawVarint32(in);
IOUtils.readFully(in, to, offset, length);
- dict.addEntry(to, offset, length);
+ dict.addEntry(to, offset, length, true);
return length;
} else {
// the status byte also acts as the higher order byte of the dictionary entry.