diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
index 10aec00..a9143b7 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
@@ -24,7 +24,7 @@ import java.lang.reflect.InvocationTargetException;
 /**
  * Context that holds the various dictionaries for compression in HLog.
  */
-class CompressionContext {
+public class CompressionContext {
   final Dictionary regionDict;
   final Dictionary tableDict;
   final Dictionary familyDict;
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
index 2aa3419..02709d9 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
@@ -162,11 +162,13 @@ public class HLog implements Syncable {
   public interface Reader {
     void init(FileSystem fs, Path path, Configuration c) throws IOException;
+    void init(FileSystem fs, Path path, Configuration c, CompressionContext cc) throws IOException;
     void close() throws IOException;
     Entry next() throws IOException;
     Entry next(Entry reuse) throws IOException;
     void seek(long pos) throws IOException;
     long getPosition() throws IOException;
+    CompressionContext getCompressionContext();
   }

   public interface Writer {
@@ -703,6 +705,21 @@ public class HLog implements Syncable {
   public static Reader getReader(final FileSystem fs, final Path path,
       Configuration conf)
   throws IOException {
+    return getReader(fs, path, conf, null);
+  }
+
+  /**
+   * Get a reader for the WAL.
+   * @param fs
+   * @param path
+   * @param conf
+   * @param cc if not null, its dictionary will be used for reading the compressed entries
+   * @return A WAL reader. Close when done with it.
+   * @throws IOException
+   */
+  public static Reader getReader(final FileSystem fs, final Path path,
+      Configuration conf, CompressionContext cc)
+  throws IOException {
     try {
       if (logReaderClass == null) {
@@ -713,7 +730,7 @@ public class HLog implements Syncable {
       HLog.Reader reader = logReaderClass.newInstance();
-      reader.init(fs, path, conf);
+      reader.init(fs, path, conf, cc);
       return reader;
     } catch (IOException e) {
       throw e;
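The two getReader overloads keep the public API backward compatible: the original three-argument form simply delegates with a null context. CompressionContext also becomes public here because ReplicationSource, which lives in another package, now needs to hold one. A minimal sketch of how a caller might use the new overload to reopen a compressed log without losing dictionary state (the fs, logPath, and conf variables are illustrative, not part of the patch):

    // First open: no context to pass, so the reader builds fresh dictionaries
    HLog.Reader reader = HLog.getReader(fs, logPath, conf, null);
    // Capture the context so a later reopen can keep decoding entries
    // written against the same dictionary state
    CompressionContext savedContext = reader.getCompressionContext();
    reader.close();
    // Reopen the same log; the saved dictionaries are reused instead of rebuilt
    reader = HLog.getReader(fs, logPath, conf, savedContext);

This is the pattern ReplicationSource relies on further down when it reopens a log partway through.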
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
index f5fb00f..822634b 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
@@ -170,6 +170,12 @@ public class SequenceFileLogReader implements HLog.Reader {
   @Override
   public void init(FileSystem fs, Path path, Configuration conf)
       throws IOException {
+    this.init(fs, path, conf, null);
+  }
+
+  @Override
+  public void init(FileSystem fs, Path path, Configuration conf, CompressionContext cc)
+      throws IOException {
     this.conf = conf;
     this.path = path;
     reader = new WALReader(fs, path, conf);
@@ -178,7 +184,10 @@ public class SequenceFileLogReader implements HLog.Reader {
     boolean compression = reader.isWALCompressionEnabled();
     if (compression) {
       try {
-        if (compressionContext == null) {
+        // Use the context that's passed (useful for replication) or build/clear one
+        if (cc != null) {
+          compressionContext = cc;
+        } else if (compressionContext == null) {
           compressionContext = new CompressionContext(LRUDictionary.class);
         } else {
           compressionContext.clear();
@@ -254,6 +263,11 @@ public class SequenceFileLogReader implements HLog.Reader {
     return reader.getPosition();
   }

+  @Override
+  public CompressionContext getCompressionContext() {
+    return compressionContext;
+  }
+
   protected IOException addFileInfoToException(final IOException ioe)
       throws IOException {
     long pos = -1;
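The branch order in init matters: a caller-supplied context always wins, an existing per-reader context is cleared rather than reallocated, and a fresh context is built only on first use. Restated with descriptive comments (same logic as the hunk above; it sits inside the reader's existing try/catch because the reflective CompressionContext constructor can throw):

    if (cc != null) {
      // Caller-supplied dictionaries win, e.g. replication handing back
      // the context it captured when it first opened this log
      compressionContext = cc;
    } else if (compressionContext == null) {
      // First init of this reader: start with empty LRU dictionaries
      compressionContext = new CompressionContext(LRUDictionary.class);
    } else {
      // Reader re-initialized on another file: reset the dictionaries
      // instead of allocating a new context
      compressionContext.clear();
    }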
diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 7710b47..cacce79 100644
--- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -136,6 +137,14 @@ public class ReplicationSource extends Thread
   private volatile boolean running = true;
   // Metrics for this source
   private ReplicationSourceMetrics metrics;
+  // If hlog compression is enabled, we need to carry the dictionary context across log reopens
+  private CompressionContext compressionContext;
+  // Keeps track of whether we're using log compression
+  private boolean logCompressed;
+  // If this is true it means we're rebuilding the compression dict
+  // and that we have to skip edits in the current log up to positionToSkipTo
+  private boolean skipEditsForCompression = false;
+  private long positionToSkipTo;

   /**
    * Instantiation method used by region servers
@@ -183,6 +192,7 @@ public class ReplicationSource extends Thread
         this.conf.getLong("replication.source.sleepforretries", 1000);
     this.fs = fs;
     this.metrics = new ReplicationSourceMetrics(peerClusterZnode);
+    this.logCompressed = this.conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false);

     try {
       this.clusterId = zkHelper.getUUIDForCluster(zkHelper.getZookeeperWatcher());
@@ -263,8 +273,17 @@ public class ReplicationSource extends Thread
     // normally has a position (unless the RS failed between 2 logs)
     if (this.queueRecovered) {
       try {
-        this.position = this.zkHelper.getHLogRepPosition(
+        long lastPos = this.zkHelper.getHLogRepPosition(
             this.peerClusterZnode, this.queue.peek().getName());
+        // If we're compressing logs and the oldest recovered log's last position is greater
+        // than 0, we need to rebuild the dictionary up to that point without replicating
+        // the edits again. The rebuilding part is simply done by reading the log.
+        if (logCompressed && lastPos != 0) {
+          this.positionToSkipTo = lastPos;
+          this.skipEditsForCompression = true;
+        } else {
+          this.position = lastPos;
+        }
       } catch (KeeperException e) {
         this.terminate("Couldn't get the position of this recovered queue " +
             peerClusterZnode, e);
@@ -351,7 +370,10 @@ public class ReplicationSource extends Thread
       // wait a bit and retry.
       // But if we need to stop, don't bother sleeping
       if (this.isActive() && (gotIOE || currentNbEntries == 0)) {
-        if (this.lastLoggedPosition != this.position) {
+        // Don't write the position to ZK if we are skipping edits since it would
+        // overwrite the position we're trying to get back to. This way, if we
+        // also die, the next source will know the right position to skip to
+        if (this.lastLoggedPosition != this.position && !this.skipEditsForCompression) {
           this.manager.logPositionAndCleanOldLogs(this.currentPath,
               this.peerClusterZnode, this.position, queueRecovered);
           this.lastLoggedPosition = this.position;
@@ -390,6 +412,8 @@ public class ReplicationSource extends Thread
     long startPosition = this.position;
     HLog.Entry entry = readNextAndSetPosition();
     while (entry != null) {
+      // Setting it to null prevents us from sending compressed edits that the sink wouldn't parse
+      entry.setCompressionContext(null);
       WALEdit edit = entry.getEdit();
       this.metrics.logEditsReadRate.inc(1);
       seenEntries++;
@@ -416,6 +440,11 @@ public class ReplicationSource extends Thread
           this.metrics.logEditsFilteredRate.inc(1);
         }
       }
+      // We know we're done skipping when we reach the position where the previous
+      // source died, so from here on we can replicate using the rebuilt dict
+      if (this.skipEditsForCompression && (this.position == this.positionToSkipTo)) {
+        this.skipEditsForCompression = false;
+      }
       // Stop if too many entries or too big
       if ((this.reader.getPosition() - startPosition)
           >= this.replicationQueueSizeCapacity ||
@@ -487,8 +516,14 @@ public class ReplicationSource extends Thread
       LOG.debug("Opening log for replication " + this.currentPath.getName() +
           " at " + this.position);
       try {
-        this.reader = null;
-        this.reader = HLog.getReader(this.fs, this.currentPath, this.conf);
+        this.reader = null;
+        this.reader = HLog.getReader(this.fs, this.currentPath,
+            this.conf, this.compressionContext);
+        // We're opening a new file, it's going to build its own dict that we need
+        // to retrieve in order to pass it back later
+        if (this.position == 0) {
+          this.compressionContext = this.reader.getCompressionContext();
+        }
       } catch (FileNotFoundException fnfe) {
         if (this.queueRecovered) {
           // We didn't find the log in the archive directory, look if it still
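Taken together, the hunks above implement the dictionary rebuild for recovered queues. Because the LRU dictionaries are stateful, a reader cannot start mid-file: it has to replay every entry from the beginning of the log to reconstruct the exact dictionary state at the stored position. A condensed, illustrative view of that flow (not patch code verbatim; the real logic is spread across the hunks above and the KeyValue filter just below):

    // Recovered queue on a compressed log with a stored position:
    // read from offset 0 so the dictionaries replay every entry...
    if (logCompressed && lastPos != 0) {
      positionToSkipTo = lastPos;      // remember where to resume shipping
      skipEditsForCompression = true;  // ...but ship nothing until then
    }
    // In the read loop, every KeyValue is filtered out while skipping, so
    // entries only feed the dictionary; once the reader's position reaches
    // positionToSkipTo, skipEditsForCompression flips back to false and
    // normal replication resumes from exactly where the dead source left off.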
@@ -578,7 +613,7 @@ public class ReplicationSource extends Thread
           KeyValue kv = kvs.get(i);
           // The scope will be null or empty if
           // there's nothing to replicate in that WALEdit
-          if (scopes == null || !scopes.containsKey(kv.getFamily())) {
+          if (this.skipEditsForCompression || scopes == null || !scopes.containsKey(kv.getFamily())) {
             kvs.remove(i);
           }
         }
@@ -696,6 +731,7 @@ public class ReplicationSource extends Thread
     if (this.queue.size() != 0) {
       this.currentPath = null;
       this.position = 0;
+      this.compressionContext = null;
       return true;
     } else if (this.queueRecovered) {
       this.manager.closeRecoveredQueue(this);
diff --git a/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java b/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
index 96eb211..14ad44f 100644
--- a/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
+++ b/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
@@ -61,7 +61,7 @@ public class TestReplication {

   private static final Log LOG = LogFactory.getLog(TestReplication.class);

-  private static Configuration conf1;
+  protected static Configuration conf1 = HBaseConfiguration.create();
   private static Configuration conf2;
   private static Configuration CONF_WITH_LOCALFS;

@@ -91,7 +91,6 @@ public class TestReplication {
    */
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    conf1 = HBaseConfiguration.create();
     conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
     // smaller block size and capacity to trigger more operations
     // and test them
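The TestReplication change is what makes the feature testable: conf1 is now a protected field created at class-initialization time instead of inside setUpBeforeClass, so a subclass can adjust the configuration before the clusters are started. A hypothetical subclass (the class name is illustrative; this patch does not add it) that would rerun the whole replication suite with WAL compression enabled:

    public class TestReplicationWithCompression extends TestReplication {
      static {
        // Runs after TestReplication's static initializer has created conf1,
        // but before the inherited @BeforeClass builds the clusters from it
        conf1.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
      }
    }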