From 18f10e372173bbd3abd3df097c45f5b02183f092 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Sun, 20 Nov 2016 20:19:53 +0800 Subject: [PATCH] HBASE-16984 Implement getScanner --- .../hadoop/hbase/client/AbstractClientScanner.java | 76 ---- .../hbase/client/AllowPartialScanResultCache.java | 26 +- .../hadoop/hbase/client/AsyncConnection.java | 16 +- .../hadoop/hbase/client/AsyncConnectionImpl.java | 10 +- .../hadoop/hbase/client/AsyncRegionLocator.java | 4 +- .../hadoop/hbase/client/AsyncResultScanner.java | 172 +++++++++ .../org/apache/hadoop/hbase/client/AsyncTable.java | 354 +----------------- .../apache/hadoop/hbase/client/AsyncTableBase.java | 354 ++++++++++++++++++ .../apache/hadoop/hbase/client/AsyncTableImpl.java | 355 +++--------------- .../hbase/client/ClientAsyncPrefetchScanner.java | 12 +- .../apache/hadoop/hbase/client/ClientScanner.java | 11 +- .../hadoop/hbase/client/ConnectionUtils.java | 34 ++ .../apache/hadoop/hbase/client/RawAsyncTable.java | 56 +++ .../hadoop/hbase/client/RawAsyncTableImpl.java | 408 +++++++++++++++++++++ .../apache/hadoop/hbase/client/ResultScanner.java | 72 +++- .../hbase/client/AbstractTestAsyncTableScan.java | 33 +- .../hbase/client/TestAsyncGetMultiThread.java | 7 +- .../TestAsyncSingleRequestRpcRetryingCaller.java | 4 +- .../apache/hadoop/hbase/client/TestAsyncTable.java | 62 +++- .../hbase/client/TestAsyncTableNoncedRetry.java | 4 +- .../hadoop/hbase/client/TestAsyncTableScan.java | 8 +- .../hadoop/hbase/client/TestAsyncTableScanner.java | 93 +++++ .../hbase/client/TestAsyncTableSmallScan.java | 42 ++- 23 files changed, 1403 insertions(+), 810 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncResultScanner.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java index 7658faf..87304c3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java @@ -17,10 +17,6 @@ */ package org.apache.hadoop.hbase.client; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; - import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; @@ -52,76 +48,4 @@ public abstract class AbstractClientScanner implements ResultScanner { public ScanMetrics getScanMetrics() { return scanMetrics; } - - /** - * Get nbRows rows. - * How many RPCs are made is determined by the {@link Scan#setCaching(int)} - * setting (or hbase.client.scanner.caching in hbase-site.xml). - * @param nbRows number of rows to return - * @return Between zero and nbRows rowResults. Scan is done - * if returned array is of zero-length (We never return null). 
- * @throws IOException - */ - @Override - public Result [] next(int nbRows) throws IOException { - // Collect values to be returned here - ArrayList resultSets = new ArrayList(nbRows); - for(int i = 0; i < nbRows; i++) { - Result next = next(); - if (next != null) { - resultSets.add(next); - } else { - break; - } - } - return resultSets.toArray(new Result[resultSets.size()]); - } - - @Override - public Iterator iterator() { - return new Iterator() { - // The next RowResult, possibly pre-read - Result next = null; - - // return true if there is another item pending, false if there isn't. - // this method is where the actual advancing takes place, but you need - // to call next() to consume it. hasNext() will only advance if there - // isn't a pending next(). - @Override - public boolean hasNext() { - if (next == null) { - try { - next = AbstractClientScanner.this.next(); - return next != null; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - return true; - } - - // get the pending next item and advance the iterator. returns null if - // there is no next item. - @Override - public Result next() { - // since hasNext() does the real advancing, we call this to determine - // if there is a next before proceeding. - if (!hasNext()) { - return null; - } - - // if we get to here, then hasNext() has given us an item to return. - // we want to return the item and then null out the next pointer, so - // we use a temporary variable. - Result temp = next; - next = null; - return temp; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }; - } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java index bc6e44e..ab26587 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java @@ -21,8 +21,6 @@ import java.io.IOException; import java.util.Arrays; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellComparator; -import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; /** @@ -39,29 +37,7 @@ class AllowPartialScanResultCache implements ScanResultCache { private Cell lastCell; private Result filterCells(Result result) { - if (lastCell == null) { - return result; - } - - // not the same row - if (!CellUtil.matchingRow(lastCell, result.getRow(), 0, result.getRow().length)) { - return result; - } - Cell[] rawCells = result.rawCells(); - int index = Arrays.binarySearch(rawCells, lastCell, CellComparator::compareWithoutRow); - if (index < 0) { - index = -index - 1; - } else { - index++; - } - if (index == 0) { - return result; - } - if (index == rawCells.length) { - return null; - } - return Result.create(Arrays.copyOfRange(rawCells, index, rawCells.length), null, - result.isStale(), true); + return lastCell == null ? 
result : ConnectionUtils.filterCells(result, lastCell); } private void updateLastCell(Result result) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java index 6dc0300..7b0f339 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.client; import java.io.Closeable; +import java.util.concurrent.ExecutorService; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; @@ -49,6 +50,18 @@ public interface AsyncConnection extends Closeable { AsyncTableRegionLocator getRegionLocator(TableName tableName); /** + * Retrieve an RawAsyncTable implementation for accessing a table. The returned Table is not + * thread safe, a new instance should be created for each using thread. This is a lightweight + * operation, pooling or caching of the returned AsyncTable is neither required nor desired. + *
<p>
+ * This method no longer checks table existence. An exception will be thrown if the table does not + * exist only when the first operation is attempted. + * @param tableName the name of the table + * @return an RawAsyncTable to use for interactions with this table + */ + RawAsyncTable getRawTable(TableName tableName); + + /** * Retrieve an AsyncTable implementation for accessing a table. The returned Table is not thread * safe, a new instance should be created for each using thread. This is a lightweight operation, * pooling or caching of the returned AsyncTable is neither required nor desired. @@ -56,7 +69,8 @@ public interface AsyncConnection extends Closeable { * This method no longer checks table existence. An exception will be thrown if the table does not * exist only when the first operation is attempted. * @param tableName the name of the table + * @param pool the thread pool to use for executing callback * @return an AsyncTable to use for interactions with this table */ - AsyncTable getTable(TableName tableName); + AsyncTable getTable(TableName tableName, ExecutorService pool); } \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index 70e024e..02dce67 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -30,6 +30,7 @@ import java.io.IOException; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import org.apache.commons.io.IOUtils; @@ -148,7 +149,12 @@ class AsyncConnectionImpl implements AsyncConnection { } @Override - public AsyncTable getTable(TableName tableName) { - return new AsyncTableImpl(this, tableName); + public RawAsyncTable getRawTable(TableName tableName) { + return new RawAsyncTableImpl(this, tableName); + } + + @Override + public AsyncTable getTable(TableName tableName, ExecutorService pool) { + return new AsyncTableImpl(this, getRawTable(tableName), pool); } } \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java index ba5a0e0..6b74e4c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java @@ -262,7 +262,7 @@ class AsyncRegionLocator { } CompletableFuture future = new CompletableFuture<>(); byte[] metaKey = createRegionName(tableName, row, NINES, false); - conn.getTable(META_TABLE_NAME) + conn.getRawTable(META_TABLE_NAME) .smallScan(new Scan(metaKey).setReversed(true).setSmall(true).addFamily(CATALOG_FAMILY), 1) .whenComplete( (results, error) -> onScanComplete(future, tableName, row, results, error, "row", loc -> { @@ -327,7 +327,7 @@ class AsyncRegionLocator { metaKey = createRegionName(tableName, startRowOfCurrentRegion, ZEROES, false); } CompletableFuture future = new CompletableFuture<>(); - conn.getTable(META_TABLE_NAME) + conn.getRawTable(META_TABLE_NAME) .smallScan(new Scan(metaKey).setReversed(true).setSmall(true).addFamily(CATALOG_FAMILY), 1) .whenComplete((results, error) -> onScanComplete(future, tableName, startRowOfCurrentRegion, results, 
error, "startRowOfCurrentRegion", loc -> { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncResultScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncResultScanner.java new file mode 100644 index 0000000..e739dab --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncResultScanner.java @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize; +import static org.apache.hadoop.hbase.client.ConnectionUtils.filterCells; + +import com.google.common.base.Throwables; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.function.Function; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * + */ +@InterfaceAudience.Private +class AsyncResultScanner implements ResultScanner, ScanResultConsumer { + + private final RawAsyncTable rawTable; + + private final Scan scan; + + private final long maxCacheSize; + + private final Queue queue = new ArrayDeque<>(); + + private long cacheSize; + + private boolean closed = false; + + private Throwable error; + + private boolean prefetchStopped; + + // used to filter out cells that already returned when we restart a scan + private Cell lastCell; + + private Function createClosestRow; + + public AsyncResultScanner(RawAsyncTable table, Scan scan, long maxCacheSize) { + this.rawTable = table; + this.scan = scan; + this.maxCacheSize = maxCacheSize; + this.createClosestRow = scan.isReversed() ? 
ConnectionUtils::createClosestRowBefore + : ConnectionUtils::createClosestRowAfter; + table.scan(scan, this); + } + + private void addToCache(Result result) { + queue.add(result); + cacheSize += calcEstimatedSize(result); + } + + private void stopPrefetch(Result lastResult) { + prefetchStopped = true; + if (lastResult.isPartial()) { + scan.setStartRow(lastResult.getRow()); + lastCell = lastResult.rawCells()[lastResult.rawCells().length - 1]; + } else { + scan.setStartRow(createClosestRow.apply(lastResult.getRow())); + } + } + + @Override + public synchronized boolean onNext(Result[] results) { + assert results.length > 0; + if (closed) { + return false; + } + Result firstResult = results[0]; + if (lastCell != null) { + firstResult = filterCells(firstResult, lastCell); + lastCell = null; + if (firstResult != null) { + addToCache(firstResult); + } else if (results.length == 1) { + // the only one result is null + return true; + } + } else { + addToCache(firstResult); + } + for (int i = 1; i < results.length; i++) { + addToCache(results[i]); + } + notifyAll(); + if (cacheSize < maxCacheSize) { + return true; + } + stopPrefetch(results[results.length - 1]); + return false; + } + + @Override + public synchronized boolean onHeartbeat() { + return !closed; + } + + @Override + public synchronized void onError(Throwable error) { + this.error = error; + } + + @Override + public synchronized void onComplete() { + closed = true; + notifyAll(); + } + + private void resumePrefetch() { + prefetchStopped = false; + rawTable.scan(scan, this); + } + + @Override + public synchronized Result next() throws IOException { + while (queue.isEmpty()) { + if (closed) { + return null; + } + if (error != null) { + Throwables.propagateIfPossible(error, IOException.class); + throw new IOException(error); + } + try { + wait(); + } catch (InterruptedException e) { + throw new InterruptedIOException(); + } + } + Result result = queue.poll(); + cacheSize -= calcEstimatedSize(result); + if (prefetchStopped && cacheSize <= maxCacheSize / 2) { + resumePrefetch(); + } + return result; + } + + @Override + public synchronized void close() { + closed = true; + queue.clear(); + cacheSize = 0; + notifyAll(); + } + + @Override + public boolean renewLease() { + return false; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java index 851fbf1..9e7472c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java @@ -17,359 +17,43 @@ */ package org.apache.hadoop.hbase.client; -import com.google.common.base.Preconditions; - -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.ReflectionUtils; /** - * The asynchronous version of Table. Obtain an instance from a {@link AsyncConnection}. - *
<p>
- * The implementation is NOT required to be thread safe. Do NOT access it from multiple threads - * concurrently. + * The asynchronous table for normal users. *
<p>
- * Usually the implementations will not throw any exception directly, you need to get the exception - * from the returned {@link CompletableFuture}. + * The implementation should make sure that user can do everything they want to the returned + * {@code CompletableFuture} without break anything. Usually the implementation will require user to + * provide a {@code ExecutorService}. */ @InterfaceAudience.Public @InterfaceStability.Unstable -public interface AsyncTable { - - /** - * Gets the fully qualified table name instance of this table. - */ - TableName getName(); - - /** - * Returns the {@link org.apache.hadoop.conf.Configuration} object used by this instance. - *
<p>
- * The reference returned is not a copy, so any change made to it will affect this instance. - */ - Configuration getConfiguration(); - - /** - * Set timeout of each rpc read request in operations of this Table instance, will override the - * value of {@code hbase.rpc.read.timeout} in configuration. If a rpc read request waiting too - * long, it will stop waiting and send a new request to retry until retries exhausted or operation - * timeout reached. - */ - void setReadRpcTimeout(long timeout, TimeUnit unit); - - /** - * Get timeout of each rpc read request in this Table instance. - */ - long getReadRpcTimeout(TimeUnit unit); - - /** - * Set timeout of each rpc write request in operations of this Table instance, will override the - * value of {@code hbase.rpc.write.timeout} in configuration. If a rpc write request waiting too - * long, it will stop waiting and send a new request to retry until retries exhausted or operation - * timeout reached. - */ - void setWriteRpcTimeout(long timeout, TimeUnit unit); - - /** - * Get timeout of each rpc write request in this Table instance. - */ - long getWriteRpcTimeout(TimeUnit unit); - - /** - * Set timeout of each operation in this Table instance, will override the value of - * {@code hbase.client.operation.timeout} in configuration. - *
<p>
- * Operation timeout is a top-level restriction that makes sure an operation will not be blocked - * more than this. In each operation, if rpc request fails because of timeout or other reason, it - * will retry until success or throw a RetriesExhaustedException. But if the total time elapsed - * reach the operation timeout before retries exhausted, it will break early and throw - * SocketTimeoutException. - */ - void setOperationTimeout(long timeout, TimeUnit unit); - - /** - * Get timeout of each operation in Table instance. - */ - long getOperationTimeout(TimeUnit unit); - - /** - * Set timeout of a single operation in a scan, such as openScanner and next. Will override the - * value {@code hbase.client.scanner.timeout.period} in configuration. - *
<p>
- * Generally a scan will never timeout after we add heartbeat support unless the region is - * crashed. The {@code scanTimeout} works like the {@code operationTimeout} for each single - * operation in a scan. - */ - void setScanTimeout(long timeout, TimeUnit unit); - - /** - * Get the timeout of a single operation in a scan. - */ - long getScanTimeout(TimeUnit unit); - - /** - * Test for the existence of columns in the table, as specified by the Get. - *
<p>
- * This will return true if the Get matches one or more keys, false if not. - *
<p>
- * This is a server-side call so it prevents any data from being transfered to the client. - * @return true if the specified Get matches one or more keys, false if not. The return value will - * be wrapped by a {@link CompletableFuture}. - */ - default CompletableFuture exists(Get get) { - if (!get.isCheckExistenceOnly()) { - get = ReflectionUtils.newInstance(get.getClass(), get); - get.setCheckExistenceOnly(true); - } - return get(get).thenApply(r -> r.getExists()); - } - - /** - * Extracts certain cells from a given row. - * @param get The object that specifies what data to fetch and from which row. - * @return The data coming from the specified row, if it exists. If the row specified doesn't - * exist, the {@link Result} instance returned won't contain any - * {@link org.apache.hadoop.hbase.KeyValue}, as indicated by {@link Result#isEmpty()}. The - * return value will be wrapped by a {@link CompletableFuture}. - */ - CompletableFuture get(Get get); - - /** - * Puts some data to the table. - * @param put The data to put. - * @return A {@link CompletableFuture} that always returns null when complete normally. - */ - CompletableFuture put(Put put); - - /** - * Deletes the specified cells/row. - * @param delete The object that specifies what to delete. - * @return A {@link CompletableFuture} that always returns null when complete normally. - */ - CompletableFuture delete(Delete delete); - - /** - * Appends values to one or more columns within a single row. - *
<p>
- * This operation does not appear atomic to readers. Appends are done under a single row lock, so - * write operations to a row are synchronized, but readers do not take row locks so get and scan - * operations can see this operation partially completed. - * @param append object that specifies the columns and amounts to be used for the increment - * operations - * @return values of columns after the append operation (maybe null). The return value will be - * wrapped by a {@link CompletableFuture}. - */ - CompletableFuture append(Append append); - - /** - * Increments one or more columns within a single row. - *
<p>
- * This operation does not appear atomic to readers. Increments are done under a single row lock, - * so write operations to a row are synchronized, but readers do not take row locks so get and - * scan operations can see this operation partially completed. - * @param increment object that specifies the columns and amounts to be used for the increment - * operations - * @return values of columns after the increment. The return value will be wrapped by a - * {@link CompletableFuture}. - */ - CompletableFuture increment(Increment increment); - - /** - * See {@link #incrementColumnValue(byte[], byte[], byte[], long, Durability)} - *
<p>
- * The {@link Durability} is defaulted to {@link Durability#SYNC_WAL}. - * @param row The row that contains the cell to increment. - * @param family The column family of the cell to increment. - * @param qualifier The column qualifier of the cell to increment. - * @param amount The amount to increment the cell with (or decrement, if the amount is negative). - * @return The new value, post increment. The return value will be wrapped by a - * {@link CompletableFuture}. - */ - default CompletableFuture incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, - long amount) { - return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL); - } - - /** - * Atomically increments a column value. If the column value already exists and is not a - * big-endian long, this could throw an exception. If the column value does not yet exist it is - * initialized to amount and written to the specified column. - *
<p>
- * Setting durability to {@link Durability#SKIP_WAL} means that in a fail scenario you will lose - * any increments that have not been flushed. - * @param row The row that contains the cell to increment. - * @param family The column family of the cell to increment. - * @param qualifier The column qualifier of the cell to increment. - * @param amount The amount to increment the cell with (or decrement, if the amount is negative). - * @param durability The persistence guarantee for this increment. - * @return The new value, post increment. The return value will be wrapped by a - * {@link CompletableFuture}. - */ - default CompletableFuture incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, - long amount, Durability durability) { - Preconditions.checkNotNull(row, "row is null"); - Preconditions.checkNotNull(family, "family is null"); - Preconditions.checkNotNull(qualifier, "qualifier is null"); - return increment( - new Increment(row).addColumn(family, qualifier, amount).setDurability(durability)) - .thenApply(r -> Bytes.toLong(r.getValue(family, qualifier))); - } - - /** - * Atomically checks if a row/family/qualifier value equals to the expected value. If it does, it - * adds the put. If the passed value is null, the check is for the lack of column (ie: - * non-existence) - * @param row to check - * @param family column family to check - * @param qualifier column qualifier to check - * @param value the expected value - * @param put data to put if check succeeds - * @return true if the new put was executed, false otherwise. The return value will be wrapped by - * a {@link CompletableFuture}. - */ - default CompletableFuture checkAndPut(byte[] row, byte[] family, byte[] qualifier, - byte[] value, Put put) { - return checkAndPut(row, family, qualifier, CompareOp.EQUAL, value, put); - } - - /** - * Atomically checks if a row/family/qualifier value matches the expected value. If it does, it - * adds the put. If the passed value is null, the check is for the lack of column (ie: - * non-existence) - * @param row to check - * @param family column family to check - * @param qualifier column qualifier to check - * @param compareOp comparison operator to use - * @param value the expected value - * @param put data to put if check succeeds - * @return true if the new put was executed, false otherwise. The return value will be wrapped by - * a {@link CompletableFuture}. - */ - CompletableFuture checkAndPut(byte[] row, byte[] family, byte[] qualifier, - CompareOp compareOp, byte[] value, Put put); +public interface AsyncTable extends AsyncTableBase { /** - * Atomically checks if a row/family/qualifier value equals to the expected value. If it does, it - * adds the delete. If the passed value is null, the check is for the lack of column (ie: - * non-existence) - * @param row to check - * @param family column family to check - * @param qualifier column qualifier to check - * @param value the expected value - * @param delete data to delete if check succeeds - * @return true if the new delete was executed, false otherwise. The return value will be wrapped - * by a {@link CompletableFuture}. + * Gets a scanner on the current table for the given family. + * @param family The column family to scan. + * @return A scanner. 
*/ - default CompletableFuture checkAndDelete(byte[] row, byte[] family, byte[] qualifier, - byte[] value, Delete delete) { - return checkAndDelete(row, family, qualifier, CompareOp.EQUAL, value, delete); + default ResultScanner getScanner(byte[] family) { + return getScanner(new Scan().addFamily(family)); } /** - * Atomically checks if a row/family/qualifier value matches the expected value. If it does, it - * adds the delete. If the passed value is null, the check is for the lack of column (ie: - * non-existence) - * @param row to check - * @param family column family to check - * @param qualifier column qualifier to check - * @param compareOp comparison operator to use - * @param value the expected value - * @param delete data to delete if check succeeds - * @return true if the new delete was executed, false otherwise. The return value will be wrapped - * by a {@link CompletableFuture}. + * Gets a scanner on the current table for the given family and qualifier. + * @param family The column family to scan. + * @param qualifier The column qualifier to scan. + * @return A scanner. */ - CompletableFuture checkAndDelete(byte[] row, byte[] family, byte[] qualifier, - CompareOp compareOp, byte[] value, Delete delete); - - /** - * Performs multiple mutations atomically on a single row. Currently {@link Put} and - * {@link Delete} are supported. - * @param mutation object that specifies the set of mutations to perform atomically - * @return A {@link CompletableFuture} that always returns null when complete normally. - */ - CompletableFuture mutateRow(RowMutations mutation); - - /** - * Atomically checks if a row/family/qualifier value equals to the expected value. If it does, it - * performs the row mutations. If the passed value is null, the check is for the lack of column - * (ie: non-existence) - * @param row to check - * @param family column family to check - * @param qualifier column qualifier to check - * @param value the expected value - * @param mutation mutations to perform if check succeeds - * @return true if the new put was executed, false otherwise. The return value will be wrapped by - * a {@link CompletableFuture}. - */ - default CompletableFuture checkAndMutate(byte[] row, byte[] family, byte[] qualifier, - byte[] value, RowMutations mutation) { - return checkAndMutate(row, family, qualifier, CompareOp.EQUAL, value, mutation); + default ResultScanner getScanner(byte[] family, byte[] qualifier) { + return getScanner(new Scan().addColumn(family, qualifier)); } /** - * Atomically checks if a row/family/qualifier value matches the expected value. If it does, it - * performs the row mutations. If the passed value is null, the check is for the lack of column - * (ie: non-existence) - * @param row to check - * @param family column family to check - * @param qualifier column qualifier to check - * @param compareOp the comparison operator - * @param value the expected value - * @param mutation mutations to perform if check succeeds - * @return true if the new put was executed, false otherwise. The return value will be wrapped by - * a {@link CompletableFuture}. - */ - CompletableFuture checkAndMutate(byte[] row, byte[] family, byte[] qualifier, - CompareOp compareOp, byte[] value, RowMutations mutation); - - /** - * Just call {@link #smallScan(Scan, int)} with {@link Integer#MAX_VALUE}. - * @see #smallScan(Scan, int) - */ - default CompletableFuture> smallScan(Scan scan) { - return smallScan(scan, Integer.MAX_VALUE); - } - - /** - * Return all the results that match the given scan object. 
The number of the returned results - * will not be greater than {@code limit}. - *
<p>
- * Notice that the scan must be small, and should not use batch or allowPartialResults. The - * {@code caching} property of the scan object is also ignored as we will use {@code limit} - * instead. - * @param scan A configured {@link Scan} object. - * @param limit the limit of results count - * @return The results of this small scan operation. The return value will be wrapped by a - * {@link CompletableFuture}. - */ - CompletableFuture> smallScan(Scan scan, int limit); - - /** - * The basic scan API uses the observer pattern. All results that match the given scan object will - * be passed to the given {@code consumer} by calling {@link ScanResultConsumer#onNext(Result[])}. - * {@link ScanResultConsumer#onComplete()} means the scan is finished, and - * {@link ScanResultConsumer#onError(Throwable)} means we hit an unrecoverable error and the scan - * is terminated. {@link ScanResultConsumer#onHeartbeat()} means the RS is still working but we - * can not get a valid result to call {@link ScanResultConsumer#onNext(Result[])}. This is usually - * because the matched results are too sparse, for example, a filter which almost filters out - * everything is specified. - *
<p>
- * Notice that, the methods of the given {@code consumer} will be called directly in the rpc - * framework's callback thread, so typically you should not do any time consuming work inside - * these methods, otherwise you will be likely to block at least one connection to RS(even more if - * the rpc framework uses NIO). - *
<p>
- * This method is only for experts, do NOT use this method if you have other - * choice. + * Returns a scanner on the current table as specified by the {@link Scan} object. * @param scan A configured {@link Scan} object. - * @param consumer the consumer used to receive results. + * @return A scanner. */ - void scan(Scan scan, ScanResultConsumer consumer); + ResultScanner getScanner(Scan scan); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java new file mode 100644 index 0000000..e051a6b --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java @@ -0,0 +1,354 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import com.google.common.base.Preconditions; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ReflectionUtils; + +/** + * The base interface for asynchronous version of Table. Obtain an instance from a + * {@link AsyncConnection}. + *
<p>
+ * The implementation is NOT required to be thread safe. Do NOT access it from multiple threads + * concurrently. + *
<p>
+ * Usually the implementations will not throw any exception directly, you need to get the exception + * from the returned {@link CompletableFuture}. + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public interface AsyncTableBase { + + /** + * Gets the fully qualified table name instance of this table. + */ + TableName getName(); + + /** + * Returns the {@link org.apache.hadoop.conf.Configuration} object used by this instance. + *
<p>
+ * The reference returned is not a copy, so any change made to it will affect this instance. + */ + Configuration getConfiguration(); + + /** + * Set timeout of each rpc read request in operations of this Table instance, will override the + * value of {@code hbase.rpc.read.timeout} in configuration. If a rpc read request waiting too + * long, it will stop waiting and send a new request to retry until retries exhausted or operation + * timeout reached. + */ + void setReadRpcTimeout(long timeout, TimeUnit unit); + + /** + * Get timeout of each rpc read request in this Table instance. + */ + long getReadRpcTimeout(TimeUnit unit); + + /** + * Set timeout of each rpc write request in operations of this Table instance, will override the + * value of {@code hbase.rpc.write.timeout} in configuration. If a rpc write request waiting too + * long, it will stop waiting and send a new request to retry until retries exhausted or operation + * timeout reached. + */ + void setWriteRpcTimeout(long timeout, TimeUnit unit); + + /** + * Get timeout of each rpc write request in this Table instance. + */ + long getWriteRpcTimeout(TimeUnit unit); + + /** + * Set timeout of each operation in this Table instance, will override the value of + * {@code hbase.client.operation.timeout} in configuration. + *
<p>
+ * Operation timeout is a top-level restriction that makes sure an operation will not be blocked + * more than this. In each operation, if rpc request fails because of timeout or other reason, it + * will retry until success or throw a RetriesExhaustedException. But if the total time elapsed + * reach the operation timeout before retries exhausted, it will break early and throw + * SocketTimeoutException. + */ + void setOperationTimeout(long timeout, TimeUnit unit); + + /** + * Get timeout of each operation in Table instance. + */ + long getOperationTimeout(TimeUnit unit); + + /** + * Set timeout of a single operation in a scan, such as openScanner and next. Will override the + * value {@code hbase.client.scanner.timeout.period} in configuration. + *
<p>
+ * Generally a scan will never timeout after we add heartbeat support unless the region is + * crashed. The {@code scanTimeout} works like the {@code operationTimeout} for each single + * operation in a scan. + */ + void setScanTimeout(long timeout, TimeUnit unit); + + /** + * Get the timeout of a single operation in a scan. + */ + long getScanTimeout(TimeUnit unit); + + /** + * Test for the existence of columns in the table, as specified by the Get. + *
<p>
+ * This will return true if the Get matches one or more keys, false if not. + *
<p>
+ * This is a server-side call so it prevents any data from being transfered to the client. + * @return true if the specified Get matches one or more keys, false if not. The return value will + * be wrapped by a {@link CompletableFuture}. + */ + default CompletableFuture exists(Get get) { + if (!get.isCheckExistenceOnly()) { + get = ReflectionUtils.newInstance(get.getClass(), get); + get.setCheckExistenceOnly(true); + } + return get(get).thenApply(r -> r.getExists()); + } + + /** + * Extracts certain cells from a given row. + * @param get The object that specifies what data to fetch and from which row. + * @return The data coming from the specified row, if it exists. If the row specified doesn't + * exist, the {@link Result} instance returned won't contain any + * {@link org.apache.hadoop.hbase.KeyValue}, as indicated by {@link Result#isEmpty()}. The + * return value will be wrapped by a {@link CompletableFuture}. + */ + CompletableFuture get(Get get); + + /** + * Puts some data to the table. + * @param put The data to put. + * @return A {@link CompletableFuture} that always returns null when complete normally. + */ + CompletableFuture put(Put put); + + /** + * Deletes the specified cells/row. + * @param delete The object that specifies what to delete. + * @return A {@link CompletableFuture} that always returns null when complete normally. + */ + CompletableFuture delete(Delete delete); + + /** + * Appends values to one or more columns within a single row. + *
<p>
+ * This operation does not appear atomic to readers. Appends are done under a single row lock, so + * write operations to a row are synchronized, but readers do not take row locks so get and scan + * operations can see this operation partially completed. + * @param append object that specifies the columns and amounts to be used for the increment + * operations + * @return values of columns after the append operation (maybe null). The return value will be + * wrapped by a {@link CompletableFuture}. + */ + CompletableFuture append(Append append); + + /** + * Increments one or more columns within a single row. + *
<p>
+ * This operation does not appear atomic to readers. Increments are done under a single row lock, + * so write operations to a row are synchronized, but readers do not take row locks so get and + * scan operations can see this operation partially completed. + * @param increment object that specifies the columns and amounts to be used for the increment + * operations + * @return values of columns after the increment. The return value will be wrapped by a + * {@link CompletableFuture}. + */ + CompletableFuture increment(Increment increment); + + /** + * See {@link #incrementColumnValue(byte[], byte[], byte[], long, Durability)} + *
<p>
+ * The {@link Durability} is defaulted to {@link Durability#SYNC_WAL}. + * @param row The row that contains the cell to increment. + * @param family The column family of the cell to increment. + * @param qualifier The column qualifier of the cell to increment. + * @param amount The amount to increment the cell with (or decrement, if the amount is negative). + * @return The new value, post increment. The return value will be wrapped by a + * {@link CompletableFuture}. + */ + default CompletableFuture incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, + long amount) { + return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL); + } + + /** + * Atomically increments a column value. If the column value already exists and is not a + * big-endian long, this could throw an exception. If the column value does not yet exist it is + * initialized to amount and written to the specified column. + *
<p>
+ * Setting durability to {@link Durability#SKIP_WAL} means that in a fail scenario you will lose + * any increments that have not been flushed. + * @param row The row that contains the cell to increment. + * @param family The column family of the cell to increment. + * @param qualifier The column qualifier of the cell to increment. + * @param amount The amount to increment the cell with (or decrement, if the amount is negative). + * @param durability The persistence guarantee for this increment. + * @return The new value, post increment. The return value will be wrapped by a + * {@link CompletableFuture}. + */ + default CompletableFuture incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, + long amount, Durability durability) { + Preconditions.checkNotNull(row, "row is null"); + Preconditions.checkNotNull(family, "family is null"); + Preconditions.checkNotNull(qualifier, "qualifier is null"); + return increment( + new Increment(row).addColumn(family, qualifier, amount).setDurability(durability)) + .thenApply(r -> Bytes.toLong(r.getValue(family, qualifier))); + } + + /** + * Atomically checks if a row/family/qualifier value equals to the expected value. If it does, it + * adds the put. If the passed value is null, the check is for the lack of column (ie: + * non-existence) + * @param row to check + * @param family column family to check + * @param qualifier column qualifier to check + * @param value the expected value + * @param put data to put if check succeeds + * @return true if the new put was executed, false otherwise. The return value will be wrapped by + * a {@link CompletableFuture}. + */ + default CompletableFuture checkAndPut(byte[] row, byte[] family, byte[] qualifier, + byte[] value, Put put) { + return checkAndPut(row, family, qualifier, CompareOp.EQUAL, value, put); + } + + /** + * Atomically checks if a row/family/qualifier value matches the expected value. If it does, it + * adds the put. If the passed value is null, the check is for the lack of column (ie: + * non-existence) + * @param row to check + * @param family column family to check + * @param qualifier column qualifier to check + * @param compareOp comparison operator to use + * @param value the expected value + * @param put data to put if check succeeds + * @return true if the new put was executed, false otherwise. The return value will be wrapped by + * a {@link CompletableFuture}. + */ + CompletableFuture checkAndPut(byte[] row, byte[] family, byte[] qualifier, + CompareOp compareOp, byte[] value, Put put); + + /** + * Atomically checks if a row/family/qualifier value equals to the expected value. If it does, it + * adds the delete. If the passed value is null, the check is for the lack of column (ie: + * non-existence) + * @param row to check + * @param family column family to check + * @param qualifier column qualifier to check + * @param value the expected value + * @param delete data to delete if check succeeds + * @return true if the new delete was executed, false otherwise. The return value will be wrapped + * by a {@link CompletableFuture}. + */ + default CompletableFuture checkAndDelete(byte[] row, byte[] family, byte[] qualifier, + byte[] value, Delete delete) { + return checkAndDelete(row, family, qualifier, CompareOp.EQUAL, value, delete); + } + + /** + * Atomically checks if a row/family/qualifier value matches the expected value. If it does, it + * adds the delete. 
If the passed value is null, the check is for the lack of column (ie: + * non-existence) + * @param row to check + * @param family column family to check + * @param qualifier column qualifier to check + * @param compareOp comparison operator to use + * @param value the expected value + * @param delete data to delete if check succeeds + * @return true if the new delete was executed, false otherwise. The return value will be wrapped + * by a {@link CompletableFuture}. + */ + CompletableFuture checkAndDelete(byte[] row, byte[] family, byte[] qualifier, + CompareOp compareOp, byte[] value, Delete delete); + + /** + * Performs multiple mutations atomically on a single row. Currently {@link Put} and + * {@link Delete} are supported. + * @param mutation object that specifies the set of mutations to perform atomically + * @return A {@link CompletableFuture} that always returns null when complete normally. + */ + CompletableFuture mutateRow(RowMutations mutation); + + /** + * Atomically checks if a row/family/qualifier value equals to the expected value. If it does, it + * performs the row mutations. If the passed value is null, the check is for the lack of column + * (ie: non-existence) + * @param row to check + * @param family column family to check + * @param qualifier column qualifier to check + * @param value the expected value + * @param mutation mutations to perform if check succeeds + * @return true if the new put was executed, false otherwise. The return value will be wrapped by + * a {@link CompletableFuture}. + */ + default CompletableFuture checkAndMutate(byte[] row, byte[] family, byte[] qualifier, + byte[] value, RowMutations mutation) { + return checkAndMutate(row, family, qualifier, CompareOp.EQUAL, value, mutation); + } + + /** + * Atomically checks if a row/family/qualifier value matches the expected value. If it does, it + * performs the row mutations. If the passed value is null, the check is for the lack of column + * (ie: non-existence) + * @param row to check + * @param family column family to check + * @param qualifier column qualifier to check + * @param compareOp the comparison operator + * @param value the expected value + * @param mutation mutations to perform if check succeeds + * @return true if the new put was executed, false otherwise. The return value will be wrapped by + * a {@link CompletableFuture}. + */ + CompletableFuture checkAndMutate(byte[] row, byte[] family, byte[] qualifier, + CompareOp compareOp, byte[] value, RowMutations mutation); + + /** + * Just call {@link #smallScan(Scan, int)} with {@link Integer#MAX_VALUE}. + * @see #smallScan(Scan, int) + */ + default CompletableFuture> smallScan(Scan scan) { + return smallScan(scan, Integer.MAX_VALUE); + } + + /** + * Return all the results that match the given scan object. The number of the returned results + * will not be greater than {@code limit}. + *
<p>
+ * Notice that the scan must be small, and should not use batch or allowPartialResults. The + * {@code caching} property of the scan object is also ignored as we will use {@code limit} + * instead. + * @param scan A configured {@link Scan} object. + * @param limit the limit of results count + * @return The results of this small scan operation. The return value will be wrapped by a + * {@link CompletableFuture}. + */ + CompletableFuture> smallScan(Scan scan, int limit); +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java index c5afceb..b147a49 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java @@ -17,392 +17,159 @@ */ package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies; - -import java.io.IOException; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder; -import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; -import org.apache.hadoop.hbase.ipc.HBaseRpcController; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; -import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; -import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ReflectionUtils; /** - * The implementation of AsyncTable. + * The implementation of AsyncTable. Based on {@link RawAsyncTable}. 
*/ @InterfaceAudience.Private class AsyncTableImpl implements AsyncTable { - private static final Log LOG = LogFactory.getLog(AsyncTableImpl.class); - - private final AsyncConnectionImpl conn; - - private final TableName tableName; + private final RawAsyncTable rawTable; - private final int defaultScannerCaching; + private final ExecutorService pool; private final long defaultScannerMaxResultSize; - private long readRpcTimeoutNs; - - private long writeRpcTimeoutNs; - - private long operationTimeoutNs; - - private long scanTimeoutNs; - - public AsyncTableImpl(AsyncConnectionImpl conn, TableName tableName) { - this.conn = conn; - this.tableName = tableName; - this.readRpcTimeoutNs = conn.connConf.getReadRpcTimeoutNs(); - this.writeRpcTimeoutNs = conn.connConf.getWriteRpcTimeoutNs(); - this.operationTimeoutNs = tableName.isSystemTable() ? conn.connConf.getMetaOperationTimeoutNs() - : conn.connConf.getOperationTimeoutNs(); - this.defaultScannerCaching = conn.connConf.getScannerCaching(); + public AsyncTableImpl(AsyncConnectionImpl conn, RawAsyncTable rawTable, ExecutorService pool) { + this.rawTable = rawTable; + this.pool = pool; this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize(); - this.scanTimeoutNs = conn.connConf.getScanTimeoutNs(); } @Override public TableName getName() { - return tableName; + return rawTable.getName(); } @Override public Configuration getConfiguration() { - return conn.getConfiguration(); - } - - @FunctionalInterface - private interface Converter { - D convert(I info, S src) throws IOException; + return rawTable.getConfiguration(); } - @FunctionalInterface - private interface RpcCall { - void call(ClientService.Interface stub, HBaseRpcController controller, REQ req, - RpcCallback done); + @Override + public void setReadRpcTimeout(long timeout, TimeUnit unit) { + rawTable.setReadRpcTimeout(timeout, unit); } - private static CompletableFuture call( - HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req, - Converter reqConvert, RpcCall rpcCall, - Converter respConverter) { - CompletableFuture future = new CompletableFuture<>(); - try { - rpcCall.call(stub, controller, reqConvert.convert(loc.getRegionInfo().getRegionName(), req), - new RpcCallback() { - - @Override - public void run(PRESP resp) { - if (controller.failed()) { - future.completeExceptionally(controller.getFailed()); - } else { - try { - future.complete(respConverter.convert(controller, resp)); - } catch (IOException e) { - future.completeExceptionally(e); - } - } - } - }); - } catch (IOException e) { - future.completeExceptionally(e); - } - return future; + @Override + public long getReadRpcTimeout(TimeUnit unit) { + return rawTable.getReadRpcTimeout(unit); } - private static CompletableFuture mutate(HBaseRpcController controller, - HRegionLocation loc, ClientService.Interface stub, REQ req, - Converter reqConvert, - Converter respConverter) { - return call(controller, loc, stub, req, reqConvert, (s, c, r, done) -> s.mutate(c, r, done), - respConverter); + @Override + public void setWriteRpcTimeout(long timeout, TimeUnit unit) { + rawTable.setWriteRpcTimeout(timeout, unit); } - private static CompletableFuture voidMutate(HBaseRpcController controller, - HRegionLocation loc, ClientService.Interface stub, REQ req, - Converter reqConvert) { - return mutate(controller, loc, stub, req, reqConvert, (c, resp) -> { - return null; - }); + @Override + public long getWriteRpcTimeout(TimeUnit unit) { + return rawTable.getWriteRpcTimeout(unit); } - private static 
Result toResult(HBaseRpcController controller, MutateResponse resp) - throws IOException { - if (!resp.hasResult()) { - return null; - } - return ProtobufUtil.toResult(resp.getResult(), controller.cellScanner()); + @Override + public void setOperationTimeout(long timeout, TimeUnit unit) { + rawTable.setOperationTimeout(timeout, unit); } - @FunctionalInterface - private interface NoncedConverter { - D convert(I info, S src, long nonceGroup, long nonce) throws IOException; + @Override + public long getOperationTimeout(TimeUnit unit) { + return rawTable.getOperationTimeout(unit); } - private CompletableFuture noncedMutate(HBaseRpcController controller, - HRegionLocation loc, ClientService.Interface stub, REQ req, - NoncedConverter reqConvert, - Converter respConverter) { - long nonceGroup = conn.getNonceGenerator().getNonceGroup(); - long nonce = conn.getNonceGenerator().newNonce(); - return mutate(controller, loc, stub, req, - (info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter); + @Override + public void setScanTimeout(long timeout, TimeUnit unit) { + rawTable.setScanTimeout(timeout, unit); } - private SingleRequestCallerBuilder newCaller(byte[] row, long rpcTimeoutNs) { - return conn.callerFactory. single().table(tableName).row(row) - .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) - .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS); + @Override + public long getScanTimeout(TimeUnit unit) { + return rawTable.getScanTimeout(unit); } - private SingleRequestCallerBuilder newCaller(Row row, long rpcTimeoutNs) { - return newCaller(row.getRow(), rpcTimeoutNs); + private CompletableFuture wrap(CompletableFuture future) { + CompletableFuture asyncFuture = new CompletableFuture<>(); + future.whenCompleteAsync((r, e) -> { + if (e != null) { + asyncFuture.completeExceptionally(e); + } else { + asyncFuture.complete(r); + } + }, pool); + return asyncFuture; } @Override public CompletableFuture get(Get get) { - return this. newCaller(get, readRpcTimeoutNs) - .action((controller, loc, stub) -> AsyncTableImpl - . call(controller, loc, stub, get, - RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done), - (c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner()))) - .call(); + return wrap(rawTable.get(get)); } @Override public CompletableFuture put(Put put) { - return this - . newCaller(put, writeRpcTimeoutNs).action((controller, loc, stub) -> AsyncTableImpl - . voidMutate(controller, loc, stub, put, RequestConverter::buildMutateRequest)) - .call(); + return wrap(rawTable.put(put)); } @Override public CompletableFuture delete(Delete delete) { - return this. newCaller(delete, writeRpcTimeoutNs) - .action((controller, loc, stub) -> AsyncTableImpl. voidMutate(controller, loc, stub, - delete, RequestConverter::buildMutateRequest)) - .call(); + return wrap(rawTable.delete(delete)); } @Override public CompletableFuture append(Append append) { - checkHasFamilies(append); - return this. newCaller(append, writeRpcTimeoutNs) - .action((controller, loc, stub) -> this. noncedMutate(controller, loc, stub, - append, RequestConverter::buildMutateRequest, AsyncTableImpl::toResult)) - .call(); + return wrap(rawTable.append(append)); } @Override public CompletableFuture increment(Increment increment) { - checkHasFamilies(increment); - return this. newCaller(increment, writeRpcTimeoutNs) - .action((controller, loc, stub) -> this. 
noncedMutate(controller, loc, - stub, increment, RequestConverter::buildMutateRequest, AsyncTableImpl::toResult)) - .call(); + return wrap(rawTable.increment(increment)); } @Override public CompletableFuture checkAndPut(byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp, byte[] value, Put put) { - return this. newCaller(row, writeRpcTimeoutNs) - .action((controller, loc, stub) -> AsyncTableImpl. mutate(controller, loc, - stub, put, - (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, - new BinaryComparator(value), CompareType.valueOf(compareOp.name()), p), - (c, r) -> r.getProcessed())) - .call(); + return wrap(rawTable.checkAndPut(row, family, qualifier, compareOp, value, put)); } @Override public CompletableFuture checkAndDelete(byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp, byte[] value, Delete delete) { - return this. newCaller(row, writeRpcTimeoutNs) - .action((controller, loc, stub) -> AsyncTableImpl. mutate(controller, loc, - stub, delete, - (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, - new BinaryComparator(value), CompareType.valueOf(compareOp.name()), d), - (c, r) -> r.getProcessed())) - .call(); - } - - // We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse, - // so here I write a new method as I do not want to change the abstraction of call method. - private static CompletableFuture mutateRow(HBaseRpcController controller, - HRegionLocation loc, ClientService.Interface stub, RowMutations mutation, - Converter reqConvert, - Function respConverter) { - CompletableFuture future = new CompletableFuture<>(); - try { - byte[] regionName = loc.getRegionInfo().getRegionName(); - MultiRequest req = reqConvert.convert(regionName, mutation); - stub.multi(controller, req, new RpcCallback() { - - @Override - public void run(MultiResponse resp) { - if (controller.failed()) { - future.completeExceptionally(controller.getFailed()); - } else { - try { - org.apache.hadoop.hbase.client.MultiResponse multiResp = - ResponseConverter.getResults(req, resp, controller.cellScanner()); - Throwable ex = multiResp.getException(regionName); - if (ex != null) { - future - .completeExceptionally(ex instanceof IOException ? ex - : new IOException( - "Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), - ex)); - } else { - future.complete(respConverter - .apply((Result) multiResp.getResults().get(regionName).result.get(0))); - } - } catch (IOException e) { - future.completeExceptionally(e); - } - } - } - }); - } catch (IOException e) { - future.completeExceptionally(e); - } - return future; + return wrap(rawTable.checkAndDelete(row, family, qualifier, compareOp, value, delete)); } @Override public CompletableFuture mutateRow(RowMutations mutation) { - return this. newCaller(mutation, writeRpcTimeoutNs).action((controller, loc, - stub) -> AsyncTableImpl. mutateRow(controller, loc, stub, mutation, (rn, rm) -> { - RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(rn, rm); - regionMutationBuilder.setAtomic(true); - return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build(); - }, (resp) -> { - return null; - })).call(); + return wrap(rawTable.mutateRow(mutation)); } @Override public CompletableFuture checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp, byte[] value, RowMutations mutation) { - return this. 
newCaller(mutation, writeRpcTimeoutNs) - .action((controller, loc, stub) -> AsyncTableImpl. mutateRow(controller, loc, stub, - mutation, - (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, - new BinaryComparator(value), CompareType.valueOf(compareOp.name()), rm), - (resp) -> resp.getExists())) - .call(); - } - - private CompletableFuture failedFuture(Throwable error) { - CompletableFuture future = new CompletableFuture<>(); - future.completeExceptionally(error); - return future; - } - - private Scan setDefaultScanConfig(Scan scan) { - // always create a new scan object as we may reset the start row later. - Scan newScan = ReflectionUtils.newInstance(scan.getClass(), scan); - if (newScan.getCaching() <= 0) { - newScan.setCaching(defaultScannerCaching); - } - if (newScan.getMaxResultSize() <= 0) { - newScan.setMaxResultSize(defaultScannerMaxResultSize); - } - return newScan; + return wrap(rawTable.checkAndMutate(row, family, qualifier, compareOp, value, mutation)); } @Override public CompletableFuture> smallScan(Scan scan, int limit) { - if (!scan.isSmall()) { - return failedFuture(new IllegalArgumentException("Only small scan is allowed")); - } - if (scan.getBatch() > 0 || scan.getAllowPartialResults()) { - return failedFuture( - new IllegalArgumentException("Batch and allowPartial is not allowed for small scan")); - } - return conn.callerFactory.smallScan().table(tableName).setScan(setDefaultScanConfig(scan)) - .limit(limit).scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS) - .rpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS).call(); + return wrap(rawTable.smallScan(scan, limit)); } - public void scan(Scan scan, ScanResultConsumer consumer) { - if (scan.isSmall()) { - if (scan.getBatch() > 0 || scan.getAllowPartialResults()) { - consumer.onError( - new IllegalArgumentException("Batch and allowPartial is not allowed for small scan")); - } else { - LOG.warn("This is small scan " + scan + ", consider using smallScan directly?"); - } - } - scan = setDefaultScanConfig(scan); - new AsyncClientScanner(scan, consumer, tableName, conn, scanTimeoutNs, readRpcTimeoutNs) - .start(); + private long resultSize2CacheSize(long maxResultSize) { + // * 2 if possible + return maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2; } @Override - public void setReadRpcTimeout(long timeout, TimeUnit unit) { - this.readRpcTimeoutNs = unit.toNanos(timeout); - } - - @Override - public long getReadRpcTimeout(TimeUnit unit) { - return unit.convert(readRpcTimeoutNs, TimeUnit.NANOSECONDS); - } - - @Override - public void setWriteRpcTimeout(long timeout, TimeUnit unit) { - this.writeRpcTimeoutNs = unit.toNanos(timeout); - } - - @Override - public long getWriteRpcTimeout(TimeUnit unit) { - return unit.convert(writeRpcTimeoutNs, TimeUnit.NANOSECONDS); - } - - @Override - public void setOperationTimeout(long timeout, TimeUnit unit) { - this.operationTimeoutNs = unit.toNanos(timeout); - } - - @Override - public long getOperationTimeout(TimeUnit unit) { - return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS); - } - - @Override - public void setScanTimeout(long timeout, TimeUnit unit) { - this.scanTimeoutNs = unit.toNanos(timeout); - } - - @Override - public long getScanTimeout(TimeUnit unit) { - return TimeUnit.NANOSECONDS.convert(scanTimeoutNs, unit); + public ResultScanner getScanner(Scan scan) { + return new AsyncResultScanner(rawTable, ReflectionUtils.newInstance(scan.getClass(), scan), + resultSize2CacheSize( + scan.getMaxResultSize() > 0 ? 
scan.getMaxResultSize() : defaultScannerMaxResultSize)); } -} \ No newline at end of file +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java index f0903db..ec33dd2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java @@ -17,11 +17,7 @@ */ package org.apache.hadoop.hbase.client; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.util.Threads; +import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize; import java.io.IOException; import java.util.Queue; @@ -31,6 +27,12 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.util.Threads; + /** * ClientAsyncPrefetchScanner implements async scanner behaviour. * Specifically, the cache used by this scanner is a concurrent queue which allows both diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index b7bdb83..20ed183 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize; import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter; import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore; @@ -36,7 +37,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; -import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; @@ -554,15 +554,6 @@ public abstract class ClientScanner extends AbstractClientScanner { return remainingResultSize > 0 && remainingRows > 0 && !regionHasMoreResults; } - protected long calcEstimatedSize(Result rs) { - long estimatedHeapSizeOfResult = 0; - // We don't make Iterator here - for (Cell cell : rs.rawCells()) { - estimatedHeapSizeOfResult += CellUtil.estimatedHeapSizeOf(cell); - } - return estimatedHeapSizeOfResult; - } - protected void addEstimatedSize(long estimatedHeapSizeOfResult) { return; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index d464c3b..9df9fbb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -35,6 +35,9 @@ import java.util.concurrent.TimeUnit; import 
org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; @@ -290,4 +293,35 @@ public final class ConnectionUtils { } return t; } + + static long calcEstimatedSize(Result rs) { + long estimatedHeapSizeOfResult = 0; + // We don't make Iterator here + for (Cell cell : rs.rawCells()) { + estimatedHeapSizeOfResult += CellUtil.estimatedHeapSizeOf(cell); + } + return estimatedHeapSizeOfResult; + } + + static Result filterCells(Result result, Cell keepCellsAfter) { + // not the same row + if (!CellUtil.matchingRow(keepCellsAfter, result.getRow(), 0, result.getRow().length)) { + return result; + } + Cell[] rawCells = result.rawCells(); + int index = Arrays.binarySearch(rawCells, keepCellsAfter, CellComparator::compareWithoutRow); + if (index < 0) { + index = -index - 1; + } else { + index++; + } + if (index == 0) { + return result; + } + if (index == rawCells.length) { + return null; + } + return Result.create(Arrays.copyOfRange(rawCells, index, rawCells.length), null, + result.isStale(), true); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java new file mode 100644 index 0000000..adc12ee --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * A low level asynchronous table. + *

+ * The returned {@code CompletableFuture} will be completed directly in the rpc framework's
+ * callback thread, so typically you should not do any time-consuming work inside these methods,
+ * otherwise you are likely to block at least one connection to the RS (even more if the rpc
+ * framework uses NIO).
+ *

+ * So, only experts who want to build a high performance service should use this interface
+ * directly, especially for the {@link #scan(Scan, ScanResultConsumer)} below.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface RawAsyncTable extends AsyncTableBase {
+
+  /**
+   * The basic scan API uses the observer pattern. All results that match the given scan object
+   * will be passed to the given {@code consumer} by calling
+   * {@link ScanResultConsumer#onNext(Result[])}. {@link ScanResultConsumer#onComplete()} means the
+   * scan is finished, and {@link ScanResultConsumer#onError(Throwable)} means we hit an
+   * unrecoverable error and the scan is terminated. {@link ScanResultConsumer#onHeartbeat()} means
+   * the RS is still working but we cannot get a valid result to call
+   * {@link ScanResultConsumer#onNext(Result[])}. This is usually because the matched results are
+   * too sparse, for example, a filter that filters out almost everything is specified.
+   *

+ * Notice that, the methods of the given {@code consumer} will be called directly in the rpc + * framework's callback thread, so typically you should not do any time consuming work inside + * these methods, otherwise you will be likely to block at least one connection to RS(even more if + * the rpc framework uses NIO). + * @param scan A configured {@link Scan} object. + * @param consumer the consumer used to receive results. + */ + void scan(Scan scan, ScanResultConsumer consumer); +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java new file mode 100644 index 0000000..bf9a3f2 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -0,0 +1,408 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; +import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType; 
+import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ReflectionUtils; + +/** + * The implementation of RawAsyncTable. + */ +@InterfaceAudience.Private +class RawAsyncTableImpl implements RawAsyncTable { + + private static final Log LOG = LogFactory.getLog(RawAsyncTableImpl.class); + + private final AsyncConnectionImpl conn; + + private final TableName tableName; + + private final int defaultScannerCaching; + + private final long defaultScannerMaxResultSize; + + private long readRpcTimeoutNs; + + private long writeRpcTimeoutNs; + + private long operationTimeoutNs; + + private long scanTimeoutNs; + + public RawAsyncTableImpl(AsyncConnectionImpl conn, TableName tableName) { + this.conn = conn; + this.tableName = tableName; + this.readRpcTimeoutNs = conn.connConf.getReadRpcTimeoutNs(); + this.writeRpcTimeoutNs = conn.connConf.getWriteRpcTimeoutNs(); + this.operationTimeoutNs = tableName.isSystemTable() ? conn.connConf.getMetaOperationTimeoutNs() + : conn.connConf.getOperationTimeoutNs(); + this.defaultScannerCaching = conn.connConf.getScannerCaching(); + this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize(); + this.scanTimeoutNs = conn.connConf.getScanTimeoutNs(); + } + + @Override + public TableName getName() { + return tableName; + } + + @Override + public Configuration getConfiguration() { + return conn.getConfiguration(); + } + + @FunctionalInterface + private interface Converter { + D convert(I info, S src) throws IOException; + } + + @FunctionalInterface + private interface RpcCall { + void call(ClientService.Interface stub, HBaseRpcController controller, REQ req, + RpcCallback done); + } + + private static CompletableFuture call( + HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req, + Converter reqConvert, RpcCall rpcCall, + Converter respConverter) { + CompletableFuture future = new CompletableFuture<>(); + try { + rpcCall.call(stub, controller, reqConvert.convert(loc.getRegionInfo().getRegionName(), req), + new RpcCallback() { + + @Override + public void run(PRESP resp) { + if (controller.failed()) { + future.completeExceptionally(controller.getFailed()); + } else { + try { + future.complete(respConverter.convert(controller, resp)); + } catch (IOException e) { + future.completeExceptionally(e); + } + } + } + }); + } catch (IOException e) { + future.completeExceptionally(e); + } + return future; + } + + private static CompletableFuture mutate(HBaseRpcController controller, + HRegionLocation loc, ClientService.Interface stub, REQ req, + Converter reqConvert, + Converter respConverter) { + return call(controller, loc, stub, req, reqConvert, (s, c, r, done) -> s.mutate(c, r, done), + respConverter); + } + + private static CompletableFuture voidMutate(HBaseRpcController controller, + HRegionLocation loc, ClientService.Interface stub, REQ req, + Converter reqConvert) { + return mutate(controller, loc, stub, req, reqConvert, (c, resp) -> { + return null; + }); + } + + private static Result toResult(HBaseRpcController controller, MutateResponse resp) + throws IOException { + if (!resp.hasResult()) { + return null; + } + return ProtobufUtil.toResult(resp.getResult(), controller.cellScanner()); + } + + @FunctionalInterface + private interface NoncedConverter { + D convert(I info, S src, long nonceGroup, long nonce) throws IOException; + } + + private CompletableFuture noncedMutate(HBaseRpcController controller, + HRegionLocation loc, ClientService.Interface stub, REQ req, + NoncedConverter reqConvert, + 
Converter respConverter) { + long nonceGroup = conn.getNonceGenerator().getNonceGroup(); + long nonce = conn.getNonceGenerator().newNonce(); + return mutate(controller, loc, stub, req, + (info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter); + } + + private SingleRequestCallerBuilder newCaller(byte[] row, long rpcTimeoutNs) { + return conn.callerFactory. single().table(tableName).row(row) + .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) + .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS); + } + + private SingleRequestCallerBuilder newCaller(Row row, long rpcTimeoutNs) { + return newCaller(row.getRow(), rpcTimeoutNs); + } + + @Override + public CompletableFuture get(Get get) { + return this. newCaller(get, readRpcTimeoutNs) + .action((controller, loc, stub) -> RawAsyncTableImpl + . call(controller, loc, stub, get, + RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done), + (c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner()))) + .call(); + } + + @Override + public CompletableFuture put(Put put) { + return this. newCaller(put, writeRpcTimeoutNs) + .action((controller, loc, stub) -> RawAsyncTableImpl. voidMutate(controller, loc, stub, + put, RequestConverter::buildMutateRequest)) + .call(); + } + + @Override + public CompletableFuture delete(Delete delete) { + return this. newCaller(delete, writeRpcTimeoutNs) + .action((controller, loc, stub) -> RawAsyncTableImpl. voidMutate(controller, loc, + stub, delete, RequestConverter::buildMutateRequest)) + .call(); + } + + @Override + public CompletableFuture append(Append append) { + checkHasFamilies(append); + return this. newCaller(append, writeRpcTimeoutNs) + .action((controller, loc, stub) -> this. noncedMutate(controller, loc, stub, + append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult)) + .call(); + } + + @Override + public CompletableFuture increment(Increment increment) { + checkHasFamilies(increment); + return this. newCaller(increment, writeRpcTimeoutNs) + .action((controller, loc, stub) -> this. noncedMutate(controller, loc, + stub, increment, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult)) + .call(); + } + + @Override + public CompletableFuture checkAndPut(byte[] row, byte[] family, byte[] qualifier, + CompareOp compareOp, byte[] value, Put put) { + return this. newCaller(row, writeRpcTimeoutNs) + .action((controller, loc, stub) -> RawAsyncTableImpl. mutate(controller, loc, + stub, put, + (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, + new BinaryComparator(value), CompareType.valueOf(compareOp.name()), p), + (c, r) -> r.getProcessed())) + .call(); + } + + @Override + public CompletableFuture checkAndDelete(byte[] row, byte[] family, byte[] qualifier, + CompareOp compareOp, byte[] value, Delete delete) { + return this. newCaller(row, writeRpcTimeoutNs) + .action((controller, loc, stub) -> RawAsyncTableImpl. mutate(controller, + loc, stub, delete, + (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, + new BinaryComparator(value), CompareType.valueOf(compareOp.name()), d), + (c, r) -> r.getProcessed())) + .call(); + } + + // We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse, + // so here I write a new method as I do not want to change the abstraction of call method. 
+ private static CompletableFuture mutateRow(HBaseRpcController controller, + HRegionLocation loc, ClientService.Interface stub, RowMutations mutation, + Converter reqConvert, + Function respConverter) { + CompletableFuture future = new CompletableFuture<>(); + try { + byte[] regionName = loc.getRegionInfo().getRegionName(); + MultiRequest req = reqConvert.convert(regionName, mutation); + stub.multi(controller, req, new RpcCallback() { + + @Override + public void run(MultiResponse resp) { + if (controller.failed()) { + future.completeExceptionally(controller.getFailed()); + } else { + try { + org.apache.hadoop.hbase.client.MultiResponse multiResp = + ResponseConverter.getResults(req, resp, controller.cellScanner()); + Throwable ex = multiResp.getException(regionName); + if (ex != null) { + future + .completeExceptionally(ex instanceof IOException ? ex + : new IOException( + "Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), + ex)); + } else { + future.complete(respConverter + .apply((Result) multiResp.getResults().get(regionName).result.get(0))); + } + } catch (IOException e) { + future.completeExceptionally(e); + } + } + } + }); + } catch (IOException e) { + future.completeExceptionally(e); + } + return future; + } + + @Override + public CompletableFuture mutateRow(RowMutations mutation) { + return this. newCaller(mutation, writeRpcTimeoutNs).action((controller, loc, + stub) -> RawAsyncTableImpl. mutateRow(controller, loc, stub, mutation, (rn, rm) -> { + RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(rn, rm); + regionMutationBuilder.setAtomic(true); + return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build(); + }, (resp) -> { + return null; + })).call(); + } + + @Override + public CompletableFuture checkAndMutate(byte[] row, byte[] family, byte[] qualifier, + CompareOp compareOp, byte[] value, RowMutations mutation) { + return this. newCaller(mutation, writeRpcTimeoutNs) + .action((controller, loc, stub) -> RawAsyncTableImpl. mutateRow(controller, loc, + stub, mutation, + (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, + new BinaryComparator(value), CompareType.valueOf(compareOp.name()), rm), + (resp) -> resp.getExists())) + .call(); + } + + private CompletableFuture failedFuture(Throwable error) { + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(error); + return future; + } + + private Scan setDefaultScanConfig(Scan scan) { + // always create a new scan object as we may reset the start row later. 
+ Scan newScan = ReflectionUtils.newInstance(scan.getClass(), scan); + if (newScan.getCaching() <= 0) { + newScan.setCaching(defaultScannerCaching); + } + if (newScan.getMaxResultSize() <= 0) { + newScan.setMaxResultSize(defaultScannerMaxResultSize); + } + return newScan; + } + + @Override + public CompletableFuture> smallScan(Scan scan, int limit) { + if (!scan.isSmall()) { + return failedFuture(new IllegalArgumentException("Only small scan is allowed")); + } + if (scan.getBatch() > 0 || scan.getAllowPartialResults()) { + return failedFuture( + new IllegalArgumentException("Batch and allowPartial is not allowed for small scan")); + } + return conn.callerFactory.smallScan().table(tableName).setScan(setDefaultScanConfig(scan)) + .limit(limit).scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS) + .rpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS).call(); + } + + public void scan(Scan scan, ScanResultConsumer consumer) { + if (scan.isSmall()) { + if (scan.getBatch() > 0 || scan.getAllowPartialResults()) { + consumer.onError( + new IllegalArgumentException("Batch and allowPartial is not allowed for small scan")); + } else { + LOG.warn("This is small scan " + scan + ", consider using smallScan directly?"); + } + } + scan = setDefaultScanConfig(scan); + new AsyncClientScanner(scan, consumer, tableName, conn, scanTimeoutNs, readRpcTimeoutNs) + .start(); + } + + @Override + public void setReadRpcTimeout(long timeout, TimeUnit unit) { + this.readRpcTimeoutNs = unit.toNanos(timeout); + } + + @Override + public long getReadRpcTimeout(TimeUnit unit) { + return unit.convert(readRpcTimeoutNs, TimeUnit.NANOSECONDS); + } + + @Override + public void setWriteRpcTimeout(long timeout, TimeUnit unit) { + this.writeRpcTimeoutNs = unit.toNanos(timeout); + } + + @Override + public long getWriteRpcTimeout(TimeUnit unit) { + return unit.convert(writeRpcTimeoutNs, TimeUnit.NANOSECONDS); + } + + @Override + public void setOperationTimeout(long timeout, TimeUnit unit) { + this.operationTimeoutNs = unit.toNanos(timeout); + } + + @Override + public long getOperationTimeout(TimeUnit unit) { + return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS); + } + + @Override + public void setScanTimeout(long timeout, TimeUnit unit) { + this.scanTimeoutNs = unit.toNanos(timeout); + } + + @Override + public long getScanTimeout(TimeUnit unit) { + return TimeUnit.NANOSECONDS.convert(scanTimeoutNs, unit); + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultScanner.java index d3efbda..e9cb476 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultScanner.java @@ -20,32 +20,90 @@ package org.apache.hadoop.hbase.client; import java.io.Closeable; import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; /** - * Interface for client-side scanning. - * Go to {@link Table} to obtain instances. + * Interface for client-side scanning. Go to {@link Table} to obtain instances. 
*/ @InterfaceAudience.Public @InterfaceStability.Stable public interface ResultScanner extends Closeable, Iterable { + @Override + default Iterator iterator() { + return new Iterator() { + // The next RowResult, possibly pre-read + Result next = null; + + // return true if there is another item pending, false if there isn't. + // this method is where the actual advancing takes place, but you need + // to call next() to consume it. hasNext() will only advance if there + // isn't a pending next(). + @Override + public boolean hasNext() { + if (next != null) { + return true; + } + try { + return (next = ResultScanner.this.next()) != null; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + // get the pending next item and advance the iterator. returns null if + // there is no next item. + @Override + public Result next() { + // since hasNext() does the real advancing, we call this to determine + // if there is a next before proceeding. + if (!hasNext()) { + return null; + } + + // if we get to here, then hasNext() has given us an item to return. + // we want to return the item and then null out the next pointer, so + // we use a temporary variable. + Result temp = next; + next = null; + return temp; + } + }; + } + /** * Grab the next row's worth of values. The scanner will return a Result. - * @return Result object if there is another row, null if the scanner is - * exhausted. + * @return Result object if there is another row, null if the scanner is exhausted. * @throws IOException e */ Result next() throws IOException; /** + * Get nbRows rows. How many RPCs are made is determined by the {@link Scan#setCaching(int)} + * setting (or hbase.client.scanner.caching in hbase-site.xml). * @param nbRows number of rows to return - * @return Between zero and nbRows results - * @throws IOException e + * @return Between zero and nbRows rowResults. Scan is done if returned array is of zero-length + * (We never return null). 
+ * @throws IOException */ - Result [] next(int nbRows) throws IOException; + default Result[] next(int nbRows) throws IOException { + List resultSets = new ArrayList<>(nbRows); + for (int i = 0; i < nbRows; i++) { + Result next = next(); + if (next != null) { + resultSets.add(next); + } else { + break; + } + } + return resultSets.toArray(new Result[0]); + } /** * Closes the scanner and releases any resources it has allocated diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java index a0792ef..220be10 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java @@ -57,11 +57,12 @@ public abstract class AbstractTestAsyncTableScan { TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys); TEST_UTIL.waitTableAvailable(TABLE_NAME); ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()); - AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); + RawAsyncTable table = ASYNC_CONN.getRawTable(TABLE_NAME); List> futures = new ArrayList<>(); - IntStream.range(0, COUNT).forEach( - i -> futures.add(table.put(new Put(Bytes.toBytes(String.format("%03d", i))) - .addColumn(FAMILY, CQ1, Bytes.toBytes(i)).addColumn(FAMILY, CQ2, Bytes.toBytes(i * i))))); + IntStream.range(0, COUNT) + .forEach(i -> futures.add(table.put( + new Put(Bytes.toBytes(String.format("%03d", i))).addColumn(FAMILY, CQ1, Bytes.toBytes(i)) + .addColumn(FAMILY, CQ2, Bytes.toBytes(i * i))))); CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); } @@ -73,11 +74,11 @@ public abstract class AbstractTestAsyncTableScan { protected abstract Scan createScan(); - protected abstract List doScan(AsyncTable table, Scan scan) throws Exception; + protected abstract List doScan(Scan scan) throws Exception; @Test public void testScanAll() throws Exception { - List results = doScan(ASYNC_CONN.getTable(TABLE_NAME), createScan()); + List results = doScan(createScan()); assertEquals(COUNT, results.size()); IntStream.range(0, COUNT).forEach(i -> { Result result = results.get(i); @@ -94,7 +95,7 @@ public abstract class AbstractTestAsyncTableScan { @Test public void testReversedScanAll() throws Exception { - List results = doScan(ASYNC_CONN.getTable(TABLE_NAME), createScan().setReversed(true)); + List results = doScan(createScan().setReversed(true)); assertEquals(COUNT, results.size()); IntStream.range(0, COUNT).forEach(i -> assertResultEquals(results.get(i), COUNT - i - 1)); } @@ -102,8 +103,8 @@ public abstract class AbstractTestAsyncTableScan { @Test public void testScanNoStopKey() throws Exception { int start = 345; - List results = doScan(ASYNC_CONN.getTable(TABLE_NAME), - createScan().setStartRow(Bytes.toBytes(String.format("%03d", start)))); + List results = + doScan(createScan().setStartRow(Bytes.toBytes(String.format("%03d", start)))); assertEquals(COUNT - start, results.size()); IntStream.range(0, COUNT - start).forEach(i -> assertResultEquals(results.get(i), start + i)); } @@ -111,24 +112,24 @@ public abstract class AbstractTestAsyncTableScan { @Test public void testReverseScanNoStopKey() throws Exception { int start = 765; - List results = doScan(ASYNC_CONN.getTable(TABLE_NAME), + List results = doScan( createScan().setStartRow(Bytes.toBytes(String.format("%03d", start))).setReversed(true)); assertEquals(start + 1, results.size()); 
IntStream.range(0, start + 1).forEach(i -> assertResultEquals(results.get(i), start - i)); } private void testScan(int start, int stop) throws Exception { - List results = doScan(ASYNC_CONN.getTable(TABLE_NAME), - createScan().setStartRow(Bytes.toBytes(String.format("%03d", start))) - .setStopRow(Bytes.toBytes(String.format("%03d", stop)))); + List results = + doScan(createScan().setStartRow(Bytes.toBytes(String.format("%03d", start))) + .setStopRow(Bytes.toBytes(String.format("%03d", stop)))); assertEquals(stop - start, results.size()); IntStream.range(0, stop - start).forEach(i -> assertResultEquals(results.get(i), start + i)); } private void testReversedScan(int start, int stop) throws Exception { - List results = doScan(ASYNC_CONN.getTable(TABLE_NAME), - createScan().setStartRow(Bytes.toBytes(String.format("%03d", start))) - .setStopRow(Bytes.toBytes(String.format("%03d", stop))).setReversed(true)); + List results = + doScan(createScan().setStartRow(Bytes.toBytes(String.format("%03d", start))) + .setStopRow(Bytes.toBytes(String.format("%03d", stop))).setReversed(true)); assertEquals(start - stop, results.size()); IntStream.range(0, start - stop).forEach(i -> assertResultEquals(results.get(i), start - i)); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java index b20e616..fe988aa 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncGetMultiThread.java @@ -87,7 +87,7 @@ public class TestAsyncGetMultiThread { TEST_UTIL.createTable(TABLE_NAME, FAMILY); TEST_UTIL.waitTableAvailable(TABLE_NAME); CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()); - AsyncTable table = CONN.getTable(TABLE_NAME); + RawAsyncTable table = CONN.getRawTable(TABLE_NAME); List> futures = new ArrayList<>(); IntStream.range(0, COUNT) .forEach(i -> futures.add(table.put(new Put(Bytes.toBytes(String.format("%03d", i))) @@ -105,8 +105,9 @@ public class TestAsyncGetMultiThread { while (!stop.get()) { int i = ThreadLocalRandom.current().nextInt(COUNT); assertEquals(i, - Bytes.toInt(CONN.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(String.format("%03d", i)))) - .get().getValue(FAMILY, QUALIFIER))); + Bytes.toInt( + CONN.getRawTable(TABLE_NAME).get(new Get(Bytes.toBytes(String.format("%03d", i)))).get() + .getValue(FAMILY, QUALIFIER))); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java index 67d2661..0b3e186 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java @@ -99,7 +99,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller { int index = TEST_UTIL.getHBaseCluster().getServerWith(loc.getRegionInfo().getRegionName()); TEST_UTIL.getAdmin().move(loc.getRegionInfo().getEncodedNameAsBytes(), Bytes.toBytes( TEST_UTIL.getHBaseCluster().getRegionServer(1 - index).getServerName().getServerName())); - AsyncTable table = asyncConn.getTable(TABLE_NAME); + RawAsyncTable table = asyncConn.getRawTable(TABLE_NAME); table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get(); // move back @@ -185,7 +185,7 @@ 
public class TestAsyncSingleRequestRpcRetryingCaller { return mockedLocator; } }) { - AsyncTable table = new AsyncTableImpl(mockedConn, TABLE_NAME); + RawAsyncTable table = new RawAsyncTableImpl(mockedConn, TABLE_NAME); table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get(); assertTrue(errorTriggered.get()); errorTriggered.set(false); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java index 8ba3414..7a85727 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java @@ -26,12 +26,15 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.io.UncheckedIOException; import java.util.Arrays; +import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; import java.util.stream.IntStream; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -47,7 +50,12 @@ import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; +@RunWith(Parameterized.class) @Category({ MediumTests.class, ClientTests.class }) public class TestAsyncTable { @@ -68,6 +76,23 @@ public class TestAsyncTable { private byte[] row; + @Parameter + public Supplier getTable; + + private static RawAsyncTable getRawTable() { + return ASYNC_CONN.getRawTable(TABLE_NAME); + } + + private static AsyncTable getTable() { + return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); + } + + @Parameters + public static List params() { + return Arrays.asList(new Supplier[] { TestAsyncTable::getRawTable }, + new Supplier[] { TestAsyncTable::getTable }); + } + @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL.startMiniCluster(1); @@ -89,7 +114,7 @@ public class TestAsyncTable { @Test public void testSimple() throws Exception { - AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); + AsyncTableBase table = getTable.get(); table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).get(); assertTrue(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get()); Result result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get(); @@ -106,7 +131,7 @@ public class TestAsyncTable { @Test public void testSimpleMultiple() throws Exception { - AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); + AsyncTableBase table = getTable.get(); int count = 100; CountDownLatch putLatch = new CountDownLatch(count); IntStream.range(0, count).forEach( @@ -150,7 +175,7 @@ public class TestAsyncTable { @Test public void testIncrement() throws InterruptedException, ExecutionException { - AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); + AsyncTableBase table = getTable.get(); int count = 100; CountDownLatch latch = new CountDownLatch(count); AtomicLong sum = new AtomicLong(0L); @@ -167,7 +192,7 @@ public class TestAsyncTable { @Test public void testAppend() throws InterruptedException, 
ExecutionException { - AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); + AsyncTableBase table = getTable.get(); int count = 10; CountDownLatch latch = new CountDownLatch(count); char suffix = ':'; @@ -190,7 +215,7 @@ public class TestAsyncTable { @Test public void testCheckAndPut() throws InterruptedException, ExecutionException { - AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); + AsyncTableBase table = getTable.get(); AtomicInteger successCount = new AtomicInteger(0); AtomicInteger successIndex = new AtomicInteger(-1); int count = 10; @@ -211,7 +236,7 @@ public class TestAsyncTable { @Test public void testCheckAndDelete() throws InterruptedException, ExecutionException { - AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); + AsyncTableBase table = getTable.get(); int count = 10; CountDownLatch putLatch = new CountDownLatch(count + 1); table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown()); @@ -223,17 +248,16 @@ public class TestAsyncTable { AtomicInteger successCount = new AtomicInteger(0); AtomicInteger successIndex = new AtomicInteger(-1); CountDownLatch deleteLatch = new CountDownLatch(count); - IntStream.range(0, count) - .forEach(i -> table - .checkAndDelete(row, FAMILY, QUALIFIER, VALUE, - new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i))) - .thenAccept(x -> { - if (x) { - successCount.incrementAndGet(); - successIndex.set(i); - } - deleteLatch.countDown(); - })); + IntStream.range(0, count).forEach(i -> table + .checkAndDelete(row, FAMILY, QUALIFIER, VALUE, + new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i))) + .thenAccept(x -> { + if (x) { + successCount.incrementAndGet(); + successIndex.set(i); + } + deleteLatch.countDown(); + })); deleteLatch.await(); assertEquals(1, successCount.get()); Result result = table.get(new Get(row)).get(); @@ -248,7 +272,7 @@ public class TestAsyncTable { @Test public void testMutateRow() throws InterruptedException, ExecutionException, IOException { - AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); + AsyncTableBase table = getTable.get(); RowMutations mutation = new RowMutations(row); mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, 1), VALUE)); table.mutateRow(mutation).get(); @@ -266,7 +290,7 @@ public class TestAsyncTable { @Test public void testCheckAndMutate() throws InterruptedException, ExecutionException { - AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); + AsyncTableBase table = getTable.get(); int count = 10; CountDownLatch putLatch = new CountDownLatch(count + 1); table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java index 840f844..c8e1c7a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java @@ -100,7 +100,7 @@ public class TestAsyncTableNoncedRetry { @Test public void testAppend() throws InterruptedException, ExecutionException { - AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); + RawAsyncTable table = ASYNC_CONN.getRawTable(TABLE_NAME); Result result = table.append(new Append(row).add(FAMILY, QUALIFIER, VALUE)).get(); assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER)); result = table.append(new 
Append(row).add(FAMILY, QUALIFIER, VALUE)).get(); @@ -112,7 +112,7 @@ public class TestAsyncTableNoncedRetry { @Test public void testIncrement() throws InterruptedException, ExecutionException { - AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); + RawAsyncTable table = ASYNC_CONN.getRawTable(TABLE_NAME); assertEquals(1L, table.incrementColumnValue(row, FAMILY, QUALIFIER, 1L).get().longValue()); // the second call should have no effect as we always generate the same nonce. assertEquals(1L, table.incrementColumnValue(row, FAMILY, QUALIFIER, 1L).get().longValue()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java index d21560f..65fb1ad 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java @@ -124,11 +124,11 @@ public class TestAsyncTableScan extends AbstractTestAsyncTableScan { } @Override - protected List doScan(AsyncTable table, Scan scan) throws Exception { - SimpleScanResultConsumer scanObserver = new SimpleScanResultConsumer(); - table.scan(scan, scanObserver); + protected List doScan(Scan scan) throws Exception { + SimpleScanResultConsumer scanConsumer = new SimpleScanResultConsumer(); + ASYNC_CONN.getRawTable(TABLE_NAME).scan(scan, scanConsumer); List results = new ArrayList<>(); - for (Result result; (result = scanObserver.take()) != null;) { + for (Result result; (result = scanConsumer.take()) != null;) { results.add(result); } if (scan.getBatch() > 0) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java new file mode 100644 index 0000000..b97005a --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ForkJoinPool; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +@Category({ MediumTests.class, ClientTests.class }) +public class TestAsyncTableScanner extends AbstractTestAsyncTableScan { + + @Parameter + public Supplier scanCreater; + + @Parameters + public static List params() { + return Arrays.asList(new Supplier[] { TestAsyncTableScanner::createNormalScan }, + new Supplier[] { TestAsyncTableScanner::createBatchScan }); + } + + private static Scan createNormalScan() { + return new Scan(); + } + + private static Scan createBatchScan() { + return new Scan().setBatch(1); + } + + @Override + protected Scan createScan() { + return scanCreater.get(); + } + + private Result convertToPartial(Result result) { + return Result.create(result.rawCells(), result.getExists(), result.isStale(), true); + } + + @Override + protected List doScan(Scan scan) throws Exception { + AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); + List results = new ArrayList<>(); + try (ResultScanner scanner = table.getScanner(scan)) { + for (Result result; (result = scanner.next()) != null;) { + results.add(result); + } + } + if (scan.getBatch() > 0) { + assertTrue(results.size() % 2 == 0); + return IntStream.range(0, results.size() / 2).mapToObj(i -> { + try { + return Result.createCompleteResult(Arrays.asList(convertToPartial(results.get(2 * i)), + convertToPartial(results.get(2 * i + 1)))); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }).collect(Collectors.toList()); + } + return results; + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java index e920013..3737af2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableSmallScan.java @@ -19,8 +19,11 @@ package org.apache.hadoop.hbase.client; import static org.junit.Assert.assertEquals; +import java.util.Arrays; import java.util.List; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ForkJoinPool; +import java.util.function.Supplier; import java.util.stream.IntStream; import org.apache.hadoop.hbase.testclassification.ClientTests; @@ -28,19 +31,44 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; +@RunWith(Parameterized.class) @Category({ MediumTests.class, ClientTests.class }) public class TestAsyncTableSmallScan 
extends AbstractTestAsyncTableScan { + @Parameter + public Supplier getTable; + + private static RawAsyncTable getRawTable() { + return ASYNC_CONN.getRawTable(TABLE_NAME); + } + + private static AsyncTable getTable() { + return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); + } + + @Parameters + public static List params() { + return Arrays.asList(new Supplier[] { TestAsyncTableSmallScan::getRawTable }, + new Supplier[] { TestAsyncTableSmallScan::getTable }); + } + @Test public void testScanWithLimit() throws InterruptedException, ExecutionException { - AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); + AsyncTableBase table = getTable.get(); int start = 111; int stop = 888; int limit = 300; - List results = table.smallScan(new Scan(Bytes.toBytes(String.format("%03d", start))) - .setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true), - limit).get(); + List results = + table + .smallScan(new Scan(Bytes.toBytes(String.format("%03d", start))) + .setStopRow(Bytes.toBytes(String.format("%03d", stop))).setSmall(true), + limit) + .get(); assertEquals(limit, results.size()); IntStream.range(0, limit).forEach(i -> { Result result = results.get(i); @@ -52,7 +80,7 @@ public class TestAsyncTableSmallScan extends AbstractTestAsyncTableScan { @Test public void testReversedScanWithLimit() throws InterruptedException, ExecutionException { - AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); + AsyncTableBase table = getTable.get(); int start = 888; int stop = 111; int limit = 300; @@ -75,7 +103,7 @@ public class TestAsyncTableSmallScan extends AbstractTestAsyncTableScan { } @Override - protected List doScan(AsyncTable table, Scan scan) throws Exception { - return table.smallScan(scan).get(); + protected List doScan(Scan scan) throws Exception { + return getTable.get().smallScan(scan).get(); } } -- 1.9.1
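
A minimal usage sketch of the two scan entry points this patch adds, not part of the patch itself:
the blocking ResultScanner obtained from AsyncTable.getScanner(Scan), and the future-based small
scan on RawAsyncTable. It follows the factory and accessor calls used in the tests above
(createAsyncConnection, getTable(TableName, ExecutorService), getRawTable); the table name "test"
is hypothetical and connection cleanup/error handling are omitted for brevity.

    import java.util.List;
    import java.util.concurrent.ForkJoinPool;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.AsyncConnection;
    import org.apache.hadoop.hbase.client.AsyncTable;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.RawAsyncTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;

    public class GetScannerExample {

      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        TableName tableName = TableName.valueOf("test"); // hypothetical table
        AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf);

        // Blocking iteration through the new AsyncTable.getScanner(Scan). The returned
        // ResultScanner buffers results fetched asynchronously (AsyncResultScanner) and
        // exposes the familiar next()/next(int)/iterator()/close() contract.
        AsyncTable table = conn.getTable(tableName, ForkJoinPool.commonPool());
        try (ResultScanner scanner = table.getScanner(new Scan())) {
          for (Result result : scanner) {
            System.out.println(result);
          }
        }

        // Small scan on the low-level RawAsyncTable: the whole result list is delivered
        // through a CompletableFuture, so blocking with get() is an explicit caller choice.
        RawAsyncTable rawTable = conn.getRawTable(tableName);
        List<Result> results = rawTable.smallScan(new Scan().setSmall(true), 100).get();
        System.out.println("small scan returned " + results.size() + " rows");
      }
    }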