diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java index 2c5cb65..ddbf9bf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java @@ -408,6 +408,17 @@ public class PerformanceEvaluation extends Configured implements Tool { return splits; } + static void setupConnectionCount(final TestOptions opts) { + if (opts.oneCon) { + opts.connCount = 1; + } else { + if (opts.connCount == -1) { + // set to thread number if connCount is not set + opts.connCount = opts.numClientThreads; + } + } + } + /* * Run all clients in this vm each to its own thread. */ @@ -420,13 +431,20 @@ public class PerformanceEvaluation extends Configured implements Tool { RunResult[] results = new RunResult[opts.numClientThreads]; ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads, new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build()); - final Connection con = ConnectionFactory.createConnection(conf); + setupConnectionCount(opts); + final Connection[] cons = new Connection[opts.connCount]; + for (int i = 0; i < opts.connCount; i++) { + cons[i] = ConnectionFactory.createConnection(conf); + } + LOG.info("Created " + opts.connCount + " connections for " + + opts.numClientThreads + " threads"); for (int i = 0; i < threads.length; i++) { final int index = i; threads[i] = pool.submit(new Callable() { @Override public RunResult call() throws Exception { TestOptions threadOpts = new TestOptions(opts); + final Connection con = cons[index % cons.length]; if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows; RunResult run = runOneClient(cmd, conf, con, threadOpts, new Status() { @Override @@ -454,15 +472,25 @@ public class PerformanceEvaluation extends Configured implements Tool { + Arrays.toString(results)); Arrays.sort(results); long total = 0; + float avgLatency = 0 ; + float avgTPS = 0; for (RunResult result : results) { total += result.duration; + avgLatency += result.hist.getSnapshot().getMean(); + avgTPS += opts.perClientRunRows * 1.0f / result.duration; } - LOG.info("[" + test + "]" + avgTPS *= 1000; // ms to second + avgLatency = avgLatency / results.length; + LOG.info("[" + test + " duration ]" + "\tMin: " + results[0] + "ms" + "\tMax: " + results[results.length - 1] + "ms" + "\tAvg: " + (total / results.length) + "ms"); + LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency)); + LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second"); + for (int i = 0; i < opts.connCount; i++) { + cons[i].close(); + } - con.close(); return results; } @@ -612,12 +640,14 @@ public class PerformanceEvaluation extends Configured implements Tool { String tableName = TABLE_NAME; boolean flushCommits = true; boolean writeToWAL = true; - boolean autoFlush = false; - boolean oneCon = false; + boolean autoFlush = false; //TODO: never used + boolean oneCon = false; //TODO: never used + int connCount = 1; //wil decide the actual num later boolean useTags = false; int noOfTags = 1; boolean reportLatency = false; int multiGet = 0; + int multiPut = 0; int randomSleep = 0; boolean inMemoryCF = false; int presplitRegions = 0; @@ -664,10 +694,12 @@ public class PerformanceEvaluation extends Configured implements Tool { this.writeToWAL = that.writeToWAL; this.autoFlush = that.autoFlush; this.oneCon = that.oneCon; + this.connCount = that.connCount; this.useTags = that.useTags; this.noOfTags = that.noOfTags; this.reportLatency = that.reportLatency; this.multiGet = that.multiGet; + this.multiPut = that.multiPut; this.inMemoryCF = that.inMemoryCF; this.presplitRegions = that.presplitRegions; this.replicas = that.replicas; @@ -812,6 +844,14 @@ public class PerformanceEvaluation extends Configured implements Tool { this.oneCon = oneCon; } + public int getConnCount() { + return connCount; + } + + public void setConnCount(int connCount) { + this.connCount = connCount; + } + public void setUseTags(boolean useTags) { this.useTags = useTags; } @@ -828,6 +868,10 @@ public class PerformanceEvaluation extends Configured implements Tool { this.multiGet = multiGet; } + public void setMultiPut(int multiPut) { + this.multiPut = multiPut; + } + public void setInMemoryCF(boolean inMemoryCF) { this.inMemoryCF = inMemoryCF; } @@ -932,6 +976,10 @@ public class PerformanceEvaluation extends Configured implements Tool { return multiGet; } + public int getMultiPut() { + return multiPut; + } + public boolean isInMemoryCF() { return inMemoryCF; } @@ -1119,10 +1167,14 @@ public class PerformanceEvaluation extends Configured implements Tool { latencyHistogram)); status.setStatus("Num measures (latency) : " + latencyHistogram.getCount()); status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram)); - status.setStatus("ValueSize (bytes) : " - + YammerHistogramUtils.getHistogramReport(valueSizeHistogram)); - status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount()); - status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram)); + if (valueSizeHistogram.getCount() > 0) { + status.setStatus("ValueSize (bytes) : " + + YammerHistogramUtils.getHistogramReport(valueSizeHistogram)); + status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount()); + status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram)); + } else { + status.setStatus("No valueSize statistics available"); + } } if (!opts.oneCon) { connection.close(); @@ -1169,17 +1221,19 @@ public class PerformanceEvaluation extends Configured implements Tool { for (int i = startRow; i < lastRow; i++) { if (i % everyN != 0) continue; long startTime = System.nanoTime(); + boolean requestSent = false; TraceScope scope = Trace.startSpan("test row", traceSampler); try { - testRow(i); + requestSent = testRow(i); } finally { scope.close(); } if ( (i - startRow) > opts.measureAfter) { - // If multiget is enabled, say set to 10, testRow() returns immediately first 9 times - // and sends the actual get request in the 10th iteration. We should only set latency - // when actual request is sent because otherwise it turns out to be 0. - if (opts.multiGet == 0 || (i - startRow + 1) % opts.multiGet == 0) { + // If multiget or multiput is enabled, say set to 10, testRow() returns immediately + // first 9 times and sends the actual get request in the 10th iteration. + // We should only set latency when actual request is sent because otherwise + // it turns out to be 0. + if (requestSent) { latencyHistogram.update((System.nanoTime() - startTime) / 1000); } if (status != null && i > 0 && (i % getReportingPeriod()) == 0) { @@ -1204,11 +1258,15 @@ public class PerformanceEvaluation extends Configured implements Tool { return YammerHistogramUtils.getShortHistogramReport(this.valueSizeHistogram); } - /* - * Test for individual row. - * @param i Row index. - */ - abstract void testRow(final int i) throws IOException, InterruptedException; + + /** + * Test for individual row. + * @param i Row index. + * @return true if the row was sent to server and need to record metrics. + * False if not, like multiGet and multiPut, the rows are sent + * to server only if enough gets/puts are gathered. + */ + abstract boolean testRow(final int i) throws IOException, InterruptedException; } static abstract class TableTest extends Test { @@ -1229,7 +1287,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } } - static abstract class BufferedMutatorTest extends Test { + static abstract class BufferedMutatorTest extends TableTest { protected BufferedMutator mutator; BufferedMutatorTest(Connection con, TestOptions options, Status status) { @@ -1238,11 +1296,13 @@ public class PerformanceEvaluation extends Configured implements Tool { @Override void onStartup() throws IOException { + super.onStartup(); this.mutator = connection.getBufferedMutator(TableName.valueOf(opts.tableName)); } @Override void onTakedown() throws IOException { + super.onTakedown(); mutator.close(); } } @@ -1253,7 +1313,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } @Override - void testRow(final int i) throws IOException { + boolean testRow(final int i) throws IOException { Scan scan = new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows)) .setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks) .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType); @@ -1273,6 +1333,7 @@ public class PerformanceEvaluation extends Configured implements Tool { updateValueSize(rr); } s.close(); + return true; } @Override @@ -1289,7 +1350,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } @Override - void testRow(final int i) throws IOException { + boolean testRow(final int i) throws IOException { Pair startAndStopRow = getStartAndStopRow(); Scan scan = new Scan().withStartRow(startAndStopRow.getFirst()) .withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching) @@ -1317,6 +1378,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } s.close(); + return true; } protected abstract Pair getStartAndStopRow(); @@ -1393,7 +1455,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } @Override - void testRow(final int i) throws IOException, InterruptedException { + boolean testRow(final int i) throws IOException, InterruptedException { if (opts.randomSleep > 0) { Thread.sleep(rd.nextInt(opts.randomSleep)); } @@ -1414,10 +1476,13 @@ public class PerformanceEvaluation extends Configured implements Tool { Result [] rs = this.table.get(this.gets); updateValueSize(rs); this.gets.clear(); + } else { + return false; } } else { updateValueSize(this.table.get(get)); } + return true; } @Override @@ -1436,37 +1501,17 @@ public class PerformanceEvaluation extends Configured implements Tool { } } - static class RandomWriteTest extends BufferedMutatorTest { + static class RandomWriteTest extends SequentialWriteTest { RandomWriteTest(Connection con, TestOptions options, Status status) { super(con, options, status); } @Override - void testRow(final int i) throws IOException { - byte[] row = getRandomRow(this.rand, opts.totalRows); - Put put = new Put(row); - for (int column = 0; column < opts.columns; column++) { - byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column); - byte[] value = generateData(this.rand, getValueLength(this.rand)); - if (opts.useTags) { - byte[] tag = generateData(this.rand, TAG_LENGTH); - Tag[] tags = new Tag[opts.noOfTags]; - for (int n = 0; n < opts.noOfTags; n++) { - Tag t = new ArrayBackedTag((byte) n, tag); - tags[n] = t; - } - KeyValue kv = new KeyValue(row, FAMILY_NAME, qualifier, HConstants.LATEST_TIMESTAMP, - value, tags); - put.add(kv); - updateValueSize(kv.getValueLength()); - } else { - put.addColumn(FAMILY_NAME, qualifier, value); - updateValueSize(value.length); - } - } - put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); - mutator.mutate(put); + protected byte[] generateRow(final int i) { + return getRandomRow(this.rand, opts.totalRows); } + + } static class ScanTest extends TableTest { @@ -1486,7 +1531,7 @@ public class PerformanceEvaluation extends Configured implements Tool { @Override - void testRow(final int i) throws IOException { + boolean testRow(final int i) throws IOException { if (this.testScanner == null) { Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching) .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch) @@ -1503,6 +1548,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } Result r = testScanner.next(); updateValueSize(r); + return true; } } @@ -1542,10 +1588,11 @@ public class PerformanceEvaluation extends Configured implements Tool { } @Override - void testRow(final int i) throws IOException { + boolean testRow(final int i) throws IOException { Increment increment = new Increment(format(i)); increment.addColumn(FAMILY_NAME, getQualifier(), 1l); updateValueSize(this.table.increment(increment)); + return true; } } @@ -1555,11 +1602,12 @@ public class PerformanceEvaluation extends Configured implements Tool { } @Override - void testRow(final int i) throws IOException { + boolean testRow(final int i) throws IOException { byte [] bytes = format(i); Append append = new Append(bytes); append.add(FAMILY_NAME, getQualifier(), bytes); updateValueSize(this.table.append(append)); + return true; } } @@ -1569,7 +1617,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } @Override - void testRow(final int i) throws IOException { + boolean testRow(final int i) throws IOException { byte [] bytes = format(i); // Put a known value so when we go to check it, it is there. Put put = new Put(bytes); @@ -1579,6 +1627,7 @@ public class PerformanceEvaluation extends Configured implements Tool { mutations.add(put); this.table.checkAndMutate(bytes, FAMILY_NAME, getQualifier(), CompareOp.EQUAL, bytes, mutations); + return true; } } @@ -1588,13 +1637,14 @@ public class PerformanceEvaluation extends Configured implements Tool { } @Override - void testRow(final int i) throws IOException { + boolean testRow(final int i) throws IOException { byte [] bytes = format(i); // Put a known value so when we go to check it, it is there. Put put = new Put(bytes); put.addColumn(FAMILY_NAME, getQualifier(), bytes); this.table.put(put); this.table.checkAndPut(bytes, FAMILY_NAME, getQualifier(), CompareOp.EQUAL, bytes, put); + return true; } } @@ -1604,7 +1654,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } @Override - void testRow(final int i) throws IOException { + boolean testRow(final int i) throws IOException { byte [] bytes = format(i); // Put a known value so when we go to check it, it is there. Put put = new Put(bytes); @@ -1613,6 +1663,7 @@ public class PerformanceEvaluation extends Configured implements Tool { Delete delete = new Delete(put.getRow()); delete.addColumn(FAMILY_NAME, getQualifier()); this.table.checkAndDelete(bytes, FAMILY_NAME, getQualifier(), CompareOp.EQUAL, bytes, delete); + return true; } } @@ -1622,7 +1673,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } @Override - void testRow(final int i) throws IOException { + boolean testRow(final int i) throws IOException { Get get = new Get(format(i)); if (opts.addColumns) { get.addColumn(FAMILY_NAME, QUALIFIER_NAME); @@ -1631,17 +1682,29 @@ public class PerformanceEvaluation extends Configured implements Tool { get.setFilter(new FilterAllFilter()); } updateValueSize(table.get(get)); + return true; } } static class SequentialWriteTest extends BufferedMutatorTest { + private ArrayList puts; + + SequentialWriteTest(Connection con, TestOptions options, Status status) { super(con, options, status); + if (opts.multiPut > 0) { + LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + "."); + this.puts = new ArrayList<>(opts.multiPut); + } + } + + protected byte[] generateRow(final int i) { + return format(i); } @Override - void testRow(final int i) throws IOException { - byte[] row = format(i); + boolean testRow(final int i) throws IOException { + byte[] row = generateRow(i); Put put = new Put(row); for (int column = 0; column < opts.columns; column++) { byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column); @@ -1663,7 +1726,18 @@ public class PerformanceEvaluation extends Configured implements Tool { } } put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); - mutator.mutate(put); + if (opts.multiPut > 0) { + this.puts.add(put); + if (this.puts.size() == opts.multiPut) { + table.put(this.puts); + this.puts.clear(); + } else { + return false; + } + } else { + mutator.mutate(put); + } + return true; } } @@ -1675,7 +1749,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } @Override - void testRow(int i) throws IOException { + boolean testRow(int i) throws IOException { byte[] value = generateData(this.rand, getValueLength(this.rand)); Scan scan = constructScan(value); ResultScanner scanner = null; @@ -1687,6 +1761,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } finally { if (scanner != null) scanner.close(); } + return true; } protected Scan constructScan(byte[] valuePrefix) throws IOException { @@ -1858,6 +1933,9 @@ public class PerformanceEvaluation extends Configured implements Tool { System.err.println(" nomapred Run multiple clients using threads " + "(rather than use mapreduce)"); System.err.println(" oneCon all the threads share the same connection. Default: False"); + System.err.println(" connCount connections all threads share. " + + "For example, if set to 2, then all thread share 2 connection. " + + "Default: 1"); System.err.println(" sampleRate Execute test on a sample of total " + "rows. Only supported by randomRead. Default: 1.0"); System.err.println(" period Report every 'period' rows: " + @@ -1910,6 +1988,8 @@ public class PerformanceEvaluation extends Configured implements Tool { + " performance. Uses FilterAllFilter internally. "); System.err.println(" multiGet Batch gets together into groups of N. Only supported " + "by randomRead. Default: disabled"); + System.err.println(" multiPut Batch puts together into groups of N. Only supported " + + "by write. If multiPut is bigger than 0, autoFlush need to set to true. Default: 0"); System.err.println(" inmemory Tries to keep the HFiles of the CF " + "inmemory as far as possible. Not guaranteed that reads are always served " + "from memory. Default: false"); @@ -2044,12 +2124,38 @@ public class PerformanceEvaluation extends Configured implements Tool { final String autoFlush = "--autoFlush="; if (cmd.startsWith(autoFlush)) { opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length())); + if (!opts.autoFlush && opts.multiPut > 0) { + throw new IllegalStateException("autoFlush must be true when multiPut is more than 0"); + } + continue; + } + + final String multiPut = "--multiPut="; + if (cmd.startsWith(multiPut)) { + opts.multiPut = Integer.parseInt(cmd.substring(multiPut.length())); + if (!opts.autoFlush && opts.multiPut > 0) { + throw new IllegalStateException("autoFlush must be true when multiPut is more than 0"); + } continue; } final String onceCon = "--oneCon="; if (cmd.startsWith(onceCon)) { opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length())); + if (opts.oneCon && opts.connCount > 1) { + throw new IllegalStateException("oneCon is set to true, " + + "connCount should not bigger than 1"); + } + continue; + } + + final String connCount = "--connCount="; + if (cmd.startsWith(connCount)) { + opts.connCount = Integer.parseInt(cmd.substring(connCount.length())); + if (opts.oneCon && opts.connCount > 1) { + throw new IllegalStateException("oneCon is set to true, " + + "connCount should not bigger than 1"); + } continue; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPerformanceEvaluation.java index 86a3d3f..c00a7ac 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPerformanceEvaluation.java @@ -215,4 +215,50 @@ public class TestPerformanceEvaluation { assertTrue(e.getCause() instanceof NoSuchElementException); } } + + @Test + public void testParseOptsMultiPuts() { + Queue opts = new LinkedList<>(); + String cmdName = "sequentialWrite"; + opts.offer("--multiPut=10"); + opts.offer(cmdName); + opts.offer("64"); + PerformanceEvaluation.TestOptions options = null; + try { + options = PerformanceEvaluation.parseOpts(opts); + fail("should fail"); + } catch (IllegalStateException e) { + System.out.println(e.getMessage()); + } + ((LinkedList) opts).offerFirst("--multiPut=10"); + ((LinkedList) opts).offerFirst("--autoFlush=true"); + options = PerformanceEvaluation.parseOpts(opts); + assertNotNull(options); + assertNotNull(options.getCmdName()); + assertEquals(cmdName, options.getCmdName()); + assertTrue(options.getMultiPut() == 10); + } + + @Test + public void testParseOptsConCount() { + Queue opts = new LinkedList<>(); + String cmdName = "sequentialWrite"; + opts.offer("--oneCon=true"); + opts.offer("--connCount=10"); + opts.offer(cmdName); + opts.offer("64"); + PerformanceEvaluation.TestOptions options = null; + try { + options = PerformanceEvaluation.parseOpts(opts); + fail("should fail"); + } catch (IllegalStateException e) { + System.out.println(e.getMessage()); + } + ((LinkedList) opts).offerFirst("--connCount=10"); + options = PerformanceEvaluation.parseOpts(opts); + assertNotNull(options); + assertNotNull(options.getCmdName()); + assertEquals(cmdName, options.getCmdName()); + assertTrue(options.getConnCount() == 10); + } }