Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java	(revision 1513718)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java	(working copy)
@@ -33,7 +33,7 @@
   final Dictionary qualifierDict;
   final Dictionary rowDict;
 
-  public CompressionContext(Class<? extends Dictionary> dictType)
+  public CompressionContext(Class<? extends Dictionary> dictType, boolean recoveredEdits)
   throws SecurityException, NoSuchMethodException, InstantiationException,
   IllegalAccessException, InvocationTargetException {
     Constructor<? extends Dictionary> dictConstructor =
@@ -43,6 +43,17 @@
     familyDict = dictConstructor.newInstance();
     qualifierDict = dictConstructor.newInstance();
     rowDict = dictConstructor.newInstance();
+    if (recoveredEdits) {
+      // This will never change
+      regionDict.init(1);
+      tableDict.init(1);
+    } else {
+      regionDict.init(Short.MAX_VALUE);
+      tableDict.init(Short.MAX_VALUE);
+    }
+    rowDict.init(Short.MAX_VALUE);
+    familyDict.init(Byte.MAX_VALUE);
+    qualifierDict.init(Byte.MAX_VALUE);
   }
 
   void clear() {
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Dictionary.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Dictionary.java	(revision 1513718)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Dictionary.java	(working copy)
@@ -30,6 +30,7 @@
 interface Dictionary {
   byte NOT_IN_DICTIONARY = -1;
 
+  void init(int initialSize);
   /**
    * Gets an entry from the dictionary.
    *
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LRUDictionary.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LRUDictionary.java	(revision 1513718)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LRUDictionary.java	(working copy)
@@ -35,14 +35,18 @@
  */
@InterfaceAudience.Private
 public class LRUDictionary implements Dictionary {
-  private final BidirectionalLRUMap backingStore = new BidirectionalLRUMap();
+  BidirectionalLRUMap backingStore;
 
   @Override
   public byte[] getEntry(short idx) {
     return backingStore.get(idx);
   }
 
   @Override
+  public void init(int initialSize) {
+    backingStore = new BidirectionalLRUMap(initialSize);
+  }
+  @Override
   public short findEntry(byte[] data, int offset, int length) {
     short ret = backingStore.findIdx(data, offset, length);
     if (ret == NOT_IN_DICTIONARY) {
@@ -69,7 +73,6 @@
    * This is not thread safe. Don't use in multi-threaded applications.
    */
   static class BidirectionalLRUMap {
-    static final int MAX_SIZE = Short.MAX_VALUE;
     private int currSize = 0;
 
     // Head and tail of the LRU list.
@@ -77,10 +80,13 @@
     private Node tail;
 
     private HashMap<Node, Short> nodeToIndex = new HashMap<Node, Short>();
-    private Node[] indexToNode = new Node[MAX_SIZE];
+    private Node[] indexToNode;
+    private int initSize = 0;
 
-    public BidirectionalLRUMap() {
-      for (int i = 0; i < MAX_SIZE; i++) {
+    public BidirectionalLRUMap(int initialSize) {
+      initSize = initialSize;
+      indexToNode = new Node[initialSize];
+      for (int i = 0; i < initialSize; i++) {
         indexToNode[i] = new Node();
       }
     }
@@ -91,7 +97,7 @@
       byte[] stored = new byte[length];
       Bytes.putBytes(stored, 0, array, offset, length);
 
-      if (currSize < MAX_SIZE) {
+      if (currSize < initSize) {
         // There is space to add without evicting.
        indexToNode[currSize].setContents(stored, 0, stored.length);
        setHead(indexToNode[currSize]);
@@ -174,7 +180,7 @@
         n.container = null;
       }
 
-      for (int i = 0; i < MAX_SIZE; i++) {
+      for (int i = 0; i < initSize; i++) {
         indexToNode[i].next = null;
         indexToNode[i].prev = null;
       }
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java	(revision 1513718)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java	(working copy)
@@ -39,7 +39,7 @@
  * Writer for protobuf-based WAL.
  */
 @InterfaceAudience.Private
-public class ProtobufLogWriter implements HLog.Writer {
+public class ProtobufLogWriter extends WriterBase {
   private final Log LOG = LogFactory.getLog(this.getClass());
   private FSDataOutputStream output;
   private Codec.Encoder cellEncoder;
@@ -50,10 +50,6 @@
   // than this size, it is written/read respectively, with a WARN message in the log.
   private int trailerWarnSize;
 
-  /** Context used by our wal dictionary compressor.
-   * Null if we're not to do our custom dictionary compression. */
-  private CompressionContext compressionContext;
-
   public ProtobufLogWriter() {
     super();
   }
@@ -61,14 +57,7 @@
   @Override
   public void init(FileSystem fs, Path path, Configuration conf) throws IOException {
     assert this.output == null;
-    boolean doCompress = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false);
-    if (doCompress) {
-      try {
-        this.compressionContext = new CompressionContext(LRUDictionary.class);
-      } catch (Exception e) {
-        throw new IOException("Failed to initiate CompressionContext", e);
-      }
-    }
+    boolean doCompress = initializeCompressionContext(conf, path);
     this.trailerWarnSize = conf.getInt(HLog.WAL_TRAILER_WARN_SIZE,
       HLog.DEFAULT_WAL_TRAILER_WARN_SIZE);
     int bufferSize = FSUtils.getDefaultBufferSize(fs);
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java	(revision 1513718)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java	(working copy)
@@ -27,6 +27,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
+import org.apache.hadoop.hbase.util.FSUtils;
 
 @InterfaceAudience.Private
 public abstract class ReaderBase implements HLog.Reader {
@@ -67,7 +68,8 @@
     // If compression is enabled, new dictionaries are created here.
     try {
       if (compressionContext == null) {
-        compressionContext = new CompressionContext(LRUDictionary.class);
+        compressionContext = new CompressionContext(LRUDictionary.class,
+            FSUtils.isRecoveredEdits(path));
       } else {
         compressionContext.clear();
       }
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java	(revision 0)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java	(working copy)
@@ -0,0 +1,50 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+/**
+ * Base class for our WAL writers. Holds the context used by our wal dictionary
+ * compressor; the context is null if custom dictionary compression is disabled.
+ */
+@InterfaceAudience.Private
+public abstract class WriterBase implements HLog.Writer {
+
+  protected CompressionContext compressionContext;
+
+  public boolean initializeCompressionContext(Configuration conf, Path path) throws IOException {
+    boolean doCompress = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false);
+    if (doCompress) {
+      try {
+        this.compressionContext = new CompressionContext(LRUDictionary.class,
+            FSUtils.isRecoveredEdits(path));
+      } catch (Exception e) {
+        throw new IOException("Failed to initiate CompressionContext", e);
+      }
+    }
+    return doCompress;
+  }
+}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java	(revision 1513718)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java	(working copy)
@@ -1322,6 +1322,15 @@
   }
 
   /**
+   * Checks if the given path is under a 'recovered.edits' directory.
+   * @param path the path to check
+   * @return true if the path name contains the recovered.edits directory name
+   */
+  public static boolean isRecoveredEdits(Path path) {
+    return path.toString().contains(HConstants.RECOVERED_EDITS_DIR);
+  }
+
+  /**
    * Filter for all dirs that don't start with '.'
    */
   public static class RegionDirFilter implements PathFilter {
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java	(revision 1513718)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java	(working copy)
@@ -46,7 +46,7 @@
  * SequenceFile.Writer. Legacy implementation only used for compat tests.
  */
 @InterfaceAudience.Private
-public class SequenceFileLogWriter implements HLog.Writer {
+public class SequenceFileLogWriter extends WriterBase {
   private final Log LOG = LogFactory.getLog(this.getClass());
   // The sequence file we delegate to.
   private SequenceFile.Writer writer;
@@ -58,13 +58,6 @@
   private static final Text WAL_VERSION_KEY = new Text("version");
   private static final Text WAL_COMPRESSION_TYPE_KEY = new Text("compression.type");
   private static final Text DICTIONARY_COMPRESSION_TYPE = new Text("dictionary");
-
-  /**
-   * Context used by our wal dictionary compressor. Null if we're not to do
-   * our custom dictionary compression. This custom WAL compression is distinct
-   * from sequencefile native compression.
-   */
-  private CompressionContext compressionContext;
   /**
    * Default constructor.
    */
@@ -72,7 +65,6 @@
   public SequenceFileLogWriter() {
     super();
   }
-
   /**
    * Create sequence file Metadata for our WAL file with version and compression
    * type (if any).
@@ -94,19 +86,7 @@
 
   @Override
   public void init(FileSystem fs, Path path, Configuration conf) throws IOException {
-    // Should we do our custom WAL compression?
-    boolean compress = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false);
-    if (compress) {
-      try {
-        if (this.compressionContext == null) {
-          this.compressionContext = new CompressionContext(LRUDictionary.class);
-        } else {
-          this.compressionContext.clear();
-        }
-      } catch (Exception e) {
-        throw new IOException("Failed to initiate CompressionContext", e);
-      }
-    }
+    boolean compress = initializeCompressionContext(conf, path);
 
     // Create a SF.Writer instance.
     try {
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java	(revision 1513718)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCompressor.java	(working copy)
@@ -74,6 +74,7 @@
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     DataOutputStream dos = new DataOutputStream(baos);
     Dictionary dictionary = new LRUDictionary();
+    dictionary.init(Short.MAX_VALUE);
     byte [] blahBytes = Bytes.toBytes("blah");
     Compressor.writeCompressed(blahBytes, 0, blahBytes.length, dos, dictionary);
     dos.close();
@@ -81,6 +82,7 @@
     DataInputStream dis =
       new DataInputStream(new ByteArrayInputStream(dosbytes));
     dictionary = new LRUDictionary();
+    dictionary.init(Short.MAX_VALUE);
     byte [] product = Compressor.readCompressed(dis, dictionary);
     assertTrue(Bytes.equals(blahBytes, product));
   }
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java	(revision 1513718)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java	(working copy)
@@ -64,7 +64,7 @@
   }
 
   private void runTestCycle(List<KeyValue> kvs) throws Exception {
-    CompressionContext ctx = new CompressionContext(LRUDictionary.class);
+    CompressionContext ctx = new CompressionContext(LRUDictionary.class, false);
     DataOutputBuffer buf = new DataOutputBuffer(BUF_SIZE);
     for (KeyValue kv : kvs) {
       KeyValueCompression.writeKV(buf, kv, ctx);
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLRUDictionary.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLRUDictionary.java	(revision 1513718)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLRUDictionary.java	(working copy)
@@ -41,6 +41,7 @@
   @Before
   public void setUp() throws Exception {
     testee = new LRUDictionary();
+    testee.init(Short.MAX_VALUE);
   }
 
   @Test
@@ -110,7 +111,7 @@
   @Test
   public void TestLRUPolicy(){
     //start by filling the dictionary up with byte arrays
-    for (int i = 0; i < LRUDictionary.BidirectionalLRUMap.MAX_SIZE; i++) {
+    for (int i = 0; i < Short.MAX_VALUE; i++) {
      testee.findEntry((BigInteger.valueOf(i)).toByteArray(), 0,
          (BigInteger.valueOf(i)).toByteArray().length);
    }
@@ -132,13 +133,13 @@
     assertTrue(testee.findEntry(BigInteger.ZERO.toByteArray(), 0,
         BigInteger.ZERO.toByteArray().length) != -1);
     // Now go from beyond 1 to the end.
-    for(int i = 1; i < LRUDictionary.BidirectionalLRUMap.MAX_SIZE; i++) {
+    for(int i = 1; i < Short.MAX_VALUE; i++) {
       assertTrue(testee.findEntry(BigInteger.valueOf(i).toByteArray(), 0,
           BigInteger.valueOf(i).toByteArray().length) == -1);
     }
 
     // check we can find all of these.
-    for (int i = 0; i < LRUDictionary.BidirectionalLRUMap.MAX_SIZE; i++) {
+    for (int i = 0; i < Short.MAX_VALUE; i++) {
       assertTrue(testee.findEntry(BigInteger.valueOf(i).toByteArray(), 0,
           BigInteger.valueOf(i).toByteArray().length) != -1);
     }
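
Reviewer note, not part of the patch: below is a minimal sketch of how the reworked pieces fit together after this change. The class name CompressionContextSketch and both file paths are made up for illustration; the sketch assumes the patched classes are on the classpath and lives in the same package so it can reach the package-private Dictionary type.

  package org.apache.hadoop.hbase.regionserver.wal;

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.HConstants;
  import org.apache.hadoop.hbase.util.Bytes;
  import org.apache.hadoop.hbase.util.FSUtils;

  public class CompressionContextSketch {
    public static void main(String[] args) throws Exception {
      // A normal WAL interleaves edits from many regions and tables, so the
      // region and table dictionaries keep their full Short.MAX_VALUE capacity.
      Path walPath = new Path("/hbase/.logs/rs1,60020,1/example.wal"); // made-up path
      CompressionContext walCtx =
          new CompressionContext(LRUDictionary.class, FSUtils.isRecoveredEdits(walPath));

      // A recovered.edits file holds edits for exactly one region (and thus one
      // table), so isRecoveredEdits() is true and those two dictionaries are
      // sized to a single slot.
      Path editsPath = new Path("/hbase/t1/region1/" +
          HConstants.RECOVERED_EDITS_DIR + "/0000001"); // made-up path
      CompressionContext editsCtx =
          new CompressionContext(LRUDictionary.class, FSUtils.isRecoveredEdits(editsPath));

      // Dictionaries are no longer usable straight from the constructor: callers
      // must size the backing LRU map through the new init(int) method first.
      LRUDictionary dict = new LRUDictionary();
      dict.init(Short.MAX_VALUE);
      byte[] entry = Bytes.toBytes("row-0");
      // First lookup returns NOT_IN_DICTIONARY (-1) and stores the entry;
      // looking up the same bytes again returns the entry's dictionary index.
      short miss = dict.findEntry(entry, 0, entry.length);
      short hit = dict.findEntry(entry, 0, entry.length);
      System.out.println("first=" + miss + " second=" + hit);
    }
  }

Since the dictionaries are sized in the CompressionContext constructor, a reader or writer opened on a recovered.edits path now allocates two single-entry dictionaries instead of two Short.MAX_VALUE-sized node arrays, which appears to be the memory saving this patch is after.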