diff --git src/main/java/org/apache/hadoop/hbase/HConstants.java src/main/java/org/apache/hadoop/hbase/HConstants.java
index 5120a3c..1af341a 100644
--- src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -575,6 +575,9 @@ public final class HConstants {
   /** Host name of the local machine */
   public static final String LOCALHOST = "localhost";
 
+  /** Configuration key for enabling WAL (HLog) compression */
+  public static final String ENABLE_WAL_COMPRESSION = "hbase.regionserver.wal.compression";
+
   private HConstants() {
     // Can't be instantiated with this ctor.
   }
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressedKeyValue.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressedKeyValue.java
new file mode 100644
index 0000000..11e02eb
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressedKeyValue.java
@@ -0,0 +1,122 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Compression class for Key/Value pairs before writing them to the WAL. This
+ * is not synchronized, so synchronization should be handled outside.
+ *
+ * The class only compresses and uncompresses row keys, family names, and
+ * qualifiers. More/less may be added depending on use patterns.
+ */
+class CompressedKeyValue {
+
+  /*
+   * We write using DataOutput.writeByte, which writes the 8 low-order bits,
+   * so the range is 0-255. We always read using readUnsignedByte.
+   */
+
+  /**
+   * Uncompresses a KeyValue from a DataInput and returns it.
+   *
+   * @param in the DataInput
+   * @param readContext the compression context (dictionaries) to use
+   * @return an uncompressed KeyValue
+   * @throws IOException
+   */
+  public static KeyValue readFields(DataInput in, CompressionContext readContext)
+      throws IOException {
+    int keylength = WritableUtils.readVInt(in);
+    int vlength = WritableUtils.readVInt(in);
+    int length = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength;
+
+    byte[] backingArray = new byte[length];
+    int pos = 0;
+    pos = Bytes.putInt(backingArray, pos, keylength);
+    pos = Bytes.putInt(backingArray, pos, vlength);
+
+    // the row
+    pos = Compressor.readCompressedIntoArray(backingArray, in, pos,
+        Bytes.SIZEOF_SHORT, readContext.rowDict);
+
+    // the family
+    pos = Compressor.readCompressedIntoArray(backingArray, in, pos,
+        Bytes.SIZEOF_BYTE, readContext.familyDict);
+
+    // the qualifier
+    pos = Compressor.readCompressedIntoArray(backingArray, in, pos, 0,
+        readContext.qualifierDict);
+
+    // the rest
+    in.readFully(backingArray, pos, length - pos);
+
+    return new KeyValue(backingArray);
+  }
+
+  /**
+   * Compresses and writes ourKV to out, a DataOutput.
+   *
+   * @param out the DataOutput
+   * @param ourKV the KV to compress and write
+   * @param writeContext the compression context (dictionaries) used for compression
+   * @throws IOException
+   */
+  public static void write(final DataOutput out, KeyValue ourKV,
+      CompressionContext writeContext) throws IOException {
+    byte[] backingArray = ourKV.getBuffer();
+    int offset = ourKV.getOffset();
+
+    // we first write the KeyValue infrastructure as VInts.
+    WritableUtils.writeVInt(out, ourKV.getKeyLength());
+    WritableUtils.writeVInt(out, ourKV.getValueLength());
+
+    // now we write the row key, since the row key is likely to be repeated.
+    Compressor.writeCompressed(ourKV.getRow(), out, writeContext.rowDict);
+
+    // now the family, if it exists. if it doesn't, we write a 0-length array.
+    Compressor.writeCompressed(ourKV.getFamily(), out, writeContext.familyDict);
+
+    // the qualifier next
+    Compressor.writeCompressed(ourKV.getQualifier(), out,
+        writeContext.qualifierDict);
+
+    // now we write the rest uncompressed
+    int pos = ourKV.getTimestampOffset();
+    int remainingLength = ourKV.getLength() + offset - pos;
+    out.write(backingArray, pos, remainingLength);
+  }
+}
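CompressedKeyValue only dictionary-codes the row, family, and qualifier; the timestamp, type, and value bytes are copied through untouched. The following is an illustrative sketch (not part of the patch) of how a KeyValue round-trips through these helpers; the method and variable names are made up, and it assumes same-package access since the classes above are package-private.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Bytes;

    static KeyValue roundTrip() throws Exception {
      // Writer and reader each own a context; the reader's dictionaries are
      // rebuilt by replaying the same byte stream the writer produced.
      CompressionContext writeCtx = new CompressionContext(SimpleDictionary.class);
      CompressionContext readCtx = new CompressionContext(SimpleDictionary.class);

      KeyValue kv = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("cf"),
          Bytes.toBytes("q"), 1L, Bytes.toBytes("value"));

      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      CompressedKeyValue.write(new DataOutputStream(bytes), kv, writeCtx);

      DataInputStream in = new DataInputStream(
          new ByteArrayInputStream(bytes.toByteArray()));
      return CompressedKeyValue.readFields(in, readCtx);
    }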
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
new file mode 100644
index 0000000..fd3a62a
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
@@ -0,0 +1,35 @@
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+class CompressionContext {
+  final WALDictionary regionDict;
+  final WALDictionary tableDict;
+  final WALDictionary familyDict;
+  final WALDictionary qualifierDict;
+  final WALDictionary rowDict;
+
+  public CompressionContext(Class<? extends WALDictionary> dictType)
+      throws SecurityException, NoSuchMethodException, IllegalArgumentException,
+      InstantiationException, IllegalAccessException, InvocationTargetException {
+    Constructor<? extends WALDictionary> dictConstructor = dictType.getConstructor();
+
+    regionDict = dictConstructor.newInstance();
+    tableDict = dictConstructor.newInstance();
+    familyDict = dictConstructor.newInstance();
+    qualifierDict = dictConstructor.newInstance();
+    rowDict = dictConstructor.newInstance();
+  }
+
+  void clear() {
+    regionDict.clear();
+    tableDict.clear();
+    familyDict.clear();
+    qualifierDict.clear();
+    rowDict.clear();
+  }
+}
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
new file mode 100644
index 0000000..b1580b8
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
@@ -0,0 +1,196 @@
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Queue;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+
+public class Compressor {
+
+  static final byte NOT_IN_DICTIONARY = -1;
+
+  /*
+   * This acts as a command line tool to compress and uncompress WALs.
+   */
+  public static void main(String[] args) throws IOException {
+    if (args.length != 3 || args[0].equals("--help")) {
+      printHelp();
+      return;
+    }
+
+    Path inputPath = new Path(args[1]);
+    Path outputPath = new Path(args[2]);
+
+    if (args[0].equals("-u")) {
+      // as we're uncompressing, we read compressed and write uncompressed.
+      writeFile(false, outputPath, readFile(true, inputPath));
+    } else if (args[0].equals("-c")) {
+      writeFile(true, outputPath, readFile(false, inputPath));
+    } else {
+      printHelp();
+    }
+  }
+
+  static void printHelp() {
+    System.out
+        .println("This is a command line tool for compressing WALs using the WALCompressor");
+    System.out.println("The first option must be a mode specifier:");
+    System.out.println("-u Uncompress  -c Compress");
+    System.out
+        .println("The second argument is the file to read, the third argument is the output filename");
+    System.out.println("Example: compressor -u input-hlog output-hlog");
+    return;
+  }
+
+  static Queue<Entry> readFile(boolean compression, Path path)
+      throws IOException {
+    SequenceFileLogReader in = new SequenceFileLogReader();
+
+    try {
+      FileSystem fs = new RawLocalFileSystem();
+      fs.setConf(new Configuration());
+      Path inputPath = path;
+
+      Queue<Entry> output = new LinkedList<Entry>();
+
+      Configuration conf = new Configuration();
+      conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, compression);
+
+      in.init(fs, inputPath, conf);
+
+      Entry e;
+      while ((e = in.next()) != null) {
+        output.add(e);
+      }
+      return output;
+    } finally {
+      in.close();
+    }
+  }
+
+  static void writeFile(boolean compression, Path path, Queue<Entry> toBeWritten)
+      throws IOException {
+    SequenceFileLogWriter out = new SequenceFileLogWriter();
+    try {
+      FileSystem fs = new RawLocalFileSystem();
+      fs.setConf(new Configuration());
+      Path inputPath = path;
+
+      Configuration conf = new Configuration();
+      conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, compression);
+
+      out.init(fs, inputPath, conf);
+
+      Entry e;
+      while ((e = toBeWritten.poll()) != null) {
+        out.append(e);
+      }
+    } finally {
+      out.close();
+    }
+  }
+
+  /*
+   * Reads the next compressed entry and returns it as a byte array.
+   */
+  static byte[] readCompressed(DataInput in, WALDictionary dict)
+      throws IOException {
+    byte status = in.readByte();
+    int length;
+    short dictIdx;
+    byte[] entry = null;
+
+    if (status == NOT_IN_DICTIONARY) {
+      length = WritableUtils.readVInt(in);
+    } else {
+      dictIdx = toShort(status, in.readByte());
+      entry = dict.getEntry(dictIdx);
+      length = entry.length;
+    }
+
+    if (status == NOT_IN_DICTIONARY) {
+      // if this isn't in the dictionary, we need to add it to the dictionary.
+      byte[] arr = new byte[length];
+      in.readFully(arr, 0, length);
+      dict.addEntry(arr);
+      return arr;
+    } else {
+      return entry;
+    }
+  }
+
+  /*
+   * sizeBytes is the size of the length field. Often we do not use an int,
+   * because the length can be represented as a byte or a short.
+   */
+  static int readCompressedIntoArray(byte[] to, DataInput in, int offset,
+      int sizeBytes, WALDictionary dict) throws IOException {
+    byte status = in.readByte();
+    int length;
+    short dictIdx;
+    int pos = offset;
+    byte[] entry = null;
+
+    if (status == NOT_IN_DICTIONARY) {
+      length = WritableUtils.readVInt(in);
+    } else {
+      dictIdx = toShort(status, in.readByte());
+      entry = dict.getEntry(dictIdx);
+      length = entry.length;
+    }
+
+    // sometimes we need to write the size of the byte array; different
+    // datatypes may be used
+    if (sizeBytes == Bytes.SIZEOF_BYTE) {
+      pos = Bytes.putByte(to, pos, (byte) (length & 0x0000ff));
+    } else if (sizeBytes == Bytes.SIZEOF_SHORT) {
+      pos = Bytes.putShort(to, pos, (short) (length & 0x0000ffff));
+    } else if (sizeBytes == Bytes.SIZEOF_INT) {
+      pos = Bytes.putInt(to, pos, length);
+    }
+
+    if (status == NOT_IN_DICTIONARY) {
+      // if this isn't in the dictionary, we need to add it to the dictionary.
+      byte[] arr = new byte[length];
+      in.readFully(arr, 0, length);
+      pos = Bytes.putBytes(to, pos, arr, 0, length);
+      dict.addEntry(arr);
+    } else {
+      // otherwise we copy the value straight from the dictionary.
+      pos = Bytes.putBytes(to, pos, entry, 0, length);
+    }
+
+    return pos;
+  }
+
+  static void writeCompressed(byte[] data, DataOutput out, WALDictionary dict)
+      throws IOException {
+    short dictIdx = dict.findEntry(data);
+    if (dictIdx == -1) {
+      // not in dict
+      out.writeByte(NOT_IN_DICTIONARY);
+      WritableUtils.writeVInt(out, data.length);
+      out.write(data);
+    } else {
+      out.writeShort(dictIdx);
+    }
+  }
+
+  private static short toShort(byte hi, byte lo) {
+    return (short) (((hi & 0xFF) << 8) | (lo & 0xFF));
+  }
+}
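Compressor.writeCompressed() emits either a literal (the NOT_IN_DICTIONARY marker byte, a vint length, and the raw bytes) or a two-byte dictionary index, and the read methods replay exactly the same decisions so the reader's dictionary stays in sync with the writer's. Below is a minimal sketch of that behaviour (not part of the patch); the method name is made up and same-package access is assumed.

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.hbase.util.Bytes;

    static void encodeTwice() throws IOException {
      WALDictionary dict = new SimpleDictionary();
      DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream());
      byte[] family = Bytes.toBytes("cf");

      // First occurrence: findEntry() misses (and records the value as a side
      // effect), so the bytes go out literally: marker + vint length + data.
      Compressor.writeCompressed(family, out, dict);

      // Second occurrence: findEntry() hits, so only a two-byte index is written.
      Compressor.writeCompressed(family, out, dict);
    }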
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
index 24407af..1c2315a 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
@@ -1648,6 +1648,14 @@ public class HLog implements Syncable {
     public HLogKey getKey() {
       return key;
     }
+
+    /**
+     * Enables compression for this entry.
+     */
+    public void enableCompression(CompressionContext writeContext) {
+      edit.enableCompression(writeContext);
+      key.enableCompression(writeContext);
+    }
 
     @Override
     public String toString() {
@@ -1665,6 +1673,7 @@ public class HLog implements Syncable {
       this.key.readFields(dataInput);
       this.edit.readFields(dataInput);
     }
+
   }
 
   /**
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
index f067221..d1968be 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
@@ -54,13 +54,15 @@ public class HLogKey implements WritableComparable<HLogKey> {
   private long writeTime;
 
   private UUID clusterId;
+
+  private CompressionContext keyContext;
 
-  /** Writable Consructor -- Do not use. */
+  /** Writable Constructor -- Do not use. */
   public HLogKey() {
     this(null, null, 0L, HConstants.LATEST_TIMESTAMP,
         HConstants.DEFAULT_CLUSTER_ID);
   }
-
+
   /**
   * Create the log key!
   * We maintain the tablename mainly for debugging purposes.
@@ -82,6 +84,18 @@ public class HLogKey implements WritableComparable<HLogKey> {
     this.clusterId = clusterId;
   }
 
+  /**
+   * Enables compression for this key.
+   *
+   * @param keyContext
+   *          the compression context, holding the dictionaries used to
+   *          compress the encoded region name and the table name
+   */
+  public void enableCompression(CompressionContext keyContext) {
+    this.keyContext = keyContext;
+  }
+
   /** @return encoded region name */
   public byte [] getEncodedRegionName() {
     return encodedRegionName;
@@ -214,8 +228,14 @@ public class HLogKey implements WritableComparable<HLogKey> {
   @Override
   public void write(DataOutput out) throws IOException {
     WritableUtils.writeVInt(out, VERSION);
-    Bytes.writeByteArray(out, this.encodedRegionName);
-    Bytes.writeByteArray(out, this.tablename);
+    if (keyContext == null) {
+      Bytes.writeByteArray(out, this.encodedRegionName);
+      Bytes.writeByteArray(out, this.tablename);
+    } else {
+      Compressor.writeCompressed(this.encodedRegionName, out,
+          keyContext.regionDict);
+      Compressor.writeCompressed(this.tablename, out, keyContext.tableDict);
+    }
     out.writeLong(this.logSeqNum);
     out.writeLong(this.writeTime);
     // avoid storing 16 bytes when replication is not enabled
@@ -243,11 +263,22 @@ public class HLogKey implements WritableComparable<HLogKey> {
     if (len < 0) {
       // what we just read was the version
       version = len;
+      // We only compress V2 of HLogKey.
+      // If compression is on, the length is handled by the dictionary.
+      if (keyContext == null) {
-      len = WritableUtils.readVInt(in);
+        len = WritableUtils.readVInt(in);
+      }
+    }
+    if (keyContext == null) {
+      this.encodedRegionName = new byte[len];
+      in.readFully(this.encodedRegionName);
+      this.tablename = Bytes.readByteArray(in);
+    } else {
+      this.encodedRegionName = Compressor.readCompressed(in,
+          keyContext.regionDict);
+      this.tablename = Compressor.readCompressed(in, keyContext.tableDict);
     }
-    this.encodedRegionName = new byte[len];
-    in.readFully(this.encodedRegionName);
-    this.tablename = Bytes.readByteArray(in);
+
     this.logSeqNum = in.readLong();
     this.writeTime = in.readLong();
     this.clusterId = HConstants.DEFAULT_CLUSTER_ID;
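Compression is switched on per entry rather than globally: Entry.enableCompression() hands one CompressionContext to both the key and the edit, and their write()/readFields() methods then branch on whether a context is present. The sketch below is illustrative only (the real wiring lives in SequenceFileLogWriter.append() and SequenceFileLogReader.next()); the method name is made up and same-package access is assumed.

    // "entry" and "context" are supplied by the caller in this sketch.
    static void compressEntry(HLog.Entry entry, CompressionContext context) {
      // One context serves both halves: the key dictionary-codes the encoded
      // region name and table name, and the edit dictionary-codes the row,
      // family, and qualifier of each KeyValue.
      entry.enableCompression(context);
    }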
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
index d9cd6de..d2b484c 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
@@ -23,10 +23,9 @@ package org.apache.hadoop.hbase.regionserver.wal;
 import java.io.FilterInputStream;
 import java.io.IOException;
 import java.lang.Class;
-import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,6 +33,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.io.SequenceFile;
 
 public class SequenceFileLogReader implements HLog.Reader {
@@ -55,6 +55,7 @@ public class SequenceFileLogReader implements HLog.Reader {
 
     WALReader(final FileSystem fs, final Path p, final Configuration c)
         throws IOException {
+
       super(fs, p, c);
     }
 
@@ -134,15 +135,20 @@ public class SequenceFileLogReader implements HLog.Reader {
   Configuration conf;
   WALReader reader;
 
+  // Needed for logging exceptions
   Path path;
   int edit = 0;
   long entryStart = 0;
+
+  private static CompressionContext readContext = null;
 
   protected Class<? extends HLogKey> keyClass;
 
   /**
    * Default constructor.
+   *
+   * If compression is enabled, will create new dictionaries.
    */
   public SequenceFileLogReader() {
   }
@@ -164,6 +170,16 @@ public class SequenceFileLogReader implements HLog.Reader {
     this.conf = conf;
     this.path = path;
     reader = new WALReader(fs, path, conf);
+
+    if (conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false)
+        && readContext == null) {
+      try {
+        readContext = new CompressionContext(SimpleDictionary.class);
+      } catch (Exception e) {
+        throw new IOException("Failed to initialize CompressionContext", e);
+      }
+    }
+
   }
 
   @Override
@@ -203,6 +219,9 @@ public class SequenceFileLogReader implements HLog.Reader {
     }
     boolean b = false;
     try {
+      if (readContext != null) {
+        e.enableCompression(readContext);
+      }
       b = this.reader.next(e.getKey(), e.getEdit());
     } catch (IOException ioe) {
       throw addFileInfoToException(ioe);
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
index cbef70f..6ecc1b0 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.SequenceFile.Metadata;
@@ -52,6 +53,10 @@ public class SequenceFileLogWriter implements HLog.Writer {
 
   private Class<? extends HLogKey> keyClass;
 
+  boolean compression = false;
+
+  private CompressionContext writeContext;
+
   private Method syncFs = null;
   private Method hflush = null;
 
@@ -75,6 +80,19 @@ public class SequenceFileLogWriter implements HLog.Writer {
   @Override
   public void init(FileSystem fs, Path path, Configuration conf)
       throws IOException {
+
+    compression = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false);
+    if (compression) {
+      try {
+        if (writeContext == null) {
+          writeContext = new CompressionContext(SimpleDictionary.class);
+        } else {
+          writeContext.clear();
+        }
+      } catch (Exception e) {
+        throw new IOException("Failed to initialize CompressionContext", e);
+      }
+    }
 
     if (null == keyClass) {
       keyClass = HLog.getKeyClass(conf);
@@ -205,6 +223,9 @@ public class SequenceFileLogWriter implements HLog.Writer {
 
   @Override
   public void append(HLog.Entry entry) throws IOException {
+    if (compression) {
+      entry.enableCompression(writeContext);
+    }
     this.writer.append(entry.getKey(), entry.getEdit());
   }
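With the reader and writer changes in place, the whole feature hangs off a single boolean key that both init() methods consult. A hedged sketch of turning it on programmatically; HBaseConfiguration.create() is the usual way to load an HBase Configuration, and the method name here is made up.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;

    static Configuration compressionEnabledConf() {
      Configuration conf = HBaseConfiguration.create();
      // same key as the hbase-site.xml property "hbase.regionserver.wal.compression"
      conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
      return conf;
    }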
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/SimpleDictionary.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/SimpleDictionary.java
new file mode 100644
index 0000000..b45ec83
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/SimpleDictionary.java
@@ -0,0 +1,80 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/** A simple one-way associative dictionary */
+class SimpleDictionary implements WALDictionary {
+  static final Log LOG = LogFactory.getLog(SimpleDictionary.class);
+
+  final byte[][] lookup;
+  final int SIZE = Short.MAX_VALUE;
+
+  public SimpleDictionary() {
+    lookup = new byte[SIZE][];
+  }
+
+  @Override
+  public byte[] getEntry(short idx) {
+    return lookup[idx];
+  }
+
+  @Override
+  public short findEntry(byte[] data) {
+    short hc = (short) Math.abs(Arrays.hashCode(data) % SIZE);
+    if (lookup[hc] == null) {
+      lookup[hc] = data;
+      return -1;
+    } else if (Arrays.equals(lookup[hc], data)) {
+      return hc;
+    } else {
+      lookup[hc] = data;
+      return -1;
+    }
+  }
+
+  @Override
+  public short addEntry(byte[] data) {
+    short hc = (short) Math.abs(Arrays.hashCode(data) % SIZE);
+    lookup[hc] = data;
+    return hc;
+  }
+
+  @Override
+  public void clear() {
+    for (int i = 0; i < SIZE; i++) {
+      lookup[i] = null;
+    }
+  }
+
+  @Override
+  public int getEntryLength() {
+    return Bytes.SIZEOF_SHORT;
+  }
+}
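SimpleDictionary is a fixed-size, hash-addressed table: a colliding value silently evicts the previous occupant instead of chaining. That is safe here because the writer and the reader see the same sequence of values and therefore make the same replacement decisions. An illustrative sketch (method and variable names are made up; same-package access assumed):

    import org.apache.hadoop.hbase.util.Bytes;

    static void dictionaryBehaviour() {
      WALDictionary dict = new SimpleDictionary();
      byte[] row = Bytes.toBytes("row-1");

      short first = dict.findEntry(row);   // -1: miss, but the value is now stored
      short second = dict.findEntry(row);  // hit: returns the slot index
      byte[] stored = dict.getEntry(second);

      // A different value hashing to the same slot would overwrite "row-1"; the
      // next findEntry(row) would then miss again and re-store it.
    }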
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALDictionary.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALDictionary.java
new file mode 100644
index 0000000..7bc06f0
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALDictionary.java
@@ -0,0 +1,69 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+/**
+ * Dictionary interface.
+ *
+ * Dictionary indexes should be either bytes or shorts, and only positive
+ * values are valid. (The first bit is reserved for detecting whether an
+ * entry is compressed or not.)
+ */
+interface WALDictionary {
+  /**
+   * Gets an entry from the dictionary.
+   *
+   * @param idx index of the entry
+   * @return the entry, or null if nonexistent
+   */
+  public byte[] getEntry(short idx);
+
+  /**
+   * Finds the index of an entry. If the entry is not in the dictionary, it is
+   * stored as a side effect of the lookup.
+   *
+   * @param data the byte array that we're looking up
+   * @return the index of the entry, or -1 if not found
+   */
+  public short findEntry(byte[] data);
+
+  /**
+   * Adds an entry to the dictionary.
+   *
+   * @param data the entry to add
+   * @return the index of the entry
+   */
+  public short addEntry(byte[] data);
+
+  /**
+   * Flushes the dictionary, emptying all values.
+   */
+  public void clear();
+
+  /**
+   * Gets the entry length. Should be either 1 or 2 (byte or short).
+   */
+  public int getEntryLength();
+}
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
index e1117ef..c0d2adf 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
@@ -70,12 +70,20 @@ import org.apache.hadoop.io.Writable;
 public class WALEdit implements Writable, HeapSize {
 
   private final int VERSION_2 = -1;
+
+  private boolean compression = false;
 
   private final ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
   private NavigableMap<byte[], Integer> scopes;
 
+  private CompressionContext dictContext;
+
   public WALEdit() {
   }
+
+  public void enableCompression(final CompressionContext dictContext) {
+    this.dictContext = dictContext;
+  }
 
   public void add(KeyValue kv) {
     this.kvs.add(kv);
@@ -114,9 +122,13 @@ public class WALEdit implements Writable, HeapSize {
       // this is new style HLog entry containing multiple KeyValues.
       int numEdits = in.readInt();
       for (int idx = 0; idx < numEdits; idx++) {
+        if (dictContext != null) {
+          this.add(CompressedKeyValue.readFields(in, dictContext));
+        } else {
         KeyValue kv = new KeyValue();
         kv.readFields(in);
         this.add(kv);
+        }
       }
       int numFamilies = in.readInt();
       if (numFamilies > 0) {
@@ -131,7 +143,7 @@ public class WALEdit implements Writable, HeapSize {
       }
     } else {
       // this is an old style HLog entry. The int that we just
-      // read is actually the length of a single KeyValue.
+      // read is actually the length of a single KeyValue
       KeyValue kv = new KeyValue();
       kv.readFields(versionOrLength, in);
       this.add(kv);
@@ -144,7 +156,11 @@ public class WALEdit implements Writable, HeapSize {
     out.writeInt(kvs.size());
     // We interleave the two lists for code simplicity
     for (KeyValue kv : kvs) {
-      kv.write(out);
+      if (dictContext != null) {
+        CompressedKeyValue.write(out, kv, dictContext);
+      } else {
+        kv.write(out);
+      }
     }
     if (scopes == null) {
       out.writeInt(0);
diff --git src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSimpleDictionary.java src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSimpleDictionary.java
new file mode 100644
index 0000000..e5c9167
--- /dev/null
+++ src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSimpleDictionary.java
@@ -0,0 +1,70 @@
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Random;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestSimpleDictionary {
+
+  SimpleDictionary testee;
+
+  @Before
+  public void setUp() throws Exception {
+    testee = new SimpleDictionary();
+  }
+
+  @Test
+  public void testContainsNothing() {
+    assertTrue(dictIsEmpty(testee));
+  }
+
+  @Test
+  public void testBasic() {
+    Random rand = new Random();
+    byte[] testBytes = new byte[10];
+    rand.nextBytes(testBytes);
+
+    // Verify that our randomly generated array doesn't exist in the dictionary
+    assertEquals(-1, testee.findEntry(testBytes));
+
+    // Since we just looked up an entry, it has been added to the dictionary,
+    // so the dictionary is no longer empty
+    assertFalse(dictIsEmpty(testee));
+
+    // Check that we can now find it using findEntry
+    short t = testee.findEntry(testBytes);
+
+    // Make sure we do find what we're looking for
+    assertTrue(t != -1);
+
+    byte[] testBytesCopy = Arrays.copyOf(testBytes, 10);
+
+    // Use a copy of the byte array to check that equal byte arrays are
+    // considered equal, not just identical references
+    assertEquals(t, testee.findEntry(testBytesCopy));
+
+    // Make sure the entry retrieved is the same as the one put in
+    assertTrue(Arrays.equals(testBytes, testee.getEntry(t)));
+
+    testee.clear();
+
+    // Make sure clear() empties the dictionary
+    assertTrue(dictIsEmpty(testee));
+  }
+
+  private static boolean dictIsEmpty(SimpleDictionary testee) {
+    for (short i = 0; i < Short.MAX_VALUE; i++) {
+      if (testee.getEntry(i) != null) {
+        return false;
+      }
+    }
+    return true;
+  }
+}
diff --git src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
index 59910bf..6e50e47 100644
--- src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
+++ src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
@@ -61,7 +61,7 @@ import org.mockito.Mockito;
 @Category(MediumTests.class)
 public class TestWALReplay {
   public static final Log LOG = LogFactory.getLog(TestWALReplay.class);
-  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
   private Path hbaseRootDir = null;
   private Path oldLogDir;
diff --git src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayCompressed.java src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayCompressed.java
new file mode 100644
index 0000000..afdfeb5
--- /dev/null
+++ src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayCompressed.java
@@ -0,0 +1,38 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.junit.BeforeClass;
+
+/**
+ * Enables compression and runs the TestWALReplay tests.
+ */
+public class TestWALReplayCompressed extends TestWALReplay {
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TestWALReplay.setUpBeforeClass();
+    Configuration conf = TestWALReplay.TEST_UTIL.getConfiguration();
+    conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+  }
+}