diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 48caea3..dbe3b30 100644
--- hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -805,6 +805,13 @@ public final class HConstants {
public static final String ENABLE_WAL_COMPRESSION =
"hbase.regionserver.wal.enablecompression";
+ /**
+ * Configuration key for tag compression in the WAL (HLog). Tag compression takes effect only
+ * when this key is enabled together with WAL compression.
+ */
+ public static final String ENABLE_WAL_TAGS_COMPRESSION =
+ "hbase.regionserver.wal.tags.enablecompression";
+
/** Region in Transition metrics threshold time */
public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD="hbase.metrics.rit.stuck.warning.threshold";
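
A minimal configuration sketch (not part of the patch, using only the keys introduced or referenced above) showing how the new key is meant to be used; tag compression only takes effect when WAL compression itself is enabled:

    // Hedged sketch: turn on WAL compression plus the new tag compression key.
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
    // New key from this patch; it defaults to true once WAL compression is on.
    conf.setBoolean(HConstants.ENABLE_WAL_TAGS_COMPRESSION, true);
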
diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
new file mode 100644
index 0000000..c0d1c8d
--- /dev/null
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.io.util.Dictionary;
+import org.apache.hadoop.hbase.io.util.StreamUtils;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.IOUtils;
+
+/**
+ * Context that holds the dictionary for tag compression and performs the compress/uncompress.
+ * It is used for compressing tags while writing into HFiles and WALs.
+ */
+@InterfaceAudience.Private
+public class TagCompressionContext {
+ private final Dictionary tagDict;
+
+ public TagCompressionContext(Class<? extends Dictionary> dictType) throws SecurityException,
+ NoSuchMethodException, InstantiationException, IllegalAccessException,
+ InvocationTargetException {
+ Constructor<? extends Dictionary> dictConstructor = dictType.getConstructor();
+ tagDict = dictConstructor.newInstance();
+ tagDict.init(Short.MAX_VALUE);
+ }
+
+ public void clear() {
+ tagDict.clear();
+ }
+
+ /**
+ * Compress tags one by one and write them to the OutputStream.
+ * @param out Stream to which the compressed tags are written
+ * @param in Source buffer where the tag bytes are available
+ * @param tagsLength Length of all tag bytes
+ * @throws IOException when the stream cannot be written
+ */
+ public void compressTags(OutputStream out, ByteBuffer in, short tagsLength) throws IOException {
+ if (in.hasArray()) {
+ compressTags(out, in.array(), in.arrayOffset() + in.position(), tagsLength);
+ ByteBufferUtils.skip(in, tagsLength);
+ } else {
+ byte[] tagBuf = new byte[tagsLength];
+ in.get(tagBuf);
+ compressTags(out, tagBuf, 0, tagsLength);
+ }
+ }
+
+ /**
+ * Compress tags one by one and write them to the OutputStream.
+ * @param out Stream to which the compressed tags are written
+ * @param in Source byte array where the tag bytes are available
+ * @param offset Offset in the byte array at which the tags begin
+ * @param tagsLength Length of all tag bytes
+ * @throws IOException when the stream cannot be written
+ */
+ public void compressTags(OutputStream out, byte[] in, int offset, short tagsLength)
+ throws IOException {
+ int pos = offset;
+ int endOffset = pos + tagsLength;
+ while (pos < endOffset) {
+ short tagLen = Bytes.toShort(in, pos);
+ pos += Tag.TAG_LENGTH_SIZE;
+ write(in, pos, tagLen, out);
+ pos += tagLen;
+ }
+ }
+
+ /**
+ * Uncompress tags from the source ByteBuffer into the destination byte array.
+ * @param src Buffer from which the compressed tags are read
+ * @param dest Destination array for the uncompressed tags
+ * @param offset Offset in the destination array at which the tags are written
+ * @param tagsLength Length of all uncompressed tag bytes
+ * @throws IOException when a referenced dictionary entry is missing
+ */
+ public void uncompressTags(ByteBuffer src, byte[] dest, int offset, int tagsLength)
+ throws IOException {
+ int endOffset = offset + tagsLength;
+ while (offset < endOffset) {
+ byte status = src.get();
+ short tagLen;
+ if (status == Dictionary.NOT_IN_DICTIONARY) {
+ // We are writing short as tagLen. So can downcast this without any risk.
+ tagLen = (short) StreamUtils.readRawVarint32(src);
+ offset = Bytes.putShort(dest, offset, tagLen);
+ src.get(dest, offset, tagLen);
+ tagDict.addEntry(dest, offset, tagLen);
+ offset += tagLen;
+ } else {
+ short dictIdx = StreamUtils.toShort(status, src.get());
+ byte[] entry = tagDict.getEntry(dictIdx);
+ if (entry == null) {
+ throw new IOException("Missing dictionary entry for index " + dictIdx);
+ }
+ tagLen = (short) entry.length;
+ offset = Bytes.putShort(dest, offset, tagLen);
+ System.arraycopy(entry, 0, dest, offset, tagLen);
+ ByteBufferUtils.skip(src, tagLen);
+ offset += entry.length;
+ }
+ }
+ }
+
+ /**
+ * Uncompress tags from the InputStream into the destination ByteBuffer.
+ * @param src Stream from which the compressed tags are read
+ * @param dest Destination buffer for the uncompressed tags
+ * @param tagsLength Length of all uncompressed tag bytes
+ * @throws IOException when the stream cannot be read
+ */
+ public void uncompressTags(InputStream src, ByteBuffer dest, short tagsLength) throws IOException {
+ if (dest.hasArray()) {
+ uncompressTags(src, dest.array(), dest.arrayOffset() + dest.position(), tagsLength);
+ } else {
+ byte[] tagBuf = new byte[tagsLength];
+ uncompressTags(src, tagBuf, 0, tagsLength);
+ dest.put(tagBuf);
+ }
+ }
+
+ /**
+ * Uncompress tags from the InputStream into the destination byte array.
+ * @param src Stream from which the compressed tags are read
+ * @param dest Destination array for the uncompressed tags
+ * @param offset Offset in the destination array at which the tags are written
+ * @param tagsLength Length of all uncompressed tag bytes
+ * @throws IOException when a referenced dictionary entry is missing
+ */
+ public void uncompressTags(InputStream src, byte[] dest, int offset, short tagsLength)
+ throws IOException {
+ int endOffset = offset + tagsLength;
+ while (offset < endOffset) {
+ byte status = (byte) src.read();
+ if (status == Dictionary.NOT_IN_DICTIONARY) {
+ // We are writing short as tagLen. So can downcast this without any risk.
+ short tagLen = (short) StreamUtils.readRawVarint32(src);
+ offset = Bytes.putShort(dest, offset, tagLen);
+ IOUtils.readFully(src, dest, offset, tagLen);
+ tagDict.addEntry(dest, offset, tagLen);
+ offset += tagLen;
+ } else {
+ short dictIdx = StreamUtils.toShort(status, (byte) src.read());
+ byte[] entry = tagDict.getEntry(dictIdx);
+ if (entry == null) {
+ throw new IOException("Missing dictionary entry for index " + dictIdx);
+ }
+ offset = Bytes.putShort(dest, offset, (short) entry.length);
+ System.arraycopy(entry, 0, dest, offset, entry.length);
+ offset += entry.length;
+ }
+ }
+ }
+
+ private void write(byte[] data, int offset, short length, OutputStream out) throws IOException {
+ short dictIdx = Dictionary.NOT_IN_DICTIONARY;
+ if (tagDict != null) {
+ dictIdx = tagDict.findEntry(data, offset, length);
+ }
+ if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
+ out.write(Dictionary.NOT_IN_DICTIONARY);
+ StreamUtils.writeRawVInt32(out, length);
+ out.write(data, offset, length);
+ } else {
+ StreamUtils.writeShort(out, dictIdx);
+ }
+ }
+
+}
\ No newline at end of file
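
A small round-trip sketch of the new API (illustrative only; imports and the constructor's checked exceptions are omitted, and the tag block layout assumed is the KeyValue one of a 2-byte length prefix per tag). Writer and reader each hold their own context, as ProtobufLogWriter and ProtobufLogReader do further below:

    // Build a tags block containing the same tag twice; the second occurrence hits the dictionary.
    byte[] tagBytes = Bytes.toBytes("acl-entry"); // hypothetical tag content
    byte[] tags = new byte[2 * (Tag.TAG_LENGTH_SIZE + tagBytes.length)];
    int pos = Bytes.putShort(tags, 0, (short) tagBytes.length);
    pos = Bytes.putBytes(tags, pos, tagBytes, 0, tagBytes.length);
    pos = Bytes.putShort(tags, pos, (short) tagBytes.length);
    Bytes.putBytes(tags, pos, tagBytes, 0, tagBytes.length);

    TagCompressionContext writeCtx = new TagCompressionContext(LRUDictionary.class);
    ByteArrayOutputStream compressed = new ByteArrayOutputStream();
    writeCtx.compressTags(compressed, tags, 0, (short) tags.length);

    // The reader side rebuilds its dictionary from the NOT_IN_DICTIONARY literals in the stream.
    TagCompressionContext readCtx = new TagCompressionContext(LRUDictionary.class);
    byte[] restored = new byte[tags.length];
    readCtx.uncompressTags(new ByteArrayInputStream(compressed.toByteArray()), restored, 0,
        (short) tags.length);
    assert Bytes.equals(tags, restored);
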
diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java
new file mode 100644
index 0000000..ee034ed
--- /dev/null
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io.util;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Dictionary interface
+ *
+ * Dictionary indexes should be either bytes or shorts, only positive. (The
+ * first bit is reserved for detecting whether something is compressed or not).
+ */
+@InterfaceAudience.Private
+public interface Dictionary {
+ byte NOT_IN_DICTIONARY = -1;
+
+ void init(int initialSize);
+ /**
+ * Gets an entry from the dictionary.
+ *
+ * @param idx index of the entry
+ * @return the entry, or null if non existent
+ */
+ byte[] getEntry(short idx);
+
+ /**
+ * Finds the index of an entry.
+ * If no entry found, we add it.
+ *
+ * @param data the byte array that we're looking up
+ * @param offset Offset into data to add to Dictionary.
+ * @param length Length beyond offset that comprises entry; must be > 0.
+ * @return the index of the entry, or {@link #NOT_IN_DICTIONARY} if not found
+ */
+ short findEntry(byte[] data, int offset, int length);
+
+ /**
+ * Adds an entry to the dictionary.
+ * Be careful using this method. It will add an entry to the
+ * dictionary even if it already has an entry for the same data.
+ * Call {{@link #findEntry(byte[], int, int)}} to add without duplicating
+ * dictionary entries.
+ *
+ * @param data the entry to add
+ * @param offset Offset into data to add to Dictionary.
+ * @param length Length beyond offset that comprises entry; must be > 0.
+ * @return the index of the entry
+ */
+
+ short addEntry(byte[] data, int offset, int length);
+
+ /**
+ * Flushes the dictionary, empties all values.
+ */
+ void clear();
+}
diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java
new file mode 100644
index 0000000..3836d74
--- /dev/null
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io.util;
+
+import java.util.HashMap;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * WALDictionary using an LRU eviction algorithm. Uses a linked list running
+ * through a hashtable. Currently has max of 2^15 entries. Will start
+ * evicting if it exceeds this number. The maximum memory we expect this dictionary
+ * to take in the worst case is about:
+ * (2 ^ 15) * 5 (Regionname, Row key, CF, Column qual, table) * 100 bytes (these are some big names) = ~16MB.
+ * If you want to get silly, even at 1kb entries, it maxes out at 160 megabytes.
+ */
+@InterfaceAudience.Private
+public class LRUDictionary implements Dictionary {
+
+ BidirectionalLRUMap backingStore;
+ @Override
+ public byte[] getEntry(short idx) {
+ return backingStore.get(idx);
+ }
+
+ @Override
+ public void init(int initialSize) {
+ backingStore = new BidirectionalLRUMap(initialSize);
+ }
+ @Override
+ public short findEntry(byte[] data, int offset, int length) {
+ short ret = backingStore.findIdx(data, offset, length);
+ if (ret == NOT_IN_DICTIONARY) {
+ addEntry(data, offset, length);
+ }
+ return ret;
+ }
+
+ @Override
+ public short addEntry(byte[] data, int offset, int length) {
+ if (length <= 0) return NOT_IN_DICTIONARY;
+ return backingStore.put(data, offset, length);
+ }
+
+ @Override
+ public void clear() {
+ backingStore.clear();
+ }
+
+ /*
+ * Internal class used to implement LRU eviction and dual lookup (by key and
+ * value).
+ *
+ * This is not thread safe. Don't use in multi-threaded applications.
+ */
+ static class BidirectionalLRUMap {
+ private int currSize = 0;
+
+ // Head and tail of the LRU list.
+ private Node head;
+ private Node tail;
+
+ private HashMap<Node, Short> nodeToIndex = new HashMap<Node, Short>();
+ private Node[] indexToNode;
+ private int initSize = 0;
+
+ public BidirectionalLRUMap(int initialSize) {
+ initSize = initialSize;
+ indexToNode = new Node[initialSize];
+ for (int i = 0; i < initialSize; i++) {
+ indexToNode[i] = new Node();
+ }
+ }
+
+ private short put(byte[] array, int offset, int length) {
+ // We copy the bytes we want, otherwise we might be holding references to
+ // massive arrays in our dictionary (or those arrays might change)
+ byte[] stored = new byte[length];
+ Bytes.putBytes(stored, 0, array, offset, length);
+
+ if (currSize < initSize) {
+ // There is space to add without evicting.
+ indexToNode[currSize].setContents(stored, 0, stored.length);
+ setHead(indexToNode[currSize]);
+ short ret = (short) currSize++;
+ nodeToIndex.put(indexToNode[ret], ret);
+ return ret;
+ } else {
+ short s = nodeToIndex.remove(tail);
+ tail.setContents(stored, 0, stored.length);
+ // we need to rehash this.
+ nodeToIndex.put(tail, s);
+ moveToHead(tail);
+ return s;
+ }
+ }
+
+ private short findIdx(byte[] array, int offset, int length) {
+ Short s;
+ final Node comparisonNode = new Node();
+ comparisonNode.setContents(array, offset, length);
+ if ((s = nodeToIndex.get(comparisonNode)) != null) {
+ moveToHead(indexToNode[s]);
+ return s;
+ } else {
+ return -1;
+ }
+ }
+
+ private byte[] get(short idx) {
+ Preconditions.checkElementIndex(idx, currSize);
+ moveToHead(indexToNode[idx]);
+ return indexToNode[idx].container;
+ }
+
+ private void moveToHead(Node n) {
+ if (head == n) {
+ // no-op -- it's already the head.
+ return;
+ }
+ // At this point we definitely have prev, since it's not the head.
+ assert n.prev != null;
+ // Unlink prev.
+ n.prev.next = n.next;
+
+ // Unlink next
+ if (n.next != null) {
+ n.next.prev = n.prev;
+ } else {
+ assert n == tail;
+ tail = n.prev;
+ }
+ // Node is now removed from the list. Re-add it at the head.
+ setHead(n);
+ }
+
+ private void setHead(Node n) {
+ // assume it's already unlinked from the list at this point.
+ n.prev = null;
+ n.next = head;
+ if (head != null) {
+ assert head.prev == null;
+ head.prev = n;
+ }
+
+ head = n;
+
+ // First entry
+ if (tail == null) {
+ tail = n;
+ }
+ }
+
+ private void clear() {
+ currSize = 0;
+ nodeToIndex.clear();
+ tail = null;
+ head = null;
+
+ for (Node n : indexToNode) {
+ n.container = null;
+ }
+
+ for (int i = 0; i < initSize; i++) {
+ indexToNode[i].next = null;
+ indexToNode[i].prev = null;
+ }
+ }
+
+ private static class Node {
+ byte[] container;
+ int offset;
+ int length;
+ Node next; // link towards the tail
+ Node prev; // link towards the head
+
+ public Node() {
+ }
+
+ private void setContents(byte[] container, int offset, int length) {
+ this.container = container;
+ this.offset = offset;
+ this.length = length;
+ }
+
+ @Override
+ public int hashCode() {
+ return Bytes.hashCode(container, offset, length);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof Node)) {
+ return false;
+ }
+
+ Node casted = (Node) other;
+ return Bytes.equals(container, offset, length, casted.container,
+ casted.offset, casted.length);
+ }
+ }
+ }
+}
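
For reference, a tiny sketch (not from the patch; imports omitted) of the lookup contract callers rely on: findEntry() returns NOT_IN_DICTIONARY on a miss but also inserts the entry, so a repeat lookup of the same bytes yields a stable index:

    LRUDictionary dict = new LRUDictionary();
    dict.init(Short.MAX_VALUE);
    byte[] family = Bytes.toBytes("cf"); // hypothetical entry
    short miss = dict.findEntry(family, 0, family.length); // NOT_IN_DICTIONARY (-1), entry added
    short hit = dict.findEntry(family, 0, family.length);  // now a non-negative index
    assert miss == Dictionary.NOT_IN_DICTIONARY && hit >= 0;
    assert Arrays.equals(family, dict.getEntry(hit));
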
diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java
new file mode 100644
index 0000000..c19cb19
--- /dev/null
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * It seems like as soon as somebody sets himself to the task of creating VInt encoding, his mind
+ * blanks out for a split-second and he starts the work by wrapping it in the most convoluted
+ * interface he can come up with. Custom streams that allocate memory, DataOutput that is only used
+ * to write single bytes... We operate on simple streams. Thus, we are going to have a simple
+ * implementation copy-pasted from protobuf Coded*Stream.
+ */
+@InterfaceAudience.Private
+public class StreamUtils {
+
+ public static void writeRawVInt32(OutputStream output, int value) throws IOException {
+ assert value >= 0;
+ while (true) {
+ if ((value & ~0x7F) == 0) {
+ output.write(value);
+ return;
+ } else {
+ output.write((value & 0x7F) | 0x80);
+ value >>>= 7;
+ }
+ }
+ }
+
+ public static int readRawVarint32(InputStream input) throws IOException {
+ byte tmp = (byte) input.read();
+ if (tmp >= 0) {
+ return tmp;
+ }
+ int result = tmp & 0x7f;
+ if ((tmp = (byte) input.read()) >= 0) {
+ result |= tmp << 7;
+ } else {
+ result |= (tmp & 0x7f) << 7;
+ if ((tmp = (byte) input.read()) >= 0) {
+ result |= tmp << 14;
+ } else {
+ result |= (tmp & 0x7f) << 14;
+ if ((tmp = (byte) input.read()) >= 0) {
+ result |= tmp << 21;
+ } else {
+ result |= (tmp & 0x7f) << 21;
+ result |= (tmp = (byte) input.read()) << 28;
+ if (tmp < 0) {
+ // Discard upper 32 bits.
+ for (int i = 0; i < 5; i++) {
+ if (input.read() >= 0) {
+ return result;
+ }
+ }
+ throw new IOException("Malformed varint");
+ }
+ }
+ }
+ }
+ return result;
+ }
+
+ public static int readRawVarint32(ByteBuffer input) throws IOException {
+ byte tmp = input.get();
+ if (tmp >= 0) {
+ return tmp;
+ }
+ int result = tmp & 0x7f;
+ if ((tmp = input.get()) >= 0) {
+ result |= tmp << 7;
+ } else {
+ result |= (tmp & 0x7f) << 7;
+ if ((tmp = input.get()) >= 0) {
+ result |= tmp << 14;
+ } else {
+ result |= (tmp & 0x7f) << 14;
+ if ((tmp = input.get()) >= 0) {
+ result |= tmp << 21;
+ } else {
+ result |= (tmp & 0x7f) << 21;
+ result |= (tmp = input.get()) << 28;
+ if (tmp < 0) {
+ // Discard upper 32 bits.
+ for (int i = 0; i < 5; i++) {
+ if (input.get() >= 0) {
+ return result;
+ }
+ }
+ throw new IOException("Malformed varint");
+ }
+ }
+ }
+ }
+ return result;
+ }
+
+ public static short toShort(byte hi, byte lo) {
+ short s = (short) (((hi & 0xFF) << 8) | (lo & 0xFF));
+ Preconditions.checkArgument(s >= 0);
+ return s;
+ }
+
+ public static void writeShort(OutputStream out, short v) throws IOException {
+ Preconditions.checkArgument(v >= 0);
+ out.write((byte) (0xff & (v >> 8)));
+ out.write((byte) (0xff & v));
+ }
+}
\ No newline at end of file
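
A short round-trip sketch for these helpers (illustrative, not part of the patch): writeRawVInt32/readRawVarint32 handle non-negative varints, while writeShort/toShort carry the two-byte dictionary indices used by the compression code:

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    StreamUtils.writeRawVInt32(out, 300);    // varint-encodes as 0xAC 0x02
    StreamUtils.writeShort(out, (short) 42); // big-endian: 0x00 0x2A
    ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
    int len = StreamUtils.readRawVarint32(in); // 300
    short idx = StreamUtils.toShort((byte) in.read(), (byte) in.read()); // 42
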
diff --git hbase-common/src/test/java/org/apache/hadoop/hbase/io/util/TestLRUDictionary.java hbase-common/src/test/java/org/apache/hadoop/hbase/io/util/TestLRUDictionary.java
new file mode 100644
index 0000000..871c6fc
--- /dev/null
+++ hbase-common/src/test/java/org/apache/hadoop/hbase/io/util/TestLRUDictionary.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io.util;
+
+import static org.junit.Assert.*;
+
+import java.math.BigInteger;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests LRUDictionary
+ */
+@Category(SmallTests.class)
+public class TestLRUDictionary {
+ LRUDictionary testee;
+
+ @Before
+ public void setUp() throws Exception {
+ testee = new LRUDictionary();
+ testee.init(Short.MAX_VALUE);
+ }
+
+ @Test
+ public void TestContainsNothing() {
+ assertTrue(isDictionaryEmpty(testee));
+ }
+
+ /**
+ * Assert can't add empty array.
+ */
+ @Test
+ public void testPassingEmptyArrayToFindEntry() {
+ assertEquals(Dictionary.NOT_IN_DICTIONARY,
+ testee.findEntry(HConstants.EMPTY_BYTE_ARRAY, 0, 0));
+ assertEquals(Dictionary.NOT_IN_DICTIONARY,
+ testee.addEntry(HConstants.EMPTY_BYTE_ARRAY, 0, 0));
+ }
+
+ @Test
+ public void testPassingSameArrayToAddEntry() {
+ // Add random predefined byte array, in this case a random byte array from
+ // HConstants. Assert that when we add, we get new index. Thats how it
+ // works.
+ int len = HConstants.CATALOG_FAMILY.length;
+ int index = testee.addEntry(HConstants.CATALOG_FAMILY, 0, len);
+ assertFalse(index == testee.addEntry(HConstants.CATALOG_FAMILY, 0, len));
+ assertFalse(index == testee.addEntry(HConstants.CATALOG_FAMILY, 0, len));
+ }
+
+ @Test
+ public void testBasic() {
+ Random rand = new Random();
+ byte[] testBytes = new byte[10];
+ rand.nextBytes(testBytes);
+
+ // Verify that our randomly generated array doesn't exist in the dictionary
+ assertEquals(testee.findEntry(testBytes, 0, testBytes.length), -1);
+
+ // now since we looked up an entry, we should have added it to the
+ // dictionary, so it isn't empty
+
+ assertFalse(isDictionaryEmpty(testee));
+
+ // Check if we can find it using findEntry
+ short t = testee.findEntry(testBytes, 0, testBytes.length);
+
+ // Making sure we do find what we're looking for
+ assertTrue(t != -1);
+
+ byte[] testBytesCopy = new byte[20];
+
+ Bytes.putBytes(testBytesCopy, 10, testBytes, 0, testBytes.length);
+
+ // copy byte arrays, make sure that we check that equal byte arrays are
+ // equal without just checking the reference
+ assertEquals(testee.findEntry(testBytesCopy, 10, testBytes.length), t);
+
+ // make sure the entry retrieved is the same as the one put in
+ assertTrue(Arrays.equals(testBytes, testee.getEntry(t)));
+
+ testee.clear();
+
+ // making sure clear clears the dictionary
+ assertTrue(isDictionaryEmpty(testee));
+ }
+
+ @Test
+ public void TestLRUPolicy(){
+ //start by filling the dictionary up with byte arrays
+ for (int i = 0; i < Short.MAX_VALUE; i++) {
+ testee.findEntry((BigInteger.valueOf(i)).toByteArray(), 0,
+ (BigInteger.valueOf(i)).toByteArray().length);
+ }
+
+ // check we have the first element added
+ assertTrue(testee.findEntry(BigInteger.ZERO.toByteArray(), 0,
+ BigInteger.ZERO.toByteArray().length) != -1);
+
+ // check for an element we know isn't there
+ assertTrue(testee.findEntry(BigInteger.valueOf(Integer.MAX_VALUE).toByteArray(), 0,
+ BigInteger.valueOf(Integer.MAX_VALUE).toByteArray().length) == -1);
+
+ // since we just checked for this element, it should be there now.
+ assertTrue(testee.findEntry(BigInteger.valueOf(Integer.MAX_VALUE).toByteArray(), 0,
+ BigInteger.valueOf(Integer.MAX_VALUE).toByteArray().length) != -1);
+
+ // test eviction, that the least recently added or looked at element is
+ // evicted. We looked at ZERO so it should be in the dictionary still.
+ assertTrue(testee.findEntry(BigInteger.ZERO.toByteArray(), 0,
+ BigInteger.ZERO.toByteArray().length) != -1);
+ // Now go from beyond 1 to the end.
+ for(int i = 1; i < Short.MAX_VALUE; i++) {
+ assertTrue(testee.findEntry(BigInteger.valueOf(i).toByteArray(), 0,
+ BigInteger.valueOf(i).toByteArray().length) == -1);
+ }
+
+ // check we can find all of these.
+ for (int i = 0; i < Short.MAX_VALUE; i++) {
+ assertTrue(testee.findEntry(BigInteger.valueOf(i).toByteArray(), 0,
+ BigInteger.valueOf(i).toByteArray().length) != -1);
+ }
+ }
+
+ static private boolean isDictionaryEmpty(LRUDictionary dict) {
+ try {
+ dict.getEntry((short)0);
+ return false;
+ } catch (IndexOutOfBoundsException ioobe) {
+ return true;
+ }
+ }
+}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
index a76cafa..6983c70 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver.wal;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.util.Dictionary;
/**
* Context that holds the various dictionaries for compression in HLog.
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
index f83217a..04046f1 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
@@ -25,6 +25,7 @@ import java.io.IOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Dictionary.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Dictionary.java
deleted file mode 100644
index fd1c264..0000000
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Dictionary.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.regionserver.wal;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-
-/**
- * Dictionary interface
- *
- * Dictionary indexes should be either bytes or shorts, only positive. (The
- * first bit is reserved for detecting whether something is compressed or not).
- */
-@InterfaceAudience.Private
-interface Dictionary {
- byte NOT_IN_DICTIONARY = -1;
-
- void init(int initialSize);
- /**
- * Gets an entry from the dictionary.
- *
- * @param idx index of the entry
- * @return the entry, or null if non existent
- */
- byte[] getEntry(short idx);
-
- /**
- * Finds the index of an entry.
- * If no entry found, we add it.
- *
- * @param data the byte array that we're looking up
- * @param offset Offset into data to add to Dictionary.
- * @param length Length beyond offset that comprises entry; must be > 0.
- * @return the index of the entry, or {@link #NOT_IN_DICTIONARY} if not found
- */
- short findEntry(byte[] data, int offset, int length);
-
- /**
- * Adds an entry to the dictionary.
- * Be careful using this method. It will add an entry to the
- * dictionary even if it already has an entry for the same data.
- * Call {{@link #findEntry(byte[], int, int)}} to add without duplicating
- * dictionary entries.
- *
- * @param data the entry to add
- * @param offset Offset into data to add to Dictionary.
- * @param length Length beyond offset that comprises entry; must be > 0.
- * @return the index of the entry
- */
-
- short addEntry(byte[] data, int offset, int length);
-
- /**
- * Flushes the dictionary, empties all values.
- */
- void clear();
-}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LRUDictionary.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LRUDictionary.java
deleted file mode 100644
index 1bea447..0000000
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LRUDictionary.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.regionserver.wal;
-
-import java.util.HashMap;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.Bytes;
-
-import com.google.common.base.Preconditions;
-
-/**
- * WALDictionary using an LRU eviction algorithm. Uses a linked list running
- * through a hashtable. Currently has max of 2^15 entries. Will start
- * evicting if exceeds this number The maximum memory we expect this dictionary
- * to take in the worst case is about:
- * (2 ^ 15) * 5 (Regionname, Row key, CF, Column qual, table) * 100 bytes (these are some big names) = ~16MB.
- * If you want to get silly, even at 1kb entries, it maxes out at 160 megabytes.
- */
-@InterfaceAudience.Private
-public class LRUDictionary implements Dictionary {
-
- BidirectionalLRUMap backingStore;
- @Override
- public byte[] getEntry(short idx) {
- return backingStore.get(idx);
- }
-
- @Override
- public void init(int initialSize) {
- backingStore = new BidirectionalLRUMap(initialSize);
- }
- @Override
- public short findEntry(byte[] data, int offset, int length) {
- short ret = backingStore.findIdx(data, offset, length);
- if (ret == NOT_IN_DICTIONARY) {
- addEntry(data, offset, length);
- }
- return ret;
- }
-
- @Override
- public short addEntry(byte[] data, int offset, int length) {
- if (length <= 0) return NOT_IN_DICTIONARY;
- return backingStore.put(data, offset, length);
- }
-
- @Override
- public void clear() {
- backingStore.clear();
- }
-
- /*
- * Internal class used to implement LRU eviction and dual lookup (by key and
- * value).
- *
- * This is not thread safe. Don't use in multi-threaded applications.
- */
- static class BidirectionalLRUMap {
- private int currSize = 0;
-
- // Head and tail of the LRU list.
- private Node head;
- private Node tail;
-
- private HashMap<Node, Short> nodeToIndex = new HashMap<Node, Short>();
- private Node[] indexToNode;
- private int initSize = 0;
-
- public BidirectionalLRUMap(int initialSize) {
- initSize = initialSize;
- indexToNode = new Node[initialSize];
- for (int i = 0; i < initialSize; i++) {
- indexToNode[i] = new Node();
- }
- }
-
- private short put(byte[] array, int offset, int length) {
- // We copy the bytes we want, otherwise we might be holding references to
- // massive arrays in our dictionary (or those arrays might change)
- byte[] stored = new byte[length];
- Bytes.putBytes(stored, 0, array, offset, length);
-
- if (currSize < initSize) {
- // There is space to add without evicting.
- indexToNode[currSize].setContents(stored, 0, stored.length);
- setHead(indexToNode[currSize]);
- short ret = (short) currSize++;
- nodeToIndex.put(indexToNode[ret], ret);
- return ret;
- } else {
- short s = nodeToIndex.remove(tail);
- tail.setContents(stored, 0, stored.length);
- // we need to rehash this.
- nodeToIndex.put(tail, s);
- moveToHead(tail);
- return s;
- }
- }
-
- private short findIdx(byte[] array, int offset, int length) {
- Short s;
- final Node comparisonNode = new Node();
- comparisonNode.setContents(array, offset, length);
- if ((s = nodeToIndex.get(comparisonNode)) != null) {
- moveToHead(indexToNode[s]);
- return s;
- } else {
- return -1;
- }
- }
-
- private byte[] get(short idx) {
- Preconditions.checkElementIndex(idx, currSize);
- moveToHead(indexToNode[idx]);
- return indexToNode[idx].container;
- }
-
- private void moveToHead(Node n) {
- if (head == n) {
- // no-op -- it's already the head.
- return;
- }
- // At this point we definitely have prev, since it's not the head.
- assert n.prev != null;
- // Unlink prev.
- n.prev.next = n.next;
-
- // Unlink next
- if (n.next != null) {
- n.next.prev = n.prev;
- } else {
- assert n == tail;
- tail = n.prev;
- }
- // Node is now removed from the list. Re-add it at the head.
- setHead(n);
- }
-
- private void setHead(Node n) {
- // assume it's already unlinked from the list at this point.
- n.prev = null;
- n.next = head;
- if (head != null) {
- assert head.prev == null;
- head.prev = n;
- }
-
- head = n;
-
- // First entry
- if (tail == null) {
- tail = n;
- }
- }
-
- private void clear() {
- currSize = 0;
- nodeToIndex.clear();
- tail = null;
- head = null;
-
- for (Node n : indexToNode) {
- n.container = null;
- }
-
- for (int i = 0; i < initSize; i++) {
- indexToNode[i].next = null;
- indexToNode[i].prev = null;
- }
- }
-
- private static class Node {
- byte[] container;
- int offset;
- int length;
- Node next; // link towards the tail
- Node prev; // link towards the head
-
- public Node() {
- }
-
- private void setContents(byte[] container, int offset, int length) {
- this.container = container;
- this.offset = offset;
- this.length = length;
- }
-
- @Override
- public int hashCode() {
- return Bytes.hashCode(container, offset, length);
- }
-
- @Override
- public boolean equals(Object other) {
- if (!(other instanceof Node)) {
- return false;
- }
-
- Node casted = (Node) other;
- return Bytes.equals(container, offset, length, casted.container,
- casted.offset, casted.length);
- }
- }
- }
-}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
index 26d8e61..c75a233 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
@@ -28,8 +28,14 @@ import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.io.TagCompressionContext;
+import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
@@ -64,11 +70,36 @@ public class ProtobufLogReader extends ReaderBase {
private long walEditsStopOffset;
private boolean trailerPresent;
+ // Context used by the WAL reader for uncompressing tags
+ private TagCompressionContext tagCompressionContext;
+
public ProtobufLogReader() {
super();
}
@Override
+ public void init(FileSystem fs, Path path, Configuration conf, FSDataInputStream stream)
+ throws IOException {
+ super.init(fs, path, conf, stream);
+ if (hasTagsCompression()) {
+ try {
+ if (tagCompressionContext == null) {
+ tagCompressionContext = new TagCompressionContext(LRUDictionary.class);
+ } else {
+ tagCompressionContext.clear();
+ }
+ } catch (Exception e) {
+ throw new IOException("Failed to initialize TagCompressionContext", e);
+ }
+ }
+ }
+
+ private boolean hasTagsCompression() {
+ // By default tag compression is enabled along with WAL compression.
+ return hasCompression() && conf.getBoolean(HConstants.ENABLE_WAL_TAGS_COMPRESSION, true);
+ }
+
+ @Override
public void close() throws IOException {
if (this.inputStream != null) {
this.inputStream.close();
@@ -178,7 +209,8 @@ public class ProtobufLogReader extends ReaderBase {
@Override
protected void initAfterCompression() throws IOException {
- WALCellCodec codec = WALCellCodec.create(this.conf, this.compressionContext);
+ WALCellCodec codec = WALCellCodec.create(this.conf, this.compressionContext,
+ this.tagCompressionContext);
this.cellDecoder = codec.getDecoder(this.inputStream);
if (this.hasCompression) {
this.byteStringUncompressor = codec.getByteStringUncompressor();
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
index 1174847..0583f88 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
@@ -31,6 +31,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.io.TagCompressionContext;
+import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
@@ -50,6 +52,9 @@ public class ProtobufLogWriter extends WriterBase {
// than this size, it is written/read respectively, with a WARN message in the log.
private int trailerWarnSize;
+ // Context used by the WAL for compressing tags
+ private TagCompressionContext tagCompressionContext;
+
public ProtobufLogWriter() {
super();
}
@@ -69,7 +74,18 @@ public class ProtobufLogWriter extends WriterBase {
output.write(ProtobufLogReader.PB_WAL_MAGIC);
WALHeader.newBuilder().setHasCompression(doCompress).build().writeDelimitedTo(output);
- WALCellCodec codec = WALCellCodec.create(conf, this.compressionContext);
+ if (doCompress) {
+ // By default tag compression is enabled along with WAL compression.
+ if (conf.getBoolean(HConstants.ENABLE_WAL_TAGS_COMPRESSION, true)) {
+ try {
+ this.tagCompressionContext = new TagCompressionContext(LRUDictionary.class);
+ } catch (Exception e) {
+ throw new IOException("Failed to initiate TagCompressionContext", e);
+ }
+ }
+ }
+ WALCellCodec codec = WALCellCodec.create(conf, this.compressionContext,
+ this.tagCompressionContext);
this.cellEncoder = codec.getEncoder(this.output);
if (doCompress) {
this.compressor = codec.getByteStringCompressor();
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
index 72db786..8c05d08 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
import org.apache.hadoop.hbase.util.FSUtils;
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
index 4f73f32..57e2465 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
@@ -29,11 +29,13 @@ import org.apache.hadoop.hbase.codec.BaseDecoder;
import org.apache.hadoop.hbase.codec.BaseEncoder;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodec;
+import org.apache.hadoop.hbase.io.TagCompressionContext;
+import org.apache.hadoop.hbase.io.util.Dictionary;
+import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.io.IOUtils;
-import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
@@ -46,6 +48,8 @@ public class WALCellCodec implements Codec {
public static final String WAL_CELL_CODEC_CLASS_KEY = "hbase.regionserver.wal.codec";
private final CompressionContext compression;
+ private final TagCompressionContext tagCompressionContext;
+
private final ByteStringUncompressor statelessUncompressor = new ByteStringUncompressor() {
@Override
public byte[] uncompress(ByteString data, Dictionary dict) throws IOException {
@@ -59,9 +63,13 @@ public class WALCellCodec implements Codec {
* @param conf configuration to configure this
* @param compression compression the codec should support, can be null to indicate no
* compression
+ * @param tagCompressionContext context for compressing the tags.
+ * Can be null to indicate no tag compression.
*/
- public WALCellCodec(Configuration conf, CompressionContext compression) {
+ public WALCellCodec(Configuration conf, CompressionContext compression,
+ TagCompressionContext tagCompressionContext) {
this.compression = compression;
+ this.tagCompressionContext = tagCompressionContext;
}
/**
@@ -70,14 +78,16 @@ public class WALCellCodec implements Codec {
* @param conf {@link Configuration} to read for the user-specified codec. If none is specified,
* uses a {@link WALCellCodec}.
* @param compression compression the codec should use
+ * @param tagCompressionContext context for compressing the tags.
* @return a {@link WALCellCodec} ready for use.
* @throws UnsupportedOperationException if the codec cannot be instantiated
*/
- public static WALCellCodec create(Configuration conf, CompressionContext compression)
- throws UnsupportedOperationException {
+ public static WALCellCodec create(Configuration conf, CompressionContext compression,
+ TagCompressionContext tagCompressionContext) throws UnsupportedOperationException {
String className = conf.get(WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
return ReflectionUtils.instantiateWithCustomCtor(className, new Class[] { Configuration.class,
- CompressionContext.class }, new Object[] { conf, compression });
+ CompressionContext.class, TagCompressionContext.class }, new Object[] { conf, compression,
+ tagCompressionContext });
}
public interface ByteStringCompressor {
@@ -141,9 +151,13 @@ public class WALCellCodec implements Codec {
static class CompressedKvEncoder extends BaseEncoder {
private final CompressionContext compression;
- public CompressedKvEncoder(OutputStream out, CompressionContext compression) {
+ private final TagCompressionContext tagCompressionContext;
+
+ public CompressedKvEncoder(OutputStream out, CompressionContext compression,
+ TagCompressionContext tagCompressionContext) {
super(out);
this.compression = compression;
+ this.tagCompressionContext = tagCompressionContext;
}
@Override
@@ -157,7 +171,8 @@ public class WALCellCodec implements Codec {
StreamUtils.writeRawVInt32(out, kv.getKeyLength());
StreamUtils.writeRawVInt32(out, kv.getValueLength());
// To support tags
- StreamUtils.writeRawVInt32(out, kv.getTagsLength());
+ short tagsLength = kv.getTagsLength();
+ StreamUtils.writeRawVInt32(out, tagsLength);
// Write row, qualifier, and family; use dictionary
// compression as they're likely to have duplicates.
@@ -165,11 +180,24 @@ public class WALCellCodec implements Codec {
write(kvBuffer, kv.getFamilyOffset(), kv.getFamilyLength(), compression.familyDict);
write(kvBuffer, kv.getQualifierOffset(), kv.getQualifierLength(), compression.qualifierDict);
- // Write the rest uncompressed.
+ // Write timestamp, type and value as uncompressed.
int pos = kv.getTimestampOffset();
- int remainingLength = kv.getLength() + offset - pos;
- out.write(kvBuffer, pos, remainingLength);
+ int tsTypeValLen = kv.getLength() + offset - pos;
+ if (tagsLength > 0) {
+ tsTypeValLen = tsTypeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE;
+ }
+ out.write(kvBuffer, pos, tsTypeValLen);
+ if (tagsLength > 0) {
+ if (tagCompressionContext != null) {
+ // Write tags using Dictionary compression
+ tagCompressionContext.compressTags(out, kvBuffer, kv.getTagsOffset(), tagsLength);
+ } else {
+ // Tag compression is disabled within the WAL compression. Just write the tag bytes as
+ // they are.
+ out.write(kvBuffer, kv.getTagsOffset(), tagsLength);
+ }
+ }
}
private void write(byte[] data, int offset, int length, Dictionary dict) throws IOException {
@@ -189,9 +217,13 @@ public class WALCellCodec implements Codec {
static class CompressedKvDecoder extends BaseDecoder {
private final CompressionContext compression;
- public CompressedKvDecoder(InputStream in, CompressionContext compression) {
+ private final TagCompressionContext tagCompressionContext;
+
+ public CompressedKvDecoder(InputStream in, CompressionContext compression,
+ TagCompressionContext tagCompressionContext) {
super(in);
this.compression = compression;
+ this.tagCompressionContext = tagCompressionContext;
}
@Override
@@ -199,7 +231,7 @@ public class WALCellCodec implements Codec {
int keylength = StreamUtils.readRawVarint32(in);
int vlength = StreamUtils.readRawVarint32(in);
- int tagsLength = StreamUtils.readRawVarint32(in);
+ short tagsLength = (short) StreamUtils.readRawVarint32(in);
int length = 0;
if(tagsLength == 0) {
length = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength;
@@ -228,8 +260,23 @@ public class WALCellCodec implements Codec {
elemLen = readIntoArray(backingArray, pos, compression.qualifierDict);
pos += elemLen;
- // the rest
- IOUtils.readFully(in, backingArray, pos, length - pos);
+ // timestamp, type and value
+ int tsTypeValLen = length - pos;
+ if (tagsLength > 0) {
+ tsTypeValLen = tsTypeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE;
+ }
+ IOUtils.readFully(in, backingArray, pos, tsTypeValLen);
+ pos += tsTypeValLen;
+
+ // tags
+ if (tagsLength > 0) {
+ pos = Bytes.putShort(backingArray, pos, tagsLength);
+ if (tagCompressionContext != null) {
+ this.tagCompressionContext.uncompressTags(in, backingArray, pos, tagsLength);
+ } else {
+ IOUtils.readFully(in, backingArray, pos, tagsLength);
+ }
+ }
return new KeyValue(backingArray, 0, length);
}
@@ -275,14 +322,14 @@ public class WALCellCodec implements Codec {
@Override
public Decoder getDecoder(InputStream is) {
- return (compression == null)
- ? new KeyValueCodec.KeyValueDecoder(is) : new CompressedKvDecoder(is, compression);
+ return (compression == null) ? new KeyValueCodec.KeyValueDecoder(is) : new CompressedKvDecoder(
+ is, compression, tagCompressionContext);
}
@Override
public Encoder getEncoder(OutputStream os) {
- return (compression == null)
- ? new EnsureKvEncoder(os) : new CompressedKvEncoder(os, compression);
+ return (compression == null) ? new EnsureKvEncoder(os) : new CompressedKvEncoder(os,
+ compression, tagCompressionContext);
}
public ByteStringCompressor getByteStringCompressor() {
@@ -294,80 +341,4 @@ public class WALCellCodec implements Codec {
// TODO: ideally this should also encapsulate compressionContext
return this.statelessUncompressor;
}
-
- /**
- * It seems like as soon as somebody sets himself to the task of creating VInt encoding,
- * his mind blanks out for a split-second and he starts the work by wrapping it in the
- * most convoluted interface he can come up with. Custom streams that allocate memory,
- * DataOutput that is only used to write single bytes... We operate on simple streams.
- * Thus, we are going to have a simple implementation copy-pasted from protobuf Coded*Stream.
- */
- private static class StreamUtils {
- public static int computeRawVarint32Size(final int value) {
- if ((value & (0xffffffff << 7)) == 0) return 1;
- if ((value & (0xffffffff << 14)) == 0) return 2;
- if ((value & (0xffffffff << 21)) == 0) return 3;
- if ((value & (0xffffffff << 28)) == 0) return 4;
- return 5;
- }
-
- static void writeRawVInt32(OutputStream output, int value) throws IOException {
- assert value >= 0;
- while (true) {
- if ((value & ~0x7F) == 0) {
- output.write(value);
- return;
- } else {
- output.write((value & 0x7F) | 0x80);
- value >>>= 7;
- }
- }
- }
-
- static int readRawVarint32(InputStream input) throws IOException {
- byte tmp = (byte)input.read();
- if (tmp >= 0) {
- return tmp;
- }
- int result = tmp & 0x7f;
- if ((tmp = (byte)input.read()) >= 0) {
- result |= tmp << 7;
- } else {
- result |= (tmp & 0x7f) << 7;
- if ((tmp = (byte)input.read()) >= 0) {
- result |= tmp << 14;
- } else {
- result |= (tmp & 0x7f) << 14;
- if ((tmp = (byte)input.read()) >= 0) {
- result |= tmp << 21;
- } else {
- result |= (tmp & 0x7f) << 21;
- result |= (tmp = (byte)input.read()) << 28;
- if (tmp < 0) {
- // Discard upper 32 bits.
- for (int i = 0; i < 5; i++) {
- if (input.read() >= 0) {
- return result;
- }
- }
- throw new IOException("Malformed varint");
- }
- }
- }
- }
- return result;
- }
-
- static short toShort(byte hi, byte lo) {
- short s = (short) (((hi & 0xFF) << 8) | (lo & 0xFF));
- Preconditions.checkArgument(s >= 0);
- return s;
- }
-
- static void writeShort(OutputStream out, short v) throws IOException {
- Preconditions.checkArgument(v >= 0);
- out.write((byte)(0xff & (v >> 8)));
- out.write((byte)(0xff & v));
- }
- }
}
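
For review context, a minimal wiring sketch (checked exceptions and imports omitted; a plain Configuration is assumed) showing how the extended factory is used with and without a tag compression context, mirroring the writer and reader changes above:

    Configuration conf = new Configuration(false);
    CompressionContext compression = new CompressionContext(LRUDictionary.class, false);
    TagCompressionContext tagCtx = new TagCompressionContext(LRUDictionary.class);
    WALCellCodec withTagCompression = WALCellCodec.create(conf, compression, tagCtx);
    WALCellCodec plainTagCodec = WALCellCodec.create(conf, compression, null); // tags written as-is
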
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java
index 6447b4f..1c539a9 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.util.FSUtils;
/**
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java
index 06f0df5..84eeb88 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java
@@ -28,6 +28,8 @@ import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.io.util.Dictionary;
+import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.BeforeClass;
import org.junit.Test;
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCustomWALCellCodec.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCustomWALCellCodec.java
index b992aca..eec44fb 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCustomWALCellCodec.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCustomWALCellCodec.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -36,8 +37,9 @@ public class TestCustomWALCellCodec {
public Configuration conf;
public CompressionContext context;
- public CustomWALCellCodec(Configuration conf, CompressionContext compression) {
- super(conf, compression);
+ public CustomWALCellCodec(Configuration conf, CompressionContext compression,
+ TagCompressionContext tagCompressionContext) {
+ super(conf, compression, tagCompressionContext);
this.conf = conf;
this.context = compression;
}
@@ -53,7 +55,7 @@ public class TestCustomWALCellCodec {
Configuration conf = new Configuration(false);
conf.setClass(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, CustomWALCellCodec.class,
WALCellCodec.class);
- CustomWALCellCodec codec = (CustomWALCellCodec) WALCellCodec.create(conf, null);
+ CustomWALCellCodec codec = (CustomWALCellCodec) WALCellCodec.create(conf, null, null);
assertEquals("Custom codec didn't get initialized with the right configuration!", conf,
codec.conf);
assertEquals("Custom codec didn't get initialized with the right compression context!", null,
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java
index afd0589..58d0b7c 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DataOutputBuffer;
import org.junit.Test;
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLRUDictionary.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLRUDictionary.java
deleted file mode 100644
index a669823..0000000
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLRUDictionary.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.regionserver.wal;
-
-import static org.junit.Assert.*;
-
-import java.math.BigInteger;
-import java.util.Arrays;
-import java.util.Random;
-
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.SmallTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-/**
- * Tests LRUDictionary
- */
-@Category(SmallTests.class)
-public class TestLRUDictionary {
- LRUDictionary testee;
-
- @Before
- public void setUp() throws Exception {
- testee = new LRUDictionary();
- testee.init(Short.MAX_VALUE);
- }
-
- @Test
- public void TestContainsNothing() {
- assertTrue(isDictionaryEmpty(testee));
- }
-
- /**
- * Assert can't add empty array.
- */
- @Test
- public void testPassingEmptyArrayToFindEntry() {
- assertEquals(Dictionary.NOT_IN_DICTIONARY,
- testee.findEntry(HConstants.EMPTY_BYTE_ARRAY, 0, 0));
- assertEquals(Dictionary.NOT_IN_DICTIONARY,
- testee.addEntry(HConstants.EMPTY_BYTE_ARRAY, 0, 0));
- }
-
- @Test
- public void testPassingSameArrayToAddEntry() {
- // Add random predefined byte array, in this case a random byte array from
- // HConstants. Assert that when we add, we get new index. Thats how it
- // works.
- int len = HConstants.CATALOG_FAMILY.length;
- int index = testee.addEntry(HConstants.CATALOG_FAMILY, 0, len);
- assertFalse(index == testee.addEntry(HConstants.CATALOG_FAMILY, 0, len));
- assertFalse(index == testee.addEntry(HConstants.CATALOG_FAMILY, 0, len));
- }
-
- @Test
- public void testBasic() {
- Random rand = new Random();
- byte[] testBytes = new byte[10];
- rand.nextBytes(testBytes);
-
- // Verify that our randomly generated array doesn't exist in the dictionary
- assertEquals(testee.findEntry(testBytes, 0, testBytes.length), -1);
-
- // now since we looked up an entry, we should have added it to the
- // dictionary, so it isn't empty
-
- assertFalse(isDictionaryEmpty(testee));
-
- // Check if we can find it using findEntry
- short t = testee.findEntry(testBytes, 0, testBytes.length);
-
- // Making sure we do find what we're looking for
- assertTrue(t != -1);
-
- byte[] testBytesCopy = new byte[20];
-
- Bytes.putBytes(testBytesCopy, 10, testBytes, 0, testBytes.length);
-
- // copy byte arrays, make sure that we check that equal byte arrays are
- // equal without just checking the reference
- assertEquals(testee.findEntry(testBytesCopy, 10, testBytes.length), t);
-
- // make sure the entry retrieved is the same as the one put in
- assertTrue(Arrays.equals(testBytes, testee.getEntry(t)));
-
- testee.clear();
-
- // making sure clear clears the dictionary
- assertTrue(isDictionaryEmpty(testee));
- }
-
- @Test
- public void TestLRUPolicy(){
- //start by filling the dictionary up with byte arrays
- for (int i = 0; i < Short.MAX_VALUE; i++) {
- testee.findEntry((BigInteger.valueOf(i)).toByteArray(), 0,
- (BigInteger.valueOf(i)).toByteArray().length);
- }
-
- // check we have the first element added
- assertTrue(testee.findEntry(BigInteger.ZERO.toByteArray(), 0,
- BigInteger.ZERO.toByteArray().length) != -1);
-
- // check for an element we know isn't there
- assertTrue(testee.findEntry(BigInteger.valueOf(Integer.MAX_VALUE).toByteArray(), 0,
- BigInteger.valueOf(Integer.MAX_VALUE).toByteArray().length) == -1);
-
- // since we just checked for this element, it should be there now.
- assertTrue(testee.findEntry(BigInteger.valueOf(Integer.MAX_VALUE).toByteArray(), 0,
- BigInteger.valueOf(Integer.MAX_VALUE).toByteArray().length) != -1);
-
- // test eviction, that the least recently added or looked at element is
- // evicted. We looked at ZERO so it should be in the dictionary still.
- assertTrue(testee.findEntry(BigInteger.ZERO.toByteArray(), 0,
- BigInteger.ZERO.toByteArray().length) != -1);
- // Now go from beyond 1 to the end.
- for(int i = 1; i < Short.MAX_VALUE; i++) {
- assertTrue(testee.findEntry(BigInteger.valueOf(i).toByteArray(), 0,
- BigInteger.valueOf(i).toByteArray().length) == -1);
- }
-
- // check we can find all of these.
- for (int i = 0; i < Short.MAX_VALUE; i++) {
- assertTrue(testee.findEntry(BigInteger.valueOf(i).toByteArray(), 0,
- BigInteger.valueOf(i).toByteArray().length) != -1);
- }
- }
-
- static private boolean isDictionaryEmpty(LRUDictionary dict) {
- try {
- dict.getEntry((short)0);
- return false;
- } catch (IndexOutOfBoundsException ioobe) {
- return true;
- }
- }
-}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java
index e3c1cc8..76005c3 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java
@@ -33,6 +33,8 @@ import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.codec.Codec.Decoder;
import org.apache.hadoop.hbase.codec.Codec.Encoder;
+import org.apache.hadoop.hbase.io.TagCompressionContext;
+import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -42,8 +44,17 @@ public class TestWALCellCodecWithCompression {
@Test
public void testEncodeDecodeKVsWithTags() throws Exception {
+ doTest(null);
+ }
+
+ @Test
+ public void testEncodeDecodeKVsWithTagsWithTagsCompression() throws Exception {
+ doTest(new TagCompressionContext(LRUDictionary.class));
+ }
+
+ private void doTest(TagCompressionContext ctx) throws Exception {
WALCellCodec codec = new WALCellCodec(new Configuration(false), new CompressionContext(
- LRUDictionary.class, false));
+ LRUDictionary.class, false), ctx);
ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
Encoder encoder = codec.getEncoder(bos);
encoder.write(createKV(1));