commit cedfb45523b5b753790e21ec3fad864c5fc68c29 Author: Todd Lipcon Date: Wed Mar 14 00:15:51 2012 -0700 delta for compression patch diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java index a5d8c71..602704e 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java @@ -95,22 +95,19 @@ public class Compressor { static byte[] readCompressed(DataInput in, Dictionary dict) throws IOException { byte status = in.readByte(); - int length; - short dictIdx; - byte[] entry = null; if (status == Dictionary.NOT_IN_DICTIONARY) { - length = WritableUtils.readVInt(in); + int length = WritableUtils.readVInt(in); // if this isn't in the dictionary, we need to add to the dictionary. byte[] arr = new byte[length]; in.readFully(arr); if (dict != null) dict.addEntry(arr, 0, length); return arr; } else { - dictIdx = toShort(status, in.readByte()); - entry = dict.getEntry(dictIdx); + short dictIdx = toShort(status, in.readByte()); + byte[] entry = dict.getEntry(dictIdx); if (entry == null) { - throw new IOException("Missing dictionary reference at offset " + throw new IOException("Missing dictionary entry for index " + dictIdx); } return entry; @@ -124,55 +121,34 @@ public class Compressor { * @param to the array to write into * @param offset array offset to start writing to * @param in the DataInput to read from - * @param sizeBytes size of the length of the prefix. Oftentimes we do not use - * an int, because we can represent the entry as a byte or a short * @param dict the dictionary to use for compression * - * @return the index of the last byte written. + * @return the length of the uncompressed data */ static int uncompressIntoArray(byte[] to, int offset, DataInput in, - int sizeBytes, Dictionary dict) - throws IOException { + Dictionary dict) throws IOException { byte status = in.readByte(); - int length; - short dictIdx; - int pos = offset; - byte[] entry = null; if (status == Dictionary.NOT_IN_DICTIONARY) { // status byte indicating that data to be read is not in dictionary. - length = WritableUtils.readVInt(in); + // if this isn't in the dictionary, we need to add to the dictionary. + int length = WritableUtils.readVInt(in); + in.readFully(to, offset, length); + dict.addEntry(to, offset, length); + return length; } else { // the status byte also acts as the higher order byte of the dictionary // entry - dictIdx = toShort(status, in.readByte()); - entry = dict.getEntry(dictIdx); - length = entry.length; - } - - // sometimes, we need to write the size of the byte array, different - // datatypes may be used - if (sizeBytes == Bytes.SIZEOF_BYTE) { - pos = Bytes.putByte(to, pos, (byte) (length & 0x0000ff)); - } else if (sizeBytes == Bytes.SIZEOF_SHORT) { - pos = Bytes.putShort(to, pos, (short) (length & 0x0000ffff)); - } else if (sizeBytes == Bytes.SIZEOF_INT) { - pos = Bytes.putInt(to, pos, length); - } else if (sizeBytes != 0) { - throw new IOException("sizeBytes of " + sizeBytes + " not supported"); - } - - if (status == Dictionary.NOT_IN_DICTIONARY) { - // if this isn't in the dictionary, we need to add to the dictionary. 
- in.readFully(to, pos, length); - dict.addEntry(to, pos, length); - pos += length; - } else { + short dictIdx = toShort(status, in.readByte()); + byte[] entry = dict.getEntry(dictIdx); + if (entry == null) { + throw new IOException("Missing dictionary entry for index " + + dictIdx); + } // now we write the uncompressed value. - pos = Bytes.putBytes(to, pos, entry, 0, length); + Bytes.putBytes(to, offset, entry, 0, entry.length); + return entry.length; } - - return pos; } /** @@ -182,12 +158,13 @@ public class Compressor { * @param out the DataOutput to write into * @param dict the dictionary to use for compression */ - static void writeCompressed(byte[] data, int offset, int length, DataOutput out, Dictionary dict) throws IOException { short dictIdx = Dictionary.NOT_IN_DICTIONARY; - if (dict != null) dictIdx = dict.findEntry(data, offset, length); + if (dict != null) { + dictIdx = dict.findEntry(data, offset, length); + } if (dictIdx == Dictionary.NOT_IN_DICTIONARY) { // not in dict out.writeByte(Dictionary.NOT_IN_DICTIONARY); @@ -203,4 +180,4 @@ public class Compressor { Preconditions.checkArgument(s >= 0); return s; } -} \ No newline at end of file +} diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/Dictionary.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/Dictionary.java index e35250c..5c1872e 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/Dictionary.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/Dictionary.java @@ -49,8 +49,8 @@ interface Dictionary { /** * Adds an entry to the dictionary. - * Be careful using this method. It will just add pass entry to the - * dictionary though it already has an entry. + * Be careful using this method. It will add an entry to the + * dictionary even if it already has an entry for the same data. * Call {{@link #findEntry(byte[], int, int)}} to add without duplicating * dictionary entries. * @@ -66,4 +66,4 @@ interface Dictionary { * Flushes the dictionary, empties all values. 
*/ public void clear(); -} \ No newline at end of file +} diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index b11aed6..c245f06 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -1656,7 +1656,6 @@ public class HLog implements Syncable { this.key = key; this.edit = edit; } - /** * Gets the edit * @return edit @@ -1664,7 +1663,6 @@ public class HLog implements Syncable { public WALEdit getEdit() { return edit; } - /** * Gets the key * @return key @@ -1698,7 +1696,6 @@ public class HLog implements Syncable { this.key.readFields(dataInput); this.edit.readFields(dataInput); } - } /** diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java index 7d7f306..c522576 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java @@ -47,7 +47,36 @@ import org.apache.hadoop.io.WritableUtils; public class HLogKey implements WritableComparable<HLogKey> { // should be < 0 (@see #readFields(DataInput)) // version 2 supports HLog compression - private static final int VERSION = -2; + enum Version { + UNVERSIONED(0), + INITIAL(-1), + COMPRESSED(-2); + + final int code; + static final Version[] byCode; + static { + byCode = Version.values(); + for (int i = 0; i < byCode.length; i++) { + if (byCode[i].code != -1 * i) { + throw new AssertionError("Values in this enum should be descending by one"); + } + } + } + + Version(int code) { + this.code = code; + } + + boolean atLeast(Version other) { + return code <= other.code; + } + + static Version fromCode(int code) { + return byCode[code * -1]; + } + } + + private static final Version VERSION = Version.COMPRESSED; // The encoded region name. private byte [] encodedRegionName; @@ -225,7 +254,7 @@ public class HLogKey implements WritableComparable<HLogKey> { @Override public void write(DataOutput out) throws IOException { - WritableUtils.writeVInt(out, VERSION); + WritableUtils.writeVInt(out, VERSION.code); if (compressionContext == null) { Bytes.writeByteArray(out, this.encodedRegionName); Bytes.writeByteArray(out, this.tablename); @@ -250,7 +279,7 @@ public class HLogKey implements WritableComparable<HLogKey> { @Override public void readFields(DataInput in) throws IOException { - int version = 0; + Version version = Version.UNVERSIONED; // HLogKey was not versioned in the beginning. // In order to introduce it now, we make use of the fact // that encodedRegionName was written with Bytes.writeByteArray, @@ -262,14 +291,14 @@ public class HLogKey implements WritableComparable<HLogKey> { int len = WritableUtils.readVInt(in); if (len < 0) { // what we just read was the version - version = len; + version = Version.fromCode(len); // We only compress V2 of HLogKey.
// If compression is on, the length is handled by the dictionary - if (compressionContext == null || version == -1) { + if (compressionContext == null || !version.atLeast(Version.COMPRESSED)) { len = WritableUtils.readVInt(in); } } - if (compressionContext == null || version == -1) { + if (compressionContext == null || !version.atLeast(Version.COMPRESSED)) { this.encodedRegionName = new byte[len]; in.readFully(this.encodedRegionName); this.tablename = Bytes.readByteArray(in); @@ -281,7 +310,7 @@ public class HLogKey implements WritableComparable<HLogKey> { this.logSeqNum = in.readLong(); this.writeTime = in.readLong(); this.clusterId = HConstants.DEFAULT_CLUSTER_ID; - if (version < 0) { + if (version.atLeast(Version.INITIAL)) { if (in.readBoolean()) { this.clusterId = new UUID(in.readLong(), in.readLong()); } diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/KeyValueCompression.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/KeyValueCompression.java index 616edc4..0f9743f 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/KeyValueCompression.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/KeyValueCompression.java @@ -43,7 +43,7 @@ class KeyValueCompression { * @throws IOException */ - public static KeyValue readFields(DataInput in, CompressionContext readContext) + public static KeyValue readKV(DataInput in, CompressionContext readContext) throws IOException { int keylength = WritableUtils.readVInt(in); int vlength = WritableUtils.readVInt(in); @@ -55,22 +55,35 @@ class KeyValueCompression { pos = Bytes.putInt(backingArray, pos, vlength); // the row - pos = Compressor.uncompressIntoArray(backingArray, pos, in, - Bytes.SIZEOF_SHORT, readContext.rowDict); + int elemLen = Compressor.uncompressIntoArray(backingArray, + pos + Bytes.SIZEOF_SHORT, in, readContext.rowDict); + checkLength(elemLen, Short.MAX_VALUE); + pos = Bytes.putShort(backingArray, pos, (short)elemLen); + pos += elemLen; // family - pos = Compressor.uncompressIntoArray(backingArray, pos, in, - Bytes.SIZEOF_BYTE, readContext.familyDict); + elemLen = Compressor.uncompressIntoArray(backingArray, + pos + Bytes.SIZEOF_BYTE, in, readContext.familyDict); + checkLength(elemLen, Byte.MAX_VALUE); + pos = Bytes.putByte(backingArray, pos, (byte)elemLen); + pos += elemLen; // qualifier - pos = Compressor.uncompressIntoArray(backingArray, pos, in, 0, + elemLen = Compressor.uncompressIntoArray(backingArray, pos, in, readContext.qualifierDict); + pos += elemLen; // the rest in.readFully(backingArray, pos, length - pos); return new KeyValue(backingArray); + } + private static void checkLength(int len, int max) throws IOException { + if (len < 0 || len > max) { + throw new IOException( + "Invalid length for compressed portion of keyvalue: " + len); + } } /** @@ -81,7 +94,7 @@ class KeyValueCompression { * @param writeContext the compressionContext to use.
* @throws IOException */ - public static void write(final DataOutput out, KeyValue keyVal, + public static void writeKV(final DataOutput out, KeyValue keyVal, CompressionContext writeContext) throws IOException { byte[] backingArray = keyVal.getBuffer(); int offset = keyVal.getOffset(); @@ -110,4 +123,4 @@ class KeyValueCompression { int remainingLength = keyVal.getLength() + offset - (pos); out.write(backingArray, pos, remainingLength); } -} \ No newline at end of file +} diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/LRUDictionary.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/LRUDictionary.java index 1b2e3e5..584b7a4 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/LRUDictionary.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/LRUDictionary.java @@ -22,6 +22,8 @@ import java.util.HashMap; import org.apache.hadoop.hbase.util.Bytes; +import com.google.common.base.Preconditions; + /** * WALDictionary using an LRU eviction algorithm. Uses a linked list running * through a hashtable. @@ -58,33 +60,35 @@ public class LRUDictionary implements Dictionary { * Internal class used to implement LRU eviction and dual lookup (by key and * value). * - * This is not thread safe. Don't use in thread safe applications + * This is not thread safe. Don't use in multi-threaded applications. */ static class BidirectionalLRUMap { - static final int MAXSIZE = Short.MAX_VALUE; + static final int MAX_SIZE = Short.MAX_VALUE; private int currSize = 0; - private Node tail; + // Head and tail of the LRU list. private Node head; + private Node tail; private HashMap<Node, Short> nodeToIndex = new HashMap<Node, Short>(); - private Node[] indexToNode = new Node[MAXSIZE]; + private Node[] indexToNode = new Node[MAX_SIZE]; public BidirectionalLRUMap() { - for (int i = 0; i < MAXSIZE; i++) { + for (int i = 0; i < MAX_SIZE; i++) { indexToNode[i] = new Node(); } } private short put(byte[] array, int offset, int length) { - // We copy the bits we want, otherwise we might be holding references to - // massive arrays in our dictionary. + // We copy the bytes we want, otherwise we might be holding references to + // massive arrays in our dictionary (or those arrays might change) byte[] stored = new byte[length]; Bytes.putBytes(stored, 0, array, offset, length); - if (currSize < MAXSIZE) { + if (currSize < MAX_SIZE) { + // There is space to add without evicting. indexToNode[currSize].setContents(stored, 0, stored.length); - moveToFront(indexToNode[currSize]); + setHead(indexToNode[currSize]); short ret = (short) currSize++; nodeToIndex.put(indexToNode[ret], ret); return ret; @@ -93,7 +97,7 @@ public class LRUDictionary implements Dictionary { tail.setContents(stored, 0, stored.length); // we need to rehash this.
nodeToIndex.put(tail, s); - moveToFront(tail); + moveToHead(tail); return s; } } @@ -103,7 +107,7 @@ public class LRUDictionary implements Dictionary { final Node comparisonNode = new Node(); comparisonNode.setContents(array, offset, length); if ((s = nodeToIndex.get(comparisonNode)) != null) { - moveToFront(indexToNode[s]); + moveToHead(indexToNode[s]); return s; } else { return -1; @@ -111,40 +115,47 @@ public class LRUDictionary implements Dictionary { } private byte[] get(short idx) { - moveToFront(indexToNode[idx]); + Preconditions.checkElementIndex(idx, currSize); + moveToHead(indexToNode[idx]); return indexToNode[idx].container; } - private void moveToFront(Node n) { - - if (tail == null) { - tail = n; - } - + private void moveToHead(Node n) { if (head == n) { + // no-op -- it's already the head. return; } + // At this point we definitely have prev, since it's not the head. + assert n.prev != null; + // Unlink prev. + n.prev.next = n.next; - if (tail == n) { - if (n.next != null) { - tail = n.next; - } - } - + // Unlink next if (n.next != null) { n.next.prev = n.prev; + } else { + assert n == tail; + tail = n.prev; } - if (n.prev != null) { - n.prev.next = n.next; - } - - n.prev = head; + // Node is now removed from the list. Re-add it at the head. + setHead(n); + } + + private void setHead(Node n) { + // assume it's already unlinked from the list at this point. + n.prev = null; + n.next = head; if (head != null) { - head.next = n; + assert head.prev == null; + head.prev = n; } - n.next = null; head = n; + + // First entry + if (tail == null) { + tail = n; + } } private void clear() { @@ -157,7 +168,7 @@ public class LRUDictionary implements Dictionary { n.container = null; } - for (int i = 0; i < MAXSIZE; i++) { + for (int i = 0; i < MAX_SIZE; i++) { indexToNode[i].next = null; indexToNode[i].prev = null; } @@ -167,8 +178,8 @@ public class LRUDictionary implements Dictionary { byte[] container; int offset; int length; - Node next; - Node prev; + Node next; // link towards the tail + Node prev; // link towards the head public Node() { } @@ -196,4 +207,4 @@ public class LRUDictionary implements Dictionary { } } } -} \ No newline at end of file +} diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java index 70473ae..efa5b64 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java @@ -123,7 +123,7 @@ public class WALEdit implements Writable, HeapSize { int numEdits = in.readInt(); for (int idx = 0; idx < numEdits; idx++) { if (compressionContext != null) { - this.add(KeyValueCompression.readFields(in, compressionContext)); + this.add(KeyValueCompression.readKV(in, compressionContext)); } else { KeyValue kv = new KeyValue(); kv.readFields(in); @@ -157,7 +157,7 @@ public class WALEdit implements Writable, HeapSize { // We interleave the two lists for code simplicity for (KeyValue kv : kvs) { if (compressionContext != null) { - KeyValueCompression.write(out, kv, compressionContext); + KeyValueCompression.writeKV(out, kv, compressionContext); } else{ kv.write(out); } @@ -201,4 +201,4 @@ public class WALEdit implements Writable, HeapSize { return sb.toString(); } -} \ No newline at end of file +} diff --git src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java index eea5870..dad681d 100644 --- 
src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java +++ src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java @@ -27,13 +27,16 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import org.apache.hadoop.hbase.SmallTests; import org.apache.hadoop.hbase.util.Bytes; import org.junit.BeforeClass; import org.junit.Test; +import org.junit.experimental.categories.Category; /** * Test our compressor class. */ +@Category(SmallTests.class) public class TestCompressor { @BeforeClass public static void setUpBeforeClass() throws Exception { diff --git src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java new file mode 100644 index 0000000..8fa7fe8 --- /dev/null +++ src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.util.List; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.DataOutputBuffer; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.*; + +import com.google.common.collect.Lists; + +@Category(SmallTests.class) +public class TestKeyValueCompression { + private static final byte[] VALUE = Bytes.toBytes("fake value"); + private static final int BUF_SIZE = 256*1024; + + @Test + public void testCountingKVs() throws Exception { + List<KeyValue> kvs = Lists.newArrayList(); + for (int i = 0; i < 400; i++) { + byte[] row = Bytes.toBytes("row" + i); + byte[] fam = Bytes.toBytes("fam" + i); + byte[] qual = Bytes.toBytes("qual" + i); + kvs.add(new KeyValue(row, fam, qual, 12345L, VALUE)); + } + + runTestCycle(kvs); + } + + @Test + public void testRepeatingKVs() throws Exception { + List<KeyValue> kvs = Lists.newArrayList(); + for (int i = 0; i < 400; i++) { + byte[] row = Bytes.toBytes("row" + (i % 10)); + byte[] fam = Bytes.toBytes("fam" + (i % 127)); + byte[] qual = Bytes.toBytes("qual" + (i % 128)); + kvs.add(new KeyValue(row, fam, qual, 12345L, VALUE)); + } + + runTestCycle(kvs); + } + + private void runTestCycle(List<KeyValue> kvs) throws Exception { + CompressionContext ctx = new CompressionContext(LRUDictionary.class); + DataOutputBuffer buf = new DataOutputBuffer(BUF_SIZE); + for (KeyValue kv : kvs) { + KeyValueCompression.writeKV(buf, kv, ctx); + } + + ctx.clear(); + DataInputStream in = new DataInputStream(new ByteArrayInputStream( + buf.getData(), 0, buf.getLength())); + for
(KeyValue kv : kvs) { + KeyValue readBack = KeyValueCompression.readKV(in, ctx); + assertEquals(kv, readBack); + } + } +} diff --git src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLRUDictionary.java src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLRUDictionary.java index 81c8807..99983a2 100644 --- src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLRUDictionary.java +++ src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLRUDictionary.java @@ -18,9 +18,7 @@ package org.apache.hadoop.hbase.regionserver.wal; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; import java.math.BigInteger; import java.util.Arrays; @@ -112,7 +110,7 @@ public class TestLRUDictionary { @Test public void TestLRUPolicy(){ //start by filling the dictionary up with byte arrays - for (int i = 0; i < LRUDictionary.BidirectionalLRUMap.MAXSIZE; i++) { + for (int i = 0; i < LRUDictionary.BidirectionalLRUMap.MAX_SIZE; i++) { testee.findEntry((BigInteger.valueOf(i)).toByteArray(), 0, (BigInteger.valueOf(i)).toByteArray().length); } @@ -134,24 +132,24 @@ public class TestLRUDictionary { assertTrue(testee.findEntry(BigInteger.ZERO.toByteArray(), 0, BigInteger.ZERO.toByteArray().length) != -1); // Now go from beyond 1 to the end. - for(int i = 1; i < LRUDictionary.BidirectionalLRUMap.MAXSIZE; i++) { + for(int i = 1; i < LRUDictionary.BidirectionalLRUMap.MAX_SIZE; i++) { assertTrue(testee.findEntry(BigInteger.valueOf(i).toByteArray(), 0, BigInteger.valueOf(i).toByteArray().length) == -1); } // check we can find all of these. - for (int i = 0; i < LRUDictionary.BidirectionalLRUMap.MAXSIZE; i++) { + for (int i = 0; i < LRUDictionary.BidirectionalLRUMap.MAX_SIZE; i++) { assertTrue(testee.findEntry(BigInteger.valueOf(i).toByteArray(), 0, BigInteger.valueOf(i).toByteArray().length) != -1); } } static private boolean isDictionaryEmpty(LRUDictionary dict) { - for (short i = 0; i < LRUDictionary.BidirectionalLRUMap.MAXSIZE; i++) { - if (dict.getEntry(i) != null) { - return false; - } + try { + dict.getEntry((short)0); + return false; + } catch (IndexOutOfBoundsException ioobe) { + return true; } - return true; } -} \ No newline at end of file +}
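
For readers following the wire format above: a minimal, self-contained sketch of the status-byte encoding that Compressor.writeCompressed/readCompressed implement. It is illustrative only, not the HBase classes in this patch: the class name DictCodecSketch is invented, a plain HashMap stands in for the LRU-bounded BidirectionalLRUMap, and a fixed-width writeInt replaces Hadoop's WritableUtils vint.

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.*;

public class DictCodecSketch {
  static final byte NOT_IN_DICTIONARY = -1;          // sentinel status byte, as in Dictionary
  private final Map<String, Short> byValue = new HashMap<String, Short>();
  private final List<byte[]> byIndex = new ArrayList<byte[]>();

  void write(byte[] data, DataOutput out) throws IOException {
    Short idx = byValue.get(new String(data, StandardCharsets.ISO_8859_1));
    if (idx == null) {
      // Not in the dictionary: status byte, then length, then the literal bytes.
      out.writeByte(NOT_IN_DICTIONARY);
      out.writeInt(data.length);                     // real code: WritableUtils.writeVInt
      out.write(data);
      addEntry(data.clone());
    } else {
      // In the dictionary: the high byte of the two-byte index doubles as the status byte.
      out.writeShort(idx);
    }
  }

  byte[] read(DataInput in) throws IOException {
    byte status = in.readByte();
    if (status == NOT_IN_DICTIONARY) {
      byte[] arr = new byte[in.readInt()];
      in.readFully(arr);
      addEntry(arr);                                 // reader rebuilds the same dictionary
      return arr;
    }
    // Valid indices are non-negative, so their high byte never collides with -1.
    short idx = (short) (((status & 0xff) << 8) | (in.readByte() & 0xff));
    return byIndex.get(idx);
  }

  private void addEntry(byte[] data) {
    byIndex.add(data);                               // real code: LRU-evicts beyond Short.MAX_VALUE
    byValue.put(new String(data, StandardCharsets.ISO_8859_1), (short) (byIndex.size() - 1));
  }

  public static void main(String[] args) throws IOException {
    DictCodecSketch writer = new DictCodecSketch();
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    byte[] row = "row-00042".getBytes(StandardCharsets.ISO_8859_1);
    writer.write(row, out);                          // first occurrence: literal plus dictionary add
    writer.write(row, out);                          // second occurrence: two-byte index only

    DictCodecSketch reader = new DictCodecSketch();
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    System.out.println(Arrays.equals(row, reader.read(in)));   // true
    System.out.println(Arrays.equals(row, reader.read(in)));   // true
  }
}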
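
Likewise, a sketch of the version-detection trick used in HLogKey.readFields: because encodedRegionName was historically written with Bytes.writeByteArray, the first vint is either a non-negative length (legacy unversioned keys) or a negative version code. Names here are invented for the demo, fixed-width ints stand in for vints, and only the uncompressed read path is shown (with compression on, the dictionary carries the length).

import java.io.*;
import java.nio.charset.StandardCharsets;

public class VersionedKeySketch {
  enum Version {
    UNVERSIONED(0), INITIAL(-1), COMPRESSED(-2);
    final int code;
    Version(int code) { this.code = code; }
    boolean atLeast(Version other) { return code <= other.code; }   // more negative = newer
    static Version fromCode(int code) { return values()[-code]; }
  }

  public static void main(String[] args) throws IOException {
    byte[] regionName = "region-1234".getBytes(StandardCharsets.UTF_8);

    // New-style writer: version code first, then the length-prefixed payload.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeInt(Version.COMPRESSED.code);
    out.writeInt(regionName.length);
    out.write(regionName);

    // Reader: a negative first value means "version code", not "length".
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    Version version = Version.UNVERSIONED;
    int len = in.readInt();
    if (len < 0) {
      version = Version.fromCode(len);
      len = in.readInt();              // the real length follows
    }
    byte[] readBack = new byte[len];
    in.readFully(readBack);

    System.out.println(version);                                       // COMPRESSED
    System.out.println(new String(readBack, StandardCharsets.UTF_8));  // region-1234
    System.out.println(version.atLeast(Version.INITIAL));              // true: -2 <= -1
  }
}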