diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaPerf.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaPerf.java
index 81327c3..6a6a710 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaPerf.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaPerf.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy;
 import org.apache.hadoop.hbase.chaos.policies.Policy;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.ToolRunner;
@@ -221,42 +220,6 @@ public class IntegrationTestRegionReplicaPerf extends IntegrationTestBase {
     return null;
   }
 
-  /**
-   * Modify a table, synchronous. Waiting logic similar to that of {@code admin.rb#alter_status}.
-   */
-  private static void modifyTableSync(HBaseAdmin admin, HTableDescriptor desc) throws Exception {
-    admin.modifyTable(desc.getTableName(), desc);
-    Pair<Integer, Integer> status = new Pair<Integer, Integer>() {{
-      setFirst(0);
-      setSecond(0);
-    }};
-    for (int i = 0; status.getFirst() != 0 && i < 500; i++) { // wait up to 500 seconds
-      status = admin.getAlterStatus(desc.getTableName());
-      if (status.getSecond() != 0) {
-        LOG.debug(status.getSecond() - status.getFirst() + "/" + status.getSecond()
-          + " regions updated.");
-        Thread.sleep(1 * 1000l);
-      } else {
-        LOG.debug("All regions updated.");
-      }
-    }
-    if (status.getSecond() != 0) {
-      throw new Exception("Failed to update replica count after 500 seconds.");
-    }
-  }
-
-  /**
-   * Set the number of Region replicas.
-   */
-  private static void setReplicas(HBaseAdmin admin, TableName table, int replicaCount)
-      throws Exception {
-    admin.disableTable(table);
-    HTableDescriptor desc = admin.getTableDescriptor(table);
-    desc.setRegionReplication(replicaCount);
-    modifyTableSync(admin, desc);
-    admin.enableTable(table);
-  }
-
   public void test() throws Exception {
     int maxIters = 3;
     String mr = nomapred ? "--nomapred" : "";
"--nomapred" : ""; @@ -293,7 +256,7 @@ public class IntegrationTestRegionReplicaPerf extends IntegrationTestBase { // disable monkey, enable region replicas, enable monkey cleanUpMonkey("Altering table."); LOG.debug("Altering " + tableName + " replica count to " + replicaCount); - setReplicas(util.getHBaseAdmin(), tableName, replicaCount); + util.setReplicas(util.getHBaseAdmin(), tableName, replicaCount); setUpMonkey(); startMonkey(); diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java index dd4415b..65fc44d 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java @@ -18,8 +18,6 @@ */ package org.apache.hadoop.hbase.mapreduce; -import static org.junit.Assert.assertEquals; - import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -38,13 +36,19 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.IntegrationTestBase; import org.apache.hadoop.hbase.IntegrationTestingUtility; import org.apache.hadoop.hbase.IntegrationTests; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Consistency; +import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.TestReplicasClient; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -69,6 +73,9 @@ import org.apache.hadoop.util.ToolRunner; import org.junit.Test; import org.junit.experimental.categories.Category; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + /** * Test Bulk Load and MR on a distributed cluster. * It starts an MR job that creates linked chains @@ -99,15 +106,17 @@ import org.junit.experimental.categories.Category; * hbase.IntegrationTestBulkLoad.tableName * The name of the table. * + * hbase.IntegrationTestBulkLoad.replicaCount + * How many region replicas to configure for the table under test. 
  */
 @Category(IntegrationTests.class)
 public class IntegrationTestBulkLoad extends IntegrationTestBase {
 
   private static final Log LOG = LogFactory.getLog(IntegrationTestBulkLoad.class);
 
-  private static byte[] CHAIN_FAM = Bytes.toBytes("L");
-  private static byte[] SORT_FAM = Bytes.toBytes("S");
-  private static byte[] DATA_FAM = Bytes.toBytes("D");
+  private static final byte[] CHAIN_FAM = Bytes.toBytes("L");
+  private static final byte[] SORT_FAM = Bytes.toBytes("S");
+  private static final byte[] DATA_FAM = Bytes.toBytes("D");
 
   private static String CHAIN_LENGTH_KEY = "hbase.IntegrationTestBulkLoad.chainLength";
   private static int CHAIN_LENGTH = 500000;
@@ -123,9 +132,33 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
   private static String TABLE_NAME_KEY = "hbase.IntegrationTestBulkLoad.tableName";
   private static String TABLE_NAME = "IntegrationTestBulkLoad";
 
+  private static String NUM_REPLICA_COUNT_KEY = "hbase.IntegrationTestBulkLoad.replicaCount";
+  private static int NUM_REPLICA_COUNT_DEFAULT = 1;
+
+  private static final String OPT_LOAD = "load";
+  private static final String OPT_CHECK = "check";
+
+  private boolean load = false;
+  private boolean check = false;
+
+  /**
+   * Modify table {@code getTableName()} to carry {@link TestReplicasClient.SlowMeCopro}.
+   */
+  private void installSlowingCoproc() throws IOException, InterruptedException {
+    int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
+    if (replicaCount == NUM_REPLICA_COUNT_DEFAULT) return;
+
+    TableName t = TableName.valueOf(getTablename());
+    HBaseAdmin admin = util.getHBaseAdmin();
+    HTableDescriptor desc = admin.getTableDescriptor(t);
+    desc.addCoprocessor(TestReplicasClient.SlowMeCopro.class.getName());
+    HBaseTestingUtility.modifyTableSync(admin, desc);
+  }
+
   @Test
   public void testBulkLoad() throws Exception {
     runLoad();
+    installSlowingCoproc();
     runCheck();
   }
 
@@ -145,7 +178,7 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
     return split.split(numRegions);
   }
 
-  private void setupTable() throws IOException {
+  private void setupTable() throws IOException, InterruptedException {
     if (util.getHBaseAdmin().tableExists(getTablename())) {
       util.deleteTable(getTablename());
     }
@@ -155,6 +188,12 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
       new byte[][]{CHAIN_FAM, SORT_FAM, DATA_FAM},
       getSplits(16)
     );
+
+    int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
+    if (replicaCount == NUM_REPLICA_COUNT_DEFAULT) return;
+
+    TableName t = TableName.valueOf(getTablename());
+    HBaseTestingUtility.setReplicas(util.getHBaseAdmin(), t, replicaCount);
   }
 
   private void runLinkedListMRJob(int iteration) throws Exception {
@@ -556,23 +595,28 @@
     Path p = util.getDataTestDirOnTestFS(jobName);
 
     Job job = new Job(conf);
-    job.setJarByClass(getClass());
+
     job.setJobName(jobName);
 
     job.setPartitionerClass(NaturalKeyPartitioner.class);
     job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
     job.setSortComparatorClass(CompositeKeyComparator.class);
 
-    Scan s = new Scan();
-    s.addFamily(CHAIN_FAM);
-    s.addFamily(SORT_FAM);
-    s.setMaxVersions(1);
-    s.setCacheBlocks(false);
-    s.setBatch(1000);
+    Scan scan = new Scan();
+    scan.addFamily(CHAIN_FAM);
+    scan.addFamily(SORT_FAM);
+    scan.setMaxVersions(1);
+    scan.setCacheBlocks(false);
+    scan.setBatch(1000);
+
+    int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
+    if (replicaCount != NUM_REPLICA_COUNT_DEFAULT) {
+      scan.setConsistency(Consistency.TIMELINE);
+    }
 
     TableMapReduceUtil.initTableMapperJob(
         Bytes.toBytes(getTablename()),
-        new Scan(),
+        scan,
         LinkedListCheckingMapper.class,
         LinkKey.class,
         LinkChain.class,
@@ -595,6 +639,10 @@
   public void setUpCluster() throws Exception {
     util = getTestingUtil(getConf());
     util.initializeCluster(1);
+    int replicaCount = getConf().getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
+    if (LOG.isDebugEnabled() && replicaCount != NUM_REPLICA_COUNT_DEFAULT) {
+      LOG.debug("Region Replicas enabled: " + replicaCount);
+    }
 
     // Scale this up on a real cluster
     if (util.isDistributedCluster()) {
@@ -602,17 +650,29 @@
         Integer.toString(util.getHBaseAdmin().getClusterStatus().getServersSize() * 10)
       );
       util.getConfiguration().setIfUnset(NUM_IMPORT_ROUNDS_KEY, "5");
+      if (replicaCount != NUM_REPLICA_COUNT_DEFAULT) {
+        // sanity check cluster
+        // TODO: this should reach out to master and verify online state instead
+        assertEquals("Master must be configured with StochasticLoadBalancer",
+          "org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer",
+          util.getConfiguration().get("hbase.master.loadbalancer.class"));
+        // TODO: this should reach out to master and verify online state instead
+        assertTrue("hbase.regionserver.storefile.refresh.period must be greater than zero.",
+          util.getConfiguration().getLong("hbase.regionserver.storefile.refresh.period", 0) > 0);
+      }
     } else {
+      if (replicaCount != NUM_REPLICA_COUNT_DEFAULT) {
+        // modify minicluster
+        util.getConfiguration().set(
+          "hbase.master.loadbalancer.class",
+          "org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer");
+        util.getConfiguration().setIfUnset(
+          "hbase.regionserver.storefile.refresh.period", "30000");
+      }
       util.startMiniMapReduceCluster();
     }
   }
 
-  private static final String OPT_LOAD = "load";
-  private static final String OPT_CHECK = "check";
-
-  private boolean load = false;
-  private boolean check = false;
-
   @Override
   protected void addOptions() {
     super.addOptions();
@@ -632,6 +692,7 @@
     if (load) {
       runLoad();
     } else if (check) {
+      installSlowingCoproc();
       runCheck();
     } else {
       testBulkLoad();
     }
@@ -655,5 +716,4 @@
     int status = ToolRunner.run(conf, new IntegrationTestBulkLoad(), args);
     System.exit(status);
   }
-
-}
\ No newline at end of file
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 50f6d3f..4c75faf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -88,6 +88,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.RegionSplitter;
 import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.hadoop.hbase.util.Threads;
@@ -1444,6 +1445,44 @@
   }
 
   /**
+   * Modify a table, synchronous. Waiting logic similar to that of {@code admin.rb#alter_status}.
+   */
+  public static void modifyTableSync(HBaseAdmin admin, HTableDescriptor desc)
+      throws IOException, InterruptedException {
+    admin.modifyTable(desc.getTableName(), desc);
+    Pair<Integer, Integer> status = new Pair<Integer, Integer>() {{
+      setFirst(0);
+      setSecond(0);
+    }};
+    for (int i = 0; status.getFirst() != 0 && i < 500; i++) { // wait up to 500 seconds
+      status = admin.getAlterStatus(desc.getTableName());
+      if (status.getSecond() != 0) {
+        LOG.debug(status.getSecond() - status.getFirst() + "/" + status.getSecond()
+          + " regions updated.");
+        Thread.sleep(1 * 1000l);
+      } else {
+        LOG.debug("All regions updated.");
+        break;
+      }
+    }
+    if (status.getSecond() != 0) {
+      throw new IOException("Failed to update replica count after 500 seconds.");
+    }
+  }
+
+  /**
+   * Set the number of Region replicas.
+   */
+  public static void setReplicas(HBaseAdmin admin, TableName table, int replicaCount)
+      throws IOException, InterruptedException {
+    admin.disableTable(table);
+    HTableDescriptor desc = admin.getTableDescriptor(table);
+    desc.setRegionReplication(replicaCount);
+    modifyTableSync(admin, desc);
+    admin.enableTable(table);
+  }
+
+  /**
    * Drop an existing table
    * @param tableName existing table
   */
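
The sketch below is not part of the patch; it is a minimal illustration of how the helpers moved into HBaseTestingUtility would be called from test code. The driver class name and the replica count of 3 are assumptions for the sketch; the table name and the "L" column family reuse the defaults from IntegrationTestBulkLoad.

// Illustrative only: exercises HBaseTestingUtility.setReplicas() against a mini cluster.
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;

public class ReplicaSetupSketch {
  public static void main(String[] args) throws Exception {
    HBaseTestingUtility util = new HBaseTestingUtility();
    util.startMiniCluster();
    try {
      // Same default table name and chain family as IntegrationTestBulkLoad.
      byte[] tableName = Bytes.toBytes("IntegrationTestBulkLoad");
      util.createTable(tableName, new byte[][] { Bytes.toBytes("L") });

      // Disables the table, applies HTableDescriptor#setRegionReplication(3)
      // through modifyTableSync(), then re-enables the table.
      HBaseTestingUtility.setReplicas(util.getHBaseAdmin(),
          TableName.valueOf("IntegrationTestBulkLoad"), 3);
    } finally {
      util.shutdownMiniCluster();
    }
  }
}

To drive IntegrationTestBulkLoad itself against replicas, the new hbase.IntegrationTestBulkLoad.replicaCount property would be set on the job configuration (for example via a -D generic option when launching through ToolRunner), which is also what switches the check job's scan to Consistency.TIMELINE.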