Index: src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (revision 1299099) +++ src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (working copy) @@ -61,7 +61,7 @@ @Category(MediumTests.class) public class TestWALReplay { public static final Log LOG = LogFactory.getLog(TestWALReplay.class); - private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate(); private Path hbaseRootDir = null; private Path oldLogDir; Index: src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayCompressed.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayCompressed.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayCompressed.java (revision 0) @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MediumTests; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +/** + * Enables compression and runs the TestWALReplay tests. + */ +@Category(MediumTests.class) +public class TestWALReplayCompressed extends TestWALReplay { + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TestWALReplay.setUpBeforeClass(); + Configuration conf = TestWALReplay.TEST_UTIL.getConfiguration(); + conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true); + } + +} Index: src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLRUDictionary.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLRUDictionary.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLRUDictionary.java (revision 0) @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.wal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.math.BigInteger; +import java.util.Arrays; +import java.util.Random; + +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Tests LRUDictionary + */ +@Category(SmallTests.class) +public class TestLRUDictionary { + LRUDictionary testee; + + @Before + public void setUp() throws Exception { + testee = new LRUDictionary(); + } + + @Test + public void TestContainsNothing() { + assertTrue(dictIsEmpty(testee)); + } + + @Test + public void testBasic() { + Random rand = new Random(); + byte[] testBytes = new byte[10]; + rand.nextBytes(testBytes); + + // Verify that our randomly generated array doesn't exist in the dictionary + assertEquals(testee.findEntry(testBytes, 0, testBytes.length), -1); + + // now since we looked up an entry, we should have added it to the + // dictionary, so it isn't empty + + assertFalse(dictIsEmpty(testee)); + + // Check if we can find it using findEntry + short t = testee.findEntry(testBytes, 0, testBytes.length); + + // Making sure we do find what we're looking for + assertTrue(t != -1); + + byte[] testBytesCopy = new byte[20]; + + Bytes.putBytes(testBytesCopy, 10, testBytes, 0, testBytes.length); + + // copy byte arrays, make sure that we check that equal byte arrays are + // equal without just checking the reference + assertEquals(testee.findEntry(testBytesCopy, 10, testBytes.length), t); + + // make sure the entry retrieved is the same as the one put in + assertTrue(Arrays.equals(testBytes, testee.getEntry(t))); + + testee.clear(); + + // making sure clear clears the dictionary + assertTrue(dictIsEmpty(testee)); + } + + @Test + public void TestLRUPolicy(){ + //start by filling the dictionary up with byte arrays + for(int i = 0; i < Short.MAX_VALUE; i++){ + testee.findEntry((BigInteger.valueOf(i)).toByteArray(), 0, + (BigInteger.valueOf(i)).toByteArray().length); + } + + // check we have the first element added + assertTrue(testee.findEntry(BigInteger.ZERO.toByteArray(), 0, + BigInteger.ZERO.toByteArray().length) != -1); + + // check for an element we know isn't there + assertTrue(testee.findEntry(BigInteger.valueOf(Integer.MAX_VALUE) + .toByteArray(), 0, + BigInteger.valueOf(Integer.MAX_VALUE).toByteArray().length) == -1); + + // since we just checked for this element, it should be there now. + assertTrue(testee.findEntry(BigInteger.valueOf(Integer.MAX_VALUE) + .toByteArray(), 0, + BigInteger.valueOf(Integer.MAX_VALUE).toByteArray().length) != -1); + + // test eviction, that the least recently added or looked at element is + // evicted. + for(int i = 1; i < Short.MAX_VALUE; i++){ + assertTrue(testee.findEntry(BigInteger.valueOf(i).toByteArray(), 0, + BigInteger.valueOf(i).toByteArray().length) == -1); + } + + // check we can find all of these. 
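+    // Each miss in the eviction loop above re-added its value (findEntry
+    // inserts on a miss), so all of these should be present again.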
+ for (int i = 1; i < Short.MAX_VALUE; i++) { + assertTrue(testee.findEntry(BigInteger.valueOf(i).toByteArray(), 0, + BigInteger.valueOf(i).toByteArray().length) != -1); + } + } + + static private boolean dictIsEmpty(LRUDictionary dict) { + for (short i = 0; i < Short.MAX_VALUE; i++) { + if (dict.getEntry(i) != null) { + return false; + } + } + return true; + } +} Index: src/main/java/org/apache/hadoop/hbase/HConstants.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/HConstants.java (revision 1299099) +++ src/main/java/org/apache/hadoop/hbase/HConstants.java (working copy) @@ -644,6 +644,9 @@ /** File permission umask to use when creating hbase data files */ public static final String DATA_FILE_UMASK_KEY = "hbase.data.umask"; + /** Configuration name of HLog Compression */ + public static final String ENABLE_WAL_COMPRESSION = "hbase.regionserver.wal.enablecompression"; + private HConstants() { // Can't be instantiated with this ctor. } Index: src/main/java/org/apache/hadoop/hbase/util/Bytes.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/util/Bytes.java (revision 1299099) +++ src/main/java/org/apache/hadoop/hbase/util/Bytes.java (working copy) @@ -1461,6 +1461,18 @@ } /** + * @param bytes array to hash + * @param offset offset to start from + * @param length length to hash + * */ + public static int hashCode(byte[] bytes, int offset, int length) { + int hash = 1; + for (int i = offset; i < offset + length; i++) + hash = (31 * hash) + (int) bytes[i]; + return hash; + } + + /** * @param t operands * @return Array of byte arrays made from passed array of Text */ Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java (revision 1299099) +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java (working copy) @@ -46,7 +46,8 @@ @InterfaceAudience.Private public class HLogKey implements WritableComparable { // should be < 0 (@see #readFields(DataInput)) - private static final int VERSION = -1; + // version 2 supports HLog compression + private static final int VERSION = -2; // The encoded region name. private byte [] encodedRegionName; @@ -57,7 +58,9 @@ private UUID clusterId; - /** Writable Consructor -- Do not use. */ + private CompressionContext keyContext; + + /** Writable Constructor -- Do not use. */ public HLogKey() { this(null, null, 0L, HConstants.LATEST_TIMESTAMP, HConstants.DEFAULT_CLUSTER_ID); @@ -84,6 +87,18 @@ this.clusterId = clusterId; } + /** + * Enables compression. 
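+   * Once enabled, the encoded region name and table name of this key are
+   * written to and read from the WAL as dictionary references instead of
+   * full byte arrays.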
+   *
+   * @param keyContext
+   *          context holding the dictionaries used to compress the region
+   *          name and table name of this key
+   */
+  public void enableCompression(CompressionContext keyContext) {
+    this.keyContext = keyContext;
+  }
+
   /** @return encoded region name */
   public byte [] getEncodedRegionName() {
     return encodedRegionName;
@@ -216,8 +231,16 @@
   @Override
   public void write(DataOutput out) throws IOException {
     WritableUtils.writeVInt(out, VERSION);
-    Bytes.writeByteArray(out, this.encodedRegionName);
-    Bytes.writeByteArray(out, this.tablename);
+    if (keyContext == null) {
+      Bytes.writeByteArray(out, this.encodedRegionName);
+      Bytes.writeByteArray(out, this.tablename);
+    } else {
+      Compressor.writeCompressed(this.encodedRegionName, 0,
+          this.encodedRegionName.length, out,
+          keyContext.regionDict);
+      Compressor.writeCompressed(this.tablename, 0, this.tablename.length, out,
+          keyContext.tableDict);
+    }
     out.writeLong(this.logSeqNum);
     out.writeLong(this.writeTime);
     // avoid storing 16 bytes when replication is not enabled
@@ -245,11 +268,22 @@
     if (len < 0) {
       // what we just read was the version
       version = len;
-      len = WritableUtils.readVInt(in);
+      // We only compress V2 of HLogKey.
+      // If compression is on, the length is handled by the dictionary
+      if (keyContext == null || version == -1) {
+        len = WritableUtils.readVInt(in);
+      }
     }
-    this.encodedRegionName = new byte[len];
-    in.readFully(this.encodedRegionName);
-    this.tablename = Bytes.readByteArray(in);
+    if (keyContext == null || version == -1) {
+      this.encodedRegionName = new byte[len];
+      in.readFully(this.encodedRegionName);
+      this.tablename = Bytes.readByteArray(in);
+    } else {
+      this.encodedRegionName = Compressor.readCompressed(in,
+          keyContext.regionDict);
+      this.tablename = Compressor.readCompressed(in, keyContext.tableDict);
+    }
+
     this.logSeqNum = in.readLong();
     this.writeTime = in.readLong();
     this.clusterId = HConstants.DEFAULT_CLUSTER_ID;
Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/Dictionary.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/wal/Dictionary.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/Dictionary.java (revision 0)
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+/**
+ * Dictionary interface
+ *
+ * Dictionary indexes should be either bytes or shorts, only positive. (The
+ * first bit is reserved for detecting whether something is compressed or not).
+ */
+
+interface Dictionary {
+  /**
+   * Gets an entry from the dictionary.
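+   * Implementations that track usage, such as {@link LRUDictionary}, may
+   * treat a lookup as a use of the entry.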
+ * + * @param idx index of the entry + * @return the entry, or null if non existent + */ + public byte[] getEntry(short idx); + + /** + * Finds the index of an entry + * + * @param data the byte array that we're looking up + * @return the index of the entry, or -1 if not found + */ + public short findEntry(byte[] data, int offset, int length); + + /** + * Adds an entry to the dictionary + * + * @param data the entry to add + * @return the index of the entry + */ + + public short addEntry(byte[] data, int offset, int length); + + /** + * Flushes the dictionary, empties all values. + */ + public void clear(); +} Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java (revision 0) @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.wal; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; + +/** + * Context that holds the various dictionaries for compression in HLog. 
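+ *
+ * An illustrative sketch, not part of this patch: the writer and the reader of
+ * a log each build their own context with the same Dictionary implementation,
+ * because the dictionaries fill up in the order entries are written and must be
+ * replayed in the same order when reading. The names regionName, out, in and
+ * readCtx below are placeholders.
+ * <pre>
+ *   // writer side
+ *   CompressionContext ctx = new CompressionContext(LRUDictionary.class);
+ *   Compressor.writeCompressed(regionName, 0, regionName.length, out, ctx.regionDict);
+ *
+ *   // reader side, over the same stream, with its own context
+ *   CompressionContext readCtx = new CompressionContext(LRUDictionary.class);
+ *   byte[] region = Compressor.readCompressed(in, readCtx.regionDict);
+ * </pre>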
+ */
+class CompressionContext {
+  final Dictionary regionDict;
+  final Dictionary tableDict;
+  final Dictionary familyDict;
+  final Dictionary qualifierDict;
+  final Dictionary rowDict;
+
+  public CompressionContext(Class<? extends Dictionary> dictType)
+      throws SecurityException, NoSuchMethodException,
+      IllegalArgumentException, InstantiationException, IllegalAccessException,
+      InvocationTargetException {
+    Constructor<? extends Dictionary> dictConstructor = dictType
+        .getConstructor();
+
+    regionDict = dictConstructor.newInstance();
+    tableDict = dictConstructor.newInstance();
+    familyDict = dictConstructor.newInstance();
+    qualifierDict = dictConstructor.newInstance();
+    rowDict = dictConstructor.newInstance();
+  }
+
+  void clear() {
+    regionDict.clear();
+    tableDict.clear();
+    familyDict.clear();
+    qualifierDict.clear();
+    rowDict.clear();
+  }
+}
Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java (revision 1299099)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java (working copy)
@@ -25,6 +25,7 @@
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,7 +34,9 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.SequenceFile.Metadata;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -54,6 +57,10 @@
 
   private Class<? extends HLogKey> keyClass;
 
+  boolean compression = false;
+
+  private CompressionContext writeContext;
+
   private Method syncFs = null;
   private Method hflush = null;
@@ -77,6 +84,21 @@
   @Override
   public void init(FileSystem fs, Path path, Configuration conf)
   throws IOException {
+    compression = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false);
+    TreeMap<Text, Text> metaMap = new TreeMap<Text, Text>();
+    if (compression) {
+      try {
+        if (writeContext == null) {
+          writeContext = new CompressionContext(LRUDictionary.class);
+        } else {
+          writeContext.clear();
+        }
+        LOG.info("WAL compression enabled for " + path);
+        metaMap.put(new Text(HConstants.ENABLE_WAL_COMPRESSION), new Text("true"));
+      } catch (Exception e) {
+        throw new IOException("Failed to initiate CompressionContext", e);
+      }
+    }
 
     if (null == keyClass) {
       keyClass = HLog.getKeyClass(conf);
@@ -101,7 +123,7 @@
               fs.getDefaultBlockSize())),
             Boolean.valueOf(false) /*createParent*/,
             SequenceFile.CompressionType.NONE, new DefaultCodec(),
-            new Metadata()
+            new Metadata(metaMap)
           });
       } catch (InvocationTargetException ite) {
         // function was properly called, but threw it's own exception
@@ -123,7 +145,7 @@
         SequenceFile.CompressionType.NONE,
         new DefaultCodec(),
         null,
-        new Metadata());
+        new Metadata(metaMap));
     } else {
       LOG.debug("using new createWriter -- HADOOP-6840");
     }
@@ -207,6 +229,9 @@
 
   @Override
   public void append(HLog.Entry entry) throws IOException {
+    if (compression) {
+      entry.enableCompression(writeContext);
+    }
     this.writer.append(entry.getKey(), entry.getEdit());
   }
 
Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/KeyValueCompression.java
=================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/KeyValueCompression.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/KeyValueCompression.java (revision 0) @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.wal; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.WritableUtils; + +/** + * Compression class for Key/Value pairs before writing them to WAL. This is not + * synchronized, so synchronization should be handled outside. + * + * Class only compresses and uncompresses row keys, family names, and the + * qualifier. More may be added depending on use patterns. + */ + +class KeyValueCompression { + /** + * Uncompresses a KeyValue from a DataInput and returns it. + * + * @param in the DataInput + * @param readContext the compressionContext to use. + * @return an uncompressed KeyValue + * @throws IOException + */ + + public static KeyValue readFields(DataInput in, CompressionContext readContext) + throws IOException { + int keylength = WritableUtils.readVInt(in); + int vlength = WritableUtils.readVInt(in); + int length = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength; + + byte[] backingArray = new byte[length]; + int pos = 0; + pos = Bytes.putInt(backingArray, pos, keylength); + pos = Bytes.putInt(backingArray, pos, vlength); + + // the row + pos = Compressor.uncompressIntoArray(backingArray, pos, in, + Bytes.SIZEOF_SHORT, readContext.rowDict); + + // family + pos = Compressor.uncompressIntoArray(backingArray, pos, in, + Bytes.SIZEOF_BYTE, readContext.familyDict); + + // qualifier + pos = Compressor.uncompressIntoArray(backingArray, pos, in, 0, + readContext.qualifierDict); + + // the rest + in.readFully(backingArray, pos, length - pos); + + return new KeyValue(backingArray); + + } + + /** + * Compresses and writes ourKV to out, a DataOutput. + * + * @param out the DataOutput + * @param keyVal the KV to compress and write + * @param writeContext the compressionContext to use. + * @throws IOException + */ + public static void write(final DataOutput out, KeyValue keyVal, + CompressionContext writeContext) throws IOException { + byte[] backingArray = keyVal.getBuffer(); + int offset = keyVal.getOffset(); + + // we first write the KeyValue infrastructure as VInts. 
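+    // The resulting layout is: vint key length, vint value length, then the
+    // dictionary-compressed row, family and qualifier, and finally the rest
+    // of the KeyValue (from the timestamp onwards) copied as is.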
+ WritableUtils.writeVInt(out, keyVal.getKeyLength()); + WritableUtils.writeVInt(out, keyVal.getValueLength()); + + // now we write the row key, as the row key is likely to be repeated + // We save space only if we attempt to compress elements with duplicates + Compressor.writeCompressed(keyVal.getBuffer(), keyVal.getRowOffset(), + keyVal.getRowLength(), out, writeContext.rowDict); + + + // now family, if it exists. if it doesn't, we write a 0 length array. + Compressor.writeCompressed(keyVal.getBuffer(), keyVal.getFamilyOffset(), + keyVal.getFamilyLength(), out, writeContext.familyDict); + + // qualifier next + Compressor.writeCompressed(keyVal.getBuffer(), keyVal.getQualifierOffset(), + keyVal.getQualifierLength(), out, + writeContext.qualifierDict); + + // now we write the rest uncompressed + int pos = keyVal.getTimestampOffset(); + int remainingLength = keyVal.getLength() + offset - (pos); + out.write(backingArray, pos, remainingLength); + } +} \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/LRUDictionary.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/LRUDictionary.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/LRUDictionary.java (revision 0) @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.wal; + +import java.util.HashMap; + +import org.apache.hadoop.hbase.util.Bytes; + +/** + * WALDictionary using an LRU eviction algorithm. Uses a linked list running + * through a hashtable. + */ +public class LRUDictionary implements Dictionary { + + private final BidirectionalLRUMap backingStore = new BidirectionalLRUMap(); + + @Override + public byte[] getEntry(short idx) { + return backingStore.get(idx); + + } + + @Override + public short findEntry(byte[] data, int offset, int length) { + short ret = backingStore.findIdx(data, offset, length); + if (ret == -1) { + backingStore.put(data, offset, length); + } + return ret; + } + + @Override + public short addEntry(byte[] data, int offset, int length) { + return backingStore.put(data, offset, length); + } + + @Override + public void clear() { + backingStore.clear(); + } + + /* + * Internal class used to implement LRU eviction and dual lookup (by key and + * value). + * + * This is not thread safe. 
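+   * Every get() and findIdx() hit moves the touched node to the front of the
+   * recency list, so even lookups mutate internal state.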
+   * Don't use in thread safe applications.
+   */
+  private static class BidirectionalLRUMap {
+    private final int maxSize = Short.MAX_VALUE;
+    private int currSize = 0;
+
+    private Node tail;
+    private Node head;
+
+    private HashMap<Node, Short> nodeToIndex = new HashMap<Node, Short>();
+    private Node[] indexToNode = new Node[maxSize];
+
+    public BidirectionalLRUMap() {
+      for (int i = 0; i < maxSize; i++) {
+        indexToNode[i] = new Node();
+      }
+    }
+
+    private short put(byte[] array, int offset, int length) {
+      // We copy the bits we want, otherwise we might be holding references to
+      // massive arrays in our dictionary.
+      byte[] stored = new byte[length];
+      Bytes.putBytes(stored, 0, array, offset, length);
+
+      if (currSize < maxSize) {
+        indexToNode[currSize].setContents(stored, 0, stored.length);
+        moveToFront(indexToNode[currSize]);
+        short ret = (short) currSize++;
+        nodeToIndex.put(indexToNode[ret], ret);
+        return ret;
+      } else {
+        short s = nodeToIndex.remove(tail);
+        tail.setContents(stored, 0, stored.length);
+        // we need to rehash this.
+        nodeToIndex.put(tail, s);
+        moveToFront(tail);
+        return s;
+      }
+    }
+
+    private short findIdx(byte[] array, int offset, int length) {
+      Short s;
+      final Node comparisonNode = new Node();
+      comparisonNode.setContents(array, offset, length);
+      if ((s = nodeToIndex.get(comparisonNode)) != null) {
+        moveToFront(indexToNode[s]);
+        return s;
+      } else {
+        return -1;
+      }
+    }
+
+    private byte[] get(short idx) {
+      moveToFront(indexToNode[idx]);
+      return indexToNode[idx].container;
+    }
+
+    private void moveToFront(Node n) {
+
+      if (tail == null) {
+        tail = n;
+      }
+
+      if (head == n) {
+        return;
+      }
+
+      if (tail == n) {
+        if (n.next != null) {
+          tail = n.next;
+        }
+      }
+
+      if (n.next != null) {
+        n.next.prev = n.prev;
+      }
+      if (n.prev != null) {
+        n.prev.next = n.next;
+      }
+
+      n.prev = head;
+      if (head != null) {
+        head.next = n;
+      }
+      n.next = null;
+
+      head = n;
+    }
+
+    private void clear() {
+      currSize = 0;
+      nodeToIndex.clear();
+      tail = null;
+      head = null;
+
+      for (Node n : indexToNode) {
+        n.container = null;
+      }
+
+      for (int i = 0; i < maxSize; i++) {
+        indexToNode[i].next = null;
+        indexToNode[i].prev = null;
+      }
+    }
+
+    private static class Node {
+      byte[] container;
+      int offset;
+      int length;
+      Node next;
+      Node prev;
+
+      public Node() {
+      }
+
+      private void setContents(byte[] container, int offset, int length) {
+        this.container = container;
+        this.offset = offset;
+        this.length = length;
+      }
+
+      @Override
+      public int hashCode() {
+        return Bytes.hashCode(container, offset, length);
+      }
+
+      @Override
+      public boolean equals(Object other) {
+        if (!(other instanceof Node)) {
+          return false;
+        }
+
+        Node casted = (Node) other;
+        return Bytes.equals(container, offset, length, casted.container,
+            casted.offset, casted.length);
+      }
+    }
+  }
+}
\ No newline at end of file
Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (revision 1299099)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (working copy)
@@ -1670,6 +1670,13 @@
     public HLogKey getKey() {
       return key;
     }
+    /**
+     * Enables the compression for this entry
+     */
+    public void enableCompression(CompressionContext writeContext) {
+      edit.enableCompression(writeContext);
+      key.enableCompression(writeContext);
+    }
 
     @Override
     public String toString() {
@@ -1687,6 +1694,7 @@
       this.key.readFields(dataInput);
       this.edit.readFields(dataInput);
     }
+
   }
 
   /**
Index:
src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java (revision 1299099) +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java (working copy) @@ -72,12 +72,20 @@ public class WALEdit implements Writable, HeapSize { private final int VERSION_2 = -1; + + private boolean compressed = false; private final ArrayList kvs = new ArrayList(); private NavigableMap scopes; + private CompressionContext dictContext; + public WALEdit() { } + + public void enableCompression(final CompressionContext dictContext) { + this.dictContext = dictContext; + } public void add(KeyValue kv) { this.kvs.add(kv); @@ -116,9 +124,13 @@ // this is new style HLog entry containing multiple KeyValues. int numEdits = in.readInt(); for (int idx = 0; idx < numEdits; idx++) { - KeyValue kv = new KeyValue(); - kv.readFields(in); - this.add(kv); + if (dictContext != null) { + this.add(KeyValueCompression.readFields(in, dictContext)); + } else { + KeyValue kv = new KeyValue(); + kv.readFields(in); + this.add(kv); + } } int numFamilies = in.readInt(); if (numFamilies > 0) { @@ -133,7 +145,7 @@ } } else { // this is an old style HLog entry. The int that we just - // read is actually the length of a single KeyValue. + // read is actually the length of a single KeyValue KeyValue kv = new KeyValue(); kv.readFields(versionOrLength, in); this.add(kv); @@ -146,7 +158,11 @@ out.writeInt(kvs.size()); // We interleave the two lists for code simplicity for (KeyValue kv : kvs) { - kv.write(out); + if (dictContext != null) { + KeyValueCompression.write(out, kv, dictContext); + } else{ + kv.write(out); + } } if (scopes == null) { out.writeInt(0); Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java (revision 0) @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import com.google.common.base.Preconditions;
+
+/**
+ * A set of static functions for compressing and uncompressing byte arrays out
+ * of a DataOutput/Input. Also contains a command line tool to compress and
+ * uncompress HLogs.
+ */
+
+public class Compressor {
+
+  static final byte NOT_IN_DICTIONARY = -1;
+
+  /**
+   * Command line tool to compress and uncompress WALs.
+   */
+  public static void main(String[] args) throws IOException {
+    if (args.length != 2 || args[0].equals("--help") || args[0].equals("-h")) {
+      printHelp();
+      System.exit(-1);
+    }
+
+    Path inputPath = new Path(args[0]);
+    Path outputPath = new Path(args[1]);
+
+    transformFile(inputPath, outputPath);
+  }
+
+  private static void printHelp() {
+    System.err.println("Compressor <input> <output>\n");
+    System.err.println("If <input> HLog is compressed, <output> would be decompressed.");
+    System.err.println("If <input> HLog is uncompressed, <output> would be compressed.");
+    return;
+  }
+
+  private static void transformFile(Path input, Path output)
+      throws IOException {
+    SequenceFileLogReader in = new SequenceFileLogReader();
+    SequenceFileLogWriter out = new SequenceFileLogWriter();
+
+    try {
+      Configuration conf = new Configuration();
+
+      FileSystem inFS = input.getFileSystem(conf);
+      FileSystem outFS = output.getFileSystem(conf);
+
+      in.init(inFS, input, conf);
+      boolean compress = in.reader.WALCompressionEnabled();
+
+      conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, !compress);
+      out.init(outFS, output, conf);
+
+      Entry e;
+
+      while ((e = in.next()) != null) {
+        if (compress) e.enableCompression(null);
+        out.append(e);
+      }
+    } finally {
+      in.close();
+      out.close();
+    }
+  }
+
+  /**
+   * Reads the next compressed entry and returns it as a byte array.
+   *
+   * @param in the DataInput to read from
+   * @param dict the dictionary we use for our read.
+   *
+   * @return the uncompressed array.
+   */
+  static byte[] readCompressed(DataInput in, Dictionary dict)
+      throws IOException {
+    byte status = in.readByte();
+    int length;
+    short dictIdx;
+    byte[] entry = null;
+
+    if (status == NOT_IN_DICTIONARY) {
+      length = WritableUtils.readVInt(in);
+      // if this isn't in the dictionary, we need to add to the dictionary.
+      byte[] arr = new byte[length];
+      in.readFully(arr);
+      dict.addEntry(arr, 0, length);
+      return arr;
+    } else {
+      dictIdx = toShort(status, in.readByte());
+      entry = dict.getEntry(dictIdx);
+      if (entry == null) {
+        throw new IOException("Missing dictionary reference at offset "
+            + dictIdx);
+      }
+      return entry;
+    }
+  }
+
+  /**
+   * Reads a compressed entry into an array.
+   * The output into the array ends up length-prefixed.
+   *
+   * @param to the array to write into
+   * @param offset array offset to start writing to
+   * @param in the DataInput to read from
+   * @param sizeBytes size of the length of the prefix. Oftentimes we do not use
+   *          an int, because we can represent the entry as a byte or a short
+   * @param dict the dictionary to use for compression
+   *
+   * @return the position in the target array just past the last byte written.
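+   * @throws IOException if the underlying stream cannot be read or sizeBytes
+   *           is not a supported width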
+ */ + static int uncompressIntoArray(byte[] to, int offset, DataInput in, + int sizeBytes, Dictionary dict) throws IOException { + byte status = in.readByte(); + int length; + short dictIdx; + int pos = offset; + byte[] entry = null; + + if (status == NOT_IN_DICTIONARY) { + // status byte indicating that data to be read is not in dictionary. + length = WritableUtils.readVInt(in); + } else { + // the status byte also acts as the higher order byte of the dictionary + // entry + dictIdx = toShort(status, in.readByte()); + entry = dict.getEntry(dictIdx); + length = entry.length; + } + + // sometimes, we need to write the size of the byte array, different + // datatypes may be used + if (sizeBytes == Bytes.SIZEOF_BYTE) { + pos = Bytes.putByte(to, pos, (byte) (length & 0x0000ff)); + } else if (sizeBytes == Bytes.SIZEOF_SHORT) { + pos = Bytes.putShort(to, pos, (short) (length & 0x0000ffff)); + } else if (sizeBytes == Bytes.SIZEOF_INT) { + pos = Bytes.putInt(to, pos, length); + } else if (sizeBytes != 0) { + throw new IOException("sizeBytes of " + sizeBytes + " not supported"); + } + + if (status == NOT_IN_DICTIONARY) { + // if this isn't in the dictionary, we need to add to the dictionary. + byte[] arr = new byte[length]; + in.readFully(to, pos, length); + dict.addEntry(to, pos, length); + pos += length; + } else { + // now we write the uncompressed value. + pos = Bytes.putBytes(to, pos, entry, 0, length); + } + + return pos; + } + + /** + * Compresses and writes an array to a DataOutput + * + * @param data the array to write. + * @param out the DataOutput to write into + * @param dict the dictionary to use for compression + */ + + static void writeCompressed(byte[] data, int offset, int length, + DataOutput out, Dictionary dict) + throws IOException { + short dictIdx = dict.findEntry(data, offset, length); + if (dictIdx == NOT_IN_DICTIONARY) { + // not in dict + out.writeByte(NOT_IN_DICTIONARY); + WritableUtils.writeVInt(out, length); + out.write(data, offset, length); + } else { + out.writeShort(dictIdx); + } + } + + private static short toShort(byte hi, byte lo) { + short s = (short) (((hi & 0xFF) << 8) | (lo & 0xFF)); + Preconditions.checkArgument(s >= 0); + return s; + } +} \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java (revision 1299099) +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java (working copy) @@ -23,10 +23,8 @@ import java.io.FilterInputStream; import java.io.IOException; import java.lang.Class; -import java.lang.reflect.Constructor; import java.lang.reflect.Field; import java.lang.reflect.Method; -import java.util.Arrays; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -35,7 +33,9 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; @InterfaceAudience.Private public class SequenceFileLogReader implements HLog.Reader { @@ -57,8 +57,8 @@ WALReader(final FileSystem fs, final Path p, final Configuration c) throws IOException { + super(fs, p, c); - } @Override @@ -69,7 +69,21 @@ bufferSize, length), length); } + private static Text ENABLE_WAL_COMPRESSION = + new Text(HConstants.ENABLE_WAL_COMPRESSION); 
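+    // Metadata value that SequenceFileLogWriter stores under the key above
+    // when compression is enabled.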
+ private static Text TRUE = new Text("true"); + /** + * Call this method after init() has been executed + * + * @return whether WAL compression is enabled + */ + public boolean WALCompressionEnabled() { + Text txt = this.getMetadata().get(ENABLE_WAL_COMPRESSION); + return txt != null && txt.equals(TRUE); + } + + /** * Override just so can intercept first call to getPos. */ static class WALReaderFSDataInputStream extends FSDataInputStream { @@ -134,12 +148,16 @@ } } + boolean compression; Configuration conf; WALReader reader; + // Needed logging exceptions Path path; int edit = 0; long entryStart = 0; + + private CompressionContext readContext = null; protected Class keyClass; @@ -159,13 +177,26 @@ this.keyClass = keyClass; } - @Override public void init(FileSystem fs, Path path, Configuration conf) throws IOException { this.conf = conf; this.path = path; reader = new WALReader(fs, path, conf); + + // If compression is enabled, new dictionaries are created here. + compression = reader.WALCompressionEnabled(); + if (compression) { + try { + if (readContext == null) { + readContext = new CompressionContext(LRUDictionary.class); + } else { + readContext.clear(); + } + } catch (Exception e) { + throw new IOException("Failed to initiate CompressionContext", e); + } + } } @Override @@ -205,6 +236,9 @@ } boolean b = false; try { + if (readContext != null) { + e.enableCompression(readContext); + } b = this.reader.next(e.getKey(), e.getEdit()); } catch (IOException ioe) { throw addFileInfoToException(ioe);