diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java index d6b6046..9b26c1e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java @@ -20,13 +20,18 @@ package org.apache.hadoop.hbase; import static org.codehaus.jackson.map.SerializationConfig.Feature.SORT_PROPERTIES_ALPHABETICALLY; +import java.io.BufferedWriter; +import java.io.FileWriter; import java.io.IOException; import java.io.PrintStream; +import java.io.Writer; import java.lang.reflect.Constructor; import java.math.BigDecimal; import java.math.MathContext; import java.text.DecimalFormat; import java.text.SimpleDateFormat; +import java.util.LinkedList; +import java.util.Queue; import java.util.ArrayList; import java.util.Arrays; import java.util.Date; @@ -40,9 +45,11 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import com.google.common.util.concurrent.ThreadFactoryBuilder; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.math.stat.descriptive.DescriptiveStatistics; +import org.apache.commons.math.stat.descriptive.SynchronizedDescriptiveStatistics; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; @@ -84,10 +91,6 @@ import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.codehaus.jackson.map.ObjectMapper; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.yammer.metrics.core.Histogram; -import com.yammer.metrics.core.MetricsRegistry; - /** * Script used evaluating HBase performance and scalability. 
Runs a HBase * client that steps through one of a set of hardcoded tests or 'experiments' @@ -115,6 +118,7 @@ public class PerformanceEvaluation extends Configured implements Tool { private static final int ONE_GB = 1024 * 1024 * 1000; private static final int ROWS_PER_GB = ONE_GB / VALUE_LENGTH; + public static final long TEN_POW_6 = 1000000; // TODO : should we make this configurable private static final int TAG_LENGTH = 256; private static final DecimalFormat FMT = new DecimalFormat("0.##"); @@ -127,6 +131,8 @@ public class PerformanceEvaluation extends Configured implements Tool { private static final Path PERF_EVAL_DIR = new Path("performance_evaluation"); + private static ResponseCollector responseCollector; + /** * Enum for map metrics. Keep it out here rather than inside in the Map * inner-class so we can find associated properties. @@ -493,6 +499,9 @@ public class PerformanceEvaluation extends Configured implements Tool { this.numClientThreads = that.numClientThreads; this.totalRows = that.totalRows; this.sampleRate = that.sampleRate; + this.responseFile = that.responseFile; + this.detailedResponse = that.detailedResponse; + this.sampleSummaryDurMs = that.sampleSummaryDurMs; this.tableName = that.tableName; this.flushCommits = that.flushCommits; this.writeToWAL = that.writeToWAL; @@ -515,6 +524,9 @@ public class PerformanceEvaluation extends Configured implements Tool { public int numClientThreads = 1; public int totalRows = ROWS_PER_GB; public float sampleRate = 1.0f; + public String responseFile = null; + public boolean detailedResponse = false; + public int sampleSummaryDurMs = 500; public String tableName = TABLE_NAME; public boolean flushCommits = true; public boolean writeToWAL = true; @@ -636,7 +648,10 @@ public class PerformanceEvaluation extends Configured implements Tool { } list.addFilter(new WhileMatchFilter(new PageFilter(120))); scan.setFilter(list); + long stTime = System.nanoTime(); ResultScanner s = this.table.getScanner(scan); + long enTime = 
System.nanoTime(); + addResponse(stTime, enTime); for (Result rr; (rr = s.next()) != null;) ; s.close(); } @@ -663,7 +678,10 @@ public class PerformanceEvaluation extends Configured implements Tool { scan.setFilter(new FilterAllFilter()); } scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); + long stTime = System.nanoTime(); ResultScanner s = this.table.getScanner(scan); + long enTime = System.nanoTime(); + addResponse(stTime, enTime); int count = 0; for (Result rr; (rr = s.next()) != null;) { count++; @@ -769,18 +787,22 @@ public class PerformanceEvaluation extends Configured implements Tool { if (opts.multiGet > 0) { this.gets.add(get); if (this.gets.size() == opts.multiGet) { - long start = System.nanoTime(); + long stTime = System.nanoTime(); this.table.get(this.gets); + long enTime = System.nanoTime(); + addResponse(stTime, enTime); if (opts.reportLatency) { - times[idx++] = (System.nanoTime() - start) / 1e6; + times[idx++] = (enTime - stTime) / 1e6; } this.gets.clear(); } } else { - long start = System.nanoTime(); + long stTime = System.nanoTime(); this.table.get(get); + long enTime = System.nanoTime(); + addResponse(stTime, enTime); if (opts.reportLatency) { - times[idx++] = (System.nanoTime() - start) / 1e6; + times[idx++] = (enTime - stTime) / 1e6; } } } @@ -844,7 +866,10 @@ public class PerformanceEvaluation extends Configured implements Tool { put.add(FAMILY_NAME, QUALIFIER_NAME, value); } put.setDurability(opts.writeToWAL ? 
Durability.SYNC_WAL : Durability.SKIP_WAL); + long stTime = System.nanoTime(); table.put(put); + long enTime = System.nanoTime(); + addResponse(stTime, enTime); } } @@ -876,7 +901,10 @@ public class PerformanceEvaluation extends Configured implements Tool { } this.testScanner = table.getScanner(scan); } + long stTime = System.nanoTime(); testScanner.next(); + long enTime = System.nanoTime(); + addResponse(stTime, enTime); } } @@ -893,7 +921,10 @@ public class PerformanceEvaluation extends Configured implements Tool { if (opts.filterAll) { get.setFilter(new FilterAllFilter()); } + long stTime = System.nanoTime(); table.get(get); + long enTime = System.nanoTime(); + addResponse(stTime, enTime); } } @@ -921,7 +952,10 @@ public class PerformanceEvaluation extends Configured implements Tool { put.add(FAMILY_NAME, QUALIFIER_NAME, value); } put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); + long stTime = System.nanoTime(); table.put(put); + long enTime = System.nanoTime(); + addResponse(stTime, enTime); } } @@ -939,8 +973,13 @@ public class PerformanceEvaluation extends Configured implements Tool { ResultScanner scanner = null; try { scanner = this.table.getScanner(scan); - while (scanner.next() != null) { - } + Result result = null; + do { + long stTime = System.nanoTime(); + result = scanner.next(); + long enTime = System.nanoTime(); + addResponse(stTime, enTime); + } while (result != null); } finally { if (scanner != null) scanner.close(); }
@@ -962,6 +1001,211 @@ public class PerformanceEvaluation extends Configured implements Tool {
       return scan;
     }
   }
+
+  /**
+   * The class that facilitates the collection and logging of all latency response data.
+   * <p>
+   * Client threads report measurements through {@link #addResponse(long, int)}. The collector
+   * thread repeatedly detaches the accumulated data while holding the instance lock
+   * ({@link #snapshot()}), writes it to the response file ({@link #writeStats(Writer)}) and
+   * sleeps for the sampling interval. Interrupting the thread makes it write the data that is
+   * still pending and then close the file.
+   */
+  public static abstract class ResponseCollector extends Thread {
+
+    private final String responseFile;
+    private final int sampleDurMs;
+    // Cleared by the collector thread without holding the lock, so it has to be volatile for
+    // the client threads to notice that collection has stopped.
+    private volatile Writer responseOut;
+
+    /**
+     * @param responseFile name of the CSV file to write
+     * @param sampleDurMs pause, in milliseconds, between two writes to the file
+     * @throws IOException if the file cannot be opened for writing
+     */
+    public ResponseCollector(String responseFile, int sampleDurMs) throws IOException {
+      super("ResponseCollector: " + responseFile);
+      setDaemon(true);
+      this.responseOut = new BufferedWriter(new FileWriter(responseFile));
+      this.responseFile = responseFile;
+      this.sampleDurMs = sampleDurMs;
+    }
+
+    /**
+     * Records one measurement; does nothing once the response file has been closed.
+     *
+     * @param tsMs timestamp of the measured call in milliseconds, as supplied by the caller
+     * @param durNanos duration of the measured call in nanoseconds
+     */
+    public synchronized void addResponse(long tsMs, int durNanos) {
+      if (responseOut == null) {
+        return;
+      }
+      addResponseImpl(tsMs, durNanos);
+    }
+
+    /** Stores one measurement. Called with the instance lock held. */
+    abstract void addResponseImpl(long tsMs, int durNanos);
+
+    /** Writes the CSV header. Called once, before any call to {@link #writeStats(Writer)}. */
+    abstract void writeHeader(Writer out) throws IOException;
+
+    /** Detaches the data collected so far. Called with the instance lock held. */
+    abstract void snapshot();
+
+    /** Writes the data detached by the preceding {@link #snapshot()} call. */
+    abstract void writeStats(Writer out) throws IOException;
+
+    @Override
+    public void run() {
+      final Writer out = this.responseOut;
+      try {
+        writeHeader(out);
+      } catch (IOException e) {
+        LOG.error("Exception writing header to " + responseFile + " closing response collection", e);
+        closeOutput(out);
+        return;
+      }
+
+      boolean stopping = false;
+      while (true) {
+        try {
+          synchronized (this) {
+            snapshot();
+          }
+          writeStats(out);
+          out.flush();
+        } catch (IOException e) {
+          LOG.error("Exception writing to " + responseFile + " closing response collection", e);
+          break;
+        }
+        if (stopping) {
+          break;
+        }
+
+        try {
+          Thread.sleep(sampleDurMs);
+        } catch (InterruptedException e) {
+          // Do one more pass before exiting: responses recorded since the last snapshot would
+          // otherwise never reach the file.
+          stopping = true;
+        }
+      }
+      closeOutput(out);
+    }
+
+    /** Closes the response file and makes {@link #addResponse(long, int)} a no-op. */
+    private void closeOutput(Writer out) {
+      try {
+        out.close();
+      } catch (IOException e) {
+        LOG.error("Error closing response file: " + responseFile, e);
+      }
+      responseOut = null;
+    }
+  }
+
+  /**
+   * Collector that writes one CSV line per recorded response.
+   */
+  public static class DetailedResponseCollector extends ResponseCollector {
+    Queue<Response> responseQueue;
+    Queue<Response> snapshotQueue;
+
+    DetailedResponseCollector(String responseFile) throws IOException {
+      super(responseFile, 500);
+    }
+
+    @Override
+    void addResponseImpl(long tsMs, int durNanos) {
+      if (this.responseQueue == null) {
+        this.responseQueue = new LinkedList<Response>();
+      }
+      responseQueue.add(new Response(tsMs, durNanos));
+    }
+
+    @Override
+    void writeHeader(Writer out) throws IOException {
+      out.write("timestampMS,durationNanos\n");
+    }
+
+    @Override
+    void snapshot() {
+      // Hand the filled queue to the writer; a new one is created lazily on the next response.
+      snapshotQueue = responseQueue;
+      responseQueue = null;
+    }
+
+    @Override
+    void writeStats(Writer out) throws IOException {
+      if (snapshotQueue == null) {
+        return;
+      }
+      for (Response response : snapshotQueue) {
+        out.write(response.tsMs + "," + response.durNanos + "\n");
+      }
+      snapshotQueue = null;
+    }
+
+    /** One recorded response. */
+    private static class Response {
+      final long tsMs;
+      final int durNanos;
+
+      Response(long tsMs, int durNanos) {
+        this.tsMs = tsMs;
+        this.durNanos = durNanos;
+      }
+    }
+  }
+
+  /**
+   * Collector that writes one CSV line of aggregate statistics per sampling interval in which
+   * at least one response was recorded.
+   */
+  public static class SummaryResponseCollector extends ResponseCollector {
+    private DescriptiveStatistics ds;
+    private DescriptiveStatistics snapshotDs;
+    private long lastTs;
+
+    public SummaryResponseCollector(String responseFile, int sampleDurMs) throws IOException {
+      super(responseFile, sampleDurMs);
+    }
+
+    @Override
+    void addResponseImpl(long tsMs, int durNanos) {
+      if (this.ds == null) {
+        this.ds = new SynchronizedDescriptiveStatistics();
+      }
+      this.ds.addValue(durNanos);
+    }
+
+    @Override
+    void writeHeader(Writer out) throws IOException {
+      lastTs = System.currentTimeMillis();
+      out.write("timestampMS,count,minDurationNanos,maxDurationNanos,averageDurationNanos,percentile99.9,percentile99,percentile95,percentile90\n");
+    }
+
+    @Override
+    void snapshot() {
+      snapshotDs = ds;
+      ds = null;
+      lastTs = System.currentTimeMillis();
+    }
+
+    @Override
+    void writeStats(Writer out) throws IOException {
+      if (snapshotDs == null) {
+        return;
+      }
+      out.write(lastTs+","+snapshotDs.getN()+","+snapshotDs.getMin()+","+
+          snapshotDs.getMax()+","+snapshotDs.getMean()+","+
+          snapshotDs.getPercentile(99.9d)+","+snapshotDs.getPercentile(99d)+","+
+          snapshotDs.getPercentile(95d)+","+snapshotDs.getPercentile(90d)+"\n");
+      snapshotDs = null;
+    }
+  }
 
   /**
    * Compute a throughput rate in MB/s.
@@ -1100,6 +1302,13 @@ public class PerformanceEvaluation extends Configured implements Tool { System.err.println(" rows Rows each client runs. Default: One million"); System.err.println(" sampleRate Execute test on a sample of total " + "rows. Only supported by randomRead. Default: 1.0"); + System.err.println(" responseFile Specify a CSV filename to record all the latencies," + + " works only when --nomapred is used"); + System.err.println(" detailedResponse \n" + + " Enables generating of a response line for each individual invocation" + + "(default is to generate a summary line every sampleSummaryDurMs millis)"); + System.err.println(" sampleSummaryDurMs \n" + + " Specifies the duration for each summary line"); System.err.println(" table Alternate table name. Default: 'TestTable'"); System.err.println(" compress Compression type to use (GZ, LZO, ...). Default: 'NONE'"); System.err.println(" flushCommits Used to determine if the test should flush the table. " + @@ -1196,6 +1405,24 @@ public class PerformanceEvaluation extends Configured implements Tool { continue; } + final String responseFile = "--responseFile="; + if (cmd.startsWith(responseFile)) { + opts.responseFile = cmd.substring(responseFile.length()); + continue; + } + + final String detailedResponse = "--detailedResponse"; + if (cmd.startsWith(detailedResponse)) { + opts.detailedResponse = true; + continue; + } + + final String sampleSummaryDurMs = "--sampleSummaryDurMs="; + if (cmd.startsWith(sampleSummaryDurMs)) { + opts.sampleSummaryDurMs = Integer.parseInt(cmd.substring(sampleSummaryDurMs.length())); + continue; + } + final String table = "--table="; if (cmd.startsWith(table)) { opts.tableName = cmd.substring(table.length()); @@ -1279,7 +1506,29 @@ public class PerformanceEvaluation extends Configured implements Tool { opts.numClientThreads = getNumClients(i + 1, args); // number of rows specified opts.totalRows = opts.perClientRunRows * opts.numClientThreads; + + if (opts.responseFile != null) { + 
try { + if (opts.detailedResponse) { + responseCollector = new DetailedResponseCollector(opts.responseFile); + } + else { + responseCollector = new SummaryResponseCollector(opts.responseFile, opts.sampleSummaryDurMs); + } + responseCollector.start(); /* reached only when the collector was created successfully */ + } catch (IOException e) { + LOG.error("Error opening file: " + opts.responseFile + " for writing, no response file will be generated", e); + } + } + runTest(cmdClass, opts); + + if (responseCollector != null) { + responseCollector.interrupt(); + responseCollector.join(); + responseCollector = null; + } + errCode = 0; break; } @@ -1299,6 +1548,12 @@ public class PerformanceEvaluation extends Configured implements Tool { return descriptor != null ? descriptor.getCmdClass() : null; } + public final static void addResponse(long stTime, long enTime) { + if (responseCollector != null) { + responseCollector.addResponse(stTime/TEN_POW_6, (int) Math.min(enTime - stTime, (long) Integer.MAX_VALUE)); /* clamped: a plain int cast wraps negative for calls longer than ~2.1 s */ + } + } + public static void main(final String[] args) throws Exception { int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args); System.exit(res);