diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index 2aa3419..7f5fcd0 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -167,6 +167,7 @@ public class HLog implements Syncable { Entry next(Entry reuse) throws IOException; void seek(long pos) throws IOException; long getPosition() throws IOException; + void reset() throws IOException; } public interface Writer { @@ -693,16 +694,18 @@ public class HLog implements Syncable { } /** - * Get a reader for the WAL. + * Get a reader for the WAL. If you are reading from a file that's being written to + * and need to reopen it multiple times, use {@link HLog.Reader#reset()} instead of this method + * then just seek back to the last known good position. * @param fs * @param path * @param conf * @return A WAL reader. Close when done with it. * @throws IOException */ - public static Reader getReader(final FileSystem fs, - final Path path, Configuration conf) - throws IOException { + public static Reader getReader(final FileSystem fs, final Path path, + Configuration conf) + throws IOException { try { if (logReaderClass == null) { diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java index f5fb00f..7e271bf 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java @@ -139,11 +139,13 @@ public class SequenceFileLogReader implements HLog.Reader { Configuration conf; WALReader reader; + FileSystem fs; // Needed logging exceptions Path path; int edit = 0; long entryStart = 0; + boolean emptyCompressionContext = true; /** * Compression context to use reading. Can be null if no compression. */ @@ -173,6 +175,7 @@ public class SequenceFileLogReader implements HLog.Reader { this.conf = conf; this.path = path; reader = new WALReader(fs, path, conf); + this.fs = fs; // If compression is enabled, new dictionaries are created here. boolean compression = reader.isWALCompressionEnabled(); @@ -237,11 +240,22 @@ public class SequenceFileLogReader implements HLog.Reader { throw addFileInfoToException(ioe); } edit++; + if (compressionContext != null && emptyCompressionContext) { + emptyCompressionContext = false; + } return b? e: null; } @Override public void seek(long pos) throws IOException { + if (compressionContext != null && emptyCompressionContext) { + while (next() != null) { + if (getPosition() == pos) { + emptyCompressionContext = false; + break; + } + } + } try { reader.seek(pos); } catch (IOException ioe) { @@ -286,4 +300,11 @@ public class SequenceFileLogReader implements HLog.Reader { return ioe; } + + @Override + public void reset() throws IOException { + // Resetting the reader lets us see newly added data if the file is being written to + // We also keep the same compressionContext which was previously populated for this file + reader = new WALReader(fs, path, conf); + } } \ No newline at end of file diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java new file mode 100644 index 0000000..02b2086 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.replication.regionserver; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.regionserver.wal.HLog; + +import java.io.IOException; + +/** + * Wrapper class around HLog to help manage the implementation details + * such as compression. + */ +public class ReplicationHLogReaderManager { + + private static final Log LOG = LogFactory.getLog(ReplicationHLogReaderManager.class); + private final FileSystem fs; + private final Configuration conf; + private long position = 0; + private HLog.Reader reader; + private Path lastPath; + + /** + * Creates the helper but doesn't open any file + * Use setInitialPosition after using the constructor if some content needs to be skipped + * @param fs + * @param conf + */ + public ReplicationHLogReaderManager(FileSystem fs, Configuration conf) { + this.fs = fs; + this.conf = conf; + } + + /** + * Opens the file at the current position + * @param path + * @return + * @throws IOException + */ + public HLog.Reader openReader(Path path) throws IOException { + // Detect if this is a new file, if so get a new reader else + // reset the current reader so that we see the new data + if (this.reader == null || !this.lastPath.equals(path)) { + this.reader = HLog.getReader(this.fs, path, this.conf); + this.lastPath = path; + } else { + this.reader.reset(); + } + return this.reader; + } + + /** + * Get the next entry, returned and also added in the array + * @param entriesArray + * @param currentNbEntries + * @return a new entry or null + * @throws IOException + */ + public HLog.Entry readNextAndSetPosition(HLog.Entry[] entriesArray, int currentNbEntries) throws IOException { + HLog.Entry entry = this.reader.next(entriesArray[currentNbEntries]); + // Store the position so that in the future the reader can start + // reading from here. If the above call to next() throws an + // exception, the position won't be changed and retry will happen + // from the last known good position + this.position = this.reader.getPosition(); + // We need to set the CC to null else it will be compressed when sent to the sink + if (entry != null) { + entry.setCompressionContext(null); + } + return entry; + } + + /** + * Advance the reader to the current position + * @throws IOException + */ + public void seek() throws IOException { + if (this.position != 0) { + this.reader.seek(this.position); + } + } + + /** + * Get the position that we stopped reading at + * @return current position, cannot be negative + */ + public long getPosition() { + return this.position; + } + + public void setPosition(long pos) { + this.position = pos; + } + + /** + * Close the current reader + * @throws IOException + */ + public void closeReader() throws IOException { + if (this.reader != null) { + this.reader.close(); + } + } + + /** + * Tell the helper to reset internal state + */ + public void finishCurrentFile() { + this.position = 0; + this.reader = null; + } + +} diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index a632977..62c475a 100644 --- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -105,8 +105,6 @@ public class ReplicationSource extends Thread private int replicationQueueNbCapacity; // Our reader for the current log private HLog.Reader reader; - // Current position in the log - private long position = 0; // Last position in the log that we sent to ZooKeeper private long lastLoggedPosition = -1; // Path of the current log @@ -132,10 +130,15 @@ public class ReplicationSource extends Thread private int currentNbEntries = 0; // Current number of operations (Put/Delete) that we need to replicate private int currentNbOperations = 0; + // Current size of data we need to replicate + private int currentSize = 0; // Indicates if this particular source is running private volatile boolean running = true; // Metrics for this source private ReplicationSourceMetrics metrics; + // Handle on the log reader helper + private ReplicationHLogReaderManager repLogReader; + /** * Instantiation method used by region servers @@ -183,7 +186,7 @@ public class ReplicationSource extends Thread this.conf.getLong("replication.source.sleepforretries", 1000); this.fs = fs; this.metrics = new ReplicationSourceMetrics(peerClusterZnode); - + this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf); try { this.clusterId = zkHelper.getUUIDForCluster(zkHelper.getZookeeperWatcher()); } catch (KeeperException ke) { @@ -263,8 +266,8 @@ public class ReplicationSource extends Thread // normally has a position (unless the RS failed between 2 logs) if (this.queueRecovered) { try { - this.position = this.zkHelper.getHLogRepPosition( - this.peerClusterZnode, this.queue.peek().getName()); + this.repLogReader.setPosition(this.zkHelper.getHLogRepPosition( + this.peerClusterZnode, this.queue.peek().getName())); } catch (KeeperException e) { this.terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e); @@ -338,9 +341,7 @@ public class ReplicationSource extends Thread } } finally { try { - if (this.reader != null) { - this.reader.close(); - } + this.repLogReader.closeReader(); } catch (IOException e) { gotIOE = true; LOG.warn("Unable to finalize the tailing of a file", e); @@ -351,10 +352,10 @@ public class ReplicationSource extends Thread // wait a bit and retry. // But if we need to stop, don't bother sleeping if (this.isActive() && (gotIOE || currentNbEntries == 0)) { - if (this.lastLoggedPosition != this.position) { + if (this.lastLoggedPosition != this.repLogReader.getPosition()) { this.manager.logPositionAndCleanOldLogs(this.currentPath, - this.peerClusterZnode, this.position, queueRecovered); - this.lastLoggedPosition = this.position; + this.peerClusterZnode, this.repLogReader.getPosition(), queueRecovered); + this.lastLoggedPosition = this.repLogReader.getPosition(); } if (sleepForRetries("Nothing to replicate", sleepMultiplier)) { sleepMultiplier++; @@ -384,11 +385,8 @@ public class ReplicationSource extends Thread */ protected boolean readAllEntriesToReplicateOrNextFile() throws IOException{ long seenEntries = 0; - if (this.position != 0) { - this.reader.seek(this.position); - } - long startPosition = this.position; - HLog.Entry entry = readNextAndSetPosition(); + this.repLogReader.seek(); + HLog.Entry entry = this.repLogReader.readNextAndSetPosition(this.entriesArray, this.currentNbEntries); while (entry != null) { WALEdit edit = entry.getEdit(); this.metrics.logEditsReadRate.inc(1); @@ -412,18 +410,18 @@ public class ReplicationSource extends Thread } currentNbOperations += countDistinctRowKeys(edit); currentNbEntries++; + currentSize += entry.getEdit().size(); } else { this.metrics.logEditsFilteredRate.inc(1); } } // Stop if too many entries or too big - if ((this.reader.getPosition() - startPosition) - >= this.replicationQueueSizeCapacity || + if (currentSize >= this.replicationQueueSizeCapacity || currentNbEntries >= this.replicationQueueNbCapacity) { break; } try { - entry = readNextAndSetPosition(); + entry = this.repLogReader.readNextAndSetPosition(this.entriesArray, this.currentNbEntries); } catch (IOException ie) { LOG.debug("Break on IOE: " + ie.getMessage()); break; @@ -434,16 +432,6 @@ public class ReplicationSource extends Thread return seenEntries == 0 && processEndOfFile(); } - private HLog.Entry readNextAndSetPosition() throws IOException { - HLog.Entry entry = this.reader.next(entriesArray[currentNbEntries]); - // Store the position so that in the future the reader can start - // reading from here. If the above call to next() throws an - // exception, the position won't be changed and retry will happen - // from the last known good position - this.position = this.reader.getPosition(); - return entry; - } - private void connectToPeers() { // Connect to peer cluster first, unless we have to stop while (this.isActive() && this.currentPeers.size() == 0) { @@ -482,8 +470,7 @@ public class ReplicationSource extends Thread protected boolean openReader(int sleepMultiplier) { try { try { - this.reader = null; - this.reader = HLog.getReader(this.fs, this.currentPath, this.conf); + this.reader = repLogReader.openReader(this.currentPath); } catch (FileNotFoundException fnfe) { if (this.queueRecovered) { // We didn't find the log in the archive directory, look if it still @@ -616,10 +603,10 @@ public class ReplicationSource extends Thread try { HRegionInterface rrs = getRS(); rrs.replicateLogEntries(Arrays.copyOf(this.entriesArray, currentNbEntries)); - if (this.lastLoggedPosition != this.position) { + if (this.lastLoggedPosition != this.repLogReader.getPosition()) { this.manager.logPositionAndCleanOldLogs(this.currentPath, - this.peerClusterZnode, this.position, queueRecovered); - this.lastLoggedPosition = this.position; + this.peerClusterZnode, this.repLogReader.getPosition(), queueRecovered); + this.lastLoggedPosition = this.repLogReader.getPosition(); } this.totalReplicatedEdits += currentNbEntries; this.metrics.shippedBatchesRate.inc(1); @@ -688,7 +675,8 @@ public class ReplicationSource extends Thread protected boolean processEndOfFile() { if (this.queue.size() != 0) { this.currentPath = null; - this.position = 0; + this.repLogReader.finishCurrentFile(); + this.reader = null; return true; } else if (this.queueRecovered) { this.manager.closeRecoveredQueue(this); @@ -820,8 +808,14 @@ public class ReplicationSource extends Thread @Override public String getStats() { + String position; + try { + position = this.reader.getPosition()+""; + } catch (IOException ioe) { + position = "N/A"; + } return "Total replicated edits: " + totalReplicatedEdits + ", currently replicating from: " + this.currentPath + - " at position: " + this.position; + " at position: " + position; } } diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java index 9fa4480..8feb5d6 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java @@ -475,15 +475,13 @@ public class TestHLog { throw t.exception; // Make sure you can read all the content - SequenceFile.Reader reader - = new SequenceFile.Reader(this.fs, walPath, this.conf); + HLog.Reader reader = HLog.getReader(this.fs, walPath, this.conf); int count = 0; - HLogKey key = HLog.newKey(conf); - WALEdit val = new WALEdit(); - while (reader.next(key, val)) { + HLog.Entry entry = new HLog.Entry(); + while (reader.next(entry) != null) { count++; assertTrue("Should be one KeyValue per WALEdit", - val.getKeyValues().size() == 1); + entry.getEdit().getKeyValues().size() == 1); } assertEquals(total, count); reader.close(); diff --git a/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java b/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java index ebfcef5..fc89ca7 100644 --- a/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java +++ b/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java @@ -61,7 +61,7 @@ public class TestReplication { private static final Log LOG = LogFactory.getLog(TestReplication.class); - private static Configuration conf1; + protected static Configuration conf1 = HBaseConfiguration.create(); private static Configuration conf2; private static Configuration CONF_WITH_LOCALFS; @@ -91,7 +91,6 @@ public class TestReplication { */ @BeforeClass public static void setUpBeforeClass() throws Exception { - conf1 = HBaseConfiguration.create(); conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); // smaller block size and capacity to trigger more operations // and test them @@ -521,7 +520,7 @@ public class TestReplication { // disable and start the peer admin.disablePeer("2"); - utility2.startMiniHBaseCluster(1, 1); + utility2.startMiniHBaseCluster(1, 2); Get get = new Get(rowkey); for (int i = 0; i < NB_RETRIES; i++) { Result res = htable2.get(get); @@ -761,7 +760,8 @@ public class TestReplication { int lastCount = 0; final long start = System.currentTimeMillis(); - for (int i = 0; i < NB_RETRIES; i++) { + int i = 0; + while (true) { if (i==NB_RETRIES-1) { fail("Waited too much time for queueFailover replication. " + "Waited "+(System.currentTimeMillis() - start)+"ms."); @@ -773,6 +773,8 @@ public class TestReplication { if (res2.length < initialCount) { if (lastCount < res2.length) { i--; // Don't increment timeout if we make progress + } else { + i++; } lastCount = res2.length; LOG.info("Only got " + lastCount + " rows instead of " + @@ -792,7 +794,7 @@ public class TestReplication { Thread.sleep(timeout); utility.expireRegionServerSession(rs); } catch (Exception e) { - LOG.error(e); + LOG.error("Couldn't kill a region server", e); } } }; diff --git a/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithCompression.java b/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithCompression.java new file mode 100644 index 0000000..2c87a7b --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithCompression.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.LargeTests; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +/** + * Run the same test as TestReplication but with HLog compression enabled + */ +@Category(LargeTests.class) +public class TestReplicationWithCompression extends TestReplication { + + /** + * @throws java.lang.Exception + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + conf1.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true); + TestReplication.setUpBeforeClass(); + } +}