diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index 763b8d4..0b323df 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -71,6 +71,7 @@ public interface HLog { void seek(long pos) throws IOException; long getPosition() throws IOException; + void reset() throws IOException; } public interface Writer { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java index ea83c87..6600d47 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java @@ -62,7 +62,9 @@ public class HLogFactory { } /** - * Create a reader for the WAL. + * Create a reader for the WAL. If you are reading from a file that's being written to + * and need to reopen it multiple times, use {@link HLog.Reader#reset()} instead of this method + * then just seek back to the last known good position. * @return A WAL reader. Close when done with it. * @throws IOException */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java index 7448f09..ed52c0f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java @@ -140,11 +140,13 @@ public class SequenceFileLogReader implements HLog.Reader { Configuration conf; WALReader reader; + FileSystem fs; // Needed logging exceptions Path path; int edit = 0; long entryStart = 0; + boolean emptyCompressionContext = true; /** * Compression context to use reading. Can be null if no compression. */ @@ -174,6 +176,7 @@ public class SequenceFileLogReader implements HLog.Reader { this.conf = conf; this.path = path; reader = new WALReader(fs, path, conf); + this.fs = fs; // If compression is enabled, new dictionaries are created here. boolean compression = reader.isWALCompressionEnabled(); @@ -238,11 +241,22 @@ public class SequenceFileLogReader implements HLog.Reader { throw addFileInfoToException(ioe); } edit++; + if (compressionContext != null && emptyCompressionContext) { + emptyCompressionContext = false; + } return b? e: null; } @Override public void seek(long pos) throws IOException { + if (compressionContext != null && emptyCompressionContext) { + while (next() != null) { + if (getPosition() == pos) { + emptyCompressionContext = false; + break; + } + } + } try { reader.seek(pos); } catch (IOException ioe) { @@ -287,4 +301,11 @@ public class SequenceFileLogReader implements HLog.Reader { return ioe; } + + @Override + public void reset() throws IOException { + // Resetting the reader lets us see newly added data if the file is being written to + // We also keep the same compressionContext which was previously populated for this file + reader = new WALReader(fs, path, conf); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java index bf9bc7e..93e938f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java @@ -126,7 +126,7 @@ public class SequenceFileLogWriter implements HLog.Writer { public void init(FileSystem fs, Path path, Configuration conf) throws IOException { // Should we do our custom WAL compression? - boolean compress = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false); + boolean compress = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, true); if (compress) { try { if (this.compressionContext == null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java new file mode 100644 index 0000000..a51ef89 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.replication.regionserver; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.regionserver.wal.HLogFactory; + +import java.io.IOException; + +/** + * Wrapper class around HLog to help manage the implementation details + * such as compression. + */ +public class ReplicationHLogReaderManager { + + private static final Log LOG = LogFactory.getLog(ReplicationHLogReaderManager.class); + private final FileSystem fs; + private final Configuration conf; + private long position = 0; + private HLog.Reader reader; + private Path lastPath; + + /** + * Creates the helper but doesn't open any file + * Use setInitialPosition after using the constructor if some content needs to be skipped + * @param fs + * @param conf + */ + public ReplicationHLogReaderManager(FileSystem fs, Configuration conf) { + this.fs = fs; + this.conf = conf; + } + + /** + * Opens the file at the current position + * @param path + * @return + * @throws IOException + */ + public HLog.Reader openReader(Path path) throws IOException { + // Detect if this is a new file, if so get a new reader else + // reset the current reader so that we see the new data + if (this.reader == null || !this.lastPath.equals(path)) { + this.reader = HLogFactory.createReader(this.fs, path, this.conf); + this.lastPath = path; + } else { + this.reader.reset(); + } + return this.reader; + } + + /** + * Get the next entry, returned and also added in the array + * @param entriesArray + * @param currentNbEntries + * @return a new entry or null + * @throws IOException + */ + public HLog.Entry readNextAndSetPosition(HLog.Entry[] entriesArray, int currentNbEntries) throws IOException { + HLog.Entry entry = this.reader.next(entriesArray[currentNbEntries]); + // Store the position so that in the future the reader can start + // reading from here. If the above call to next() throws an + // exception, the position won't be changed and retry will happen + // from the last known good position + this.position = this.reader.getPosition(); + // We need to set the CC to null else it will be compressed when sent to the sink + if (entry != null) { + entry.setCompressionContext(null); + } + return entry; + } + + /** + * Advance the reader to the current position + * @throws IOException + */ + public void seek() throws IOException { + if (this.position != 0) { + this.reader.seek(this.position); + } + } + + /** + * Get the position that we stopped reading at + * @return current position, cannot be negative + */ + public long getPosition() { + return this.position; + } + + public void setPosition(long pos) { + this.position = pos; + } + + /** + * Close the current reader + * @throws IOException + */ + public void closeReader() throws IOException { + if (this.reader != null) { + this.reader.close(); + } + } + + /** + * Tell the helper to reset internal state + */ + public void finishCurrentFile() { + this.position = 0; + this.reader = null; + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index f08738b..5e02bbd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -107,8 +107,6 @@ public class ReplicationSource extends Thread private int replicationQueueNbCapacity; // Our reader for the current log private HLog.Reader reader; - // Current position in the log - private long position = 0; // Last position in the log that we sent to ZooKeeper private long lastLoggedPosition = -1; // Path of the current log @@ -134,10 +132,14 @@ public class ReplicationSource extends Thread private int currentNbEntries = 0; // Current number of operations (Put/Delete) that we need to replicate private int currentNbOperations = 0; + // Current size of data we need to replicate + private int currentSize = 0; // Indicates if this particular source is running private volatile boolean running = true; // Metrics for this source private MetricsSource metrics; + // Handle on the log reader helper + private ReplicationHLogReaderManager repLogReader; /** * Instantiation method used by region servers @@ -185,7 +187,7 @@ public class ReplicationSource extends Thread this.conf.getLong("replication.source.sleepforretries", 1000); this.fs = fs; this.metrics = new MetricsSource(peerClusterZnode); - + this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf); try { this.clusterId = zkHelper.getUUIDForCluster(zkHelper.getZookeeperWatcher()); } catch (KeeperException ke) { @@ -266,8 +268,8 @@ public class ReplicationSource extends Thread // normally has a position (unless the RS failed between 2 logs) if (this.queueRecovered) { try { - this.position = this.zkHelper.getHLogRepPosition( - this.peerClusterZnode, this.queue.peek().getName()); + this.repLogReader.setPosition(this.zkHelper.getHLogRepPosition( + this.peerClusterZnode, this.queue.peek().getName())); } catch (KeeperException e) { this.terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e); @@ -325,6 +327,7 @@ public class ReplicationSource extends Thread boolean gotIOE = false; currentNbEntries = 0; + currentSize = 0; try { if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo)) { continue; @@ -360,9 +363,7 @@ public class ReplicationSource extends Thread } } finally { try { - if (this.reader != null) { - this.reader.close(); - } + this.repLogReader.closeReader(); } catch (IOException e) { gotIOE = true; LOG.warn("Unable to finalize the tailing of a file", e); @@ -373,10 +374,11 @@ public class ReplicationSource extends Thread // wait a bit and retry. // But if we need to stop, don't bother sleeping if (this.isActive() && (gotIOE || currentNbEntries == 0)) { - if (this.lastLoggedPosition != this.position) { + if (this.lastLoggedPosition != this.repLogReader.getPosition()) { this.manager.logPositionAndCleanOldLogs(this.currentPath, - this.peerClusterZnode, this.position, queueRecovered, currentWALisBeingWrittenTo); - this.lastLoggedPosition = this.position; + this.peerClusterZnode, this.repLogReader.getPosition(), + queueRecovered, currentWALisBeingWrittenTo); + this.lastLoggedPosition = this.repLogReader.getPosition(); } if (sleepForRetries("Nothing to replicate", sleepMultiplier)) { sleepMultiplier++; @@ -409,11 +411,8 @@ public class ReplicationSource extends Thread protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo) throws IOException{ long seenEntries = 0; - if (this.position != 0) { - this.reader.seek(this.position); - } - long startPosition = this.position; - HLog.Entry entry = readNextAndSetPosition(); + this.repLogReader.seek(); + HLog.Entry entry = this.repLogReader.readNextAndSetPosition(this.entriesArray, this.currentNbEntries); while (entry != null) { WALEdit edit = entry.getEdit(); this.metrics.incrLogEditsRead(); @@ -437,18 +436,18 @@ public class ReplicationSource extends Thread } currentNbOperations += countDistinctRowKeys(edit); currentNbEntries++; + currentSize += entry.getEdit().size(); } else { this.metrics.incrLogEditsFiltered(); } } // Stop if too many entries or too big - if ((this.reader.getPosition() - startPosition) - >= this.replicationQueueSizeCapacity || + if (currentSize >= this.replicationQueueSizeCapacity || currentNbEntries >= this.replicationQueueNbCapacity) { break; } try { - entry = readNextAndSetPosition(); + entry = this.repLogReader.readNextAndSetPosition(this.entriesArray, this.currentNbEntries); } catch (IOException ie) { LOG.debug("Break on IOE: " + ie.getMessage()); break; @@ -462,16 +461,6 @@ public class ReplicationSource extends Thread return seenEntries == 0 && processEndOfFile(); } - private HLog.Entry readNextAndSetPosition() throws IOException { - HLog.Entry entry = this.reader.next(entriesArray[currentNbEntries]); - // Store the position so that in the future the reader can start - // reading from here. If the above call to next() throws an - // exception, the position won't be changed and retry will happen - // from the last known good position - this.position = this.reader.getPosition(); - return entry; - } - private void connectToPeers() { // Connect to peer cluster first, unless we have to stop while (this.isActive() && this.currentPeers.size() == 0) { @@ -510,9 +499,7 @@ public class ReplicationSource extends Thread protected boolean openReader(int sleepMultiplier) { try { try { - this.reader = null; - this.reader = HLogFactory.createReader(this.fs, - this.currentPath, this.conf); + this.reader = repLogReader.openReader(this.currentPath); } catch (FileNotFoundException fnfe) { if (this.queueRecovered) { // We didn't find the log in the archive directory, look if it still @@ -648,10 +635,11 @@ public class ReplicationSource extends Thread AdminProtocol rrs = getRS(); ProtobufUtil.replicateWALEntry(rrs, Arrays.copyOf(this.entriesArray, currentNbEntries)); - if (this.lastLoggedPosition != this.position) { + if (this.lastLoggedPosition != this.repLogReader.getPosition()) { this.manager.logPositionAndCleanOldLogs(this.currentPath, - this.peerClusterZnode, this.position, queueRecovered, currentWALisBeingWrittenTo); - this.lastLoggedPosition = this.position; + this.peerClusterZnode, this.repLogReader.getPosition(), + queueRecovered, currentWALisBeingWrittenTo); + this.lastLoggedPosition = this.repLogReader.getPosition(); } this.totalReplicatedEdits += currentNbEntries; this.metrics.shipBatch(this.currentNbOperations); @@ -718,7 +706,8 @@ public class ReplicationSource extends Thread protected boolean processEndOfFile() { if (this.queue.size() != 0) { this.currentPath = null; - this.position = 0; + this.repLogReader.finishCurrentFile(); + this.reader = null; return true; } else if (this.queueRecovered) { this.manager.closeRecoveredQueue(this); @@ -842,8 +831,14 @@ public class ReplicationSource extends Thread @Override public String getStats() { + String position; + try { + position = this.reader.getPosition()+""; + } catch (IOException ioe) { + position = "N/A"; + } return "Total replicated edits: " + totalReplicatedEdits + ", currently replicating from: " + this.currentPath + - " at position: " + this.position; + " at position: " + position; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java index 265f777..7006395 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java @@ -477,15 +477,13 @@ public class TestHLog { throw t.exception; // Make sure you can read all the content - SequenceFile.Reader reader - = new SequenceFile.Reader(this.fs, walPath, this.conf); + HLog.Reader reader = HLogFactory.createReader(this.fs, walPath, this.conf); int count = 0; - HLogKey key = HLogUtil.newKey(conf); - WALEdit val = new WALEdit(); - while (reader.next(key, val)) { + HLog.Entry entry = new HLog.Entry(); + while (reader.next(entry) != null) { count++; assertTrue("Should be one KeyValue per WALEdit", - val.getKeyValues().size() == 1); + entry.getEdit().getKeyValues().size() == 1); } assertEquals(total, count); reader.close(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayCompressed.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayCompressed.java deleted file mode 100644 index 7e57359..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayCompressed.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver.wal; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.MediumTests; -import org.junit.BeforeClass; -import org.junit.experimental.categories.Category; - -/** - * Enables compression and runs the TestWALReplay tests. - */ -@Category(MediumTests.class) -public class TestWALReplayCompressed extends TestWALReplay { - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - TestWALReplay.setUpBeforeClass(); - Configuration conf = TestWALReplay.TEST_UTIL.getConfiguration(); - conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true); - } - -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java index 616d194..5741a07 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java @@ -60,7 +60,7 @@ public class TestReplication { private static final Log LOG = LogFactory.getLog(TestReplication.class); - private static Configuration conf1; + protected static Configuration conf1 = HBaseConfiguration.create(); private static Configuration conf2; private static Configuration CONF_WITH_LOCALFS; @@ -90,7 +90,6 @@ public class TestReplication { */ @BeforeClass public static void setUpBeforeClass() throws Exception { - conf1 = HBaseConfiguration.create(); conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); // smaller block size and capacity to trigger more operations // and test them @@ -520,7 +519,7 @@ public class TestReplication { // disable and start the peer admin.disablePeer("2"); - utility2.startMiniHBaseCluster(1, 1); + utility2.startMiniHBaseCluster(1, 2); Get get = new Get(rowkey); for (int i = 0; i < NB_RETRIES; i++) { Result res = htable2.get(get); @@ -760,7 +759,8 @@ public class TestReplication { int lastCount = 0; final long start = System.currentTimeMillis(); - for (int i = 0; i < NB_RETRIES; i++) { + int i = 0; + while (true) { if (i==NB_RETRIES-1) { fail("Waited too much time for queueFailover replication. " + "Waited "+(System.currentTimeMillis() - start)+"ms."); @@ -772,6 +772,8 @@ public class TestReplication { if (res2.length < initialCount) { if (lastCount < res2.length) { i--; // Don't increment timeout if we make progress + } else { + i++; } lastCount = res2.length; LOG.info("Only got " + lastCount + " rows instead of " + @@ -791,7 +793,7 @@ public class TestReplication { Thread.sleep(timeout); utility.expireRegionServerSession(rs); } catch (Exception e) { - LOG.error(e); + LOG.error("Couldn't kill a region server", e); } } };