diff --git src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java index 814dd74..1a2ecd6 100644 --- src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java +++ src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java @@ -39,7 +39,7 @@ import org.apache.hadoop.util.ToolRunner; public abstract class AbstractHBaseTool implements Tool { private static final int EXIT_SUCCESS = 0; - private static final int EXIT_FAILURE = 1; + protected static final int EXIT_FAILURE = 1; private static final String HELP_OPTION = "help"; @@ -120,14 +120,14 @@ public abstract class AbstractHBaseTool implements Tool { return success; } - private CommandLine parseArgs(String[] args) throws ParseException { + protected CommandLine parseArgs(String[] args) throws ParseException { options.addOption(HELP_OPTION, false, "Show usage"); addOptions(); CommandLineParser parser = new BasicParser(); return parser.parse(options, args); } - private void printUsage() { + protected void printUsage() { HelpFormatter helpFormatter = new HelpFormatter(); helpFormatter.setWidth(80); String usageHeader = "Options:"; diff --git src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java index 3761242..36e2853 100644 --- src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java +++ src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java @@ -17,11 +17,17 @@ package org.apache.hadoop.hbase.util; import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.ParseException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; @@ -31,6 +37,7 @@ import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.hfile.Compression; import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.util.ToolRunner; /** * A command-line utility that reads, writes, and verifies data. Unlike @@ -97,6 +104,7 @@ public class LoadTestTool extends AbstractHBaseTool { private static final String OPT_ZK_QUORUM = "zk"; private static final String OPT_SKIP_INIT = "skip_init"; private static final String OPT_INIT_ONLY = "init_only"; + private static final String OPT_CONCURRENT_FACTOR = "concurrent_factor"; private static final long DEFAULT_START_KEY = 0; @@ -214,6 +222,11 @@ public class LoadTestTool extends AbstractHBaseTool { DEFAULT_START_KEY + "."); addOptNoArg(OPT_SKIP_INIT, "Skip the initialization; assume test table " + "already exists"); + + addOptWithArg(OPT_CONCURRENT_FACTOR, + "A positive integer number. When a number n is speicfied, load test " + + "tool will load n table parallely. -tn parameter value becomes " + + "table name prefix. Each table name is in format _1..._n"); } @Override @@ -391,7 +404,126 @@ public class LoadTestTool extends AbstractHBaseTool { } public static void main(String[] args) { - new LoadTestTool().doStaticMain(args); + LoadTestTool tool = new LoadTestTool(); + CommandLine cmd; + try { + // parse the command line arguments + cmd = tool.parseArgs(args); + } catch (ParseException e) { + LOG.error("Error when parsing command-line arguemnts", e); + tool.printUsage(); + return; + } + if (cmd.hasOption(OPT_CONCURRENT_FACTOR)) { + try { + tool.parallelLoadTables(cmd, args); + } catch (Exception e) { + LOG.info("Loading failed", e); + System.exit(EXIT_FAILURE); + } + } else { + tool.doStaticMain(args); + } } + /** + * When OPT_CONCURRENT_FACTOR is specified, the function starts multiple worker threads which + * individually start a LoadTestTool instance to load a table. Each table name is in format + * _. For example, "-tn test -concurrent_factor + * 2", table names will be "test_1", "test_2" + * @param cmd + * @param cmdArgs + * @throws IOException + */ + private void parallelLoadTables(CommandLine cmd, final String[] cmdArgs) + throws IOException { + long numberOfTables = parseLong(cmd.getOptionValue(OPT_CONCURRENT_FACTOR), + 1, 64); + // create new command args + String tableName = cmd.getOptionValue(OPT_TABLE_NAME, DEFAULT_TABLE_NAME); + String[] newArgs = null; + if (!cmd.hasOption(LoadTestTool.OPT_TABLE_NAME)) { + newArgs = new String[cmdArgs.length + 2]; + newArgs[0] = "-" + LoadTestTool.OPT_TABLE_NAME; + for (int i = 0; i < cmdArgs.length; i++) { + newArgs[i + 2] = cmdArgs[i]; + } + } else { + newArgs = cmdArgs; + } + + int tablaNameValueIndex = -1; + for (int j = 0; j < newArgs.length; j++) { + if (newArgs[j].endsWith(OPT_TABLE_NAME)) { + tablaNameValueIndex = j + 1; + break; + } + } + + // loading multiple tables + List workers = new ArrayList(); + for (int i = 0; i < numberOfTables; i++) { + String[] workerArgs = newArgs.clone(); + workerArgs[tablaNameValueIndex] = tableName + "_" + (i+1); + WorkerThread worker = new WorkerThread(i, workerArgs); + workers.add(worker); + LOG.info(worker + " starting"); + worker.start(); + } + + // wait for all workers finish + LOG.info("Waiting for worker threads to finish"); + for (WorkerThread t : workers) { + try { + t.join(); + } catch (InterruptedException ie) { + IOException iie = new InterruptedIOException(); + iie.initCause(ie); + throw iie; + } + checkForErrors(); + } + } + + // If an exception is thrown by one of worker threads, it will be + // stored here. + protected AtomicReference thrown = new AtomicReference(); + + private void workerThreadError(Throwable t) { + thrown.compareAndSet(null, t); + } + + /** + * Check for errors in the writer threads. If any is found, rethrow it. + */ + private void checkForErrors() throws IOException { + Throwable thrown = this.thrown.get(); + if (thrown == null) return; + if (thrown instanceof IOException) { + throw (IOException) thrown; + } else { + throw new RuntimeException(thrown); + } + } + + class WorkerThread extends Thread { + private String[] workerArgs; + + WorkerThread(int i, String[] args) { + super("WorkerThread-" + i); + workerArgs = args; + } + + public void run() { + try { + int ret = ToolRunner.run(HBaseConfiguration.create(), new LoadTestTool(), workerArgs); + if (ret != 0) { + throw new RuntimeException("LoadTestTool exit with non-zero return code."); + } + } catch (Exception ex) { + LOG.error("Error in worker thread", ex); + workerThreadError(ex); + } + } + } }