From d7080192838af631d25b119f90b612d05cd5c635 Mon Sep 17 00:00:00 2001 From: Nick Dimiduk Date: Tue, 19 Nov 2013 17:19:03 -0800 Subject: [PATCH] HBASE-10007 PerformanceEval: add sampling, latency collection and throughput summary --- .../apache/hadoop/hbase/PerformanceEvaluation.java | 247 ++++++++++++++------- 1 file changed, 170 insertions(+), 77 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java index fcbf0ff..f4df6db 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java @@ -22,7 +22,9 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.io.PrintStream; -import java.io.File; +import java.math.BigDecimal; +import java.math.MathContext; +import java.text.DecimalFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; @@ -83,7 +85,6 @@ import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.LineReader; - /** * Script used evaluating HBase performance and scalability. Runs a HBase * client that steps through one of a set of hardcoded tests or 'experiments' @@ -103,46 +104,50 @@ import org.apache.hadoop.util.LineReader; public class PerformanceEvaluation extends Configured implements Tool { protected static final Log LOG = LogFactory.getLog(PerformanceEvaluation.class.getName()); - private static final int DEFAULT_ROW_PREFIX_LENGTH = 16; - private static final int VALUE_LENGTH = 1000; - private static final int ONE_GB = 1024 * 1024 * 1000; - private static final int ROWS_PER_GB = ONE_GB / VALUE_LENGTH; - - public static final byte[] COMPRESSION = Bytes.toBytes("NONE"); public static final TableName TABLE_NAME = TableName.valueOf("TestTable"); public static final byte[] FAMILY_NAME = Bytes.toBytes("info"); public static final byte[] QUALIFIER_NAME = Bytes.toBytes("data"); + public static final int VALUE_LENGTH = 1000; + public static final int ROW_LENGTH = 26; - protected HTableDescriptor TABLE_DESCRIPTOR; + private static final int ONE_GB = 1024 * 1024 * 1000; + private static final int ROWS_PER_GB = ONE_GB / VALUE_LENGTH; + private static final DecimalFormat FMT = new DecimalFormat("0.##"); + private static final MathContext CXT = MathContext.DECIMAL64; + private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000); + private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024); + protected HTableDescriptor TABLE_DESCRIPTOR; protected Map commands = new TreeMap(); private boolean nomapred = false; - private int rowPrefixLength = DEFAULT_ROW_PREFIX_LENGTH; private int N = 1; private int R = ROWS_PER_GB; + private float sampleRate = 1.0f; private TableName tableName = TABLE_NAME; private Compression.Algorithm compression = Compression.Algorithm.NONE; private DataBlockEncoding blockEncoding = DataBlockEncoding.NONE; private boolean flushCommits = true; private boolean writeToWAL = true; private boolean inMemoryCF = false; + private boolean reportLatency = false; private int presplitRegions = 0; private HConnection connection; private static final Path PERF_EVAL_DIR = new Path("performance_evaluation"); - /** - * Regex to parse lines in input file passed to mapreduce task. - */ + + /** Regex to parse lines in input file passed to mapreduce task. */ public static final Pattern LINE_PATTERN = Pattern.compile("tableName=(\\w+),\\s+" + "startRow=(\\d+),\\s+" + "perClientRunRows=(\\d+),\\s+" + "totalRows=(\\d+),\\s+" + + "sampleRate=([-+]?[0-9]*\\.?[0-9]+),\\s+" + "clients=(\\d+),\\s+" + "flushCommits=(\\w+),\\s+" + - "writeToWAL=(\\w+)"); + "writeToWAL=(\\w+),\\s+" + + "reportLatency=(\\w+)"); /** * Enum for map metrics. Keep it out here rather than inside in the Map @@ -154,7 +159,6 @@ public class PerformanceEvaluation extends Configured implements Tool { /** number of rows */ ROWS} - /** * Constructor * @param conf Configuration object @@ -216,28 +220,25 @@ public class PerformanceEvaluation extends Configured implements Tool { private int startRow = 0; private int rows = 0; private int totalRows = 0; + private float sampleRate = 1.0f; private int clients = 0; private boolean flushCommits = false; private boolean writeToWAL = true; + private boolean reportLatency = false; - public PeInputSplit() { - this.startRow = 0; - this.rows = 0; - this.totalRows = 0; - this.clients = 0; - this.flushCommits = false; - this.writeToWAL = true; - } + public PeInputSplit() {} - public PeInputSplit(TableName tableName, int startRow, int rows, int totalRows, int clients, - boolean flushCommits, boolean writeToWAL) { + public PeInputSplit(TableName tableName, int startRow, int rows, int totalRows, float sampleRate, + int clients, boolean flushCommits, boolean writeToWAL, boolean reportLatency) { this.tableName = tableName; this.startRow = startRow; this.rows = rows; this.totalRows = totalRows; + this.sampleRate = sampleRate; this.clients = clients; this.flushCommits = flushCommits; this.writeToWAL = writeToWAL; + this.reportLatency = reportLatency; } @Override @@ -250,9 +251,11 @@ public class PerformanceEvaluation extends Configured implements Tool { this.startRow = in.readInt(); this.rows = in.readInt(); this.totalRows = in.readInt(); + this.sampleRate = in.readFloat(); this.clients = in.readInt(); this.flushCommits = in.readBoolean(); this.writeToWAL = in.readBoolean(); + this.reportLatency = in.readBoolean(); } @Override @@ -263,9 +266,11 @@ public class PerformanceEvaluation extends Configured implements Tool { out.writeInt(startRow); out.writeInt(rows); out.writeInt(totalRows); + out.writeFloat(sampleRate); out.writeInt(clients); out.writeBoolean(flushCommits); out.writeBoolean(writeToWAL); + out.writeBoolean(reportLatency); } @Override @@ -294,6 +299,10 @@ public class PerformanceEvaluation extends Configured implements Tool { return totalRows; } + public float getSampleRate() { + return sampleRate; + } + public int getClients() { return clients; } @@ -305,6 +314,10 @@ public class PerformanceEvaluation extends Configured implements Tool { public boolean isWriteToWAL() { return writeToWAL; } + + public boolean isReportLatency() { + return reportLatency; + } } /** @@ -339,22 +352,26 @@ public class PerformanceEvaluation extends Configured implements Tool { int startRow = Integer.parseInt(m.group(2)); int rows = Integer.parseInt(m.group(3)); int totalRows = Integer.parseInt(m.group(4)); - int clients = Integer.parseInt(m.group(5)); - boolean flushCommits = Boolean.parseBoolean(m.group(6)); - boolean writeToWAL = Boolean.parseBoolean(m.group(7)); + float sampleRate = Float.parseFloat(m.group(5)); + int clients = Integer.parseInt(m.group(6)); + boolean flushCommits = Boolean.parseBoolean(m.group(7)); + boolean writeToWAL = Boolean.parseBoolean(m.group(8)); + boolean reportLatency = Boolean.parseBoolean(m.group(9)); LOG.debug("tableName=" + tableName + " split["+ splitList.size() + "] " + " startRow=" + startRow + " rows=" + rows + " totalRows=" + totalRows + + " sampleRate=" + sampleRate + " clients=" + clients + " flushCommits=" + flushCommits + - " writeToWAL=" + writeToWAL); + " writeToWAL=" + writeToWAL + + " reportLatency=" + reportLatency); PeInputSplit newSplit = - new PeInputSplit(tableName, startRow, rows, totalRows, clients, - flushCommits, writeToWAL); + new PeInputSplit(tableName, startRow, rows, totalRows, sampleRate, clients, + flushCommits, writeToWAL, reportLatency); splitList.add(newSplit); } } @@ -391,7 +408,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } key = NullWritable.get(); - value = (PeInputSplit)split; + value = split; readOver = true; return true; @@ -475,9 +492,9 @@ public class PerformanceEvaluation extends Configured implements Tool { // Evaluation task pe.tableName = value.getTableName(); long elapsedTime = this.pe.runOneClient(this.cmd, value.getStartRow(), - value.getRows(), value.getTotalRows(), - value.isFlushCommits(), value.isWriteToWAL(), - HConnectionManager.createConnection(context.getConfiguration()), status); + value.getRows(), value.getTotalRows(), value.getSampleRate(), + value.isFlushCommits(), value.isWriteToWAL(), value.isReportLatency(), + HConnectionManager.createConnection(context.getConfiguration()), status); // Collect how much time the thing took. Report as map output and // to the ELAPSED_TIME counter. context.getCounter(Counter.ELAPSED_TIME).increment(elapsedTime); @@ -516,8 +533,7 @@ public class PerformanceEvaluation extends Configured implements Tool { LOG.info("Table " + tableDescriptor + " created"); } } - boolean tableExists = admin.tableExists(tableDescriptor.getTableName()); - return tableExists; + return admin.tableExists(tableDescriptor.getTableName()); } protected HTableDescriptor getTableDescriptor() { @@ -578,11 +594,13 @@ public class PerformanceEvaluation extends Configured implements Tool { final List threads = new ArrayList(this.N); final long[] timings = new long[this.N]; final int perClientRows = R/N; + final float sampleRate = this.sampleRate; final TableName tableName = this.tableName; final DataBlockEncoding encoding = this.blockEncoding; final boolean flushCommits = this.flushCommits; final Compression.Algorithm compression = this.compression; final boolean writeToWal = this.writeToWAL; + final boolean reportLatency = this.reportLatency; final int preSplitRegions = this.presplitRegions; final HConnection connection = HConnectionManager.createConnection(getConf()); for (int i = 0; i < this.N; i++) { @@ -599,11 +617,13 @@ public class PerformanceEvaluation extends Configured implements Tool { pe.writeToWAL = writeToWal; pe.presplitRegions = preSplitRegions; pe.N = N; + pe.sampleRate = sampleRate; + pe.reportLatency = reportLatency; pe.connection = connection; try { long elapsedTime = pe.runOneClient(cmd, index * perClientRows, - perClientRows, R, - flushCommits, writeToWAL, connection, new Status() { + perClientRows, R, sampleRate, + flushCommits, writeToWAL, reportLatency, connection, new Status() { public void setStatus(final String msg) throws IOException { LOG.info("client-" + getName() + " " + msg); } @@ -712,9 +732,11 @@ public class PerformanceEvaluation extends Configured implements Tool { ", startRow=" + ((j * perClientRows) + (i * (perClientRows/10))) + ", perClientRunRows=" + (perClientRows / 10) + ", totalRows=" + this.R + + ", sampleRate=" + this.sampleRate + ", clients=" + this.N + ", flushCommits=" + this.flushCommits + - ", writeToWAL=" + this.writeToWAL; + ", writeToWAL=" + this.writeToWAL + + ", reportLatency=" + this.reportLatency; int hash = h.hash(Bytes.toBytes(s)); m.put(hash, s); } @@ -763,25 +785,26 @@ public class PerformanceEvaluation extends Configured implements Tool { private int startRow; private int perClientRunRows; private int totalRows; + private float sampleRate; private int numClientThreads; private TableName tableName; private boolean flushCommits; private boolean writeToWAL = true; + private boolean reportLatency; private HConnection connection; - TestOptions() { - } - - TestOptions(int startRow, int perClientRunRows, int totalRows, - int numClientThreads, TableName tableName, - boolean flushCommits, boolean writeToWAL, HConnection connection) { + TestOptions(int startRow, int perClientRunRows, int totalRows, float sampleRate, + int numClientThreads, TableName tableName, boolean flushCommits, boolean writeToWAL, + boolean reportLatency, HConnection connection) { this.startRow = startRow; this.perClientRunRows = perClientRunRows; this.totalRows = totalRows; + this.sampleRate = sampleRate; this.numClientThreads = numClientThreads; this.tableName = tableName; this.flushCommits = flushCommits; this.writeToWAL = writeToWAL; + this.reportLatency = reportLatency; this.connection = connection; } @@ -797,6 +820,10 @@ public class PerformanceEvaluation extends Configured implements Tool { return totalRows; } + public float getSampleRate() { + return sampleRate; + } + public int getNumClientThreads() { return numClientThreads; } @@ -813,6 +840,10 @@ public class PerformanceEvaluation extends Configured implements Tool { return writeToWAL; } + public boolean isReportLatency() { + return reportLatency; + } + public HConnection getConnection() { return connection; } @@ -835,12 +866,14 @@ public class PerformanceEvaluation extends Configured implements Tool { protected final int startRow; protected final int perClientRunRows; protected final int totalRows; + protected final float sampleRate; private final Status status; protected TableName tableName; protected HTableInterface table; protected volatile Configuration conf; protected boolean flushCommits; protected boolean writeToWAL; + protected boolean reportLatency; protected HConnection connection; /** @@ -852,12 +885,14 @@ public class PerformanceEvaluation extends Configured implements Tool { this.startRow = options.getStartRow(); this.perClientRunRows = options.getPerClientRunRows(); this.totalRows = options.getTotalRows(); + this.sampleRate = options.getSampleRate(); this.status = status; this.tableName = options.getTableName(); this.table = null; this.conf = conf; this.flushCommits = options.isFlushCommits(); this.writeToWAL = options.isWriteToWAL(); + this.reportLatency = options.isReportLatency(); this.connection = options.getConnection(); } @@ -867,7 +902,7 @@ public class PerformanceEvaluation extends Configured implements Tool { protected int getReportingPeriod() { int period = this.perClientRunRows / 10; - return period == 0? this.perClientRunRows: period; + return period == 0 ? this.perClientRunRows : period; } void testSetup() throws IOException { @@ -933,17 +968,14 @@ public class PerformanceEvaluation extends Configured implements Tool { scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); scan.setFilter(new WhileMatchFilter(new PageFilter(120))); ResultScanner s = this.table.getScanner(scan); - //int count = 0; - for (Result rr = null; (rr = s.next()) != null;) { - // LOG.info("" + count++ + " " + rr.toString()); - } + for (Result rr; (rr = s.next()) != null;) ; s.close(); } @Override protected int getReportingPeriod() { int period = this.perClientRunRows / 100; - return period == 0? this.perClientRunRows: period; + return period == 0 ? this.perClientRunRows : period; } } @@ -961,7 +993,7 @@ public class PerformanceEvaluation extends Configured implements Tool { scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); ResultScanner s = this.table.getScanner(scan); int count = 0; - for (Result rr = null; (rr = s.next()) != null;) { + for (Result rr; (rr = s.next()) != null;) { count++; } @@ -1034,23 +1066,49 @@ public class PerformanceEvaluation extends Configured implements Tool { } static class RandomReadTest extends Test { + private final int everyN; + private final boolean reportLatency; + private final float[] times; + int idx = 0; + RandomReadTest(Configuration conf, TestOptions options, Status status) { super(conf, options, status); + everyN = (int) (this.totalRows / (this.totalRows * this.sampleRate)); + LOG.info("Sampling 1 every " + everyN + " out of " + perClientRunRows + " total rows."); + this.reportLatency = options.isReportLatency(); + if (this.reportLatency) { + times = new float[(int) Math.ceil(this.perClientRunRows * this.sampleRate)]; + } else { + times = null; + } } @Override void testRow(final int i) throws IOException { - Get get = new Get(getRandomRow(this.rand, this.totalRows)); - get.addColumn(FAMILY_NAME, QUALIFIER_NAME); - this.table.get(get); + if (i % everyN == 0) { + Get get = new Get(getRandomRow(this.rand, this.totalRows)); + get.addColumn(FAMILY_NAME, QUALIFIER_NAME); + long start = System.nanoTime(); + this.table.get(get); + if (this.reportLatency) { + times[idx++] = (float) ((System.nanoTime() - start) / 1000000.0); + } + } } @Override protected int getReportingPeriod() { int period = this.perClientRunRows / 100; - return period == 0? this.perClientRunRows: period; + return period == 0 ? this.perClientRunRows : period; } + @Override + protected void testTakedown() throws IOException { + super.testTakedown(); + if (this.reportLatency) { + LOG.info("randomRead latency log (ms): " + Arrays.toString(times)); + } + } } static class RandomWriteTest extends Test { @@ -1110,7 +1168,6 @@ public class PerformanceEvaluation extends Configured implements Tool { get.addColumn(FAMILY_NAME, QUALIFIER_NAME); table.get(get); } - } static class SequentialWriteTest extends Test { @@ -1126,7 +1183,6 @@ public class PerformanceEvaluation extends Configured implements Tool { put.setDurability(writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); table.put(put); } - } static class FilteredScanTest extends Test { @@ -1162,6 +1218,23 @@ public class PerformanceEvaluation extends Configured implements Tool { } } + /** + * Compute a throughput rate in MB/s. + * @param rows Number of records consumed. + * @param timeMs Time taken in milliseconds. + * @return String value with label, ie '123.76 MB/s' + */ + private static String calculateMbps(int rows, long timeMs) { + // MB/s = ((totalRows * ROW_SIZE_BYTES) / totalTimeMS) + // * 1000 MS_PER_SEC / (1024 * 1024) BYTES_PER_MB + BigDecimal rowSize = + BigDecimal.valueOf(ROW_LENGTH + VALUE_LENGTH + FAMILY_NAME.length + QUALIFIER_NAME.length); + BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT) + .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT) + .divide(BYTES_PER_MB, CXT); + return FMT.format(mbps) + " MB/s"; + } + /* * Format passed integer. * @param number @@ -1169,7 +1242,7 @@ public class PerformanceEvaluation extends Configured implements Tool { * number (Does absolute in case number is negative). */ public static byte [] format(final int number) { - byte [] b = new byte[DEFAULT_ROW_PREFIX_LENGTH + 10]; + byte [] b = new byte[ROW_LENGTH]; int d = Math.abs(number); for (int i = b.length - 1; i >= 0; i--) { b[i] = (byte)((d % 10) + '0'); @@ -1211,16 +1284,17 @@ public class PerformanceEvaluation extends Configured implements Tool { } long runOneClient(final Class cmd, final int startRow, - final int perClientRunRows, final int totalRows, - boolean flushCommits, boolean writeToWAL, HConnection connection, - final Status status) + final int perClientRunRows, final int totalRows, final float sampleRate, + boolean flushCommits, boolean writeToWAL, boolean reportLatency, + HConnection connection, final Status status) throws IOException { status.setStatus("Start " + cmd + " at offset " + startRow + " for " + perClientRunRows + " rows"); long totalElapsedTime = 0; TestOptions options = new TestOptions(startRow, perClientRunRows, - totalRows, N, tableName, flushCommits, writeToWAL, connection); + totalRows, sampleRate, N, tableName, flushCommits, writeToWAL, + reportLatency, connection); final Test t; try { Constructor constructor = cmd.getDeclaredConstructor( @@ -1237,11 +1311,12 @@ public class PerformanceEvaluation extends Configured implements Tool { totalElapsedTime = t.test(); status.setStatus("Finished " + cmd + " in " + totalElapsedTime + - "ms at offset " + startRow + " for " + perClientRunRows + " rows"); + "ms at offset " + startRow + " for " + perClientRunRows + " rows" + + " (" + calculateMbps((int)(perClientRunRows * sampleRate), totalElapsedTime) + ")"); return totalElapsedTime; } - private void runNIsOne(final Class cmd) { + private void runNIsOne(final Class cmd) throws IOException { Status status = new Status() { public void setStatus(String msg) throws IOException { LOG.info(msg); @@ -1252,10 +1327,12 @@ public class PerformanceEvaluation extends Configured implements Tool { try { admin = new HBaseAdmin(getConf()); checkTable(admin); - runOneClient(cmd, 0, this.R, this.R, this.flushCommits, this.writeToWAL, this.connection, - status); + runOneClient(cmd, 0, this.R, this.R, this.sampleRate, this.flushCommits, + this.writeToWAL, this.reportLatency, this.connection, status); } catch (Exception e) { LOG.error("Failed", e); + } finally { + if (admin != null) admin.close(); } } @@ -1281,20 +1358,27 @@ public class PerformanceEvaluation extends Configured implements Tool { } System.err.println("Usage: java " + this.getClass().getName() + " \\"); System.err.println(" [--nomapred] [--rows=ROWS] [--table=NAME] \\"); - System.err.println(" [--compress=TYPE] [--blockEncoding=TYPE] [-D]* "); + System.err.println(" [--compress=TYPE] [--blockEncoding=TYPE] " + + "[-D]* "); System.err.println(); System.err.println("Options:"); System.err.println(" nomapred Run multiple clients using threads " + "(rather than use mapreduce)"); System.err.println(" rows Rows each client runs. Default: One million"); + System.err.println(" sampleRate Execute test on a sample of total " + + "rows. Only supported by randomRead. Default: 1.0"); System.err.println(" table Alternate table name. Default: 'TestTable'"); System.err.println(" compress Compression type to use (GZ, LZO, ...). Default: 'NONE'"); - System.err.println(" flushCommits Used to determine if the test should flush the table. Default: false"); + System.err.println(" flushCommits Used to determine if the test should flush the table. " + + "Default: false"); System.err.println(" writeToWAL Set writeToWAL on puts. Default: True"); - System.err.println(" presplit Create presplit table. Recommended for accurate perf analysis (see guide). Default: disabled"); - System.err - .println(" inmemory Tries to keep the HFiles of the CF inmemory as far as possible. Not " + - "guaranteed that reads are always served from inmemory. Default: false"); + System.err.println(" presplit Create presplit table. Recommended for accurate perf " + + "analysis (see guide). Default: disabled"); + System.err.println(" inmemory Tries to keep the HFiles of the CF " + + "inmemory as far as possible. Not guaranteed that reads are always served " + + "from memory. Default: false"); + System.err.println(" latency Set to report operation latencies. " + + "Currently only supported by randomRead test. Default: False"); System.err.println(); System.err.println(" Note: -D properties will be applied to the conf used. "); System.err.println(" For example: "); @@ -1364,6 +1448,12 @@ public class PerformanceEvaluation extends Configured implements Tool { continue; } + final String sampleRate = "--sampleRate="; + if (cmd.startsWith(sampleRate)) { + this.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length())); + continue; + } + final String table = "--table="; if (cmd.startsWith(table)) { this.tableName = TableName.valueOf(cmd.substring(table.length())); @@ -1406,6 +1496,12 @@ public class PerformanceEvaluation extends Configured implements Tool { continue; } + final String latency = "--latency"; + if (cmd.startsWith(latency)) { + this.reportLatency = true; + continue; + } + this.connection = HConnectionManager.createConnection(getConf()); Class cmdClass = determineCommandClass(cmd); @@ -1431,9 +1527,6 @@ public class PerformanceEvaluation extends Configured implements Tool { return descriptor != null ? descriptor.getCmdClass() : null; } - /** - * @param args - */ public static void main(final String[] args) throws Exception { int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args); System.exit(res); -- 1.8.4.2