diff --git src/main/java/org/apache/hadoop/hbase/HConstants.java src/main/java/org/apache/hadoop/hbase/HConstants.java
index 045c6f3..8888347 100644
--- src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -644,6 +644,10 @@ public final class HConstants {
   /** File permission umask to use when creating hbase data files */
   public static final String DATA_FILE_UMASK_KEY = "hbase.data.umask";
 
+  /** Configuration name of HLog Compression */
+  public static final String ENABLE_WAL_COMPRESSION =
+    "hbase.regionserver.wal.enablecompression";
+
   private HConstants() {
     // Can't be instantiated with this ctor.
   }
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
new file mode 100644
index 0000000..7a995a6
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * Context that holds the various dictionaries for compression in HLog.
+ */
+class CompressionContext {
+  final Dictionary regionDict;
+  final Dictionary tableDict;
+  final Dictionary familyDict;
+  final Dictionary qualifierDict;
+  final Dictionary rowDict;
+
+  public CompressionContext(Class<? extends Dictionary> dictType)
+      throws SecurityException, NoSuchMethodException,
+      IllegalArgumentException, InstantiationException, IllegalAccessException,
+      InvocationTargetException {
+    Constructor<? extends Dictionary> dictConstructor = dictType
+        .getConstructor();
+
+    regionDict = dictConstructor.newInstance();
+    tableDict = dictConstructor.newInstance();
+    familyDict = dictConstructor.newInstance();
+    qualifierDict = dictConstructor.newInstance();
+    rowDict = dictConstructor.newInstance();
+  }
+
+  void clear() {
+    regionDict.clear();
+    tableDict.clear();
+    familyDict.clear();
+    qualifierDict.clear();
+    rowDict.clear();
+  }
+}
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
new file mode 100644
index 0000000..a5d8c71
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import com.google.common.base.Preconditions;
+
+/**
+ * A set of static functions for running our custom WAL compression/decompression.
+ * Also contains a command line tool to compress and uncompress HLogs.
+ */
+public class Compressor {
+  /**
+   * Command line tool to compress and uncompress WALs.
+   */
+  public static void main(String[] args) throws IOException {
+    if (args.length != 2 || args[0].equals("--help") || args[0].equals("-h")) {
+      printHelp();
+      System.exit(-1);
+    }
+
+    Path inputPath = new Path(args[0]);
+    Path outputPath = new Path(args[1]);
+
+    transformFile(inputPath, outputPath);
+  }
+
+  private static void printHelp() {
+    System.err.println("usage: Compressor <input> <output>");
+    System.err.println("If <input> HLog is compressed, <output> will be decompressed.");
+    System.err.println("If <input> HLog is uncompressed, <output> will be compressed.");
+    return;
+  }
+
+  private static void transformFile(Path input, Path output)
+      throws IOException {
+    SequenceFileLogReader in = new SequenceFileLogReader();
+    SequenceFileLogWriter out = new SequenceFileLogWriter();
+
+    try {
+      Configuration conf = HBaseConfiguration.create();
+
+      FileSystem inFS = input.getFileSystem(conf);
+      FileSystem outFS = output.getFileSystem(conf);
+
+      in.init(inFS, input, conf);
+      boolean compress = in.reader.isWALCompressionEnabled();
+
+      conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, !compress);
+      out.init(outFS, output, conf);
+
+      Entry e = null;
+      while ((e = in.next()) != null) out.append(e);
+    } finally {
+      in.close();
+      out.close();
+    }
+  }
+
+  /**
+   * Reads the next compressed entry and returns it as a byte array
+   *
+   * @param in the DataInput to read from
+   * @param dict the dictionary we use for our read.
+   *
+   * @return the uncompressed array.
+   */
+  static byte[] readCompressed(DataInput in, Dictionary dict)
+      throws IOException {
+    byte status = in.readByte();
+    int length;
+    short dictIdx;
+    byte[] entry = null;
+
+    if (status == Dictionary.NOT_IN_DICTIONARY) {
+      length = WritableUtils.readVInt(in);
+      // if this isn't in the dictionary, we need to add to the dictionary.
+ byte[] arr = new byte[length]; + in.readFully(arr); + if (dict != null) dict.addEntry(arr, 0, length); + return arr; + } else { + dictIdx = toShort(status, in.readByte()); + entry = dict.getEntry(dictIdx); + if (entry == null) { + throw new IOException("Missing dictionary reference at offset " + + dictIdx); + } + return entry; + } + } + + /** + * Reads a compressed entry into an array. + * The output into the array ends up length-prefixed. + * + * @param to the array to write into + * @param offset array offset to start writing to + * @param in the DataInput to read from + * @param sizeBytes size of the length of the prefix. Oftentimes we do not use + * an int, because we can represent the entry as a byte or a short + * @param dict the dictionary to use for compression + * + * @return the index of the last byte written. + */ + static int uncompressIntoArray(byte[] to, int offset, DataInput in, + int sizeBytes, Dictionary dict) + throws IOException { + byte status = in.readByte(); + int length; + short dictIdx; + int pos = offset; + byte[] entry = null; + + if (status == Dictionary.NOT_IN_DICTIONARY) { + // status byte indicating that data to be read is not in dictionary. + length = WritableUtils.readVInt(in); + } else { + // the status byte also acts as the higher order byte of the dictionary + // entry + dictIdx = toShort(status, in.readByte()); + entry = dict.getEntry(dictIdx); + length = entry.length; + } + + // sometimes, we need to write the size of the byte array, different + // datatypes may be used + if (sizeBytes == Bytes.SIZEOF_BYTE) { + pos = Bytes.putByte(to, pos, (byte) (length & 0x0000ff)); + } else if (sizeBytes == Bytes.SIZEOF_SHORT) { + pos = Bytes.putShort(to, pos, (short) (length & 0x0000ffff)); + } else if (sizeBytes == Bytes.SIZEOF_INT) { + pos = Bytes.putInt(to, pos, length); + } else if (sizeBytes != 0) { + throw new IOException("sizeBytes of " + sizeBytes + " not supported"); + } + + if (status == Dictionary.NOT_IN_DICTIONARY) { + // if this isn't in the dictionary, we need to add to the dictionary. + in.readFully(to, pos, length); + dict.addEntry(to, pos, length); + pos += length; + } else { + // now we write the uncompressed value. + pos = Bytes.putBytes(to, pos, entry, 0, length); + } + + return pos; + } + + /** + * Compresses and writes an array to a DataOutput + * + * @param data the array to write. + * @param out the DataOutput to write into + * @param dict the dictionary to use for compression + */ + + static void writeCompressed(byte[] data, int offset, int length, + DataOutput out, Dictionary dict) + throws IOException { + short dictIdx = Dictionary.NOT_IN_DICTIONARY; + if (dict != null) dictIdx = dict.findEntry(data, offset, length); + if (dictIdx == Dictionary.NOT_IN_DICTIONARY) { + // not in dict + out.writeByte(Dictionary.NOT_IN_DICTIONARY); + WritableUtils.writeVInt(out, length); + out.write(data, offset, length); + } else { + out.writeShort(dictIdx); + } + } + + static short toShort(byte hi, byte lo) { + short s = (short) (((hi & 0xFF) << 8) | (lo & 0xFF)); + Preconditions.checkArgument(s >= 0); + return s; + } +} \ No newline at end of file diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/Dictionary.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/Dictionary.java new file mode 100644 index 0000000..e35250c --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/Dictionary.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.wal; + +/** + * Dictionary interface + * + * Dictionary indexes should be either bytes or shorts, only positive. (The + * first bit is reserved for detecting whether something is compressed or not). + */ + +interface Dictionary { + static final byte NOT_IN_DICTIONARY = -1; + + /** + * Gets an entry from the dictionary. + * + * @param idx index of the entry + * @return the entry, or null if non existent + */ + public byte[] getEntry(short idx); + + /** + * Finds the index of an entry. + * If no entry found, we add it. + * + * @param data the byte array that we're looking up + * @param offset Offset into data to add to Dictionary. + * @param length Length beyond offset that comprises entry; must be > 0. + * @return the index of the entry, or {@link #NOT_IN_DICTIONARY} if not found + */ + public short findEntry(byte[] data, int offset, int length); + + /** + * Adds an entry to the dictionary. + * Be careful using this method. It will just add pass entry to the + * dictionary though it already has an entry. + * Call {{@link #findEntry(byte[], int, int)}} to add without duplicating + * dictionary entries. + * + * @param data the entry to add + * @param offset Offset into data to add to Dictionary. + * @param length Length beyond offset that comprises entry; must be > 0. + * @return the index of the entry + */ + + public short addEntry(byte[] data, int offset, int length); + + /** + * Flushes the dictionary, empties all values. + */ + public void clear(); +} \ No newline at end of file diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index b5049b1..b11aed6 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -1656,6 +1656,7 @@ public class HLog implements Syncable { this.key = key; this.edit = edit; } + /** * Gets the edit * @return edit @@ -1663,6 +1664,7 @@ public class HLog implements Syncable { public WALEdit getEdit() { return edit; } + /** * Gets the key * @return key @@ -1671,6 +1673,15 @@ public class HLog implements Syncable { return key; } + /** + * Set compression context for this entry. 
+ * @param compressionContext Compression context + */ + public void setCompressionContext(CompressionContext compressionContext) { + edit.setCompressionContext(compressionContext); + key.setCompressionContext(compressionContext); + } + @Override public String toString() { return this.key + "=" + this.edit; @@ -1687,6 +1698,7 @@ public class HLog implements Syncable { this.key.readFields(dataInput); this.edit.readFields(dataInput); } + } /** diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java index 311ea1b..719534a 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java @@ -46,7 +46,8 @@ import org.apache.hadoop.io.WritableUtils; @InterfaceAudience.Private public class HLogKey implements WritableComparable { // should be < 0 (@see #readFields(DataInput)) - private static final int VERSION = -1; + // version 2 supports HLog compression + private static final int VERSION = -2; // The encoded region name. private byte [] encodedRegionName; @@ -57,7 +58,9 @@ public class HLogKey implements WritableComparable { private UUID clusterId; - /** Writable Consructor -- Do not use. */ + private CompressionContext compressionContext; + + /** Writable Constructor -- Do not use. */ public HLogKey() { this(null, null, 0L, HConstants.LATEST_TIMESTAMP, HConstants.DEFAULT_CLUSTER_ID); @@ -84,6 +87,18 @@ public class HLogKey implements WritableComparable { this.clusterId = clusterId; } + /** + * Enables compression. + * + * @param tableDict + * dictionary used for compressing table + * @param regionDict + * dictionary used for compressing region + */ + public void setCompressionContext(CompressionContext compressionContext) { + this.compressionContext = compressionContext; + } + /** @return encoded region name */ public byte [] getEncodedRegionName() { return encodedRegionName; @@ -216,8 +231,16 @@ public class HLogKey implements WritableComparable { @Override public void write(DataOutput out) throws IOException { WritableUtils.writeVInt(out, VERSION); - Bytes.writeByteArray(out, this.encodedRegionName); - Bytes.writeByteArray(out, this.tablename); + if (compressionContext == null) { + Bytes.writeByteArray(out, this.encodedRegionName); + Bytes.writeByteArray(out, this.tablename); + } else { + Compressor.writeCompressed(this.encodedRegionName, 0, + this.encodedRegionName.length, out, + compressionContext.regionDict); + Compressor.writeCompressed(this.tablename, 0, this.tablename.length, out, + compressionContext.tableDict); + } out.writeLong(this.logSeqNum); out.writeLong(this.writeTime); // avoid storing 16 bytes when replication is not enabled @@ -245,11 +268,21 @@ public class HLogKey implements WritableComparable { if (len < 0) { // what we just read was the version version = len; - len = WritableUtils.readVInt(in); + // We only compress V2 of HLogkey. 
+ // If compression is on, the length is handled by the dictionary + if (compressionContext == null || version == -1) { + len = WritableUtils.readVInt(in); + } + } + if (compressionContext == null || version == -1) { + this.encodedRegionName = new byte[len]; + in.readFully(this.encodedRegionName); + this.tablename = Bytes.readByteArray(in); + } else { + this.encodedRegionName = Compressor.readCompressed(in, compressionContext.regionDict); + this.tablename = Compressor.readCompressed(in, compressionContext.tableDict); } - this.encodedRegionName = new byte[len]; - in.readFully(this.encodedRegionName); - this.tablename = Bytes.readByteArray(in); + this.logSeqNum = in.readLong(); this.writeTime = in.readLong(); this.clusterId = HConstants.DEFAULT_CLUSTER_ID; diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/KeyValueCompression.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/KeyValueCompression.java new file mode 100644 index 0000000..616edc4 --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/KeyValueCompression.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.wal; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.WritableUtils; + +/** + * Compression class for {@link KeyValue}s written to the WAL. This is not + * synchronized, so synchronization should be handled outside. + * + * Class only compresses and uncompresses row keys, family names, and the + * qualifier. More may be added depending on use patterns. + */ +class KeyValueCompression { + /** + * Uncompresses a KeyValue from a DataInput and returns it. + * + * @param in the DataInput + * @param readContext the compressionContext to use. 
+ * @return an uncompressed KeyValue + * @throws IOException + */ + + public static KeyValue readFields(DataInput in, CompressionContext readContext) + throws IOException { + int keylength = WritableUtils.readVInt(in); + int vlength = WritableUtils.readVInt(in); + int length = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength; + + byte[] backingArray = new byte[length]; + int pos = 0; + pos = Bytes.putInt(backingArray, pos, keylength); + pos = Bytes.putInt(backingArray, pos, vlength); + + // the row + pos = Compressor.uncompressIntoArray(backingArray, pos, in, + Bytes.SIZEOF_SHORT, readContext.rowDict); + + // family + pos = Compressor.uncompressIntoArray(backingArray, pos, in, + Bytes.SIZEOF_BYTE, readContext.familyDict); + + // qualifier + pos = Compressor.uncompressIntoArray(backingArray, pos, in, 0, + readContext.qualifierDict); + + // the rest + in.readFully(backingArray, pos, length - pos); + + return new KeyValue(backingArray); + + } + + /** + * Compresses and writes ourKV to out, a DataOutput. + * + * @param out the DataOutput + * @param keyVal the KV to compress and write + * @param writeContext the compressionContext to use. + * @throws IOException + */ + public static void write(final DataOutput out, KeyValue keyVal, + CompressionContext writeContext) throws IOException { + byte[] backingArray = keyVal.getBuffer(); + int offset = keyVal.getOffset(); + + // we first write the KeyValue infrastructure as VInts. + WritableUtils.writeVInt(out, keyVal.getKeyLength()); + WritableUtils.writeVInt(out, keyVal.getValueLength()); + + // now we write the row key, as the row key is likely to be repeated + // We save space only if we attempt to compress elements with duplicates + Compressor.writeCompressed(keyVal.getBuffer(), keyVal.getRowOffset(), + keyVal.getRowLength(), out, writeContext.rowDict); + + + // now family, if it exists. if it doesn't, we write a 0 length array. + Compressor.writeCompressed(keyVal.getBuffer(), keyVal.getFamilyOffset(), + keyVal.getFamilyLength(), out, writeContext.familyDict); + + // qualifier next + Compressor.writeCompressed(keyVal.getBuffer(), keyVal.getQualifierOffset(), + keyVal.getQualifierLength(), out, + writeContext.qualifierDict); + + // now we write the rest uncompressed + int pos = keyVal.getTimestampOffset(); + int remainingLength = keyVal.getLength() + offset - (pos); + out.write(backingArray, pos, remainingLength); + } +} \ No newline at end of file diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/LRUDictionary.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/LRUDictionary.java new file mode 100644 index 0000000..1b2e3e5 --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/LRUDictionary.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.wal; + +import java.util.HashMap; + +import org.apache.hadoop.hbase.util.Bytes; + +/** + * WALDictionary using an LRU eviction algorithm. Uses a linked list running + * through a hashtable. + */ +public class LRUDictionary implements Dictionary { + private final BidirectionalLRUMap backingStore = new BidirectionalLRUMap(); + + @Override + public byte[] getEntry(short idx) { + return backingStore.get(idx); + } + + @Override + public short findEntry(byte[] data, int offset, int length) { + short ret = backingStore.findIdx(data, offset, length); + if (ret == NOT_IN_DICTIONARY) { + addEntry(data, offset, length); + } + return ret; + } + + @Override + public short addEntry(byte[] data, int offset, int length) { + if (length <= 0) return NOT_IN_DICTIONARY; + return backingStore.put(data, offset, length); + } + + @Override + public void clear() { + backingStore.clear(); + } + + /* + * Internal class used to implement LRU eviction and dual lookup (by key and + * value). + * + * This is not thread safe. Don't use in thread safe applications + */ + static class BidirectionalLRUMap { + static final int MAXSIZE = Short.MAX_VALUE; + private int currSize = 0; + + private Node tail; + private Node head; + + private HashMap nodeToIndex = new HashMap(); + private Node[] indexToNode = new Node[MAXSIZE]; + + public BidirectionalLRUMap() { + for (int i = 0; i < MAXSIZE; i++) { + indexToNode[i] = new Node(); + } + } + + private short put(byte[] array, int offset, int length) { + // We copy the bits we want, otherwise we might be holding references to + // massive arrays in our dictionary. + byte[] stored = new byte[length]; + Bytes.putBytes(stored, 0, array, offset, length); + + if (currSize < MAXSIZE) { + indexToNode[currSize].setContents(stored, 0, stored.length); + moveToFront(indexToNode[currSize]); + short ret = (short) currSize++; + nodeToIndex.put(indexToNode[ret], ret); + return ret; + } else { + short s = nodeToIndex.remove(tail); + tail.setContents(stored, 0, stored.length); + // we need to rehash this. 
+ nodeToIndex.put(tail, s); + moveToFront(tail); + return s; + } + } + + private short findIdx(byte[] array, int offset, int length) { + Short s; + final Node comparisonNode = new Node(); + comparisonNode.setContents(array, offset, length); + if ((s = nodeToIndex.get(comparisonNode)) != null) { + moveToFront(indexToNode[s]); + return s; + } else { + return -1; + } + } + + private byte[] get(short idx) { + moveToFront(indexToNode[idx]); + return indexToNode[idx].container; + } + + private void moveToFront(Node n) { + + if (tail == null) { + tail = n; + } + + if (head == n) { + return; + } + + if (tail == n) { + if (n.next != null) { + tail = n.next; + } + } + + if (n.next != null) { + n.next.prev = n.prev; + } + if (n.prev != null) { + n.prev.next = n.next; + } + + n.prev = head; + if (head != null) { + head.next = n; + } + n.next = null; + + head = n; + } + + private void clear() { + currSize = 0; + nodeToIndex.clear(); + tail = null; + head = null; + + for (Node n : indexToNode) { + n.container = null; + } + + for (int i = 0; i < MAXSIZE; i++) { + indexToNode[i].next = null; + indexToNode[i].prev = null; + } + } + + private static class Node { + byte[] container; + int offset; + int length; + Node next; + Node prev; + + public Node() { + } + + private void setContents(byte[] container, int offset, int length) { + this.container = container; + this.offset = offset; + this.length = length; + } + + @Override + public int hashCode() { + return Bytes.hashCode(container, offset, length); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof Node)) { + return false; + } + + Node casted = (Node) other; + return Bytes.equals(container, offset, length, casted.container, + casted.offset, casted.length); + } + } + } +} \ No newline at end of file diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java index ff63a5f..c365f5a 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java @@ -22,11 +22,8 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.io.FilterInputStream; import java.io.IOException; -import java.lang.Class; -import java.lang.reflect.Constructor; import java.lang.reflect.Field; import java.lang.reflect.Method; -import java.util.Arrays; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -58,7 +55,6 @@ public class SequenceFileLogReader implements HLog.Reader { WALReader(final FileSystem fs, final Path p, final Configuration c) throws IOException { super(fs, p, c); - } @Override @@ -70,6 +66,15 @@ public class SequenceFileLogReader implements HLog.Reader { } /** + * Call this method after init() has been executed + * + * @return whether WAL compression is enabled + */ + public boolean isWALCompressionEnabled() { + return SequenceFileLogWriter.isWALCompressionEnabled(this.getMetadata()); + } + + /** * Override just so can intercept first call to getPos. */ static class WALReaderFSDataInputStream extends FSDataInputStream { @@ -136,10 +141,15 @@ public class SequenceFileLogReader implements HLog.Reader { Configuration conf; WALReader reader; + // Needed logging exceptions Path path; int edit = 0; long entryStart = 0; + /** + * Compression context to use reading. Can be null if no compression. 
+ */ + private CompressionContext compressionContext = null; protected Class keyClass; @@ -159,19 +169,32 @@ public class SequenceFileLogReader implements HLog.Reader { this.keyClass = keyClass; } - @Override public void init(FileSystem fs, Path path, Configuration conf) throws IOException { this.conf = conf; this.path = path; reader = new WALReader(fs, path, conf); + + // If compression is enabled, new dictionaries are created here. + boolean compression = reader.isWALCompressionEnabled(); + if (compression) { + try { + if (compressionContext == null) { + compressionContext = new CompressionContext(LRUDictionary.class); + } else { + compressionContext.clear(); + } + } catch (Exception e) { + throw new IOException("Failed to initiate CompressionContext", e); + } + } } @Override public void close() throws IOException { try { - reader.close(); + if (reader != null) reader.close(); } catch (IOException ioe) { throw addFileInfoToException(ioe); } @@ -205,6 +228,9 @@ public class SequenceFileLogReader implements HLog.Reader { } boolean b = false; try { + if (compressionContext != null) { + e.setCompressionContext(compressionContext); + } b = this.reader.next(e.getKey(), e.getEdit()); } catch (IOException ioe) { throw addFileInfoToException(ioe); @@ -259,4 +285,4 @@ public class SequenceFileLogReader implements HLog.Reader { return ioe; } -} +} \ No newline at end of file diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java index 01ebb5c..2bd5046 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java @@ -25,6 +25,7 @@ import java.io.OutputStream; import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -33,7 +34,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.SequenceFile.Metadata; import org.apache.hadoop.io.compress.CompressionCodec; @@ -45,6 +48,16 @@ import org.apache.hadoop.io.compress.DefaultCodec; */ @InterfaceAudience.Private public class SequenceFileLogWriter implements HLog.Writer { + static final Text WAL_VERSION_KEY = new Text("version"); + // Let the version be 1. Let absence of a version meta tag be old, version 0. + // Set this version '1' to be the version that introduces compression, + // the COMPRESSION_VERSION. + private static final int COMPRESSION_VERSION = 1; + static final int VERSION = COMPRESSION_VERSION; + static final Text WAL_VERSION = new Text("" + VERSION); + static final Text WAL_COMPRESSION_TYPE_KEY = new Text("compression.type"); + static final Text DICTIONARY_COMPRESSION_TYPE = new Text("dictionary"); + private final Log LOG = LogFactory.getLog(this.getClass()); // The sequence file we delegate to. private SequenceFile.Writer writer; @@ -54,6 +67,13 @@ public class SequenceFileLogWriter implements HLog.Writer { private Class keyClass; + /** + * Context used by our wal dictionary compressor. 
Null if we're not to do + * our custom dictionary compression. This custom WAL compression is distinct + * from sequencefile native compression. + */ + private CompressionContext compressionContext; + private Method syncFs = null; private Method hflush = null; @@ -74,9 +94,56 @@ public class SequenceFileLogWriter implements HLog.Writer { this.keyClass = keyClass; } + /** + * Create sequence file Metadata for our WAL file with version and compression + * type (if any). + * @param conf + * @param compress + * @return Metadata instance. + */ + private static Metadata createMetadata(final Configuration conf, + final boolean compress) { + TreeMap metaMap = new TreeMap(); + metaMap.put(WAL_VERSION_KEY, WAL_VERSION); + if (compress) { + // Currently we only do one compression type. + metaMap.put(WAL_COMPRESSION_TYPE_KEY, DICTIONARY_COMPRESSION_TYPE); + } + return new Metadata(metaMap); + } + + /** + * Call this method after init() has been executed + * + * @return whether WAL compression is enabled + */ + static boolean isWALCompressionEnabled(final Metadata metadata) { + // Check version is >= VERSION? + Text txt = metadata.get(WAL_VERSION_KEY); + if (txt == null || Integer.parseInt(txt.toString()) < COMPRESSION_VERSION) { + return false; + } + // Now check that compression type is present. Currently only one value. + txt = metadata.get(WAL_COMPRESSION_TYPE_KEY); + return txt != null && txt.equals(DICTIONARY_COMPRESSION_TYPE); + } + @Override public void init(FileSystem fs, Path path, Configuration conf) throws IOException { + // Should we do our custom WAL compression? + boolean compress = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false); + if (compress) { + try { + if (this.compressionContext == null) { + this.compressionContext = new CompressionContext(LRUDictionary.class); + } else { + this.compressionContext.clear(); + } + } catch (Exception e) { + throw new IOException("Failed to initiate CompressionContext", e); + } + } if (null == keyClass) { keyClass = HLog.getKeyClass(conf); @@ -101,7 +168,7 @@ public class SequenceFileLogWriter implements HLog.Writer { fs.getDefaultBlockSize())), Boolean.valueOf(false) /*createParent*/, SequenceFile.CompressionType.NONE, new DefaultCodec(), - new Metadata() + createMetadata(conf, compress) }); } catch (InvocationTargetException ite) { // function was properly called, but threw it's own exception @@ -123,7 +190,7 @@ public class SequenceFileLogWriter implements HLog.Writer { SequenceFile.CompressionType.NONE, new DefaultCodec(), null, - new Metadata()); + createMetadata(conf, compress)); } else { LOG.debug("using new createWriter -- HADOOP-6840"); } @@ -133,7 +200,8 @@ public class SequenceFileLogWriter implements HLog.Writer { this.hflush = getHFlush(); String msg = "Path=" + path + ", syncFs=" + (this.syncFs != null) + - ", hflush=" + (this.hflush != null); + ", hflush=" + (this.hflush != null) + + ", compression=" + compress; if (this.syncFs != null || this.hflush != null) { LOG.debug(msg); } else { @@ -207,6 +275,7 @@ public class SequenceFileLogWriter implements HLog.Writer { @Override public void append(HLog.Entry entry) throws IOException { + entry.setCompressionContext(compressionContext); this.writer.append(entry.getKey(), entry.getEdit()); } diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java index d8f317c..70473ae 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java +++ 
src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java @@ -76,9 +76,15 @@ public class WALEdit implements Writable, HeapSize { private final ArrayList kvs = new ArrayList(); private NavigableMap scopes; + private CompressionContext compressionContext; + public WALEdit() { } + public void setCompressionContext(final CompressionContext compressionContext) { + this.compressionContext = compressionContext; + } + public void add(KeyValue kv) { this.kvs.add(kv); } @@ -116,9 +122,13 @@ public class WALEdit implements Writable, HeapSize { // this is new style HLog entry containing multiple KeyValues. int numEdits = in.readInt(); for (int idx = 0; idx < numEdits; idx++) { - KeyValue kv = new KeyValue(); - kv.readFields(in); - this.add(kv); + if (compressionContext != null) { + this.add(KeyValueCompression.readFields(in, compressionContext)); + } else { + KeyValue kv = new KeyValue(); + kv.readFields(in); + this.add(kv); + } } int numFamilies = in.readInt(); if (numFamilies > 0) { @@ -133,7 +143,7 @@ public class WALEdit implements Writable, HeapSize { } } else { // this is an old style HLog entry. The int that we just - // read is actually the length of a single KeyValue. + // read is actually the length of a single KeyValue KeyValue kv = new KeyValue(); kv.readFields(versionOrLength, in); this.add(kv); @@ -146,7 +156,11 @@ public class WALEdit implements Writable, HeapSize { out.writeInt(kvs.size()); // We interleave the two lists for code simplicity for (KeyValue kv : kvs) { - kv.write(out); + if (compressionContext != null) { + KeyValueCompression.write(out, kv, compressionContext); + } else{ + kv.write(out); + } } if (scopes == null) { out.writeInt(0); @@ -187,4 +201,4 @@ public class WALEdit implements Writable, HeapSize { return sb.toString(); } -} +} \ No newline at end of file diff --git src/main/java/org/apache/hadoop/hbase/util/Bytes.java src/main/java/org/apache/hadoop/hbase/util/Bytes.java index de8e40b..e904770 100644 --- src/main/java/org/apache/hadoop/hbase/util/Bytes.java +++ src/main/java/org/apache/hadoop/hbase/util/Bytes.java @@ -1461,6 +1461,18 @@ public class Bytes { } /** + * @param bytes array to hash + * @param offset offset to start from + * @param length length to hash + * */ + public static int hashCode(byte[] bytes, int offset, int length) { + int hash = 1; + for (int i = offset; i < offset + length; i++) + hash = (31 * hash) + (int) bytes[i]; + return hash; + } + + /** * @param t operands * @return Array of byte arrays made from passed array of Text */ diff --git src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java new file mode 100644 index 0000000..eea5870 --- /dev/null +++ src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License + */ +package org.apache.hadoop.hbase.regionserver.wal; + + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Test our compressor class. + */ +public class TestCompressor { + @BeforeClass + public static void setUpBeforeClass() throws Exception { + } + + @Test + public void testToShort() { + short s = 1; + assertEquals(s, Compressor.toShort((byte)0, (byte)1)); + s <<= 8; + assertEquals(s, Compressor.toShort((byte)1, (byte)0)); + } + + @Test (expected = IllegalArgumentException.class) + public void testNegativeToShort() { + Compressor.toShort((byte)0xff, (byte)0xff); + } + + @Test + public void testCompressingWithNullDictionaries() throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + byte [] blahBytes = Bytes.toBytes("blah"); + Compressor.writeCompressed(blahBytes, 0, blahBytes.length, dos, null); + dos.close(); + byte [] dosbytes = baos.toByteArray(); + DataInputStream dis = + new DataInputStream(new ByteArrayInputStream(dosbytes)); + byte [] product = Compressor.readCompressed(dis, null); + assertTrue(Bytes.equals(blahBytes, product)); + } + + @Test + public void testCompressingWithClearDictionaries() throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + Dictionary dictionary = new LRUDictionary(); + byte [] blahBytes = Bytes.toBytes("blah"); + Compressor.writeCompressed(blahBytes, 0, blahBytes.length, dos, dictionary); + dos.close(); + byte [] dosbytes = baos.toByteArray(); + DataInputStream dis = + new DataInputStream(new ByteArrayInputStream(dosbytes)); + dictionary = new LRUDictionary(); + byte [] product = Compressor.readCompressed(dis, dictionary); + assertTrue(Bytes.equals(blahBytes, product)); + } +} \ No newline at end of file diff --git src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLRUDictionary.java src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLRUDictionary.java new file mode 100644 index 0000000..6f295f2 --- /dev/null +++ src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLRUDictionary.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.regionserver.wal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.math.BigInteger; +import java.util.Arrays; +import java.util.Random; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Tests LRUDictionary + */ +@Category(SmallTests.class) +public class TestLRUDictionary { + LRUDictionary testee; + + @Before + public void setUp() throws Exception { + testee = new LRUDictionary(); + } + + @Test + public void TestContainsNothing() { + assertTrue(isDictionaryEmpty(testee)); + } + + /** + * Assert can't add empty array. + */ + @Test + public void testPassingEmptyArrayToFindEntry() { + assertEquals(Dictionary.NOT_IN_DICTIONARY, + testee.findEntry(HConstants.EMPTY_BYTE_ARRAY, 0, 0)); + assertEquals(Dictionary.NOT_IN_DICTIONARY, + testee.addEntry(HConstants.EMPTY_BYTE_ARRAY, 0, 0)); + } + + @Test (expected = IllegalArgumentException.class) + public void testPassingEmptyArrayToAddEntry() { + testee.addEntry(HConstants.EMPTY_BYTE_ARRAY, 0, 0); + } + + @Test + public void testPassingSameArrayToAddEntry() { + // Add random predefined byte array, in this case a random byte array from + // HConstants. Assert that when we add, we get new index. Thats how it + // works. + int len = HConstants.CATALOG_FAMILY.length; + int index = testee.addEntry(HConstants.CATALOG_FAMILY, 0, len); + assertFalse(index == testee.addEntry(HConstants.CATALOG_FAMILY, 0, len)); + assertFalse(index == testee.addEntry(HConstants.CATALOG_FAMILY, 0, len)); + } + + @Test + public void testBasic() { + Random rand = new Random(); + byte[] testBytes = new byte[10]; + rand.nextBytes(testBytes); + + // Verify that our randomly generated array doesn't exist in the dictionary + assertEquals(testee.findEntry(testBytes, 0, testBytes.length), -1); + + // now since we looked up an entry, we should have added it to the + // dictionary, so it isn't empty + + assertFalse(isDictionaryEmpty(testee)); + + // Check if we can find it using findEntry + short t = testee.findEntry(testBytes, 0, testBytes.length); + + // Making sure we do find what we're looking for + assertTrue(t != -1); + + byte[] testBytesCopy = new byte[20]; + + Bytes.putBytes(testBytesCopy, 10, testBytes, 0, testBytes.length); + + // copy byte arrays, make sure that we check that equal byte arrays are + // equal without just checking the reference + assertEquals(testee.findEntry(testBytesCopy, 10, testBytes.length), t); + + // make sure the entry retrieved is the same as the one put in + assertTrue(Arrays.equals(testBytes, testee.getEntry(t))); + + testee.clear(); + + // making sure clear clears the dictionary + assertTrue(isDictionaryEmpty(testee)); + } + + @Test + public void TestLRUPolicy(){ + //start by filling the dictionary up with byte arrays + for (int i = 0; i < LRUDictionary.BidirectionalLRUMap.MAXSIZE; i++) { + testee.findEntry((BigInteger.valueOf(i)).toByteArray(), 0, + (BigInteger.valueOf(i)).toByteArray().length); + } + + // check we have the first element added + assertTrue(testee.findEntry(BigInteger.ZERO.toByteArray(), 0, + BigInteger.ZERO.toByteArray().length) != -1); + + // check for an element we know isn't there + assertTrue(testee.findEntry(BigInteger.valueOf(Integer.MAX_VALUE).toByteArray(), 0, + 
BigInteger.valueOf(Integer.MAX_VALUE).toByteArray().length) == -1); + + // since we just checked for this element, it should be there now. + assertTrue(testee.findEntry(BigInteger.valueOf(Integer.MAX_VALUE).toByteArray(), 0, + BigInteger.valueOf(Integer.MAX_VALUE).toByteArray().length) != -1); + + // test eviction, that the least recently added or looked at element is + // evicted. We looked at ZERO so it should be in the dictionary still. + assertTrue(testee.findEntry(BigInteger.ZERO.toByteArray(), 0, + BigInteger.ZERO.toByteArray().length) != -1); + // Now go from beyond 1 to the end. + for(int i = 1; i < LRUDictionary.BidirectionalLRUMap.MAXSIZE; i++) { + assertTrue(testee.findEntry(BigInteger.valueOf(i).toByteArray(), 0, + BigInteger.valueOf(i).toByteArray().length) == -1); + } + + // check we can find all of these. + for (int i = 0; i < LRUDictionary.BidirectionalLRUMap.MAXSIZE; i++) { + assertTrue(testee.findEntry(BigInteger.valueOf(i).toByteArray(), 0, + BigInteger.valueOf(i).toByteArray().length) != -1); + } + } + + static private boolean isDictionaryEmpty(LRUDictionary dict) { + for (short i = 0; i < LRUDictionary.BidirectionalLRUMap.MAXSIZE; i++) { + if (dict.getEntry(i) != null) { + return false; + } + } + return true; + } +} \ No newline at end of file diff --git src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java index a11899c..e594697 100644 --- src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java +++ src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java @@ -61,7 +61,7 @@ import org.mockito.Mockito; @Category(MediumTests.class) public class TestWALReplay { public static final Log LOG = LogFactory.getLog(TestWALReplay.class); - private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate(); private Path hbaseRootDir = null; private Path oldLogDir; diff --git src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayCompressed.java src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayCompressed.java new file mode 100644 index 0000000..7e57359 --- /dev/null +++ src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayCompressed.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver.wal; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MediumTests; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +/** + * Enables compression and runs the TestWALReplay tests. + */ +@Category(MediumTests.class) +public class TestWALReplayCompressed extends TestWALReplay { + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TestWALReplay.setUpBeforeClass(); + Configuration conf = TestWALReplay.TEST_UTIL.getConfiguration(); + conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true); + } + +}
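
Usage note (illustrative, not part of the patch): the feature is driven entirely by the new HConstants.ENABLE_WAL_COMPRESSION key ("hbase.regionserver.wal.enablecompression", default false), which SequenceFileLogWriter.init() consults when a new HLog writer is created. Below is a minimal sketch of enabling it programmatically; the class name WalCompressionExample is hypothetical, and in a real deployment the property would normally be set in hbase-site.xml on the region servers instead.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

public class WalCompressionExample {
  public static void main(String[] args) {
    // Build an HBase configuration; hbase-site.xml entries are layered in here.
    Configuration conf = HBaseConfiguration.create();

    // Enable the dictionary-based WAL compression introduced by this patch.
    // Writers created with this configuration tag the sequence file metadata
    // (version and compression.type) so readers know to decompress entries.
    conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);

    System.out.println(HConstants.ENABLE_WAL_COMPRESSION + " = "
        + conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false));
  }
}

Existing logs can be converted offline with the bundled command line tool, presumably invoked as something like `hbase org.apache.hadoop.hbase.regionserver.wal.Compressor <input> <output>`, which rewrites a WAL from its compressed form to uncompressed or vice versa.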