From 84e7a2c3362852dd05e119220bc64cba225b8cbe Mon Sep 17 00:00:00 2001 From: zhangduo Date: Sun, 8 Jan 2017 21:08:21 +0800 Subject: [PATCH] HBASE-17372 Make AsyncTable thread safe --- .../hbase/client/AsyncBatchRpcRetryingCaller.java | 54 ++-------- .../hadoop/hbase/client/AsyncClientScanner.java | 21 +++- .../hadoop/hbase/client/AsyncConnection.java | 39 +++++-- .../hbase/client/AsyncConnectionConfiguration.java | 18 +++- .../hadoop/hbase/client/AsyncConnectionImpl.java | 27 +++-- .../client/AsyncRpcRetryingCallerFactory.java | 105 ++++++++++++++----- .../AsyncScanSingleRegionRpcRetryingCaller.java | 4 +- .../AsyncSingleRequestRpcRetryingCaller.java | 5 +- .../client/AsyncSmallScanRpcRetryingCaller.java | 15 ++- .../apache/hadoop/hbase/client/AsyncTableBase.java | 106 ++++++------------- .../hadoop/hbase/client/AsyncTableBuilder.java | 113 +++++++++++++++++++++ .../hadoop/hbase/client/AsyncTableBuilderBase.java | 110 ++++++++++++++++++++ .../apache/hadoop/hbase/client/AsyncTableImpl.java | 42 ++++---- .../hadoop/hbase/client/ConnectionUtils.java | 15 +-- .../hadoop/hbase/client/RawAsyncTableImpl.java | 113 ++++++++++++--------- .../java/org/apache/hadoop/hbase/HConstants.java | 3 - .../TestAsyncSingleRequestRpcRetryingCaller.java | 2 +- .../client/TestAsyncTableGetMultiThreaded.java | 28 +++-- .../hadoop/hbase/client/TestRawAsyncTableScan.java | 6 +- 19 files changed, 551 insertions(+), 275 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java index 6f0b8e9..9b362d1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java +++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java @@ -21,7 +21,6 @@ import static org.apache.hadoop.hbase.CellUtil.createCellScanner; import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS; import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime; import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController; -import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException; import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; @@ -40,7 +39,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -61,7 +59,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; -import org.apache.hadoop.hbase.util.AtomicUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -102,9 +99,7 @@ class AsyncBatchRpcRetryingCaller { private final long operationTimeoutNs; - private final long readRpcTimeoutNs; - - private final long writeRpcTimeoutNs; + private final long rpcTimeoutNs; private final int startLogErrorsCnt; @@ -128,39 +123,22 @@ class AsyncBatchRpcRetryingCaller { public final ConcurrentMap actionsByRegion = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR); - public final AtomicLong rpcTimeoutNs; - - public ServerRequest(long defaultRpcTimeoutNs) { - 
this.rpcTimeoutNs = new AtomicLong(defaultRpcTimeoutNs); - } - - public void addAction(HRegionLocation loc, Action action, long rpcTimeoutNs) { + public void addAction(HRegionLocation loc, Action action) { computeIfAbsent(actionsByRegion, loc.getRegionInfo().getRegionName(), () -> new RegionRequest(loc)).actions.add(action); - // try update the timeout to a larger value - if (this.rpcTimeoutNs.get() <= 0) { - return; - } - if (rpcTimeoutNs <= 0) { - this.rpcTimeoutNs.set(-1L); - return; - } - AtomicUtils.updateMax(this.rpcTimeoutNs, rpcTimeoutNs); } } public AsyncBatchRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn, - TableName tableName, List actions, long pauseNs, int maxRetries, - long operationTimeoutNs, long readRpcTimeoutNs, long writeRpcTimeoutNs, - int startLogErrorsCnt) { + TableName tableName, List actions, long pauseNs, int maxAttempts, + long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { this.retryTimer = retryTimer; this.conn = conn; this.tableName = tableName; this.pauseNs = pauseNs; - this.maxAttempts = retries2Attempts(maxRetries); + this.maxAttempts = maxAttempts; this.operationTimeoutNs = operationTimeoutNs; - this.readRpcTimeoutNs = readRpcTimeoutNs; - this.writeRpcTimeoutNs = writeRpcTimeoutNs; + this.rpcTimeoutNs = rpcTimeoutNs; this.startLogErrorsCnt = startLogErrorsCnt; this.actions = new ArrayList<>(actions.size()); @@ -366,7 +344,7 @@ class AsyncBatchRpcRetryingCaller { return; } HBaseRpcController controller = conn.rpcControllerFactory.newController(); - resetController(controller, Math.min(serverReq.rpcTimeoutNs.get(), remainingNs)); + resetController(controller, Math.min(rpcTimeoutNs, remainingNs)); if (!cells.isEmpty()) { controller.setCellScanner(createCellScanner(cells)); } @@ -416,10 +394,6 @@ class AsyncBatchRpcRetryingCaller { retryTimer.newTimeout(t -> groupAndSend(actions, tries + 1), delayNs, TimeUnit.NANOSECONDS); } - private long getRpcTimeoutNs(Action action) { - return 
action.getAction() instanceof Get ? readRpcTimeoutNs : writeRpcTimeoutNs; - } - private void groupAndSend(Stream actions, int tries) { long locateTimeoutNs; if (operationTimeoutNs > 0) { @@ -433,15 +407,6 @@ class AsyncBatchRpcRetryingCaller { } ConcurrentMap actionsByServer = new ConcurrentHashMap<>(); ConcurrentLinkedQueue locateFailed = new ConcurrentLinkedQueue<>(); - // use the small one as the default timeout value, and increase the timeout value if we have an - // action in the group needs a larger timeout value. - long defaultRpcTimeoutNs; - if (readRpcTimeoutNs > 0) { - defaultRpcTimeoutNs = - writeRpcTimeoutNs > 0 ? Math.min(readRpcTimeoutNs, writeRpcTimeoutNs) : readRpcTimeoutNs; - } else { - defaultRpcTimeoutNs = writeRpcTimeoutNs > 0 ? writeRpcTimeoutNs : -1L; - } CompletableFuture.allOf(actions .map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(), RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, error) -> { @@ -454,9 +419,8 @@ class AsyncBatchRpcRetryingCaller { addError(action, error, null); locateFailed.add(action); } else { - computeIfAbsent(actionsByServer, loc.getServerName(), - () -> new ServerRequest(defaultRpcTimeoutNs)).addAction(loc, action, - getRpcTimeoutNs(action)); + computeIfAbsent(actionsByServer, loc.getServerName(), ServerRequest::new) + .addAction(loc, action); } })) .toArray(CompletableFuture[]::new)).whenComplete((v, r) -> { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java index d7a3ed1..f656a6c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java @@ -55,14 +55,21 @@ class AsyncClientScanner { private final AsyncConnectionImpl conn; + private final long pauseNs; + + private final int maxAttempts; + private final long 
scanTimeoutNs; private final long rpcTimeoutNs; + private final int startLogErrorsCnt; + private final ScanResultCache resultCache; public AsyncClientScanner(Scan scan, RawScanResultConsumer consumer, TableName tableName, - AsyncConnectionImpl conn, long scanTimeoutNs, long rpcTimeoutNs) { + AsyncConnectionImpl conn, long pauseNs, int maxAttempts, long scanTimeoutNs, + long rpcTimeoutNs, int startLogErrorsCnt) { if (scan.getStartRow() == null) { scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow()); } @@ -73,8 +80,11 @@ class AsyncClientScanner { this.consumer = consumer; this.tableName = tableName; this.conn = conn; + this.pauseNs = pauseNs; + this.maxAttempts = maxAttempts; this.scanTimeoutNs = scanTimeoutNs; this.rpcTimeoutNs = rpcTimeoutNs; + this.startLogErrorsCnt = startLogErrorsCnt; this.resultCache = scan.getAllowPartialResults() || scan.getBatch() > 0 ? new AllowPartialScanResultCache() : new CompleteScanResultCache(); } @@ -117,7 +127,9 @@ class AsyncClientScanner { conn.callerFactory.scanSingleRegion().id(resp.scannerId).location(resp.loc).stub(resp.stub) .setScan(scan).consumer(consumer).resultCache(resultCache) .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) - .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).start().whenComplete((hasMore, error) -> { + .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) + .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).start() + .whenComplete((hasMore, error) -> { if (error != null) { consumer.onError(error); return; @@ -133,8 +145,9 @@ class AsyncClientScanner { private void openScanner() { conn.callerFactory. 
single().table(tableName).row(scan.getStartRow()) .locateType(getLocateType(scan)).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) - .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).action(this::callOpenScanner).call() - .whenComplete((resp, error) -> { + .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) + .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner) + .call().whenComplete((resp, error) -> { if (error != null) { consumer.onError(error); return; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java index 7b0f339..3a1d6df 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java @@ -50,21 +50,32 @@ public interface AsyncConnection extends Closeable { AsyncTableRegionLocator getRegionLocator(TableName tableName); /** - * Retrieve an RawAsyncTable implementation for accessing a table. The returned Table is not - * thread safe, a new instance should be created for each using thread. This is a lightweight - * operation, pooling or caching of the returned AsyncTable is neither required nor desired. + * Retrieve an {@link RawAsyncTable} implementation for accessing a table. + *

+ * The returned instance will use default configs. Use {@link #createRawTable(TableName)} if you + * want to customize some configs. *

* This method no longer checks table existence. An exception will be thrown if the table does not * exist only when the first operation is attempted. * @param tableName the name of the table * @return an RawAsyncTable to use for interactions with this table + * @see #createRawTable(TableName) + */ + default RawAsyncTable getRawTable(TableName tableName) { + return createRawTable(tableName).build(); + } + + /** + * Returns an {@link AsyncTableBuilder} for creating {@link RawAsyncTable}. + *

+ * This method no longer checks table existence. An exception will be thrown if the table does not + * exist only when the first operation is attempted. + * @param tableName the name of the table */ - RawAsyncTable getRawTable(TableName tableName); + AsyncTableBuilder createRawTable(TableName tableName); /** - * Retrieve an AsyncTable implementation for accessing a table. The returned Table is not thread - * safe, a new instance should be created for each using thread. This is a lightweight operation, - * pooling or caching of the returned AsyncTable is neither required nor desired. + * Retrieve an AsyncTable implementation for accessing a table. *

* This method no longer checks table existence. An exception will be thrown if the table does not * exist only when the first operation is attempted. @@ -72,5 +83,17 @@ public interface AsyncConnection extends Closeable { * @param pool the thread pool to use for executing callback * @return an AsyncTable to use for interactions with this table */ - AsyncTable getTable(TableName tableName, ExecutorService pool); + default AsyncTable getTable(TableName tableName, ExecutorService pool) { + return createTable(tableName, pool).build(); + } + + /** + * Returns an {@link AsyncTableBuilder} for creating {@link AsyncTable}. + *

+ * This method no longer checks table existence. An exception will be thrown if the table does not + * exist only when the first operation is attempted. + * @param tableName the name of the table + * @param pool the thread pool to use for executing callback + */ + AsyncTableBuilder createTable(TableName tableName, ExecutorService pool); } \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java index 6279d46..0845785 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java @@ -56,6 +56,10 @@ class AsyncConnectionConfiguration { // by this value, see scanTimeoutNs. private final long operationTimeoutNs; + // timeout for each rpc request. Can be overrode by a more specific config, such as readRpcTimeout + // or writeRpcTimeout. 
+ private final long rpcTimeoutNs; + // timeout for each read rpc request private final long readRpcTimeoutNs; @@ -85,10 +89,12 @@ class AsyncConnectionConfiguration { conf.getLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)); this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos( conf.getLong(HBASE_CLIENT_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)); - this.readRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, - conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT))); - this.writeRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, - conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT))); + this.rpcTimeoutNs = TimeUnit.MILLISECONDS + .toNanos(conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT)); + this.readRpcTimeoutNs = + TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, rpcTimeoutNs)); + this.writeRpcTimeoutNs = + TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, rpcTimeoutNs)); this.pauseNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE)); this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); @@ -111,6 +117,10 @@ class AsyncConnectionConfiguration { return operationTimeoutNs; } + long getRpcTimeoutNs() { + return rpcTimeoutNs; + } + long getReadRpcTimeoutNs() { return readRpcTimeoutNs; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index d660b02..692671b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -18,8 +18,6 @@ package org.apache.hadoop.hbase.client; import static 
org.apache.hadoop.hbase.HConstants.CLUSTER_ID_DEFAULT; -import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT; -import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY; import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR; import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey; import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY; @@ -90,7 +88,6 @@ class AsyncConnectionImpl implements AsyncConnection { private final ConcurrentMap rsStubs = new ConcurrentHashMap<>(); - @SuppressWarnings("deprecation") public AsyncConnectionImpl(Configuration conf, User user) { this.conf = conf; this.user = user; @@ -105,7 +102,8 @@ class AsyncConnectionImpl implements AsyncConnection { this.rpcClient = RpcClientFactory.createClient(conf, clusterId); this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true); - this.rpcTimeout = conf.getInt(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT); + this.rpcTimeout = (int) Math.min(Integer.MAX_VALUE, + TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs())); this.locator = new AsyncRegionLocator(this, RETRY_TIMER); this.callerFactory = new AsyncRpcRetryingCallerFactory(this, RETRY_TIMER); if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) { @@ -152,12 +150,25 @@ class AsyncConnectionImpl implements AsyncConnection { } @Override - public RawAsyncTable getRawTable(TableName tableName) { - return new RawAsyncTableImpl(this, tableName); + public AsyncTableBuilder createRawTable(TableName tableName) { + return new AsyncTableBuilderBase(tableName, connConf) { + + @Override + public RawAsyncTable build() { + return new RawAsyncTableImpl(AsyncConnectionImpl.this, this); + } + }; } @Override - public AsyncTable getTable(TableName tableName, ExecutorService pool) { - return new AsyncTableImpl(this, tableName, pool); + public AsyncTableBuilder 
createTable(TableName tableName, ExecutorService pool) { + return new AsyncTableBuilderBase(tableName, connConf) { + + @Override + public AsyncTable build() { + RawAsyncTableImpl rawTable = new RawAsyncTableImpl(AsyncConnectionImpl.this, this); + return new AsyncTableImpl(AsyncConnectionImpl.this, rawTable, pool); + } + }; } } \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java index 55c56ab..694f346 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java @@ -46,7 +46,16 @@ class AsyncRpcRetryingCallerFactory { this.retryTimer = retryTimer; } - public class SingleRequestCallerBuilder { + private static abstract class BuilderBase { + + protected long pauseNs; + + protected int maxAttempts; + + protected int startLogErrorsCnt; + } + + public class SingleRequestCallerBuilder extends BuilderBase { private TableName tableName; @@ -70,8 +79,8 @@ class AsyncRpcRetryingCallerFactory { return this; } - public SingleRequestCallerBuilder action( - AsyncSingleRequestRpcRetryingCaller.Callable callable) { + public SingleRequestCallerBuilder + action(AsyncSingleRequestRpcRetryingCaller.Callable callable) { this.callable = callable; return this; } @@ -91,12 +100,26 @@ class AsyncRpcRetryingCallerFactory { return this; } + public SingleRequestCallerBuilder pause(long pause, TimeUnit unit) { + this.pauseNs = unit.toNanos(pause); + return this; + } + + public SingleRequestCallerBuilder maxAttempts(int maxAttempts) { + this.maxAttempts = maxAttempts; + return this; + } + + public SingleRequestCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) { + this.startLogErrorsCnt = startLogErrorsCnt; + return this; + } + public AsyncSingleRequestRpcRetryingCaller build() 
{ return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn, checkNotNull(tableName, "tableName is null"), checkNotNull(row, "row is null"), checkNotNull(locateType, "locateType is null"), checkNotNull(callable, "action is null"), - conn.connConf.getPauseNs(), conn.connConf.getMaxRetries(), operationTimeoutNs, - rpcTimeoutNs, conn.connConf.getStartLogErrorsCnt()); + pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); } /** @@ -114,7 +137,7 @@ class AsyncRpcRetryingCallerFactory { return new SingleRequestCallerBuilder<>(); } - public class SmallScanCallerBuilder { + public class SmallScanCallerBuilder extends BuilderBase { private TableName tableName; @@ -151,12 +174,27 @@ class AsyncRpcRetryingCallerFactory { return this; } + public SmallScanCallerBuilder pause(long pause, TimeUnit unit) { + this.pauseNs = unit.toNanos(pause); + return this; + } + + public SmallScanCallerBuilder maxAttempts(int maxAttempts) { + this.maxAttempts = maxAttempts; + return this; + } + + public SmallScanCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) { + this.startLogErrorsCnt = startLogErrorsCnt; + return this; + } + public AsyncSmallScanRpcRetryingCaller build() { TableName tableName = checkNotNull(this.tableName, "tableName is null"); Scan scan = checkNotNull(this.scan, "scan is null"); checkArgument(limit > 0, "invalid limit %d", limit); - return new AsyncSmallScanRpcRetryingCaller(conn, tableName, scan, limit, scanTimeoutNs, - rpcTimeoutNs); + return new AsyncSmallScanRpcRetryingCaller(conn, tableName, scan, limit, pauseNs, maxAttempts, + scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); } /** @@ -174,7 +212,7 @@ class AsyncRpcRetryingCallerFactory { return new SmallScanCallerBuilder(); } - public class ScanSingleRegionCallerBuilder { + public class ScanSingleRegionCallerBuilder extends BuilderBase { private long scannerId = -1L; @@ -232,15 +270,29 @@ class AsyncRpcRetryingCallerFactory { return this; } + public ScanSingleRegionCallerBuilder 
pause(long pause, TimeUnit unit) { + this.pauseNs = unit.toNanos(pause); + return this; + } + + public ScanSingleRegionCallerBuilder maxAttempts(int maxAttempts) { + this.maxAttempts = maxAttempts; + return this; + } + + public ScanSingleRegionCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) { + this.startLogErrorsCnt = startLogErrorsCnt; + return this; + } + public AsyncScanSingleRegionRpcRetryingCaller build() { checkArgument(scannerId >= 0, "invalid scannerId %d", scannerId); return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn, checkNotNull(scan, "scan is null"), scannerId, checkNotNull(resultCache, "resultCache is null"), checkNotNull(consumer, "consumer is null"), checkNotNull(stub, "stub is null"), - checkNotNull(loc, "location is null"), conn.connConf.getPauseNs(), - conn.connConf.getMaxRetries(), scanTimeoutNs, rpcTimeoutNs, - conn.connConf.getStartLogErrorsCnt()); + checkNotNull(loc, "location is null"), pauseNs, maxAttempts, scanTimeoutNs, rpcTimeoutNs, + startLogErrorsCnt); } /** @@ -258,7 +310,7 @@ class AsyncRpcRetryingCallerFactory { return new ScanSingleRegionCallerBuilder(); } - public class BatchCallerBuilder { + public class BatchCallerBuilder extends BuilderBase { private TableName tableName; @@ -266,9 +318,7 @@ class AsyncRpcRetryingCallerFactory { private long operationTimeoutNs = -1L; - private long readRpcTimeoutNs = -1L; - - private long writeRpcTimeoutNs = -1L; + private long rpcTimeoutNs = -1L; public BatchCallerBuilder table(TableName tableName) { this.tableName = tableName; @@ -285,20 +335,29 @@ class AsyncRpcRetryingCallerFactory { return this; } - public BatchCallerBuilder readRpcTimeout(long rpcTimeout, TimeUnit unit) { - this.readRpcTimeoutNs = unit.toNanos(rpcTimeout); + public BatchCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) { + this.rpcTimeoutNs = unit.toNanos(rpcTimeout); + return this; + } + + public BatchCallerBuilder pause(long pause, TimeUnit unit) { + this.pauseNs = unit.toNanos(pause); + 
return this; + } + + public BatchCallerBuilder maxAttempts(int maxAttempts) { + this.maxAttempts = maxAttempts; return this; } - public BatchCallerBuilder writeRpcTimeout(long rpcTimeout, TimeUnit unit) { - this.writeRpcTimeoutNs = unit.toNanos(rpcTimeout); + public BatchCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) { + this.startLogErrorsCnt = startLogErrorsCnt; return this; } public AsyncBatchRpcRetryingCaller build() { - return new AsyncBatchRpcRetryingCaller(retryTimer, conn, tableName, actions, - conn.connConf.getPauseNs(), conn.connConf.getMaxRetries(), operationTimeoutNs, - readRpcTimeoutNs, writeRpcTimeoutNs, conn.connConf.getStartLogErrorsCnt()); + return new AsyncBatchRpcRetryingCaller(retryTimer, conn, tableName, actions, pauseNs, + maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); } public List> call() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java index dae88a7..5d3b736 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java @@ -108,7 +108,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { public AsyncScanSingleRegionRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn, Scan scan, long scannerId, ScanResultCache resultCache, RawScanResultConsumer consumer, Interface stub, HRegionLocation loc, long pauseNs, - int maxRetries, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { + int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { this.retryTimer = retryTimer; this.scan = scan; this.scannerId = scannerId; @@ -117,7 +117,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { this.stub = stub; this.loc = loc; this.pauseNs = pauseNs; - 
this.maxAttempts = retries2Attempts(maxRetries); + this.maxAttempts = maxAttempts; this.scanTimeoutNs = scanTimeoutNs; this.rpcTimeoutNs = rpcTimeoutNs; this.startLogErrorsCnt = startLogErrorsCnt; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java index 04e69af..4ce6a18 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS; import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime; import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController; -import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException; import io.netty.util.HashedWheelTimer; @@ -90,7 +89,7 @@ class AsyncSingleRequestRpcRetryingCaller { public AsyncSingleRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn, TableName tableName, byte[] row, RegionLocateType locateType, Callable callable, - long pauseNs, int maxRetries, long operationTimeoutNs, long rpcTimeoutNs, + long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { this.retryTimer = retryTimer; this.conn = conn; @@ -99,7 +98,7 @@ class AsyncSingleRequestRpcRetryingCaller { this.locateType = locateType; this.callable = callable; this.pauseNs = pauseNs; - this.maxAttempts = retries2Attempts(maxRetries); + this.maxAttempts = maxAttempts; this.operationTimeoutNs = operationTimeoutNs; this.rpcTimeoutNs = rpcTimeoutNs; this.startLogErrorsCnt = startLogErrorsCnt; diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java index 6ffa30a..98a276f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java @@ -57,6 +57,12 @@ class AsyncSmallScanRpcRetryingCaller { private final long rpcTimeoutNs; + private final long pauseNs; + + private final int maxAttempts; + + private final int startLogErrosCnt; + private final Function nextScan; private final List resultList; @@ -64,13 +70,17 @@ class AsyncSmallScanRpcRetryingCaller { private final CompletableFuture> future; public AsyncSmallScanRpcRetryingCaller(AsyncConnectionImpl conn, TableName tableName, Scan scan, - int limit, long scanTimeoutNs, long rpcTimeoutNs) { + int limit, long pauseNs, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, + int startLogErrosCnt) { this.conn = conn; this.tableName = tableName; this.scan = scan; this.limit = limit; this.scanTimeoutNs = scanTimeoutNs; this.rpcTimeoutNs = rpcTimeoutNs; + this.pauseNs = pauseNs; + this.maxAttempts = maxAttempts; + this.startLogErrosCnt = startLogErrosCnt; if (scan.isReversed()) { this.nextScan = this::reversedNextScan; } else { @@ -146,7 +156,8 @@ class AsyncSmallScanRpcRetryingCaller { private void scan() { conn.callerFactory. 
single().table(tableName).row(scan.getStartRow()) .locateType(getLocateType(scan)).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) - .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).action(this::scan).call() + .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) + .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrosCnt).action(this::scan).call() .whenComplete((resp, error) -> { if (error != null) { future.completeExceptionally(error); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java index 19a22c0..5585ef8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java @@ -18,9 +18,8 @@ package org.apache.hadoop.hbase.client; import static java.util.stream.Collectors.toList; +import static org.apache.hadoop.hbase.client.ConnectionUtils.allOf; import static org.apache.hadoop.hbase.client.ConnectionUtils.toCheckExistenceOnly; -import static org.apache.hadoop.hbase.client.ConnectionUtils.voidBatch; -import static org.apache.hadoop.hbase.client.ConnectionUtils.voidBatchAll; import com.google.common.base.Preconditions; @@ -39,8 +38,7 @@ import org.apache.hadoop.hbase.util.Bytes; * The base interface for asynchronous version of Table. Obtain an instance from a * {@link AsyncConnection}. *

- * The implementation is NOT required to be thread safe. Do NOT access it from multiple threads - * concurrently. + * The implementation should be thread safe. *

* Usually the implementations will not throw any exception directly, you need to get the exception * from the returned {@link CompletableFuture}. @@ -62,12 +60,9 @@ public interface AsyncTableBase { Configuration getConfiguration(); /** - * Set timeout of each rpc read request in operations of this Table instance, will override the - * value of {@code hbase.rpc.read.timeout} in configuration. If a rpc read request waiting too - * long, it will stop waiting and send a new request to retry until retries exhausted or operation - * timeout reached. + * Get timeout of each rpc request in this Table instance. */ - void setReadRpcTimeout(long timeout, TimeUnit unit); + long getRpcTimeout(TimeUnit unit); /** * Get timeout of each rpc read request in this Table instance. @@ -75,46 +70,16 @@ public interface AsyncTableBase { long getReadRpcTimeout(TimeUnit unit); /** - * Set timeout of each rpc write request in operations of this Table instance, will override the - * value of {@code hbase.rpc.write.timeout} in configuration. If a rpc write request waiting too - * long, it will stop waiting and send a new request to retry until retries exhausted or operation - * timeout reached. - */ - void setWriteRpcTimeout(long timeout, TimeUnit unit); - - /** * Get timeout of each rpc write request in this Table instance. */ long getWriteRpcTimeout(TimeUnit unit); /** - * Set timeout of each operation in this Table instance, will override the value of - * {@code hbase.client.operation.timeout} in configuration. - *

- * Operation timeout is a top-level restriction that makes sure an operation will not be blocked - * more than this. In each operation, if rpc request fails because of timeout or other reason, it - * will retry until success or throw a RetriesExhaustedException. But if the total time elapsed - * reach the operation timeout before retries exhausted, it will break early and throw - * SocketTimeoutException. - */ - void setOperationTimeout(long timeout, TimeUnit unit); - - /** * Get timeout of each operation in Table instance. */ long getOperationTimeout(TimeUnit unit); /** - * Set timeout of a single operation in a scan, such as openScanner and next. Will override the - * value {@code hbase.client.scanner.timeout.period} in configuration. - *

- * Generally a scan will never timeout after we add heartbeat support unless the region is - * crashed. The {@code scanTimeout} works like the {@code operationTimeout} for each single - * operation in a scan. - */ - void setScanTimeout(long timeout, TimeUnit unit); - - /** * Get the timeout of a single operation in a scan. */ long getScanTimeout(TimeUnit unit); @@ -353,29 +318,6 @@ public interface AsyncTableBase { CompletableFuture> smallScan(Scan scan, int limit); /** - * Extracts certain cells from the given rows, in batch. - *

- * Notice that you may not get all the results with this function, which means some of the - * returned {@link CompletableFuture}s may succeed while some of the other returned - * {@link CompletableFuture}s may fail. - * @param gets The objects that specify what data to fetch and from which rows. - * @return A list of {@link CompletableFuture}s that represent the result for each get. - */ - default List> get(List gets) { - return batch(gets); - } - - /** - * A simple version for batch get. It will fail if there are any failures and you will get the - * whole result list at once if the operation is succeeded. - * @param gets The objects that specify what data to fetch and from which rows. - * @return A {@link CompletableFuture} that wrapper the result list. - */ - default CompletableFuture> getAll(List gets) { - return batchAll(gets); - } - - /** * Test for the existence of columns in the table, as specified by the Gets. *

* This will return a list of booleans. Each value will be true if the related Get matches one or @@ -397,8 +339,28 @@ public interface AsyncTableBase { * @return A {@link CompletableFuture} that wrapper the result boolean list. */ default CompletableFuture> existsAll(List gets) { - return getAll(toCheckExistenceOnly(gets)) - .thenApply(l -> l.stream().map(r -> r.getExists()).collect(toList())); + return allOf(exists(gets)); + } + + /** + * Extracts certain cells from the given rows, in batch. + *

+ * Notice that you may not get all the results with this function, which means some of the + * returned {@link CompletableFuture}s may succeed while some of the other returned + * {@link CompletableFuture}s may fail. + * @param gets The objects that specify what data to fetch and from which rows. + * @return A list of {@link CompletableFuture}s that represent the result for each get. + */ + List> get(List gets); + + /** + * A simple version for batch get. It will fail if there are any failures and you will get the + * whole result list at once if the operation is succeeded. + * @param gets The objects that specify what data to fetch and from which rows. + * @return A {@link CompletableFuture} that wrapper the result list. + */ + default CompletableFuture> getAll(List gets) { + return allOf(get(gets)); } /** @@ -406,9 +368,7 @@ public interface AsyncTableBase { * @param puts The list of mutations to apply. * @return A list of {@link CompletableFuture}s that represent the result for each put. */ - default List> put(List puts) { - return voidBatch(this, puts); - } + List> put(List puts); /** * A simple version of batch put. It will fail if there are any failures. @@ -416,7 +376,7 @@ public interface AsyncTableBase { * @return A {@link CompletableFuture} that always returns null when complete normally. */ default CompletableFuture putAll(List puts) { - return voidBatchAll(this, puts); + return allOf(put(puts)).thenApply(r -> null); } /** @@ -424,9 +384,7 @@ public interface AsyncTableBase { * @param deletes list of things to delete. * @return A list of {@link CompletableFuture}s that represent the result for each delete. */ - default List> delete(List deletes) { - return voidBatch(this, deletes); - } + List> delete(List deletes); /** * A simple version of batch delete. It will fail if there are any failures. @@ -434,7 +392,7 @@ public interface AsyncTableBase { * @return A {@link CompletableFuture} that always returns null when complete normally. 
*/ default CompletableFuture deleteAll(List deletes) { - return voidBatchAll(this, deletes); + return allOf(delete(deletes)).thenApply(r -> null); } /** @@ -454,8 +412,6 @@ public interface AsyncTableBase { * @return A list of the result for the actions. Wrapped by a {@link CompletableFuture}. */ default CompletableFuture> batchAll(List actions) { - List> futures = batch(actions); - return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) - .thenApply(v -> futures.stream().map(f -> f.getNow(null)).collect(toList())); + return allOf(batch(actions)); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java new file mode 100644 index 0000000..c166f7e --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; + +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * For creating {@link AsyncTable} or {@link RawAsyncTable}. + *

+ * The implementation should have default configurations set before returning the builder to user. + * So users are free to only set the configs they care about to create a new + * AsyncTable/RawAsyncTable instance. + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public interface AsyncTableBuilder { + + /** + * Set timeout for a whole operation such as get, put or delete. Notice that scan will not be + * affected by this value, see scanTimeoutNs. + *

+ * Operation timeout and max attempt times(or max retry times) are both limitations for retrying, + * we will stop retrying when we reach any of the limitations. + * @see #setMaxAttempts(int) + * @see #setMaxRetries(int) + * @see #setScanTimeout(long, TimeUnit) + */ + AsyncTableBuilder setOperationTimeout(long timeout, TimeUnit unit); + + /** + * As now we have heartbeat support for scan, ideally a scan will never timeout unless the RS is + * crashed. The RS will always return something before the rpc timed out or scan timed out to tell + * the client that it is still alive. The scan timeout is used as operation timeout for every + * operation in a scan, such as openScanner or next. + * @see #setScanTimeout(long, TimeUnit) + */ + AsyncTableBuilder setScanTimeout(long timeout, TimeUnit unit); + + /** + * Set timeout for each rpc request. + *

+ * Notice that this will NOT change the rpc timeout for read(get, scan) + * request and write request(put, delete). + */ + AsyncTableBuilder setRpcTimeout(long timeout, TimeUnit unit); + + /** + * Set timeout for each read rpc request. + */ + AsyncTableBuilder setReadRpcTimeout(long timeout, TimeUnit unit); + + /** + * Set timeout for each write rpc request. + */ + AsyncTableBuilder setWriteRpcTimeout(long timeout, TimeUnit unit); + + /** + * Set the base pause time for retrying. We use an exponential policy to generate sleep time when + * retrying. + */ + AsyncTableBuilder setRetryPause(long pause, TimeUnit unit); + + /** + * Set the max retry times for an operation. Usually it is the max attempt times minus 1. + *

+ * Operation timeout and max attempt times(or max retry times) are both limitations for retrying, + * we will stop retrying when we reach any of the limitations. + * @see #setMaxAttempts(int) + * @see #setOperationTimeout(long, TimeUnit) + */ + default AsyncTableBuilder setMaxRetries(int maxRetries) { + return setMaxAttempts(retries2Attempts(maxRetries)); + } + + /** + * Set the max attempt times for an operation. Usually it is the max retry times plus 1. Operation + * timeout and max attempt times(or max retry times) are both limitations for retrying, we will + * stop retrying when we reach any of the limitations. + * @see #setMaxRetries(int) + * @see #setOperationTimeout(long, TimeUnit) + */ + AsyncTableBuilder setMaxAttempts(int maxAttempts); + + /** + * Set the number of retries that are allowed before we start to log. + */ + AsyncTableBuilder setStartLogErrorsCnt(int startLogErrorsCnt); + + /** + * Create the {@link AsyncTable} or {@link RawAsyncTable} instance. + */ + T build(); +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java new file mode 100644 index 0000000..a4ddeee --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Base class for all asynchronous table builders. + */ +@InterfaceAudience.Private +abstract class AsyncTableBuilderBase implements AsyncTableBuilder { + + protected TableName tableName; + + protected long operationTimeoutNs; + + protected long scanTimeoutNs; + + protected long rpcTimeoutNs; + + protected long readRpcTimeoutNs; + + protected long writeRpcTimeoutNs; + + protected long pauseNs; + + protected int maxAttempts; + + protected int startLogErrorsCnt; + + AsyncTableBuilderBase(TableName tableName, AsyncConnectionConfiguration connConf) { + this.tableName = tableName; + this.operationTimeoutNs = tableName.isSystemTable() ?
connConf.getMetaOperationTimeoutNs() + : connConf.getOperationTimeoutNs(); + this.scanTimeoutNs = connConf.getScanTimeoutNs(); + this.rpcTimeoutNs = connConf.getRpcTimeoutNs(); + this.readRpcTimeoutNs = connConf.getReadRpcTimeoutNs(); + this.writeRpcTimeoutNs = connConf.getWriteRpcTimeoutNs(); + this.pauseNs = connConf.getPauseNs(); + this.maxAttempts = retries2Attempts(connConf.getMaxRetries()); + this.startLogErrorsCnt = connConf.getStartLogErrorsCnt(); + } + + @Override + public AsyncTableBuilderBase setOperationTimeout(long timeout, TimeUnit unit) { + this.operationTimeoutNs = unit.toNanos(timeout); + return this; + } + + @Override + public AsyncTableBuilderBase setScanTimeout(long timeout, TimeUnit unit) { + this.scanTimeoutNs = unit.toNanos(timeout); + return this; + } + + @Override + public AsyncTableBuilderBase setRpcTimeout(long timeout, TimeUnit unit) { + this.rpcTimeoutNs = unit.toNanos(timeout); + return this; + } + + @Override + public AsyncTableBuilderBase setReadRpcTimeout(long timeout, TimeUnit unit) { + this.readRpcTimeoutNs = unit.toNanos(timeout); + return this; + } + + @Override + public AsyncTableBuilderBase setWriteRpcTimeout(long timeout, TimeUnit unit) { + this.writeRpcTimeoutNs = unit.toNanos(timeout); + return this; + } + + @Override + public AsyncTableBuilderBase setRetryPause(long pause, TimeUnit unit) { + this.pauseNs = unit.toNanos(pause); + return this; + } + + @Override + public AsyncTableBuilderBase setMaxAttempts(int maxAttempts) { + this.maxAttempts = maxAttempts; + return this; + } + + @Override + public AsyncTableBuilderBase setStartLogErrorsCnt(int startLogErrorsCnt) { + this.startLogErrorsCnt = startLogErrorsCnt; + return this; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java index 7281185..4eda3e5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java +++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java @@ -22,7 +22,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; +import static java.util.stream.Collectors.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; @@ -42,8 +42,8 @@ class AsyncTableImpl implements AsyncTable { private final long defaultScannerMaxResultSize; - public AsyncTableImpl(AsyncConnectionImpl conn, TableName tableName, ExecutorService pool) { - this.rawTable = conn.getRawTable(tableName); + public AsyncTableImpl(AsyncConnectionImpl conn, RawAsyncTable rawTable, ExecutorService pool) { + this.rawTable = rawTable; this.pool = pool; this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize(); } @@ -59,8 +59,8 @@ class AsyncTableImpl implements AsyncTable { } @Override - public void setReadRpcTimeout(long timeout, TimeUnit unit) { - rawTable.setReadRpcTimeout(timeout, unit); + public long getRpcTimeout(TimeUnit unit) { + return rawTable.getRpcTimeout(unit); } @Override @@ -69,31 +69,16 @@ class AsyncTableImpl implements AsyncTable { } @Override - public void setWriteRpcTimeout(long timeout, TimeUnit unit) { - rawTable.setWriteRpcTimeout(timeout, unit); - } - - @Override public long getWriteRpcTimeout(TimeUnit unit) { return rawTable.getWriteRpcTimeout(unit); } @Override - public void setOperationTimeout(long timeout, TimeUnit unit) { - rawTable.setOperationTimeout(timeout, unit); - } - - @Override public long getOperationTimeout(TimeUnit unit) { return rawTable.getOperationTimeout(unit); } @Override - public void setScanTimeout(long timeout, TimeUnit unit) { - rawTable.setScanTimeout(timeout, unit); - } - - @Override public long getScanTimeout(TimeUnit unit) { return rawTable.getScanTimeout(unit); } @@ -194,7 +179,22 @@ class AsyncTableImpl implements AsyncTable { } @Override + 
public List> get(List gets) { + return rawTable.get(gets).stream().map(this::wrap).collect(toList()); + } + + @Override + public List> put(List puts) { + return rawTable.put(puts).stream().map(this::wrap).collect(toList()); + } + + @Override + public List> delete(List deletes) { + return rawTable.delete(deletes).stream().map(this::wrap).collect(toList()); + } + + @Override public List> batch(List actions) { - return rawTable. batch(actions).stream().map(this::wrap).collect(Collectors.toList()); + return rawTable. batch(actions).stream().map(this::wrap).collect(toList()); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index 6f4a844..1abf3f2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -342,16 +342,6 @@ public final class ConnectionUtils { return gets.stream().map(ConnectionUtils::toCheckExistenceOnly).collect(toList()); } - static List> voidBatch(AsyncTableBase table, - List actions) { - return table. batch(actions).stream().map(f -> f. thenApply(r -> null)) - .collect(toList()); - } - - static CompletableFuture voidBatchAll(AsyncTableBase table, List actions) { - return table. batchAll(actions).thenApply(r -> null); - } - static RegionLocateType getLocateType(Scan scan) { if (scan.isReversed()) { if (isEmptyStartRow(scan.getStartRow())) { @@ -389,4 +379,9 @@ public final class ConnectionUtils { // the region. 
return Bytes.compareTo(info.getStartKey(), scan.getStopRow()) <= 0; } + + static CompletableFuture> allOf(List> futures) { + return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) + .thenApply(v -> futures.stream().map(f -> f.getNow(null)).collect(toList())); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index 347c85b..6bf2485 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.client; +import static java.util.stream.Collectors.toList; import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies; import java.io.IOException; @@ -67,24 +68,35 @@ class RawAsyncTableImpl implements RawAsyncTable { private final long defaultScannerMaxResultSize; - private long readRpcTimeoutNs; + private final long rpcTimeoutNs; - private long writeRpcTimeoutNs; + private final long readRpcTimeoutNs; - private long operationTimeoutNs; + private final long writeRpcTimeoutNs; - private long scanTimeoutNs; + private final long operationTimeoutNs; - public RawAsyncTableImpl(AsyncConnectionImpl conn, TableName tableName) { + private final long scanTimeoutNs; + + private final long pauseNs; + + private final int maxAttempts; + + private final int startLogErrorsCnt; + + public RawAsyncTableImpl(AsyncConnectionImpl conn, AsyncTableBuilderBase builder) { this.conn = conn; - this.tableName = tableName; - this.readRpcTimeoutNs = conn.connConf.getReadRpcTimeoutNs(); - this.writeRpcTimeoutNs = conn.connConf.getWriteRpcTimeoutNs(); - this.operationTimeoutNs = tableName.isSystemTable() ? 
conn.connConf.getMetaOperationTimeoutNs() - : conn.connConf.getOperationTimeoutNs(); + this.tableName = builder.tableName; + this.rpcTimeoutNs = builder.rpcTimeoutNs; + this.readRpcTimeoutNs = builder.readRpcTimeoutNs; + this.writeRpcTimeoutNs = builder.writeRpcTimeoutNs; + this.operationTimeoutNs = builder.operationTimeoutNs; + this.scanTimeoutNs = builder.scanTimeoutNs; + this.pauseNs = builder.pauseNs; + this.maxAttempts = builder.maxAttempts; + this.startLogErrorsCnt = builder.startLogErrorsCnt; this.defaultScannerCaching = conn.connConf.getScannerCaching(); this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize(); - this.scanTimeoutNs = conn.connConf.getScanTimeoutNs(); } @Override @@ -178,7 +190,9 @@ class RawAsyncTableImpl implements RawAsyncTable { private SingleRequestCallerBuilder newCaller(byte[] row, long rpcTimeoutNs) { return conn.callerFactory. single().table(tableName).row(row) .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) - .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS); + .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) + .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) + .startLogErrorsCnt(startLogErrorsCnt); } private SingleRequestCallerBuilder newCaller(Row row, long rpcTimeoutNs) { @@ -214,7 +228,7 @@ class RawAsyncTableImpl implements RawAsyncTable { @Override public CompletableFuture append(Append append) { checkHasFamilies(append); - return this. newCaller(append, writeRpcTimeoutNs) + return this. newCaller(append, rpcTimeoutNs) .action((controller, loc, stub) -> this. noncedMutate(controller, loc, stub, append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult)) .call(); @@ -223,7 +237,7 @@ class RawAsyncTableImpl implements RawAsyncTable { @Override public CompletableFuture increment(Increment increment) { checkHasFamilies(increment); - return this. newCaller(increment, writeRpcTimeoutNs) + return this. 
newCaller(increment, rpcTimeoutNs) .action((controller, loc, stub) -> this. noncedMutate(controller, loc, stub, increment, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult)) .call(); @@ -232,7 +246,7 @@ class RawAsyncTableImpl implements RawAsyncTable { @Override public CompletableFuture checkAndPut(byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp, byte[] value, Put put) { - return this. newCaller(row, writeRpcTimeoutNs) + return this. newCaller(row, rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl. mutate(controller, loc, stub, put, (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, @@ -244,7 +258,7 @@ class RawAsyncTableImpl implements RawAsyncTable { @Override public CompletableFuture checkAndDelete(byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp, byte[] value, Delete delete) { - return this. newCaller(row, writeRpcTimeoutNs) + return this. newCaller(row, rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl. mutate(controller, loc, stub, delete, (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, @@ -303,20 +317,18 @@ class RawAsyncTableImpl implements RawAsyncTable { RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(rn, rm); regionMutationBuilder.setAtomic(true); return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build(); - }, (resp) -> { - return null; - })).call(); + }, resp -> null)).call(); } @Override public CompletableFuture checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp, byte[] value, RowMutations mutation) { - return this. newCaller(mutation, writeRpcTimeoutNs) + return this. newCaller(mutation, rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl. 
mutateRow(controller, loc, stub, mutation, (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, new BinaryComparator(value), CompareType.valueOf(compareOp.name()), rm), - (resp) -> resp.getExists())) + resp -> resp.getExists())) .call(); } @@ -349,7 +361,8 @@ class RawAsyncTableImpl implements RawAsyncTable { } return conn.callerFactory.smallScan().table(tableName).setScan(setDefaultScanConfig(scan)) .limit(limit).scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS) - .rpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS).call(); + .rpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) + .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).call(); } public void scan(Scan scan, RawScanResultConsumer consumer) { @@ -362,55 +375,63 @@ class RawAsyncTableImpl implements RawAsyncTable { } } scan = setDefaultScanConfig(scan); - new AsyncClientScanner(scan, consumer, tableName, conn, scanTimeoutNs, readRpcTimeoutNs) - .start(); + new AsyncClientScanner(scan, consumer, tableName, conn, pauseNs, maxAttempts, scanTimeoutNs, + readRpcTimeoutNs, startLogErrorsCnt).start(); } @Override - public void setReadRpcTimeout(long timeout, TimeUnit unit) { - this.readRpcTimeoutNs = unit.toNanos(timeout); + public List> get(List gets) { + return batch(gets, readRpcTimeoutNs); } @Override - public long getReadRpcTimeout(TimeUnit unit) { - return unit.convert(readRpcTimeoutNs, TimeUnit.NANOSECONDS); + public List> put(List puts) { + return voidMutate(puts); } - @Override - public void setWriteRpcTimeout(long timeout, TimeUnit unit) { - this.writeRpcTimeoutNs = unit.toNanos(timeout); + public List> delete(List deletes) { + return voidMutate(deletes); } @Override - public long getWriteRpcTimeout(TimeUnit unit) { - return unit.convert(writeRpcTimeoutNs, TimeUnit.NANOSECONDS); + public List> batch(List actions) { + return batch(actions, rpcTimeoutNs); + } + + private List> voidMutate(List actions) { + return this. 
batch(actions, writeRpcTimeoutNs).stream() + .map(f -> f. thenApply(r -> null)).collect(toList()); + } + + private List> batch(List actions, long rpcTimeoutNs) { + return conn.callerFactory.batch().table(tableName).actions(actions) + .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) + .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) + .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).call(); } @Override - public void setOperationTimeout(long timeout, TimeUnit unit) { - this.operationTimeoutNs = unit.toNanos(timeout); + public long getRpcTimeout(TimeUnit unit) { + return unit.convert(rpcTimeoutNs, TimeUnit.NANOSECONDS); } @Override - public long getOperationTimeout(TimeUnit unit) { - return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS); + public long getReadRpcTimeout(TimeUnit unit) { + return unit.convert(readRpcTimeoutNs, TimeUnit.NANOSECONDS); } @Override - public void setScanTimeout(long timeout, TimeUnit unit) { - this.scanTimeoutNs = unit.toNanos(timeout); + public long getWriteRpcTimeout(TimeUnit unit) { + return unit.convert(writeRpcTimeoutNs, TimeUnit.NANOSECONDS); } @Override - public long getScanTimeout(TimeUnit unit) { - return TimeUnit.NANOSECONDS.convert(scanTimeoutNs, unit); + public long getOperationTimeout(TimeUnit unit) { + return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS); } @Override - public List> batch(List actions) { - return conn.callerFactory.batch().table(tableName).actions(actions) - .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) - .readRpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS) - .writeRpcTimeout(writeRpcTimeoutNs, TimeUnit.NANOSECONDS).call(); + public long getScanTimeout(TimeUnit unit) { + return unit.convert(scanTimeoutNs, TimeUnit.NANOSECONDS); } } \ No newline at end of file diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 
1eec691..609e9a5 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -875,10 +875,7 @@ public final class HConstants { /** * timeout for each RPC - * @deprecated Use {@link #HBASE_RPC_READ_TIMEOUT_KEY} or {@link #HBASE_RPC_WRITE_TIMEOUT_KEY} - * instead. */ - @Deprecated public static final String HBASE_RPC_TIMEOUT_KEY = "hbase.rpc.timeout"; /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java index 4a391e0..86b7f05 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java @@ -181,7 +181,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller { return mockedLocator; } }) { - RawAsyncTable table = new RawAsyncTableImpl(mockedConn, TABLE_NAME); + RawAsyncTable table = mockedConn.getRawTable(TABLE_NAME); table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get(); assertTrue(errorTriggered.get()); errorTriggered.set(false); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java index 82fe3cd..70b0ecb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java @@ -18,8 +18,6 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT; -import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER; -import static 
org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY; import static org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.TABLES_ON_MASTER; import static org.junit.Assert.assertEquals; @@ -33,6 +31,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -72,6 +71,8 @@ public class TestAsyncTableGetMultiThreaded { private static AsyncConnection CONN; + private static RawAsyncTable TABLE; + private static byte[][] SPLIT_KEYS; @BeforeClass @@ -79,14 +80,13 @@ public class TestAsyncTableGetMultiThreaded { setUp(HColumnDescriptor.MemoryCompaction.NONE); } - protected static void setUp(HColumnDescriptor.MemoryCompaction memoryCompaction) throws Exception { + protected static void setUp(HColumnDescriptor.MemoryCompaction memoryCompaction) + throws Exception { TEST_UTIL.getConfiguration().set(TABLES_ON_MASTER, "none"); TEST_UTIL.getConfiguration().setLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, 60000L); - TEST_UTIL.getConfiguration().setLong(HBASE_RPC_READ_TIMEOUT_KEY, 1000L); - TEST_UTIL.getConfiguration().setInt(HBASE_CLIENT_RETRIES_NUMBER, 1000); TEST_UTIL.getConfiguration().setInt(ByteBufferPool.MAX_POOL_SIZE_KEY, 100); TEST_UTIL.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, - String.valueOf(memoryCompaction)); + String.valueOf(memoryCompaction)); TEST_UTIL.startMiniCluster(5); SPLIT_KEYS = new byte[8][]; @@ -96,10 +96,11 @@ public class TestAsyncTableGetMultiThreaded { TEST_UTIL.createTable(TABLE_NAME, FAMILY); TEST_UTIL.waitTableAvailable(TABLE_NAME); CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()); - CONN.getRawTable(TABLE_NAME) - .putAll( - IntStream.range(0, COUNT).mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i))) 
- .addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))).collect(Collectors.toList())) + TABLE = CONN.createRawTable(TABLE_NAME).setReadRpcTimeout(1, TimeUnit.SECONDS) + .setMaxRetries(1000).build(); + TABLE.putAll( + IntStream.range(0, COUNT).mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i))) + .addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))).collect(Collectors.toList())) .get(); } @@ -112,11 +113,8 @@ public class TestAsyncTableGetMultiThreaded { private void run(AtomicBoolean stop) throws InterruptedException, ExecutionException { while (!stop.get()) { for (int i = 0; i < COUNT; i++) { - assertEquals(i, - Bytes.toInt( - CONN.getRawTable(TABLE_NAME).get(new Get(Bytes.toBytes(String.format("%03d", i)))) - .get() - .getValue(FAMILY, QUALIFIER))); + assertEquals(i, Bytes.toInt(TABLE.get(new Get(Bytes.toBytes(String.format("%03d", i)))) + .get().getValue(FAMILY, QUALIFIER))); } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java index 270e3e1..9f3970b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java @@ -25,7 +25,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Queue; -import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import org.apache.hadoop.hbase.testclassification.ClientTests; @@ -122,10 +121,7 @@ public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan { @Override protected List doScan(Scan scan) throws Exception { SimpleRawScanResultConsumer scanConsumer = new SimpleRawScanResultConsumer(); - RawAsyncTable table = ASYNC_CONN.getRawTable(TABLE_NAME); - table.setScanTimeout(1, TimeUnit.HOURS); - table.setReadRpcTimeout(1, TimeUnit.HOURS); - table.scan(scan, scanConsumer); + 
ASYNC_CONN.getRawTable(TABLE_NAME).scan(scan, scanConsumer); List results = new ArrayList<>(); for (Result result; (result = scanConsumer.take()) != null;) { results.add(result); -- 1.9.1