From e5ec49b123c4070355182f04397d8c8c347c9aff Mon Sep 17 00:00:00 2001 From: Devaraj Das Date: Fri, 23 May 2014 17:59:05 -0700 Subject: [PATCH 41/45] HBASE-10818. Add integration test for bulkload with replicas (Nick Dimiduk and Devaraj Das) --- .../hbase/IntegrationTestRegionReplicaPerf.java | 39 +------ .../hbase/mapreduce/IntegrationTestBulkLoad.java | 127 +++++++++++++++++---- ...nTestTimeBoundedRequestsWithRegionReplicas.java | 5 + .../hbase/mapreduce/TableRecordReaderImpl.java | 9 +- .../hbase/mapreduce/TableSnapshotInputFormat.java | 2 +- .../apache/hadoop/hbase/HBaseTestingUtility.java | 39 +++++++ 6 files changed, 158 insertions(+), 63 deletions(-) diff --git hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaPerf.java hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaPerf.java index ca3a8f0..8ea27bf 100644 --- hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaPerf.java +++ hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaPerf.java @@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.chaos.policies.Policy; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy; -import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.ToolRunner; @@ -222,42 +221,6 @@ public class IntegrationTestRegionReplicaPerf extends IntegrationTestBase { return null; } - /** - * Modify a table, synchronous. Waiting logic similar to that of {@code admin.rb#alter_status}. 
- */ - private static void modifyTableSync(HBaseAdmin admin, HTableDescriptor desc) throws Exception { - admin.modifyTable(desc.getTableName(), desc); - Pair status = new Pair() {{ - setFirst(0); - setSecond(0); - }}; - for (int i = 0; status.getFirst() != 0 && i < 500; i++) { // wait up to 500 seconds - status = admin.getAlterStatus(desc.getTableName()); - if (status.getSecond() != 0) { - LOG.debug(status.getSecond() - status.getFirst() + "/" + status.getSecond() - + " regions updated."); - Thread.sleep(1 * 1000l); - } else { - LOG.debug("All regions updated."); - } - } - if (status.getSecond() != 0) { - throw new Exception("Failed to update replica count after 500 seconds."); - } - } - - /** - * Set the number of Region replicas. - */ - private static void setReplicas(HBaseAdmin admin, TableName table, int replicaCount) - throws Exception { - admin.disableTable(table); - HTableDescriptor desc = admin.getTableDescriptor(table); - desc.setRegionReplication(replicaCount); - modifyTableSync(admin, desc); - admin.enableTable(table); - } - public void test() throws Exception { int maxIters = 3; String mr = nomapred ? 
"--nomapred" : ""; @@ -294,7 +257,7 @@ public class IntegrationTestRegionReplicaPerf extends IntegrationTestBase { // disable monkey, enable region replicas, enable monkey cleanUpMonkey("Altering table."); LOG.debug("Altering " + tableName + " replica count to " + replicaCount); - setReplicas(util.getHBaseAdmin(), tableName, replicaCount); + util.setReplicas(util.getHBaseAdmin(), tableName, replicaCount); setUpMonkey(); startMonkey(); diff --git hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java index dd4415b..4112014 100644 --- hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java +++ hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java @@ -18,8 +18,6 @@ */ package org.apache.hadoop.hbase.mapreduce; -import static org.junit.Assert.assertEquals; - import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -28,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.cli.CommandLine; import org.apache.commons.lang.RandomStringUtils; @@ -38,14 +37,25 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.IntegrationTestBase; import org.apache.hadoop.hbase.IntegrationTestingUtility; import org.apache.hadoop.hbase.IntegrationTests; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Consistency; +import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; 
import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.RegionSplitter; @@ -69,6 +79,9 @@ import org.apache.hadoop.util.ToolRunner; import org.junit.Test; import org.junit.experimental.categories.Category; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + /** * Test Bulk Load and MR on a distributed cluster. * It starts an MR job that creates linked chains @@ -99,15 +112,17 @@ import org.junit.experimental.categories.Category; * hbase.IntegrationTestBulkLoad.tableName * The name of the table. * + * hbase.IntegrationTestBulkLoad.replicaCount + * How many region replicas to configure for the table under test. 
*/ @Category(IntegrationTests.class) public class IntegrationTestBulkLoad extends IntegrationTestBase { private static final Log LOG = LogFactory.getLog(IntegrationTestBulkLoad.class); - private static byte[] CHAIN_FAM = Bytes.toBytes("L"); - private static byte[] SORT_FAM = Bytes.toBytes("S"); - private static byte[] DATA_FAM = Bytes.toBytes("D"); + private static final byte[] CHAIN_FAM = Bytes.toBytes("L"); + private static final byte[] SORT_FAM = Bytes.toBytes("S"); + private static final byte[] DATA_FAM = Bytes.toBytes("D"); private static String CHAIN_LENGTH_KEY = "hbase.IntegrationTestBulkLoad.chainLength"; private static int CHAIN_LENGTH = 500000; @@ -123,9 +138,73 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase { private static String TABLE_NAME_KEY = "hbase.IntegrationTestBulkLoad.tableName"; private static String TABLE_NAME = "IntegrationTestBulkLoad"; + private static String NUM_REPLICA_COUNT_KEY = "hbase.IntegrationTestBulkLoad.replicaCount"; + private static int NUM_REPLICA_COUNT_DEFAULT = 1; + + private static final String OPT_LOAD = "load"; + private static final String OPT_CHECK = "check"; + + private boolean load = false; + private boolean check = false; + + public static class SlowMeCoproScanOperations extends BaseRegionObserver { + static final AtomicLong sleepTime = new AtomicLong(2000); + Random r = new Random(); + AtomicLong countOfNext = new AtomicLong(0); + AtomicLong countOfOpen = new AtomicLong(0); + public SlowMeCoproScanOperations() {} + @Override + public RegionScanner preScannerOpen(final ObserverContext e, + final Scan scan, final RegionScanner s) throws IOException { + if (countOfOpen.incrementAndGet() % 4 == 0) { //slowdown openScanner randomly + slowdownCode(e); + } + return s; + } + + @Override + public boolean preScannerNext(final ObserverContext e, + final InternalScanner s, final List results, + final int limit, final boolean hasMore) throws IOException { + //this will slow down a certain next operation if 
the conditions are met. The slowness + //will allow the call to go to a replica + if (countOfNext.incrementAndGet() % 4 == 0) { + slowdownCode(e); + } + return true; + } + protected void slowdownCode(final ObserverContext e) { + if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) { + try { + if (sleepTime.get() > 0) { + LOG.info("Sleeping for " + sleepTime.get() + " ms"); + Thread.sleep(sleepTime.get()); + } + } catch (InterruptedException e1) { + LOG.error(e1); + } + } + } + } + + /** + * Modify table {@code getTableName()} to carry {@link SlowMeCoproScanOperations}. + */ + private void installSlowingCoproc() throws IOException, InterruptedException { + int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT); + if (replicaCount == NUM_REPLICA_COUNT_DEFAULT) return; + + TableName t = TableName.valueOf(getTablename()); + HBaseAdmin admin = util.getHBaseAdmin(); + HTableDescriptor desc = admin.getTableDescriptor(t); + desc.addCoprocessor(SlowMeCoproScanOperations.class.getName()); + HBaseTestingUtility.modifyTableSync(admin, desc); + } + @Test public void testBulkLoad() throws Exception { runLoad(); + installSlowingCoproc(); runCheck(); } @@ -145,7 +224,7 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase { return split.split(numRegions); } - private void setupTable() throws IOException { + private void setupTable() throws IOException, InterruptedException { if (util.getHBaseAdmin().tableExists(getTablename())) { util.deleteTable(getTablename()); } @@ -155,6 +234,12 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase { new byte[][]{CHAIN_FAM, SORT_FAM, DATA_FAM}, getSplits(16) ); + + int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT); + if (replicaCount == NUM_REPLICA_COUNT_DEFAULT) return; + + TableName t = TableName.valueOf(getTablename()); + HBaseTestingUtility.setReplicas(util.getHBaseAdmin(), t, replicaCount); } private void runLinkedListMRJob(int 
iteration) throws Exception { @@ -556,23 +641,23 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase { Path p = util.getDataTestDirOnTestFS(jobName); Job job = new Job(conf); - job.setJarByClass(getClass()); + job.setJobName(jobName); job.setPartitionerClass(NaturalKeyPartitioner.class); job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class); job.setSortComparatorClass(CompositeKeyComparator.class); - Scan s = new Scan(); - s.addFamily(CHAIN_FAM); - s.addFamily(SORT_FAM); - s.setMaxVersions(1); - s.setCacheBlocks(false); - s.setBatch(1000); + Scan scan = new Scan(); + scan.addFamily(CHAIN_FAM); + scan.addFamily(SORT_FAM); + scan.setMaxVersions(1); + scan.setCacheBlocks(false); + scan.setBatch(1000); TableMapReduceUtil.initTableMapperJob( Bytes.toBytes(getTablename()), - new Scan(), + scan, LinkedListCheckingMapper.class, LinkKey.class, LinkChain.class, @@ -595,6 +680,10 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase { public void setUpCluster() throws Exception { util = getTestingUtil(getConf()); util.initializeCluster(1); + int replicaCount = getConf().getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT); + if (LOG.isDebugEnabled() && replicaCount != NUM_REPLICA_COUNT_DEFAULT) { + LOG.debug("Region Replicas enabled: " + replicaCount); + } // Scale this up on a real cluster if (util.isDistributedCluster()) { @@ -607,12 +696,6 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase { } } - private static final String OPT_LOAD = "load"; - private static final String OPT_CHECK = "check"; - - private boolean load = false; - private boolean check = false; - @Override protected void addOptions() { super.addOptions(); @@ -632,6 +715,7 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase { if (load) { runLoad(); } else if (check) { + installSlowingCoproc(); runCheck(); } else { testBulkLoad(); @@ -655,5 +739,4 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase { int status = 
ToolRunner.run(conf, new IntegrationTestBulkLoad(), args); System.exit(status); } - -} \ No newline at end of file +} diff --git hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedRequestsWithRegionReplicas.java hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedRequestsWithRegionReplicas.java index 66f3155..eb3bb70 100644 --- hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedRequestsWithRegionReplicas.java +++ hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedRequestsWithRegionReplicas.java @@ -234,6 +234,7 @@ public class IntegrationTestTimeBoundedRequestsWithRegionReplicas extends Integr protected AtomicLong timedOutReads = new AtomicLong(); protected long runTime; protected Thread timeoutThread; + protected AtomicLong staleReads = new AtomicLong(); public TimeBoundedMultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf, TableName tableName, double verifyPercent) throws IOException { @@ -263,6 +264,7 @@ public class IntegrationTestTimeBoundedRequestsWithRegionReplicas extends Integr @Override protected String progressInfo() { StringBuilder builder = new StringBuilder(super.progressInfo()); + appendToStatus(builder, "stale_reads", staleReads.get()); appendToStatus(builder, "get_timeouts", timedOutReads.get()); return builder.toString(); } @@ -327,6 +329,9 @@ public class IntegrationTestTimeBoundedRequestsWithRegionReplicas extends Integr Result[] results, HTableInterface table, boolean isNullExpected) throws IOException { super.verifyResultsAndUpdateMetrics(verify, gets, elapsedNano, results, table, isNullExpected); + for (Result r : results) { + if (r.isStale()) staleReads.incrementAndGet(); + } // we actually do not timeout and cancel the reads after timeout. We just wait for the RPC // to complete, but if the request took longer than timeout, we treat that as error. 
if (elapsedNano > timeoutNano) { diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java index 7eb7871..e8e6e8b 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java @@ -65,6 +65,7 @@ public class TableRecordReaderImpl { private TaskAttemptContext context = null; private Method getCounter = null; private long numRestarts = 0; + private long numStale = 0; private long timestamp; private int rowcount; private boolean logScannerActivity = false; @@ -203,6 +204,7 @@ public class TableRecordReaderImpl { try { try { value = this.scanner.next(); + if (value != null && value.isStale()) numStale++; if (logScannerActivity) { rowcount ++; if (rowcount >= logPerRowCount) { @@ -230,6 +232,7 @@ public class TableRecordReaderImpl { scanner.next(); // skip presumed already mapped row } value = scanner.next(); + if (value != null && value.isStale()) numStale++; numRestarts++; } if (value != null && value.size() > 0) { @@ -270,11 +273,11 @@ public class TableRecordReaderImpl { ScanMetrics scanMetrics = ProtobufUtil.toScanMetrics(serializedMetrics); - updateCounters(scanMetrics, numRestarts, getCounter, context); + updateCounters(scanMetrics, numRestarts, getCounter, context, numStale); } protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts, - Method getCounter, TaskAttemptContext context) { + Method getCounter, TaskAttemptContext context, long numStale) { // we can get access to counters only if hbase uses new mapreduce APIs if (getCounter == null) { return; @@ -289,6 +292,8 @@ public class TableRecordReaderImpl { } ((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME, "NUM_SCANNER_RESTARTS")).increment(numScannerRestarts); + ((Counter) getCounter.invoke(context, 
HBASE_COUNTER_GROUP_NAME, + "NUM_SCAN_RESULTS_STALE")).increment(numStale); } catch (Exception e) { LOG.debug("can't update counter." + StringUtils.stringifyException(e)); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java index f8d4d18..8071c56 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java @@ -152,7 +152,7 @@ public class TableSnapshotInputFormat extends InputFormat status = new Pair() {{ + setFirst(0); + setSecond(0); + }}; + for (int i = 0; i < 500; i++) { // wait up to 500 seconds + status = admin.getAlterStatus(desc.getTableName()); + if (status.getSecond() != 0) { + LOG.debug(status.getSecond() - status.getFirst() + "/" + status.getSecond() + + " regions updated."); + Thread.sleep(1 * 1000l); + } else { + LOG.debug("All regions updated."); + break; + } + } + if (status.getSecond() != 0) { + throw new IOException("Failed to update replica count after 500 seconds."); + } + } + + /** + * Set the number of Region replicas. + */ + public static void setReplicas(HBaseAdmin admin, TableName table, int replicaCount) + throws IOException, InterruptedException { + admin.disableTable(table); + HTableDescriptor desc = admin.getTableDescriptor(table); + desc.setRegionReplication(replicaCount); + modifyTableSync(admin, desc); + admin.enableTable(table); + } + + /** * Drop an existing table * @param tableName existing table */ -- 2.0.0