Index: hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java (revision 1352697) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java (working copy) @@ -32,6 +32,8 @@ import java.util.Random; import java.util.TreeMap; import java.util.Arrays; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.lang.reflect.Constructor; @@ -79,6 +81,14 @@ import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer; import org.apache.hadoop.util.LineReader; +import com.stumbleupon.async.Callback; +import com.stumbleupon.async.Deferred; +import org.hbase.async.GetRequest; +import org.hbase.async.HBaseClient; +import org.hbase.async.PleaseThrottleException; +import org.hbase.async.PutRequest; +import org.hbase.async.Scanner; + /** * Script used evaluating HBase performance and scalability. 
Runs a HBase * client that steps through one of a set of hardcoded tests or 'experiments' @@ -101,6 +111,7 @@ private static final int ROW_LENGTH = 1000; private static final int ONE_GB = 1024 * 1024 * 1000; private static final int ROWS_PER_GB = ONE_GB / ROW_LENGTH; + private static final int CLIENT_THREADS = 3; public static final byte[] TABLE_NAME = Bytes.toBytes("TestTable"); public static final byte[] FAMILY_NAME = Bytes.toBytes("info"); @@ -119,6 +130,7 @@ private boolean nomapred = false; private int N = 1; private int R = ROWS_PER_GB; + private int clientThreads = CLIENT_THREADS; private boolean flushCommits = true; private boolean writeToWAL = true; private int presplitRegions = 0; @@ -155,6 +167,8 @@ addCommandDescriptor(RandomReadTest.class, "randomRead", "Run random read test"); + addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead", + "Run random read test with asynchbase"); addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan", "Run random seek and scan 100 test"); addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10", @@ -167,12 +181,20 @@ "Run random seek scan with both start and stop row (max 10000 rows)"); addCommandDescriptor(RandomWriteTest.class, "randomWrite", "Run random write test"); + addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite", + "Run random write test with asynchbase"); addCommandDescriptor(SequentialReadTest.class, "sequentialRead", "Run sequential read test"); + addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead", + "Run sequential read test with asynchbase"); addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", "Run sequential write test"); + addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite", + "Run sequential write test with asynchbase"); addCommandDescriptor(ScanTest.class, "scan", "Run scan test (read every row)"); + addCommandDescriptor(AsyncScanTest.class, "asyncScan", + "Run scan test with asynchbase (read 
every row)"); addCommandDescriptor(FilteredScanTest.class, "filterScan", "Run scan test using a filter to find a specific row based on it's value (make sure to use --rows=20)"); } @@ -448,7 +470,7 @@ long elapsedTime = this.pe.runOneClient(this.cmd, value.getStartRow(), value.getRows(), value.getTotalRows(), value.isFlushCommits(), value.isWriteToWAL(), - status); + status, CLIENT_THREADS); // Collect how much time the thing took. Report as map output and // to the ELAPSED_TIME counter. context.getCounter(Counter.ELAPSED_TIME).increment(elapsedTime); @@ -551,7 +573,7 @@ public void setStatus(final String msg) throws IOException { LOG.info("client-" + getName() + " " + msg); } - }); + }, clientThreads); LOG.info("Finished " + getName() + " in " + elapsedTime + "ms writing " + perClientRows + " rows"); } catch (IOException e) { @@ -695,23 +717,30 @@ private byte[] tableName; private boolean flushCommits; private boolean writeToWAL = true; + private int numClientThreads; TestOptions() { } - TestOptions(int startRow, int perClientRunRows, int totalRows, byte[] tableName, boolean flushCommits, boolean writeToWAL) { + TestOptions(int startRow, int perClientRunRows, int totalRows, byte[] tableName, + boolean flushCommits, boolean writeToWAL, int numClientThreads) { this.startRow = startRow; this.perClientRunRows = perClientRunRows; this.totalRows = totalRows; this.tableName = tableName; this.flushCommits = flushCommits; this.writeToWAL = writeToWAL; + this.numClientThreads = numClientThreads; } public int getStartRow() { return startRow; } + public int getNumClientThreads() { + return numClientThreads; + } + public int getPerClientRunRows() { return perClientRunRows; } @@ -838,6 +867,178 @@ } } + static abstract class AsyncTest extends Test { + /** Maximum number of RPCs we're allowed in flight at a time. */ + private static final int MAX_OUTSTANDING_RPCS = 200000; // Sized for 2G heap. 
+ + private static HBaseClient client; // Only one client regardless of number of threads. + + AsyncTest(final Configuration conf, final TestOptions options, final Status status) { + super(null, options, status); + final String zkquorum = conf.get(HConstants.ZOOKEEPER_QUORUM); + final String znode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, + HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT); + synchronized (AsyncTest.class) { + if (client == null) { + client = new HBaseClient(zkquorum, znode); + // Sanity check. + try { + client.ensureTableFamilyExists(TABLE_NAME, FAMILY_NAME).joinUninterruptibly(); + } catch (Exception e) { + throw new RuntimeException("Missing test table/family?", e); + } + } + } + latch = new CountDownLatch(super.perClientRunRows); + final int maxrpcs = MAX_OUTSTANDING_RPCS / options.getNumClientThreads(); + sem = new Semaphore(Math.max(100, maxrpcs)); + } + + /** + * If true, make sure that every read returns a valid-looking KeyValue. + */ + private static final boolean CHECK_READS = false; + + /** Checks that the row retrieved from HBase looks valid. */ + protected static void check(final ArrayList row) throws IOException { + if (!CHECK_READS) { + return; + } + if (row.size() != 1) { + throw new IOException((row.isEmpty() ? "No" : "Multiple (" + row.size() + ')') + + " KeyValue found in row"); + } else if (row.get(0).value().length != ROW_LENGTH) { + throw new IOException("Invalid value length (found: " + row.get(0).value().length + + ", expected: " + ROW_LENGTH + ") in row \"" + + new String(row.get(0).key()) + '"'); + } + } + + private Exception error = null; // Last exception caught asynchronously. + private volatile boolean failed = false; // True if we caught an exception asynchronously. + /** Used by sub-classes to handle asynchronous exceptions. 
*/ + protected final Callback errback = new Callback() { + public Exception call(final Exception e) throws Exception { + rpcCompleted(); + if (e instanceof PleaseThrottleException) { + LOG.warn("Throttling thread " + Thread.currentThread().getName() + + ", HBase isn't keeping up", e); + final int permits = sem.drainPermits(); // Prevent creation of further RPCs. + ((PleaseThrottleException) e).getDeferred().addBoth(new Callback() { + public Object call(final Object arg) { + sem.release(permits); + LOG.warn("Done throttling thread " + Thread.currentThread().getName()); + return arg; + } + public String toString() { + return "error recovery after " + e; + } + }); + return null; + } + error = e; + failed = true; // Volatile-write. + LOG.error(this + " caught an exception", e); + return e; + } + + private final String toString = "errback for " + AsyncTest.this + " in " + Thread.currentThread().getName(); + public String toString() { + return toString; + } + }; + + /** + * Latch to guarantee we have gotten a response for every single RPC sent. + * This latch is initialized up with the number of RPCs we intend to send. + * Every time an RPC completes successfully, we decrement its count down + * by one. This way we guarantee that all RPCs have completed and their + * responses have been handled within the section of the code we're + * timing. + */ + private final CountDownLatch latch; + + /** + * Semaphore to control the number of outstanding RPCs. + * Because the producer code is synchronous and asynchbase is + * non-blocking, the tests will try to create and send all RPCs at once, + * thus running the app out of memory. In order to limit the number of + * RPCs in flight at the same time, we acquire a permit from this + * semaphore each time we access the client to send an RPC, and we release + * the permit each time the RPC completes. + */ + private final Semaphore sem; + + /** Records the completion of an RPC. 
*/ + protected final void rpcCompleted() { + sem.release(); + latch.countDown(); + } + + /** Callback used on successful read RPCs. */ + protected final Callback> readCallback = new Callback>() { + public Object call(final ArrayList row) throws IOException { + rpcCompleted(); + check(row); + return row; + } + + private final String toString = "callback for " + AsyncTest.this + " in " + Thread.currentThread().getName(); + public String toString() { + return toString; + } + }; + + /** Callback used on other successful RPCs. */ + protected final Callback callback = new Callback() { + public Object call(final Object arg) { + rpcCompleted(); + return arg; + } + + private final String toString = "callback for " + AsyncTest.this + " in " + Thread.currentThread().getName(); + public String toString() { + return toString; + } + }; + + @Override + final void testSetup() { + // Nothing. + } + + @Override + final void testTakedown() throws IOException { + try { + // For tests with few writes, asking for a flush before waiting on the + // latch tells asynchbase to start flushing writes instead of waiting + // until the timer flushes them. + client.flush().join(); + latch.await(); // Make sure the last RPC completed. + if (failed) { // Volatile-read + throw error; + } + } catch (RuntimeException e) { + throw e; + } catch (IOException e) { + throw e; + } catch (Exception e) { + throw new IOException("Uncaught exception from flush()", e); + } + } + + /** Returns the client to use to send an RPC. Call once per RPC. 
*/ + protected final HBaseClient client() { + try { + sem.acquire(); + } catch (InterruptedException e) { + LOG.error("Shouldn't happen!", e); + return null; + } + return client; + } + } + @SuppressWarnings("unused") static class RandomSeekScanTest extends Test { RandomSeekScanTest(Configuration conf, TestOptions options, Status status) { @@ -970,6 +1171,27 @@ } + static class AsyncRandomReadTest extends AsyncTest { + AsyncRandomReadTest(Configuration conf, TestOptions options, Status status) { + super(conf, options, status); + } + + @Override + void testRow(final int i) throws IOException { + final GetRequest get = new GetRequest(TABLE_NAME, getRandomRow(this.rand, this.totalRows)); + get.family(FAMILY_NAME).qualifier(QUALIFIER_NAME); + + client().get(get).addCallback(readCallback).addErrback(errback); + } + + @Override + protected int getReportingPeriod() { + int period = this.perClientRunRows / 100; + return period == 0 ? this.perClientRunRows : period; + } + + } + static class RandomWriteTest extends Test { RandomWriteTest(Configuration conf, TestOptions options, Status status) { super(conf, options, status); @@ -986,6 +1208,22 @@ } } + static class AsyncRandomWriteTest extends AsyncTest { + AsyncRandomWriteTest(Configuration conf, TestOptions options, Status status) { + super(conf, options, status); + } + + @Override + void testRow(final int i) { + final PutRequest put = new PutRequest(TABLE_NAME, getRandomRow(this.rand, this.totalRows), + FAMILY_NAME, QUALIFIER_NAME, generateValue(this.rand)); + put.setDurable(writeToWAL); + put.setBufferable(flushCommits); + client().put(put).addCallbacks(callback, errback); + } + + } + static class ScanTest extends Test { private ResultScanner testScanner; @@ -1019,6 +1257,50 @@ } + static class AsyncScanTest extends AsyncTest { + private final Scanner scanner; + private final Callback continueScan = new Callback>>() { + public Object call(final ArrayList> rows) throws Exception { + if (rows != null) { + testTimed(); + for 
(final ArrayList row : rows) { + int n = row.size(); + while (n-- > 0) { // one rpcCompleted() per KeyValue; '>= 0' ran size()+1 times, over-releasing the semaphore and over-counting the latch + rpcCompleted(); + } + } + for (final ArrayList row : rows) { + check(row); // Do this separate as it might throw. + } + } // else arg is null, we're done scanning. + return rows; + } + public String toString() { + return "continueScan on " + scanner; + } + }; + + AsyncScanTest(Configuration conf, TestOptions options, Status status) { + super(conf, options, status); + scanner = client().newScanner(TABLE_NAME); + scanner.setStartKey(format(this.startRow)); + scanner.setFamily(FAMILY_NAME); + scanner.setQualifier(QUALIFIER_NAME); + } + + @Override + void testTimed() { + scanner.nextRows() + .addCallback(continueScan) + .addCallbacks(callback, errback); + } + + @Override + void testRow(final int i) { + // Unused because we completely redefined testTimed(). + } + } + static class SequentialReadTest extends Test { SequentialReadTest(Configuration conf, TestOptions options, Status status) { super(conf, options, status); @@ -1033,6 +1315,20 @@ } + static class AsyncSequentialReadTest extends AsyncTest { + AsyncSequentialReadTest(Configuration conf, TestOptions options, Status status) { + super(conf, options, status); + } + + @Override + void testRow(final int i) throws IOException { + final GetRequest get = new GetRequest(TABLE_NAME, format(i)); + get.family(FAMILY_NAME).qualifier(QUALIFIER_NAME); + client().get(get).addCallback(readCallback).addErrback(errback); + } + + } + static class SequentialWriteTest extends Test { SequentialWriteTest(Configuration conf, TestOptions options, Status status) { super(conf, options, status); @@ -1049,6 +1345,22 @@ } + static class AsyncSequentialWriteTest extends AsyncTest { + AsyncSequentialWriteTest(Configuration conf, TestOptions options, Status status) { + super(conf, options, status); + } + + @Override + void testRow(final int i) { + final PutRequest put = new PutRequest(TABLE_NAME, format(i), + FAMILY_NAME, QUALIFIER_NAME, generateValue(this.rand)); + 
put.setDurable(writeToWAL); + put.setBufferable(flushCommits); + client().put(put).addCallbacks(callback, errback); + } + + } + static class FilteredScanTest extends Test { protected static final Log LOG = LogFactory.getLog(FilteredScanTest.class.getName()); @@ -1117,7 +1429,7 @@ long runOneClient(final Class cmd, final int startRow, final int perClientRunRows, final int totalRows, boolean flushCommits, boolean writeToWAL, - final Status status) + final Status status, final int numClientThreads) throws IOException { status.setStatus("Start " + cmd + " at offset " + startRow + " for " + perClientRunRows + " rows"); @@ -1125,7 +1437,7 @@ Test t = null; TestOptions options = new TestOptions(startRow, perClientRunRows, - totalRows, getTableDescriptor().getName(), flushCommits, writeToWAL); + totalRows, getTableDescriptor().getName(), flushCommits, writeToWAL, numClientThreads); try { Constructor constructor = cmd.getDeclaredConstructor( Configuration.class, TestOptions.class, Status.class); @@ -1157,7 +1469,7 @@ admin = new HBaseAdmin(this.conf); checkTable(admin); runOneClient(cmd, 0, this.R, this.R, this.flushCommits, this.writeToWAL, - status); + status, this.clientThreads); } catch (Exception e) { LOG.error("Failed", e); } @@ -1195,7 +1507,7 @@ runNIsMoreThanOne(cmd); } } finally { - if(this.miniCluster) { + if (this.miniCluster) { if (hbaseMiniCluster != null) hbaseMiniCluster.shutdown(); if (zooKeeperCluster != null) zooKeeperCluster.shutdown(); HBaseTestCase.shutdownDfs(dfsCluster); @@ -1286,6 +1598,12 @@ continue; } + final String threads = "--numClientThreads="; + if (cmd.startsWith(threads)) { + this.clientThreads = Integer.parseInt(cmd.substring(threads.length())); + continue; + } + final String flushCommits = "--flushCommits="; if (cmd.startsWith(flushCommits)) { this.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length())); Index: hbase-server/pom.xml =================================================================== --- 
hbase-server/pom.xml (revision 1352697) +++ hbase-server/pom.xml (working copy) @@ -290,6 +290,11 @@ + io.netty + netty + 3.5.0.Final-SNAPSHOT + + com.yammer.metrics metrics-core @@ -450,6 +455,34 @@ stax stax-api + + org.hbase + asynchbase + [1.3.1,) + + + + org.slf4j + slf4j-api + + + + org.slf4j + jcl-over-slf4j + + + org.slf4j + log4j-over-slf4j + + + test + Index: pom.xml =================================================================== --- pom.xml (revision 1352660) +++ pom.xml (working copy) @@ -256,6 +256,10 @@ + cloudbees netty + http://repository-netty.forge.cloudbees.com/snapshot/ + + apache release https://repository.apache.org/content/repositories/releases/