diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
new file mode 100644
index 0000000..3188fed
--- /dev/null
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.io.util.Dictionary;
+import org.apache.hadoop.hbase.io.util.StreamUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.IOUtils;
+
+/**
+ * Context that holds the dictionary for Tag compression and handles compressing and uncompressing
+ * tags. This will be used for compressing tags while writing into WALs.
+ */
+@InterfaceAudience.Private
+public class TagCompressionContext {
+ private final Dictionary tagDict;
+
+ public TagCompressionContext(Class<? extends Dictionary> dictType) throws SecurityException,
+ NoSuchMethodException, InstantiationException, IllegalAccessException,
+ InvocationTargetException {
+ Constructor<? extends Dictionary> dictConstructor = dictType.getConstructor();
+ tagDict = dictConstructor.newInstance();
+ tagDict.init(Short.MAX_VALUE);
+ }
+
+ public void clear() {
+ tagDict.clear();
+ }
+
+ /**
+ * Compress tags one by one and write them to the OutputStream.
+ * @param out Stream to which the compressed tags are to be written
+ * @param in Source where tags are available
+ * @param offset Offset for the tags bytes
+ * @param length Length of all tag bytes
+ * @throws IOException
+ */
+ public void compressTags(OutputStream out, byte[] in, int offset, short length)
+ throws IOException {
+ int pos = offset;
+ int endOffset = pos + length;
+ assert pos < endOffset;
+ while (pos < endOffset) {
+ short tagLen = Bytes.toShort(in, pos);
+ pos += Tag.TAG_LENGTH_SIZE;
+ write(in, pos, tagLen, out);
+ pos += tagLen;
+ }
+ }
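+
+ // Illustrative round-trip sketch (not part of this class; names like tagBuf are hypothetical).
+ // tagBuf holds serialized tags, each preceded by a 2-byte length, as in a KeyValue's tags region.
+ // The writer and reader sides would each use their own context:
+ //   TagCompressionContext writeCtx = new TagCompressionContext(LRUDictionary.class);
+ //   ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ //   writeCtx.compressTags(baos, tagBuf, 0, (short) tagBuf.length);
+ //   TagCompressionContext readCtx = new TagCompressionContext(LRUDictionary.class);
+ //   byte[] dest = new byte[tagBuf.length];
+ //   readCtx.uncompressTags(new ByteArrayInputStream(baos.toByteArray()), dest, 0,
+ //     (short) tagBuf.length);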
+
+ /**
+ * Uncompress tags from the InputStream and write them to the destination array.
+ * @param src Stream where the compressed tags are available
+ * @param dest Destination array where to write the uncompressed tags
+ * @param offset Offset in the destination array where the tags are to be written
+ * @param length Length of all tag bytes
+ * @throws IOException
+ */
+ public void uncompressTags(InputStream src, byte[] dest, int offset, short length)
+ throws IOException {
+ int endOffset = offset + length;
+ while (offset < endOffset) {
+ byte status = (byte) src.read();
+ if (status == Dictionary.NOT_IN_DICTIONARY) {
+ // We write the tag length as a short, so we can downcast this without any risk.
+ short tagLen = (short) StreamUtils.readRawVarint32(src);
+ offset = Bytes.putShort(dest, offset, tagLen);
+ IOUtils.readFully(src, dest, offset, tagLen);
+ tagDict.addEntry(dest, offset, tagLen);
+ offset += tagLen;
+ } else {
+ short dictIdx = StreamUtils.toShort(status, (byte) src.read());
+ byte[] entry = tagDict.getEntry(dictIdx);
+ if (entry == null) {
+ throw new IOException("Missing dictionary entry for index " + dictIdx);
+ }
+ offset = Bytes.putShort(dest, offset, (short) entry.length);
+ System.arraycopy(entry, 0, dest, offset, entry.length);
+ offset += entry.length;
+ }
+ }
+ }
+
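+ // Per-tag stream format, as produced by write() below and consumed by uncompressTags() above:
+ // either a single Dictionary.NOT_IN_DICTIONARY byte followed by the vint-encoded tag length and
+ // the raw tag bytes, or a 2-byte dictionary index for a tag that was seen before. Since
+ // uncompressTags() adds every literal tag to its own dictionary, the reader's dictionary stays
+ // in sync with the writer's without any extra metadata.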
+ private void write(byte[] data, int offset, short length, OutputStream out) throws IOException {
+ short dictIdx = Dictionary.NOT_IN_DICTIONARY;
+ if (tagDict != null) {
+ dictIdx = tagDict.findEntry(data, offset, length);
+ }
+ if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
+ out.write(Dictionary.NOT_IN_DICTIONARY);
+ StreamUtils.writeRawVInt32(out, length);
+ out.write(data, offset, length);
+ } else {
+ StreamUtils.writeShort(out, dictIdx);
+ }
+ }
+}
\ No newline at end of file
diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java
new file mode 100644
index 0000000..ee034ed
--- /dev/null
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io.util;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Dictionary interface
+ *
+ * Dictionary indexes should be either bytes or shorts, only positive. (The
+ * first bit is reserved for detecting whether something is compressed or not).
+ */
+@InterfaceAudience.Private
+public interface Dictionary {
+ byte NOT_IN_DICTIONARY = -1;
+
+ void init(int initialSize);
+ /**
+ * Gets an entry from the dictionary.
+ *
+ * @param idx index of the entry
+ * @return the entry, or null if non-existent
+ */
+ byte[] getEntry(short idx);
+
+ /**
+ * Finds the index of an entry.
+ * If no entry found, we add it.
+ *
+ * @param data the byte array that we're looking up
+ * @param offset Offset into data to add to Dictionary.
+ * @param length Length beyond offset that comprises entry; must be > 0.
+ * @return the index of the entry, or {@link #NOT_IN_DICTIONARY} if not found
+ */
+ short findEntry(byte[] data, int offset, int length);
+
+ /**
+ * Adds an entry to the dictionary.
+ * Be careful using this method. It will add an entry to the
+ * dictionary even if it already has an entry for the same data.
+ * Call {{@link #findEntry(byte[], int, int)}} to add without duplicating
+ * dictionary entries.
+ *
+ * @param data the entry to add
+ * @param offset Offset into data to add to Dictionary.
+ * @param length Length beyond offset that comprises entry; must be > 0.
+ * @return the index of the entry
+ */
+
+ short addEntry(byte[] data, int offset, int length);
+
+ /**
+ * Flushes the dictionary, empties all values.
+ */
+ void clear();
+}
diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java
new file mode 100644
index 0000000..0424925
--- /dev/null
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io.util;
+
+import java.util.HashMap;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * WALDictionary using an LRU eviction algorithm. Uses a linked list running
+ * through a hashtable. Currently has a max of 2^15 entries. Will start
+ * evicting if it exceeds this number. The maximum memory we expect this dictionary
+ * to take in the worst case is about:
+ * (2 ^ 15) * 5 (Regionname, Row key, CF, Column qual, table) * 100 bytes
+ * (these are some big names) = ~16MB.
+ * If you want to get silly, even at 1kb entries, it maxes out at 160 megabytes.
+ */
+@InterfaceAudience.Private
+public class LRUDictionary implements Dictionary {
+
+ BidirectionalLRUMap backingStore;
+ @Override
+ public byte[] getEntry(short idx) {
+ return backingStore.get(idx);
+ }
+
+ @Override
+ public void init(int initialSize) {
+ backingStore = new BidirectionalLRUMap(initialSize);
+ }
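+ // Note: the first lookup of a given byte sequence returns NOT_IN_DICTIONARY but still caches
+ // the entry (see addEntry below), so a caller writes the literal bytes once and can rely on the
+ // returned index for later occurrences.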
+ @Override
+ public short findEntry(byte[] data, int offset, int length) {
+ short ret = backingStore.findIdx(data, offset, length);
+ if (ret == NOT_IN_DICTIONARY) {
+ addEntry(data, offset, length);
+ }
+ return ret;
+ }
+
+ @Override
+ public short addEntry(byte[] data, int offset, int length) {
+ if (length <= 0) return NOT_IN_DICTIONARY;
+ return backingStore.put(data, offset, length);
+ }
+
+ @Override
+ public void clear() {
+ backingStore.clear();
+ }
+
+ /*
+ * Internal class used to implement LRU eviction and dual lookup (by key and
+ * value).
+ *
+ * This is not thread safe. Don't use in multi-threaded applications.
+ */
+ static class BidirectionalLRUMap {
+ private int currSize = 0;
+
+ // Head and tail of the LRU list.
+ private Node head;
+ private Node tail;
+
+ private HashMap<Node, Short> nodeToIndex = new HashMap<Node, Short>();
+ private Node[] indexToNode;
+ private int initSize = 0;
+
+ public BidirectionalLRUMap(int initialSize) {
+ initSize = initialSize;
+ indexToNode = new Node[initialSize];
+ for (int i = 0; i < initialSize; i++) {
+ indexToNode[i] = new Node();
+ }
+ }
+
+ private short put(byte[] array, int offset, int length) {
+ // We copy the bytes we want, otherwise we might be holding references to
+ // massive arrays in our dictionary (or those arrays might change)
+ byte[] stored = new byte[length];
+ Bytes.putBytes(stored, 0, array, offset, length);
+
+ if (currSize < initSize) {
+ // There is space to add without evicting.
+ indexToNode[currSize].setContents(stored, 0, stored.length);
+ setHead(indexToNode[currSize]);
+ short ret = (short) currSize++;
+ nodeToIndex.put(indexToNode[ret], ret);
+ return ret;
+ } else {
+ short s = nodeToIndex.remove(tail);
+ tail.setContents(stored, 0, stored.length);
+ // we need to rehash this.
+ nodeToIndex.put(tail, s);
+ moveToHead(tail);
+ return s;
+ }
+ }
+
+ private short findIdx(byte[] array, int offset, int length) {
+ Short s;
+ final Node comparisonNode = new Node();
+ comparisonNode.setContents(array, offset, length);
+ if ((s = nodeToIndex.get(comparisonNode)) != null) {
+ moveToHead(indexToNode[s]);
+ return s;
+ } else {
+ return -1;
+ }
+ }
+
+ private byte[] get(short idx) {
+ Preconditions.checkElementIndex(idx, currSize);
+ moveToHead(indexToNode[idx]);
+ return indexToNode[idx].container;
+ }
+
+ private void moveToHead(Node n) {
+ if (head == n) {
+ // no-op -- it's already the head.
+ return;
+ }
+ // At this point we definitely have prev, since it's not the head.
+ assert n.prev != null;
+ // Unlink prev.
+ n.prev.next = n.next;
+
+ // Unlink next
+ if (n.next != null) {
+ n.next.prev = n.prev;
+ } else {
+ assert n == tail;
+ tail = n.prev;
+ }
+ // Node is now removed from the list. Re-add it at the head.
+ setHead(n);
+ }
+
+ private void setHead(Node n) {
+ // assume it's already unlinked from the list at this point.
+ n.prev = null;
+ n.next = head;
+ if (head != null) {
+ assert head.prev == null;
+ head.prev = n;
+ }
+
+ head = n;
+
+ // First entry
+ if (tail == null) {
+ tail = n;
+ }
+ }
+
+ private void clear() {
+ currSize = 0;
+ nodeToIndex.clear();
+ tail = null;
+ head = null;
+
+ for (Node n : indexToNode) {
+ n.container = null;
+ }
+
+ for (int i = 0; i < initSize; i++) {
+ indexToNode[i].next = null;
+ indexToNode[i].prev = null;
+ }
+ }
+
+ private static class Node {
+ byte[] container;
+ int offset;
+ int length;
+ Node next; // link towards the tail
+ Node prev; // link towards the head
+
+ public Node() {
+ }
+
+ private void setContents(byte[] container, int offset, int length) {
+ this.container = container;
+ this.offset = offset;
+ this.length = length;
+ }
+
+ @Override
+ public int hashCode() {
+ return Bytes.hashCode(container, offset, length);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof Node)) {
+ return false;
+ }
+
+ Node casted = (Node) other;
+ return Bytes.equals(container, offset, length, casted.container,
+ casted.offset, casted.length);
+ }
+ }
+ }
+}
diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java
new file mode 100644
index 0000000..9d6aec4
--- /dev/null
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import com.google.common.base.Preconditions;
+
+/*
+ * It seems like as soon as somebody sets himself to the task of creating VInt encoding, his mind
+ * blanks out for a split-second and he starts the work by wrapping it in the most convoluted
+ * interface he can come up with. Custom streams that allocate memory, DataOutput that is only used
+ * to write single bytes... We operate on simple streams. Thus, we are going to have a simple
+ * implementation copy-pasted from protobuf Coded*Stream.
+ */
+@InterfaceAudience.Private
+public class StreamUtils {
+
+ public static void writeRawVInt32(OutputStream output, int value) throws IOException {
+ assert value >= 0;
+ while (true) {
+ if ((value & ~0x7F) == 0) {
+ output.write(value);
+ return;
+ } else {
+ output.write((value & 0x7F) | 0x80);
+ value >>>= 7;
+ }
+ }
+ }
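+
+ // Worked example (standard varint encoding): 300 = 0b1_0010_1100 is written as the two bytes
+ // 0xAC 0x02; the low seven bits go first and the high bit is set on every byte except the last.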
+
+ public static int readRawVarint32(InputStream input) throws IOException {
+ byte tmp = (byte) input.read();
+ if (tmp >= 0) {
+ return tmp;
+ }
+ int result = tmp & 0x7f;
+ if ((tmp = (byte) input.read()) >= 0) {
+ result |= tmp << 7;
+ } else {
+ result |= (tmp & 0x7f) << 7;
+ if ((tmp = (byte) input.read()) >= 0) {
+ result |= tmp << 14;
+ } else {
+ result |= (tmp & 0x7f) << 14;
+ if ((tmp = (byte) input.read()) >= 0) {
+ result |= tmp << 21;
+ } else {
+ result |= (tmp & 0x7f) << 21;
+ result |= (tmp = (byte) input.read()) << 28;
+ if (tmp < 0) {
+ // Discard upper 32 bits.
+ for (int i = 0; i < 5; i++) {
+ if (input.read() >= 0) {
+ return result;
+ }
+ }
+ throw new IOException("Malformed varint");
+ }
+ }
+ }
+ }
+ return result;
+ }
+
+ public static int readRawVarint32(ByteBuffer input) throws IOException {
+ byte tmp = input.get();
+ if (tmp >= 0) {
+ return tmp;
+ }
+ int result = tmp & 0x7f;
+ if ((tmp = input.get()) >= 0) {
+ result |= tmp << 7;
+ } else {
+ result |= (tmp & 0x7f) << 7;
+ if ((tmp = input.get()) >= 0) {
+ result |= tmp << 14;
+ } else {
+ result |= (tmp & 0x7f) << 14;
+ if ((tmp = input.get()) >= 0) {
+ result |= tmp << 21;
+ } else {
+ result |= (tmp & 0x7f) << 21;
+ result |= (tmp = input.get()) << 28;
+ if (tmp < 0) {
+ // Discard upper 32 bits.
+ for (int i = 0; i < 5; i++) {
+ if (input.get() >= 0) {
+ return result;
+ }
+ }
+ throw new IOException("Malformed varint");
+ }
+ }
+ }
+ }
+ return result;
+ }
+
+ public static short toShort(byte hi, byte lo) {
+ short s = (short) (((hi & 0xFF) << 8) | (lo & 0xFF));
+ Preconditions.checkArgument(s >= 0);
+ return s;
+ }
+
+ public static void writeShort(OutputStream out, short v) throws IOException {
+ Preconditions.checkArgument(v >= 0);
+ out.write((byte) (0xff & (v >> 8)));
+ out.write((byte) (0xff & v));
+ }
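+
+ // Dictionary indexes are written as two big-endian bytes and must be non-negative: a negative
+ // short would start with 0xFF, which readers would mistake for the single-byte
+ // Dictionary.NOT_IN_DICTIONARY marker. Hence the checkArgument calls above and in toShort().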
+}
\ No newline at end of file
diff --git hbase-common/src/test/java/org/apache/hadoop/hbase/io/util/TestLRUDictionary.java hbase-common/src/test/java/org/apache/hadoop/hbase/io/util/TestLRUDictionary.java
new file mode 100644
index 0000000..871c6fc
--- /dev/null
+++ hbase-common/src/test/java/org/apache/hadoop/hbase/io/util/TestLRUDictionary.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io.util;
+
+import static org.junit.Assert.*;
+
+import java.math.BigInteger;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests LRUDictionary
+ */
+@Category(SmallTests.class)
+public class TestLRUDictionary {
+ LRUDictionary testee;
+
+ @Before
+ public void setUp() throws Exception {
+ testee = new LRUDictionary();
+ testee.init(Short.MAX_VALUE);
+ }
+
+ @Test
+ public void TestContainsNothing() {
+ assertTrue(isDictionaryEmpty(testee));
+ }
+
+ /**
+ * Assert can't add empty array.
+ */
+ @Test
+ public void testPassingEmptyArrayToFindEntry() {
+ assertEquals(Dictionary.NOT_IN_DICTIONARY,
+ testee.findEntry(HConstants.EMPTY_BYTE_ARRAY, 0, 0));
+ assertEquals(Dictionary.NOT_IN_DICTIONARY,
+ testee.addEntry(HConstants.EMPTY_BYTE_ARRAY, 0, 0));
+ }
+
+ @Test
+ public void testPassingSameArrayToAddEntry() {
+ // Add a predefined byte array, in this case a byte array from
+ // HConstants. Assert that each time we add it, we get a new index. That's how it
+ // works.
+ int len = HConstants.CATALOG_FAMILY.length;
+ int index = testee.addEntry(HConstants.CATALOG_FAMILY, 0, len);
+ assertFalse(index == testee.addEntry(HConstants.CATALOG_FAMILY, 0, len));
+ assertFalse(index == testee.addEntry(HConstants.CATALOG_FAMILY, 0, len));
+ }
+
+ @Test
+ public void testBasic() {
+ Random rand = new Random();
+ byte[] testBytes = new byte[10];
+ rand.nextBytes(testBytes);
+
+ // Verify that our randomly generated array doesn't exist in the dictionary
+ assertEquals(testee.findEntry(testBytes, 0, testBytes.length), -1);
+
+ // now since we looked up an entry, we should have added it to the
+ // dictionary, so it isn't empty
+
+ assertFalse(isDictionaryEmpty(testee));
+
+ // Check if we can find it using findEntry
+ short t = testee.findEntry(testBytes, 0, testBytes.length);
+
+ // Making sure we do find what we're looking for
+ assertTrue(t != -1);
+
+ byte[] testBytesCopy = new byte[20];
+
+ Bytes.putBytes(testBytesCopy, 10, testBytes, 0, testBytes.length);
+
+ // copy byte arrays, make sure that we check that equal byte arrays are
+ // equal without just checking the reference
+ assertEquals(testee.findEntry(testBytesCopy, 10, testBytes.length), t);
+
+ // make sure the entry retrieved is the same as the one put in
+ assertTrue(Arrays.equals(testBytes, testee.getEntry(t)));
+
+ testee.clear();
+
+ // making sure clear clears the dictionary
+ assertTrue(isDictionaryEmpty(testee));
+ }
+
+ @Test
+ public void TestLRUPolicy(){
+ //start by filling the dictionary up with byte arrays
+ for (int i = 0; i < Short.MAX_VALUE; i++) {
+ testee.findEntry((BigInteger.valueOf(i)).toByteArray(), 0,
+ (BigInteger.valueOf(i)).toByteArray().length);
+ }
+
+ // check we have the first element added
+ assertTrue(testee.findEntry(BigInteger.ZERO.toByteArray(), 0,
+ BigInteger.ZERO.toByteArray().length) != -1);
+
+ // check for an element we know isn't there
+ assertTrue(testee.findEntry(BigInteger.valueOf(Integer.MAX_VALUE).toByteArray(), 0,
+ BigInteger.valueOf(Integer.MAX_VALUE).toByteArray().length) == -1);
+
+ // since we just checked for this element, it should be there now.
+ assertTrue(testee.findEntry(BigInteger.valueOf(Integer.MAX_VALUE).toByteArray(), 0,
+ BigInteger.valueOf(Integer.MAX_VALUE).toByteArray().length) != -1);
+
+ // test eviction, that the least recently added or looked at element is
+ // evicted. We looked at ZERO so it should be in the dictionary still.
+ assertTrue(testee.findEntry(BigInteger.ZERO.toByteArray(), 0,
+ BigInteger.ZERO.toByteArray().length) != -1);
+ // Now go from beyond 1 to the end.
+ for(int i = 1; i < Short.MAX_VALUE; i++) {
+ assertTrue(testee.findEntry(BigInteger.valueOf(i).toByteArray(), 0,
+ BigInteger.valueOf(i).toByteArray().length) == -1);
+ }
+
+ // check we can find all of these.
+ for (int i = 0; i < Short.MAX_VALUE; i++) {
+ assertTrue(testee.findEntry(BigInteger.valueOf(i).toByteArray(), 0,
+ BigInteger.valueOf(i).toByteArray().length) != -1);
+ }
+ }
+
+ static private boolean isDictionaryEmpty(LRUDictionary dict) {
+ try {
+ dict.getEntry((short)0);
+ return false;
+ } catch (IndexOutOfBoundsException ioobe) {
+ return true;
+ }
+ }
+}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
index a76cafa..18f36b8 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
@@ -20,21 +20,31 @@ package org.apache.hadoop.hbase.regionserver.wal;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
+
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.TagCompressionContext;
+import org.apache.hadoop.hbase.io.util.Dictionary;
/**
* Context that holds the various dictionaries for compression in HLog.
*/
@InterfaceAudience.Private
class CompressionContext {
+
+ static final String ENABLE_WAL_TAGS_COMPRESSION =
+ "hbase.regionserver.wal.tags.enablecompression";
+
final Dictionary regionDict;
final Dictionary tableDict;
final Dictionary familyDict;
final Dictionary qualifierDict;
final Dictionary rowDict;
+ // Context used for compressing tags
+ TagCompressionContext tagCompressionContext = null;
- public CompressionContext(Class<? extends Dictionary> dictType, boolean recoveredEdits)
- throws SecurityException, NoSuchMethodException, InstantiationException,
+ public CompressionContext(Class<? extends Dictionary> dictType, boolean recoveredEdits,
+ Configuration conf) throws SecurityException, NoSuchMethodException, InstantiationException,
IllegalAccessException, InvocationTargetException {
Constructor<? extends Dictionary> dictConstructor =
dictType.getConstructor();
@@ -54,6 +64,9 @@ class CompressionContext {
rowDict.init(Short.MAX_VALUE);
familyDict.init(Byte.MAX_VALUE);
qualifierDict.init(Byte.MAX_VALUE);
+ if (conf != null && conf.getBoolean(ENABLE_WAL_TAGS_COMPRESSION, true)) {
+ tagCompressionContext = new TagCompressionContext(dictType);
+ }
}
void clear() {
@@ -62,5 +75,8 @@ class CompressionContext {
familyDict.clear();
qualifierDict.clear();
rowDict.clear();
+ if (tagCompressionContext != null) {
+ tagCompressionContext.clear();
+ }
}
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
index f83217a..04046f1 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
@@ -25,6 +25,7 @@ import java.io.IOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Dictionary.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Dictionary.java
deleted file mode 100644
index fd1c264..0000000
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Dictionary.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.regionserver.wal;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-
-/**
- * Dictionary interface
- *
- * Dictionary indexes should be either bytes or shorts, only positive. (The
- * first bit is reserved for detecting whether something is compressed or not).
- */
-@InterfaceAudience.Private
-interface Dictionary {
- byte NOT_IN_DICTIONARY = -1;
-
- void init(int initialSize);
- /**
- * Gets an entry from the dictionary.
- *
- * @param idx index of the entry
- * @return the entry, or null if non existent
- */
- byte[] getEntry(short idx);
-
- /**
- * Finds the index of an entry.
- * If no entry found, we add it.
- *
- * @param data the byte array that we're looking up
- * @param offset Offset into data to add to Dictionary.
- * @param length Length beyond offset that comprises entry; must be > 0.
- * @return the index of the entry, or {@link #NOT_IN_DICTIONARY} if not found
- */
- short findEntry(byte[] data, int offset, int length);
-
- /**
- * Adds an entry to the dictionary.
- * Be careful using this method. It will add an entry to the
- * dictionary even if it already has an entry for the same data.
- * Call {{@link #findEntry(byte[], int, int)}} to add without duplicating
- * dictionary entries.
- *
- * @param data the entry to add
- * @param offset Offset into data to add to Dictionary.
- * @param length Length beyond offset that comprises entry; must be > 0.
- * @return the index of the entry
- */
-
- short addEntry(byte[] data, int offset, int length);
-
- /**
- * Flushes the dictionary, empties all values.
- */
- void clear();
-}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LRUDictionary.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LRUDictionary.java
deleted file mode 100644
index 1bea447..0000000
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LRUDictionary.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.regionserver.wal;
-
-import java.util.HashMap;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.Bytes;
-
-import com.google.common.base.Preconditions;
-
-/**
- * WALDictionary using an LRU eviction algorithm. Uses a linked list running
- * through a hashtable. Currently has max of 2^15 entries. Will start
- * evicting if exceeds this number The maximum memory we expect this dictionary
- * to take in the worst case is about:
- * (2 ^ 15) * 5 (Regionname, Row key, CF, Column qual, table) * 100 bytes (these are some big names) = ~16MB.
- * If you want to get silly, even at 1kb entries, it maxes out at 160 megabytes.
- */
-@InterfaceAudience.Private
-public class LRUDictionary implements Dictionary {
-
- BidirectionalLRUMap backingStore;
- @Override
- public byte[] getEntry(short idx) {
- return backingStore.get(idx);
- }
-
- @Override
- public void init(int initialSize) {
- backingStore = new BidirectionalLRUMap(initialSize);
- }
- @Override
- public short findEntry(byte[] data, int offset, int length) {
- short ret = backingStore.findIdx(data, offset, length);
- if (ret == NOT_IN_DICTIONARY) {
- addEntry(data, offset, length);
- }
- return ret;
- }
-
- @Override
- public short addEntry(byte[] data, int offset, int length) {
- if (length <= 0) return NOT_IN_DICTIONARY;
- return backingStore.put(data, offset, length);
- }
-
- @Override
- public void clear() {
- backingStore.clear();
- }
-
- /*
- * Internal class used to implement LRU eviction and dual lookup (by key and
- * value).
- *
- * This is not thread safe. Don't use in multi-threaded applications.
- */
- static class BidirectionalLRUMap {
- private int currSize = 0;
-
- // Head and tail of the LRU list.
- private Node head;
- private Node tail;
-
- private HashMap<Node, Short> nodeToIndex = new HashMap<Node, Short>();
- private Node[] indexToNode;
- private int initSize = 0;
-
- public BidirectionalLRUMap(int initialSize) {
- initSize = initialSize;
- indexToNode = new Node[initialSize];
- for (int i = 0; i < initialSize; i++) {
- indexToNode[i] = new Node();
- }
- }
-
- private short put(byte[] array, int offset, int length) {
- // We copy the bytes we want, otherwise we might be holding references to
- // massive arrays in our dictionary (or those arrays might change)
- byte[] stored = new byte[length];
- Bytes.putBytes(stored, 0, array, offset, length);
-
- if (currSize < initSize) {
- // There is space to add without evicting.
- indexToNode[currSize].setContents(stored, 0, stored.length);
- setHead(indexToNode[currSize]);
- short ret = (short) currSize++;
- nodeToIndex.put(indexToNode[ret], ret);
- return ret;
- } else {
- short s = nodeToIndex.remove(tail);
- tail.setContents(stored, 0, stored.length);
- // we need to rehash this.
- nodeToIndex.put(tail, s);
- moveToHead(tail);
- return s;
- }
- }
-
- private short findIdx(byte[] array, int offset, int length) {
- Short s;
- final Node comparisonNode = new Node();
- comparisonNode.setContents(array, offset, length);
- if ((s = nodeToIndex.get(comparisonNode)) != null) {
- moveToHead(indexToNode[s]);
- return s;
- } else {
- return -1;
- }
- }
-
- private byte[] get(short idx) {
- Preconditions.checkElementIndex(idx, currSize);
- moveToHead(indexToNode[idx]);
- return indexToNode[idx].container;
- }
-
- private void moveToHead(Node n) {
- if (head == n) {
- // no-op -- it's already the head.
- return;
- }
- // At this point we definitely have prev, since it's not the head.
- assert n.prev != null;
- // Unlink prev.
- n.prev.next = n.next;
-
- // Unlink next
- if (n.next != null) {
- n.next.prev = n.prev;
- } else {
- assert n == tail;
- tail = n.prev;
- }
- // Node is now removed from the list. Re-add it at the head.
- setHead(n);
- }
-
- private void setHead(Node n) {
- // assume it's already unlinked from the list at this point.
- n.prev = null;
- n.next = head;
- if (head != null) {
- assert head.prev == null;
- head.prev = n;
- }
-
- head = n;
-
- // First entry
- if (tail == null) {
- tail = n;
- }
- }
-
- private void clear() {
- currSize = 0;
- nodeToIndex.clear();
- tail = null;
- head = null;
-
- for (Node n : indexToNode) {
- n.container = null;
- }
-
- for (int i = 0; i < initSize; i++) {
- indexToNode[i].next = null;
- indexToNode[i].prev = null;
- }
- }
-
- private static class Node {
- byte[] container;
- int offset;
- int length;
- Node next; // link towards the tail
- Node prev; // link towards the head
-
- public Node() {
- }
-
- private void setContents(byte[] container, int offset, int length) {
- this.container = container;
- this.offset = offset;
- this.length = length;
- }
-
- @Override
- public int hashCode() {
- return Bytes.hashCode(container, offset, length);
- }
-
- @Override
- public boolean equals(Object other) {
- if (!(other instanceof Node)) {
- return false;
- }
-
- Node casted = (Node) other;
- return Bytes.equals(container, offset, length, casted.container,
- casted.offset, casted.length);
- }
- }
- }
-}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
index 72db786..4f7204e 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -73,7 +74,7 @@ public abstract class ReaderBase implements HLog.Reader {
try {
if (compressionContext == null) {
compressionContext = new CompressionContext(LRUDictionary.class,
- FSUtils.isRecoveredEdits(path));
+ FSUtils.isRecoveredEdits(path), conf);
} else {
compressionContext.clear();
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
index 4f73f32..b749991 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
@@ -29,11 +29,12 @@ import org.apache.hadoop.hbase.codec.BaseDecoder;
import org.apache.hadoop.hbase.codec.BaseEncoder;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodec;
+import org.apache.hadoop.hbase.io.util.Dictionary;
+import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.io.IOUtils;
-import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
@@ -157,7 +158,8 @@ public class WALCellCodec implements Codec {
StreamUtils.writeRawVInt32(out, kv.getKeyLength());
StreamUtils.writeRawVInt32(out, kv.getValueLength());
// To support tags
- StreamUtils.writeRawVInt32(out, kv.getTagsLength());
+ short tagsLength = kv.getTagsLength();
+ StreamUtils.writeRawVInt32(out, tagsLength);
// Write row, qualifier, and family; use dictionary
// compression as they're likely to have duplicates.
@@ -165,11 +167,25 @@ public class WALCellCodec implements Codec {
write(kvBuffer, kv.getFamilyOffset(), kv.getFamilyLength(), compression.familyDict);
write(kvBuffer, kv.getQualifierOffset(), kv.getQualifierLength(), compression.qualifierDict);
- // Write the rest uncompressed.
+ // Write the timestamp, type and value uncompressed.
int pos = kv.getTimestampOffset();
- int remainingLength = kv.getLength() + offset - pos;
- out.write(kvBuffer, pos, remainingLength);
-
+ int tsTypeValLen = kv.getLength() + offset - pos;
+ if (tagsLength > 0) {
+ tsTypeValLen = tsTypeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE;
+ }
+ assert tsTypeValLen > 0;
+ out.write(kvBuffer, pos, tsTypeValLen);
+ if (tagsLength > 0) {
+ if (compression.tagCompressionContext != null) {
+ // Write tags using Dictionary compression
+ compression.tagCompressionContext.compressTags(out, kvBuffer, kv.getTagsOffset(),
+ tagsLength);
+ } else {
+ // Tag compression is disabled inside WAL compression. Just write the tag bytes as
+ // they are.
+ out.write(kvBuffer, kv.getTagsOffset(), tagsLength);
+ }
+ }
}
private void write(byte[] data, int offset, int length, Dictionary dict) throws IOException {
@@ -199,7 +215,7 @@ public class WALCellCodec implements Codec {
int keylength = StreamUtils.readRawVarint32(in);
int vlength = StreamUtils.readRawVarint32(in);
- int tagsLength = StreamUtils.readRawVarint32(in);
+ short tagsLength = (short) StreamUtils.readRawVarint32(in);
int length = 0;
if(tagsLength == 0) {
length = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength;
@@ -228,8 +244,23 @@ public class WALCellCodec implements Codec {
elemLen = readIntoArray(backingArray, pos, compression.qualifierDict);
pos += elemLen;
- // the rest
- IOUtils.readFully(in, backingArray, pos, length - pos);
+ // timestamp, type and value
+ int tsTypeValLen = length - pos;
+ if (tagsLength > 0) {
+ tsTypeValLen = tsTypeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE;
+ }
+ IOUtils.readFully(in, backingArray, pos, tsTypeValLen);
+ pos += tsTypeValLen;
+
+ // tags
+ if (tagsLength > 0) {
+ pos = Bytes.putShort(backingArray, pos, tagsLength);
+ if (compression.tagCompressionContext != null) {
+ compression.tagCompressionContext.uncompressTags(in, backingArray, pos, tagsLength);
+ } else {
+ IOUtils.readFully(in, backingArray, pos, tagsLength);
+ }
+ }
return new KeyValue(backingArray, 0, length);
}
@@ -294,80 +325,4 @@ public class WALCellCodec implements Codec {
// TODO: ideally this should also encapsulate compressionContext
return this.statelessUncompressor;
}
-
- /**
- * It seems like as soon as somebody sets himself to the task of creating VInt encoding,
- * his mind blanks out for a split-second and he starts the work by wrapping it in the
- * most convoluted interface he can come up with. Custom streams that allocate memory,
- * DataOutput that is only used to write single bytes... We operate on simple streams.
- * Thus, we are going to have a simple implementation copy-pasted from protobuf Coded*Stream.
- */
- private static class StreamUtils {
- public static int computeRawVarint32Size(final int value) {
- if ((value & (0xffffffff << 7)) == 0) return 1;
- if ((value & (0xffffffff << 14)) == 0) return 2;
- if ((value & (0xffffffff << 21)) == 0) return 3;
- if ((value & (0xffffffff << 28)) == 0) return 4;
- return 5;
- }
-
- static void writeRawVInt32(OutputStream output, int value) throws IOException {
- assert value >= 0;
- while (true) {
- if ((value & ~0x7F) == 0) {
- output.write(value);
- return;
- } else {
- output.write((value & 0x7F) | 0x80);
- value >>>= 7;
- }
- }
- }
-
- static int readRawVarint32(InputStream input) throws IOException {
- byte tmp = (byte)input.read();
- if (tmp >= 0) {
- return tmp;
- }
- int result = tmp & 0x7f;
- if ((tmp = (byte)input.read()) >= 0) {
- result |= tmp << 7;
- } else {
- result |= (tmp & 0x7f) << 7;
- if ((tmp = (byte)input.read()) >= 0) {
- result |= tmp << 14;
- } else {
- result |= (tmp & 0x7f) << 14;
- if ((tmp = (byte)input.read()) >= 0) {
- result |= tmp << 21;
- } else {
- result |= (tmp & 0x7f) << 21;
- result |= (tmp = (byte)input.read()) << 28;
- if (tmp < 0) {
- // Discard upper 32 bits.
- for (int i = 0; i < 5; i++) {
- if (input.read() >= 0) {
- return result;
- }
- }
- throw new IOException("Malformed varint");
- }
- }
- }
- }
- return result;
- }
-
- static short toShort(byte hi, byte lo) {
- short s = (short) (((hi & 0xFF) << 8) | (lo & 0xFF));
- Preconditions.checkArgument(s >= 0);
- return s;
- }
-
- static void writeShort(OutputStream out, short v) throws IOException {
- Preconditions.checkArgument(v >= 0);
- out.write((byte)(0xff & (v >> 8)));
- out.write((byte)(0xff & v));
- }
- }
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java
index 6447b4f..85ca4cc 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.util.FSUtils;
/**
@@ -40,7 +41,7 @@ public abstract class WriterBase implements HLog.Writer {
if (doCompress) {
try {
this.compressionContext = new CompressionContext(LRUDictionary.class,
- FSUtils.isRecoveredEdits(path));
+ FSUtils.isRecoveredEdits(path), conf);
} catch (Exception e) {
throw new IOException("Failed to initiate CompressionContext", e);
}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java
index 06f0df5..84eeb88 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java
@@ -28,6 +28,8 @@ import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.io.util.Dictionary;
+import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.BeforeClass;
import org.junit.Test;
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java
index afd0589..8aa3387 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DataOutputBuffer;
import org.junit.Test;
@@ -67,7 +68,7 @@ public class TestKeyValueCompression {
}
private void runTestCycle(List<KeyValue> kvs) throws Exception {
- CompressionContext ctx = new CompressionContext(LRUDictionary.class, false);
+ CompressionContext ctx = new CompressionContext(LRUDictionary.class, false, null);
DataOutputBuffer buf = new DataOutputBuffer(BUF_SIZE);
for (KeyValue kv : kvs) {
KeyValueCompression.writeKV(buf, kv, ctx);
@@ -84,7 +85,7 @@ public class TestKeyValueCompression {
@Test
public void testKVWithTags() throws Exception {
- CompressionContext ctx = new CompressionContext(LRUDictionary.class, false);
+ CompressionContext ctx = new CompressionContext(LRUDictionary.class, false, null);
DataOutputBuffer buf = new DataOutputBuffer(BUF_SIZE);
KeyValueCompression.writeKV(buf, createKV(1), ctx);
KeyValueCompression.writeKV(buf, createKV(0), ctx);
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLRUDictionary.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLRUDictionary.java
deleted file mode 100644
index a669823..0000000
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLRUDictionary.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.regionserver.wal;
-
-import static org.junit.Assert.*;
-
-import java.math.BigInteger;
-import java.util.Arrays;
-import java.util.Random;
-
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.SmallTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-/**
- * Tests LRUDictionary
- */
-@Category(SmallTests.class)
-public class TestLRUDictionary {
- LRUDictionary testee;
-
- @Before
- public void setUp() throws Exception {
- testee = new LRUDictionary();
- testee.init(Short.MAX_VALUE);
- }
-
- @Test
- public void TestContainsNothing() {
- assertTrue(isDictionaryEmpty(testee));
- }
-
- /**
- * Assert can't add empty array.
- */
- @Test
- public void testPassingEmptyArrayToFindEntry() {
- assertEquals(Dictionary.NOT_IN_DICTIONARY,
- testee.findEntry(HConstants.EMPTY_BYTE_ARRAY, 0, 0));
- assertEquals(Dictionary.NOT_IN_DICTIONARY,
- testee.addEntry(HConstants.EMPTY_BYTE_ARRAY, 0, 0));
- }
-
- @Test
- public void testPassingSameArrayToAddEntry() {
- // Add random predefined byte array, in this case a random byte array from
- // HConstants. Assert that when we add, we get new index. Thats how it
- // works.
- int len = HConstants.CATALOG_FAMILY.length;
- int index = testee.addEntry(HConstants.CATALOG_FAMILY, 0, len);
- assertFalse(index == testee.addEntry(HConstants.CATALOG_FAMILY, 0, len));
- assertFalse(index == testee.addEntry(HConstants.CATALOG_FAMILY, 0, len));
- }
-
- @Test
- public void testBasic() {
- Random rand = new Random();
- byte[] testBytes = new byte[10];
- rand.nextBytes(testBytes);
-
- // Verify that our randomly generated array doesn't exist in the dictionary
- assertEquals(testee.findEntry(testBytes, 0, testBytes.length), -1);
-
- // now since we looked up an entry, we should have added it to the
- // dictionary, so it isn't empty
-
- assertFalse(isDictionaryEmpty(testee));
-
- // Check if we can find it using findEntry
- short t = testee.findEntry(testBytes, 0, testBytes.length);
-
- // Making sure we do find what we're looking for
- assertTrue(t != -1);
-
- byte[] testBytesCopy = new byte[20];
-
- Bytes.putBytes(testBytesCopy, 10, testBytes, 0, testBytes.length);
-
- // copy byte arrays, make sure that we check that equal byte arrays are
- // equal without just checking the reference
- assertEquals(testee.findEntry(testBytesCopy, 10, testBytes.length), t);
-
- // make sure the entry retrieved is the same as the one put in
- assertTrue(Arrays.equals(testBytes, testee.getEntry(t)));
-
- testee.clear();
-
- // making sure clear clears the dictionary
- assertTrue(isDictionaryEmpty(testee));
- }
-
- @Test
- public void TestLRUPolicy(){
- //start by filling the dictionary up with byte arrays
- for (int i = 0; i < Short.MAX_VALUE; i++) {
- testee.findEntry((BigInteger.valueOf(i)).toByteArray(), 0,
- (BigInteger.valueOf(i)).toByteArray().length);
- }
-
- // check we have the first element added
- assertTrue(testee.findEntry(BigInteger.ZERO.toByteArray(), 0,
- BigInteger.ZERO.toByteArray().length) != -1);
-
- // check for an element we know isn't there
- assertTrue(testee.findEntry(BigInteger.valueOf(Integer.MAX_VALUE).toByteArray(), 0,
- BigInteger.valueOf(Integer.MAX_VALUE).toByteArray().length) == -1);
-
- // since we just checked for this element, it should be there now.
- assertTrue(testee.findEntry(BigInteger.valueOf(Integer.MAX_VALUE).toByteArray(), 0,
- BigInteger.valueOf(Integer.MAX_VALUE).toByteArray().length) != -1);
-
- // test eviction, that the least recently added or looked at element is
- // evicted. We looked at ZERO so it should be in the dictionary still.
- assertTrue(testee.findEntry(BigInteger.ZERO.toByteArray(), 0,
- BigInteger.ZERO.toByteArray().length) != -1);
- // Now go from beyond 1 to the end.
- for(int i = 1; i < Short.MAX_VALUE; i++) {
- assertTrue(testee.findEntry(BigInteger.valueOf(i).toByteArray(), 0,
- BigInteger.valueOf(i).toByteArray().length) == -1);
- }
-
- // check we can find all of these.
- for (int i = 0; i < Short.MAX_VALUE; i++) {
- assertTrue(testee.findEntry(BigInteger.valueOf(i).toByteArray(), 0,
- BigInteger.valueOf(i).toByteArray().length) != -1);
- }
- }
-
- static private boolean isDictionaryEmpty(LRUDictionary dict) {
- try {
- dict.getEntry((short)0);
- return false;
- } catch (IndexOutOfBoundsException ioobe) {
- return true;
- }
- }
-}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java
index e3c1cc8..032368e 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.codec.Codec.Decoder;
import org.apache.hadoop.hbase.codec.Codec.Encoder;
+import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -42,8 +43,19 @@ public class TestWALCellCodecWithCompression {
@Test
public void testEncodeDecodeKVsWithTags() throws Exception {
- WALCellCodec codec = new WALCellCodec(new Configuration(false), new CompressionContext(
- LRUDictionary.class, false));
+ doTest(false);
+ }
+
+ @Test
+ public void testEncodeDecodeKVsWithTagsWithTagsCompression() throws Exception {
+ doTest(true);
+ }
+
+ private void doTest(boolean compressTags) throws Exception {
+ Configuration conf = new Configuration(false);
+ conf.setBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, compressTags);
+ WALCellCodec codec = new WALCellCodec(conf, new CompressionContext(LRUDictionary.class, false,
+ conf));
ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
Encoder encoder = codec.getEncoder(bos);
encoder.write(createKV(1));