Index: src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (revision 1227252) +++ src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (working copy) @@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.RegionSplitter; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; @@ -88,6 +89,12 @@ private Configuration conf; private MiniZooKeeperCluster zkCluster = null; /** + * The default number of regions per regionserver when creating a pre-split + * table. + */ + private static int DEFAULT_REGIONS_PER_SERVER = 5; + + /** * Set if we were passed a zkCluster. If so, we won't shutdown zk as * part of general shutdown. */ @@ -1578,4 +1585,46 @@ return zkw; } + /** + * Creates a pre-split table for load testing. If the table already exists, + * logs a warning and continues. + * @return the number of regions the table was split into + */ + public static int createPreSplitLoadTestTable(Configuration conf, + byte[] tableName, byte[] columnFamily) throws IOException { + HTableDescriptor desc = new HTableDescriptor(tableName); + desc.addFamily(new HColumnDescriptor(columnFamily)); + + int totalNumberOfRegions = 0; + try { + HBaseAdmin admin = new HBaseAdmin(conf); + + // create a table a pre-splits regions. + // The number of splits is set as: + // region servers * regions per region server). + int numberOfServers = admin.getClusterStatus().getServers().size(); + if (numberOfServers == 0) { + throw new IllegalStateException("No live regionservers"); + } + + totalNumberOfRegions = numberOfServers * DEFAULT_REGIONS_PER_SERVER; + LOG.info("Number of live regionservers: " + numberOfServers + ", " + + "pre-splitting table into " + totalNumberOfRegions + " regions " + + "(default regions per server: " + DEFAULT_REGIONS_PER_SERVER + ")"); + + byte[][] splits = new RegionSplitter.MD5StringSplit().split( + totalNumberOfRegions); + + admin.createTable(desc, splits); + admin.close(); + } catch (MasterNotRunningException e) { + LOG.error("Master not running", e); + throw new IOException(e); + } catch (TableExistsException e) { + LOG.warn("Table " + Bytes.toStringBinary(tableName) + + " already exists, continuing"); + } + return totalNumberOfRegions; + } + } Index: src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java (revision 0) @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; + +/** Creates multiple threads that read and verify previously written data */ +public class MultiThreadedReader extends MultiThreadedAction +{ + private static final Log LOG = LogFactory.getLog(MultiThreadedReader.class); + + private Set readers = new HashSet(); + private final double verifyPercent; + private volatile boolean aborted; + + private MultiThreadedWriter writer = null; + + /** + * The number of keys verified in a sequence. This will never be larger than + * the total number of keys in the range. The reader might also verify + * random keys when it catches up with the writer. + */ + private final AtomicLong numUniqueKeysVerified = new AtomicLong(); + + /** + * Default maximum number of read errors to tolerate before shutting down all + * readers. + */ + public static final int DEFAULT_MAX_ERRORS = 10; + + /** + * Default "window" size between the last key written by the writer and the + * key that we attempt to read. The lower this number, the stricter our + * testing is. If this is zero, we always attempt to read the highest key + * in the contiguous sequence of keys written by the writers. + */ + public static final int DEFAULT_KEY_WINDOW = 0; + + protected AtomicLong numKeysVerified = new AtomicLong(0); + private AtomicLong numReadErrors = new AtomicLong(0); + private AtomicLong numReadFailures = new AtomicLong(0); + + private int maxErrors = DEFAULT_MAX_ERRORS; + private int keyWindow = DEFAULT_KEY_WINDOW; + + public MultiThreadedReader(Configuration conf, byte[] tableName, + byte[] columnFamily, double verifyPercent) { + super(conf, tableName, columnFamily, "R"); + this.verifyPercent = verifyPercent; + } + + public void linkToWriter(MultiThreadedWriter writer) { + this.writer = writer; + writer.setTrackInsertedKeys(true); + } + + public void setMaxErrors(int maxErrors) { + this.maxErrors = maxErrors; + } + + public void setKeyWindow(int keyWindow) { + this.keyWindow = keyWindow; + } + + @Override + public void start(long startKey, long endKey, int numThreads) + throws IOException { + super.start(startKey, endKey, numThreads); + if (verbose) { + LOG.debug("Reading keys [" + startKey + ", " + endKey + ")"); + } + + for (int i = 0; i < numThreads; ++i) { + HBaseReaderThread reader = new HBaseReaderThread(i); + readers.add(reader); + } + startThreads(readers); + } + + public class HBaseReaderThread extends Thread { + private final int readerId; + private final HTable table; + private final Random random = new Random(); + + /** The "current" key being read. 
Increases from startKey to endKey. */ + private long curKey; + + /** Time when the thread started */ + private long startTimeMs; + + /** If we are ahead of the writer and reading a random key. */ + private boolean readingRandomKey; + + /** + * @param readerId only the keys with this remainder from division by + * {@link #numThreads} will be read by this thread + */ + public HBaseReaderThread(int readerId) throws IOException { + this.readerId = readerId; + table = new HTable(conf, tableName); + setName(getClass().getSimpleName() + "_" + readerId); + } + + @Override + public void run() { + try { + runReader(); + } finally { + try { + table.close(); + } catch (IOException e) { + LOG.error("Error closing table", e); + } + numThreadsWorking.decrementAndGet(); + } + } + + private void runReader() { + if (verbose) { + LOG.info("Started thread #" + readerId + " for reads..."); + } + + startTimeMs = System.currentTimeMillis(); + curKey = startKey; + while (curKey < endKey && !aborted) { + long k = getNextKeyToRead(); + + // A sanity check for the key range. + if (k < startKey || k >= endKey) { + numReadErrors.incrementAndGet(); + throw new AssertionError("Load tester logic error: proposed key " + + "to read " + k + " is out of range (startKey=" + startKey + + ", endKey=" + endKey + ")"); + } + + if (k % numThreads != readerId || + writer != null && writer.failedToWriteKey(k)) { + // Skip keys that this thread should not read, as well as the keys + // that we know the writer failed to write. + continue; + } + + readKey(k); + if (k == curKey - 1 && !readingRandomKey) { + // We have verified another unique key. + numUniqueKeysVerified.incrementAndGet(); + } + } + } + + /** + * Should only be used for the concurrent writer/reader workload. The + * maximum key we are allowed to read, subject to the "key window" + * constraint. + */ + private long maxKeyWeCanRead() { + long insertedUpToKey = writer.insertedUpToKey(); + if (insertedUpToKey >= endKey - 1) { + // The writer has finished writing our range, so we can read any + // key in the range. + return endKey - 1; + } + return Math.min(endKey - 1, writer.insertedUpToKey() - keyWindow); + } + + private long getNextKeyToRead() { + readingRandomKey = false; + if (writer == null || curKey <= maxKeyWeCanRead()) { + return curKey++; + } + + // We caught up with the writer. See if we can read any keys at all. + long maxKeyToRead; + while ((maxKeyToRead = maxKeyWeCanRead()) < startKey) { + // The writer has not written sufficient keys for us to be able to read + // anything at all. Sleep a bit. This should only happen in the + // beginning of a load test run. + Threads.sleepWithoutInterrupt(50); + } + + if (curKey <= maxKeyToRead) { + // The writer wrote some keys, and we are now allowed to read our + // current key. + return curKey++; + } + + // startKey <= maxKeyToRead <= curKey - 1. Read one of the previous keys. + // Don't increment the current key -- we still have to try reading it + // later. Set a flag to make sure that we don't count this key towards + // the set of unique keys we have verified. 
+ readingRandomKey = true; + return startKey + Math.abs(random.nextLong()) + % (maxKeyToRead - startKey + 1); + } + + private Get readKey(long keyToRead) { + Get get = new Get( + LoadTestKVGenerator.md5PrefixedKey(keyToRead).getBytes()); + get.addFamily(columnFamily); + + try { + if (verbose) { + LOG.info("[" + readerId + "] " + "Querying key " + keyToRead + + ", cf " + Bytes.toStringBinary(columnFamily)); + } + queryKey(get, random.nextInt(100) < verifyPercent); + } catch (IOException e) { + numReadFailures.addAndGet(1); + LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "") + + ", time from start: " + + (System.currentTimeMillis() - startTimeMs) + " ms"); + } + return get; + } + + public void queryKey(Get get, boolean verify) throws IOException { + String rowKey = new String(get.getRow()); + + // read the data + long start = System.currentTimeMillis(); + Result result = table.get(get); + totalOpTimeMs.addAndGet(System.currentTimeMillis() - start); + numKeys.addAndGet(1); + + // if we got no data report error + if (result.isEmpty()) { + HRegionLocation hloc = table.getRegionLocation( + Bytes.toBytes(rowKey)); + LOG.info("Key = " + rowKey + ", RegionServer: " + + hloc.getHostname()); + numReadErrors.addAndGet(1); + LOG.error("No data returned, tried to get actions for key = " + + rowKey + (writer == null ? "" : ", keys inserted by writer: " + + writer.numKeys.get() + ")")); + + if (numReadErrors.get() > maxErrors) { + LOG.error("Aborting readers -- found more than " + maxErrors + + " errors\n"); + aborted = true; + } + } + + if (result.getFamilyMap(columnFamily) != null) { + // increment number of columns read + numCols.addAndGet(result.getFamilyMap(columnFamily).size()); + + if (verify) { + // verify the result + List keyValues = result.list(); + for (KeyValue kv : keyValues) { + String qual = new String(kv.getQualifier()); + + // if something does not look right report it + if (!LoadTestKVGenerator.verify(rowKey, qual, kv.getValue())) { + numReadErrors.addAndGet(1); + LOG.error("Error checking data for key = " + rowKey + + ", actionId = " + qual); + } + } + numKeysVerified.addAndGet(1); + } + } + } + + } + + public long getNumReadFailures() { + return numReadFailures.get(); + } + + public long getNumReadErrors() { + return numReadErrors.get(); + } + + public long getNumKeysVerified() { + return numKeysVerified.get(); + } + + public long getNumUniqueKeysVerified() { + return numUniqueKeysVerified.get(); + } + + @Override + protected String progressInfo() { + StringBuilder sb = new StringBuilder(); + appendToStatus(sb, "verified", numKeysVerified.get()); + appendToStatus(sb, "READ FAILURES", numReadFailures.get()); + appendToStatus(sb, "READ ERRORS", numReadErrors.get()); + return sb.toString(); + } + +} Index: src/test/java/org/apache/hadoop/hbase/util/LoadTestKVGenerator.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/util/LoadTestKVGenerator.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/util/LoadTestKVGenerator.java (revision 0) @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.Random; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.MD5Hash; + +/** + * A generator of random keys and values for load testing. Keys are generated + * by converting numeric indexes to strings and prefixing them with an MD5 + * hash. Values are generated by selecting value size in the configured range + * and generating a pseudo-random sequence of bytes seeded by key, column + * qualifier, and value size. + *
+ * Not thread-safe, so a separate instance is needed for every writer thread/ + */ +public class LoadTestKVGenerator { + + /** A random number generator for determining value size */ + private Random randomForValueSize = new Random(); + + private final int minValueSize; + private final int maxValueSize; + + public LoadTestKVGenerator(int minValueSize, int maxValueSize) { + if (minValueSize <= 0 || maxValueSize <= 0) { + throw new IllegalArgumentException("Invalid min/max value sizes: " + + minValueSize + ", " + maxValueSize); + } + this.minValueSize = minValueSize; + this.maxValueSize = maxValueSize; + } + + /** + * Verifies that the given byte array is the same as what would be generated + * for the given row key and qualifier. We are assuming that the value size + * is correct, and only verify the actual bytes. However, if the min/max + * value sizes are set sufficiently high, an accidental match should be + * extremely improbable. + */ + public static boolean verify(String rowKey, String qual, byte[] value) { + byte[] expectedData = getValueForRowColumn(rowKey, qual, value.length); + return Bytes.equals(expectedData, value); + } + + /** + * Converts the given key to string, and prefixes it with the MD5 hash of + * the index's string representation. + */ + public static String md5PrefixedKey(long key) { + String stringKey = Long.toString(key); + String md5hash = MD5Hash.getMD5AsHex(Bytes.toBytes(stringKey)); + + // flip the key to randomize + return md5hash + ":" + stringKey; + } + + /** + * Generates a value for the given key index and column qualifier. Size is + * selected randomly in the configured range. The generated value depends + * only on the combination of the key, qualifier, and the selected value + * size. This allows to verify the actual value bytes when reading, as done + * in {@link #verify(String, String, byte[])}. + */ + public byte[] generateRandomSizeValue(long key, String qual) { + String rowKey = md5PrefixedKey(key); + int dataSize = minValueSize + randomForValueSize.nextInt( + Math.abs(maxValueSize - minValueSize)); + return getValueForRowColumn(rowKey, qual, dataSize); + } + + /** + * Generates random bytes of the given size for the given row and column + * qualifier. The random seed is fully determined by these parameters. + */ + private static byte[] getValueForRowColumn(String rowKey, String qual, + int dataSize) { + Random seededRandom = new Random(rowKey.hashCode() + qual.hashCode() + + dataSize); + byte[] randomBytes = new byte[dataSize]; + seededRandom.nextBytes(randomBytes); + return randomBytes; + } + +} Index: src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java (revision 0) @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.io.IOException; +import java.util.HashSet; +import java.util.PriorityQueue; +import java.util.Queue; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; + +/** Creates multiple threads that write key/values into the */ +public class MultiThreadedWriter extends MultiThreadedAction { + private static final Log LOG = LogFactory.getLog(MultiThreadedWriter.class); + + private long minColumnsPerKey = 1; + private long maxColumnsPerKey = 10; + private Set writers = new HashSet(); + + private boolean isMultiPut = false; + + /** + * A temporary place to keep track of inserted keys. This is written to by + * all writers and is drained on a separate thread that populates + * {@link #insertedUpToKey}, the maximum key in the contiguous range of keys + * being inserted. This queue is supposed to stay small. + */ + private BlockingQueue insertedKeys = + new ArrayBlockingQueue(10000); + + /** + * This is the current key to be inserted by any thread. Each thread does an + * atomic get and increment operation and inserts the current value. + */ + private AtomicLong nextKeyToInsert = new AtomicLong(); + + /** + * The highest key in the contiguous range of keys . + */ + private AtomicLong insertedUpToKey = new AtomicLong(); + + /** The sorted set of keys NOT inserted by the writers */ + private Set failedKeySet = new ConcurrentSkipListSet(); + + /** + * The total size of the temporary inserted key set that have not yet lined + * up in a our contiguous sequence starting from startKey. Supposed to stay + * small. + */ + private AtomicLong insertedKeyQueueSize = new AtomicLong(); + + /** Enable this if used in conjunction with a concurrent reader. */ + private boolean trackInsertedKeys; + + public MultiThreadedWriter(Configuration conf, byte[] tableName, + byte[] columnFamily) { + super(conf, tableName, columnFamily, "W"); + } + + /** Use multi-puts vs. 
separate puts for every column in a row */ + public void setMultiPut(boolean isMultiPut) { + this.isMultiPut = isMultiPut; + } + + public void setColumnsPerKey(long minColumnsPerKey, long maxColumnsPerKey) { + this.minColumnsPerKey = minColumnsPerKey; + this.maxColumnsPerKey = maxColumnsPerKey; + } + + @Override + public void start(long startKey, long endKey, int numThreads) + throws IOException { + super.start(startKey, endKey, numThreads); + + if (verbose) { + LOG.debug("Inserting keys [" + startKey + ", " + endKey + ")"); + } + + nextKeyToInsert.set(startKey); + insertedUpToKey.set(startKey - 1); + + for (int i = 0; i < numThreads; ++i) { + HBaseWriterThread writer = new HBaseWriterThread(i); + writers.add(writer); + } + + if (trackInsertedKeys) { + new Thread(new InsertedKeysTracker()).start(); + numThreadsWorking.incrementAndGet(); + } + + startThreads(writers); + } + + public static byte[] longToByteArrayKey(long rowKey) { + return LoadTestKVGenerator.md5PrefixedKey(rowKey).getBytes(); + } + + private class HBaseWriterThread extends Thread { + private final HTable table; + private final int writerId; + + private final Random random = new Random(); + private final LoadTestKVGenerator dataGenerator = new LoadTestKVGenerator( + minDataSize, maxDataSize); + + public HBaseWriterThread(int writerId) throws IOException { + setName(getClass().getSimpleName() + "_" + writerId); + table = new HTable(conf, tableName); + this.writerId = writerId; + } + + public void run() { + try { + long rowKey; + while ((rowKey = nextKeyToInsert.getAndIncrement()) < endKey) { + long numColumns = minColumnsPerKey + Math.abs(random.nextLong()) + % (maxColumnsPerKey - minColumnsPerKey); + numKeys.addAndGet(1); + if (isMultiPut) { + multiPutInsertKey(rowKey, 0, numColumns); + } else { + for (long col = 0; col < numColumns; ++col) { + insert(rowKey, col); + } + } + if (trackInsertedKeys) { + insertedKeys.add(rowKey); + } + } + } finally { + try { + table.close(); + } catch (IOException e) { + LOG.error("Error closing table", e); + } + numThreadsWorking.decrementAndGet(); + } + } + + public void insert(long rowKey, long col) { + Put put = new Put(longToByteArrayKey(rowKey)); + String colAsStr = String.valueOf(col); + put.add(columnFamily, colAsStr.getBytes(), + dataGenerator.generateRandomSizeValue(rowKey, colAsStr)); + try { + long start = System.currentTimeMillis(); + table.put(put); + numCols.addAndGet(1); + totalOpTimeMs.addAndGet(System.currentTimeMillis() - start); + } catch (IOException e) { + failedKeySet.add(rowKey); + LOG.error("Failed to insert: " + rowKey); + e.printStackTrace(); + } + } + + public void multiPutInsertKey(long rowKey, long startCol, long endCol) { + if (verbose) { + LOG.debug("Preparing put for key = " + rowKey + ", cols = [" + + startCol + ", " + endCol + ")"); + } + + if (startCol >= endCol) { + return; + } + + Put put = new Put(LoadTestKVGenerator.md5PrefixedKey( + rowKey).getBytes()); + byte[] columnQualifier; + byte[] value; + for (long i = startCol; i < endCol; ++i) { + String qualStr = String.valueOf(i); + columnQualifier = qualStr.getBytes(); + value = dataGenerator.generateRandomSizeValue(rowKey, qualStr); + put.add(columnFamily, columnQualifier, value); + } + + try { + long start = System.currentTimeMillis(); + table.put(put); + numCols.addAndGet(endCol - startCol); + totalOpTimeMs.addAndGet( + System.currentTimeMillis() - start); + } catch (IOException e) { + failedKeySet.add(rowKey); + e.printStackTrace(); + } + } + } + + /** + * A thread that keeps track of the highest key in 
the contiguous range of + * inserted keys. + */ + private class InsertedKeysTracker implements Runnable { + + @Override + public void run() { + Thread.currentThread().setName(getClass().getSimpleName()); + try { + long expectedKey = startKey; + Queue sortedKeys = new PriorityQueue(); + while (expectedKey < endKey) { + // Block until a new element is available. + Long k; + try { + k = insertedKeys.poll(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOG.info("Inserted key tracker thread interrupted", e); + break; + } + if (k == null) { + continue; + } + if (k == expectedKey) { + // Skip the "sorted key" queue and consume this key. + insertedUpToKey.set(k); + ++expectedKey; + } else { + sortedKeys.add(k); + } + + // See if we have a sequence of contiguous keys lined up. + while (!sortedKeys.isEmpty() + && ((k = sortedKeys.peek()) == expectedKey)) { + sortedKeys.poll(); + insertedUpToKey.set(k); + ++expectedKey; + } + + insertedKeyQueueSize.set(insertedKeys.size() + sortedKeys.size()); + } + } catch (Exception ex) { + LOG.error("Error in inserted key tracker", ex); + } finally { + numThreadsWorking.decrementAndGet(); + } + } + + } + + @Override + public void waitForFinish() { + super.waitForFinish(); + System.out.println("Failed to write keys: " + failedKeySet.size()); + for (Long key : failedKeySet) { + System.out.println("Failed to write key: " + key); + } + } + + public int getNumWriteFailures() { + return failedKeySet.size(); + } + + /** + * The max key until which all keys have been inserted (successfully or not). + * @return the last key that we have inserted all keys up to (inclusive) + */ + public long insertedUpToKey() { + return insertedUpToKey.get(); + } + + public boolean failedToWriteKey(long k) { + return failedKeySet.contains(k); + } + + @Override + protected String progressInfo() { + StringBuilder sb = new StringBuilder(); + appendToStatus(sb, "insertedUpTo", insertedUpToKey.get()); + appendToStatus(sb, "insertedQSize", insertedKeyQueueSize.get()); + return sb.toString(); + } + + /** + * Used for a joint write/read workload. Enables tracking the last inserted + * key, which requires a blocking queue and a consumer thread. + * @param enable whether to enable tracking the last inserted key + */ + void setTrackInsertedKeys(boolean enable) { + trackInsertedKeys = enable; + } + +} Index: src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java (revision 0) @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.util; + +import java.io.IOException; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.StringUtils; + +/** + * Common base class for reader and writer parts of multi-thread HBase load + * test ({@link LoadTestTool}). + */ +public abstract class MultiThreadedAction { + private static final Log LOG = LogFactory.getLog(MultiThreadedAction.class); + + protected final byte[] tableName; + protected final byte[] columnFamily; + protected final Configuration conf; + + protected int numThreads = 1; + + /** The start key of the key range, inclusive */ + protected long startKey = 0; + + /** The end key of the key range, exclusive */ + protected long endKey = 1; + + protected AtomicInteger numThreadsWorking = new AtomicInteger(); + protected AtomicLong numKeys = new AtomicLong(); + protected AtomicLong numCols = new AtomicLong(); + protected AtomicLong totalOpTimeMs = new AtomicLong(); + protected boolean verbose = false; + + protected int minDataSize = 256; + protected int maxDataSize = 1024; + + /** "R" or "W" */ + private String actionLetter; + + /** Whether we need to print out Hadoop Streaming-style counters */ + private boolean streamingCounters; + + public static final int REPORTING_INTERVAL_MS = 5000; + + public MultiThreadedAction(Configuration conf, byte[] tableName, + byte[] columnFamily, String actionLetter) { + this.conf = conf; + this.tableName = tableName; + this.columnFamily = columnFamily; + this.actionLetter = actionLetter; + } + + public void start(long startKey, long endKey, int numThreads) + throws IOException { + this.startKey = startKey; + this.endKey = endKey; + this.numThreads = numThreads; + (new Thread(new ProgressReporter(actionLetter))).start(); + } + + private static String formatTime(long elapsedTime) { + String format = String.format("%%0%dd", 2); + elapsedTime = elapsedTime / 1000; + String seconds = String.format(format, elapsedTime % 60); + String minutes = String.format(format, (elapsedTime % 3600) / 60); + String hours = String.format(format, elapsedTime / 3600); + String time = hours + ":" + minutes + ":" + seconds; + return time; + } + + /** Asynchronously reports progress */ + private class ProgressReporter implements Runnable { + + private String reporterId = ""; + + public ProgressReporter(String id) { + this.reporterId = id; + } + + @Override + public void run() { + long startTime = System.currentTimeMillis(); + long priorNumKeys = 0; + long priorCumulativeOpTime = 0; + int priorAverageKeysPerSecond = 0; + + // Give other threads time to start. + Threads.sleep(REPORTING_INTERVAL_MS); + + while (numThreadsWorking.get() != 0) { + String threadsLeft = + "[" + reporterId + ":" + numThreadsWorking.get() + "] "; + if (numKeys.get() == 0) { + LOG.info(threadsLeft + "Number of keys = 0"); + } else { + long numKeys = MultiThreadedAction.this.numKeys.get(); + long time = System.currentTimeMillis() - startTime; + long totalOpTime = totalOpTimeMs.get(); + + long numKeysDelta = numKeys - priorNumKeys; + long totalOpTimeDelta = totalOpTime - priorCumulativeOpTime; + + double averageKeysPerSecond = + (time > 0) ? 
(numKeys * 1000 / time) : 0; + + LOG.info(threadsLeft + + "Keys=" + + numKeys + + ", cols=" + + StringUtils.humanReadableInt(numCols.get()) + + ", time=" + + formatTime(time) + + ((numKeys > 0 && time > 0) ? (" Overall: [" + "keys/s= " + + numKeys * 1000 / time + ", latency=" + totalOpTime + / numKeys + " ms]") : "") + + ((numKeysDelta > 0) ? (" Current: [" + "keys/s=" + + numKeysDelta * 1000 / REPORTING_INTERVAL_MS + ", latency=" + + totalOpTimeDelta / numKeysDelta + " ms]") : "") + + progressInfo()); + + if (streamingCounters) { + printStreamingCounters(numKeysDelta, + averageKeysPerSecond - priorAverageKeysPerSecond); + } + + priorNumKeys = numKeys; + priorCumulativeOpTime = totalOpTime; + priorAverageKeysPerSecond = (int) averageKeysPerSecond; + } + + Threads.sleep(REPORTING_INTERVAL_MS); + } + } + + private void printStreamingCounters(long numKeysDelta, + double avgKeysPerSecondDelta) { + // Write stats in a format that can be interpreted as counters by + // streaming map-reduce jobs. + System.err.println("reporter:counter:numKeys," + reporterId + "," + + numKeysDelta); + System.err.println("reporter:counter:numCols," + reporterId + "," + + numCols.get()); + System.err.println("reporter:counter:avgKeysPerSecond," + reporterId + + "," + (long) (avgKeysPerSecondDelta)); + } + } + + public void setDataSize(int minDataSize, int maxDataSize) { + this.minDataSize = minDataSize; + this.maxDataSize = maxDataSize; + } + + public void waitForFinish() { + while (numThreadsWorking.get() != 0) { + Threads.sleepWithoutInterrupt(1000); + } + } + + protected void startThreads(Collection threads) { + numThreadsWorking.addAndGet(threads.size()); + for (Thread thread : threads) { + thread.start(); + } + } + + /** @return the end key of the key range, exclusive */ + public long getEndKey() { + return endKey; + } + + /** Returns a task-specific progress string */ + protected abstract String progressInfo(); + + protected static void appendToStatus(StringBuilder sb, String desc, + long v) { + if (v == 0) { + return; + } + sb.append(", "); + sb.append(desc); + sb.append("="); + sb.append(v); + } + +} Index: src/main/java/org/apache/hadoop/hbase/util/Threads.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/util/Threads.java (revision 1227252) +++ src/main/java/org/apache/hadoop/hbase/util/Threads.java (working copy) @@ -127,4 +127,27 @@ e.printStackTrace(); } } + + /** + * Sleeps for the given amount of time even if interrupted. Preserves + * the interrupt status. + * @param msToWait the amount of time to sleep in milliseconds + */ + public static void sleepWithoutInterrupt(final long msToWait) { + long timeMillis = System.currentTimeMillis(); + long endTime = timeMillis + msToWait; + boolean interrupted = false; + while (timeMillis < endTime) { + try { + Thread.sleep(endTime - timeMillis); + } catch (InterruptedException ex) { + interrupted = true; + } + timeMillis = System.currentTimeMillis(); + } + + if (interrupted) { + Thread.currentThread().interrupt(); + } + } } Index: src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java (revision 0) @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.Set; +import java.util.TreeSet; + +import org.apache.commons.cli.BasicParser; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +/** + * Common base class used for HBase command-line tools. Simplifies workflow and + * command-line argument parsing. + */ +public abstract class AbstractHBaseTool implements Tool { + + private static final int EXIT_SUCCESS = 0; + private static final int EXIT_FAILURE = 1; + + private static final String HELP_OPTION = "help"; + + private static final Log LOG = LogFactory.getLog(AbstractHBaseTool.class); + + private final Options options = new Options(); + + protected Configuration conf = null; + + private static final Set requiredOptions = new TreeSet(); + + /** + * Override this to add command-line options using {@link #addOptWithArg} + * and similar methods. + */ + protected abstract void addOptions(); + + /** + * This method is called to process the options after they have been parsed. 
+ */ + protected abstract void processOptions(CommandLine cmd); + + /** The "main function" of the tool */ + protected abstract void doWork() throws Exception; + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public final int run(String[] args) throws Exception { + if (conf == null) { + LOG.error("Tool configuration is not initialized"); + throw new NullPointerException("conf"); + } + + CommandLine cmd; + try { + // parse the command line arguments + cmd = parseArgs(args); + } catch (ParseException e) { + LOG.error("Error when parsing command-line arguemnts", e); + printUsage(); + return EXIT_FAILURE; + } + + if (cmd.hasOption(HELP_OPTION) || !sanityCheckOptions(cmd)) { + printUsage(); + return EXIT_FAILURE; + } + + processOptions(cmd); + + try { + doWork(); + } catch (Exception e) { + LOG.error("Error running command-line tool", e); + return EXIT_FAILURE; + } + return EXIT_SUCCESS; + } + + private boolean sanityCheckOptions(CommandLine cmd) { + boolean success = true; + for (String reqOpt : requiredOptions) { + if (!cmd.hasOption(reqOpt)) { + LOG.error("Required option -" + reqOpt + " is missing"); + success = false; + } + } + return success; + } + + private CommandLine parseArgs(String[] args) throws ParseException { + options.addOption(HELP_OPTION, false, "Show usage"); + addOptions(); + CommandLineParser parser = new BasicParser(); + return parser.parse(options, args); + } + + private void printUsage() { + HelpFormatter helpFormatter = new HelpFormatter(); + helpFormatter.setWidth(80); + String usageHeader = "Options:"; + String usageFooter = ""; + String usageStr = "bin/hbase " + getClass().getName() + " "; + + helpFormatter.printHelp(usageStr, usageHeader, options, + usageFooter); + } + + protected void addRequiredOptWithArg(String opt, String description) { + requiredOptions.add(opt); + addOptWithArg(opt, description); + } + + protected void addOptNoArg(String opt, String description) { + options.addOption(opt, false, description); + } + + protected void addOptWithArg(String opt, String description) { + options.addOption(opt, true, description); + } + + /** + * Parse a number and enforce a range. + */ + public static long parseLong(String s, long minValue, long maxValue) { + long l = Long.parseLong(s); + if (l < minValue || l > maxValue) { + throw new IllegalArgumentException("The value " + l + + " is out of range [" + minValue + ", " + maxValue + "]"); + } + return l; + } + + public static int parseInt(String s, int minValue, int maxValue) { + return (int) parseLong(s, minValue, maxValue); + } + + /** Call this from the concrete tool class's main function. */ + protected void doStaticMain(String args[]) { + int ret; + try { + ret = ToolRunner.run(HBaseConfiguration.create(), this, args); + } catch (Exception ex) { + LOG.error("Error running command-line tool", ex); + ret = EXIT_FAILURE; + } + System.exit(ret); + } + +} Index: src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java (revision 0) @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. 
The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.io.IOException; +import java.util.Arrays; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.PerformanceEvaluation; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.io.hfile.Compression; +import org.apache.hadoop.hbase.regionserver.StoreFile; + +/** + * A command-line utility that reads, writes, and verifies data. Unlike + * {@link PerformanceEvaluation}, this tool validates the data written, + * and supports simultaneously writing and reading the same set of keys. + */ +public class LoadTestTool extends AbstractHBaseTool { + + private static final Log LOG = LogFactory.getLog(LoadTestTool.class); + + /** Table name for the test */ + private byte[] tableName; + + /** Table name to use of not overridden on the command line */ + private static final String DEFAULT_TABLE_NAME = "cluster_test"; + + /** Column family used by the test */ + static byte[] COLUMN_FAMILY = Bytes.toBytes("test_cf"); + + /** Column families used by the test */ + static final byte[][] COLUMN_FAMILIES = { COLUMN_FAMILY }; + + /** The number of reader/writer threads if not specified */ + private static final int DEFAULT_NUM_THREADS = 20; + + /** Usage string for the load option */ + private static final String OPT_USAGE_LOAD = + ":" + + "[:<#threads=" + DEFAULT_NUM_THREADS + ">]"; + + /** Usa\ge string for the read option */ + private static final String OPT_USAGE_READ = + "[:<#threads=" + DEFAULT_NUM_THREADS + ">]"; + + private static final String OPT_USAGE_BLOOM = "Bloom filter type, one of " + + Arrays.toString(StoreFile.BloomType.values()); + + private static final String OPT_USAGE_COMPRESSION = "Compression type, " + + "one of " + Arrays.toString(Compression.Algorithm.values()); + + private static final String OPT_BLOOM = "bloom"; + private static final String OPT_COMPRESSION = "compression"; + + private static final String OPT_KEY_WINDOW = "key_window"; + private static final String OPT_WRITE = "write"; + private static final String OPT_MAX_READ_ERRORS = "max_read_errors"; + private static final String OPT_MULTIPUT = "multiput"; + private static final String OPT_NUM_KEYS = "num_keys"; + private static final String OPT_READ = "read"; + private static final String OPT_START_KEY = "start_key"; + private static final String OPT_TABLE_NAME = "tn"; + private static final String OPT_ZK_QUORUM = "zk"; + private static final String OPT_ZK_PORT = "zk_port"; + private static final String OPT_ZK_ZNODE_PARENT = "zk_parent"; + + private static final long DEFAULT_START_KEY = 0; + + /** This will be removed as we factor out the dependency on command line */ + 
private CommandLine cmd; + + private MultiThreadedWriter writerThreads = null; + private MultiThreadedReader readerThreads = null; + + private long startKey, endKey; + + private boolean isWrite, isRead; + + // Column family options + private boolean encodeInCacheOnly; + + // Writer options + private int numWriterThreads = DEFAULT_NUM_THREADS; + private long minColsPerKey, maxColsPerKey; + private int minColDataSize, maxColDataSize; + private boolean isMultiPut; + + // Reader options + private int numReaderThreads = DEFAULT_NUM_THREADS; + private int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW; + private int maxReadErrors = MultiThreadedReader.DEFAULT_MAX_ERRORS; + private int verifyPercent; + + private String[] splitColonSeparated(String option, + int minNumCols, int maxNumCols) { + String optVal = cmd.getOptionValue(option); + String[] cols = optVal.split(":"); + if (cols.length < minNumCols || cols.length > maxNumCols) { + throw new IllegalArgumentException("Expected at least " + + minNumCols + " columns but no more than " + maxNumCols + + " in the colon-separated value '" + optVal + "' of the " + + "-" + option + " option"); + } + return cols; + } + + private int getNumThreads(String numThreadsStr) { + return parseInt(numThreadsStr, 1, Short.MAX_VALUE); + } + + /** + * Apply column family options such as Bloom filters, compression, and data + * block encoding. + */ + private void applyColumnFamilyOptions(byte[] tableName, + byte[][] columnFamilies) throws IOException { + String bloomStr = cmd.getOptionValue(OPT_BLOOM); + StoreFile.BloomType bloomType = bloomStr == null ? null : + StoreFile.BloomType.valueOf(bloomStr); + + String compressStr = cmd.getOptionValue(OPT_COMPRESSION); + Compression.Algorithm compressAlgo = compressStr == null ? null : + Compression.Algorithm.valueOf(compressStr); + + HBaseAdmin admin = new HBaseAdmin(conf); + HTableDescriptor tableDesc = admin.getTableDescriptor(tableName); + LOG.info("Disabling table " + Bytes.toString(tableName)); + admin.disableTable(tableName); + for (byte[] cf : columnFamilies) { + HColumnDescriptor columnDesc = tableDesc.getFamily(cf); + if (bloomStr != null) { + columnDesc.setBloomFilterType(bloomType); + } + if (compressStr != null) { + columnDesc.setCompressionType(compressAlgo); + } + admin.modifyColumn(tableName, columnDesc); + } + LOG.info("Enabling table " + Bytes.toString(tableName)); + admin.enableTable(tableName); + } + + @Override + protected void addOptions() { + addOptWithArg(OPT_ZK_QUORUM, "ZK quorum as comma-separated host names " + + "without port numbers"); + addOptWithArg(OPT_ZK_PORT, "ZK client port number"); + addOptWithArg(OPT_ZK_ZNODE_PARENT, "root node in ZK for HBase"); + addOptWithArg(OPT_TABLE_NAME, "The name of the table to read or write"); + addOptWithArg(OPT_WRITE, OPT_USAGE_LOAD); + addOptWithArg(OPT_READ, OPT_USAGE_READ); + addOptWithArg(OPT_BLOOM, OPT_USAGE_BLOOM); + addOptWithArg(OPT_COMPRESSION, OPT_USAGE_COMPRESSION); + addOptWithArg(OPT_MAX_READ_ERRORS, "The maximum number of read errors " + + "to tolerate before terminating all reader threads. The default is " + + MultiThreadedReader.DEFAULT_MAX_ERRORS + "."); + addOptWithArg(OPT_KEY_WINDOW, "The 'key window' to maintain between " + + "reads and writes for concurrent write/read workload. 
The default " + + "is " + MultiThreadedReader.DEFAULT_KEY_WINDOW + "."); + + addOptNoArg(OPT_MULTIPUT, "Whether to use multi-puts as opposed to " + + "separate puts for every column in a row"); + + addRequiredOptWithArg(OPT_NUM_KEYS, "The number of keys to read/write"); + addOptWithArg(OPT_START_KEY, "The first key to read/write " + + "(a 0-based index). The default value is " + + DEFAULT_START_KEY + "."); + } + + @Override + protected void processOptions(CommandLine cmd) { + this.cmd = cmd; + + tableName = Bytes.toBytes(cmd.getOptionValue(OPT_TABLE_NAME, + DEFAULT_TABLE_NAME)); + startKey = parseLong(cmd.getOptionValue(OPT_START_KEY, + String.valueOf(DEFAULT_START_KEY)), 0, Long.MAX_VALUE); + long numKeys = parseLong(cmd.getOptionValue(OPT_NUM_KEYS), 1, + Long.MAX_VALUE - startKey); + endKey = startKey + numKeys; + + isWrite = cmd.hasOption(OPT_WRITE); + isRead = cmd.hasOption(OPT_READ); + + if (!isWrite && !isRead) { + throw new IllegalArgumentException("Either -" + OPT_WRITE + " or " + + "-" + OPT_READ + " has to be specified"); + } + + if (isWrite) { + String[] writeOpts = splitColonSeparated(OPT_WRITE, 2, 3); + + int colIndex = 0; + minColsPerKey = 1; + maxColsPerKey = 2 * Long.parseLong(writeOpts[colIndex++]); + int avgColDataSize = + parseInt(writeOpts[colIndex++], 1, Integer.MAX_VALUE); + minColDataSize = avgColDataSize / 2; + maxColDataSize = avgColDataSize * 3 / 2; + + if (colIndex < writeOpts.length) { + numWriterThreads = getNumThreads(writeOpts[colIndex++]); + } + + isMultiPut = cmd.hasOption(OPT_MULTIPUT); + + System.out.println("Multi-puts: " + isMultiPut); + System.out.println("Columns per key: " + minColsPerKey + ".." + + maxColsPerKey); + System.out.println("Data size per column: " + minColDataSize + ".." + + maxColDataSize); + } + + if (isRead) { + String[] readOpts = splitColonSeparated(OPT_READ, 1, 2); + int colIndex = 0; + verifyPercent = parseInt(readOpts[colIndex++], 0, 100); + if (colIndex < readOpts.length) { + numReaderThreads = getNumThreads(readOpts[colIndex++]); + } + + if (cmd.hasOption(OPT_MAX_READ_ERRORS)) { + maxReadErrors = parseInt(cmd.getOptionValue(OPT_MAX_READ_ERRORS), + 0, Integer.MAX_VALUE); + } + + if (cmd.hasOption(OPT_KEY_WINDOW)) { + keyWindow = parseInt(cmd.getOptionValue(OPT_KEY_WINDOW), + 0, Integer.MAX_VALUE); + } + + System.out.println("Percent of keys to verify: " + verifyPercent); + System.out.println("Reader threads: " + numReaderThreads); + } + + System.out.println("Key range: [" + startKey + ".." 
+ (endKey - 1) + "]"); + } + + @Override + protected void doWork() throws IOException { + if (cmd.hasOption(OPT_ZK_QUORUM)) { + conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue(OPT_ZK_QUORUM)); + } + if (cmd.hasOption(OPT_ZK_PORT)) { + conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, cmd.getOptionValue(OPT_ZK_PORT)); + } + if (cmd.hasOption(OPT_ZK_ZNODE_PARENT)) { + conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, cmd.getOptionValue(OPT_ZK_ZNODE_PARENT)); + } + + HBaseTestingUtility.createPreSplitLoadTestTable(conf, tableName, + COLUMN_FAMILY); + applyColumnFamilyOptions(tableName, COLUMN_FAMILIES); + + if (isWrite) { + writerThreads = new MultiThreadedWriter(conf, tableName, COLUMN_FAMILY); + writerThreads.setMultiPut(isMultiPut); + writerThreads.setColumnsPerKey(minColsPerKey, maxColsPerKey); + writerThreads.setDataSize(minColDataSize, maxColDataSize); + } + + if (isRead) { + readerThreads = new MultiThreadedReader(conf, tableName, COLUMN_FAMILY, + verifyPercent); + readerThreads.setMaxErrors(maxReadErrors); + readerThreads.setKeyWindow(keyWindow); + } + + if (isRead && isWrite) { + LOG.info("Concurrent read/write workload: making readers aware of the " + + "write point"); + readerThreads.linkToWriter(writerThreads); + } + + if (isWrite) { + System.out.println("Starting to write data..."); + writerThreads.start(startKey, endKey, numWriterThreads); + } + + if (isRead) { + System.out.println("Starting to read data..."); + readerThreads.start(startKey, endKey, numReaderThreads); + } + + if (isWrite) { + writerThreads.waitForFinish(); + } + + if (isRead) { + readerThreads.waitForFinish(); + } + } + + public static void main(String[] args) { + new LoadTestTool().doStaticMain(args); + } + +}