:100644 100644 d2331fa... 60de82e... M	src/contrib/indexed/src/test/org/apache/hadoop/hbase/IdxPerformanceEvaluation.java
:100644 100644 fec8eae... abc7d37... M	src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java

diff --git a/src/contrib/indexed/src/test/org/apache/hadoop/hbase/IdxPerformanceEvaluation.java b/src/contrib/indexed/src/test/org/apache/hadoop/hbase/IdxPerformanceEvaluation.java
index d2331fa..60de82e 100644
--- a/src/contrib/indexed/src/test/org/apache/hadoop/hbase/IdxPerformanceEvaluation.java
+++ b/src/contrib/indexed/src/test/org/apache/hadoop/hbase/IdxPerformanceEvaluation.java
@@ -46,15 +46,9 @@ import java.io.IOException;
  * split in a single node cluster is very high which can cause timeout issues on
  * the client side, especially during the sequentialWrite test.
  *
- *
- * A suggested schema change would be to add a second column family and column
- * to the 'TestTable' table. The second value would be the first ten bytes of
- * the larger 1KB value. This would allow the scan to use an index hint to
- * dramatically reduce the number of rows it needs to filter without the cost
- * of keeping every 1KB value in memory.
- *
- *
- * Another point that's mentioned in the help output is that this evaluation
- * requires more than the default 1GB of VM memory to complete.
- * See the {@link #printUsage(String)} output for more details.
+ *
+ * This evaluation creates an index on the first two bytes of the value. This
+ * still provides a big performance boost without requiring huge amounts of memory.
+ *
  */
 public class IdxPerformanceEvaluation extends PerformanceEvaluation {
   protected static final Log LOG = LogFactory.getLog(IdxPerformanceEvaluation.class);
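For a rough sense of why the two-byte prefix index removes the memory concern, here is a back-of-the-envelope sketch. The ((rows * 1000) * 1.2) figure comes from the WARNING text removed in the next hunk; the two-byte figure follows from the new javadoc above. Illustrative only:

    public class IndexMemoryEstimate {
      public static void main(String[] args) {
        long rows = 1000000L;  // default 'rows' argument
        // old estimate: every 1KB value in memory, padded by 20% overhead
        long fullValueBytes = (long) (rows * 1000 * 1.2);
        // new scheme: only the first two bytes of each value are indexed
        long prefixBytes = rows * 2;
        System.out.println("full-value index: " + fullValueBytes / 1000000 + " MB");
        System.out.println("two-byte prefix:  " + prefixBytes / 1000000 + " MB");
      }
    }

This prints roughly 1200 MB versus 2 MB (before per-entry index overhead), which is why the HBASE_HEAPSIZE warning below can be dropped.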
@@ -104,34 +98,16 @@ public class IdxPerformanceEvaluation extends PerformanceEvaluation {
   protected void printUsage(String message) {
     System.err.println("");
     System.err.println(
-      "NOTE: In order to run this evaluration you need to ensure you have \n" +
+      "NOTE: In order to run this evaluation you need to ensure you have \n" +
       "enabled the IdxRegion in your hbase-site.xml."
     );
     System.err.println("");
-    System.err.println(
-      "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"
-    );
-    System.err.println(
-      "WARNING: By default this evaluation creates an index on one million \n" +
-      "(specified by the 'rows' argument) randomly generated 1KB byte arrays. \n" +
-      "This means that in order to populate an index there must be \n" +
-      "((rows * 1000) * 1.2) bytes (1200 MB for default values) of \n" +
-      "memory allocated to the region servers. If you are running this \n" +
-      "evaluation on a single node cluster with the default memory \n" +
-      "configuration you'll need to increase the HBASE_HEAPSIZE \n" +
-      "environment variable to at least 1200 MB (preferably 1500 MB) \n" +
-      "in hbase-env.sh.");
-    System.err.println(
-      "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"
-    );
-    System.err.println("");
     super.printUsage(message);
   }
 
   static class IndexedFilteredScanTest extends FilteredScanTest {
-    public IndexedFilteredScanTest(final HBaseConfiguration conf, final int startRow,
-        final int perClientRunRows, final int totalRows, final Status status, byte[] tableName) {
-      super(conf, startRow, perClientRunRows, totalRows, status, tableName);
+    IndexedFilteredScanTest(HBaseConfiguration conf, TestOptions options, Status status) {
+      super(conf, options, status);
     }
 
     @Override
diff --git a/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java b/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java
index fec8eae..abc7d37 100644
--- a/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java
+++ b/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java
@@ -24,6 +24,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.io.File;
+import java.lang.reflect.InvocationTargetException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Date;
@@ -60,6 +61,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Hash;
 import org.apache.hadoop.hbase.util.MurmurHash;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
@@ -117,14 +119,15 @@ public class PerformanceEvaluation implements HConstants {
   private boolean nomapred = false;
   private int N = 1;
   private int R = ROWS_PER_GB;
+  private boolean flushCommits = false;
+
   private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
-
   /**
    * Regex to parse lines in input file passed to mapreduce task.
    */
   public static final Pattern LINE_PATTERN =
     Pattern.compile("startRow=(\\d+),\\s+" +
-    "perClientRunRows=(\\d+),\\s+totalRows=(\\d+),\\s+clients=(\\d+)");
+    "perClientRunRows=(\\d+),\\s+totalRows=(\\d+),\\s+clients=(\\d+),\\s+flushCommits=(\\w+)");
 
   /**
    * Enum for map metrics.  Keep it out here rather than inside in the Map
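A quick sanity sketch of how the extended LINE_PATTERN tokenizes one line of the generated input file. The sample line mimics the format that writeInputFile emits further down; this block is illustrative and not part of the patch:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class LinePatternDemo {
      public static void main(String[] args) {
        Pattern p = Pattern.compile("startRow=(\\d+),\\s+" +
          "perClientRunRows=(\\d+),\\s+totalRows=(\\d+),\\s+clients=(\\d+),\\s+flushCommits=(\\w+)");
        String line = "startRow=0, perClientRunRows=100000, " +
          "totalRows=1048576, clients=10, flushCommits=false";
        Matcher m = p.matcher(line);
        if (m.matches()) {
          // groups 1-4 are fed to Integer.parseInt, group 5 to Boolean.parseBoolean
          System.out.println("startRow=" + m.group(1) +
            " flushCommits=" + m.group(5));
        }
      }
    }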
@@ -144,13 +147,28 @@ public class PerformanceEvaluation implements HConstants {
   public PerformanceEvaluation(final HBaseConfiguration c) {
     this.conf = c;
-    addCommandDescriptor(RandomReadTest.class, "randomRead", "Run random read test");
-    addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan", "Run random seek and scan 100 test");
-    addCommandDescriptor(RandomWriteTest.class, "randomWrite", "Run random write test");
-    addCommandDescriptor(SequentialReadTest.class, "sequentialRead", "Run sequential read test");
-    addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", "Run sequential write test");
-    addCommandDescriptor(ScanTest.class, "scan", "Run scan test (read every row)");
-    addCommandDescriptor(FilteredScanTest.class, "filterScan", "Run scan test using a filter to find a specific row based on it's value");
+    addCommandDescriptor(RandomReadTest.class, "randomRead",
+      "Run random read test");
+    addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan",
+      "Run random seek and scan 100 test");
+    addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
+      "Run random seek scan with both start and stop row (max 10 rows)");
+    addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
+      "Run random seek scan with both start and stop row (max 100 rows)");
+    addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
+      "Run random seek scan with both start and stop row (max 1000 rows)");
+    addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
+      "Run random seek scan with both start and stop row (max 10000 rows)");
+    addCommandDescriptor(RandomWriteTest.class, "randomWrite",
+      "Run random write test");
+    addCommandDescriptor(SequentialReadTest.class, "sequentialRead",
+      "Run sequential read test");
+    addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite",
+      "Run sequential write test");
+    addCommandDescriptor(ScanTest.class, "scan",
+      "Run scan test (read every row)");
+    addCommandDescriptor(FilteredScanTest.class, "filterScan",
+      "Run scan test using a filter to find a specific row based on its value (make sure to use --rows=20)");
   }
 
   protected void addCommandDescriptor(Class<? extends Test> cmdClass, String name, String description) {
@@ -181,19 +199,22 @@ public class PerformanceEvaluation implements HConstants {
     private int rows = 0;
     private int totalRows = 0;
     private int clients = 0;
+    private boolean flushCommits = false;
 
     public PeInputSplit() {
       this.startRow = 0;
       this.rows = 0;
       this.totalRows = 0;
       this.clients = 0;
+      this.flushCommits = false;
     }
 
-    public PeInputSplit(int startRow, int rows, int totalRows, int clients) {
+    public PeInputSplit(int startRow, int rows, int totalRows, int clients, boolean flushCommits) {
       this.startRow = startRow;
       this.rows = rows;
       this.totalRows = totalRows;
       this.clients = clients;
+      this.flushCommits = flushCommits;
     }
 
     @Override
@@ -202,6 +223,7 @@ public class PerformanceEvaluation implements HConstants {
       this.rows = in.readInt();
       this.totalRows = in.readInt();
       this.clients = in.readInt();
+      this.flushCommits = in.readBoolean();
     }
 
     @Override
@@ -210,6 +232,7 @@ public class PerformanceEvaluation implements HConstants {
       out.writeInt(rows);
       out.writeInt(totalRows);
       out.writeInt(clients);
+      out.writeBoolean(flushCommits);
     }
 
     @Override
@@ -237,6 +260,10 @@ public class PerformanceEvaluation implements HConstants {
     public int getClients() {
       return clients;
     }
+
+    public boolean isFlushCommits() {
+      return flushCommits;
+    }
   }
 
   /**
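PeInputSplit's write/readFields pair must stay symmetric now that the boolean rides along at the end. A self-contained round-trip sketch of the same field ordering, using plain java.io streams instead of the MapReduce machinery (values are arbitrary):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class SplitSerializationDemo {
      public static void main(String[] args) throws IOException {
        // write() order: startRow, rows, totalRows, clients, flushCommits
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(0);
        out.writeInt(100000);
        out.writeInt(1000000);
        out.writeInt(10);
        out.writeBoolean(true);

        // readFields() must consume the exact same sequence
        DataInputStream in = new DataInputStream(
          new ByteArrayInputStream(bytes.toByteArray()));
        System.out.println("startRow=" + in.readInt() +
          " rows=" + in.readInt() +
          " totalRows=" + in.readInt() +
          " clients=" + in.readInt() +
          " flushCommits=" + in.readBoolean());
      }
    }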
@@ -268,14 +295,16 @@ public class PerformanceEvaluation implements HConstants {
           int rows = Integer.parseInt(m.group(2));
           int totalRows = Integer.parseInt(m.group(3));
           int clients = Integer.parseInt(m.group(4));
-
+          boolean flushCommits = Boolean.parseBoolean(m.group(5));
+
           LOG.debug("split["+ splitList.size() + "] " +
                     " startRow=" + startRow +
                     " rows=" + rows +
                     " totalRows=" + totalRows +
-                    " clients=" + clients);
-
-          PeInputSplit newSplit = new PeInputSplit(startRow, rows, totalRows, clients);
+                    " clients=" + clients +
+                    " flushCommits=" + flushCommits);
+
+          PeInputSplit newSplit = new PeInputSplit(startRow, rows, totalRows, clients, flushCommits);
           splitList.add(newSplit);
         }
       }
@@ -352,20 +381,38 @@ public class PerformanceEvaluation implements HConstants {
     /** configuration parameter name that contains the command */
     public final static String CMD_KEY = "EvaluationMapTask.command";
+    /** configuration parameter name that contains the PE impl */
+    public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl";
+
     private Class<? extends Test> cmd;
     private PerformanceEvaluation pe;
-
+
     @Override
     protected void setup(Context context) throws IOException, InterruptedException {
-      String cmdClassName = context.getConfiguration().get(CMD_KEY);
+      this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class);
+
+      // this is required so that extensions of PE are instantiated within the
+      // map reduce task...
+      Class<? extends PerformanceEvaluation> peClass =
+        forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class);
+      try {
+        this.pe = peClass.getConstructor(HBaseConfiguration.class)
+          .newInstance(new HBaseConfiguration(context.getConfiguration()));
+      } catch (Exception e) {
+        throw new IllegalStateException("Could not instantiate PE instance", e);
+      }
+    }
+
+    private <Type> Class<? extends Type> forName(String className, Class<Type> type) {
+      Class<? extends Type> clazz = null;
       try {
-        this.cmd = Class.forName(cmdClassName).asSubclass(Test.class);
+        clazz = Class.forName(className).asSubclass(type);
       } catch (ClassNotFoundException e) {
-        throw new IllegalStateException("Could not find class for name: " + cmdClassName, e);
+        throw new IllegalStateException("Could not find class for name: " + className, e);
       }
-      this.pe = new PerformanceEvaluation(new HBaseConfiguration(context.getConfiguration()));
+      return clazz;
     }
-
+
     protected void map(NullWritable key, PeInputSplit value, final Context context)
            throws IOException, InterruptedException {
@@ -376,8 +423,8 @@ public class PerformanceEvaluation implements HConstants {
       };
 
       // Evaluation task
-      long elapsedTime = this.pe.runOneClient(this.cmd, value.getStartRow(),
-          value.getRows(), value.getTotalRows(), status);
+      long elapsedTime = this.pe.runOneClient(this.cmd, value.getStartRow(),
+          value.getRows(), value.getTotalRows(), value.isFlushCommits(), status);
       // Collect how much time the thing took. Report as map output and
       // to the ELAPSED_TIME counter.
       context.getCounter(Counter.ELAPSED_TIME).increment(elapsedTime);
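The PE_KEY/forName plumbing above exists so that a PerformanceEvaluation subclass (such as IdxPerformanceEvaluation) is re-instantiated inside each map task instead of silently degrading to the base class. The same pattern in a self-contained sketch; the class and constructor names here are placeholders:

    import java.lang.reflect.Constructor;

    public class ReflectiveFactoryDemo {
      public static class Base {
        public Base(String conf) { }
      }

      public static class Sub extends Base {
        public Sub(String conf) { super(conf); }
      }

      // mirrors EvaluationMapTask.forName() followed by
      // peClass.getConstructor(HBaseConfiguration.class).newInstance(...)
      static <T> T instantiate(String className, Class<T> type, String conf)
          throws Exception {
        Class<? extends T> clazz = Class.forName(className).asSubclass(type);
        Constructor<? extends T> ctor = clazz.getConstructor(String.class);
        return ctor.newInstance(conf);
      }

      public static void main(String[] args) throws Exception {
        Base pe = instantiate(Sub.class.getName(), Base.class, "dummy-conf");
        System.out.println(pe.getClass().getSimpleName()); // prints "Sub"
      }
    }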
@@ -442,7 +489,7 @@ public class PerformanceEvaluation implements HConstants {
         try {
           long elapsedTime = pe.runOneClient(cmd, index * perClientRows,
             perClientRows, R,
-            new Status() {
+            flushCommits, new Status() {
               public void setStatus(final String msg) throws IOException {
                 LOG.info("client-" + getName() + " " + msg);
               }
@@ -481,6 +528,7 @@ public class PerformanceEvaluation implements HConstants {
       InterruptedException, ClassNotFoundException {
     Path inputDir = writeInputFile(this.conf);
     this.conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
+    this.conf.set(EvaluationMapTask.PE_KEY, getClass().getName());
     Job job = new Job(this.conf);
     job.setJarByClass(PerformanceEvaluation.class);
     job.setJobName("HBase Performance Evaluation");
@@ -528,7 +576,8 @@ public class PerformanceEvaluation implements HConstants {
         String s = "startRow=" + ((j * perClientRows) + (i * (perClientRows/10))) +
           ", perClientRunRows=" + (perClientRows / 10) +
           ", totalRows=" + this.R +
-          ", clients=" + this.N;
+          ", clients=" + this.N +
+          ", flushCommits=" + this.flushCommits;
         int hash = h.hash(Bytes.toBytes(s));
         m.put(hash, s);
       }
@@ -569,6 +618,49 @@ public class PerformanceEvaluation implements HConstants {
     }
   }
 
+  /**
+   * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation.Test
+   * tests}. This makes the reflection logic a little easier to understand...
+   */
+  static class TestOptions {
+    private int startRow;
+    private int perClientRunRows;
+    private int totalRows;
+    private byte[] tableName;
+    private boolean flushCommits;
+
+    TestOptions() {
+    }
+
+    TestOptions(int startRow, int perClientRunRows, int totalRows, byte[] tableName, boolean flushCommits) {
+      this.startRow = startRow;
+      this.perClientRunRows = perClientRunRows;
+      this.totalRows = totalRows;
+      this.tableName = tableName;
+      this.flushCommits = flushCommits;
+    }
+
+    public int getStartRow() {
+      return startRow;
+    }
+
+    public int getPerClientRunRows() {
+      return perClientRunRows;
+    }
+
+    public int getTotalRows() {
+      return totalRows;
+    }
+
+    public byte[] getTableName() {
+      return tableName;
+    }
+
+    public boolean isFlushCommits() {
+      return flushCommits;
+    }
+  }
+
   /*
    * A test.
    * Subclass to particularize what happens per row.
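With the parameter object in place, wiring a test up by hand (outside runOneClient's reflection path) reduces to something like the following. TestOptions, RandomReadTest and Bytes are from this patch and codebase; the literal values and the surrounding conf/status variables are made up for illustration:

    // conf is an HBaseConfiguration and status a Status callback, as elsewhere
    TestOptions options = new TestOptions(
      0,                           // startRow
      1000000,                     // perClientRunRows
      1000000,                     // totalRows
      Bytes.toBytes("TestTable"),  // tableName
      false);                      // flushCommits: testTakedown() will skip the flush
    Test test = new RandomReadTest(conf, options, status);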
@@ -583,21 +675,22 @@ public class PerformanceEvaluation implements HConstants {
     protected HBaseAdmin admin;
     protected HTable table;
     protected volatile HBaseConfiguration conf;
+    protected boolean flushCommits;
 
     /**
      * Note that all subclasses of this class must provide a public contructor
      * that has the exact same list of arguments.
      */
-    Test(final HBaseConfiguration conf, final int startRow,
-        final int perClientRunRows, final int totalRows, final Status status, byte[] tableName) {
+    Test(final HBaseConfiguration conf, final TestOptions options, final Status status) {
       super();
-      this.startRow = startRow;
-      this.perClientRunRows = perClientRunRows;
-      this.totalRows = totalRows;
+      this.startRow = options.getStartRow();
+      this.perClientRunRows = options.getPerClientRunRows();
+      this.totalRows = options.getTotalRows();
       this.status = status;
-      this.tableName = tableName;
+      this.tableName = options.getTableName();
       this.table = null;
       this.conf = conf;
+      this.flushCommits = options.isFlushCommits();
     }
 
     private String generateStatus(final int sr, final int i, final int lr) {
@@ -618,7 +711,9 @@ public class PerformanceEvaluation implements HConstants {
     }
 
     void testTakedown() throws IOException {
-      this.table.flushCommits();
+      if (flushCommits) {
+        this.table.flushCommits();
+      }
     }
 
     /*
@@ -662,11 +757,10 @@ public class PerformanceEvaluation implements HConstants {
   }
 
   static class RandomSeekScanTest extends Test {
-    RandomSeekScanTest(final HBaseConfiguration conf, final int startRow,
-        final int perClientRunRows, final int totalRows, final Status status, byte[] tableName) {
-      super(conf, startRow, perClientRunRows, totalRows, status, tableName);
+    RandomSeekScanTest(HBaseConfiguration conf, TestOptions options, Status status) {
+      super(conf, options, status);
     }
-
+
     @Override
     void testRow(final int i) throws IOException {
       Scan scan = new Scan(getRandomRow(this.rand, this.totalRows));
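The guard added to testTakedown above matters because HBase clients can buffer puts on the client side; if the buffer is never flushed, a write test measures only buffered throughput. A hedged sketch of the client pattern involved (0.20-era HTable API; the table and column names are placeholders):

    HTable table = new HTable(new HBaseConfiguration(), "TestTable");
    table.setAutoFlush(false);   // buffer puts on the client
    Put put = new Put(Bytes.toBytes("row-0000000001"));
    put.add(Bytes.toBytes("info"), Bytes.toBytes("data"), new byte[1000]);
    table.put(put);              // may only land in the local write buffer
    table.flushCommits();        // what testTakedown now does only when flushCommits is true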
@@ -688,12 +782,95 @@ public class PerformanceEvaluation implements HConstants {
     }
   }
 
+  static abstract class RandomScanWithRangeTest extends Test {
+    RandomScanWithRangeTest(HBaseConfiguration conf, TestOptions options, Status status) {
+      super(conf, options, status);
+    }
+
+    @Override
+    void testRow(final int i) throws IOException {
+      Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
+      Scan scan = new Scan(startAndStopRow.getFirst(), startAndStopRow.getSecond());
+      scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
+      ResultScanner s = this.table.getScanner(scan);
+      int count = 0;
+      for (Result rr = null; (rr = s.next()) != null;) {
+        count++;
+      }
+
+      if (i % 100 == 0) {
+        LOG.info(String.format("Scan for key range %s - %s returned %s rows",
+          Bytes.toString(startAndStopRow.getFirst()),
+          Bytes.toString(startAndStopRow.getSecond()), count));
+      }
+
+      s.close();
+    }
+
+    protected abstract Pair<byte[], byte[]> getStartAndStopRow();
+
+    protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) {
+      int start = this.rand.nextInt(Integer.MAX_VALUE) % totalRows;
+      int stop = start + maxRange;
+      return Pair.of(format(start), format(stop));
+    }
+
+    @Override
+    protected int getReportingPeriod() {
+      int period = this.perClientRunRows / 100;
+      return period == 0? this.perClientRunRows: period;
+    }
+  }
+
+  static class RandomScanWithRange10Test extends RandomScanWithRangeTest {
+    RandomScanWithRange10Test(HBaseConfiguration conf, TestOptions options, Status status) {
+      super(conf, options, status);
+    }
+
+    @Override
+    protected Pair<byte[], byte[]> getStartAndStopRow() {
+      return generateStartAndStopRows(10);
+    }
+  }
+
+  static class RandomScanWithRange100Test extends RandomScanWithRangeTest {
+    RandomScanWithRange100Test(HBaseConfiguration conf, TestOptions options, Status status) {
+      super(conf, options, status);
+    }
+
+    @Override
+    protected Pair<byte[], byte[]> getStartAndStopRow() {
+      return generateStartAndStopRows(100);
+    }
+  }
+
+  static class RandomScanWithRange1000Test extends RandomScanWithRangeTest {
+    RandomScanWithRange1000Test(HBaseConfiguration conf, TestOptions options, Status status) {
+      super(conf, options, status);
+    }
+
+    @Override
+    protected Pair<byte[], byte[]> getStartAndStopRow() {
+      return generateStartAndStopRows(1000);
+    }
+  }
+
+  static class RandomScanWithRange10000Test extends RandomScanWithRangeTest {
+    RandomScanWithRange10000Test(HBaseConfiguration conf, TestOptions options, Status status) {
+      super(conf, options, status);
+    }
+
+    @Override
+    protected Pair<byte[], byte[]> getStartAndStopRow() {
+      return generateStartAndStopRows(10000);
+    }
+  }
+
   static class RandomReadTest extends Test {
-    public RandomReadTest(final HBaseConfiguration conf, final int startRow,
-        final int perClientRunRows, final int totalRows, final Status status, byte[] tableName) {
-      super(conf, startRow, perClientRunRows, totalRows, status, tableName);
+    RandomReadTest(HBaseConfiguration conf, TestOptions options, Status status) {
+      super(conf, options, status);
     }
-
+
     @Override
     void testRow(final int i) throws IOException {
       Get get = new Get(getRandomRow(this.rand, this.totalRows));
@@ -710,9 +887,8 @@ public class PerformanceEvaluation implements HConstants {
   }
 
   static class RandomWriteTest extends Test {
-    RandomWriteTest(final HBaseConfiguration conf, final int startRow,
-        final int perClientRunRows, final int totalRows, final Status status, byte[] tableName) {
-      super(conf, startRow, perClientRunRows, totalRows, status, tableName);
+    RandomWriteTest(HBaseConfiguration conf, TestOptions options, Status status) {
+      super(conf, options, status);
     }
 
     @Override
@@ -727,10 +903,9 @@ public class PerformanceEvaluation implements HConstants {
 
   static class ScanTest extends Test {
     private ResultScanner testScanner;
-
-    ScanTest(final HBaseConfiguration conf, final int startRow,
-        final int perClientRunRows, final int totalRows, final Status status, byte[] tableName) {
-      super(conf, startRow, perClientRunRows, totalRows, status, tableName);
+
+    ScanTest(HBaseConfiguration conf, TestOptions options, Status status) {
+      super(conf, options, status);
     }
 
     @Override
@@ -758,9 +933,8 @@ public class PerformanceEvaluation implements HConstants {
   }
 
   static class SequentialReadTest extends Test {
-    SequentialReadTest(final HBaseConfiguration conf, final int startRow,
-        final int perClientRunRows, final int totalRows, final Status status, byte[] tableName) {
-      super(conf, startRow, perClientRunRows, totalRows, status, tableName);
+    SequentialReadTest(HBaseConfiguration conf, TestOptions options, Status status) {
+      super(conf, options, status);
    }
 
     @Override
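The row-key arithmetic in generateStartAndStopRows boils down to the standalone sketch below. PerformanceEvaluation.format() zero-pads the integer into a ten-digit key; String.format is used here as a stand-in:

    import java.util.Random;

    public class StartStopRowDemo {
      public static void main(String[] args) {
        Random rand = new Random();
        int totalRows = 1000000;
        int maxRange = 100;  // e.g. the scanRange100 command
        int start = rand.nextInt(Integer.MAX_VALUE) % totalRows;
        // stop may run past the last row; the scanner simply returns fewer rows
        int stop = start + maxRange;
        // stand-in for Pair.of(format(start), format(stop))
        System.out.println(String.format("%010d .. %010d", start, stop));
      }
    }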
@@ -773,9 +947,8 @@ public class PerformanceEvaluation implements HConstants {
   }
 
   static class SequentialWriteTest extends Test {
-    SequentialWriteTest(final HBaseConfiguration conf, final int startRow,
-        final int perClientRunRows, final int totalRows, final Status status, byte[] tableName) {
-      super(conf, startRow, perClientRunRows, totalRows, status, tableName);
+    SequentialWriteTest(HBaseConfiguration conf, TestOptions options, Status status) {
+      super(conf, options, status);
     }
 
     @Override
@@ -791,28 +964,21 @@ public class PerformanceEvaluation implements HConstants {
 
   static class FilteredScanTest extends Test {
     protected static final Log LOG = LogFactory.getLog(FilteredScanTest.class.getName());
-    private static final int COUNT = 20;
-
-    FilteredScanTest(final HBaseConfiguration conf, final int startRow,
-        final int perClientRunRows, final int totalRows,
-        final Status status, byte[] tableName) {
-      super(conf, startRow, perClientRunRows, totalRows, status, tableName);
+
+    FilteredScanTest(HBaseConfiguration conf, TestOptions options, Status status) {
+      super(conf, options, status);
     }
 
     @Override
-    void testTimed() throws IOException {
-      for (int i = 0; i < COUNT; i++) {
-        byte[] value = generateValue(this.rand);
-        Scan scan = constructScan(value);
-        ResultScanner scanner = null;
-        try {
-          scanner = this.table.getScanner(scan);
-          while (scanner.next() != null) {
-          }
-        } finally {
-          LOG.info("Completed scan " + i + " of " + COUNT);
-          if (scanner != null) scanner.close();
+    void testRow(int i) throws IOException {
+      byte[] value = generateValue(this.rand);
+      Scan scan = constructScan(value);
+      ResultScanner scanner = null;
+      try {
+        scanner = this.table.getScanner(scan);
+        while (scanner.next() != null) {
         }
+      } finally {
+        if (scanner != null) scanner.close();
       }
     }
@@ -861,28 +1027,27 @@ public class PerformanceEvaluation implements HConstants {
   }
 
   long runOneClient(final Class<? extends Test> cmd, final int startRow,
-    final int perClientRunRows, final int totalRows, final Status status)
+    final int perClientRunRows, final int totalRows, boolean flushCommits, final Status status)
   throws IOException {
     status.setStatus("Start " + cmd + " at offset " + startRow + " for " +
       perClientRunRows + " rows");
     long totalElapsedTime = 0;
 
     Test t = null;
+    TestOptions options = new TestOptions(startRow, perClientRunRows,
+      totalRows, getTableDescriptor().getName(), flushCommits);
     try {
       Constructor<? extends Test> constructor = cmd.getDeclaredConstructor(
         HBaseConfiguration.class,
-        int.class,
-        int.class,
-        int.class,
-        Status.class,
-        byte[].class
+        TestOptions.class,
+        Status.class
       );
-      t = constructor.newInstance(this.conf, startRow, perClientRunRows,
-          totalRows, status, getTableDescriptor().getName());
+      t = constructor.newInstance(this.conf, options, status);
     } catch (NoSuchMethodException e) {
       throw new IllegalArgumentException("Invalid command class: " +
           cmd.getName() + ".  It does not provide a constructor as described by" +
-          "the javadoc comment.  Available consctructors are: " + Arrays.toString(cmd.getConstructors()));
+          "the javadoc comment.  Available constructors are: " +
+          Arrays.toString(cmd.getConstructors()));
     } catch (Exception e) {
       throw new IllegalStateException("Failed to construct command class", e);
     }
@@ -904,7 +1069,7 @@ public class PerformanceEvaluation implements HConstants {
     try {
       admin = new HBaseAdmin(this.conf);
       checkTable(admin);
-      runOneClient(cmd, 0, this.R, this.R, status);
+      runOneClient(cmd, 0, this.R, this.R, this.flushCommits, status);
     } catch (Exception e) {
       LOG.error("Failed", e);
     }
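Since runOneClient above now looks up the (HBaseConfiguration, TestOptions, Status) constructor reflectively, any out-of-tree Test subclass has to expose exactly that signature, as IdxPerformanceEvaluation's IndexedFilteredScanTest does. A hypothetical skeleton:

    static class MyScanTest extends Test {
      MyScanTest(HBaseConfiguration conf, TestOptions options, Status status) {
        super(conf, options, status);
      }

      @Override
      void testRow(final int i) throws IOException {
        // per-row work under test goes here
      }
    }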
@@ -966,6 +1131,7 @@ public class PerformanceEvaluation implements HConstants {
     System.err.println(" nomapred        Run multiple clients using threads " +
       "(rather than use mapreduce)");
     System.err.println(" rows            Rows each client runs.  Default: One million");
+    System.err.println(" flushCommits    Used to determine if the test should flush the table.  Default: false");
     System.err.println();
     System.err.println("Command:");
     for (CmdDescriptor command : commands.values()) {
@@ -1030,6 +1196,12 @@ public class PerformanceEvaluation implements HConstants {
         continue;
       }
 
+      final String flushCommits = "--flushCommits=";
+      if (cmd.startsWith(flushCommits)) {
+        this.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
+        continue;
+      }
+
       Class<? extends Test> cmdClass = determineCommandClass(cmd);
       if (cmdClass != null) {
         getArgs(i + 1, args);
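Note that the option parsing keys on its own "--flushCommits=" prefix, distinct from the "--rows=" prefix handled just above it. For reference, a sequential-write run that issues one explicit flush as each client finishes might be launched along the usual PE lines (wrapper script and argument order per the standard usage output; adjust for your install):

    $ bin/hbase org.apache.hadoop.hbase.PerformanceEvaluation --flushCommits=true sequentialWrite 1

Similarly, keeping --rows small for filterScan (the --rows=20 suggested in its descriptor) caps the number of full filtered scans, since each testRow now performs one complete scan.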