From 6c397f9920b1e8d7689edc427104e06f530d4c48 Mon Sep 17 00:00:00 2001 From: Benoit Sigoure Date: Tue, 29 Oct 2013 03:20:07 -0700 Subject: [PATCH] AsyncHBase PerformanceEvaluation. --- hbase-server/pom.xml | 28 ++ .../apache/hadoop/hbase/PerformanceEvaluation.java | 311 ++++++++++++++++++++- 2 files changed, 337 insertions(+), 2 deletions(-) diff --git a/hbase-server/pom.xml b/hbase-server/pom.xml index 3c6fd7d..e09aab5 100644 --- a/hbase-server/pom.xml +++ b/hbase-server/pom.xml @@ -434,6 +434,34 @@ javax.xml.bind jaxb-api + + org.hbase + asynchbase + [1.5.0,) + + + + org.slf4j + slf4j-api + + + + org.slf4j + jcl-over-slf4j + + + org.slf4j + log4j-over-slf4j + + + test + org.cloudera.htrace diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java index 8272ac0..eb860df 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java @@ -32,10 +32,14 @@ import java.util.Map; import java.util.Random; import java.util.TreeMap; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.logging.Log; @@ -81,6 +85,14 @@ import org.codehaus.jackson.map.ObjectMapper; import static org.codehaus.jackson.map.SerializationConfig.Feature.SORT_PROPERTIES_ALPHABETICALLY; +import com.stumbleupon.async.Callback; +import com.stumbleupon.async.Deferred; +import org.hbase.async.GetRequest; +import org.hbase.async.HBaseClient; +import org.hbase.async.PleaseThrottleException; +import org.hbase.async.PutRequest; +import org.hbase.async.Scanner; + /** * Script used evaluating HBase performance and scalability. Runs a HBase * client that steps through one of a set of hardcoded tests or 'experiments' @@ -138,6 +150,8 @@ public class PerformanceEvaluation extends Configured implements Tool { addCommandDescriptor(RandomReadTest.class, "randomRead", "Run random read test"); + addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead", + "Run random read test with AsyncHBase"); addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan", "Run random seek and scan 100 test"); addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10", @@ -150,12 +164,20 @@ public class PerformanceEvaluation extends Configured implements Tool { "Run random seek scan with both start and stop row (max 10000 rows)"); addCommandDescriptor(RandomWriteTest.class, "randomWrite", "Run random write test"); + addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite", + "Run random write test with AsyncHBase"); addCommandDescriptor(SequentialReadTest.class, "sequentialRead", "Run sequential read test"); + addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead", + "Run sequential read test with AsyncHBase"); addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", "Run sequential write test"); + addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite", + "Run sequential write test with AsyncHBase"); addCommandDescriptor(ScanTest.class, "scan", "Run scan test (read every row)"); + addCommandDescriptor(AsyncScanTest.class, "asyncScan", + "Run scan test with AsyncHBase (read every row)"); addCommandDescriptor(FilteredScanTest.class, "filterScan", "Run scan test using a filter to find a specific row based on it's value (make sure to use --rows=20)"); } @@ -605,6 +627,181 @@ public class PerformanceEvaluation extends Configured implements Tool { abstract void testRow(final int i) throws IOException; } + static abstract class AsyncTest extends Test { + /** Maximum number of RPCs we're allowed in flight at a time. */ + private static final int MAX_OUTSTANDING_RPCS = 200000; // Sized for 2G heap. + + private static HBaseClient client; // Only one client regardless of number of threads. + + protected final byte[] tableBytes; + + AsyncTest(final Configuration conf, final TestOptions options, final Status status) { + super(null, options, status); + final String zkquorum = conf.get(HConstants.ZOOKEEPER_QUORUM); + final String znode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, + HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT); + this.tableBytes = tableName.getName(); + synchronized (AsyncTest.class) { + if (client == null) { + client = new HBaseClient(zkquorum, znode); + // Sanity check. + try { + client.ensureTableFamilyExists(tableBytes, FAMILY_NAME).joinUninterruptibly(); + } catch (Exception e) { + throw new RuntimeException("Missing test table/family?", e); + } + } + } + latch = new CountDownLatch(super.perClientRunRows); + final int maxrpcs = MAX_OUTSTANDING_RPCS / options.getNumClientThreads(); + sem = new Semaphore(Math.max(100, maxrpcs)); + } + + /** + * If true, make sure that every read returns a valid-looking KeyValue. + */ + private static final boolean CHECK_READS = false; + + /** Checks that the row retrieved from HBase looks valid. */ + protected static void check(final ArrayList row) throws IOException { + if (!CHECK_READS) { + return; + } + if (row.size() != 1) { + throw new IOException((row.isEmpty() ? "No" : "Multiple (" + row.size() + ')') + + " KeyValue found in row"); + } else if (row.get(0).value().length != VALUE_LENGTH) { + throw new IOException("Invalid value length (found: " + row.get(0).value().length + + ", expected: " + VALUE_LENGTH + ") in row \"" + + new String(row.get(0).key()) + '"'); + } + } + + private Exception error = null; // Last exception caught asynchronously. + private volatile boolean failed = false; // True if we caught an exception asynchronously. + /** Used by sub-classes to handle asynchronous exceptions. */ + protected final Callback errback = new Callback() { + public Exception call(final Exception e) throws Exception { + rpcCompleted(); + if (e instanceof PleaseThrottleException) { + LOG.warn("Throttling thread " + Thread.currentThread().getName() + + ", HBase isn't keeping up", e); + final int permits = sem.drainPermits(); // Prevent creation of further RPCs. + final Deferred d = (Deferred) ((PleaseThrottleException) e).getDeferred(); + ((Deferred) d).addBoth(new Callback() { + public Object call(final Object arg) { + sem.release(permits); + LOG.warn("Done throttling thread " + Thread.currentThread().getName()); + return arg; + } + public String toString() { + return "error recovery after " + e; + } + }); + return null; + } + error = e; + failed = true; // Volatile-write. + LOG.error(this + " caught an exception", e); + return e; + } + + private final String toString = "errback for " + AsyncTest.this + " in " + Thread.currentThread().getName(); + public String toString() { + return toString; + } + }; + + /** + * Latch to guarantee we have gotten a response for every single RPC sent. + * This latch is initialized up with the number of RPCs we intend to send. + * Every time an RPC completes successfully, we decrement its count down + * by one. This way we guarantee that all RPCs have completed and their + * responses have been handled within the section of the code we're + * timing. + */ + private final CountDownLatch latch; + + /** + * Semaphore to control the number of outstanding RPCs. + * Because the producer code is synchronous and asynchbase is + * non-blocking, the tests will try to create and send all RPCs at once, + * thus running the app out of memory. In order to limit the number of + * RPCs in flight at the same time, we acquire a permit from this + * semaphore each time we access the client to send an RPC, and we release + * the permit each time the RPC completes. + */ + private final Semaphore sem; + + /** Records the completion of an RPC. */ + protected final void rpcCompleted() { + sem.release(); + latch.countDown(); + } + + /** Callback used on successful read RPCs. */ + protected final Callback> readCallback = new Callback>() { + public Object call(final ArrayList row) throws IOException { + rpcCompleted(); + check(row); + return row; + } + + private final String toString = "callback for " + AsyncTest.this + " in " + Thread.currentThread().getName(); + public String toString() { + return toString; + } + }; + + /** Callback used on other successful RPCs. */ + protected final Callback callback = new Callback() { + public Object call(final Object arg) { + rpcCompleted(); + return arg; + } + + private final String toString = "callback for " + AsyncTest.this + " in " + Thread.currentThread().getName(); + public String toString() { + return toString; + } + }; + + @Override + final void testSetup() { + // Nothing. + } + + @Override + final void testTakedown() throws IOException { + try { + // For tests with few writes, asking for a flush before waiting on the + // latch tells asynchbase to start flushing writes instead of waiting + // until the timer flushes them. + client.flush().join(); + latch.await(); // Make sure the last RPC completed. + if (failed) { // Volatile-read + throw error; + } + } catch (RuntimeException e) { + throw e; + } catch (IOException e) { + throw e; + } catch (Exception e) { + throw new IOException("Uncaught exception from flush()", e); + } + } + + /** Returns the client to use to send an RPC. Call once per RPC. */ + protected final HBaseClient client() { + try { + sem.acquire(); + } catch (InterruptedException e) { + LOG.error("Shouldn't happen!", e); + return null; + } + return client; + } + } @SuppressWarnings("unused") static class RandomSeekScanTest extends Test { @@ -795,6 +992,27 @@ public class PerformanceEvaluation extends Configured implements Tool { } } + static class AsyncRandomReadTest extends AsyncTest { + AsyncRandomReadTest(Configuration conf, TestOptions options, Status status) { + super(conf, options, status); + } + + @Override + void testRow(final int i) throws IOException { + final GetRequest get = new GetRequest(tableBytes, getRandomRow(this.rand, this.totalRows)); + get.family(FAMILY_NAME).qualifier(QUALIFIER_NAME); + + client().get(get).addCallback(readCallback).addErrback(errback); + } + + @Override + protected int getReportingPeriod() { + int period = this.perClientRunRows / 100; + return period == 0 ? this.perClientRunRows : period; + } + + } + static class RandomWriteTest extends Test { RandomWriteTest(Configuration conf, TestOptions options, Status status) { super(conf, options, status); @@ -811,6 +1029,21 @@ public class PerformanceEvaluation extends Configured implements Tool { } } + static class AsyncRandomWriteTest extends AsyncTest { + AsyncRandomWriteTest(Configuration conf, TestOptions options, Status status) { + super(conf, options, status); + } + + @Override + void testRow(final int i) { + final PutRequest put = new PutRequest(tableBytes, getRandomRow(this.rand, this.totalRows), + FAMILY_NAME, QUALIFIER_NAME, generateValue(this.rand)); + put.setDurable(writeToWAL); + put.setBufferable(flushCommits); + client().put(put).addCallbacks(callback, errback); + } + + } static class ScanTest extends Test { private ResultScanner testScanner; @@ -841,6 +1074,50 @@ public class PerformanceEvaluation extends Configured implements Tool { } + static class AsyncScanTest extends AsyncTest { + private final Scanner scanner; + private final Callback continueScan = new Callback>>() { + public Object call(final ArrayList> rows) throws Exception { + if (rows != null) { + testTimed(); + for (final ArrayList row : rows) { + int n = row.size(); + while (n-- >= 0) { + rpcCompleted(); + } + } + for (final ArrayList row : rows) { + check(row); // Do this separate as it might throw. + } + } // else arg is null, we're done scanning. + return rows; + } + public String toString() { + return "continueScan on " + scanner; + } + }; + + AsyncScanTest(Configuration conf, TestOptions options, Status status) { + super(conf, options, status); + scanner = client().newScanner(tableBytes); + scanner.setStartKey(format(this.startRow)); + scanner.setFamily(FAMILY_NAME); + scanner.setQualifier(QUALIFIER_NAME); + } + + @Override + void testTimed() { + scanner.nextRows() + .addCallback(continueScan) + .addCallbacks(callback, errback); + } + + @Override + void testRow(final int i) { + // Unused because we completely redefined testTimed(). + } + } + static class SequentialReadTest extends Test { SequentialReadTest(Configuration conf, TestOptions options, Status status) { super(conf, options, status); @@ -854,6 +1131,20 @@ public class PerformanceEvaluation extends Configured implements Tool { } } + static class AsyncSequentialReadTest extends AsyncTest { + AsyncSequentialReadTest(Configuration conf, TestOptions options, Status status) { + super(conf, options, status); + } + + @Override + void testRow(final int i) throws IOException { + final GetRequest get = new GetRequest(tableBytes, format(i)); + get.family(FAMILY_NAME).qualifier(QUALIFIER_NAME); + client().get(get).addCallback(readCallback).addErrback(errback); + } + + } + static class SequentialWriteTest extends Test { SequentialWriteTest(Configuration conf, TestOptions options, Status status) { super(conf, options, status); @@ -869,6 +1160,22 @@ public class PerformanceEvaluation extends Configured implements Tool { } } + static class AsyncSequentialWriteTest extends AsyncTest { + AsyncSequentialWriteTest(Configuration conf, TestOptions options, Status status) { + super(conf, options, status); + } + + @Override + void testRow(final int i) { + final PutRequest put = new PutRequest(tableBytes, format(i), + FAMILY_NAME, QUALIFIER_NAME, generateValue(this.rand)); + put.setDurable(writeToWAL); + put.setBufferable(flushCommits); + client().put(put).addCallbacks(callback, errback); + } + + } + static class FilteredScanTest extends Test { protected static final Log LOG = LogFactory.getLog(FilteredScanTest.class.getName()); @@ -1153,7 +1460,7 @@ public class PerformanceEvaluation extends Configured implements Tool { opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length())); continue; } - + final String inMemory = "--inmemory="; if (cmd.startsWith(inMemory)) { opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length())); @@ -1171,7 +1478,7 @@ public class PerformanceEvaluation extends Configured implements Tool { opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length())); continue; } - + Class cmdClass = determineCommandClass(cmd); if (cmdClass != null) { opts.numClientThreads = getNumClients(i + 1, args); -- 1.9.2.460.gfb82504