diff --git src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java index 814dd74..8425081 100644 --- src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java +++ src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java @@ -38,8 +38,8 @@ import org.apache.hadoop.util.ToolRunner; */ public abstract class AbstractHBaseTool implements Tool { - private static final int EXIT_SUCCESS = 0; - private static final int EXIT_FAILURE = 1; + protected static final int EXIT_SUCCESS = 0; + protected static final int EXIT_FAILURE = 1; private static final String HELP_OPTION = "help"; @@ -50,6 +50,8 @@ public abstract class AbstractHBaseTool implements Tool { protected Configuration conf = null; private static final Set requiredOptions = new TreeSet(); + + protected String[] cmdLineArgs = null; /** * Override this to add command-line options using {@link #addOptWithArg} @@ -86,6 +88,7 @@ public abstract class AbstractHBaseTool implements Tool { try { // parse the command line arguments cmd = parseArgs(args); + cmdLineArgs = args; } catch (ParseException e) { LOG.error("Error when parsing command-line arguemnts", e); printUsage(); @@ -120,14 +123,14 @@ public abstract class AbstractHBaseTool implements Tool { return success; } - private CommandLine parseArgs(String[] args) throws ParseException { + protected CommandLine parseArgs(String[] args) throws ParseException { options.addOption(HELP_OPTION, false, "Show usage"); addOptions(); CommandLineParser parser = new BasicParser(); return parser.parse(options, args); } - private void printUsage() { + protected void printUsage() { HelpFormatter helpFormatter = new HelpFormatter(); helpFormatter.setWidth(80); String usageHeader = "Options:"; diff --git src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java index 3761242..64e0b20 100644 --- 
src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java +++ src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java @@ -17,11 +17,17 @@ package org.apache.hadoop.hbase.util; import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.ParseException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; @@ -31,6 +37,7 @@ import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.hfile.Compression; import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.util.ToolRunner; /** * A command-line utility that reads, writes, and verifies data. Unlike @@ -97,6 +104,7 @@ public class LoadTestTool extends AbstractHBaseTool { private static final String OPT_ZK_QUORUM = "zk"; private static final String OPT_SKIP_INIT = "skip_init"; private static final String OPT_INIT_ONLY = "init_only"; + private static final String NUM_TABLES = "num_tables"; private static final long DEFAULT_START_KEY = 0; @@ -127,6 +135,8 @@ public class LoadTestTool extends AbstractHBaseTool { private int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW; private int maxReadErrors = MultiThreadedReader.DEFAULT_MAX_ERRORS; private int verifyPercent; + + private int numTables = 1; // TODO: refactor LoadTestToolImpl somewhere to make the usage from tests less bad, // console tool itself should only be used from console. 
@@ -214,6 +224,11 @@ public class LoadTestTool extends AbstractHBaseTool { DEFAULT_START_KEY + "."); addOptNoArg(OPT_SKIP_INIT, "Skip the initialization; assume test table " + "already exists"); + + addOptWithArg(NUM_TABLES, + "A positive integer number. When a number n is specified, load test " + + "tool will load n tables in parallel. -tn parameter value becomes " + + "table name prefix. Each table name is in format _1..._n"); } @Override @@ -299,6 +314,11 @@ public class LoadTestTool extends AbstractHBaseTool { System.out.println("Percent of keys to verify: " + verifyPercent); System.out.println("Reader threads: " + numReaderThreads); } + + numTables = 1; + if(cmd.hasOption(NUM_TABLES)) { + numTables = parseInt(cmd.getOptionValue(NUM_TABLES), 1, 64); + } } private void parseColumnFamilyOptions(CommandLine cmd) { @@ -327,6 +347,14 @@ public class LoadTestTool extends AbstractHBaseTool { @Override protected int doWork() throws IOException { + if (numTables > 1) { + return parallelLoadTables(); + } else { + return loadTable(); + } + } + + protected int loadTable() throws IOException { if (cmd.hasOption(OPT_ZK_QUORUM)) { conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue(OPT_ZK_QUORUM)); } @@ -389,9 +417,111 @@ public class LoadTestTool extends AbstractHBaseTool { } return success ? 0 : 1; } - + public static void main(String[] args) { new LoadTestTool().doStaticMain(args); } + /** + * When NUM_TABLES is specified, the function starts multiple worker threads + * which individually start a LoadTestTool instance to load a table. Each + * table name is in format _. 
For example, "-tn test -num_tables 2" + * , table names will be "test_1", "test_2" + * + * @throws IOException + */ + private int parallelLoadTables() + throws IOException { + long numberOfTables = parseLong(cmd.getOptionValue(NUM_TABLES), 1, 64); + // create new command args + String tableName = cmd.getOptionValue(OPT_TABLE_NAME, DEFAULT_TABLE_NAME); + String[] newArgs = null; + if (!cmd.hasOption(LoadTestTool.OPT_TABLE_NAME)) { + newArgs = new String[cmdLineArgs.length + 2]; + newArgs[0] = "-" + LoadTestTool.OPT_TABLE_NAME; + for (int i = 0; i < cmdLineArgs.length; i++) { + newArgs[i + 2] = cmdLineArgs[i]; + } + } else { + newArgs = cmdLineArgs; + } + + int tablaNameValueIndex = -1; + for (int j = 0; j < newArgs.length; j++) { + if (newArgs[j].endsWith(OPT_TABLE_NAME)) { + tablaNameValueIndex = j + 1; + } else if (newArgs[j].endsWith(NUM_TABLES)) { + // change NUM_TABLES to 1 so that each worker loads one table + newArgs[j + 1] = "1"; + } + } + + // starting to load multiple tables + List workers = new ArrayList(); + for (int i = 0; i < numberOfTables; i++) { + String[] workerArgs = newArgs.clone(); + workerArgs[tablaNameValueIndex] = tableName + "_" + (i+1); + WorkerThread worker = new WorkerThread(i, workerArgs); + workers.add(worker); + LOG.info(worker + " starting"); + worker.start(); + } + + // wait for all workers finish + LOG.info("Waiting for worker threads to finish"); + for (WorkerThread t : workers) { + try { + t.join(); + } catch (InterruptedException ie) { + IOException iie = new InterruptedIOException(); + iie.initCause(ie); + throw iie; + } + checkForErrors(); + } + + return EXIT_SUCCESS; + } + + // If an exception is thrown by one of worker threads, it will be + // stored here. + protected AtomicReference thrown = new AtomicReference(); + + private void workerThreadError(Throwable t) { + thrown.compareAndSet(null, t); + } + + /** + * Check for errors in the writer threads. If any is found, rethrow it. 
+ */ + private void checkForErrors() throws IOException { + Throwable thrown = this.thrown.get(); + if (thrown == null) return; + if (thrown instanceof IOException) { + throw (IOException) thrown; + } else { + throw new RuntimeException(thrown); + } + } + + class WorkerThread extends Thread { + private String[] workerArgs; + + WorkerThread(int i, String[] args) { + super("WorkerThread-" + i); + workerArgs = args; + } + + public void run() { + try { + int ret = ToolRunner.run(HBaseConfiguration.create(), new LoadTestTool(), workerArgs); + if (ret != 0) { + throw new RuntimeException("LoadTestTool exit with non-zero return code."); + } + } catch (Exception ex) { + LOG.error("Error in worker thread", ex); + workerThreadError(ex); + } + } + } }