diff --git src/main/java/org/apache/hadoop/hbase/HConstants.java src/main/java/org/apache/hadoop/hbase/HConstants.java
index 045c6f3..8888347 100644
--- src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -644,6 +644,10 @@ public final class HConstants {
/** File permission umask to use when creating hbase data files */
public static final String DATA_FILE_UMASK_KEY = "hbase.data.umask";
+ /** Configuration key for enabling HLog (WAL) compression */
+ public static final String ENABLE_WAL_COMPRESSION =
+ "hbase.regionserver.wal.enablecompression";
+
private HConstants() {
// Can't be instantiated with this ctor.
}
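
For reference, a minimal sketch (not part of the patch) of turning the new flag on from client or test code; the class name below is hypothetical, and the same effect can be had by setting hbase.regionserver.wal.enablecompression to true in hbase-site.xml:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;

    public class EnableWalCompressionExample {
      public static void main(String[] args) {
        // Off by default; SequenceFileLogWriter.init() reads this flag when it
        // creates a new WAL file.
        Configuration conf = HBaseConfiguration.create();
        conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
        System.out.println(conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false));
      }
    }
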
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
new file mode 100644
index 0000000..7a995a6
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * Context that holds the various dictionaries for compression in HLog.
+ */
+class CompressionContext {
+ final Dictionary regionDict;
+ final Dictionary tableDict;
+ final Dictionary familyDict;
+ final Dictionary qualifierDict;
+ final Dictionary rowDict;
+
+ public CompressionContext(Class<? extends Dictionary> dictType)
+ throws SecurityException, NoSuchMethodException,
+ IllegalArgumentException, InstantiationException, IllegalAccessException,
+ InvocationTargetException {
+ Constructor<? extends Dictionary> dictConstructor = dictType
+ .getConstructor();
+
+ regionDict = dictConstructor.newInstance();
+ tableDict = dictConstructor.newInstance();
+ familyDict = dictConstructor.newInstance();
+ qualifierDict = dictConstructor.newInstance();
+ rowDict = dictConstructor.newInstance();
+ }
+
+ void clear() {
+ regionDict.clear();
+ tableDict.clear();
+ familyDict.clear();
+ qualifierDict.clear();
+ rowDict.clear();
+ }
+}
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
new file mode 100644
index 0000000..a5d8c71
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import com.google.common.base.Preconditions;
+
+/**
+ * A set of static functions for running our custom WAL compression/decompression.
+ * Also contains a command line tool to compress and uncompress HLogs.
+ */
+public class Compressor {
+ /**
+ * Command line tool to compress and uncompress WALs.
+ */
+ public static void main(String[] args) throws IOException {
+ if (args.length != 2 || args[0].equals("--help") || args[0].equals("-h")) {
+ printHelp();
+ System.exit(-1);
+ }
+
+ Path inputPath = new Path(args[0]);
+ Path outputPath = new Path(args[1]);
+
+ transformFile(inputPath, outputPath);
+ }
+
+ private static void printHelp() {
+ System.err.println("usage: Compressor ");
+ System.err.println("If HLog is compressed, will be decompressed.");
+ System.err.println("If HLog is uncompressed, will be compressed.");
+ return;
+ }
+
+ private static void transformFile(Path input, Path output)
+ throws IOException {
+ SequenceFileLogReader in = new SequenceFileLogReader();
+ SequenceFileLogWriter out = new SequenceFileLogWriter();
+
+ try {
+ Configuration conf = HBaseConfiguration.create();
+
+ FileSystem inFS = input.getFileSystem(conf);
+ FileSystem outFS = output.getFileSystem(conf);
+
+ in.init(inFS, input, conf);
+ boolean compress = in.reader.isWALCompressionEnabled();
+
+ conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, !compress);
+ out.init(outFS, output, conf);
+
+ Entry e = null;
+ while ((e = in.next()) != null) out.append(e);
+ } finally {
+ in.close();
+ out.close();
+ }
+ }
+
+ /**
+ * Reads the next compressed entry and returns it as a byte array
+ *
+ * @param in the DataInput to read from
+ * @param dict the dictionary we use for our read.
+ *
+ * @return the uncompressed array.
+ */
+ static byte[] readCompressed(DataInput in, Dictionary dict)
+ throws IOException {
+ byte status = in.readByte();
+ int length;
+ short dictIdx;
+ byte[] entry = null;
+
+ if (status == Dictionary.NOT_IN_DICTIONARY) {
+ length = WritableUtils.readVInt(in);
+ // if this isn't in the dictionary, we need to add to the dictionary.
+ byte[] arr = new byte[length];
+ in.readFully(arr);
+ if (dict != null) dict.addEntry(arr, 0, length);
+ return arr;
+ } else {
+ dictIdx = toShort(status, in.readByte());
+ entry = dict.getEntry(dictIdx);
+ if (entry == null) {
+ throw new IOException("Missing dictionary reference at offset "
+ + dictIdx);
+ }
+ return entry;
+ }
+ }
+
+ /**
+ * Reads a compressed entry into an array.
+ * The entry is written into the array with a length prefix of sizeBytes bytes.
+ *
+ * @param to the array to write into
+ * @param offset array offset to start writing to
+ * @param in the DataInput to read from
+ * @param sizeBytes width in bytes of the length prefix; often a byte or a
+ * short rather than a full int, or 0 to omit the prefix
+ * @param dict the dictionary to use for compression
+ *
+ * @return the array offset just past the last byte written
+ */
+ static int uncompressIntoArray(byte[] to, int offset, DataInput in,
+ int sizeBytes, Dictionary dict)
+ throws IOException {
+ byte status = in.readByte();
+ int length;
+ short dictIdx;
+ int pos = offset;
+ byte[] entry = null;
+
+ if (status == Dictionary.NOT_IN_DICTIONARY) {
+ // status byte indicating that data to be read is not in dictionary.
+ length = WritableUtils.readVInt(in);
+ } else {
+ // the status byte also acts as the higher order byte of the dictionary
+ // entry
+ dictIdx = toShort(status, in.readByte());
+ entry = dict.getEntry(dictIdx);
+ length = entry.length;
+ }
+
+ // sometimes, we need to write the size of the byte array, different
+ // datatypes may be used
+ if (sizeBytes == Bytes.SIZEOF_BYTE) {
+ pos = Bytes.putByte(to, pos, (byte) (length & 0x0000ff));
+ } else if (sizeBytes == Bytes.SIZEOF_SHORT) {
+ pos = Bytes.putShort(to, pos, (short) (length & 0x0000ffff));
+ } else if (sizeBytes == Bytes.SIZEOF_INT) {
+ pos = Bytes.putInt(to, pos, length);
+ } else if (sizeBytes != 0) {
+ throw new IOException("sizeBytes of " + sizeBytes + " not supported");
+ }
+
+ if (status == Dictionary.NOT_IN_DICTIONARY) {
+ // if this isn't in the dictionary, we need to add to the dictionary.
+ in.readFully(to, pos, length);
+ dict.addEntry(to, pos, length);
+ pos += length;
+ } else {
+ // now we write the uncompressed value.
+ pos = Bytes.putBytes(to, pos, entry, 0, length);
+ }
+
+ return pos;
+ }
+
+ /**
+ * Compresses and writes an array to a DataOutput
+ *
+ * @param data the array to write.
+ * @param out the DataOutput to write into
+ * @param dict the dictionary to use for compression
+ */
+
+ static void writeCompressed(byte[] data, int offset, int length,
+ DataOutput out, Dictionary dict)
+ throws IOException {
+ short dictIdx = Dictionary.NOT_IN_DICTIONARY;
+ if (dict != null) dictIdx = dict.findEntry(data, offset, length);
+ if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
+ // not in dict
+ out.writeByte(Dictionary.NOT_IN_DICTIONARY);
+ WritableUtils.writeVInt(out, length);
+ out.write(data, offset, length);
+ } else {
+ out.writeShort(dictIdx);
+ }
+ }
+
+ static short toShort(byte hi, byte lo) {
+ short s = (short) (((hi & 0xFF) << 8) | (lo & 0xFF));
+ Preconditions.checkArgument(s >= 0);
+ return s;
+ }
+}
\ No newline at end of file
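
A minimal round-trip sketch (not part of the patch; class name hypothetical, placed in the same package because readCompressed/writeCompressed are package-private) of the encoding Compressor produces: the first occurrence of a value is written raw with a vint length, the second occurrence becomes a two-byte dictionary index. The writer and reader keep separate dictionaries that stay in sync because they see entries in the same order:

    package org.apache.hadoop.hbase.regionserver.wal;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.hbase.util.Bytes;

    public class CompressorRoundTrip {
      public static void main(String[] args) throws IOException {
        byte[] value = Bytes.toBytes("row-1");
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);
        Dictionary writeDict = new LRUDictionary();
        Compressor.writeCompressed(value, 0, value.length, out, writeDict); // raw bytes + vint length
        Compressor.writeCompressed(value, 0, value.length, out, writeDict); // two-byte dictionary index
        out.close();

        DataInputStream in =
            new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
        Dictionary readDict = new LRUDictionary();
        System.out.println(Bytes.toString(Compressor.readCompressed(in, readDict)));
        System.out.println(Bytes.toString(Compressor.readCompressed(in, readDict)));
      }
    }
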
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/Dictionary.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/Dictionary.java
new file mode 100644
index 0000000..e35250c
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/Dictionary.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+/**
+ * Dictionary interface
+ *
+ * Dictionary indexes are non-negative bytes or shorts; the sign bit is
+ * reserved to flag whether a value is dictionary-encoded or written raw.
+ */
+
+interface Dictionary {
+ static final byte NOT_IN_DICTIONARY = -1;
+
+ /**
+ * Gets an entry from the dictionary.
+ *
+ * @param idx index of the entry
+ * @return the entry, or null if non existent
+ */
+ public byte[] getEntry(short idx);
+
+ /**
+ * Finds the index of an entry.
+ * If no entry found, we add it.
+ *
+ * @param data the byte array that we're looking up
+ * @param offset Offset into data to add to Dictionary.
+ * @param length Length beyond offset that comprises entry; must be > 0.
+ * @return the index of the entry, or {@link #NOT_IN_DICTIONARY} if not found
+ */
+ public short findEntry(byte[] data, int offset, int length);
+
+ /**
+ * Adds an entry to the dictionary.
+ * Be careful using this method: it adds the passed entry to the dictionary
+ * even if the dictionary already contains it.
+ * Call {@link #findEntry(byte[], int, int)} to add without duplicating
+ * dictionary entries.
+ *
+ * @param data the entry to add
+ * @param offset Offset into data to add to Dictionary.
+ * @param length Length beyond offset that comprises entry; must be > 0.
+ * @return the index of the entry
+ */
+
+ public short addEntry(byte[] data, int offset, int length);
+
+ /**
+ * Flushes the dictionary, empties all values.
+ */
+ public void clear();
+}
\ No newline at end of file
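
To illustrate the findEntry contract (sketch only, not part of the patch; class name hypothetical, and it assumes the LRUDictionary implementation added later in this patch): a miss returns NOT_IN_DICTIONARY but also inserts the entry, so the next lookup of the same bytes yields its index.

    package org.apache.hadoop.hbase.regionserver.wal;

    import org.apache.hadoop.hbase.util.Bytes;

    public class DictionaryContractExample {
      public static void main(String[] args) {
        Dictionary dict = new LRUDictionary();
        byte[] family = Bytes.toBytes("cf");
        short first = dict.findEntry(family, 0, family.length);  // NOT_IN_DICTIONARY (-1), entry added
        short second = dict.findEntry(family, 0, family.length); // the index assigned on the first call
        System.out.println(first + " then " + second);
      }
    }
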
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
index b5049b1..b11aed6 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
@@ -1656,6 +1656,7 @@ public class HLog implements Syncable {
this.key = key;
this.edit = edit;
}
+
/**
* Gets the edit
* @return edit
@@ -1663,6 +1664,7 @@ public class HLog implements Syncable {
public WALEdit getEdit() {
return edit;
}
+
/**
* Gets the key
* @return key
@@ -1671,6 +1673,15 @@ public class HLog implements Syncable {
return key;
}
+ /**
+ * Set compression context for this entry.
+ * @param compressionContext Compression context
+ */
+ public void setCompressionContext(CompressionContext compressionContext) {
+ edit.setCompressionContext(compressionContext);
+ key.setCompressionContext(compressionContext);
+ }
+
@Override
public String toString() {
return this.key + "=" + this.edit;
@@ -1687,6 +1698,7 @@ public class HLog implements Syncable {
this.key.readFields(dataInput);
this.edit.readFields(dataInput);
}
+
}
/**
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
index 311ea1b..719534a 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
@@ -46,7 +46,8 @@ import org.apache.hadoop.io.WritableUtils;
@InterfaceAudience.Private
public class HLogKey implements WritableComparable<HLogKey> {
// should be < 0 (@see #readFields(DataInput))
- private static final int VERSION = -1;
+ // version 2 supports HLog compression
+ private static final int VERSION = -2;
// The encoded region name.
private byte [] encodedRegionName;
@@ -57,7 +58,9 @@ public class HLogKey implements WritableComparable<HLogKey> {
private UUID clusterId;
- /** Writable Consructor -- Do not use. */
+ private CompressionContext compressionContext;
+
+ /** Writable Constructor -- Do not use. */
public HLogKey() {
this(null, null, 0L, HConstants.LATEST_TIMESTAMP,
HConstants.DEFAULT_CLUSTER_ID);
@@ -84,6 +87,18 @@ public class HLogKey implements WritableComparable {
this.clusterId = clusterId;
}
+ /**
+ * Enables WAL compression for this key.
+ *
+ * @param compressionContext
+ * the compression context, holding the dictionaries used to
+ * compress (and later uncompress) the encoded region name
+ * and the table name
+ */
+ public void setCompressionContext(CompressionContext compressionContext) {
+ this.compressionContext = compressionContext;
+ }
+
/** @return encoded region name */
public byte [] getEncodedRegionName() {
return encodedRegionName;
@@ -216,8 +231,16 @@ public class HLogKey implements WritableComparable {
@Override
public void write(DataOutput out) throws IOException {
WritableUtils.writeVInt(out, VERSION);
- Bytes.writeByteArray(out, this.encodedRegionName);
- Bytes.writeByteArray(out, this.tablename);
+ if (compressionContext == null) {
+ Bytes.writeByteArray(out, this.encodedRegionName);
+ Bytes.writeByteArray(out, this.tablename);
+ } else {
+ Compressor.writeCompressed(this.encodedRegionName, 0,
+ this.encodedRegionName.length, out,
+ compressionContext.regionDict);
+ Compressor.writeCompressed(this.tablename, 0, this.tablename.length, out,
+ compressionContext.tableDict);
+ }
out.writeLong(this.logSeqNum);
out.writeLong(this.writeTime);
// avoid storing 16 bytes when replication is not enabled
@@ -245,11 +268,21 @@ public class HLogKey implements WritableComparable {
if (len < 0) {
// what we just read was the version
version = len;
- len = WritableUtils.readVInt(in);
+ // Only HLogKey version -2 and beyond supports compression.
+ // If compression is on, the length is handled by the dictionary.
+ if (compressionContext == null || version == -1) {
+ len = WritableUtils.readVInt(in);
+ }
+ }
+ if (compressionContext == null || version == -1) {
+ this.encodedRegionName = new byte[len];
+ in.readFully(this.encodedRegionName);
+ this.tablename = Bytes.readByteArray(in);
+ } else {
+ this.encodedRegionName = Compressor.readCompressed(in, compressionContext.regionDict);
+ this.tablename = Compressor.readCompressed(in, compressionContext.tableDict);
}
- this.encodedRegionName = new byte[len];
- in.readFully(this.encodedRegionName);
- this.tablename = Bytes.readByteArray(in);
+
this.logSeqNum = in.readLong();
this.writeTime = in.readLong();
this.clusterId = HConstants.DEFAULT_CLUSTER_ID;
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/KeyValueCompression.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/KeyValueCompression.java
new file mode 100644
index 0000000..616edc4
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/KeyValueCompression.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Compression class for {@link KeyValue}s written to the WAL. This is not
+ * synchronized, so synchronization should be handled outside.
+ *
+ * Class only compresses and uncompresses row keys, family names, and the
+ * qualifier. More may be added depending on use patterns.
+ */
+class KeyValueCompression {
+ /**
+ * Uncompresses a KeyValue from a DataInput and returns it.
+ *
+ * @param in the DataInput
+ * @param readContext the compressionContext to use.
+ * @return an uncompressed KeyValue
+ * @throws IOException
+ */
+
+ public static KeyValue readFields(DataInput in, CompressionContext readContext)
+ throws IOException {
+ int keylength = WritableUtils.readVInt(in);
+ int vlength = WritableUtils.readVInt(in);
+ int length = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength;
+
+ byte[] backingArray = new byte[length];
+ int pos = 0;
+ pos = Bytes.putInt(backingArray, pos, keylength);
+ pos = Bytes.putInt(backingArray, pos, vlength);
+
+ // the row
+ pos = Compressor.uncompressIntoArray(backingArray, pos, in,
+ Bytes.SIZEOF_SHORT, readContext.rowDict);
+
+ // family
+ pos = Compressor.uncompressIntoArray(backingArray, pos, in,
+ Bytes.SIZEOF_BYTE, readContext.familyDict);
+
+ // qualifier
+ pos = Compressor.uncompressIntoArray(backingArray, pos, in, 0,
+ readContext.qualifierDict);
+
+ // the rest
+ in.readFully(backingArray, pos, length - pos);
+
+ return new KeyValue(backingArray);
+
+ }
+
+ /**
+ * Compresses and writes ourKV to out, a DataOutput.
+ *
+ * @param out the DataOutput
+ * @param keyVal the KV to compress and write
+ * @param writeContext the compressionContext to use.
+ * @throws IOException
+ */
+ public static void write(final DataOutput out, KeyValue keyVal,
+ CompressionContext writeContext) throws IOException {
+ byte[] backingArray = keyVal.getBuffer();
+ int offset = keyVal.getOffset();
+
+ // we first write the KeyValue infrastructure as VInts.
+ WritableUtils.writeVInt(out, keyVal.getKeyLength());
+ WritableUtils.writeVInt(out, keyVal.getValueLength());
+
+ // now we write the row key, as the row key is likely to be repeated
+ // We save space only if we attempt to compress elements with duplicates
+ Compressor.writeCompressed(keyVal.getBuffer(), keyVal.getRowOffset(),
+ keyVal.getRowLength(), out, writeContext.rowDict);
+
+
+ // now family, if it exists. if it doesn't, we write a 0 length array.
+ Compressor.writeCompressed(keyVal.getBuffer(), keyVal.getFamilyOffset(),
+ keyVal.getFamilyLength(), out, writeContext.familyDict);
+
+ // qualifier next
+ Compressor.writeCompressed(keyVal.getBuffer(), keyVal.getQualifierOffset(),
+ keyVal.getQualifierLength(), out,
+ writeContext.qualifierDict);
+
+ // now we write the rest uncompressed
+ int pos = keyVal.getTimestampOffset();
+ int remainingLength = keyVal.getLength() + offset - (pos);
+ out.write(backingArray, pos, remainingLength);
+ }
+}
\ No newline at end of file
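
A round-trip sketch of KeyValueCompression (not part of the patch; class name hypothetical, same-package because the helpers are package-private). A fresh reader-side CompressionContext stays in sync with the writer's because the first occurrence of each row, family, and qualifier is written uncompressed and added to both dictionaries:

    package org.apache.hadoop.hbase.regionserver.wal;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Bytes;

    public class KeyValueCompressionRoundTrip {
      public static void main(String[] args) throws Exception {
        KeyValue kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("cf"),
            Bytes.toBytes("q"), Bytes.toBytes("value"));

        CompressionContext writeCtx = new CompressionContext(LRUDictionary.class);
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        KeyValueCompression.write(new DataOutputStream(baos), kv, writeCtx);

        CompressionContext readCtx = new CompressionContext(LRUDictionary.class);
        DataInputStream in =
            new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
        KeyValue roundTripped = KeyValueCompression.readFields(in, readCtx);
        System.out.println(Bytes.toString(roundTripped.getValue())); // prints "value"
      }
    }
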
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/LRUDictionary.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/LRUDictionary.java
new file mode 100644
index 0000000..1b2e3e5
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/LRUDictionary.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.util.HashMap;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * WALDictionary using an LRU eviction algorithm. Uses a linked list running
+ * through a hashtable.
+ */
+public class LRUDictionary implements Dictionary {
+ private final BidirectionalLRUMap backingStore = new BidirectionalLRUMap();
+
+ @Override
+ public byte[] getEntry(short idx) {
+ return backingStore.get(idx);
+ }
+
+ @Override
+ public short findEntry(byte[] data, int offset, int length) {
+ short ret = backingStore.findIdx(data, offset, length);
+ if (ret == NOT_IN_DICTIONARY) {
+ addEntry(data, offset, length);
+ }
+ return ret;
+ }
+
+ @Override
+ public short addEntry(byte[] data, int offset, int length) {
+ if (length <= 0) return NOT_IN_DICTIONARY;
+ return backingStore.put(data, offset, length);
+ }
+
+ @Override
+ public void clear() {
+ backingStore.clear();
+ }
+
+ /*
+ * Internal class used to implement LRU eviction and dual lookup (by key and
+ * value).
+ *
+ * This class is not thread safe; callers must synchronize externally if it is shared.
+ */
+ static class BidirectionalLRUMap {
+ static final int MAXSIZE = Short.MAX_VALUE;
+ private int currSize = 0;
+
+ private Node tail;
+ private Node head;
+
+ private HashMap<Node, Short> nodeToIndex = new HashMap<Node, Short>();
+ private Node[] indexToNode = new Node[MAXSIZE];
+
+ public BidirectionalLRUMap() {
+ for (int i = 0; i < MAXSIZE; i++) {
+ indexToNode[i] = new Node();
+ }
+ }
+
+ private short put(byte[] array, int offset, int length) {
+ // We copy the bits we want, otherwise we might be holding references to
+ // massive arrays in our dictionary.
+ byte[] stored = new byte[length];
+ Bytes.putBytes(stored, 0, array, offset, length);
+
+ if (currSize < MAXSIZE) {
+ indexToNode[currSize].setContents(stored, 0, stored.length);
+ moveToFront(indexToNode[currSize]);
+ short ret = (short) currSize++;
+ nodeToIndex.put(indexToNode[ret], ret);
+ return ret;
+ } else {
+ short s = nodeToIndex.remove(tail);
+ tail.setContents(stored, 0, stored.length);
+ // we need to rehash this.
+ nodeToIndex.put(tail, s);
+ moveToFront(tail);
+ return s;
+ }
+ }
+
+ private short findIdx(byte[] array, int offset, int length) {
+ Short s;
+ final Node comparisonNode = new Node();
+ comparisonNode.setContents(array, offset, length);
+ if ((s = nodeToIndex.get(comparisonNode)) != null) {
+ moveToFront(indexToNode[s]);
+ return s;
+ } else {
+ return -1;
+ }
+ }
+
+ private byte[] get(short idx) {
+ moveToFront(indexToNode[idx]);
+ return indexToNode[idx].container;
+ }
+
+ private void moveToFront(Node n) {
+
+ if (tail == null) {
+ tail = n;
+ }
+
+ if (head == n) {
+ return;
+ }
+
+ if (tail == n) {
+ if (n.next != null) {
+ tail = n.next;
+ }
+ }
+
+ if (n.next != null) {
+ n.next.prev = n.prev;
+ }
+ if (n.prev != null) {
+ n.prev.next = n.next;
+ }
+
+ n.prev = head;
+ if (head != null) {
+ head.next = n;
+ }
+ n.next = null;
+
+ head = n;
+ }
+
+ private void clear() {
+ currSize = 0;
+ nodeToIndex.clear();
+ tail = null;
+ head = null;
+
+ for (Node n : indexToNode) {
+ n.container = null;
+ }
+
+ for (int i = 0; i < MAXSIZE; i++) {
+ indexToNode[i].next = null;
+ indexToNode[i].prev = null;
+ }
+ }
+
+ private static class Node {
+ byte[] container;
+ int offset;
+ int length;
+ Node next;
+ Node prev;
+
+ public Node() {
+ }
+
+ private void setContents(byte[] container, int offset, int length) {
+ this.container = container;
+ this.offset = offset;
+ this.length = length;
+ }
+
+ @Override
+ public int hashCode() {
+ return Bytes.hashCode(container, offset, length);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof Node)) {
+ return false;
+ }
+
+ Node casted = (Node) other;
+ return Bytes.equals(container, offset, length, casted.container,
+ casted.offset, casted.length);
+ }
+ }
+ }
+}
\ No newline at end of file
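
A small sketch (not part of the patch; class name hypothetical) of a design point in BidirectionalLRUMap.put: the dictionary copies the bytes it stores, so callers may reuse or mutate their buffers after adding an entry:

    package org.apache.hadoop.hbase.regionserver.wal;

    import java.util.Arrays;

    import org.apache.hadoop.hbase.util.Bytes;

    public class LRUDictionaryCopySketch {
      public static void main(String[] args) {
        LRUDictionary dict = new LRUDictionary();
        byte[] buffer = Bytes.toBytes("family-a");
        short idx = dict.addEntry(buffer, 0, buffer.length);
        // The dictionary stored its own copy, so clobbering the caller's buffer
        // does not corrupt the entry.
        Arrays.fill(buffer, (byte) 0);
        System.out.println(Bytes.toString(dict.getEntry(idx))); // prints "family-a"
      }
    }
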
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
index ff63a5f..c365f5a 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
@@ -22,11 +22,8 @@ package org.apache.hadoop.hbase.regionserver.wal;
import java.io.FilterInputStream;
import java.io.IOException;
-import java.lang.Class;
-import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
-import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -58,7 +55,6 @@ public class SequenceFileLogReader implements HLog.Reader {
WALReader(final FileSystem fs, final Path p, final Configuration c)
throws IOException {
super(fs, p, c);
-
}
@Override
@@ -70,6 +66,15 @@ public class SequenceFileLogReader implements HLog.Reader {
}
/**
+ * Call this method after init() has been executed
+ *
+ * @return whether WAL compression is enabled
+ */
+ public boolean isWALCompressionEnabled() {
+ return SequenceFileLogWriter.isWALCompressionEnabled(this.getMetadata());
+ }
+
+ /**
* Override just so can intercept first call to getPos.
*/
static class WALReaderFSDataInputStream extends FSDataInputStream {
@@ -136,10 +141,15 @@ public class SequenceFileLogReader implements HLog.Reader {
Configuration conf;
WALReader reader;
+
// Needed logging exceptions
Path path;
int edit = 0;
long entryStart = 0;
+ /**
+ * Compression context to use reading. Can be null if no compression.
+ */
+ private CompressionContext compressionContext = null;
protected Class<? extends HLogKey> keyClass;
@@ -159,19 +169,32 @@ public class SequenceFileLogReader implements HLog.Reader {
this.keyClass = keyClass;
}
-
@Override
public void init(FileSystem fs, Path path, Configuration conf)
throws IOException {
this.conf = conf;
this.path = path;
reader = new WALReader(fs, path, conf);
+
+ // If compression is enabled, new dictionaries are created here.
+ boolean compression = reader.isWALCompressionEnabled();
+ if (compression) {
+ try {
+ if (compressionContext == null) {
+ compressionContext = new CompressionContext(LRUDictionary.class);
+ } else {
+ compressionContext.clear();
+ }
+ } catch (Exception e) {
+ throw new IOException("Failed to initiate CompressionContext", e);
+ }
+ }
}
@Override
public void close() throws IOException {
try {
- reader.close();
+ if (reader != null) reader.close();
} catch (IOException ioe) {
throw addFileInfoToException(ioe);
}
@@ -205,6 +228,9 @@ public class SequenceFileLogReader implements HLog.Reader {
}
boolean b = false;
try {
+ if (compressionContext != null) {
+ e.setCompressionContext(compressionContext);
+ }
b = this.reader.next(e.getKey(), e.getEdit());
} catch (IOException ioe) {
throw addFileInfoToException(ioe);
@@ -259,4 +285,4 @@ public class SequenceFileLogReader implements HLog.Reader {
return ioe;
}
-}
+}
\ No newline at end of file
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
index 01ebb5c..2bd5046 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
@@ -25,6 +25,7 @@ import java.io.OutputStream;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -33,7 +34,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -45,6 +48,16 @@ import org.apache.hadoop.io.compress.DefaultCodec;
*/
@InterfaceAudience.Private
public class SequenceFileLogWriter implements HLog.Writer {
+ static final Text WAL_VERSION_KEY = new Text("version");
+ // Let version 1 be the version that introduces our dictionary compression,
+ // i.e. the COMPRESSION_VERSION. Absence of a version metadata entry means
+ // the old format, version 0.
+ private static final int COMPRESSION_VERSION = 1;
+ static final int VERSION = COMPRESSION_VERSION;
+ static final Text WAL_VERSION = new Text("" + VERSION);
+ static final Text WAL_COMPRESSION_TYPE_KEY = new Text("compression.type");
+ static final Text DICTIONARY_COMPRESSION_TYPE = new Text("dictionary");
+
private final Log LOG = LogFactory.getLog(this.getClass());
// The sequence file we delegate to.
private SequenceFile.Writer writer;
@@ -54,6 +67,13 @@ public class SequenceFileLogWriter implements HLog.Writer {
private Class<? extends HLogKey> keyClass;
+ /**
+ * Context used by our wal dictionary compressor. Null if we're not to do
+ * our custom dictionary compression. This custom WAL compression is distinct
+ * from sequencefile native compression.
+ */
+ private CompressionContext compressionContext;
+
private Method syncFs = null;
private Method hflush = null;
@@ -74,9 +94,56 @@ public class SequenceFileLogWriter implements HLog.Writer {
this.keyClass = keyClass;
}
+ /**
+ * Create sequence file Metadata for our WAL file with version and compression
+ * type (if any).
+ * @param conf
+ * @param compress
+ * @return Metadata instance.
+ */
+ private static Metadata createMetadata(final Configuration conf,
+ final boolean compress) {
+ TreeMap<Text, Text> metaMap = new TreeMap<Text, Text>();
+ metaMap.put(WAL_VERSION_KEY, WAL_VERSION);
+ if (compress) {
+ // Currently we only do one compression type.
+ metaMap.put(WAL_COMPRESSION_TYPE_KEY, DICTIONARY_COMPRESSION_TYPE);
+ }
+ return new Metadata(metaMap);
+ }
+
+ /**
+ * Call this method after init() has been executed
+ *
+ * @return whether WAL compression is enabled
+ */
+ static boolean isWALCompressionEnabled(final Metadata metadata) {
+ // Check version is >= VERSION?
+ Text txt = metadata.get(WAL_VERSION_KEY);
+ if (txt == null || Integer.parseInt(txt.toString()) < COMPRESSION_VERSION) {
+ return false;
+ }
+ // Now check that compression type is present. Currently only one value.
+ txt = metadata.get(WAL_COMPRESSION_TYPE_KEY);
+ return txt != null && txt.equals(DICTIONARY_COMPRESSION_TYPE);
+ }
+
@Override
public void init(FileSystem fs, Path path, Configuration conf)
throws IOException {
+ // Should we do our custom WAL compression?
+ boolean compress = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false);
+ if (compress) {
+ try {
+ if (this.compressionContext == null) {
+ this.compressionContext = new CompressionContext(LRUDictionary.class);
+ } else {
+ this.compressionContext.clear();
+ }
+ } catch (Exception e) {
+ throw new IOException("Failed to initiate CompressionContext", e);
+ }
+ }
if (null == keyClass) {
keyClass = HLog.getKeyClass(conf);
@@ -101,7 +168,7 @@ public class SequenceFileLogWriter implements HLog.Writer {
fs.getDefaultBlockSize())),
Boolean.valueOf(false) /*createParent*/,
SequenceFile.CompressionType.NONE, new DefaultCodec(),
- new Metadata()
+ createMetadata(conf, compress)
});
} catch (InvocationTargetException ite) {
// function was properly called, but threw it's own exception
@@ -123,7 +190,7 @@ public class SequenceFileLogWriter implements HLog.Writer {
SequenceFile.CompressionType.NONE,
new DefaultCodec(),
null,
- new Metadata());
+ createMetadata(conf, compress));
} else {
LOG.debug("using new createWriter -- HADOOP-6840");
}
@@ -133,7 +200,8 @@ public class SequenceFileLogWriter implements HLog.Writer {
this.hflush = getHFlush();
String msg = "Path=" + path +
", syncFs=" + (this.syncFs != null) +
- ", hflush=" + (this.hflush != null);
+ ", hflush=" + (this.hflush != null) +
+ ", compression=" + compress;
if (this.syncFs != null || this.hflush != null) {
LOG.debug(msg);
} else {
@@ -207,6 +275,7 @@ public class SequenceFileLogWriter implements HLog.Writer {
@Override
public void append(HLog.Entry entry) throws IOException {
+ entry.setCompressionContext(compressionContext);
this.writer.append(entry.getKey(), entry.getEdit());
}
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
index d8f317c..70473ae 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
@@ -76,9 +76,15 @@ public class WALEdit implements Writable, HeapSize {
private final ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
private NavigableMap<byte[], Integer> scopes;
+ private CompressionContext compressionContext;
+
public WALEdit() {
}
+ public void setCompressionContext(final CompressionContext compressionContext) {
+ this.compressionContext = compressionContext;
+ }
+
public void add(KeyValue kv) {
this.kvs.add(kv);
}
@@ -116,9 +122,13 @@ public class WALEdit implements Writable, HeapSize {
// this is new style HLog entry containing multiple KeyValues.
int numEdits = in.readInt();
for (int idx = 0; idx < numEdits; idx++) {
- KeyValue kv = new KeyValue();
- kv.readFields(in);
- this.add(kv);
+ if (compressionContext != null) {
+ this.add(KeyValueCompression.readFields(in, compressionContext));
+ } else {
+ KeyValue kv = new KeyValue();
+ kv.readFields(in);
+ this.add(kv);
+ }
}
int numFamilies = in.readInt();
if (numFamilies > 0) {
@@ -133,7 +143,7 @@ public class WALEdit implements Writable, HeapSize {
}
} else {
// this is an old style HLog entry. The int that we just
- // read is actually the length of a single KeyValue.
+ // read is actually the length of a single KeyValue
KeyValue kv = new KeyValue();
kv.readFields(versionOrLength, in);
this.add(kv);
@@ -146,7 +156,11 @@ public class WALEdit implements Writable, HeapSize {
out.writeInt(kvs.size());
// We interleave the two lists for code simplicity
for (KeyValue kv : kvs) {
- kv.write(out);
+ if (compressionContext != null) {
+ KeyValueCompression.write(out, kv, compressionContext);
+ } else {
+ kv.write(out);
+ }
}
if (scopes == null) {
out.writeInt(0);
@@ -187,4 +201,4 @@ public class WALEdit implements Writable, HeapSize {
return sb.toString();
}
-}
+}
\ No newline at end of file
diff --git src/main/java/org/apache/hadoop/hbase/util/Bytes.java src/main/java/org/apache/hadoop/hbase/util/Bytes.java
index de8e40b..e904770 100644
--- src/main/java/org/apache/hadoop/hbase/util/Bytes.java
+++ src/main/java/org/apache/hadoop/hbase/util/Bytes.java
@@ -1461,6 +1461,18 @@ public class Bytes {
}
/**
+ * @param bytes array to hash
+ * @param offset offset to start from
+ * @param length length to hash
+ */
+ public static int hashCode(byte[] bytes, int offset, int length) {
+ int hash = 1;
+ for (int i = offset; i < offset + length; i++)
+ hash = (31 * hash) + (int) bytes[i];
+ return hash;
+ }
+
+ /**
* @param t operands
* @return Array of byte arrays made from passed array of Text
*/
diff --git src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java
new file mode 100644
index 0000000..eea5870
--- /dev/null
+++ src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test our compressor class.
+ */
+public class TestCompressor {
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ }
+
+ @Test
+ public void testToShort() {
+ short s = 1;
+ assertEquals(s, Compressor.toShort((byte)0, (byte)1));
+ s <<= 8;
+ assertEquals(s, Compressor.toShort((byte)1, (byte)0));
+ }
+
+ @Test (expected = IllegalArgumentException.class)
+ public void testNegativeToShort() {
+ Compressor.toShort((byte)0xff, (byte)0xff);
+ }
+
+ @Test
+ public void testCompressingWithNullDictionaries() throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+ byte [] blahBytes = Bytes.toBytes("blah");
+ Compressor.writeCompressed(blahBytes, 0, blahBytes.length, dos, null);
+ dos.close();
+ byte [] dosbytes = baos.toByteArray();
+ DataInputStream dis =
+ new DataInputStream(new ByteArrayInputStream(dosbytes));
+ byte [] product = Compressor.readCompressed(dis, null);
+ assertTrue(Bytes.equals(blahBytes, product));
+ }
+
+ @Test
+ public void testCompressingWithClearDictionaries() throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+ Dictionary dictionary = new LRUDictionary();
+ byte [] blahBytes = Bytes.toBytes("blah");
+ Compressor.writeCompressed(blahBytes, 0, blahBytes.length, dos, dictionary);
+ dos.close();
+ byte [] dosbytes = baos.toByteArray();
+ DataInputStream dis =
+ new DataInputStream(new ByteArrayInputStream(dosbytes));
+ dictionary = new LRUDictionary();
+ byte [] product = Compressor.readCompressed(dis, dictionary);
+ assertTrue(Bytes.equals(blahBytes, product));
+ }
+}
\ No newline at end of file
diff --git src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLRUDictionary.java src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLRUDictionary.java
new file mode 100644
index 0000000..6f295f2
--- /dev/null
+++ src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLRUDictionary.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.math.BigInteger;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests LRUDictionary
+ */
+@Category(SmallTests.class)
+public class TestLRUDictionary {
+ LRUDictionary testee;
+
+ @Before
+ public void setUp() throws Exception {
+ testee = new LRUDictionary();
+ }
+
+ @Test
+ public void TestContainsNothing() {
+ assertTrue(isDictionaryEmpty(testee));
+ }
+
+ /**
+ * Assert that an empty array cannot be added; both findEntry and addEntry return NOT_IN_DICTIONARY.
+ */
+ @Test
+ public void testPassingEmptyArrayToFindEntry() {
+ assertEquals(Dictionary.NOT_IN_DICTIONARY,
+ testee.findEntry(HConstants.EMPTY_BYTE_ARRAY, 0, 0));
+ assertEquals(Dictionary.NOT_IN_DICTIONARY,
+ testee.addEntry(HConstants.EMPTY_BYTE_ARRAY, 0, 0));
+ }
+
+ @Test
+ public void testPassingEmptyArrayToAddEntry() {
+ assertEquals(Dictionary.NOT_IN_DICTIONARY, testee.addEntry(HConstants.EMPTY_BYTE_ARRAY, 0, 0));
+ }
+
+ @Test
+ public void testPassingSameArrayToAddEntry() {
+ // Add the same predefined byte array (CATALOG_FAMILY from HConstants)
+ // several times. Assert that each addEntry call returns a new index:
+ // addEntry always inserts, even for duplicates.
+ int len = HConstants.CATALOG_FAMILY.length;
+ int index = testee.addEntry(HConstants.CATALOG_FAMILY, 0, len);
+ assertFalse(index == testee.addEntry(HConstants.CATALOG_FAMILY, 0, len));
+ assertFalse(index == testee.addEntry(HConstants.CATALOG_FAMILY, 0, len));
+ }
+
+ @Test
+ public void testBasic() {
+ Random rand = new Random();
+ byte[] testBytes = new byte[10];
+ rand.nextBytes(testBytes);
+
+ // Verify that our randomly generated array doesn't exist in the dictionary
+ assertEquals(testee.findEntry(testBytes, 0, testBytes.length), -1);
+
+ // now since we looked up an entry, we should have added it to the
+ // dictionary, so it isn't empty
+
+ assertFalse(isDictionaryEmpty(testee));
+
+ // Check if we can find it using findEntry
+ short t = testee.findEntry(testBytes, 0, testBytes.length);
+
+ // Making sure we do find what we're looking for
+ assertTrue(t != -1);
+
+ byte[] testBytesCopy = new byte[20];
+
+ Bytes.putBytes(testBytesCopy, 10, testBytes, 0, testBytes.length);
+
+ // copy byte arrays, make sure that we check that equal byte arrays are
+ // equal without just checking the reference
+ assertEquals(testee.findEntry(testBytesCopy, 10, testBytes.length), t);
+
+ // make sure the entry retrieved is the same as the one put in
+ assertTrue(Arrays.equals(testBytes, testee.getEntry(t)));
+
+ testee.clear();
+
+ // making sure clear clears the dictionary
+ assertTrue(isDictionaryEmpty(testee));
+ }
+
+ @Test
+ public void TestLRUPolicy(){
+ //start by filling the dictionary up with byte arrays
+ for (int i = 0; i < LRUDictionary.BidirectionalLRUMap.MAXSIZE; i++) {
+ testee.findEntry((BigInteger.valueOf(i)).toByteArray(), 0,
+ (BigInteger.valueOf(i)).toByteArray().length);
+ }
+
+ // check we have the first element added
+ assertTrue(testee.findEntry(BigInteger.ZERO.toByteArray(), 0,
+ BigInteger.ZERO.toByteArray().length) != -1);
+
+ // check for an element we know isn't there
+ assertTrue(testee.findEntry(BigInteger.valueOf(Integer.MAX_VALUE).toByteArray(), 0,
+ BigInteger.valueOf(Integer.MAX_VALUE).toByteArray().length) == -1);
+
+ // since we just checked for this element, it should be there now.
+ assertTrue(testee.findEntry(BigInteger.valueOf(Integer.MAX_VALUE).toByteArray(), 0,
+ BigInteger.valueOf(Integer.MAX_VALUE).toByteArray().length) != -1);
+
+ // test eviction, that the least recently added or looked at element is
+ // evicted. We looked at ZERO so it should be in the dictionary still.
+ assertTrue(testee.findEntry(BigInteger.ZERO.toByteArray(), 0,
+ BigInteger.ZERO.toByteArray().length) != -1);
+ // Walk 1..MAXSIZE-1: each lookup misses, and its insertion evicts the next entry in turn.
+ for(int i = 1; i < LRUDictionary.BidirectionalLRUMap.MAXSIZE; i++) {
+ assertTrue(testee.findEntry(BigInteger.valueOf(i).toByteArray(), 0,
+ BigInteger.valueOf(i).toByteArray().length) == -1);
+ }
+
+ // check we can find all of these.
+ for (int i = 0; i < LRUDictionary.BidirectionalLRUMap.MAXSIZE; i++) {
+ assertTrue(testee.findEntry(BigInteger.valueOf(i).toByteArray(), 0,
+ BigInteger.valueOf(i).toByteArray().length) != -1);
+ }
+ }
+
+ static private boolean isDictionaryEmpty(LRUDictionary dict) {
+ for (short i = 0; i < LRUDictionary.BidirectionalLRUMap.MAXSIZE; i++) {
+ if (dict.getEntry(i) != null) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
\ No newline at end of file
diff --git src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
index a11899c..e594697 100644
--- src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
+++ src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
@@ -61,7 +61,7 @@ import org.mockito.Mockito;
@Category(MediumTests.class)
public class TestWALReplay {
public static final Log LOG = LogFactory.getLog(TestWALReplay.class);
- private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
private Path hbaseRootDir = null;
private Path oldLogDir;
diff --git src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayCompressed.java src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayCompressed.java
new file mode 100644
index 0000000..7e57359
--- /dev/null
+++ src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayCompressed.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MediumTests;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Enables compression and runs the TestWALReplay tests.
+ */
+@Category(MediumTests.class)
+public class TestWALReplayCompressed extends TestWALReplay {
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TestWALReplay.setUpBeforeClass();
+ Configuration conf = TestWALReplay.TEST_UTIL.getConfiguration();
+ conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+ }
+
+}