diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java index 54b27e5..6f873ae 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java @@ -18,31 +18,31 @@ */ package org.apache.hadoop.hbase; -import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import java.io.PrintStream; +import java.lang.reflect.Constructor; import java.math.BigDecimal; import java.math.MathContext; import java.text.DecimalFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Arrays; import java.util.Date; -import java.util.List; import java.util.Map; import java.util.Random; import java.util.TreeMap; -import java.util.Arrays; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.lang.reflect.Constructor; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.math.stat.descriptive.DescriptiveStatistics; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.client.Get; @@ -68,23 +68,18 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Hash; import org.apache.hadoop.hbase.util.MurmurHash; import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.conf.Configured; import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; -import org.apache.hadoop.util.LineReader; +import org.codehaus.jackson.map.ObjectMapper; + +import static org.codehaus.jackson.map.SerializationConfig.Feature.SORT_PROPERTIES_ALPHABETICALLY; /** * Script used evaluating HBase performance and scalability. Runs a HBase @@ -105,7 +100,7 @@ import org.apache.hadoop.util.LineReader; public class PerformanceEvaluation extends Configured implements Tool { protected static final Log LOG = LogFactory.getLog(PerformanceEvaluation.class.getName()); - public static final TableName TABLE_NAME = TableName.valueOf("TestTable"); + public static final String TABLE_NAME = "TestTable"; public static final byte[] FAMILY_NAME = Bytes.toBytes("info"); public static final byte[] QUALIFIER_NAME = Bytes.toBytes("data"); public static final int VALUE_LENGTH = 1000; @@ -117,40 +112,12 @@ public class PerformanceEvaluation extends Configured implements Tool { private static final MathContext CXT = MathContext.DECIMAL64; private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000); private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024); + private static final TestOptions DEFAULT_OPTS = new TestOptions(); - protected HTableDescriptor TABLE_DESCRIPTOR; protected Map commands = new TreeMap(); - private boolean nomapred = false; - private int N = 1; - private int R = ROWS_PER_GB; - private float sampleRate = 1.0f; - private TableName tableName = TABLE_NAME; - private Compression.Algorithm compression = Compression.Algorithm.NONE; - private DataBlockEncoding blockEncoding = DataBlockEncoding.NONE; - private boolean flushCommits = true; - private boolean writeToWAL = true; - private boolean inMemoryCF = false; - private boolean reportLatency = false; - private int presplitRegions = 0; - private int multiGet = 0; - private HConnection connection; - private static final Path PERF_EVAL_DIR = new Path("performance_evaluation"); - /** Regex to parse lines in input file passed to mapreduce task. */ - public static final Pattern LINE_PATTERN = - Pattern.compile("tableName=(\\w+),\\s+" + - "startRow=(\\d+),\\s+" + - "perClientRunRows=(\\d+),\\s+" + - "totalRows=(\\d+),\\s+" + - "sampleRate=([-+]?[0-9]*\\.?[0-9]+),\\s+" + - "clients=(\\d+),\\s+" + - "flushCommits=(\\w+),\\s+" + - "writeToWAL=(\\w+),\\s+" + - "reportLatency=(\\w+),\\s+" + - "multiGet=(\\d+)"); - /** * Enum for map metrics. Keep it out here rather than inside in the Map * inner-class so we can find associated properties. @@ -213,252 +180,10 @@ public class PerformanceEvaluation extends Configured implements Tool { } /** - * This class works as the InputSplit of Performance Evaluation - * MapReduce InputFormat, and the Record Value of RecordReader. - * Each map task will only read one record from a PeInputSplit, - * the record value is the PeInputSplit itself. - */ - public static class PeInputSplit extends InputSplit implements Writable { - private TableName tableName = TABLE_NAME; - private int startRow = 0; - private int rows = 0; - private int totalRows = 0; - private float sampleRate = 1.0f; - private int clients = 0; - private boolean flushCommits = false; - private boolean writeToWAL = true; - private boolean reportLatency = false; - private int multiGet = 0; - - public PeInputSplit() {} - - public PeInputSplit(TableName tableName, int startRow, int rows, int totalRows, - float sampleRate, int clients, boolean flushCommits, boolean writeToWAL, - boolean reportLatency, int multiGet) { - this.tableName = tableName; - this.startRow = startRow; - this.rows = rows; - this.totalRows = totalRows; - this.sampleRate = sampleRate; - this.clients = clients; - this.flushCommits = flushCommits; - this.writeToWAL = writeToWAL; - this.reportLatency = reportLatency; - this.multiGet = multiGet; - } - - @Override - public void readFields(DataInput in) throws IOException { - int tableNameLen = in.readInt(); - byte[] name = new byte[tableNameLen]; - in.readFully(name); - this.tableName = TableName.valueOf(name); - - this.startRow = in.readInt(); - this.rows = in.readInt(); - this.totalRows = in.readInt(); - this.sampleRate = in.readFloat(); - this.clients = in.readInt(); - this.flushCommits = in.readBoolean(); - this.writeToWAL = in.readBoolean(); - this.reportLatency = in.readBoolean(); - this.multiGet = in.readInt(); - } - - @Override - public void write(DataOutput out) throws IOException { - byte[] name = this.tableName.toBytes(); - out.writeInt(name.length); - out.write(name); - out.writeInt(startRow); - out.writeInt(rows); - out.writeInt(totalRows); - out.writeFloat(sampleRate); - out.writeInt(clients); - out.writeBoolean(flushCommits); - out.writeBoolean(writeToWAL); - out.writeBoolean(reportLatency); - out.writeInt(multiGet); - } - - @Override - public long getLength() throws IOException, InterruptedException { - return 0; - } - - @Override - public String[] getLocations() throws IOException, InterruptedException { - return new String[0]; - } - - public TableName getTableName() { - return tableName; - } - - public int getStartRow() { - return startRow; - } - - public int getRows() { - return rows; - } - - public int getTotalRows() { - return totalRows; - } - - public float getSampleRate() { - return sampleRate; - } - - public int getClients() { - return clients; - } - - public boolean isFlushCommits() { - return flushCommits; - } - - public boolean isWriteToWAL() { - return writeToWAL; - } - - public boolean isReportLatency() { - return reportLatency; - } - - public int getMultiGet() { - return multiGet; - } - } - - /** - * InputFormat of Performance Evaluation MapReduce job. - * It extends from FileInputFormat, want to use it's methods such as setInputPaths(). - */ - public static class PeInputFormat extends FileInputFormat { - - @Override - public List getSplits(JobContext job) throws IOException { - // generate splits - List splitList = new ArrayList(); - - for (FileStatus file: listStatus(job)) { - if (file.isDir()) { - continue; - } - Path path = file.getPath(); - FileSystem fs = path.getFileSystem(job.getConfiguration()); - FSDataInputStream fileIn = fs.open(path); - LineReader in = new LineReader(fileIn, job.getConfiguration()); - int lineLen = 0; - while(true) { - Text lineText = new Text(); - lineLen = in.readLine(lineText); - if(lineLen <= 0) { - break; - } - Matcher m = LINE_PATTERN.matcher(lineText.toString()); - if((m != null) && m.matches()) { - TableName tableName = TableName.valueOf(m.group(1)); - int startRow = Integer.parseInt(m.group(2)); - int rows = Integer.parseInt(m.group(3)); - int totalRows = Integer.parseInt(m.group(4)); - float sampleRate = Float.parseFloat(m.group(5)); - int clients = Integer.parseInt(m.group(6)); - boolean flushCommits = Boolean.parseBoolean(m.group(7)); - boolean writeToWAL = Boolean.parseBoolean(m.group(8)); - boolean reportLatency = Boolean.parseBoolean(m.group(9)); - int multiGet = Integer.parseInt(m.group(10)); - - LOG.debug("tableName=" + tableName + - " split["+ splitList.size() + "] " + - " startRow=" + startRow + - " rows=" + rows + - " totalRows=" + totalRows + - " sampleRate=" + sampleRate + - " clients=" + clients + - " flushCommits=" + flushCommits + - " writeToWAL=" + writeToWAL + - " reportLatency=" + reportLatency + - " multiGet=" + multiGet); - - PeInputSplit newSplit = - new PeInputSplit(tableName, startRow, rows, totalRows, sampleRate, clients, - flushCommits, writeToWAL, reportLatency, multiGet); - splitList.add(newSplit); - } - } - in.close(); - } - - LOG.info("Total # of splits: " + splitList.size()); - return splitList; - } - - @Override - public RecordReader createRecordReader(InputSplit split, - TaskAttemptContext context) { - return new PeRecordReader(); - } - - public static class PeRecordReader extends RecordReader { - private boolean readOver = false; - private PeInputSplit split = null; - private NullWritable key = null; - private PeInputSplit value = null; - - @Override - public void initialize(InputSplit split, TaskAttemptContext context) - throws IOException, InterruptedException { - this.readOver = false; - this.split = (PeInputSplit)split; - } - - @Override - public boolean nextKeyValue() throws IOException, InterruptedException { - if(readOver) { - return false; - } - - key = NullWritable.get(); - value = split; - - readOver = true; - return true; - } - - @Override - public NullWritable getCurrentKey() throws IOException, InterruptedException { - return key; - } - - @Override - public PeInputSplit getCurrentValue() throws IOException, InterruptedException { - return value; - } - - @Override - public float getProgress() throws IOException, InterruptedException { - if(readOver) { - return 1.0f; - } else { - return 0.0f; - } - } - - @Override - public void close() throws IOException { - // do nothing - } - } - } - - /** * MapReduce job that runs a performance evaluation client in each map task. */ public static class EvaluationMapTask - extends Mapper { + extends Mapper { /** configuration parameter name that contains the command */ public final static String CMD_KEY = "EvaluationMapTask.command"; @@ -485,16 +210,14 @@ public class PerformanceEvaluation extends Configured implements Tool { } private Class forName(String className, Class type) { - Class clazz = null; try { - clazz = Class.forName(className).asSubclass(type); + return Class.forName(className).asSubclass(type); } catch (ClassNotFoundException e) { throw new IllegalStateException("Could not find class for name: " + className, e); } - return clazz; } - protected void map(NullWritable key, PeInputSplit value, final Context context) + protected void map(LongWritable key, Text value, final Context context) throws IOException, InterruptedException { Status status = new Status() { @@ -503,18 +226,17 @@ public class PerformanceEvaluation extends Configured implements Tool { } }; + ObjectMapper mapper = new ObjectMapper(); + TestOptions opts = mapper.readValue(value.toString(), TestOptions.class); + Configuration conf = HBaseConfiguration.create(context.getConfiguration()); + // Evaluation task - pe.tableName = value.getTableName(); - long elapsedTime = this.pe.runOneClient(this.cmd, value.getStartRow(), - value.getRows(), value.getTotalRows(), value.getSampleRate(), - value.isFlushCommits(), value.isWriteToWAL(), value.isReportLatency(), - value.getMultiGet(), HConnectionManager.createConnection(context.getConfiguration()), - status); + long elapsedTime = this.pe.runOneClient(this.cmd, conf, opts, status); // Collect how much time the thing took. Report as map output and // to the ELAPSED_TIME counter. context.getCounter(Counter.ELAPSED_TIME).increment(elapsedTime); - context.getCounter(Counter.ROWS).increment(value.rows); - context.write(new LongWritable(value.startRow), new LongWritable(elapsedTime)); + context.getCounter(Counter.ROWS).increment(opts.perClientRunRows); + context.write(new LongWritable(opts.startRow), new LongWritable(elapsedTime)); context.progress(); } } @@ -525,21 +247,21 @@ public class PerformanceEvaluation extends Configured implements Tool { * @return True if we created the table. * @throws IOException */ - private boolean checkTable(HBaseAdmin admin) throws IOException { - HTableDescriptor tableDescriptor = getTableDescriptor(); - if (this.presplitRegions > 0) { + private static boolean checkTable(HBaseAdmin admin, TestOptions opts) throws IOException { + HTableDescriptor tableDescriptor = getTableDescriptor(opts); + if (opts.presplitRegions > 0) { // presplit requested if (admin.tableExists(tableDescriptor.getTableName())) { admin.disableTable(tableDescriptor.getTableName()); admin.deleteTable(tableDescriptor.getTableName()); } - byte[][] splits = getSplits(); + byte[][] splits = getSplits(opts); for (int i=0; i < splits.length; i++) { LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i])); } admin.createTable(tableDescriptor, splits); - LOG.info ("Table created with " + this.presplitRegions + " splits"); + LOG.info ("Table created with " + opts.presplitRegions + " splits"); } else { boolean tableExists = admin.tableExists(tableDescriptor.getTableName()); @@ -551,33 +273,32 @@ public class PerformanceEvaluation extends Configured implements Tool { return admin.tableExists(tableDescriptor.getTableName()); } - protected HTableDescriptor getTableDescriptor() { - if (TABLE_DESCRIPTOR == null) { - TABLE_DESCRIPTOR = new HTableDescriptor(tableName); - HColumnDescriptor family = new HColumnDescriptor(FAMILY_NAME); - family.setDataBlockEncoding(blockEncoding); - family.setCompressionType(compression); - if (inMemoryCF) { - family.setInMemory(true); - } - TABLE_DESCRIPTOR.addFamily(family); - } - return TABLE_DESCRIPTOR; + /** + * Create an HTableDescriptor from provided TestOptions. + */ + protected static HTableDescriptor getTableDescriptor(TestOptions opts) { + HTableDescriptor desc = new HTableDescriptor(opts.tableName); + HColumnDescriptor family = new HColumnDescriptor(FAMILY_NAME); + family.setDataBlockEncoding(opts.blockEncoding); + family.setCompressionType(opts.compression); + if (opts.inMemoryCF) { + family.setInMemory(true); + } + desc.addFamily(family); + return desc; } /** * generates splits based on total number of rows and specified split regions - * - * @return splits : array of byte [] */ - protected byte[][] getSplits() { - if (this.presplitRegions == 0) + protected static byte[][] getSplits(TestOptions opts) { + if (opts.presplitRegions == 0) return new byte [0][]; - int numSplitPoints = presplitRegions - 1; + int numSplitPoints = opts.presplitRegions - 1; byte[][] splits = new byte[numSplitPoints][]; - int jump = this.R / this.presplitRegions; - for (int i=0; i < numSplitPoints; i++) { + int jump = opts.totalRows / opts.presplitRegions; + for (int i = 0; i < numSplitPoints; i++) { int rowkey = jump * (1 + i); splits[i] = format(rowkey); } @@ -585,86 +306,40 @@ public class PerformanceEvaluation extends Configured implements Tool { } /* - * We're to run multiple clients concurrently. Setup a mapreduce job. Run - * one map per client. Then run a single reduce to sum the elapsed times. - * @param cmd Command to run. - * @throws IOException - */ - private void runNIsMoreThanOne(final Class cmd) - throws IOException, InterruptedException, ClassNotFoundException { - checkTable(new HBaseAdmin(getConf())); - if (this.nomapred) { - doMultipleClients(cmd); - } else { - doMapReduce(cmd); - } - } - - /* * Run all clients in this vm each to its own thread. * @param cmd Command to run. * @throws IOException */ - private void doMultipleClients(final Class cmd) throws IOException { - final List threads = new ArrayList(this.N); - final long[] timings = new long[this.N]; - final int perClientRows = R/N; - final float sampleRate = this.sampleRate; - final TableName tableName = this.tableName; - final DataBlockEncoding encoding = this.blockEncoding; - final boolean flushCommits = this.flushCommits; - final Compression.Algorithm compression = this.compression; - final boolean writeToWal = this.writeToWAL; - final boolean reportLatency = this.reportLatency; - final int preSplitRegions = this.presplitRegions; - final int multiGet = this.multiGet; - final HConnection connection = HConnectionManager.createConnection(getConf()); - for (int i = 0; i < this.N; i++) { + private void doLocalClients(final Class cmd, final TestOptions opts) + throws IOException, InterruptedException { + Future[] threads = new Future[opts.numClientThreads]; + long[] timings = new long[opts.numClientThreads]; + ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads, + new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build()); + for (int i = 0; i < threads.length; i++) { final int index = i; - Thread t = new Thread ("TestClient-" + i) { + threads[i] = pool.submit(new Callable() { @Override - public void run() { - super.run(); - PerformanceEvaluation pe = new PerformanceEvaluation(getConf()); - pe.tableName = tableName; - pe.blockEncoding = encoding; - pe.flushCommits = flushCommits; - pe.compression = compression; - pe.writeToWAL = writeToWal; - pe.presplitRegions = preSplitRegions; - pe.N = N; - pe.sampleRate = sampleRate; - pe.reportLatency = reportLatency; - pe.connection = connection; - pe.multiGet = multiGet; - try { - long elapsedTime = pe.runOneClient(cmd, index * perClientRows, - perClientRows, R, sampleRate, flushCommits, writeToWAL, reportLatency, multiGet, - connection, new Status() { - public void setStatus(final String msg) throws IOException { - LOG.info("client-" + getName() + " " + msg); - } - }); - timings[index] = elapsedTime; - LOG.info("Finished " + getName() + " in " + elapsedTime + - "ms writing " + perClientRows + " rows"); - } catch (IOException e) { - throw new RuntimeException(e); - } + public Long call() throws Exception { + TestOptions threadOpts = new TestOptions(opts); + threadOpts.startRow = index * threadOpts.perClientRunRows; + long elapsedTime = runOneClient(cmd, getConf(), threadOpts, new Status() { + public void setStatus(final String msg) throws IOException { + LOG.info("client-" + Thread.currentThread().getName() + " " + msg); + } + }); + LOG.info("Finished " + Thread.currentThread().getName() + " in " + elapsedTime + + "ms over " + threadOpts.perClientRunRows + " rows"); + return elapsedTime; } - }; - threads.add(t); - } - for (Thread t: threads) { - t.start(); + }); } - for (Thread t: threads) { - while(t.isAlive()) { - try { - t.join(); - } catch (InterruptedException e) { - LOG.debug("Interrupted, continuing" + e.toString()); - } + pool.shutdown(); + for (int i = 0; i < threads.length; i++) { + try { + timings[i] = threads[i].get(); + } catch (ExecutionException e) { + throw new IOException(e.getCause()); } } final String test = cmd.getSimpleName(); @@ -672,13 +347,13 @@ public class PerformanceEvaluation extends Configured implements Tool { + Arrays.toString(timings)); Arrays.sort(timings); long total = 0; - for (int i = 0; i < this.N; i++) { + for (int i = 0; i < timings.length; i++) { total += timings[i]; } LOG.info("[" + test + "]" + "\tMin: " + timings[0] + "ms" - + "\tMax: " + timings[this.N - 1] + "ms" - + "\tAvg: " + (total / this.N) + "ms"); + + "\tMax: " + timings[timings.length - 1] + "ms" + + "\tAvg: " + (total / timings.length) + "ms"); } /* @@ -688,18 +363,20 @@ public class PerformanceEvaluation extends Configured implements Tool { * @param cmd Command to run. * @throws IOException */ - private void doMapReduce(final Class cmd) throws IOException, + private void doMapReduce(final Class cmd, TestOptions opts) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = getConf(); - Path inputDir = writeInputFile(conf); + Path inputDir = writeInputFile(conf, opts); conf.set(EvaluationMapTask.CMD_KEY, cmd.getName()); conf.set(EvaluationMapTask.PE_KEY, getClass().getName()); Job job = new Job(conf); job.setJarByClass(PerformanceEvaluation.class); job.setJobName("HBase Performance Evaluation"); - job.setInputFormatClass(PeInputFormat.class); - PeInputFormat.setInputPaths(job, inputDir); + job.setInputFormatClass(NLineInputFormat.class); + NLineInputFormat.setInputPaths(job, inputDir); + // this is default, but be explicit about it just in case. + NLineInputFormat.setNumLinesPerSplit(job, 1); job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(LongWritable.class); @@ -728,7 +405,7 @@ public class PerformanceEvaluation extends Configured implements Tool { * @return Directory that contains file written. * @throws IOException */ - private Path writeInputFile(final Configuration c) throws IOException { + private Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException { SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss"); Path jobdir = new Path(PERF_EVAL_DIR, formatter.format(new Date())); Path inputDir = new Path(jobdir, "inputs"); @@ -741,20 +418,16 @@ public class PerformanceEvaluation extends Configured implements Tool { // Make input random. Map m = new TreeMap(); Hash h = MurmurHash.getInstance(); - int perClientRows = (this.R / this.N); + int perClientRows = (opts.totalRows / opts.numClientThreads); + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(SORT_PROPERTIES_ALPHABETICALLY, true); try { for (int i = 0; i < 10; i++) { - for (int j = 0; j < N; j++) { - String s = "tableName=" + this.tableName + - ", startRow=" + ((j * perClientRows) + (i * (perClientRows/10))) + - ", perClientRunRows=" + (perClientRows / 10) + - ", totalRows=" + this.R + - ", sampleRate=" + this.sampleRate + - ", clients=" + this.N + - ", flushCommits=" + this.flushCommits + - ", writeToWAL=" + this.writeToWAL + - ", reportLatency=" + this.reportLatency + - ", multiGet=" + this.multiGet; + for (int j = 0; j < opts.numClientThreads; j++) { + TestOptions next = new TestOptions(opts); + next.startRow = (j * perClientRows) + (i * (perClientRows/10)); + next.perClientRunRows = perClientRows / 10; + String s = mapper.writeValueAsString(next); int hash = h.hash(Bytes.toBytes(s)); m.put(hash, s); } @@ -796,81 +469,50 @@ public class PerformanceEvaluation extends Configured implements Tool { } /** - * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation.Test - * tests}. This makes the reflection logic a little easier to understand... + * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}. + * This makes tracking all these arguments a little easier. */ static class TestOptions { - private int startRow; - private int perClientRunRows; - private int totalRows; - private float sampleRate; - private int numClientThreads; - private TableName tableName; - private boolean flushCommits; - private boolean writeToWAL = true; - private boolean reportLatency; - private int multiGet = 0; - private HConnection connection; - - TestOptions(int startRow, int perClientRunRows, int totalRows, float sampleRate, - int numClientThreads, TableName tableName, boolean flushCommits, boolean writeToWAL, - boolean reportLatency, int multiGet, HConnection connection) { - this.startRow = startRow; - this.perClientRunRows = perClientRunRows; - this.totalRows = totalRows; - this.sampleRate = sampleRate; - this.numClientThreads = numClientThreads; - this.tableName = tableName; - this.flushCommits = flushCommits; - this.writeToWAL = writeToWAL; - this.reportLatency = reportLatency; - this.multiGet = multiGet; - this.connection = connection; - } - public int getStartRow() { - return startRow; - } - - public int getPerClientRunRows() { - return perClientRunRows; - } - - public int getTotalRows() { - return totalRows; - } - - public float getSampleRate() { - return sampleRate; - } - - public int getNumClientThreads() { - return numClientThreads; - } - - public TableName getTableName() { - return tableName; - } - - public boolean isFlushCommits() { - return flushCommits; - } - - public boolean isWriteToWAL() { - return writeToWAL; - } - - public boolean isReportLatency() { - return reportLatency; - } - - public int getMultiGet() { - return multiGet; - } - - public HConnection getConnection() { - return connection; - } + public TestOptions() {} + + public TestOptions(TestOptions that) { + this.nomapred = that.nomapred; + this.startRow = that.startRow; + this.perClientRunRows = that.perClientRunRows; + this.numClientThreads = that.numClientThreads; + this.totalRows = that.totalRows; + this.sampleRate = that.sampleRate; + this.tableName = that.tableName; + this.flushCommits = that.flushCommits; + this.writeToWAL = that.writeToWAL; + this.useTags = that.useTags; + this.noOfTags = that.noOfTags; + this.reportLatency = that.reportLatency; + this.multiGet = that.multiGet; + this.inMemoryCF = that.inMemoryCF; + this.presplitRegions = that.presplitRegions; + this.compression = that.compression; + this.blockEncoding = that.blockEncoding; + } + + public boolean nomapred = false; + public int startRow = 0; + public int perClientRunRows = ROWS_PER_GB; + public int numClientThreads = 1; + public int totalRows = ROWS_PER_GB; + public float sampleRate = 1.0f; + public String tableName = TABLE_NAME; + public boolean flushCommits = true; + public boolean writeToWAL = true; + public boolean useTags = false; + public int noOfTags = 1; + public boolean reportLatency = false; + public int multiGet = 0; + boolean inMemoryCF = false; + int presplitRegions = 0; + public Compression.Algorithm compression = Compression.Algorithm.NONE; + public DataBlockEncoding blockEncoding = DataBlockEncoding.NONE; } /* @@ -880,44 +522,26 @@ public class PerformanceEvaluation extends Configured implements Tool { static abstract class Test { // Below is make it so when Tests are all running in the one // jvm, that they each have a differently seeded Random. - private static final Random randomSeed = - new Random(System.currentTimeMillis()); + private static final Random randomSeed = new Random(System.currentTimeMillis()); private static long nextRandomSeed() { return randomSeed.nextLong(); } protected final Random rand = new Random(nextRandomSeed()); + protected final Configuration conf; + protected final TestOptions opts; - protected final int startRow; - protected final int perClientRunRows; - protected final int totalRows; - protected final float sampleRate; private final Status status; - protected TableName tableName; - protected HTableInterface table; - protected volatile Configuration conf; - protected boolean flushCommits; - protected boolean writeToWAL; - protected boolean reportLatency; protected HConnection connection; + protected HTableInterface table; /** * Note that all subclasses of this class must provide a public contructor * that has the exact same list of arguments. */ Test(final Configuration conf, final TestOptions options, final Status status) { - super(); - this.startRow = options.getStartRow(); - this.perClientRunRows = options.getPerClientRunRows(); - this.totalRows = options.getTotalRows(); - this.sampleRate = options.getSampleRate(); - this.status = status; - this.tableName = options.getTableName(); - this.table = null; this.conf = conf; - this.flushCommits = options.isFlushCommits(); - this.writeToWAL = options.isWriteToWAL(); - this.reportLatency = options.isReportLatency(); - this.connection = options.getConnection(); + this.opts = options; + this.status = status; } private String generateStatus(final int sr, final int i, final int lr) { @@ -925,20 +549,22 @@ public class PerformanceEvaluation extends Configured implements Tool { } protected int getReportingPeriod() { - int period = this.perClientRunRows / 10; - return period == 0 ? this.perClientRunRows : period; + int period = opts.perClientRunRows / 10; + return period == 0 ? opts.perClientRunRows : period; } void testSetup() throws IOException { - this.table = connection.getTable(tableName); + this.connection = HConnectionManager.createConnection(conf); + this.table = connection.getTable(opts.tableName); this.table.setAutoFlush(false, true); } void testTakedown() throws IOException { - if (flushCommits) { + if (opts.flushCommits) { this.table.flushCommits(); } table.close(); + connection.close(); } /* @@ -962,12 +588,12 @@ public class PerformanceEvaluation extends Configured implements Tool { * Provides an extension point for tests that don't want a per row invocation. */ void testTimed() throws IOException { - int lastRow = this.startRow + this.perClientRunRows; + int lastRow = opts.startRow + opts.perClientRunRows; // Report on completion of 1/10th of total. - for (int i = this.startRow; i < lastRow; i++) { + for (int i = opts.startRow; i < lastRow; i++) { testRow(i); if (status != null && i > 0 && (i % getReportingPeriod()) == 0) { - status.setStatus(generateStatus(this.startRow, i, lastRow)); + status.setStatus(generateStatus(opts.startRow, i, lastRow)); } } } @@ -988,7 +614,7 @@ public class PerformanceEvaluation extends Configured implements Tool { @Override void testRow(final int i) throws IOException { - Scan scan = new Scan(getRandomRow(this.rand, this.totalRows)); + Scan scan = new Scan(getRandomRow(this.rand, opts.totalRows)); scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); scan.setFilter(new WhileMatchFilter(new PageFilter(120))); ResultScanner s = this.table.getScanner(scan); @@ -998,8 +624,8 @@ public class PerformanceEvaluation extends Configured implements Tool { @Override protected int getReportingPeriod() { - int period = this.perClientRunRows / 100; - return period == 0 ? this.perClientRunRows : period; + int period = opts.perClientRunRows / 100; + return period == 0 ? opts.perClientRunRows : period; } } @@ -1033,15 +659,15 @@ public class PerformanceEvaluation extends Configured implements Tool { protected abstract Pair getStartAndStopRow(); protected Pair generateStartAndStopRows(int maxRange) { - int start = this.rand.nextInt(Integer.MAX_VALUE) % totalRows; + int start = this.rand.nextInt(Integer.MAX_VALUE) % opts.totalRows; int stop = start + maxRange; return new Pair(format(start), format(stop)); } @Override protected int getReportingPeriod() { - int period = this.perClientRunRows / 100; - return period == 0? this.perClientRunRows: period; + int period = opts.perClientRunRows / 100; + return period == 0? opts.perClientRunRows: period; } } @@ -1091,24 +717,20 @@ public class PerformanceEvaluation extends Configured implements Tool { static class RandomReadTest extends Test { private final int everyN; - private final boolean reportLatency; private final double[] times; - private final int multiGet; private ArrayList gets; int idx = 0; RandomReadTest(Configuration conf, TestOptions options, Status status) { super(conf, options, status); - everyN = (int) (this.totalRows / (this.totalRows * this.sampleRate)); - LOG.info("Sampling 1 every " + everyN + " out of " + perClientRunRows + " total rows."); - this.reportLatency = options.isReportLatency(); - this.multiGet = options.getMultiGet(); - if (this.multiGet > 0) { - LOG.info("MultiGet enabled. Sending GETs in batches of " + this.multiGet + "."); - this.gets = new ArrayList(this.multiGet); + everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate)); + LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows."); + if (opts.multiGet > 0) { + LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + "."); + this.gets = new ArrayList(opts.multiGet); } - if (this.reportLatency) { - this.times = new double[(int) Math.ceil(this.perClientRunRows * this.sampleRate / Math.max(1, this.multiGet))]; + if (opts.reportLatency) { + this.times = new double[(int) Math.ceil(opts.perClientRunRows * opts.sampleRate / Math.max(1, opts.multiGet))]; } else { this.times = null; } @@ -1117,14 +739,14 @@ public class PerformanceEvaluation extends Configured implements Tool { @Override void testRow(final int i) throws IOException { if (i % everyN == 0) { - Get get = new Get(getRandomRow(this.rand, this.totalRows)); + Get get = new Get(getRandomRow(this.rand, opts.totalRows)); get.addColumn(FAMILY_NAME, QUALIFIER_NAME); - if (this.multiGet > 0) { + if (opts.multiGet > 0) { this.gets.add(get); - if (this.gets.size() == this.multiGet) { + if (this.gets.size() == opts.multiGet) { long start = System.nanoTime(); this.table.get(this.gets); - if (this.reportLatency) { + if (opts.reportLatency) { times[idx++] = (System.nanoTime() - start) / 1e6; } this.gets.clear(); @@ -1132,7 +754,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } else { long start = System.nanoTime(); this.table.get(get); - if (this.reportLatency) { + if (opts.reportLatency) { times[idx++] = (System.nanoTime() - start) / 1e6; } } @@ -1141,8 +763,8 @@ public class PerformanceEvaluation extends Configured implements Tool { @Override protected int getReportingPeriod() { - int period = this.perClientRunRows / 100; - return period == 0 ? this.perClientRunRows : period; + int period = opts.perClientRunRows / 100; + return period == 0 ? opts.perClientRunRows : period; } @Override @@ -1152,7 +774,7 @@ public class PerformanceEvaluation extends Configured implements Tool { this.gets.clear(); } super.testTakedown(); - if (this.reportLatency) { + if (opts.reportLatency) { Arrays.sort(times); DescriptiveStatistics ds = new DescriptiveStatistics(); for (double t : times) { @@ -1180,11 +802,11 @@ public class PerformanceEvaluation extends Configured implements Tool { @Override void testRow(final int i) throws IOException { - byte [] row = getRandomRow(this.rand, this.totalRows); + byte[] row = getRandomRow(this.rand, opts.totalRows); Put put = new Put(row); - byte[] value = generateValue(this.rand); + byte[] value = generateData(this.rand, VALUE_LENGTH); put.add(FAMILY_NAME, QUALIFIER_NAME, value); - put.setDurability(writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); + put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); table.put(put); } } @@ -1209,7 +831,7 @@ public class PerformanceEvaluation extends Configured implements Tool { @Override void testRow(final int i) throws IOException { if (this.testScanner == null) { - Scan scan = new Scan(format(this.startRow)); + Scan scan = new Scan(format(opts.startRow)); scan.setCaching(30); scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); this.testScanner = table.getScanner(scan); @@ -1240,9 +862,9 @@ public class PerformanceEvaluation extends Configured implements Tool { @Override void testRow(final int i) throws IOException { Put put = new Put(format(i)); - byte[] value = generateValue(this.rand); + byte[] value = generateData(this.rand, VALUE_LENGTH); put.add(FAMILY_NAME, QUALIFIER_NAME, value); - put.setDurability(writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); + put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); table.put(put); } } @@ -1256,7 +878,7 @@ public class PerformanceEvaluation extends Configured implements Tool { @Override void testRow(int i) throws IOException { - byte[] value = generateValue(this.rand); + byte[] value = generateData(this.rand, VALUE_LENGTH); Scan scan = constructScan(value); ResultScanner scanner = null; try { @@ -1319,11 +941,11 @@ public class PerformanceEvaluation extends Configured implements Tool { * consumes about 30% of CPU time. * @return Generated random value to insert into a table cell. */ - public static byte[] generateValue(final Random r) { - byte [] b = new byte [VALUE_LENGTH]; + public static byte[] generateData(final Random r, int length) { + byte [] b = new byte [length]; int i = 0; - for(i = 0; i < (VALUE_LENGTH-8); i += 8) { + for(i = 0; i < (length-8); i += 8) { b[i] = (byte) (65 + r.nextInt(26)); b[i+1] = b[i]; b[i+2] = b[i]; @@ -1335,7 +957,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } byte a = (byte) (65 + r.nextInt(26)); - for(; i < VALUE_LENGTH; i++) { + for(; i < length; i++) { b[i] = a; } return b; @@ -1345,26 +967,21 @@ public class PerformanceEvaluation extends Configured implements Tool { return format(random.nextInt(Integer.MAX_VALUE) % totalRows); } - long runOneClient(final Class cmd, final int startRow, - final int perClientRunRows, final int totalRows, final float sampleRate, - boolean flushCommits, boolean writeToWAL, boolean reportLatency, int multiGet, - HConnection connection, final Status status) - throws IOException { - status.setStatus("Start " + cmd + " at offset " + startRow + " for " + - perClientRunRows + " rows"); + static long runOneClient(final Class cmd, Configuration conf, TestOptions opts, + final Status status) + throws IOException { + status.setStatus("Start " + cmd + " at offset " + opts.startRow + " for " + + opts.perClientRunRows + " rows"); long totalElapsedTime = 0; - TestOptions options = new TestOptions(startRow, perClientRunRows, - totalRows, sampleRate, N, tableName, flushCommits, writeToWAL, - reportLatency, multiGet, connection); final Test t; try { - Constructor constructor = cmd.getDeclaredConstructor( - Configuration.class, TestOptions.class, Status.class); - t = constructor.newInstance(getConf(), options, status); + Constructor constructor = + cmd.getDeclaredConstructor(Configuration.class, TestOptions.class, Status.class); + t = constructor.newInstance(conf, opts, status); } catch (NoSuchMethodException e) { throw new IllegalArgumentException("Invalid command class: " + - cmd.getName() + ". It does not provide a constructor as described by" + + cmd.getName() + ". It does not provide a constructor as described by " + "the javadoc comment. Available constructors are: " + Arrays.toString(cmd.getConstructors())); } catch (Exception e) { @@ -1373,40 +990,24 @@ public class PerformanceEvaluation extends Configured implements Tool { totalElapsedTime = t.test(); status.setStatus("Finished " + cmd + " in " + totalElapsedTime + - "ms at offset " + startRow + " for " + perClientRunRows + " rows" + - " (" + calculateMbps((int)(perClientRunRows * sampleRate), totalElapsedTime) + ")"); + "ms at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows" + + " (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime) + ")"); return totalElapsedTime; } - private void runNIsOne(final Class cmd) throws IOException { - Status status = new Status() { - public void setStatus(String msg) throws IOException { - LOG.info(msg); - } - }; - + private void runTest(final Class cmd, TestOptions opts) throws IOException, + InterruptedException, ClassNotFoundException { HBaseAdmin admin = null; try { admin = new HBaseAdmin(getConf()); - checkTable(admin); - runOneClient(cmd, 0, this.R, this.R, this.sampleRate, this.flushCommits, - this.writeToWAL, this.reportLatency, this.multiGet, this.connection, status); - } catch (Exception e) { - LOG.error("Failed", e); + checkTable(admin, opts); } finally { if (admin != null) admin.close(); } - } - - private void runTest(final Class cmd) throws IOException, - InterruptedException, ClassNotFoundException { - if (N == 1) { - // If there is only one client and one HRegionServer, we assume nothing - // has been set up at all. - runNIsOne(cmd); + if (opts.nomapred) { + doLocalClients(cmd, opts); } else { - // Else, run - runNIsMoreThanOne(cmd); + doMapReduce(cmd, opts); } } @@ -1462,16 +1063,15 @@ public class PerformanceEvaluation extends Configured implements Tool { + " sequentialWrite 1"); } - private void getArgs(final int start, final String[] args) { + private static int getNumClients(final int start, final String[] args) { if(start + 1 > args.length) { throw new IllegalArgumentException("must supply the number of clients"); } - N = Integer.parseInt(args[start]); + int N = Integer.parseInt(args[start]); if (N < 1) { throw new IllegalArgumentException("Number of clients must be > 1"); } - // Set total number of rows to write. - this.R = this.R * N; + return N; } public int run(String[] args) throws Exception { @@ -1490,6 +1090,8 @@ public class PerformanceEvaluation extends Configured implements Tool { // Then you must adapt the LINE_PATTERN input regex, // and parse the argument, take a look at PEInputFormat.getSplits(). + TestOptions opts = new TestOptions(); + for (int i = 0; i < args.length; i++) { String cmd = args[i]; if (cmd.equals("-h") || cmd.startsWith("--h")) { @@ -1500,82 +1102,82 @@ public class PerformanceEvaluation extends Configured implements Tool { final String nmr = "--nomapred"; if (cmd.startsWith(nmr)) { - this.nomapred = true; + opts.nomapred = true; continue; } final String rows = "--rows="; if (cmd.startsWith(rows)) { - this.R = Integer.parseInt(cmd.substring(rows.length())); + opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length())); continue; } final String sampleRate = "--sampleRate="; if (cmd.startsWith(sampleRate)) { - this.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length())); + opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length())); continue; } final String table = "--table="; if (cmd.startsWith(table)) { - this.tableName = TableName.valueOf(cmd.substring(table.length())); + opts.tableName = cmd.substring(table.length()); continue; } final String compress = "--compress="; if (cmd.startsWith(compress)) { - this.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length())); + opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length())); continue; } final String blockEncoding = "--blockEncoding="; if (cmd.startsWith(blockEncoding)) { - this.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length())); + opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length())); continue; } final String flushCommits = "--flushCommits="; if (cmd.startsWith(flushCommits)) { - this.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length())); + opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length())); continue; } final String writeToWAL = "--writeToWAL="; if (cmd.startsWith(writeToWAL)) { - this.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length())); + opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length())); continue; } final String presplit = "--presplit="; if (cmd.startsWith(presplit)) { - this.presplitRegions = Integer.parseInt(cmd.substring(presplit.length())); + opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length())); continue; } final String inMemory = "--inmemory="; if (cmd.startsWith(inMemory)) { - this.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length())); + opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length())); continue; } final String latency = "--latency"; if (cmd.startsWith(latency)) { - this.reportLatency = true; + opts.reportLatency = true; continue; } final String multiGet = "--multiGet="; if (cmd.startsWith(multiGet)) { - this.multiGet = Integer.parseInt(cmd.substring(multiGet.length())); + opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length())); continue; } - - this.connection = HConnectionManager.createConnection(getConf()); Class cmdClass = determineCommandClass(cmd); if (cmdClass != null) { - getArgs(i + 1, args); - runTest(cmdClass); + opts.numClientThreads = getNumClients(i + 1, args); + // number of rows specified + opts.totalRows = opts.perClientRunRows * opts.numClientThreads; + runTest(cmdClass, opts); errCode = 0; break; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java index 5cfcdb0..5861b4b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java @@ -344,7 +344,7 @@ public class TestHFileOutputFormat { // first region start key is always empty ret[0] = HConstants.EMPTY_BYTE_ARRAY; for (int i = 1; i < numKeys; i++) { - ret[i] = PerformanceEvaluation.generateValue(random); + ret[i] = PerformanceEvaluation.generateData(random, PerformanceEvaluation.VALUE_LENGTH); } return ret; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java index 7ee97c3..26d7ddc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java @@ -342,7 +342,7 @@ public class TestHFileOutputFormat2 { // first region start key is always empty ret[0] = HConstants.EMPTY_BYTE_ARRAY; for (int i = 1; i < numKeys; i++) { - ret[i] = PerformanceEvaluation.generateValue(random); + ret[i] = PerformanceEvaluation.generateData(random, PerformanceEvaluation.ROW_LENGTH); } return ret; }