diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedRequestsWithRegionReplicas.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedRequestsWithRegionReplicas.java
index 964492c..ca6e7bd 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedRequestsWithRegionReplicas.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedRequestsWithRegionReplicas.java
@@ -301,6 +301,19 @@ public class IntegrationTestTimeBoundedRequestsWithRegionReplicas extends Integr
         numReadFailures.addAndGet(1); // fail the test
       }
     }
+
+    @Override
+    protected void verifyResultsAndUpdateMetrics(boolean verify, Get[] gets, long elapsedNano,
+        Result[] results, HTable table, boolean isNullExpected)
+        throws IOException {
+      super.verifyResultsAndUpdateMetrics(verify, gets, elapsedNano, results, table, isNullExpected);
+      // We do not actually time out and cancel the reads after the timeout; we just wait for the
+      // RPC to complete, and if the request took longer than the timeout, we treat that as an error.
+      if (elapsedNano > timeoutNano) { // TODO: should we record each Get in the batch as one read?
+        timedOutReads.incrementAndGet();
+        numReadFailures.addAndGet(1); // fail the test
+      }
+    }
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
index 3b45ae7..1d42609 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
@@ -127,6 +127,7 @@ public class LoadTestTool extends AbstractHBaseTool {
   protected static final String OPT_WRITE = "write";
   protected static final String OPT_MAX_READ_ERRORS = "max_read_errors";
   protected static final String OPT_MULTIPUT = "multiput";
+  protected static final String OPT_MULTIGET = "multigetBatchSize";
   protected static final String OPT_NUM_KEYS = "num_keys";
   protected static final String OPT_READ = "read";
   protected static final String OPT_START_KEY = "start_key";
@@ -188,6 +189,7 @@ public class LoadTestTool extends AbstractHBaseTool {
   // Reader options
   private int numReaderThreads = DEFAULT_NUM_THREADS;
   private int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW;
+  private int multiGetBatchSize = MultiThreadedReader.DEFAULT_BATCH_SIZE;
   private int maxReadErrors = MultiThreadedReader.DEFAULT_MAX_ERRORS;
   private int verifyPercent;
 
@@ -286,6 +288,8 @@ public class LoadTestTool extends AbstractHBaseTool {
     addOptWithArg(OPT_MAX_READ_ERRORS, "The maximum number of read errors " +
        "to tolerate before terminating all reader threads. The default is " +
        MultiThreadedReader.DEFAULT_MAX_ERRORS + ".");
+    addOptWithArg(OPT_MULTIGET, "Batch size to use for multi-gets, as opposed " +
+        "to issuing a separate get for every key. A value of 0 disables multi-gets");
     addOptWithArg(OPT_KEY_WINDOW, "The 'key window' to maintain between " +
        "reads and writes for concurrent write/read workload. The default " +
        "is " + MultiThreadedReader.DEFAULT_KEY_WINDOW + ".");
@@ -414,6 +418,12 @@ public class LoadTestTool extends AbstractHBaseTool {
           0, Integer.MAX_VALUE);
       }
 
+      if (cmd.hasOption(OPT_MULTIGET)) {
+        multiGetBatchSize = parseInt(cmd.getOptionValue(OPT_MULTIGET),
+          0, Integer.MAX_VALUE);
+      }
+
+      System.out.println("Multi-gets (value of 0 means no multigets): " + multiGetBatchSize);
       System.out.println("Percent of keys to verify: " + verifyPercent);
       System.out.println("Reader threads: " + numReaderThreads);
     }
@@ -555,6 +565,7 @@ public class LoadTestTool extends AbstractHBaseTool {
       readerThreads = getMultiThreadedReaderInstance(readerClass, dataGen);
       readerThreads.setMaxErrors(maxReadErrors);
       readerThreads.setKeyWindow(keyWindow);
+      readerThreads.setMultiGetBatchSize(multiGetBatchSize);
     }
 
     if (isUpdate && isWrite) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java
index b0d44fd..a6d6de9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java
@@ -17,6 +17,7 @@ package org.apache.hadoop.hbase.util;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
@@ -64,13 +65,18 @@ public class MultiThreadedReader extends MultiThreadedAction
    */
   public static final int DEFAULT_KEY_WINDOW = 0;
 
+  /**
+   * Default batch size for multi-gets (0 means no multi-gets)
+   */
+  public static final int DEFAULT_BATCH_SIZE = 0;
+
   protected AtomicLong numKeysVerified = new AtomicLong(0);
   protected AtomicLong numReadErrors = new AtomicLong(0);
   protected AtomicLong numReadFailures = new AtomicLong(0);
   protected AtomicLong nullResult = new AtomicLong(0);
-
   private int maxErrors = DEFAULT_MAX_ERRORS;
   private int keyWindow = DEFAULT_KEY_WINDOW;
+  private int batchSize = DEFAULT_BATCH_SIZE;
 
   public MultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf,
       TableName tableName, double verifyPercent) {
@@ -91,6 +97,10 @@ public class MultiThreadedReader extends MultiThreadedAction
     this.keyWindow = keyWindow;
   }
 
+  public void setMultiGetBatchSize(int batchSize) {
+    this.batchSize = batchSize;
+  }
+
   @Override
   public void start(long startKey, long endKey, int numThreads) throws IOException {
     super.start(startKey, endKey, numThreads);
@@ -169,32 +179,61 @@ public class MultiThreadedReader extends MultiThreadedAction
       startTimeMs = System.currentTimeMillis();
       curKey = startKey;
+      long [] keys = new long[Math.max(batchSize, 1)]; // length 1 when multi-gets are disabled
+      long [] keysForThisReader = new long[Math.max(batchSize, 1)];
+      int readingRandomKeyStartIndex = -1;
       while (curKey < endKey && !aborted) {
-        long k = getNextKeyToRead();
-
-        // A sanity check for the key range.
-        if (k < startKey || k >= endKey) {
-          numReadErrors.incrementAndGet();
-          throw new AssertionError("Load tester logic error: proposed key " +
-              "to read " + k + " is out of range (startKey=" + startKey +
-              ", endKey=" + endKey + ")");
-        }
-
-        if (k % numThreads != readerId ||
-            writer != null && writer.failedToWriteKey(k)) {
-          // Skip keys that this thread should not read, as well as the keys
-          // that we know the writer failed to write.
-          continue;
+        int numIter = 0;
+        boolean isMultiGet = batchSize > 0;
+        // if multi-get, loop until we have a number of keys equal to the batch size,
+        // subject to the availability of enough keys; else break and read what we can
+        do {
+          keys[numIter] = getNextKeyToRead();
+          if (readingRandomKey && readingRandomKeyStartIndex == -1) {
+            // store the first index of a random read
+            readingRandomKeyStartIndex = numIter;
+          }
+          numIter++;
+        } while (isMultiGet && numIter < batchSize);
+
+        int i = 0;
+        int currentIter = 0;
+        int randomRead = 0;
+        for (long k : keys) {
+          // A sanity check for the key range.
+          if (k < startKey || k >= endKey) {
+            numReadErrors.incrementAndGet();
+            throw new AssertionError("Load tester logic error: proposed key " +
+                "to read " + k + " is out of range (startKey=" + startKey +
+                ", endKey=" + endKey + ")");
+          }
+
+          currentIter++;
+          if (k % numThreads != readerId ||
+              writer != null && writer.failedToWriteKey(k)) {
+            // Skip keys that this thread should not read, as well as the keys
+            // that we know the writer failed to write.
+            continue;
+          }
+          keysForThisReader[i++] = k;
+          // increment the randomRead count when we know the index in the
+          // global keys array is more than the smallest random index and
+          // this readerId would read the key
+          if (readingRandomKeyStartIndex < currentIter) {
+            randomRead++;
+          }
         }
-        readKey(k);
-        if (k == curKey - 1 && !readingRandomKey) {
-          // We have verified another unique key.
-          numUniqueKeysVerified.incrementAndGet();
-        }
+        // only the first i slots of keysForThisReader are valid for this iteration
+        readKey(Arrays.copyOf(keysForThisReader, i), batchSize > 0);
+        // We have verified some unique key(s).
+        numUniqueKeysVerified.getAndAdd(randomRead);
       }
     }
 
+    protected boolean hasKeyAdvanced(long key) {
+      return key > (startKey - 1);
+    }
+
     /**
      * Should only be used for the concurrent writer/reader workload. The
      * maximum key we are allowed to read, subject to the "key window"
@@ -240,22 +279,44 @@ public class MultiThreadedReader extends MultiThreadedAction
           % (maxKeyToRead - startKey + 1);
     }
 
-    private Get readKey(long keyToRead) {
-      Get get = null;
-      try {
-        get = createGet(keyToRead);
-        queryKey(get, RandomUtils.nextInt(100) < verifyPercent, keyToRead);
-      } catch (IOException e) {
-        numReadFailures.addAndGet(1);
-        LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "")
-            + ", time from start: "
-            + (System.currentTimeMillis() - startTimeMs) + " ms");
-        if (printExceptionTrace) {
-          LOG.warn(e);
-          printExceptionTrace = false;
+    private Get[] readKey(long[] keysToRead, boolean useMultiGet) {
+      Get [] gets = new Get[keysToRead.length];
+      int i = 0;
+      for (long keyToRead : keysToRead) {
+        try {
+          gets[i] = createGet(keyToRead);
+          if (!useMultiGet) {
+            queryKey(gets[i], RandomUtils.nextInt(100) < verifyPercent, keyToRead);
+          }
+          i++;
+        } catch (IOException e) {
+          numReadFailures.addAndGet(1);
+          LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "")
+              + ", time from start: "
+              + (System.currentTimeMillis() - startTimeMs) + " ms");
+          if (printExceptionTrace) {
+            LOG.warn(e);
+            printExceptionTrace = false;
+          }
         }
       }
-      return get;
+      if (useMultiGet) {
+        try {
+          queryKey(gets, RandomUtils.nextInt(100) < verifyPercent, keysToRead);
+        } catch (IOException e) {
+          numReadFailures.addAndGet(gets.length);
+          for (long keyToRead : keysToRead) {
+            LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "")
+                + ", time from start: "
+                + (System.currentTimeMillis() - startTimeMs) + " ms");
+          }
+          if (printExceptionTrace) {
+            LOG.warn(e);
+            printExceptionTrace = false;
+          }
+        }
+      }
+      return gets;
     }
 
     protected Get createGet(long keyToRead) throws IOException {
@@ -278,6 +339,14 @@ public class MultiThreadedReader extends MultiThreadedAction
       return get;
     }
 
+    public void queryKey(Get[] gets, boolean verify, long[] keysToRead) throws IOException {
+      // read the data
+      long start = System.nanoTime();
+      Result[] results = table.get(Arrays.asList(gets));
+      long end = System.nanoTime();
+      verifyResultsAndUpdateMetrics(verify, gets, end - start, results, table, false);
+    }
+
     public void queryKey(Get get, boolean verify, long keyToRead) throws IOException {
       String rowKey = Bytes.toString(get.getRow());
 
@@ -288,11 +357,28 @@ public class MultiThreadedReader extends MultiThreadedAction
       verifyResultsAndUpdateMetrics(verify, rowKey, end - start, result, table, false);
     }
 
+    protected void verifyResultsAndUpdateMetrics(boolean verify, Get[] gets, long elapsedNano,
+        Result[] results, HTable table, boolean isNullExpected)
+        throws IOException {
+      totalOpTimeMs.addAndGet(elapsedNano / 1000000);
+      numKeys.addAndGet(gets.length);
+      int i = 0;
+      for (Result result : results) {
+        String rowKey = Bytes.toString(gets[i++].getRow());
+        verifyResultsAndUpdateMetricsOnAPerGetBasis(verify, rowKey, result, table, isNullExpected);
+      }
+    }
+
     protected void verifyResultsAndUpdateMetrics(boolean verify, String rowKey, long elapsedNano,
         Result result, HTable table, boolean isNullExpected)
         throws IOException {
       totalOpTimeMs.addAndGet(elapsedNano / 1000000);
       numKeys.addAndGet(1);
+      verifyResultsAndUpdateMetricsOnAPerGetBasis(verify, rowKey, result, table, isNullExpected);
+    }
+
+    private void verifyResultsAndUpdateMetricsOnAPerGetBasis(boolean verify, String rowKey,
+        Result result, HTable table, boolean isNullExpected) throws IOException {
       if (!result.isEmpty()) {
         if (verify) {
           numKeysVerified.incrementAndGet();
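
Note for reviewers (not part of the patch): a minimal, self-contained sketch of what the new multi-get read path boils down to: build one Get per key, as readKey() does, then issue them in a single HTable#get(List<Get>) round trip and time the whole batch, as queryKey(Get[], ...) does. The table name "loadtest" and the key range below are illustrative assumptions, not values taken from the patch.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class MultiGetSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    // "loadtest" is a placeholder table name; point this at any existing table.
    HTable table = new HTable(conf, "loadtest");
    try {
      // Build one Get per key, then issue them as a single batch
      // instead of one RPC per key.
      List<Get> gets = new ArrayList<Get>();
      for (long key = 0; key < 10; key++) {
        gets.add(new Get(Bytes.toBytes(key)));
      }
      long start = System.nanoTime();
      Result[] results = table.get(gets); // one multi-get round trip
      long elapsedNano = System.nanoTime() - start;
      // Per-row verification would walk gets/results in lockstep, the way
      // verifyResultsAndUpdateMetrics(verify, gets, elapsedNano, results, ...) does.
      System.out.println(results.length + " results in " + (elapsedNano / 1000000) + " ms");
    } finally {
      table.close();
    }
  }
}

With the LoadTestTool wiring above, the same path would be exercised by passing the new flag, e.g. -multigetBatchSize 10 alongside the usual read options (flag name per OPT_MULTIGET); a value of 0 keeps the old one-get-per-key behavior.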