diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java index 10aec00..a9143b7 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java @@ -24,7 +24,7 @@ import java.lang.reflect.InvocationTargetException; /** * Context that holds the various dictionaries for compression in HLog. */ -class CompressionContext { +public class CompressionContext { final Dictionary regionDict; final Dictionary tableDict; final Dictionary familyDict; diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index 2aa3419..02709d9 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -162,11 +162,13 @@ public class HLog implements Syncable { public interface Reader { void init(FileSystem fs, Path path, Configuration c) throws IOException; + void init(FileSystem fs, Path path, Configuration c, CompressionContext cc) throws IOException; void close() throws IOException; Entry next() throws IOException; Entry next(Entry reuse) throws IOException; void seek(long pos) throws IOException; long getPosition() throws IOException; + CompressionContext getCompressionContext(); } public interface Writer { @@ -703,6 +705,21 @@ public class HLog implements Syncable { public static Reader getReader(final FileSystem fs, final Path path, Configuration conf) throws IOException { + return getReader(fs, path, conf, null); + } + + /** + * Get a reader for the WAL. + * @param fs + * @param path + * @param conf + * @param cc if not null, its dictionary will be used for reading the compressed entries + * @return A WAL reader. Close when done with it. + * @throws IOException + */ + public static Reader getReader(final FileSystem fs, final Path path, + Configuration conf, CompressionContext cc) + throws IOException { try { if (logReaderClass == null) { @@ -713,7 +730,7 @@ public class HLog implements Syncable { HLog.Reader reader = logReaderClass.newInstance(); - reader.init(fs, path, conf); + reader.init(fs, path, conf, cc); return reader; } catch (IOException e) { throw e; diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java index f5fb00f..822634b 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java @@ -170,6 +170,12 @@ public class SequenceFileLogReader implements HLog.Reader { @Override public void init(FileSystem fs, Path path, Configuration conf) throws IOException { + this.init(fs, path, conf, null); + } + + @Override + public void init(FileSystem fs, Path path, Configuration conf, CompressionContext cc) + throws IOException { this.conf = conf; this.path = path; reader = new WALReader(fs, path, conf); @@ -178,7 +184,10 @@ public class SequenceFileLogReader implements HLog.Reader { boolean compression = reader.isWALCompressionEnabled(); if (compression) { try { - if (compressionContext == null) { + // Use the context that's passed (useful for replication) or build/clear one + if (cc != null) { + compressionContext = cc; + } else if (compressionContext == null) { compressionContext = new CompressionContext(LRUDictionary.class); } else { compressionContext.clear(); @@ -254,6 +263,11 @@ public class SequenceFileLogReader implements HLog.Reader { return reader.getPosition(); } + @Override + public CompressionContext getCompressionContext() { + return compressionContext; + } + protected IOException addFileInfoToException(final IOException ioe) throws IOException { long pos = -1; diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReader.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReader.java new file mode 100644 index 0000000..af57b6b --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReader.java @@ -0,0 +1,148 @@ +package org.apache.hadoop.hbase.replication.regionserver; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.regionserver.wal.CompressionContext; +import org.apache.hadoop.hbase.regionserver.wal.HLog; + +import java.io.IOException; + +/** + * Wrapper class around HLog to help manage the implementation details + * such as compression. + */ +public class ReplicationHLogReader { + + private static final Log LOG = LogFactory.getLog(ReplicationHLogReader.class); + private final FileSystem fs; + private final Configuration conf; + private long position = 0; + private HLog.Reader reader; + + // If hlog compression is enabled, we need to keep track of this while reopening logs + private CompressionContext compressionContext; + // Keeping track if we're using log compression + private final boolean logCompressed; + private long positionToSkipTo = 0; + + /** + * Creates the helper but doesn't open any file + * Use setInitialPosition after using the constructor if some content needs to be skipped + * @param fs + * @param conf + */ + public ReplicationHLogReader(FileSystem fs, Configuration conf) { + this.fs = fs; + this.conf = conf; + this.logCompressed = this.conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false); + } + + /** + * Opens the file at the current position + * if a positionToSkipTo was specified, this method will take care of seeking there + * @param path + * @return + * @throws IOException + */ + public HLog.Reader openReader(Path path) throws IOException { + LOG.debug("Opening log for replication " + path.getName() + + " at " + this.position); + this.reader = null; + this.reader = HLog.getReader(this.fs, path, + this.conf, this.compressionContext); + + // We're opening a new file, it's going to build it's own dict that we need + // to retrieve in order to pass it back later + if (this.position == 0) { + this.compressionContext = this.reader.getCompressionContext(); + } + if (this.positionToSkipTo != 0) { + if (this.logCompressed) { + skipEdits(); + } else { + this.position = this.positionToSkipTo; + } + positionToSkipTo = 0; + } + return this.reader; + } + + private void skipEdits() throws IOException { + while (this.reader.next() != null) { + this.position = reader.getPosition(); + if (this.position == this.positionToSkipTo) { + return; + } + } + } + + /** + * Get the next entry, returned and also added in the array + * @param entriesArray + * @param currentNbEntries + * @return a new entry or null + * @throws IOException + */ + public HLog.Entry readNextAndSetPosition(HLog.Entry[] entriesArray, int currentNbEntries) throws IOException { + HLog.Entry entry = this.reader.next(entriesArray[currentNbEntries]); + // Store the position so that in the future the reader can start + // reading from here. If the above call to next() throws an + // exception, the position won't be changed and retry will happen + // from the last known good position + this.position = this.reader.getPosition(); + if (entry != null) { + entry.setCompressionContext(null); + } + return entry; + } + + /** + * Advance the reader to the current position + * @throws IOException + */ + public void seek() throws IOException { + if (this.position != 0) { + this.reader.seek(this.position); + } + } + + /** + * Before opening a reader that needs to seek in the file on the first read, + * use this method to set the initial position to seek to + * @param position + */ + public void setInitialPosition(long position) { + this.positionToSkipTo = position; + } + + /** + * Get the position that we stopped reading at + * @return current position, cannot be negative + */ + public long getPosition() { + return this.position; + } + + /** + * Close the current reader + * @throws IOException + */ + public void closeReader() throws IOException { + if (this.reader != null) { + this.reader.close(); + } + } + + /** + * Tell the helper to reset internal state + */ + public void finishCurrentFile() { + this.position = 0; + this.compressionContext = null; + } + +} diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 7710b47..4fbd061 100644 --- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -105,8 +105,6 @@ public class ReplicationSource extends Thread private int replicationQueueNbCapacity; // Our reader for the current log private HLog.Reader reader; - // Current position in the log - private long position = 0; // Last position in the log that we sent to ZooKeeper private long lastLoggedPosition = -1; // Path of the current log @@ -132,10 +130,15 @@ public class ReplicationSource extends Thread private int currentNbEntries = 0; // Current number of operations (Put/Delete) that we need to replicate private int currentNbOperations = 0; + // Current size of data we need to replicate + private int currentSize = 0; // Indicates if this particular source is running private volatile boolean running = true; // Metrics for this source private ReplicationSourceMetrics metrics; + // Handle on the log reader helper + private ReplicationHLogReader repLogReader; + /** * Instantiation method used by region servers @@ -183,7 +186,7 @@ public class ReplicationSource extends Thread this.conf.getLong("replication.source.sleepforretries", 1000); this.fs = fs; this.metrics = new ReplicationSourceMetrics(peerClusterZnode); - + this.repLogReader = new ReplicationHLogReader(this.fs, this.conf); try { this.clusterId = zkHelper.getUUIDForCluster(zkHelper.getZookeeperWatcher()); } catch (KeeperException ke) { @@ -263,8 +266,8 @@ public class ReplicationSource extends Thread // normally has a position (unless the RS failed between 2 logs) if (this.queueRecovered) { try { - this.position = this.zkHelper.getHLogRepPosition( - this.peerClusterZnode, this.queue.peek().getName()); + this.repLogReader.setInitialPosition(this.zkHelper.getHLogRepPosition( + this.peerClusterZnode, this.queue.peek().getName())); } catch (KeeperException e) { this.terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e); @@ -338,9 +341,7 @@ public class ReplicationSource extends Thread } } finally { try { - if (this.reader != null) { - this.reader.close(); - } + this.repLogReader.closeReader(); } catch (IOException e) { gotIOE = true; LOG.warn("Unable to finalize the tailing of a file", e); @@ -351,10 +352,10 @@ public class ReplicationSource extends Thread // wait a bit and retry. // But if we need to stop, don't bother sleeping if (this.isActive() && (gotIOE || currentNbEntries == 0)) { - if (this.lastLoggedPosition != this.position) { + if (this.lastLoggedPosition != this.repLogReader.getPosition()) { this.manager.logPositionAndCleanOldLogs(this.currentPath, - this.peerClusterZnode, this.position, queueRecovered); - this.lastLoggedPosition = this.position; + this.peerClusterZnode, this.repLogReader.getPosition(), queueRecovered); + this.lastLoggedPosition = this.repLogReader.getPosition(); } if (sleepForRetries("Nothing to replicate", sleepMultiplier)) { sleepMultiplier++; @@ -384,11 +385,8 @@ public class ReplicationSource extends Thread */ protected boolean readAllEntriesToReplicateOrNextFile() throws IOException{ long seenEntries = 0; - if (this.position != 0) { - this.reader.seek(this.position); - } - long startPosition = this.position; - HLog.Entry entry = readNextAndSetPosition(); + this.repLogReader.seek(); + HLog.Entry entry = this.repLogReader.readNextAndSetPosition(this.entriesArray, this.currentNbEntries); while (entry != null) { WALEdit edit = entry.getEdit(); this.metrics.logEditsReadRate.inc(1); @@ -412,18 +410,18 @@ public class ReplicationSource extends Thread } currentNbOperations += countDistinctRowKeys(edit); currentNbEntries++; + currentSize += entry.getEdit().size(); } else { this.metrics.logEditsFilteredRate.inc(1); } } // Stop if too many entries or too big - if ((this.reader.getPosition() - startPosition) - >= this.replicationQueueSizeCapacity || + if (currentSize >= this.replicationQueueSizeCapacity || currentNbEntries >= this.replicationQueueNbCapacity) { break; } try { - entry = readNextAndSetPosition(); + entry = this.repLogReader.readNextAndSetPosition(this.entriesArray, this.currentNbEntries); } catch (IOException ie) { LOG.debug("Break on IOE: " + ie.getMessage()); break; @@ -431,22 +429,12 @@ public class ReplicationSource extends Thread } LOG.debug("currentNbOperations:" + currentNbOperations + " and seenEntries:" + seenEntries + - " and size: " + (this.reader.getPosition() - startPosition)); + " and size: " + currentSize); // If we didn't get anything and the queue has an object, it means we // hit the end of the file for sure return seenEntries == 0 && processEndOfFile(); } - private HLog.Entry readNextAndSetPosition() throws IOException { - HLog.Entry entry = this.reader.next(entriesArray[currentNbEntries]); - // Store the position so that in the future the reader can start - // reading from here. If the above call to next() throws an - // exception, the position won't be changed and retry will happen - // from the last known good position - this.position = this.reader.getPosition(); - return entry; - } - private void connectToPeers() { // Connect to peer cluster first, unless we have to stop while (this.isActive() && this.currentPeers.size() == 0) { @@ -484,11 +472,9 @@ public class ReplicationSource extends Thread */ protected boolean openReader(int sleepMultiplier) { try { - LOG.debug("Opening log for replication " + this.currentPath.getName() + - " at " + this.position); + try { - this.reader = null; - this.reader = HLog.getReader(this.fs, this.currentPath, this.conf); + this.reader = repLogReader.openReader(this.currentPath); } catch (FileNotFoundException fnfe) { if (this.queueRecovered) { // We didn't find the log in the archive directory, look if it still @@ -622,10 +608,10 @@ public class ReplicationSource extends Thread HRegionInterface rrs = getRS(); LOG.debug("Replicating " + currentNbEntries); rrs.replicateLogEntries(Arrays.copyOf(this.entriesArray, currentNbEntries)); - if (this.lastLoggedPosition != this.position) { + if (this.lastLoggedPosition != this.repLogReader.getPosition()) { this.manager.logPositionAndCleanOldLogs(this.currentPath, - this.peerClusterZnode, this.position, queueRecovered); - this.lastLoggedPosition = this.position; + this.peerClusterZnode, this.repLogReader.getPosition(), queueRecovered); + this.lastLoggedPosition = this.repLogReader.getPosition(); } this.totalReplicatedEdits += currentNbEntries; this.metrics.shippedBatchesRate.inc(1); @@ -695,7 +681,8 @@ public class ReplicationSource extends Thread protected boolean processEndOfFile() { if (this.queue.size() != 0) { this.currentPath = null; - this.position = 0; + this.repLogReader.finishCurrentFile(); + this.reader = null; return true; } else if (this.queueRecovered) { this.manager.closeRecoveredQueue(this); diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java index 9fa4480..8feb5d6 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java @@ -475,15 +475,13 @@ public class TestHLog { throw t.exception; // Make sure you can read all the content - SequenceFile.Reader reader - = new SequenceFile.Reader(this.fs, walPath, this.conf); + HLog.Reader reader = HLog.getReader(this.fs, walPath, this.conf); int count = 0; - HLogKey key = HLog.newKey(conf); - WALEdit val = new WALEdit(); - while (reader.next(key, val)) { + HLog.Entry entry = new HLog.Entry(); + while (reader.next(entry) != null) { count++; assertTrue("Should be one KeyValue per WALEdit", - val.getKeyValues().size() == 1); + entry.getEdit().getKeyValues().size() == 1); } assertEquals(total, count); reader.close(); diff --git a/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java b/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java index 96eb211..14ad44f 100644 --- a/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java +++ b/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java @@ -61,7 +61,7 @@ public class TestReplication { private static final Log LOG = LogFactory.getLog(TestReplication.class); - private static Configuration conf1; + protected static Configuration conf1 = HBaseConfiguration.create(); private static Configuration conf2; private static Configuration CONF_WITH_LOCALFS; @@ -91,7 +91,6 @@ public class TestReplication { */ @BeforeClass public static void setUpBeforeClass() throws Exception { - conf1 = HBaseConfiguration.create(); conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); // smaller block size and capacity to trigger more operations // and test them diff --git a/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithCompression.java b/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithCompression.java new file mode 100644 index 0000000..108f853 --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithCompression.java @@ -0,0 +1,21 @@ +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.LargeTests; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +/** + */ +@Category(LargeTests.class) +public class TestReplicationWithCompression extends TestReplication { + + /** + * @throws java.lang.Exception + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + conf1.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true); + TestReplication.setUpBeforeClass(); + } +}