From 816457b9f463ed61de65f7531b89b121286b2082 Mon Sep 17 00:00:00 2001
From: Mike Drob
Date: Mon, 22 May 2017 09:34:04 -0500
Subject: [PATCH] HBASE-15930 make ITReplication smarter

Add methods to check that replication is correctly set up before running
the tests and to actively poll for replication to complete rather than
naively waiting for a set time.
---
 .../hbase/test/IntegrationTestReplication.java     | 79 ++++++++++++++++++----
 1 file changed, 67 insertions(+), 12 deletions(-)

diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestReplication.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestReplication.java
index bf534f3f9a..b95e7fc4c8 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestReplication.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestReplication.java
@@ -25,17 +25,22 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.IntegrationTestingUtility;
+import org.apache.hadoop.hbase.ServerLoad;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
-import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.replication.TableCFs;
+import org.apache.hadoop.hbase.replication.ReplicationLoadSource;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -60,7 +65,7 @@ public class IntegrationTestReplication extends IntegrationTestBigLinkedList {
   protected long numNodes;
   protected String outputDir;
   protected int numReducers;
-  protected int generateVerifyGap;
+  protected int generateVerifyGap = -1;
   protected Integer width;
   protected Integer wrapMultiplier;
   protected boolean noReplicationSetup = false;
@@ -172,6 +177,28 @@ public class IntegrationTestReplication extends IntegrationTestBigLinkedList {
     IntegrationTestBigLinkedList integrationTestBigLinkedList;
 
     /**
+     * Checks that at least one column family of the test table is replicated from the source
+     * cluster. The source connection is closed before this method returns or throws.
+     *
+     * @throws Exception if no replicated column family is found for the test table
+     */
+    protected void checkReplicationSetup() throws Exception {
+      TableName expected = getTableName(source.getConfiguration());
+      try (Admin admin = source.getConnection().getAdmin()) {
+        for (TableCFs tableCFs : admin.listReplicatedTableCFs()) {
+          // Replicating at least one CF is good enough
+          if (tableCFs.getTable().equals(expected)
+              && !tableCFs.getColumnFamilyMap().isEmpty()) {
+            return;
+          }
+        }
+        throw new RuntimeException("Aborting test: replication not configured and we were "
+            + "instructed to not set it up.");
+      } finally {
+        source.closeConnection();
+      }
+    }
+
+    /**
      * This tears down any tables that existed from before and rebuilds the tables and schemas on
      * the source cluster. It then sets up replication from the source to the sink cluster by using
      * the {@link org.apache.hadoop.hbase.client.replication.ReplicationAdmin}
@@ -248,9 +275,35 @@ public class IntegrationTestReplication extends IntegrationTestBigLinkedList {
     }
 
     protected void waitForReplication() throws Exception {
-      // TODO: we shouldn't be sleeping here. It would be better to query the region servers
-      // and wait for them to report 0 replication lag.
-      Thread.sleep(generateVerifyGap * 1000);
+      if (generateVerifyGap > 0) {
+        // Multiply as long: the gap may be as large as Integer.MAX_VALUE seconds.
+        Thread.sleep(generateVerifyGap * 1000L);
+      } else {
+        while (stillReplicating()) {
+          Thread.sleep(1000);
+        }
+      }
+    }
+
+    /**
+     * Reports whether any server in the source cluster status still shows replication lag.
+     *
+     * @return true if at least one replication load source reports a lag greater than zero
+     * @throws Exception if the cluster status cannot be retrieved
+     */
+    private boolean stillReplicating() throws Exception {
+      try (Admin admin = source.getConnection().getAdmin()) {
+        ClusterStatus status = admin.getClusterStatus();
+        for (ServerName sn : status.getServers()) {
+          ServerLoad load = status.getLoad(sn);
+          for (ReplicationLoadSource rls : load.getReplicationLoadSourceList()) {
+            if (rls.getReplicationLag() > 0) {
+              return true;
+            }
+          }
+        }
+        return false;
+      }
     }
 
     /**
@@ -321,7 +374,9 @@ public class IntegrationTestReplication extends IntegrationTestBigLinkedList {
       source = new ClusterID(getConf(), sourceClusterIdString);
       sink = new ClusterID(getConf(), sinkClusterIdString);
 
-      if (!noReplicationSetup) {
+      if (noReplicationSetup) {
+        checkReplicationSetup();
+      } else {
         setupTablesAndReplication();
       }
       int expectedNumNodes = 0;
@@ -365,8 +420,8 @@ public class IntegrationTestReplication extends IntegrationTestBigLinkedList {
     addOptWithArg("i", ITERATIONS_OPT, "Number of iterations to run (default: " +
                   DEFAULT_NUM_ITERATIONS + ")");
     addOptWithArg("t", GENERATE_VERIFY_GAP_OPT,
-                  "Gap between generate and verify steps in seconds (default: " +
-                  DEFAULT_GENERATE_VERIFY_GAP + ")");
+                  "Deprecated. If specified, the gap between generate and verify steps in seconds. " +
+                  "By default, the tool will poll the source cluster until replication is completed.");
     addOptWithArg("w", WIDTH_OPT,
                   "Width of the linked list chain (default: " + DEFAULT_WIDTH + ")");
     addOptWithArg("wm", WRAP_MULTIPLIER_OPT, "How many times to wrap around (default: " +
@@ -390,9 +445,9 @@ public class IntegrationTestReplication extends IntegrationTestBigLinkedList {
                            1, Integer.MAX_VALUE);
     numNodes = parseInt(cmd.getOptionValue(NUM_NODES_OPT, Integer.toString(DEFAULT_NUM_NODES)),
                         1, Integer.MAX_VALUE);
-    generateVerifyGap = parseInt(cmd.getOptionValue(GENERATE_VERIFY_GAP_OPT,
-                                                    Integer.toString(DEFAULT_GENERATE_VERIFY_GAP)),
-                                 1, Integer.MAX_VALUE);
+    if (cmd.hasOption(GENERATE_VERIFY_GAP_OPT)) {
+      generateVerifyGap = parseInt(cmd.getOptionValue(GENERATE_VERIFY_GAP_OPT), 1, Integer.MAX_VALUE);
+    }
     numIterations = parseInt(cmd.getOptionValue(ITERATIONS_OPT,
                                                 Integer.toString(DEFAULT_NUM_ITERATIONS)),
                              1, Integer.MAX_VALUE);
-- 
2.13.0