From c38c3052a33ab432b0c6f1f9685c5b08428f562d Mon Sep 17 00:00:00 2001 From: Nick Dimiduk Date: Wed, 20 Nov 2013 15:16:16 -0800 Subject: [PATCH] HBASE-10007 PerformanceEval: add sampling, latency collection and throughput summary --- .../apache/hadoop/hbase/PerformanceEvaluation.java | 245 ++++++++++++++------- 1 file changed, 171 insertions(+), 74 deletions(-) diff --git a/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java b/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java index 16c7653..9284b4e 100644 --- a/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java +++ b/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java @@ -21,9 +21,12 @@ package org.apache.hadoop.hbase; import java.io.DataInput; import java.io.DataOutput; +import java.io.File; import java.io.IOException; import java.io.PrintStream; -import java.io.File; +import java.math.BigDecimal; +import java.math.MathContext; +import java.text.DecimalFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; @@ -98,20 +101,20 @@ import org.apache.hadoop.util.LineReader; public class PerformanceEvaluation { protected static final Log LOG = LogFactory.getLog(PerformanceEvaluation.class.getName()); - private static final int ROW_LENGTH = 1000; - private static final int ONE_GB = 1024 * 1024 * 1000; - private static final int ROWS_PER_GB = ONE_GB / ROW_LENGTH; - public static final byte[] TABLE_NAME = Bytes.toBytes("TestTable"); public static final byte[] FAMILY_NAME = Bytes.toBytes("info"); public static final byte[] QUALIFIER_NAME = Bytes.toBytes("data"); + public static final int VALUE_LENGTH = 1000; + public static final int ROW_LENGTH = 26; - protected static final HTableDescriptor TABLE_DESCRIPTOR; - static { - TABLE_DESCRIPTOR = new HTableDescriptor(TABLE_NAME); - TABLE_DESCRIPTOR.addFamily(new HColumnDescriptor(FAMILY_NAME)); - } + private static final int ONE_GB = 1024 * 1024 * 1000; + private static final int ROWS_PER_GB = ONE_GB / VALUE_LENGTH; + private static final DecimalFormat FMT = new DecimalFormat("0.##"); + private static final MathContext CXT = MathContext.DECIMAL64; + private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000); + private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024); + protected static HTableDescriptor TABLE_DESCRIPTOR; protected Map commands = new TreeMap(); volatile Configuration conf; @@ -119,21 +122,24 @@ public class PerformanceEvaluation { private boolean nomapred = false; private int N = 1; private int R = ROWS_PER_GB; + private float sampleRate = 1.0f; private boolean flushCommits = true; + private boolean reportLatency = false; private boolean writeToWAL = true; private int presplitRegions = 0; private static final Path PERF_EVAL_DIR = new Path("performance_evaluation"); - /** - * Regex to parse lines in input file passed to mapreduce task. - */ + + /** Regex to parse lines in input file passed to mapreduce task. */ public static final Pattern LINE_PATTERN = Pattern.compile("startRow=(\\d+),\\s+" + "perClientRunRows=(\\d+),\\s+" + "totalRows=(\\d+),\\s+" + + "sampleRate=([-+]?[0-9]*\\.?[0-9]+),\\s+" + "clients=(\\d+),\\s+" + "flushCommits=(\\w+),\\s+" + - "writeToWAL=(\\w+)"); + "writeToWAL=(\\w+),\\s+" + + "reportLatency=(\\w+)"); /** * Enum for map metrics. Keep it out here rather than inside in the Map @@ -143,8 +149,8 @@ public class PerformanceEvaluation { /** elapsed time */ ELAPSED_TIME, /** number of rows */ - ROWS} - + ROWS + } /** * Constructor @@ -206,27 +212,24 @@ public class PerformanceEvaluation { private int startRow = 0; private int rows = 0; private int totalRows = 0; + private float sampleRate = 1.0f; private int clients = 0; private boolean flushCommits = false; private boolean writeToWAL = true; + private boolean reportLatency = false; - public PeInputSplit() { - this.startRow = 0; - this.rows = 0; - this.totalRows = 0; - this.clients = 0; - this.flushCommits = false; - this.writeToWAL = true; - } + public PeInputSplit() {} - public PeInputSplit(int startRow, int rows, int totalRows, int clients, - boolean flushCommits, boolean writeToWAL) { + public PeInputSplit(int startRow, int rows, int totalRows, float sampleRate, + int clients, boolean flushCommits, boolean writeToWAL, boolean reportLatency) { this.startRow = startRow; this.rows = rows; this.totalRows = totalRows; + this.sampleRate = sampleRate; this.clients = clients; this.flushCommits = flushCommits; this.writeToWAL = writeToWAL; + this.reportLatency = reportLatency; } @Override @@ -234,9 +237,11 @@ public class PerformanceEvaluation { this.startRow = in.readInt(); this.rows = in.readInt(); this.totalRows = in.readInt(); + this.sampleRate = in.readFloat(); this.clients = in.readInt(); this.flushCommits = in.readBoolean(); this.writeToWAL = in.readBoolean(); + this.reportLatency = in.readBoolean(); } @Override @@ -244,9 +249,11 @@ public class PerformanceEvaluation { out.writeInt(startRow); out.writeInt(rows); out.writeInt(totalRows); + out.writeFloat(sampleRate); out.writeInt(clients); out.writeBoolean(flushCommits); out.writeBoolean(writeToWAL); + out.writeBoolean(reportLatency); } @Override @@ -271,6 +278,10 @@ public class PerformanceEvaluation { return totalRows; } + public float getSampleRate() { + return sampleRate; + } + public int getClients() { return clients; } @@ -282,6 +293,10 @@ public class PerformanceEvaluation { public boolean isWriteToWAL() { return writeToWAL; } + + public boolean isReportLatency() { + return reportLatency; + } } /** @@ -312,21 +327,25 @@ public class PerformanceEvaluation { int startRow = Integer.parseInt(m.group(1)); int rows = Integer.parseInt(m.group(2)); int totalRows = Integer.parseInt(m.group(3)); - int clients = Integer.parseInt(m.group(4)); - boolean flushCommits = Boolean.parseBoolean(m.group(5)); - boolean writeToWAL = Boolean.parseBoolean(m.group(6)); + float sampleRate = Float.parseFloat(m.group(4)); + int clients = Integer.parseInt(m.group(5)); + boolean flushCommits = Boolean.parseBoolean(m.group(6)); + boolean writeToWAL = Boolean.parseBoolean(m.group(7)); + boolean reportLatency = Boolean.parseBoolean(m.group(8)); LOG.debug("split["+ splitList.size() + "] " + " startRow=" + startRow + " rows=" + rows + " totalRows=" + totalRows + + " sampleRate=" + sampleRate + " clients=" + clients + " flushCommits=" + flushCommits + - " writeToWAL=" + writeToWAL); + " writeToWAL=" + writeToWAL + + " reportLatency=" + reportLatency); PeInputSplit newSplit = - new PeInputSplit(startRow, rows, totalRows, clients, - flushCommits, writeToWAL); + new PeInputSplit(startRow, rows, totalRows, sampleRate, clients, + flushCommits, writeToWAL, reportLatency); splitList.add(newSplit); } } @@ -363,7 +382,7 @@ public class PerformanceEvaluation { } key = NullWritable.get(); - value = (PeInputSplit)split; + value = split; readOver = true; return true; @@ -446,9 +465,9 @@ public class PerformanceEvaluation { // Evaluation task long elapsedTime = this.pe.runOneClient(this.cmd, value.getStartRow(), - value.getRows(), value.getTotalRows(), - value.isFlushCommits(), value.isWriteToWAL(), - status); + value.getRows(), value.getTotalRows(), value.getSampleRate(), + value.isFlushCommits(), value.isWriteToWAL(), value.isReportLatency(), + status); // Collect how much time the thing took. Report as map output and // to the ELAPSED_TIME counter. context.getCounter(Counter.ELAPSED_TIME).increment(elapsedTime); @@ -487,11 +506,15 @@ public class PerformanceEvaluation { LOG.info("Table " + tableDescriptor + " created"); } } - boolean tableExists = admin.tableExists(tableDescriptor.getName()); - return tableExists; + return admin.tableExists(tableDescriptor.getName()); } protected HTableDescriptor getTableDescriptor() { + if (TABLE_DESCRIPTOR == null) { + TABLE_DESCRIPTOR = new HTableDescriptor(TABLE_NAME); + HColumnDescriptor family = new HColumnDescriptor(FAMILY_NAME); + TABLE_DESCRIPTOR.addFamily(family); + } return TABLE_DESCRIPTOR; } @@ -546,8 +569,8 @@ public class PerformanceEvaluation { int index = Integer.parseInt(getName()); try { long elapsedTime = pe.runOneClient(cmd, index * perClientRows, - perClientRows, R, - flushCommits, writeToWAL, new Status() { + perClientRows, R, sampleRate, flushCommits, writeToWAL, + reportLatency, new Status() { public void setStatus(final String msg) throws IOException { LOG.info("client-" + getName() + " " + msg); } @@ -641,9 +664,11 @@ public class PerformanceEvaluation { String s = "startRow=" + ((j * perClientRows) + (i * (perClientRows/10))) + ", perClientRunRows=" + (perClientRows / 10) + ", totalRows=" + this.R + + ", sampleRate=" + this.sampleRate + ", clients=" + this.N + ", flushCommits=" + this.flushCommits + - ", writeToWAL=" + this.writeToWAL; + ", writeToWAL=" + this.writeToWAL + + ", reportLatency=" + this.reportLatency; int hash = h.hash(Bytes.toBytes(s)); m.put(hash, s); } @@ -692,20 +717,24 @@ public class PerformanceEvaluation { private int startRow; private int perClientRunRows; private int totalRows; + private float sampleRate; private byte[] tableName; private boolean flushCommits; private boolean writeToWAL = true; + private boolean reportLatency; - TestOptions() { - } + TestOptions() {} - TestOptions(int startRow, int perClientRunRows, int totalRows, byte[] tableName, boolean flushCommits, boolean writeToWAL) { + TestOptions(int startRow, int perClientRunRows, int totalRows, float sampleRate, + byte[] tableName, boolean flushCommits, boolean writeToWAL, boolean reportLatency) { this.startRow = startRow; this.perClientRunRows = perClientRunRows; this.totalRows = totalRows; + this.sampleRate = sampleRate; this.tableName = tableName; this.flushCommits = flushCommits; this.writeToWAL = writeToWAL; + this.reportLatency = reportLatency; } public int getStartRow() { @@ -720,6 +749,10 @@ public class PerformanceEvaluation { return totalRows; } + public float getSampleRate() { + return sampleRate; + } + public byte[] getTableName() { return tableName; } @@ -731,6 +764,10 @@ public class PerformanceEvaluation { public boolean isWriteToWAL() { return writeToWAL; } + + public boolean isReportLatency() { + return reportLatency; + } } /* @@ -750,6 +787,7 @@ public class PerformanceEvaluation { protected final int startRow; protected final int perClientRunRows; protected final int totalRows; + protected final float sampleRate; private final Status status; protected byte[] tableName; protected HBaseAdmin admin; @@ -757,6 +795,7 @@ public class PerformanceEvaluation { protected volatile Configuration conf; protected boolean flushCommits; protected boolean writeToWAL; + protected boolean reportlatency; /** * Note that all subclasses of this class must provide a public contructor @@ -767,12 +806,14 @@ public class PerformanceEvaluation { this.startRow = options.getStartRow(); this.perClientRunRows = options.getPerClientRunRows(); this.totalRows = options.getTotalRows(); + this.sampleRate = options.getSampleRate(); this.status = status; this.tableName = options.getTableName(); this.table = null; this.conf = conf; this.flushCommits = options.isFlushCommits(); this.writeToWAL = options.isWriteToWAL(); + this.reportlatency = options.isReportLatency(); } private String generateStatus(final int sr, final int i, final int lr) { @@ -781,7 +822,7 @@ public class PerformanceEvaluation { protected int getReportingPeriod() { int period = this.perClientRunRows / 10; - return period == 0? this.perClientRunRows: period; + return period == 0 ? this.perClientRunRows : period; } void testSetup() throws IOException { @@ -850,17 +891,14 @@ public class PerformanceEvaluation { scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); scan.setFilter(new WhileMatchFilter(new PageFilter(120))); ResultScanner s = this.table.getScanner(scan); - //int count = 0; - for (Result rr = null; (rr = s.next()) != null;) { - // LOG.info("" + count++ + " " + rr.toString()); - } + for (Result rr; (rr = s.next()) != null;) ; s.close(); } @Override protected int getReportingPeriod() { int period = this.perClientRunRows / 100; - return period == 0? this.perClientRunRows: period; + return period == 0 ? this.perClientRunRows : period; } } @@ -878,7 +916,7 @@ public class PerformanceEvaluation { scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); ResultScanner s = this.table.getScanner(scan); int count = 0; - for (Result rr = null; (rr = s.next()) != null;) { + for (Result rr; (rr = s.next()) != null;) { count++; } @@ -951,23 +989,49 @@ public class PerformanceEvaluation { } static class RandomReadTest extends Test { + private final int everyN; + private final boolean reportLatency; + private final float[] times; + int idx = 0; + RandomReadTest(Configuration conf, TestOptions options, Status status) { super(conf, options, status); + everyN = (int) (this.totalRows / (this.totalRows * this.sampleRate)); + LOG.info("Sampling 1 every " + everyN + " out of " + perClientRunRows + " total rows."); + this.reportLatency = options.isReportLatency(); + if (this.reportLatency) { + times = new float[(int) Math.ceil(this.perClientRunRows * this.sampleRate)]; + } else { + times = null; + } } @Override void testRow(final int i) throws IOException { - Get get = new Get(getRandomRow(this.rand, this.totalRows)); - get.addColumn(FAMILY_NAME, QUALIFIER_NAME); - this.table.get(get); + if (i % everyN == 0) { + Get get = new Get(getRandomRow(this.rand, this.totalRows)); + get.addColumn(FAMILY_NAME, QUALIFIER_NAME); + long start = System.nanoTime(); + this.table.get(get); + if (this.reportLatency) { + times[idx++] = (float) ((System.nanoTime() - start) / 1000000.0); + } + } } @Override protected int getReportingPeriod() { int period = this.perClientRunRows / 100; - return period == 0? this.perClientRunRows: period; + return period == 0 ? this.perClientRunRows : period; } + @Override + protected void testTakedown() throws IOException { + super.testTakedown(); + if (this.reportLatency) { + LOG.info("randomRead latency log (ms): " + Arrays.toString(times)); + } + } } static class RandomWriteTest extends Test { @@ -1030,7 +1094,6 @@ public class PerformanceEvaluation { get.addColumn(FAMILY_NAME, QUALIFIER_NAME); table.get(get); } - } static class SequentialWriteTest extends Test { @@ -1046,7 +1109,6 @@ public class PerformanceEvaluation { put.setWriteToWAL(writeToWAL); table.put(put); } - } static class FilteredScanTest extends Test { @@ -1082,14 +1144,31 @@ public class PerformanceEvaluation { } } + /** + * Compute a throughput rate in MB/s. + * @param rows Number of records consumed. + * @param timeMs Time taken in milliseconds. + * @return String value with label, ie '123.76 MB/s' + */ + private static String calculateMbps(int rows, long timeMs) { + // MB/s = ((totalRows * ROW_SIZE_BYTES) / totalTimeMS) + // * 1000 MS_PER_SEC / (1024 * 1024) BYTES_PER_MB + BigDecimal rowSize = + BigDecimal.valueOf(VALUE_LENGTH + VALUE_LENGTH + FAMILY_NAME.length + QUALIFIER_NAME.length); + BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT) + .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT) + .divide(BYTES_PER_MB, CXT); + return FMT.format(mbps) + " MB/s"; + } + /* * Format passed integer. * @param number - * @return Returns zero-prefixed 10-byte wide decimal version of passed + * @return Returns zero-prefixed ROW_LENGTH-byte wide decimal version of passed * number (Does absolute in case number is negative). */ public static byte [] format(final int number) { - byte [] b = new byte[10]; + byte [] b = new byte[ROW_LENGTH]; int d = Math.abs(number); for (int i = b.length - 1; i >= 0; i--) { b[i] = (byte)((d % 10) + '0'); @@ -1105,7 +1184,7 @@ public class PerformanceEvaluation { * @return Generated random value to insert into a table cell. */ public static byte[] generateValue(final Random r) { - byte [] b = new byte [ROW_LENGTH]; + byte [] b = new byte [VALUE_LENGTH]; r.nextBytes(b); return b; } @@ -1115,17 +1194,17 @@ public class PerformanceEvaluation { } long runOneClient(final Class cmd, final int startRow, - final int perClientRunRows, final int totalRows, - boolean flushCommits, boolean writeToWAL, - final Status status) + final int perClientRunRows, final int totalRows, final float sampleRate, + boolean flushCommits, boolean writeToWAL, boolean reportLatency, + final Status status) throws IOException { status.setStatus("Start " + cmd + " at offset " + startRow + " for " + perClientRunRows + " rows"); long totalElapsedTime = 0; Test t = null; - TestOptions options = new TestOptions(startRow, perClientRunRows, - totalRows, getTableDescriptor().getName(), flushCommits, writeToWAL); + TestOptions options = new TestOptions(startRow, perClientRunRows, totalRows, + sampleRate, getTableDescriptor().getName(), flushCommits, writeToWAL, reportLatency); try { Constructor constructor = cmd.getDeclaredConstructor( Configuration.class, TestOptions.class, Status.class); @@ -1141,11 +1220,12 @@ public class PerformanceEvaluation { totalElapsedTime = t.test(); status.setStatus("Finished " + cmd + " in " + totalElapsedTime + - "ms at offset " + startRow + " for " + perClientRunRows + " rows"); + "ms at offset " + startRow + " for " + perClientRunRows + " rows" + + " (" + calculateMbps((int)(perClientRunRows * sampleRate), totalElapsedTime) + ")"); return totalElapsedTime; } - private void runNIsOne(final Class cmd) { + private void runNIsOne(final Class cmd) throws IOException { Status status = new Status() { public void setStatus(String msg) throws IOException { LOG.info(msg); @@ -1156,10 +1236,12 @@ public class PerformanceEvaluation { try { admin = new HBaseAdmin(this.conf); checkTable(admin); - runOneClient(cmd, 0, this.R, this.R, this.flushCommits, this.writeToWAL, - status); + runOneClient(cmd, 0, this.R, this.R, this.sampleRate, this.flushCommits, + this.writeToWAL, this.writeToWAL, status); } catch (Exception e) { LOG.error("Failed", e); + } finally { + if (admin != null) admin.close(); } } @@ -1219,9 +1301,15 @@ public class PerformanceEvaluation { System.err.println(" nomapred Run multiple clients using threads " + "(rather than use mapreduce)"); System.err.println(" rows Rows each client runs. Default: One million"); - System.err.println(" flushCommits Used to determine if the test should flush the table. Default: false"); + System.err.println(" sampleRate Execute test on a sample of total " + + "rows. Only supported by randomRead. Default: 1.0"); + System.err.println(" flushCommits Used to determine if the test should flush the table. " + + "Default: false"); System.err.println(" writeToWAL Set writeToWAL on puts. Default: True"); - System.err.println(" presplit Create presplit table. Recommended for accurate perf analysis (see guide). Default: disabled"); + System.err.println(" presplit Create presplit table. Recommended for accurate perf " + + "analysis (see guide). Default: disabled"); + System.err.println(" latency Set to report operation latencies. " + + "Currently only supported by randomRead test. Default: False"); System.err.println(); System.err.println("Command:"); for (CmdDescriptor command : commands.values()) { @@ -1286,6 +1374,12 @@ public class PerformanceEvaluation { continue; } + final String sampleRate = "--sampleRate="; + if (cmd.startsWith(sampleRate)) { + this.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length())); + continue; + } + final String flushCommits = "--flushCommits="; if (cmd.startsWith(flushCommits)) { this.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length())); @@ -1304,6 +1398,12 @@ public class PerformanceEvaluation { continue; } + final String latency = "--latency"; + if (cmd.startsWith(latency)) { + this.reportLatency = true; + continue; + } + Class cmdClass = determineCommandClass(cmd); if (cmdClass != null) { getArgs(i + 1, args); @@ -1327,9 +1427,6 @@ public class PerformanceEvaluation { return descriptor != null ? descriptor.getCmdClass() : null; } - /** - * @param args - */ public static void main(final String[] args) { Configuration c = HBaseConfiguration.create(); System.exit(new PerformanceEvaluation(c).doCommandLine(args)); -- 1.8.4.2