diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaPerf.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaPerf.java new file mode 100644 index 0000000..e3ad993 --- /dev/null +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaPerf.java @@ -0,0 +1,269 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase; + +import com.google.common.base.Objects; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.math.stat.descriptive.DescriptiveStatistics; +import org.apache.hadoop.hbase.chaos.actions.Action; +import org.apache.hadoop.hbase.chaos.actions.MoveRegionsOfTableAction; +import org.apache.hadoop.hbase.chaos.actions.RestartRsHoldingTableAction; +import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy; +import org.apache.hadoop.mapreduce.Job; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static java.lang.String.format; +import static org.junit.Assert.assertTrue; + +/** + * Test for comparing the performance impact of region replicas. Uses + * components of {@link PerformanceEvaluation} to run a mapreduce job. Modeled + * after {@link org.apache.hadoop.hbase.mttr.IntegrationTestMTTR}. 
+ */ +@Category(IntegrationTests.class) +public class IntegrationTestRegionReplicaPerf { + + private static final Log LOG = LogFactory.getLog(IntegrationTestRegionReplicaPerf.class); + + private static long sleepTime; + private static final String SLEEP_TIME_KEY = "hbase.IntegrationTestRegionReplicaPerf.sleeptime"; + private static final long SLEEP_TIME_DEFAULT = 1 * 1000l; + private static TableName tableName; + private static final String TABLE_NAME_KEY = "hbase.IntegrationTestRegionReplicaPerf.tableName"; + private static final String TABLE_NAME_DEFAULT = "IntegrationTestRegionReplicaPerf"; + private static IntegrationTestingUtility util; + + /** Executor for test threads */ + private static ExecutorService executorService; + + /** + * Callable for executing a ChaosMonkey {@link Action}. Assumed to be + * idempotent but not thread safe. Meaning, {@code #call} can be invoked + * multiple times on the same instance, though not concurrently. Return value + * is always {@code true}. + */ + static class ActionCallable implements Callable { + private final Action action; + + public ActionCallable(Action action) { + this.action = action; + } + + @Override + public Boolean call() throws Exception { + this.action.perform(); + return true; + } + } + + static class PerfEvalCallable implements Callable { + private final Queue argv = new LinkedList(); + + public PerfEvalCallable(String argv) { + this.argv.addAll(Arrays.asList(argv.split(" "))); + } + + @Override + public TimingResult call() throws Exception { + PerformanceEvaluation.TestOptions opts = PerformanceEvaluation.parseOpts(argv); + Job job = PerformanceEvaluation.doMapReduce(opts, util.getConfiguration()); + long numRows = + job.getCounters().findCounter(PerformanceEvaluation.Counter.ROWS).getValue(); + long elapsedTime = + job.getCounters().findCounter(PerformanceEvaluation.Counter.ELAPSED_TIME).getValue(); + return new TimingResult(numRows, elapsedTime); + } + } + + /** + * Record the results from a single {@link 
PerformanceEvaluation} job run. + */ + static class TimingResult { + public long numRows; + public long elapsedTime; + + public TimingResult(long numRows, long elapsedTime) { + this.numRows = numRows; + this.elapsedTime = elapsedTime; + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("numRows", numRows) + .add("elapsedTime", elapsedTime) + .toString(); + } + } + + @BeforeClass + public static void setUp() throws Exception { + if (util == null) { + util = new IntegrationTestingUtility(); + // TODO: splits disabled until "phase 2" is complete. + util.getConfiguration().set("hbase.regionserver.region.split.policy", + DisabledRegionSplitPolicy.class.getName()); + } + + // have enough slaves to spread the replicas around + util.initializeCluster(4); + executorService = Executors.newFixedThreadPool(2); + + sleepTime = util.getConfiguration().getLong(SLEEP_TIME_KEY, SLEEP_TIME_DEFAULT); + tableName = TableName.valueOf( + util.getConfiguration().get(TABLE_NAME_KEY, TABLE_NAME_DEFAULT)); + } + + @AfterClass + public static void after() throws IOException { + util.restoreCluster(); + util = null; + executorService.shutdown(); + executorService = null; + } + + @Test + public void testRestartRsHoldingTable() throws Exception { + Action action = new RestartRsHoldingTableAction(sleepTime, tableName.getNameAsString()); + action.init(new Action.ActionContext(util)); + run(new ActionCallable(action), "RestartRsHoldingTableAction"); + } + + @Test + public void testMoveRegion() throws Exception { + Action action = new MoveRegionsOfTableAction(sleepTime, tableName.getNameAsString()); + action.init(new Action.ActionContext(util)); + run(new ActionCallable(action), "MoveRegionsOfTableAction"); + } + + /** + * Run a test without a monkey on its back. 
+ */ + private static TimingResult run(Callable testCallable, ExecutorService service) + throws InterruptedException, ExecutionException { + Future testResult = service.submit(testCallable); + long duration, start = System.nanoTime(); + TimingResult result = testResult.get(); + duration = TimeUnit.SECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS); + LOG.debug("Test completed after " + duration + "secs."); + return result; + } + + /** + * Run a test with a monkey on its back. + */ + private static TimingResult run(Callable testCallable, + Callable monkeyCallable, ExecutorService service) + throws InterruptedException, ExecutionException { + Future monkeyFuture = service.submit(monkeyCallable); + Future testFuture = service.submit(testCallable); + + TimingResult result = null; + long duration, start = System.nanoTime(); + // check on test every 10 seconds for 500 seconds. Spin up another monkey if the current + // one completes before test. + for (int timeoutCnt = 0; timeoutCnt < 50; timeoutCnt++) { + try { + result = testFuture.get(10, TimeUnit.SECONDS); + duration = TimeUnit.SECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS); + LOG.debug("Test completed after " + timeoutCnt + " iterations, " + duration + "secs."); + break; + } catch (TimeoutException e) { + if (monkeyFuture.isDone()) { + // monkey not running but test job is. Kick off another monkey. + LOG.debug("Monkey completed before test worker. Launching another monkey."); + monkeyFuture = executorService.submit(monkeyCallable); + } + } + } + // wait for the monkey to finish + monkeyFuture.get(); + return result; + } + + /** + * Run {@code testname} and aggregate results. 
+ */ + public void run(Callable monkeyCallable, String testName) throws Exception { + int maxIters = 3; + String replicas = "--replicas=2"; + String writeOpts = + format("%s --table=%s --presplit=16 sequentialWrite 4", replicas, tableName); + String readOpts = format("--table=%s --latency --sampleRate=0.1 randomRead 4", tableName); + String replicaReadOpts = format("%s %s", replicas, readOpts); + + ArrayList resultsWithoutReplica = new ArrayList(maxIters); + ArrayList resultsWithReplica = new ArrayList(maxIters); + Callable withoutReplicasCallable = new PerfEvalCallable(readOpts); + Callable withReplicasCallable = new PerfEvalCallable(replicaReadOpts); + + // create/populate the table + new PerfEvalCallable(writeOpts).call(); + + // collect a baseline without region replicas. + for (int i = 0; i < maxIters; i++) { + resultsWithoutReplica.add(run(withoutReplicasCallable, executorService)); + // TODO: is sleep necessary? + Thread.sleep(5000l); + } + // run test with region replicas. + for (int i = 0; i < maxIters; i++) { + resultsWithReplica.add(run(withReplicasCallable, monkeyCallable, executorService)); + // TODO: is sleep necessary? 
+ Thread.sleep(5000l); + } + + DescriptiveStatistics withoutReplicaStats = new DescriptiveStatistics(); + for (int i = 0; i < resultsWithoutReplica.size(); i++) { + withoutReplicaStats.addValue(resultsWithoutReplica.get(i).elapsedTime); + } + DescriptiveStatistics withReplicaStats = new DescriptiveStatistics(); + for (int i = 0; i < resultsWithReplica.size(); i++) { + withReplicaStats.addValue(resultsWithReplica.get(i).elapsedTime); + } + + LOG.info(Objects.toStringHelper(testName) + .add("withoutReplicas", resultsWithoutReplica) + .add("withReplicas", resultsWithReplica) + .add("withoutReplicasMean", withoutReplicaStats.getMean()) + .add("withReplicasMean", withReplicaStats.getMean()) + .toString()); + + assertTrue( + "Running with region replicas under chaos should be as fast or faster than without.", + withReplicaStats.getMean() <= withoutReplicaStats.getMean()); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java index ea74b5b..539fbc4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java @@ -28,7 +28,9 @@ import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.Date; +import java.util.LinkedList; import java.util.Map; +import java.util.Queue; import java.util.Random; import java.util.TreeMap; import java.util.concurrent.Callable; @@ -45,6 +47,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.Consistency; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; @@ -86,16 +89,17 @@ import static 
org.codehaus.jackson.map.SerializationConfig.Feature.SORT_PROPERTI * client that steps through one of a set of hardcoded tests or 'experiments' * (e.g. a random reads test, a random writes test, etc.). Pass on the * command-line which test to run and how many clients are participating in - * this experiment. Run java PerformanceEvaluation --help to - * obtain usage. + * this experiment. Run {@code PerformanceEvaluation --help} to obtain usage. * *

This class sets up and runs the evaluation programs described in * Section 7, Performance Evaluation, of the Bigtable * paper, pages 8-10. * - *

If number of clients > 1, we start up a MapReduce job. Each map task - * runs an individual client. Each client does about 1GB of data. + *

By default, runs as a mapreduce job where each mapper runs a single test + * client. Can also run as a non-mapreduce, multithreaded application by + * specifying {@code --nomapred}. Each client does about 1GB of data, unless + * specified otherwise. */ public class PerformanceEvaluation extends Configured implements Tool { protected static final Log LOG = LogFactory.getLog(PerformanceEvaluation.class.getName()); @@ -116,10 +120,35 @@ public class PerformanceEvaluation extends Configured implements Tool { private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024); private static final TestOptions DEFAULT_OPTS = new TestOptions(); - protected Map commands = new TreeMap(); - + private static Map COMMANDS = new TreeMap(); private static final Path PERF_EVAL_DIR = new Path("performance_evaluation"); + static { + addCommandDescriptor(RandomReadTest.class, "randomRead", + "Run random read test"); + addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan", + "Run random seek and scan 100 test"); + addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10", + "Run random seek scan with both start and stop row (max 10 rows)"); + addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100", + "Run random seek scan with both start and stop row (max 100 rows)"); + addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000", + "Run random seek scan with both start and stop row (max 1000 rows)"); + addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000", + "Run random seek scan with both start and stop row (max 10000 rows)"); + addCommandDescriptor(RandomWriteTest.class, "randomWrite", + "Run random write test"); + addCommandDescriptor(SequentialReadTest.class, "sequentialRead", + "Run sequential read test"); + addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", + "Run sequential write test"); + addCommandDescriptor(ScanTest.class, "scan", + "Run scan test (read every row)"); + 
addCommandDescriptor(FilteredScanTest.class, "filterScan", + "Run scan test using a filter to find a specific row based on it's value " + + "(make sure to use --rows=20)"); + } + /** * Enum for map metrics. Keep it out here rather than inside in the Map * inner-class so we can find associated properties. @@ -137,36 +166,12 @@ public class PerformanceEvaluation extends Configured implements Tool { */ public PerformanceEvaluation(final Configuration conf) { super(conf); - - addCommandDescriptor(RandomReadTest.class, "randomRead", - "Run random read test"); - addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan", - "Run random seek and scan 100 test"); - addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10", - "Run random seek scan with both start and stop row (max 10 rows)"); - addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100", - "Run random seek scan with both start and stop row (max 100 rows)"); - addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000", - "Run random seek scan with both start and stop row (max 1000 rows)"); - addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000", - "Run random seek scan with both start and stop row (max 10000 rows)"); - addCommandDescriptor(RandomWriteTest.class, "randomWrite", - "Run random write test"); - addCommandDescriptor(SequentialReadTest.class, "sequentialRead", - "Run sequential read test"); - addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", - "Run sequential write test"); - addCommandDescriptor(ScanTest.class, "scan", - "Run scan test (read every row)"); - addCommandDescriptor(FilteredScanTest.class, "filterScan", - "Run scan test using a filter to find a specific row based on it's value (make sure to use --rows=20)"); } - protected void addCommandDescriptor(Class cmdClass, + protected static void addCommandDescriptor(Class cmdClass, String name, String description) { - CmdDescriptor cmdDescriptor = - new 
CmdDescriptor(cmdClass, name, description); - commands.put(name, cmdDescriptor); + CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description); + COMMANDS.put(name, cmdDescriptor); } /** @@ -244,35 +249,47 @@ public class PerformanceEvaluation extends Configured implements Tool { } /* - * If table does not already exist, create. - * @param c Client to use checking. - * @return True if we created the table. - * @throws IOException + * If table does not already exist, create. Also create a table when + * {@code opts.presplitRegions} is specified or when the existing table's + * region replica count doesn't match {@code opts.replicas}. */ private static boolean checkTable(HBaseAdmin admin, TestOptions opts) throws IOException { - HTableDescriptor tableDescriptor = getTableDescriptor(opts); - if (opts.presplitRegions > 0) { - // presplit requested - if (admin.tableExists(tableDescriptor.getTableName())) { - admin.disableTable(tableDescriptor.getTableName()); - admin.deleteTable(tableDescriptor.getTableName()); - } - - byte[][] splits = getSplits(opts); - for (int i=0; i < splits.length; i++) { - LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i])); - } - admin.createTable(tableDescriptor, splits); - LOG.info ("Table created with " + opts.presplitRegions + " splits"); - } - else { - boolean tableExists = admin.tableExists(tableDescriptor.getTableName()); - if (!tableExists) { - admin.createTable(tableDescriptor); - LOG.info("Table " + tableDescriptor + " created"); + TableName tableName = TableName.valueOf(opts.tableName); + boolean needsDelete = false, exists = admin.tableExists(tableName); + HTableDescriptor desc = + exists ? admin.getTableDescriptor(TableName.valueOf(opts.tableName)) : null; + byte[][] splits = getSplits(opts); + + // recreate the table when user has requested presplit or when existing replica count does + // not match requested replica account. 
+ if ((exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions) + || (desc != null && desc.getRegionReplication() != opts.replicas)) { + needsDelete = true; + } + + // remove an existing table + if (needsDelete) { + admin.disableTable(tableName); + admin.deleteTable(tableName); + } + + // table creation is necessary + if (!exists || needsDelete) { + desc = getTableDescriptor(opts); + if (splits != null) { + if (LOG.isDebugEnabled()) { + for (int i = 0; i < splits.length; i++) { + LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i])); + } + } + admin.createTable(desc, splits); + LOG.info("Table " + desc + " created with " + opts.presplitRegions + " splits"); + } else { + admin.createTable(desc); + LOG.info("Table " + desc + " created"); } } - return admin.tableExists(tableDescriptor.getTableName()); + return admin.tableExists(tableName); } /** @@ -287,6 +304,9 @@ public class PerformanceEvaluation extends Configured implements Tool { family.setInMemory(true); } desc.addFamily(family); + if (opts.replicas != DEFAULT_OPTS.replicas) { + desc.setRegionReplication(opts.replicas); + } return desc; } @@ -294,8 +314,8 @@ * generates splits based on total number of rows and specified split regions */ protected static byte[][] getSplits(TestOptions opts) { - if (opts.presplitRegions == 0) - return new byte [0][]; + if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions) + return null; int numSplitPoints = opts.presplitRegions - 1; byte[][] splits = new byte[numSplitPoints][]; @@ -312,8 +332,10 @@ * @param cmd Command to run. 
* @throws IOException */ - private void doLocalClients(final Class cmd, final TestOptions opts) + static void doLocalClients(final TestOptions opts, final Configuration conf) throws IOException, InterruptedException { + final Class cmd = determineCommandClass(opts.cmdName); + assert cmd != null; Future[] threads = new Future[opts.numClientThreads]; long[] timings = new long[opts.numClientThreads]; ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads, @@ -325,7 +347,7 @@ public class PerformanceEvaluation extends Configured implements Tool { public Long call() throws Exception { TestOptions threadOpts = new TestOptions(opts); threadOpts.startRow = index * threadOpts.perClientRunRows; - long elapsedTime = runOneClient(cmd, getConf(), threadOpts, new Status() { + long elapsedTime = runOneClient(cmd, conf, threadOpts, new Status() { public void setStatus(final String msg) throws IOException { LOG.info("client-" + Thread.currentThread().getName() + " " + msg); } @@ -353,9 +375,9 @@ public class PerformanceEvaluation extends Configured implements Tool { total += timings[i]; } LOG.info("[" + test + "]" - + "\tMin: " + timings[0] + "ms" - + "\tMax: " + timings[timings.length - 1] + "ms" - + "\tAvg: " + (total / timings.length) + "ms"); + + "\tMin: " + timings[0] + "ms" + + "\tMax: " + timings[timings.length - 1] + "ms" + + "\tAvg: " + (total / timings.length) + "ms"); } /* @@ -365,12 +387,13 @@ public class PerformanceEvaluation extends Configured implements Tool { * @param cmd Command to run. 
* @throws IOException */ - private void doMapReduce(final Class cmd, TestOptions opts) throws IOException, - InterruptedException, ClassNotFoundException { - Configuration conf = getConf(); + static Job doMapReduce(TestOptions opts, final Configuration conf) + throws IOException, InterruptedException, ClassNotFoundException { + final Class cmd = determineCommandClass(opts.cmdName); + assert cmd != null; Path inputDir = writeInputFile(conf, opts); conf.set(EvaluationMapTask.CMD_KEY, cmd.getName()); - conf.set(EvaluationMapTask.PE_KEY, getClass().getName()); + conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName()); Job job = new Job(conf); job.setJarByClass(PerformanceEvaluation.class); job.setJobName("HBase Performance Evaluation"); @@ -399,6 +422,7 @@ public class PerformanceEvaluation extends Configured implements Tool { TableMapReduceUtil.initCredentials(job); job.waitForCompletion(true); + return job; } /* @@ -407,7 +431,7 @@ public class PerformanceEvaluation extends Configured implements Tool { * @return Directory that contains file written. 
* @throws IOException */ - private Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException { + private static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException { SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss"); Path jobdir = new Path(PERF_EVAL_DIR, formatter.format(new Date())); Path inputDir = new Path(jobdir, "inputs"); @@ -479,6 +503,7 @@ public class PerformanceEvaluation extends Configured implements Tool { public TestOptions() {} public TestOptions(TestOptions that) { + this.cmdName = that.cmdName; this.nomapred = that.nomapred; this.startRow = that.startRow; this.perClientRunRows = that.perClientRunRows; @@ -494,10 +519,12 @@ public class PerformanceEvaluation extends Configured implements Tool { this.multiGet = that.multiGet; this.inMemoryCF = that.inMemoryCF; this.presplitRegions = that.presplitRegions; + this.replicas = that.replicas; this.compression = that.compression; this.blockEncoding = that.blockEncoding; } + public String cmdName = null; public boolean nomapred = false; public int startRow = 0; public int perClientRunRows = ROWS_PER_GB; @@ -511,8 +538,9 @@ public class PerformanceEvaluation extends Configured implements Tool { public int noOfTags = 1; public boolean reportLatency = false; public int multiGet = 0; - boolean inMemoryCF = false; - int presplitRegions = 0; + public boolean inMemoryCF = false; + public int presplitRegions = 0; + public int replicas = HTableDescriptor.DEFAULT_REGION_REPLICATION; public Compression.Algorithm compression = Compression.Algorithm.NONE; public DataBlockEncoding blockEncoding = DataBlockEncoding.NONE; } @@ -720,12 +748,14 @@ public class PerformanceEvaluation extends Configured implements Tool { static class RandomReadTest extends Test { private final int everyN; private final double[] times; + private final Consistency consistency; private ArrayList gets; int idx = 0; RandomReadTest(Configuration conf, TestOptions options, 
Status status) { super(conf, options, status); everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate)); + consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE; LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows."); if (opts.multiGet > 0) { LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + "."); @@ -743,6 +773,7 @@ public class PerformanceEvaluation extends Configured implements Tool { if (i % everyN == 0) { Get get = new Get(getRandomRow(this.rand, opts.totalRows)); get.addColumn(FAMILY_NAME, QUALIFIER_NAME); + get.setConsistency(consistency); if (opts.multiGet > 0) { this.gets.add(get); if (this.gets.size() == opts.multiGet) { @@ -1032,9 +1063,9 @@ public class PerformanceEvaluation extends Configured implements Tool { if (admin != null) admin.close(); } if (opts.nomapred) { - doLocalClients(cmd, opts); + doLocalClients(opts, getConf()); } else { - doMapReduce(cmd, opts); + doMapReduce(opts, getConf()); } } @@ -1073,6 +1104,9 @@ public class PerformanceEvaluation extends Configured implements Tool { "This works only if usetags is true."); System.err.println(" latency Set to report operation latencies. " + "Currently only supported by randomRead test. Default: False"); + System.err.println(" multiGet Batch gets together into groups of N. Only supported " + + "by randomRead. Default: disabled"); + System.err.println(" replicas Enable region replica testing. Defaults: 1."); System.err.println(); System.err.println(" Note: -D properties will be applied to the conf used. 
"); System.err.println(" For example: "); @@ -1080,7 +1114,7 @@ public class PerformanceEvaluation extends Configured implements Tool { System.err.println(" -Dmapreduce.task.timeout=60000"); System.err.println(); System.err.println("Command:"); - for (CmdDescriptor command : commands.values()) { + for (CmdDescriptor command : COMMANDS.values()) { System.err.println(String.format(" %-15s %s", command.getName(), command.getDescription())); } System.err.println(); @@ -1094,140 +1128,155 @@ public class PerformanceEvaluation extends Configured implements Tool { + " sequentialWrite 1"); } - private static int getNumClients(final int start, final String[] args) { - if(start + 1 > args.length) { - throw new IllegalArgumentException("must supply the number of clients"); - } - int N = Integer.parseInt(args[start]); - if (N < 1) { - throw new IllegalArgumentException("Number of clients must be > 1"); - } - return N; - } + /** + * Parse options passed in via an arguments array. Assumes that array has been split + * on white-space and placed into a {@code Queue}. Any unknown arguments will remain + * in the queue at the conclusion of this method call. It's up to the caller to deal + * with these unrecognized arguments. + */ + static TestOptions parseOpts(Queue args) { + TestOptions opts = new TestOptions(); + + String cmd = null; + while ((cmd = args.poll()) != null) { + if (cmd.equals("-h") || cmd.startsWith("--h")) { + // place item back onto queue so that caller knows parsing was incomplete + args.add(cmd); + break; + } - public int run(String[] args) throws Exception { - // Process command-line args. TODO: Better cmd-line processing - // (but hopefully something not as painful as cli options). 
- int errCode = -1; - if (args.length < 1) { - printUsage(); - return errCode; - } + final String nmr = "--nomapred"; + if (cmd.startsWith(nmr)) { + opts.nomapred = true; + continue; + } - try { - // MR-NOTE: if you are adding a property that is used to control an operation - // like put(), get(), scan(), ... you must also add it as part of the MR - // input, take a look at writeInputFile(). - // Then you must adapt the LINE_PATTERN input regex, - // and parse the argument, take a look at PEInputFormat.getSplits(). - - TestOptions opts = new TestOptions(); - - for (int i = 0; i < args.length; i++) { - String cmd = args[i]; - if (cmd.equals("-h") || cmd.startsWith("--h")) { - printUsage(); - errCode = 0; - break; - } + final String rows = "--rows="; + if (cmd.startsWith(rows)) { + opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length())); + continue; + } - final String nmr = "--nomapred"; - if (cmd.startsWith(nmr)) { - opts.nomapred = true; - continue; - } + final String sampleRate = "--sampleRate="; + if (cmd.startsWith(sampleRate)) { + opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length())); + continue; + } - final String rows = "--rows="; - if (cmd.startsWith(rows)) { - opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length())); - continue; - } + final String table = "--table="; + if (cmd.startsWith(table)) { + opts.tableName = cmd.substring(table.length()); + continue; + } - final String sampleRate = "--sampleRate="; - if (cmd.startsWith(sampleRate)) { - opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length())); - continue; - } + final String compress = "--compress="; + if (cmd.startsWith(compress)) { + opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length())); + continue; + } - final String table = "--table="; - if (cmd.startsWith(table)) { - opts.tableName = cmd.substring(table.length()); - continue; - } + final String blockEncoding = "--blockEncoding="; + if 
(cmd.startsWith(blockEncoding)) { + opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length())); + continue; + } - final String compress = "--compress="; - if (cmd.startsWith(compress)) { - opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length())); - continue; - } + final String flushCommits = "--flushCommits="; + if (cmd.startsWith(flushCommits)) { + opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length())); + continue; + } - final String blockEncoding = "--blockEncoding="; - if (cmd.startsWith(blockEncoding)) { - opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length())); - continue; - } + final String writeToWAL = "--writeToWAL="; + if (cmd.startsWith(writeToWAL)) { + opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length())); + continue; + } - final String flushCommits = "--flushCommits="; - if (cmd.startsWith(flushCommits)) { - opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length())); - continue; - } + final String presplit = "--presplit="; + if (cmd.startsWith(presplit)) { + opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length())); + continue; + } - final String writeToWAL = "--writeToWAL="; - if (cmd.startsWith(writeToWAL)) { - opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length())); - continue; - } + final String inMemory = "--inmemory="; + if (cmd.startsWith(inMemory)) { + opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length())); + continue; + } - final String presplit = "--presplit="; - if (cmd.startsWith(presplit)) { - opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length())); - continue; - } - - final String inMemory = "--inmemory="; - if (cmd.startsWith(inMemory)) { - opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length())); - continue; - } + final String latency = "--latency"; + if (cmd.startsWith(latency)) { + opts.reportLatency = 
true; + continue; + } - final String latency = "--latency"; - if (cmd.startsWith(latency)) { - opts.reportLatency = true; - continue; - } + final String multiGet = "--multiGet="; + if (cmd.startsWith(multiGet)) { + opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length())); + continue; + } - final String multiGet = "--multiGet="; - if (cmd.startsWith(multiGet)) { - opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length())); - continue; - } + final String useTags = "--usetags="; + if (cmd.startsWith(useTags)) { + opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length())); + continue; + } - final String useTags = "--usetags="; - if (cmd.startsWith(useTags)) { - opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length())); - continue; - } - - final String noOfTags = "--nooftags="; - if (cmd.startsWith(noOfTags)) { - opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length())); - continue; - } - - Class cmdClass = determineCommandClass(cmd); - if (cmdClass != null) { - opts.numClientThreads = getNumClients(i + 1, args); - // number of rows specified - opts.totalRows = opts.perClientRunRows * opts.numClientThreads; - runTest(cmdClass, opts); - errCode = 0; - break; - } + final String noOfTags = "--nooftags="; + if (cmd.startsWith(noOfTags)) { + opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length())); + continue; + } - printUsage(); + final String replicas = "--replicas="; + if (cmd.startsWith(replicas)) { + opts.replicas = Integer.parseInt(cmd.substring(replicas.length())); + continue; + } + + if (isCommandClass(cmd)) { + opts.cmdName = cmd; + opts.numClientThreads = Integer.parseInt(args.remove()); + // number of rows specified + opts.totalRows = opts.perClientRunRows * opts.numClientThreads; break; } + } + return opts; + } + + public int run(String[] args) throws Exception { + // Process command-line args. TODO: Better cmd-line processing + // (but hopefully something not as painful as cli options). 
+ int errCode = -1; + if (args.length < 1) { + printUsage(); + return errCode; + } + + try { + LinkedList argv = new LinkedList(); + argv.addAll(Arrays.asList(args)); + TestOptions opts = parseOpts(argv); + + // args remaining, print help and exit + if (!argv.isEmpty()) { + errCode = 0; + printUsage(); + } + + // must run at least 1 client + if (opts.numClientThreads <= 0) { + throw new IllegalArgumentException("Number of clients must be > 0"); + } + + Class cmdClass = determineCommandClass(opts.cmdName); + if (cmdClass != null) { + runTest(cmdClass, opts); + errCode = 0; + } + } catch (Exception e) { e.printStackTrace(); } @@ -1235,8 +1284,12 @@ return errCode; } - private Class determineCommandClass(String cmd) { - CmdDescriptor descriptor = commands.get(cmd); + private static boolean isCommandClass(String cmd) { + return COMMANDS.containsKey(cmd); + } + + private static Class determineCommandClass(String cmd) { + CmdDescriptor descriptor = COMMANDS.get(cmd); return descriptor != null ? descriptor.getCmdClass() : null; }