From 41b2ae4e868f745dc0f42532a1ff55407f541e0c Mon Sep 17 00:00:00 2001 From: Vincent Date: Wed, 7 Jun 2017 14:48:45 -0700 Subject: [PATCH] HBASE-18137 Replication gets stuck for empty WALs --- .../hadoop/hbase/regionserver/HRegionServer.java | 3 +- .../regionserver/ReplicationSource.java | 2 +- .../replication/TestReplicationSmallTests.java | 90 ++++++++++++++++++++++ 3 files changed, 93 insertions(+), 2 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index dd8bb5b..c32294d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -2210,7 +2210,8 @@ public class HRegionServer extends HasThread implements * @return Return the object that implements the replication * source service. */ - ReplicationSourceService getReplicationSourceService() { + @VisibleForTesting + public ReplicationSourceService getReplicationSourceService() { return replicationSourceHandler; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 72d02f9..43fc7d3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -537,9 +537,9 @@ public class ReplicationSource extends Thread terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e); } } + int sleepMultiplier = 1; // Loop until we close down while (isWorkerActive()) { - int sleepMultiplier = 1; // Sleep until replication is enabled again if (!isPeerEnabled()) { if (sleepForRetries("Replication is disabled", 
sleepMultiplier)) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java index 42a127f..387d663 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java @@ -30,6 +30,7 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.ClusterStatus; @@ -40,6 +41,7 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.ServerLoad; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Delete; @@ -53,9 +55,14 @@ import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication; import org.apache.hadoop.hbase.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.wal.DefaultWALProvider; +import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; import org.apache.hadoop.hbase.replication.regionserver.Replication; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 
import org.apache.hadoop.hbase.util.JVMClusterUtil;
@@ -758,4 +765,87 @@ public class TestReplicationSmallTests extends TestReplicationBase {
       }
     }
   }
+
+  /**
+   * Verifies that replication does not get stuck on an empty WAL (HBASE-18137). An empty WAL
+   * file is injected into the replication queue of every region server, the real WAL is then
+   * rolled so that a new WAL is enqueued behind the empty one, and finally a put/delete is
+   * run through {@link #testSimplePutDelete()} to check that replication still makes progress.
+   */
+  @Test
+  public void testEmptyWAL() throws Exception {
+    final int numRs = utility1.getHBaseCluster().getRegionServerThreads().size();
+
+    // for each RS, create an empty wal with same walGroupId
+    List<Path> emptyWalPaths = new ArrayList<>();
+    long ts = System.currentTimeMillis();
+    for (int i = 0; i < numRs; i++) {
+      HRegionInfo regionInfo =
+          utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
+      WAL wal = utility1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
+      Path currentWalPath = DefaultWALProvider.getCurrentFileName(wal);
+      String walGroupId = DefaultWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
+      Path emptyWalPath = new Path(utility1.getDataTestDir(), walGroupId + "." + ts);
+      utility1.getTestFileSystem().create(emptyWalPath).close();
+      emptyWalPaths.add(emptyWalPath);
+    }
+
+    // inject our empty wal into the replication queue
+    for (int i = 0; i < numRs; i++) {
+      Replication replicationService =
+          (Replication) utility1.getHBaseCluster().getRegionServer(i).getReplicationSourceService();
+      replicationService.preLogRoll(null, emptyWalPaths.get(i));
+      replicationService.postLogRoll(null, emptyWalPaths.get(i));
+    }
+
+    // wait for ReplicationSource to start reading from our empty wal
+    waitForEmptyLogQueues(numRs);
+
+    // roll the original wal, which enqueues a new wal behind our empty wal
+    for (int i = 0; i < numRs; i++) {
+      HRegionInfo regionInfo =
+          utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
+      WAL wal = utility1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
+      LOG.info("rolling writers");
+      wal.rollWriter(true);
+    }
+
+    // ReplicationSource should advance past the empty wal, or else the test will fail
+    waitForEmptyLogQueues(numRs);
+
+    // we're now writing to the new wal
+    // if everything works, the source should've stopped reading from the empty wal, and start
+    // replicating from the new wal
+    testSimplePutDelete();
+  }
+
+  /**
+   * Waits, with a 5000 ms timeout handed to {@link Waiter#waitFor}, until every replication
+   * source on each of the first {@code numRs} region servers of {@code utility1}'s cluster
+   * reports a log queue size of zero in its metrics.
+   * @param numRs number of region servers whose replication sources are checked
+   * @throws Exception propagated from {@link Waiter#waitFor} or from the predicate
+   */
+  private void waitForEmptyLogQueues(final int numRs) throws Exception {
+    Waiter.waitFor(conf1, 5000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        for (int i = 0; i < numRs; i++) {
+          Replication replicationService = (Replication) utility1.getHBaseCluster()
+              .getRegionServer(i).getReplicationSourceService();
+          for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
+              .getSources()) {
+            ReplicationSource source = (ReplicationSource) rsi;
+            MetricsSource sourceMetrics = source.getSourceMetrics();
+            int sizeOfLogQueue = sourceMetrics.getSizeOfLogQueue();
+            // the metric still counts at least one queued log for this source: keep waiting
+            if (sizeOfLogQueue > 0) {
+              return false;
+            }
+          }
+        }
+        return true;
+      }
+    });
+  }
 }
-- 
2.7.4