diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 48caea3..dbe3b30 100644 --- hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -805,6 +805,13 @@ public final class HConstants { public static final String ENABLE_WAL_COMPRESSION = "hbase.regionserver.wal.enablecompression"; + /** + * Configuration name of tags compression in HLog. Tag compression will happen only when this + * config is enabled along with HLog compression. + */ + public static final String ENABLE_WAL_TAGS_COMPRESSION = + "hbase.regionserver.wal.tags.enablecompression"; + /** Region in Transition metrics threshold time */ public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD="hbase.metrics.rit.stuck.warning.threshold"; diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java new file mode 100644 index 0000000..c0d1c8d --- /dev/null +++ hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.io; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.nio.ByteBuffer; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.io.util.Dictionary; +import org.apache.hadoop.hbase.io.util.StreamUtils; +import org.apache.hadoop.hbase.util.ByteBufferUtils; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.IOUtils; + +/** + * Context that holds the dictionary for Tag compression and doing the compress/uncompress. This + * will be used for compressing tags while writing into HFile and WALs. 
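+ * <p>
+ * Each tag is written to the stream either as the {@link Dictionary#NOT_IN_DICTIONARY} marker
+ * followed by a vint length and the raw tag bytes (first occurrence), or as a two byte dictionary
+ * index once the same tag bytes are already in the dictionary. A rough usage sketch (the stream
+ * and buffer names below are placeholders, not part of this class):
+ *
+ * <pre>
+ * TagCompressionContext context = new TagCompressionContext(LRUDictionary.class);
+ * context.compressTags(out, tagBytes, 0, tagsLength);   // while writing
+ * context.uncompressTags(in, tagBuf, 0, tagsLength);    // while reading the same data back
+ * </pre>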
+ */ +@InterfaceAudience.Private +public class TagCompressionContext { + private final Dictionary tagDict; + + public TagCompressionContext(Class dictType) throws SecurityException, + NoSuchMethodException, InstantiationException, IllegalAccessException, + InvocationTargetException { + Constructor dictConstructor = dictType.getConstructor(); + tagDict = dictConstructor.newInstance(); + tagDict.init(Short.MAX_VALUE); + } + + public void clear() { + tagDict.clear(); + } + + /** + * + * @param out + * @param in + * @param tagsLength + * @throws IOException + */ + public void compressTags(OutputStream out, ByteBuffer in, short tagsLength) throws IOException { + if (in.hasArray()) { + compressTags(out, in.array(), in.arrayOffset() + in.position(), tagsLength); + ByteBufferUtils.skip(in, tagsLength); + } else { + byte[] tagBuf = new byte[tagsLength]; + in.get(tagBuf); + compressTags(out, tagBuf, 0, tagsLength); + } + } + + /** + * + * @param out + * @param in + * @param offset + * @param tagsLength + * @throws IOException + */ + public void compressTags(OutputStream out, byte[] in, int offset, short tagsLength) + throws IOException { + int pos = offset; + int endOffset = pos + tagsLength; + while (pos < endOffset) { + short tagLen = Bytes.toShort(in, pos); + pos += Tag.TAG_LENGTH_SIZE; + write(in, pos, tagLen, out); + pos += tagLen; + } + } + + /** + * + * @param src + * @param dest + * @param offset + * @param tagsLength + * @throws IOException + */ + public void uncompressTags(ByteBuffer src, byte[] dest, int offset, int tagsLength) + throws IOException { + int endOffset = offset + tagsLength; + while (offset < endOffset) { + byte status = src.get(); + short tagLen; + if (status == Dictionary.NOT_IN_DICTIONARY) { + // We are writing short as tagLen. So can downcast this without any risk. + tagLen = (short) StreamUtils.readRawVarint32(src); + offset = Bytes.putShort(dest, offset, tagLen); + src.get(dest, offset, tagLen); + tagDict.addEntry(dest, offset, tagLen); + offset += tagLen; + } else { + short dictIdx = StreamUtils.toShort(status, src.get()); + byte[] entry = tagDict.getEntry(dictIdx); + if (entry == null) { + throw new IOException("Missing dictionary entry for index " + dictIdx); + } + tagLen = (short) entry.length; + offset = Bytes.putShort(dest, offset, tagLen); + System.arraycopy(entry, 0, dest, offset, tagLen); + ByteBufferUtils.skip(src, tagLen); + offset += entry.length; + } + } + } + + /** + * + * @param src + * @param dest + * @param tagsLength + * @throws IOException + */ + public void uncompressTags(InputStream src, ByteBuffer dest, short tagsLength) throws IOException { + if (dest.hasArray()) { + uncompressTags(src, dest.array(), dest.arrayOffset() + dest.position(), tagsLength); + } else { + byte[] tagBuf = new byte[tagsLength]; + uncompressTags(src, tagBuf, 0, tagsLength); + dest.put(tagBuf); + } + } + + /** + * + * @param src + * @param dest + * @param offset + * @param tagsLength + * @throws IOException + */ + public void uncompressTags(InputStream src, byte[] dest, int offset, short tagsLength) + throws IOException { + int endOffset = offset + tagsLength; + while (offset < endOffset) { + byte status = (byte) src.read(); + if (status == Dictionary.NOT_IN_DICTIONARY) { + // We are writing short as tagLen. So can downcast this without any risk. 
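+        // First occurrence of this tag in the stream: read its vint length, write it back into the
+        // destination buffer as a two byte length followed by the raw tag bytes, and remember the
+        // tag in the dictionary so later occurrences can be resolved from a two byte index.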
+ short tagLen = (short) StreamUtils.readRawVarint32(src); + offset = Bytes.putShort(dest, offset, tagLen); + IOUtils.readFully(src, dest, offset, tagLen); + tagDict.addEntry(dest, offset, tagLen); + offset += tagLen; + } else { + short dictIdx = StreamUtils.toShort(status, (byte) src.read()); + byte[] entry = tagDict.getEntry(dictIdx); + if (entry == null) { + throw new IOException("Missing dictionary entry for index " + dictIdx); + } + offset = Bytes.putShort(dest, offset, (short) entry.length); + System.arraycopy(entry, 0, dest, offset, entry.length); + offset += entry.length; + } + } + } + + private void write(byte[] data, int offset, short length, OutputStream out) throws IOException { + short dictIdx = Dictionary.NOT_IN_DICTIONARY; + if (tagDict != null) { + dictIdx = tagDict.findEntry(data, offset, length); + } + if (dictIdx == Dictionary.NOT_IN_DICTIONARY) { + out.write(Dictionary.NOT_IN_DICTIONARY); + StreamUtils.writeRawVInt32(out, length); + out.write(data, offset, length); + } else { + StreamUtils.writeShort(out, dictIdx); + } + } + +} \ No newline at end of file diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java new file mode 100644 index 0000000..ee034ed --- /dev/null +++ hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/Dictionary.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.io.util; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Dictionary interface + * + * Dictionary indexes should be either bytes or shorts, only positive. (The + * first bit is reserved for detecting whether something is compressed or not). + */ +@InterfaceAudience.Private +public interface Dictionary { + byte NOT_IN_DICTIONARY = -1; + + void init(int initialSize); + /** + * Gets an entry from the dictionary. + * + * @param idx index of the entry + * @return the entry, or null if non existent + */ + byte[] getEntry(short idx); + + /** + * Finds the index of an entry. + * If no entry found, we add it. + * + * @param data the byte array that we're looking up + * @param offset Offset into data to add to Dictionary. + * @param length Length beyond offset that comprises entry; must be > 0. + * @return the index of the entry, or {@link #NOT_IN_DICTIONARY} if not found + */ + short findEntry(byte[] data, int offset, int length); + + /** + * Adds an entry to the dictionary. + * Be careful using this method. It will add an entry to the + * dictionary even if it already has an entry for the same data. + * Call {{@link #findEntry(byte[], int, int)}} to add without duplicating + * dictionary entries. 
+ * + * @param data the entry to add + * @param offset Offset into data to add to Dictionary. + * @param length Length beyond offset that comprises entry; must be > 0. + * @return the index of the entry + */ + + short addEntry(byte[] data, int offset, int length); + + /** + * Flushes the dictionary, empties all values. + */ + void clear(); +} diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java new file mode 100644 index 0000000..3836d74 --- /dev/null +++ hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/LRUDictionary.java @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.io.util; + +import java.util.HashMap; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Bytes; + +import com.google.common.base.Preconditions; + +/** + * WALDictionary using an LRU eviction algorithm. Uses a linked list running + * through a hashtable. Currently has max of 2^15 entries. Will start + * evicting if exceeds this number The maximum memory we expect this dictionary + * to take in the worst case is about: + * (2 ^ 15) * 5 (Regionname, Row key, CF, Column qual, table) * 100 bytes (these are some big names) = ~16MB. + * If you want to get silly, even at 1kb entries, it maxes out at 160 megabytes. + */ +@InterfaceAudience.Private +public class LRUDictionary implements Dictionary { + + BidirectionalLRUMap backingStore; + @Override + public byte[] getEntry(short idx) { + return backingStore.get(idx); + } + + @Override + public void init(int initialSize) { + backingStore = new BidirectionalLRUMap(initialSize); + } + @Override + public short findEntry(byte[] data, int offset, int length) { + short ret = backingStore.findIdx(data, offset, length); + if (ret == NOT_IN_DICTIONARY) { + addEntry(data, offset, length); + } + return ret; + } + + @Override + public short addEntry(byte[] data, int offset, int length) { + if (length <= 0) return NOT_IN_DICTIONARY; + return backingStore.put(data, offset, length); + } + + @Override + public void clear() { + backingStore.clear(); + } + + /* + * Internal class used to implement LRU eviction and dual lookup (by key and + * value). + * + * This is not thread safe. Don't use in multi-threaded applications. + */ + static class BidirectionalLRUMap { + private int currSize = 0; + + // Head and tail of the LRU list. 
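+    // head is the most recently used node; tail is the least recently used one and is the first
+    // to be reused once all initialSize slots have been handed out.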
+ private Node head; + private Node tail; + + private HashMap nodeToIndex = new HashMap(); + private Node[] indexToNode; + private int initSize = 0; + + public BidirectionalLRUMap(int initialSize) { + initSize = initialSize; + indexToNode = new Node[initialSize]; + for (int i = 0; i < initialSize; i++) { + indexToNode[i] = new Node(); + } + } + + private short put(byte[] array, int offset, int length) { + // We copy the bytes we want, otherwise we might be holding references to + // massive arrays in our dictionary (or those arrays might change) + byte[] stored = new byte[length]; + Bytes.putBytes(stored, 0, array, offset, length); + + if (currSize < initSize) { + // There is space to add without evicting. + indexToNode[currSize].setContents(stored, 0, stored.length); + setHead(indexToNode[currSize]); + short ret = (short) currSize++; + nodeToIndex.put(indexToNode[ret], ret); + return ret; + } else { + short s = nodeToIndex.remove(tail); + tail.setContents(stored, 0, stored.length); + // we need to rehash this. + nodeToIndex.put(tail, s); + moveToHead(tail); + return s; + } + } + + private short findIdx(byte[] array, int offset, int length) { + Short s; + final Node comparisonNode = new Node(); + comparisonNode.setContents(array, offset, length); + if ((s = nodeToIndex.get(comparisonNode)) != null) { + moveToHead(indexToNode[s]); + return s; + } else { + return -1; + } + } + + private byte[] get(short idx) { + Preconditions.checkElementIndex(idx, currSize); + moveToHead(indexToNode[idx]); + return indexToNode[idx].container; + } + + private void moveToHead(Node n) { + if (head == n) { + // no-op -- it's already the head. + return; + } + // At this point we definitely have prev, since it's not the head. + assert n.prev != null; + // Unlink prev. + n.prev.next = n.next; + + // Unlink next + if (n.next != null) { + n.next.prev = n.prev; + } else { + assert n == tail; + tail = n.prev; + } + // Node is now removed from the list. Re-add it at the head. + setHead(n); + } + + private void setHead(Node n) { + // assume it's already unlinked from the list at this point. 
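+      // Works both for brand new nodes (from put) and for nodes being promoted by moveToHead();
+      // when the list is empty the new head also becomes the tail.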
+ n.prev = null; + n.next = head; + if (head != null) { + assert head.prev == null; + head.prev = n; + } + + head = n; + + // First entry + if (tail == null) { + tail = n; + } + } + + private void clear() { + currSize = 0; + nodeToIndex.clear(); + tail = null; + head = null; + + for (Node n : indexToNode) { + n.container = null; + } + + for (int i = 0; i < initSize; i++) { + indexToNode[i].next = null; + indexToNode[i].prev = null; + } + } + + private static class Node { + byte[] container; + int offset; + int length; + Node next; // link towards the tail + Node prev; // link towards the head + + public Node() { + } + + private void setContents(byte[] container, int offset, int length) { + this.container = container; + this.offset = offset; + this.length = length; + } + + @Override + public int hashCode() { + return Bytes.hashCode(container, offset, length); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof Node)) { + return false; + } + + Node casted = (Node) other; + return Bytes.equals(container, offset, length, casted.container, + casted.offset, casted.length); + } + } + } +} diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java new file mode 100644 index 0000000..c19cb19 --- /dev/null +++ hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.io.util; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +import org.apache.hadoop.classification.InterfaceAudience; + +import com.google.common.base.Preconditions; + +/** + * It seems like as soon as somebody sets himself to the task of creating VInt encoding, his mind + * blanks out for a split-second and he starts the work by wrapping it in the most convoluted + * interface he can come up with. Custom streams that allocate memory, DataOutput that is only used + * to write single bytes... We operate on simple streams. Thus, we are going to have a simple + * implementation copy-pasted from protobuf Coded*Stream. 
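+ * <p>
+ * The vint format below is the protobuf base-128 varint: seven payload bits per byte, least
+ * significant group first, with the high bit set on every byte except the last. For example the
+ * value 300 is written as the two bytes 0xAC 0x02.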
+ */ +@InterfaceAudience.Private +public class StreamUtils { + + public static void writeRawVInt32(OutputStream output, int value) throws IOException { + assert value >= 0; + while (true) { + if ((value & ~0x7F) == 0) { + output.write(value); + return; + } else { + output.write((value & 0x7F) | 0x80); + value >>>= 7; + } + } + } + + public static int readRawVarint32(InputStream input) throws IOException { + byte tmp = (byte) input.read(); + if (tmp >= 0) { + return tmp; + } + int result = tmp & 0x7f; + if ((tmp = (byte) input.read()) >= 0) { + result |= tmp << 7; + } else { + result |= (tmp & 0x7f) << 7; + if ((tmp = (byte) input.read()) >= 0) { + result |= tmp << 14; + } else { + result |= (tmp & 0x7f) << 14; + if ((tmp = (byte) input.read()) >= 0) { + result |= tmp << 21; + } else { + result |= (tmp & 0x7f) << 21; + result |= (tmp = (byte) input.read()) << 28; + if (tmp < 0) { + // Discard upper 32 bits. + for (int i = 0; i < 5; i++) { + if (input.read() >= 0) { + return result; + } + } + throw new IOException("Malformed varint"); + } + } + } + } + return result; + } + + public static int readRawVarint32(ByteBuffer input) throws IOException { + byte tmp = input.get(); + if (tmp >= 0) { + return tmp; + } + int result = tmp & 0x7f; + if ((tmp = input.get()) >= 0) { + result |= tmp << 7; + } else { + result |= (tmp & 0x7f) << 7; + if ((tmp = input.get()) >= 0) { + result |= tmp << 14; + } else { + result |= (tmp & 0x7f) << 14; + if ((tmp = input.get()) >= 0) { + result |= tmp << 21; + } else { + result |= (tmp & 0x7f) << 21; + result |= (tmp = input.get()) << 28; + if (tmp < 0) { + // Discard upper 32 bits. + for (int i = 0; i < 5; i++) { + if (input.get() >= 0) { + return result; + } + } + throw new IOException("Malformed varint"); + } + } + } + } + return result; + } + + public static short toShort(byte hi, byte lo) { + short s = (short) (((hi & 0xFF) << 8) | (lo & 0xFF)); + Preconditions.checkArgument(s >= 0); + return s; + } + + public static void writeShort(OutputStream out, short v) throws IOException { + Preconditions.checkArgument(v >= 0); + out.write((byte) (0xff & (v >> 8))); + out.write((byte) (0xff & v)); + } +} \ No newline at end of file diff --git hbase-common/src/test/java/org/apache/hadoop/hbase/io/util/TestLRUDictionary.java hbase-common/src/test/java/org/apache/hadoop/hbase/io/util/TestLRUDictionary.java new file mode 100644 index 0000000..871c6fc --- /dev/null +++ hbase-common/src/test/java/org/apache/hadoop/hbase/io/util/TestLRUDictionary.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.io.util; + +import static org.junit.Assert.*; + +import java.math.BigInteger; +import java.util.Arrays; +import java.util.Random; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Tests LRUDictionary + */ +@Category(SmallTests.class) +public class TestLRUDictionary { + LRUDictionary testee; + + @Before + public void setUp() throws Exception { + testee = new LRUDictionary(); + testee.init(Short.MAX_VALUE); + } + + @Test + public void TestContainsNothing() { + assertTrue(isDictionaryEmpty(testee)); + } + + /** + * Assert can't add empty array. + */ + @Test + public void testPassingEmptyArrayToFindEntry() { + assertEquals(Dictionary.NOT_IN_DICTIONARY, + testee.findEntry(HConstants.EMPTY_BYTE_ARRAY, 0, 0)); + assertEquals(Dictionary.NOT_IN_DICTIONARY, + testee.addEntry(HConstants.EMPTY_BYTE_ARRAY, 0, 0)); + } + + @Test + public void testPassingSameArrayToAddEntry() { + // Add random predefined byte array, in this case a random byte array from + // HConstants. Assert that when we add, we get new index. Thats how it + // works. + int len = HConstants.CATALOG_FAMILY.length; + int index = testee.addEntry(HConstants.CATALOG_FAMILY, 0, len); + assertFalse(index == testee.addEntry(HConstants.CATALOG_FAMILY, 0, len)); + assertFalse(index == testee.addEntry(HConstants.CATALOG_FAMILY, 0, len)); + } + + @Test + public void testBasic() { + Random rand = new Random(); + byte[] testBytes = new byte[10]; + rand.nextBytes(testBytes); + + // Verify that our randomly generated array doesn't exist in the dictionary + assertEquals(testee.findEntry(testBytes, 0, testBytes.length), -1); + + // now since we looked up an entry, we should have added it to the + // dictionary, so it isn't empty + + assertFalse(isDictionaryEmpty(testee)); + + // Check if we can find it using findEntry + short t = testee.findEntry(testBytes, 0, testBytes.length); + + // Making sure we do find what we're looking for + assertTrue(t != -1); + + byte[] testBytesCopy = new byte[20]; + + Bytes.putBytes(testBytesCopy, 10, testBytes, 0, testBytes.length); + + // copy byte arrays, make sure that we check that equal byte arrays are + // equal without just checking the reference + assertEquals(testee.findEntry(testBytesCopy, 10, testBytes.length), t); + + // make sure the entry retrieved is the same as the one put in + assertTrue(Arrays.equals(testBytes, testee.getEntry(t))); + + testee.clear(); + + // making sure clear clears the dictionary + assertTrue(isDictionaryEmpty(testee)); + } + + @Test + public void TestLRUPolicy(){ + //start by filling the dictionary up with byte arrays + for (int i = 0; i < Short.MAX_VALUE; i++) { + testee.findEntry((BigInteger.valueOf(i)).toByteArray(), 0, + (BigInteger.valueOf(i)).toByteArray().length); + } + + // check we have the first element added + assertTrue(testee.findEntry(BigInteger.ZERO.toByteArray(), 0, + BigInteger.ZERO.toByteArray().length) != -1); + + // check for an element we know isn't there + assertTrue(testee.findEntry(BigInteger.valueOf(Integer.MAX_VALUE).toByteArray(), 0, + BigInteger.valueOf(Integer.MAX_VALUE).toByteArray().length) == -1); + + // since we just checked for this element, it should be there now. 
+ assertTrue(testee.findEntry(BigInteger.valueOf(Integer.MAX_VALUE).toByteArray(), 0, + BigInteger.valueOf(Integer.MAX_VALUE).toByteArray().length) != -1); + + // test eviction, that the least recently added or looked at element is + // evicted. We looked at ZERO so it should be in the dictionary still. + assertTrue(testee.findEntry(BigInteger.ZERO.toByteArray(), 0, + BigInteger.ZERO.toByteArray().length) != -1); + // Now go from beyond 1 to the end. + for(int i = 1; i < Short.MAX_VALUE; i++) { + assertTrue(testee.findEntry(BigInteger.valueOf(i).toByteArray(), 0, + BigInteger.valueOf(i).toByteArray().length) == -1); + } + + // check we can find all of these. + for (int i = 0; i < Short.MAX_VALUE; i++) { + assertTrue(testee.findEntry(BigInteger.valueOf(i).toByteArray(), 0, + BigInteger.valueOf(i).toByteArray().length) != -1); + } + } + + static private boolean isDictionaryEmpty(LRUDictionary dict) { + try { + dict.getEntry((short)0); + return false; + } catch (IndexOutOfBoundsException ioobe) { + return true; + } + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java index a76cafa..6983c70 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.util.Dictionary; /** * Context that holds the various dictionaries for compression in HLog. diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java index f83217a..04046f1 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java @@ -25,6 +25,7 @@ import java.io.IOException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.io.util.Dictionary; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.util.Bytes; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Dictionary.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Dictionary.java deleted file mode 100644 index fd1c264..0000000 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Dictionary.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.regionserver.wal; - -import org.apache.hadoop.classification.InterfaceAudience; - -/** - * Dictionary interface - * - * Dictionary indexes should be either bytes or shorts, only positive. (The - * first bit is reserved for detecting whether something is compressed or not). - */ -@InterfaceAudience.Private -interface Dictionary { - byte NOT_IN_DICTIONARY = -1; - - void init(int initialSize); - /** - * Gets an entry from the dictionary. - * - * @param idx index of the entry - * @return the entry, or null if non existent - */ - byte[] getEntry(short idx); - - /** - * Finds the index of an entry. - * If no entry found, we add it. - * - * @param data the byte array that we're looking up - * @param offset Offset into data to add to Dictionary. - * @param length Length beyond offset that comprises entry; must be > 0. - * @return the index of the entry, or {@link #NOT_IN_DICTIONARY} if not found - */ - short findEntry(byte[] data, int offset, int length); - - /** - * Adds an entry to the dictionary. - * Be careful using this method. It will add an entry to the - * dictionary even if it already has an entry for the same data. - * Call {{@link #findEntry(byte[], int, int)}} to add without duplicating - * dictionary entries. - * - * @param data the entry to add - * @param offset Offset into data to add to Dictionary. - * @param length Length beyond offset that comprises entry; must be > 0. - * @return the index of the entry - */ - - short addEntry(byte[] data, int offset, int length); - - /** - * Flushes the dictionary, empties all values. - */ - void clear(); -} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LRUDictionary.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LRUDictionary.java deleted file mode 100644 index 1bea447..0000000 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LRUDictionary.java +++ /dev/null @@ -1,222 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.regionserver.wal; - -import java.util.HashMap; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hbase.util.Bytes; - -import com.google.common.base.Preconditions; - -/** - * WALDictionary using an LRU eviction algorithm. 
Uses a linked list running - * through a hashtable. Currently has max of 2^15 entries. Will start - * evicting if exceeds this number The maximum memory we expect this dictionary - * to take in the worst case is about: - * (2 ^ 15) * 5 (Regionname, Row key, CF, Column qual, table) * 100 bytes (these are some big names) = ~16MB. - * If you want to get silly, even at 1kb entries, it maxes out at 160 megabytes. - */ -@InterfaceAudience.Private -public class LRUDictionary implements Dictionary { - - BidirectionalLRUMap backingStore; - @Override - public byte[] getEntry(short idx) { - return backingStore.get(idx); - } - - @Override - public void init(int initialSize) { - backingStore = new BidirectionalLRUMap(initialSize); - } - @Override - public short findEntry(byte[] data, int offset, int length) { - short ret = backingStore.findIdx(data, offset, length); - if (ret == NOT_IN_DICTIONARY) { - addEntry(data, offset, length); - } - return ret; - } - - @Override - public short addEntry(byte[] data, int offset, int length) { - if (length <= 0) return NOT_IN_DICTIONARY; - return backingStore.put(data, offset, length); - } - - @Override - public void clear() { - backingStore.clear(); - } - - /* - * Internal class used to implement LRU eviction and dual lookup (by key and - * value). - * - * This is not thread safe. Don't use in multi-threaded applications. - */ - static class BidirectionalLRUMap { - private int currSize = 0; - - // Head and tail of the LRU list. - private Node head; - private Node tail; - - private HashMap nodeToIndex = new HashMap(); - private Node[] indexToNode; - private int initSize = 0; - - public BidirectionalLRUMap(int initialSize) { - initSize = initialSize; - indexToNode = new Node[initialSize]; - for (int i = 0; i < initialSize; i++) { - indexToNode[i] = new Node(); - } - } - - private short put(byte[] array, int offset, int length) { - // We copy the bytes we want, otherwise we might be holding references to - // massive arrays in our dictionary (or those arrays might change) - byte[] stored = new byte[length]; - Bytes.putBytes(stored, 0, array, offset, length); - - if (currSize < initSize) { - // There is space to add without evicting. - indexToNode[currSize].setContents(stored, 0, stored.length); - setHead(indexToNode[currSize]); - short ret = (short) currSize++; - nodeToIndex.put(indexToNode[ret], ret); - return ret; - } else { - short s = nodeToIndex.remove(tail); - tail.setContents(stored, 0, stored.length); - // we need to rehash this. - nodeToIndex.put(tail, s); - moveToHead(tail); - return s; - } - } - - private short findIdx(byte[] array, int offset, int length) { - Short s; - final Node comparisonNode = new Node(); - comparisonNode.setContents(array, offset, length); - if ((s = nodeToIndex.get(comparisonNode)) != null) { - moveToHead(indexToNode[s]); - return s; - } else { - return -1; - } - } - - private byte[] get(short idx) { - Preconditions.checkElementIndex(idx, currSize); - moveToHead(indexToNode[idx]); - return indexToNode[idx].container; - } - - private void moveToHead(Node n) { - if (head == n) { - // no-op -- it's already the head. - return; - } - // At this point we definitely have prev, since it's not the head. - assert n.prev != null; - // Unlink prev. - n.prev.next = n.next; - - // Unlink next - if (n.next != null) { - n.next.prev = n.prev; - } else { - assert n == tail; - tail = n.prev; - } - // Node is now removed from the list. Re-add it at the head. 
- setHead(n); - } - - private void setHead(Node n) { - // assume it's already unlinked from the list at this point. - n.prev = null; - n.next = head; - if (head != null) { - assert head.prev == null; - head.prev = n; - } - - head = n; - - // First entry - if (tail == null) { - tail = n; - } - } - - private void clear() { - currSize = 0; - nodeToIndex.clear(); - tail = null; - head = null; - - for (Node n : indexToNode) { - n.container = null; - } - - for (int i = 0; i < initSize; i++) { - indexToNode[i].next = null; - indexToNode[i].prev = null; - } - } - - private static class Node { - byte[] container; - int offset; - int length; - Node next; // link towards the tail - Node prev; // link towards the head - - public Node() { - } - - private void setContents(byte[] container, int offset, int length) { - this.container = container; - this.offset = offset; - this.length = length; - } - - @Override - public int hashCode() { - return Bytes.hashCode(container, offset, length); - } - - @Override - public boolean equals(Object other) { - if (!(other instanceof Node)) { - return false; - } - - Node casted = (Node) other; - return Bytes.equals(container, offset, length, casted.container, - casted.offset, casted.length); - } - } - } -} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java index 26d8e61..c75a233 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java @@ -28,8 +28,14 @@ import java.util.Arrays; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.codec.Codec; +import org.apache.hadoop.hbase.io.TagCompressionContext; +import org.apache.hadoop.hbase.io.util.LRUDictionary; import org.apache.hadoop.hbase.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer; @@ -64,11 +70,36 @@ public class ProtobufLogReader extends ReaderBase { private long walEditsStopOffset; private boolean trailerPresent; + // Context used by the WAL for compressing tags + private TagCompressionContext tagCompressionContext; + public ProtobufLogReader() { super(); } @Override + public void init(FileSystem fs, Path path, Configuration conf, FSDataInputStream stream) + throws IOException { + super.init(fs, path, conf, stream); + if (hasTagsCompression()) { + try { + if (tagCompressionContext == null) { + tagCompressionContext = new TagCompressionContext(LRUDictionary.class); + } else { + tagCompressionContext.clear(); + } + } catch (Exception e) { + throw new IOException("Failed to initialize TagCompressionContext", e); + } + } + } + + private boolean hasTagsCompression() { + // By default tags compression be enabled along with WAL compression. 
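+    // "hbase.regionserver.wal.tags.enablecompression" defaults to true, but it only takes effect
+    // when WAL compression itself is in use.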
+ return hasCompression() && conf.getBoolean(HConstants.ENABLE_WAL_TAGS_COMPRESSION, true); + } + + @Override public void close() throws IOException { if (this.inputStream != null) { this.inputStream.close(); @@ -178,7 +209,8 @@ public class ProtobufLogReader extends ReaderBase { @Override protected void initAfterCompression() throws IOException { - WALCellCodec codec = WALCellCodec.create(this.conf, this.compressionContext); + WALCellCodec codec = WALCellCodec.create(this.conf, this.compressionContext, + this.tagCompressionContext); this.cellDecoder = codec.getDecoder(this.inputStream); if (this.hasCompression) { this.byteStringUncompressor = codec.getByteStringUncompressor(); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java index 1174847..0583f88 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java @@ -31,6 +31,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.codec.Codec; +import org.apache.hadoop.hbase.io.TagCompressionContext; +import org.apache.hadoop.hbase.io.util.LRUDictionary; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer; @@ -50,6 +52,9 @@ public class ProtobufLogWriter extends WriterBase { // than this size, it is written/read respectively, with a WARN message in the log. private int trailerWarnSize; + // Context used by the WAL for compressing tags + private TagCompressionContext tagCompressionContext; + public ProtobufLogWriter() { super(); } @@ -69,7 +74,18 @@ public class ProtobufLogWriter extends WriterBase { output.write(ProtobufLogReader.PB_WAL_MAGIC); WALHeader.newBuilder().setHasCompression(doCompress).build().writeDelimitedTo(output); - WALCellCodec codec = WALCellCodec.create(conf, this.compressionContext); + if (doCompress) { + // By default tags compression be enabled along with WAL compression. 
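+      // The reader side (ProtobufLogReader.init()) creates a matching TagCompressionContext from
+      // the same configuration, so it can rebuild the dictionary while replaying the stream.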
+ if (conf.getBoolean(HConstants.ENABLE_WAL_TAGS_COMPRESSION, true)) { + try { + this.tagCompressionContext = new TagCompressionContext(LRUDictionary.class); + } catch (Exception e) { + throw new IOException("Failed to initiate TagCompressionContext", e); + } + } + } + WALCellCodec codec = WALCellCodec.create(conf, this.compressionContext, + this.tagCompressionContext); this.cellEncoder = codec.getEncoder(this.output); if (doCompress) { this.compressor = codec.getByteStringCompressor(); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java index 72db786..8c05d08 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java @@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.io.util.LRUDictionary; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer; import org.apache.hadoop.hbase.util.FSUtils; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java index 4f73f32..57e2465 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java @@ -29,11 +29,13 @@ import org.apache.hadoop.hbase.codec.BaseDecoder; import org.apache.hadoop.hbase.codec.BaseEncoder; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.codec.KeyValueCodec; +import org.apache.hadoop.hbase.io.TagCompressionContext; +import org.apache.hadoop.hbase.io.util.Dictionary; +import org.apache.hadoop.hbase.io.util.StreamUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.io.IOUtils; -import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; @@ -46,6 +48,8 @@ public class WALCellCodec implements Codec { public static final String WAL_CELL_CODEC_CLASS_KEY = "hbase.regionserver.wal.codec"; private final CompressionContext compression; + private final TagCompressionContext tagCompressionContext; + private final ByteStringUncompressor statelessUncompressor = new ByteStringUncompressor() { @Override public byte[] uncompress(ByteString data, Dictionary dict) throws IOException { @@ -59,9 +63,13 @@ public class WALCellCodec implements Codec { * @param conf configuration to configure this * @param compression compression the codec should support, can be null to indicate no * compression + * @param tagCompressionContext context for compressing the tags. + * Can be null to indicate no tag compression. */ - public WALCellCodec(Configuration conf, CompressionContext compression) { + public WALCellCodec(Configuration conf, CompressionContext compression, + TagCompressionContext tagCompressionContext) { this.compression = compression; + this.tagCompressionContext = tagCompressionContext; } /** @@ -70,14 +78,16 @@ public class WALCellCodec implements Codec { * @param conf {@link Configuration} to read for the user-specified codec. If none is specified, * uses a {@link WALCellCodec}. 
* @param compression compression the codec should use + * @param tagCompressionContext context for compressing the tags. * @return a {@link WALCellCodec} ready for use. * @throws UnsupportedOperationException if the codec cannot be instantiated */ - public static WALCellCodec create(Configuration conf, CompressionContext compression) - throws UnsupportedOperationException { + public static WALCellCodec create(Configuration conf, CompressionContext compression, + TagCompressionContext tagCompressionContext) throws UnsupportedOperationException { String className = conf.get(WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName()); return ReflectionUtils.instantiateWithCustomCtor(className, new Class[] { Configuration.class, - CompressionContext.class }, new Object[] { conf, compression }); + CompressionContext.class, TagCompressionContext.class }, new Object[] { conf, compression, + tagCompressionContext }); } public interface ByteStringCompressor { @@ -141,9 +151,13 @@ public class WALCellCodec implements Codec { static class CompressedKvEncoder extends BaseEncoder { private final CompressionContext compression; - public CompressedKvEncoder(OutputStream out, CompressionContext compression) { + private final TagCompressionContext tagCompressionContext; + + public CompressedKvEncoder(OutputStream out, CompressionContext compression, + TagCompressionContext tagCompressionContext) { super(out); this.compression = compression; + this.tagCompressionContext = tagCompressionContext; } @Override @@ -157,7 +171,8 @@ public class WALCellCodec implements Codec { StreamUtils.writeRawVInt32(out, kv.getKeyLength()); StreamUtils.writeRawVInt32(out, kv.getValueLength()); // To support tags - StreamUtils.writeRawVInt32(out, kv.getTagsLength()); + short tagsLength = kv.getTagsLength(); + StreamUtils.writeRawVInt32(out, tagsLength); // Write row, qualifier, and family; use dictionary // compression as they're likely to have duplicates. @@ -165,11 +180,24 @@ public class WALCellCodec implements Codec { write(kvBuffer, kv.getFamilyOffset(), kv.getFamilyLength(), compression.familyDict); write(kvBuffer, kv.getQualifierOffset(), kv.getQualifierLength(), compression.qualifierDict); - // Write the rest uncompressed. + // Write timestamp, type and value as uncompressed. int pos = kv.getTimestampOffset(); - int remainingLength = kv.getLength() + offset - pos; - out.write(kvBuffer, pos, remainingLength); + int tsTypeValLen = kv.getLength() + offset - pos; + if (tagsLength > 0) { + tsTypeValLen = tsTypeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE; + } + out.write(kvBuffer, pos, tsTypeValLen); + if (tagsLength > 0) { + if (tagCompressionContext != null) { + // Write tags using Dictionary compression + tagCompressionContext.compressTags(out, kvBuffer, kv.getTagsOffset(), tagsLength); + } else { + // Tag compression is disabled within the WAL compression. Just write the tags bytes as + // it is. 
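+          // Note that in both branches the overall two byte tags length field of the KeyValue is
+          // skipped; CompressedKvDecoder reconstructs it from the tagsLength vint written above.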
+ out.write(kvBuffer, kv.getTagsOffset(), tagsLength); + } + } } private void write(byte[] data, int offset, int length, Dictionary dict) throws IOException { @@ -189,9 +217,13 @@ public class WALCellCodec implements Codec { static class CompressedKvDecoder extends BaseDecoder { private final CompressionContext compression; - public CompressedKvDecoder(InputStream in, CompressionContext compression) { + private final TagCompressionContext tagCompressionContext; + + public CompressedKvDecoder(InputStream in, CompressionContext compression, + TagCompressionContext tagCompressionContext) { super(in); this.compression = compression; + this.tagCompressionContext = tagCompressionContext; } @Override @@ -199,7 +231,7 @@ public class WALCellCodec implements Codec { int keylength = StreamUtils.readRawVarint32(in); int vlength = StreamUtils.readRawVarint32(in); - int tagsLength = StreamUtils.readRawVarint32(in); + short tagsLength = (short) StreamUtils.readRawVarint32(in); int length = 0; if(tagsLength == 0) { length = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength; @@ -228,8 +260,23 @@ public class WALCellCodec implements Codec { elemLen = readIntoArray(backingArray, pos, compression.qualifierDict); pos += elemLen; - // the rest - IOUtils.readFully(in, backingArray, pos, length - pos); + // timestamp, type and value + int tsTypeValLen = length - pos; + if (tagsLength > 0) { + tsTypeValLen = tsTypeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE; + } + IOUtils.readFully(in, backingArray, pos, tsTypeValLen); + pos += tsTypeValLen; + + // tags + if (tagsLength > 0) { + pos = Bytes.putShort(backingArray, pos, tagsLength); + if (tagCompressionContext != null) { + this.tagCompressionContext.uncompressTags(in, backingArray, pos, tagsLength); + } else { + IOUtils.readFully(in, backingArray, pos, tagsLength); + } + } return new KeyValue(backingArray, 0, length); } @@ -275,14 +322,14 @@ public class WALCellCodec implements Codec { @Override public Decoder getDecoder(InputStream is) { - return (compression == null) - ? new KeyValueCodec.KeyValueDecoder(is) : new CompressedKvDecoder(is, compression); + return (compression == null) ? new KeyValueCodec.KeyValueDecoder(is) : new CompressedKvDecoder( + is, compression, tagCompressionContext); } @Override public Encoder getEncoder(OutputStream os) { - return (compression == null) - ? new EnsureKvEncoder(os) : new CompressedKvEncoder(os, compression); + return (compression == null) ? new EnsureKvEncoder(os) : new CompressedKvEncoder(os, + compression, tagCompressionContext); } public ByteStringCompressor getByteStringCompressor() { @@ -294,80 +341,4 @@ public class WALCellCodec implements Codec { // TODO: ideally this should also encapsulate compressionContext return this.statelessUncompressor; } - - /** - * It seems like as soon as somebody sets himself to the task of creating VInt encoding, - * his mind blanks out for a split-second and he starts the work by wrapping it in the - * most convoluted interface he can come up with. Custom streams that allocate memory, - * DataOutput that is only used to write single bytes... We operate on simple streams. - * Thus, we are going to have a simple implementation copy-pasted from protobuf Coded*Stream. 
- */ - private static class StreamUtils { - public static int computeRawVarint32Size(final int value) { - if ((value & (0xffffffff << 7)) == 0) return 1; - if ((value & (0xffffffff << 14)) == 0) return 2; - if ((value & (0xffffffff << 21)) == 0) return 3; - if ((value & (0xffffffff << 28)) == 0) return 4; - return 5; - } - - static void writeRawVInt32(OutputStream output, int value) throws IOException { - assert value >= 0; - while (true) { - if ((value & ~0x7F) == 0) { - output.write(value); - return; - } else { - output.write((value & 0x7F) | 0x80); - value >>>= 7; - } - } - } - - static int readRawVarint32(InputStream input) throws IOException { - byte tmp = (byte)input.read(); - if (tmp >= 0) { - return tmp; - } - int result = tmp & 0x7f; - if ((tmp = (byte)input.read()) >= 0) { - result |= tmp << 7; - } else { - result |= (tmp & 0x7f) << 7; - if ((tmp = (byte)input.read()) >= 0) { - result |= tmp << 14; - } else { - result |= (tmp & 0x7f) << 14; - if ((tmp = (byte)input.read()) >= 0) { - result |= tmp << 21; - } else { - result |= (tmp & 0x7f) << 21; - result |= (tmp = (byte)input.read()) << 28; - if (tmp < 0) { - // Discard upper 32 bits. - for (int i = 0; i < 5; i++) { - if (input.read() >= 0) { - return result; - } - } - throw new IOException("Malformed varint"); - } - } - } - } - return result; - } - - static short toShort(byte hi, byte lo) { - short s = (short) (((hi & 0xFF) << 8) | (lo & 0xFF)); - Preconditions.checkArgument(s >= 0); - return s; - } - - static void writeShort(OutputStream out, short v) throws IOException { - Preconditions.checkArgument(v >= 0); - out.write((byte)(0xff & (v >> 8))); - out.write((byte)(0xff & v)); - } - } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java index 6447b4f..1c539a9 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java @@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.io.util.LRUDictionary; import org.apache.hadoop.hbase.util.FSUtils; /** diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java index 06f0df5..84eeb88 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java @@ -28,6 +28,8 @@ import java.io.DataOutputStream; import java.io.IOException; import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.io.util.Dictionary; +import org.apache.hadoop.hbase.io.util.LRUDictionary; import org.apache.hadoop.hbase.util.Bytes; import org.junit.BeforeClass; import org.junit.Test; diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCustomWALCellCodec.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCustomWALCellCodec.java index b992aca..eec44fb 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCustomWALCellCodec.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCustomWALCellCodec.java @@ -23,6 +23,7 @@ import 
static org.junit.Assert.assertTrue; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.io.TagCompressionContext; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -36,8 +37,9 @@ public class TestCustomWALCellCodec { public Configuration conf; public CompressionContext context; - public CustomWALCellCodec(Configuration conf, CompressionContext compression) { - super(conf, compression); + public CustomWALCellCodec(Configuration conf, CompressionContext compression, + TagCompressionContext tagCompressionContext) { + super(conf, compression, tagCompressionContext); this.conf = conf; this.context = compression; } @@ -53,7 +55,7 @@ public class TestCustomWALCellCodec { Configuration conf = new Configuration(false); conf.setClass(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, CustomWALCellCodec.class, WALCellCodec.class); - CustomWALCellCodec codec = (CustomWALCellCodec) WALCellCodec.create(conf, null); + CustomWALCellCodec codec = (CustomWALCellCodec) WALCellCodec.create(conf, null, null); assertEquals("Custom codec didn't get initialized with the right configuration!", conf, codec.conf); assertEquals("Custom codec didn't get initialized with the right compression context!", null, diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java index afd0589..58d0b7c 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.SmallTests; import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.io.util.LRUDictionary; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.DataOutputBuffer; import org.junit.Test; diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLRUDictionary.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLRUDictionary.java deleted file mode 100644 index a669823..0000000 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLRUDictionary.java +++ /dev/null @@ -1,156 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hbase.regionserver.wal; - -import static org.junit.Assert.*; - -import java.math.BigInteger; -import java.util.Arrays; -import java.util.Random; - -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.SmallTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -/** - * Tests LRUDictionary - */ -@Category(SmallTests.class) -public class TestLRUDictionary { - LRUDictionary testee; - - @Before - public void setUp() throws Exception { - testee = new LRUDictionary(); - testee.init(Short.MAX_VALUE); - } - - @Test - public void TestContainsNothing() { - assertTrue(isDictionaryEmpty(testee)); - } - - /** - * Assert can't add empty array. - */ - @Test - public void testPassingEmptyArrayToFindEntry() { - assertEquals(Dictionary.NOT_IN_DICTIONARY, - testee.findEntry(HConstants.EMPTY_BYTE_ARRAY, 0, 0)); - assertEquals(Dictionary.NOT_IN_DICTIONARY, - testee.addEntry(HConstants.EMPTY_BYTE_ARRAY, 0, 0)); - } - - @Test - public void testPassingSameArrayToAddEntry() { - // Add random predefined byte array, in this case a random byte array from - // HConstants. Assert that when we add, we get new index. Thats how it - // works. - int len = HConstants.CATALOG_FAMILY.length; - int index = testee.addEntry(HConstants.CATALOG_FAMILY, 0, len); - assertFalse(index == testee.addEntry(HConstants.CATALOG_FAMILY, 0, len)); - assertFalse(index == testee.addEntry(HConstants.CATALOG_FAMILY, 0, len)); - } - - @Test - public void testBasic() { - Random rand = new Random(); - byte[] testBytes = new byte[10]; - rand.nextBytes(testBytes); - - // Verify that our randomly generated array doesn't exist in the dictionary - assertEquals(testee.findEntry(testBytes, 0, testBytes.length), -1); - - // now since we looked up an entry, we should have added it to the - // dictionary, so it isn't empty - - assertFalse(isDictionaryEmpty(testee)); - - // Check if we can find it using findEntry - short t = testee.findEntry(testBytes, 0, testBytes.length); - - // Making sure we do find what we're looking for - assertTrue(t != -1); - - byte[] testBytesCopy = new byte[20]; - - Bytes.putBytes(testBytesCopy, 10, testBytes, 0, testBytes.length); - - // copy byte arrays, make sure that we check that equal byte arrays are - // equal without just checking the reference - assertEquals(testee.findEntry(testBytesCopy, 10, testBytes.length), t); - - // make sure the entry retrieved is the same as the one put in - assertTrue(Arrays.equals(testBytes, testee.getEntry(t))); - - testee.clear(); - - // making sure clear clears the dictionary - assertTrue(isDictionaryEmpty(testee)); - } - - @Test - public void TestLRUPolicy(){ - //start by filling the dictionary up with byte arrays - for (int i = 0; i < Short.MAX_VALUE; i++) { - testee.findEntry((BigInteger.valueOf(i)).toByteArray(), 0, - (BigInteger.valueOf(i)).toByteArray().length); - } - - // check we have the first element added - assertTrue(testee.findEntry(BigInteger.ZERO.toByteArray(), 0, - BigInteger.ZERO.toByteArray().length) != -1); - - // check for an element we know isn't there - assertTrue(testee.findEntry(BigInteger.valueOf(Integer.MAX_VALUE).toByteArray(), 0, - BigInteger.valueOf(Integer.MAX_VALUE).toByteArray().length) == -1); - - // since we just checked for this element, it should be there now. 
- assertTrue(testee.findEntry(BigInteger.valueOf(Integer.MAX_VALUE).toByteArray(), 0, - BigInteger.valueOf(Integer.MAX_VALUE).toByteArray().length) != -1); - - // test eviction, that the least recently added or looked at element is - // evicted. We looked at ZERO so it should be in the dictionary still. - assertTrue(testee.findEntry(BigInteger.ZERO.toByteArray(), 0, - BigInteger.ZERO.toByteArray().length) != -1); - // Now go from beyond 1 to the end. - for(int i = 1; i < Short.MAX_VALUE; i++) { - assertTrue(testee.findEntry(BigInteger.valueOf(i).toByteArray(), 0, - BigInteger.valueOf(i).toByteArray().length) == -1); - } - - // check we can find all of these. - for (int i = 0; i < Short.MAX_VALUE; i++) { - assertTrue(testee.findEntry(BigInteger.valueOf(i).toByteArray(), 0, - BigInteger.valueOf(i).toByteArray().length) != -1); - } - } - - static private boolean isDictionaryEmpty(LRUDictionary dict) { - try { - dict.getEntry((short)0); - return false; - } catch (IndexOutOfBoundsException ioobe) { - return true; - } - } -} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java index e3c1cc8..76005c3 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java @@ -33,6 +33,8 @@ import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.SmallTests; import org.apache.hadoop.hbase.codec.Codec.Decoder; import org.apache.hadoop.hbase.codec.Codec.Encoder; +import org.apache.hadoop.hbase.io.TagCompressionContext; +import org.apache.hadoop.hbase.io.util.LRUDictionary; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -42,8 +44,17 @@ public class TestWALCellCodecWithCompression { @Test public void testEncodeDecodeKVsWithTags() throws Exception { + doTest(null); + } + + @Test + public void testEncodeDecodeKVsWithTagsWithTagsCompression() throws Exception { + doTest(new TagCompressionContext(LRUDictionary.class)); + } + + private void doTest(TagCompressionContext ctx) throws Exception { WALCellCodec codec = new WALCellCodec(new Configuration(false), new CompressionContext( - LRUDictionary.class, false)); + LRUDictionary.class, false), ctx); ByteArrayOutputStream bos = new ByteArrayOutputStream(1024); Encoder encoder = codec.getEncoder(bos); encoder.write(createKV(1));