From bd74e2866a6776d27d440fcdd06a1c6c6e04b7e5 Mon Sep 17 00:00:00 2001
From: Vincent
Date: Wed, 7 Jun 2017 14:48:45 -0700
Subject: [PATCH] HBASE-18137 Replication gets stuck for empty WALs

---
 .../hadoop/hbase/regionserver/HRegionServer.java   |  3 +-
 .../regionserver/ReplicationSource.java            | 14 ++--
 .../replication/TestReplicationSmallTests.java     | 83 ++++++++++++++++++++++
 3 files changed, 94 insertions(+), 6 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index dd8bb5b..c32294d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -2210,7 +2210,8 @@ public class HRegionServer extends HasThread implements
    * @return Return the object that implements the replication
    * source service.
    */
-  ReplicationSourceService getReplicationSourceService() {
+  @VisibleForTesting
+  public ReplicationSourceService getReplicationSourceService() {
     return replicationSourceHandler;
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 72d02f9..c2c4571 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -537,9 +537,9 @@ public class ReplicationSource extends Thread
           terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);
         }
       }
+      int sleepMultiplier = 1;
       // Loop until we close down
       while (isWorkerActive()) {
-        int sleepMultiplier = 1;
         // Sleep until replication is enabled again
         if (!isPeerEnabled()) {
           if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
@@ -617,7 +617,7 @@ public class ReplicationSource extends Thread
 
             if (considerDumping &&
                 sleepMultiplier == maxRetriesMultiplier &&
-                processEndOfFile()) {
+                processEndOfFile(false)) {
               continue;
             }
           }
@@ -740,7 +740,7 @@ public class ReplicationSource extends Thread
       }
       // If we didn't get anything and the queue has an object, it means we
       // hit the end of the file for sure
-      return seenEntries == 0 && processEndOfFile();
+      return seenEntries == 0 && processEndOfFile(false);
     }
 
     /**
@@ -925,7 +925,7 @@ public class ReplicationSource extends Thread
           // TODO Need a better way to determine if a file is really gone but
           // TODO without scanning all logs dir
           LOG.warn("Waited too long for this file, considering dumping");
-          return !processEndOfFile();
+          return !processEndOfFile(true);
         }
       }
       return true;
@@ -1091,7 +1091,7 @@ public class ReplicationSource extends Thread
      */
     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DE_MIGHT_IGNORE",
         justification = "Yeah, this is how it works")
-    protected boolean processEndOfFile() {
+    protected boolean processEndOfFile(boolean dumpOnlyIfZeroLength) {
       // We presume this means the file we're reading is closed.
       if (this.queue.size() != 0) {
         // -1 means the wal wasn't closed cleanly.
@@ -1126,6 +1126,10 @@ public class ReplicationSource extends Thread
           LOG.trace("Reached the end of log " + this.currentPath + ", stats: " + getStats()
               + ", and the length of the file is " + (stat == null ? "N/A" : stat.getLen()));
         }
+        // stat may be null (see the trace above); only dump when a zero length is confirmed
+        if (dumpOnlyIfZeroLength && (stat == null || stat.getLen() != 0)) {
+          return false;
+        }
         this.currentPath = null;
         this.repLogReader.finishCurrentFile();
         this.reader = null;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
index 42a127f..8364493 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
@@ -30,6 +30,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.ClusterStatus;
@@ -40,6 +41,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.ServerLoad;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Delete;
@@ -53,9 +55,14 @@ import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
 import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
@@ -758,4 +765,80 @@ public class TestReplicationSmallTests extends TestReplicationBase {
       }
     }
   }
+
+  @Test
+  public void testEmptyWAL() throws Exception {
+    final int numRs = utility1.getHBaseCluster().getRegionServerThreads().size();
+
+    // for each RS, create an empty wal with same walGroupId
+    final List<Path> emptyWalPaths = new ArrayList<>();
+    long ts = System.currentTimeMillis();
+    for (int i = 0; i < numRs; i++) {
+      HRegionInfo regionInfo =
+          utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
+      WAL wal = utility1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
+      Path currentWalPath = DefaultWALProvider.getCurrentFileName(wal);
+      String walGroupId = DefaultWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
+      Path emptyWalPath = new Path(utility1.getDataTestDir(), walGroupId + "." + ts);
+      utility1.getTestFileSystem().create(emptyWalPath).close();
+      emptyWalPaths.add(emptyWalPath);
+    }
+
+    // inject our empty wal into the replication queue
+    for (int i = 0; i < numRs; i++) {
+      Replication replicationService =
+          (Replication) utility1.getHBaseCluster().getRegionServer(i).getReplicationSourceService();
+      replicationService.preLogRoll(null, emptyWalPaths.get(i));
+      replicationService.postLogRoll(null, emptyWalPaths.get(i));
+    }
+
+    // wait for ReplicationSource to start reading from our empty wal
+    waitForLogAdvance(numRs, emptyWalPaths, false);
+
+    // roll the original wal, which enqueues a new wal behind our empty wal
+    for (int i = 0; i < numRs; i++) {
+      HRegionInfo regionInfo =
+          utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
+      WAL wal = utility1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
+      wal.rollWriter(true);
+    }
+
+    // ReplicationSource should advance past the empty wal, or else the test will fail
+    waitForLogAdvance(numRs, emptyWalPaths, true);
+
+    // we're now writing to the new wal
+    // if everything works, the source should've stopped reading from the empty wal, and start
+    // replicating from the new wal
+    testSimplePutDelete();
+  }
+
+  /**
+   * Waits for the ReplicationSource to start reading from the given paths
+   * @param numRs number of regionservers
+   * @param emptyWalPaths path for each regionserver
+   * @param invert if true, waits until ReplicationSource is NOT reading from the given paths
+   */
+  private void waitForLogAdvance(final int numRs, final List<Path> emptyWalPaths,
+      final boolean invert) throws Exception {
+    Waiter.waitFor(conf1, 5000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        for (int i = 0; i < numRs; i++) {
+          Replication replicationService = (Replication) utility1.getHBaseCluster()
+              .getRegionServer(i).getReplicationSourceService();
+          for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
+              .getSources()) {
+            ReplicationSource source = (ReplicationSource) rsi;
+            if (!invert && !emptyWalPaths.get(i).equals(source.getCurrentPath())) {
+              return false;
+            }
+            if (invert && emptyWalPaths.get(i).equals(source.getCurrentPath())) {
+              return false;
+            }
+          }
+        }
+        return true;
+      }
+    });
+  }
 }
-- 
2.7.4