From a37e1f7365b78ead70f91d6c7c1d48e0b04e8b16 Mon Sep 17 00:00:00 2001
From: Rajesh Nishtala
Date: Thu, 2 Apr 2015 14:51:51 -0700
Subject: [PATCH] New integration test for oss => oss replication

Summary: This test subclasses the BigLinkedList test. It takes two HBase
clusters as arguments, sets up tables, sets up replication, and runs the
BigLinkedList generator. The verification portion of the loop checks that
the replication sink has received the data and that it is correct.

Test Plan: Ran the test on my laptop. More testing is required on a live
cluster, but this is an RFC.

Reviewers: eclark

Subscribers: asameet

Differential Revision: https://reviews.facebook.net/D36423
---
 .../apache/hadoop/hbase/IntegrationTestBase.java   |   8 +-
 .../hbase/test/IntegrationTestReplication.java     | 332 +++++++++++++++++++++
 2 files changed, 338 insertions(+), 2 deletions(-)
 create mode 100644 hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestReplication.java

diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestBase.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestBase.java
index f45fb04..cef0917 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestBase.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestBase.java
@@ -71,8 +71,7 @@ public abstract class IntegrationTestBase extends AbstractHBaseTool {
         + "monkey properties.");
   }
 
-  @Override
-  protected void processOptions(CommandLine cmd) {
+  protected void processBaseOptions(CommandLine cmd) {
     if (cmd.hasOption(MONKEY_LONG_OPT)) {
       monkeyToUse = cmd.getOptionValue(MONKEY_LONG_OPT);
     }
@@ -95,6 +94,11 @@ public abstract class IntegrationTestBase extends AbstractHBaseTool {
   }
 
   @Override
+  protected void processOptions(CommandLine cmd) {
+    processBaseOptions(cmd);
+  }
+
+  @Override
   public Configuration getConf() {
     Configuration c = super.getConf();
     if (c == null && util != null) {
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestReplication.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestReplication.java
new file mode 100644
index 0000000..16088cd
--- /dev/null
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestReplication.java
@@ -0,0 +1,332 @@
+package org.apache.hadoop.hbase.test;
+
+import com.google.common.base.Joiner;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.IntegrationTestingUtility;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.UUID;
+
+
+public class IntegrationTestReplication extends IntegrationTestBigLinkedList {
+
+  protected String sourceClusterIdString;
+  protected String sinkClusterIdString;
+  protected int numIterations;
+  protected int numMappers;
+  protected long numNodes;
+  protected String outputDir;
+  protected int numReducers;
+  protected int generateVerifyGap;
+  protected Integer width;
+  protected Integer wrapMultiplier;
+
+  private final String SOURCE_CLUSTER_OPT = "sourceCluster";
+  private final String DEST_CLUSTER_OPT = "destCluster";
+  private final String ITERATIONS_OPT = "iterations";
+  private final String NUM_MAPPERS_OPT = "numMappers";
+  private final String NUM_NODES_OPT = "numNodes";
+  private final String OUTPUT_DIR_OPT = "outputDir";
+  private final String NUM_REDUCERS_OPT = "numReducers";
+  private final String GENERATE_VERIFY_GAP_OPT = "generateVerifyGap";
+  private final String WIDTH_OPT = "width";
+  private final String WRAP_MULTIPLIER_OPT = "wrapMultiplier";
+
+  private final int DEFAULT_NUM_MAPPERS = 1;
+  private final int DEFAULT_NUM_REDUCERS = 1;
+  private final int DEFAULT_NUM_ITERATIONS = 1;
+  private final int DEFAULT_GENERATE_VERIFY_GAP = 60;
+  private final int DEFAULT_WIDTH = 1000000;
+  private final int DEFAULT_WRAP_MULTIPLIER = 25;
+  private final int DEFAULT_NUM_NODES = DEFAULT_WIDTH * DEFAULT_WRAP_MULTIPLIER;
+
+  /**
+   * Wrapper around an HBase cluster ID that lets us
+   * get admin connections and configurations for it
+   */
+  protected class ClusterID {
+    private final Configuration configuration;
+    private Connection connection = null;
+
+    /**
+     * This creates a new ClusterID wrapper that will automatically build
+     * connections and configurations to be able to talk to the specified cluster
+     *
+     * @param base the base configuration that this class will add to
+     * @param key the cluster key in the form of zk_quorum:zk_port:zk_parent_node
+     */
+    public ClusterID(Configuration base,
+                     String key) {
+      configuration = new Configuration(base);
+      String[] parts = key.split(":");
+      configuration.set(HConstants.ZOOKEEPER_QUORUM, parts[0]);
+      configuration.set(HConstants.ZOOKEEPER_CLIENT_PORT, parts[1]);
+      configuration.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts[2]);
+    }
+
+    @Override
+    public String toString() {
+      return Joiner.on(":").join(configuration.get(HConstants.ZOOKEEPER_QUORUM),
+                                 configuration.get(HConstants.ZOOKEEPER_CLIENT_PORT),
+                                 configuration.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+    }
+
+    public Configuration getConfiguration() {
+      return this.configuration;
+    }
+
+    public Connection getConnection() throws Exception {
+      if (this.connection == null) {
+        this.connection = ConnectionFactory.createConnection(this.configuration);
+      }
+      return this.connection;
+    }
+
+    public boolean equals(ClusterID other) {
+      return this.toString().equalsIgnoreCase(other.toString());
+    }
+  }
+
+  /**
+   * The main runner loop for the test. It uses
+   * {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList}
+   * for the generation and verification of the linked list. It is heavily based on
+   * {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList.Loop}
+   */
+  protected class VerifyReplicationLoop extends Configured implements Tool {
+    private final Log LOG = LogFactory.getLog(VerifyReplicationLoop.class);
+    protected ClusterID source;
+    protected ClusterID sink;
+
+    IntegrationTestBigLinkedList it;
+
+    /**
+     * This tears down any tables that existed from before and rebuilds the tables and
+     * schemas on the source cluster. It then sets up replication from the source to
+     * the sink cluster by using the
+     * {@link org.apache.hadoop.hbase.client.replication.ReplicationAdmin} connection.
+     *
+     * @throws Exception
+     */
+    protected void setupTablesAndReplication() throws Exception {
+      TableName tableName = getTableName(source.getConfiguration());
+
+      ClusterID[] clusters = {source, sink};
+
+      // delete any old tables in the source and sink
+      for (ClusterID cluster : clusters) {
+        Admin admin = cluster.getConnection().getAdmin();
+
+        if (admin.tableExists(tableName)) {
+          if (admin.isTableEnabled(tableName)) {
+            admin.disableTable(tableName);
+          }
+
+          // TODO: This is a workaround for a replication bug. When we
+          // recreate a table that has recently been deleted, the contents
+          // of the logs are replayed even though they should not be. This
+          // ensures that we flush the logs before the table gets deleted.
+          // Eventually the bug should be fixed and this should be removed.
+          Set<ServerName> regionServers = new TreeSet<>();
+          for (HRegionLocation rl :
+               cluster.getConnection().getRegionLocator(tableName).getAllRegionLocations()) {
+            regionServers.add(rl.getServerName());
+          }
+
+          for (ServerName server : regionServers) {
+            source.getConnection().getAdmin().rollWALWriter(server);
+          }
+
+          admin.deleteTable(tableName);
+        }
+      }
+
+      // create the schema
+      Generator generator = new Generator();
+      generator.setConf(source.getConfiguration());
+      generator.createSchema();
+
+      // set up replication on the source
+      if (!source.equals(sink)) {
+        ReplicationAdmin replicationAdmin = new ReplicationAdmin(source.getConfiguration());
+        // remove any old replication peers
+        for (String oldPeer : replicationAdmin.listPeerConfigs().keySet()) {
+          replicationAdmin.removePeer(oldPeer);
+        }
+
+        // set the sink to be the target of the replication peer
+        ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
+        peerConfig.setClusterKey(sink.toString());
+
+        // set the test table to be the table to replicate
+        HashMap<TableName, ArrayList<String>> toReplicate = new HashMap<>();
+        toReplicate.put(tableName, new ArrayList<String>(0));
+
+        replicationAdmin.addPeer("TestPeer", peerConfig, toReplicate);
+
+        replicationAdmin.enableTableRep(tableName);
+        replicationAdmin.close();
+      }
+    }
+
+    protected void waitForReplication() throws Exception {
+      // TODO: we shouldn't be sleeping here. It would be better to query the region servers
+      // and wait for them to report 0 replication lag.
+      Thread.sleep(generateVerifyGap * 1000L);
+    }
+
+    /**
+     * Run the {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList.Generator}
+     * in the source cluster. This assumes that the tables have been set up via
+     * setupTablesAndReplication.
+     *
+     * @throws Exception
+     */
+    protected void runGenerator() throws Exception {
+      Path outputPath = new Path(outputDir);
+      UUID uuid = UUID.randomUUID(); // create a random UUID so each iteration gets its own dir
+      Path generatorOutput = new Path(outputPath, uuid.toString());
+
+      Generator generator = new Generator();
+      generator.setConf(source.getConfiguration());
+
+      int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier);
+      if (retCode > 0) {
+        throw new RuntimeException("Generator failed with return code: " + retCode);
+      }
+    }
+
+    /**
+     * Run the {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList.Verify}
+     * in the sink cluster.
+     * If replication is working properly, the data written at the source cluster
+     * should be available in the sink cluster after a reasonable gap.
+     *
+     * @param expectedNumNodes the number of nodes we are expecting to see in the sink cluster
+     * @throws Exception
+     */
+    protected void runVerify(long expectedNumNodes) throws Exception {
+      Path outputPath = new Path(outputDir);
+      UUID uuid = UUID.randomUUID(); // create a random UUID so each iteration gets its own dir
+      Path iterationOutput = new Path(outputPath, uuid.toString());
+
+      Verify verify = new Verify();
+      verify.setConf(sink.getConfiguration());
+
+      int retCode = verify.run(iterationOutput, numReducers);
+      if (retCode > 0) {
+        throw new RuntimeException("Verify.run failed with return code: " + retCode);
+      }
+
+      if (!verify.verify(expectedNumNodes)) {
+        throw new RuntimeException("Verify.verify failed");
+      }
+
+      LOG.info("Verify finished with success. Total nodes=" + expectedNumNodes);
+    }
+
+    /**
+     * The main test runner.
+     *
+     * This test has 4 steps:
+     *  1: setupTablesAndReplication
+     *  2: generate the data into the source cluster
+     *  3: wait for replication to propagate
+     *  4: verify that the data is available in the sink cluster
+     *
+     * @param args should be empty
+     * @return 0 on success
+     * @throws Exception on an error
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+      source = new ClusterID(getConf(), sourceClusterIdString);
+      sink = new ClusterID(getConf(), sinkClusterIdString);
+
+      setupTablesAndReplication();
+      long expectedNumNodes = 0;
+      for (int i = 0; i < numIterations; i++) {
+        LOG.info("Starting iteration = " + i);
+
+        expectedNumNodes += numMappers * numNodes;
+
+        runGenerator();
+        waitForReplication();
+        runVerify(expectedNumNodes);
+      }
+
+      return 0;
+    }
+  }
+
+  @Override
+  protected void addOptions() {
+    super.addOptions();
+    addRequiredOptWithArg("s", SOURCE_CLUSTER_OPT,
+        "Cluster ID of the source cluster (e.g. localhost:2181:/hbase)");
+    addRequiredOptWithArg("r", DEST_CLUSTER_OPT,
+        "Cluster ID of the sink cluster (e.g. localhost:2182:/hbase)");
+    addRequiredOptWithArg("d", OUTPUT_DIR_OPT,
+        "temporary directory in which to write keys for the test");
+
+    addOptWithArg("nm", NUM_MAPPERS_OPT,
+        "number of mappers (default: " + DEFAULT_NUM_MAPPERS + ")");
+    addOptWithArg("nr", NUM_REDUCERS_OPT,
+        "number of reducers (default: " + DEFAULT_NUM_REDUCERS + ")");
+    addOptWithArg("n", NUM_NODES_OPT,
+        "number of nodes. This should be a multiple of width * wrapMultiplier. (default: "
+        + DEFAULT_NUM_NODES + ")");
+    addOptWithArg("i", ITERATIONS_OPT,
+        "number of iterations to run (default: " + DEFAULT_NUM_ITERATIONS + ")");
+    addOptWithArg("t", GENERATE_VERIFY_GAP_OPT,
+        "gap between generate and verify steps in seconds (default: "
+        + DEFAULT_GENERATE_VERIFY_GAP + ")");
+    addOptWithArg("w", WIDTH_OPT,
+        "width of the linked list chain (default: " + DEFAULT_WIDTH + ")");
+    addOptWithArg("wm", WRAP_MULTIPLIER_OPT,
+        "how many times to wrap around (default: " + DEFAULT_WRAP_MULTIPLIER + ")");
+  }
+
+  @Override
+  protected void processOptions(CommandLine cmd) {
+    processBaseOptions(cmd);
+
+    sourceClusterIdString = cmd.getOptionValue(SOURCE_CLUSTER_OPT);
+    sinkClusterIdString = cmd.getOptionValue(DEST_CLUSTER_OPT);
+    outputDir = cmd.getOptionValue(OUTPUT_DIR_OPT);
+
+    numMappers = parseInt(cmd.getOptionValue(NUM_MAPPERS_OPT,
+        Integer.toString(DEFAULT_NUM_MAPPERS)), 1, Integer.MAX_VALUE);
+    numReducers = parseInt(cmd.getOptionValue(NUM_REDUCERS_OPT,
+        Integer.toString(DEFAULT_NUM_REDUCERS)), 1, Integer.MAX_VALUE);
+    numNodes = parseInt(cmd.getOptionValue(NUM_NODES_OPT,
+        Integer.toString(DEFAULT_NUM_NODES)), 1, Integer.MAX_VALUE);
+    generateVerifyGap = parseInt(cmd.getOptionValue(GENERATE_VERIFY_GAP_OPT,
+        Integer.toString(DEFAULT_GENERATE_VERIFY_GAP)), 1, Integer.MAX_VALUE);
+    numIterations = parseInt(cmd.getOptionValue(ITERATIONS_OPT,
+        Integer.toString(DEFAULT_NUM_ITERATIONS)), 1, Integer.MAX_VALUE);
+    width = parseInt(cmd.getOptionValue(WIDTH_OPT,
+        Integer.toString(DEFAULT_WIDTH)), 1, Integer.MAX_VALUE);
+    wrapMultiplier = parseInt(cmd.getOptionValue(WRAP_MULTIPLIER_OPT,
+        Integer.toString(DEFAULT_WRAP_MULTIPLIER)), 1, Integer.MAX_VALUE);
+
+    if (numNodes % (width * wrapMultiplier) != 0) {
+      throw new RuntimeException("numNodes must be a multiple of width * wrapMultiplier");
+    }
+  }
+
+  @Override
+  public int runTestFromCommandLine() throws Exception {
+    VerifyReplicationLoop tool = new VerifyReplicationLoop();
+    tool.it = this;
+    return ToolRunner.run(getConf(), tool, null);
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    IntegrationTestingUtility.setUseDistributedCluster(conf);
+    int ret = ToolRunner.run(conf, new IntegrationTestReplication(), args);
+    System.exit(ret);
+  }
+}
-- 
2.2.2
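
Example invocation, for reference. The test is launched like the other
integration tests, via bin/hbase with the hbase-it test classes on the
classpath; the quorum hosts, znode parents, and paths below are
hypothetical placeholders:

  hbase org.apache.hadoop.hbase.test.IntegrationTestReplication \
      -s zk-src.example.com:2181:/hbase \
      -r zk-sink.example.com:2181:/hbase \
      -d /tmp/replication-test \
      -nm 2 -w 100000 -wm 20 -n 2000000 -i 3 -t 120

Note that -n must be a multiple of -w times -wm (here 2000000 =
100000 * 20), or processOptions will reject the arguments.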
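The TODO in waitForReplication() notes that the fixed sleep should
eventually become a poll of the region servers' replication lag. A minimal
sketch of what that could look like, assuming a version that carries the
per-server ReplicationLoadSource metrics from HBASE-12975; the class and
method names here are hypothetical and not part of this patch:

  import java.io.IOException;

  import org.apache.hadoop.hbase.ClusterStatus;
  import org.apache.hadoop.hbase.ServerLoad;
  import org.apache.hadoop.hbase.ServerName;
  import org.apache.hadoop.hbase.client.Admin;
  import org.apache.hadoop.hbase.client.Connection;
  import org.apache.hadoop.hbase.replication.ReplicationLoadSource;

  public final class ReplicationLagWaiter {
    /**
     * Blocks until every region server of the given source cluster reports
     * zero replication lag, polling once per second.
     */
    public static void waitForZeroLag(Connection connection)
        throws IOException, InterruptedException {
      try (Admin admin = connection.getAdmin()) {
        while (true) {
          long maxLag = 0;
          ClusterStatus status = admin.getClusterStatus();
          for (ServerName server : status.getServers()) {
            ServerLoad load = status.getLoad(server);
            for (ReplicationLoadSource rls : load.getReplicationLoadSourceList()) {
              // lag as reported by each replication source on this server
              maxLag = Math.max(maxLag, rls.getReplicationLag());
            }
          }
          if (maxLag == 0) {
            return; // all sources have caught up
          }
          Thread.sleep(1000);
        }
      }
    }
  }

A real replacement for waitForReplication() would also want an overall
timeout rather than looping forever.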